rpc.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. // Copyright (c) 2012-2020 Ugorji Nwoke. All rights reserved.
  2. // Use of this source code is governed by a MIT license found in the LICENSE file.
  3. package codec
  4. import (
  5. "bufio"
  6. "errors"
  7. "io"
  8. "net/rpc"
  9. )
  10. var (
  11. errRpcIsClosed = errors.New("rpc - connection has been closed")
  12. errRpcNoConn = errors.New("rpc - no connection")
  13. rpcSpaceArr = [1]byte{' '}
  14. )
  15. // Rpc provides a rpc Server or Client Codec for rpc communication.
  16. type Rpc interface {
  17. ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec
  18. ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec
  19. }
  20. // RPCOptions holds options specific to rpc functionality
  21. type RPCOptions struct {
  22. // RPCNoBuffer configures whether we attempt to buffer reads and writes during RPC calls.
  23. //
  24. // Set RPCNoBuffer=true to turn buffering off.
  25. // Buffering can still be done if buffered connections are passed in, or
  26. // buffering is configured on the handle.
  27. RPCNoBuffer bool
  28. }
  29. // rpcCodec defines the struct members and common methods.
  30. type rpcCodec struct {
  31. c io.Closer
  32. r io.Reader
  33. w io.Writer
  34. f ioFlusher
  35. dec *Decoder
  36. enc *Encoder
  37. h Handle
  38. cls atomicClsErr
  39. }
  40. func newRPCCodec(conn io.ReadWriteCloser, h Handle) rpcCodec {
  41. return newRPCCodec2(conn, conn, conn, h)
  42. }
  43. func newRPCCodec2(r io.Reader, w io.Writer, c io.Closer, h Handle) rpcCodec {
  44. bh := h.getBasicHandle()
  45. // if the writer can flush, ensure we leverage it, else
  46. // we may hang waiting on read if write isn't flushed.
  47. // var f ioFlusher
  48. f, ok := w.(ioFlusher)
  49. if !bh.RPCNoBuffer {
  50. if bh.WriterBufferSize <= 0 {
  51. if !ok { // a flusher means there's already a buffer
  52. bw := bufio.NewWriter(w)
  53. f, w = bw, bw
  54. }
  55. }
  56. if bh.ReaderBufferSize <= 0 {
  57. if _, ok = w.(ioBuffered); !ok {
  58. r = bufio.NewReader(r)
  59. }
  60. }
  61. }
  62. return rpcCodec{
  63. c: c,
  64. w: w,
  65. r: r,
  66. f: f,
  67. h: h,
  68. enc: NewEncoder(w, h),
  69. dec: NewDecoder(r, h),
  70. }
  71. }
  72. func (c *rpcCodec) write(obj ...interface{}) (err error) {
  73. err = c.ready()
  74. if err != nil {
  75. return
  76. }
  77. if c.f != nil {
  78. defer func() {
  79. flushErr := c.f.Flush()
  80. if flushErr != nil && err == nil {
  81. err = flushErr
  82. }
  83. }()
  84. }
  85. for _, o := range obj {
  86. err = c.enc.Encode(o)
  87. if err != nil {
  88. return
  89. }
  90. // defensive: ensure a space is always written after each encoding,
  91. // in case the value was a number, and encoding a value right after
  92. // without a space will lead to invalid output.
  93. if c.h.isJson() {
  94. _, err = c.w.Write(rpcSpaceArr[:])
  95. if err != nil {
  96. return
  97. }
  98. }
  99. }
  100. return
  101. }
  102. func (c *rpcCodec) read(obj interface{}) (err error) {
  103. err = c.ready()
  104. if err == nil {
  105. //If nil is passed in, we should read and discard
  106. if obj == nil {
  107. // return c.dec.Decode(&obj)
  108. err = c.dec.swallowErr()
  109. } else {
  110. err = c.dec.Decode(obj)
  111. }
  112. }
  113. return
  114. }
  115. func (c *rpcCodec) Close() (err error) {
  116. if c.c != nil {
  117. cls := c.cls.load()
  118. if !cls.closed {
  119. cls.err = c.c.Close()
  120. cls.closed = true
  121. c.cls.store(cls)
  122. }
  123. err = cls.err
  124. }
  125. return
  126. }
  127. func (c *rpcCodec) ready() (err error) {
  128. if c.c == nil {
  129. err = errRpcNoConn
  130. } else {
  131. cls := c.cls.load()
  132. if cls.closed {
  133. if err = cls.err; err == nil {
  134. err = errRpcIsClosed
  135. }
  136. }
  137. }
  138. return
  139. }
  140. func (c *rpcCodec) ReadResponseBody(body interface{}) error {
  141. return c.read(body)
  142. }
  143. // -------------------------------------
  144. type goRpcCodec struct {
  145. rpcCodec
  146. }
  147. func (c *goRpcCodec) WriteRequest(r *rpc.Request, body interface{}) error {
  148. return c.write(r, body)
  149. }
  150. func (c *goRpcCodec) WriteResponse(r *rpc.Response, body interface{}) error {
  151. return c.write(r, body)
  152. }
  153. func (c *goRpcCodec) ReadResponseHeader(r *rpc.Response) error {
  154. return c.read(r)
  155. }
  156. func (c *goRpcCodec) ReadRequestHeader(r *rpc.Request) error {
  157. return c.read(r)
  158. }
  159. func (c *goRpcCodec) ReadRequestBody(body interface{}) error {
  160. return c.read(body)
  161. }
  162. // -------------------------------------
  163. // goRpc is the implementation of Rpc that uses the communication protocol
  164. // as defined in net/rpc package.
  165. type goRpc struct{}
  166. // GoRpc implements Rpc using the communication protocol defined in net/rpc package.
  167. //
  168. // Note: network connection (from net.Dial, of type io.ReadWriteCloser) is not buffered.
  169. //
  170. // For performance, you should configure WriterBufferSize and ReaderBufferSize on the handle.
  171. // This ensures we use an adequate buffer during reading and writing.
  172. // If not configured, we will internally initialize and use a buffer during reads and writes.
  173. // This can be turned off via the RPCNoBuffer option on the Handle.
  174. // var handle codec.JsonHandle
  175. // handle.RPCNoBuffer = true // turns off attempt by rpc module to initialize a buffer
  176. //
  177. // Example 1: one way of configuring buffering explicitly:
  178. // var handle codec.JsonHandle // codec handle
  179. // handle.ReaderBufferSize = 1024
  180. // handle.WriterBufferSize = 1024
  181. // var conn io.ReadWriteCloser // connection got from a socket
  182. // var serverCodec = GoRpc.ServerCodec(conn, handle)
  183. // var clientCodec = GoRpc.ClientCodec(conn, handle)
  184. //
  185. // Example 2: you can also explicitly create a buffered connection yourself,
  186. // and not worry about configuring the buffer sizes in the Handle.
  187. // var handle codec.Handle // codec handle
  188. // var conn io.ReadWriteCloser // connection got from a socket
  189. // var bufconn = struct { // bufconn here is a buffered io.ReadWriteCloser
  190. // io.Closer
  191. // *bufio.Reader
  192. // *bufio.Writer
  193. // }{conn, bufio.NewReader(conn), bufio.NewWriter(conn)}
  194. // var serverCodec = GoRpc.ServerCodec(bufconn, handle)
  195. // var clientCodec = GoRpc.ClientCodec(bufconn, handle)
  196. //
  197. var GoRpc goRpc
  198. func (x goRpc) ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec {
  199. return &goRpcCodec{newRPCCodec(conn, h)}
  200. }
  201. func (x goRpc) ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec {
  202. return &goRpcCodec{newRPCCodec(conn, h)}
  203. }