simple_server.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing,
  13. * software distributed under the License is distributed on an
  14. * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. * KIND, either express or implied. See the License for the
  16. * specific language governing permissions and limitations
  17. * under the License.
  18. */
  19. package thrift
  20. import (
  21. "errors"
  22. "fmt"
  23. "io"
  24. "sync"
  25. "sync/atomic"
  26. "time"
  27. )
  28. // ErrAbandonRequest is a special error server handler implementations can
  29. // return to indicate that the request has been abandoned.
  30. //
  31. // TSimpleServer will check for this error, and close the client connection
  32. // instead of writing the response/error back to the client.
  33. //
  34. // It shall only be used when the server handler implementation know that the
  35. // client already abandoned the request (by checking that the passed in context
  36. // is already canceled, for example).
  37. var ErrAbandonRequest = errors.New("request abandoned")
  38. // ServerConnectivityCheckInterval defines the ticker interval used by
  39. // connectivity check in thrift compiled TProcessorFunc implementations.
  40. //
  41. // It's defined as a variable instead of constant, so that thrift server
  42. // implementations can change its value to control the behavior.
  43. //
  44. // If it's changed to <=0, the feature will be disabled.
  45. var ServerConnectivityCheckInterval = time.Millisecond * 5
  46. /*
  47. * This is not a typical TSimpleServer as it is not blocked after accept a socket.
  48. * It is more like a TThreadedServer that can handle different connections in different goroutines.
  49. * This will work if golang user implements a conn-pool like thing in client side.
  50. */
  51. type TSimpleServer struct {
  52. closed int32
  53. wg sync.WaitGroup
  54. mu sync.Mutex
  55. processorFactory TProcessorFactory
  56. serverTransport TServerTransport
  57. inputTransportFactory TTransportFactory
  58. outputTransportFactory TTransportFactory
  59. inputProtocolFactory TProtocolFactory
  60. outputProtocolFactory TProtocolFactory
  61. // Headers to auto forward in THeaderProtocol
  62. forwardHeaders []string
  63. logger Logger
  64. }
  65. func NewTSimpleServer2(processor TProcessor, serverTransport TServerTransport) *TSimpleServer {
  66. return NewTSimpleServerFactory2(NewTProcessorFactory(processor), serverTransport)
  67. }
  68. func NewTSimpleServer4(processor TProcessor, serverTransport TServerTransport, transportFactory TTransportFactory, protocolFactory TProtocolFactory) *TSimpleServer {
  69. return NewTSimpleServerFactory4(NewTProcessorFactory(processor),
  70. serverTransport,
  71. transportFactory,
  72. protocolFactory,
  73. )
  74. }
  75. func NewTSimpleServer6(processor TProcessor, serverTransport TServerTransport, inputTransportFactory TTransportFactory, outputTransportFactory TTransportFactory, inputProtocolFactory TProtocolFactory, outputProtocolFactory TProtocolFactory) *TSimpleServer {
  76. return NewTSimpleServerFactory6(NewTProcessorFactory(processor),
  77. serverTransport,
  78. inputTransportFactory,
  79. outputTransportFactory,
  80. inputProtocolFactory,
  81. outputProtocolFactory,
  82. )
  83. }
  84. func NewTSimpleServerFactory2(processorFactory TProcessorFactory, serverTransport TServerTransport) *TSimpleServer {
  85. return NewTSimpleServerFactory6(processorFactory,
  86. serverTransport,
  87. NewTTransportFactory(),
  88. NewTTransportFactory(),
  89. NewTBinaryProtocolFactoryDefault(),
  90. NewTBinaryProtocolFactoryDefault(),
  91. )
  92. }
  93. func NewTSimpleServerFactory4(processorFactory TProcessorFactory, serverTransport TServerTransport, transportFactory TTransportFactory, protocolFactory TProtocolFactory) *TSimpleServer {
  94. return NewTSimpleServerFactory6(processorFactory,
  95. serverTransport,
  96. transportFactory,
  97. transportFactory,
  98. protocolFactory,
  99. protocolFactory,
  100. )
  101. }
  102. func NewTSimpleServerFactory6(processorFactory TProcessorFactory, serverTransport TServerTransport, inputTransportFactory TTransportFactory, outputTransportFactory TTransportFactory, inputProtocolFactory TProtocolFactory, outputProtocolFactory TProtocolFactory) *TSimpleServer {
  103. return &TSimpleServer{
  104. processorFactory: processorFactory,
  105. serverTransport: serverTransport,
  106. inputTransportFactory: inputTransportFactory,
  107. outputTransportFactory: outputTransportFactory,
  108. inputProtocolFactory: inputProtocolFactory,
  109. outputProtocolFactory: outputProtocolFactory,
  110. }
  111. }
  112. func (p *TSimpleServer) ProcessorFactory() TProcessorFactory {
  113. return p.processorFactory
  114. }
  115. func (p *TSimpleServer) ServerTransport() TServerTransport {
  116. return p.serverTransport
  117. }
  118. func (p *TSimpleServer) InputTransportFactory() TTransportFactory {
  119. return p.inputTransportFactory
  120. }
  121. func (p *TSimpleServer) OutputTransportFactory() TTransportFactory {
  122. return p.outputTransportFactory
  123. }
  124. func (p *TSimpleServer) InputProtocolFactory() TProtocolFactory {
  125. return p.inputProtocolFactory
  126. }
  127. func (p *TSimpleServer) OutputProtocolFactory() TProtocolFactory {
  128. return p.outputProtocolFactory
  129. }
  130. func (p *TSimpleServer) Listen() error {
  131. return p.serverTransport.Listen()
  132. }
  133. // SetForwardHeaders sets the list of header keys that will be auto forwarded
  134. // while using THeaderProtocol.
  135. //
  136. // "forward" means that when the server is also a client to other upstream
  137. // thrift servers, the context object user gets in the processor functions will
  138. // have both read and write headers set, with write headers being forwarded.
  139. // Users can always override the write headers by calling SetWriteHeaderList
  140. // before calling thrift client functions.
  141. func (p *TSimpleServer) SetForwardHeaders(headers []string) {
  142. size := len(headers)
  143. if size == 0 {
  144. p.forwardHeaders = nil
  145. return
  146. }
  147. keys := make([]string, size)
  148. copy(keys, headers)
  149. p.forwardHeaders = keys
  150. }
  151. // SetLogger sets the logger used by this TSimpleServer.
  152. //
  153. // If no logger was set before Serve is called, a default logger using standard
  154. // log library will be used.
  155. func (p *TSimpleServer) SetLogger(logger Logger) {
  156. p.logger = logger
  157. }
  158. func (p *TSimpleServer) innerAccept() (int32, error) {
  159. client, err := p.serverTransport.Accept()
  160. p.mu.Lock()
  161. defer p.mu.Unlock()
  162. closed := atomic.LoadInt32(&p.closed)
  163. if closed != 0 {
  164. return closed, nil
  165. }
  166. if err != nil {
  167. return 0, err
  168. }
  169. if client != nil {
  170. p.wg.Add(1)
  171. go func() {
  172. defer p.wg.Done()
  173. if err := p.processRequests(client); err != nil {
  174. p.logger(fmt.Sprintf("error processing request: %v", err))
  175. }
  176. }()
  177. }
  178. return 0, nil
  179. }
  180. func (p *TSimpleServer) AcceptLoop() error {
  181. for {
  182. closed, err := p.innerAccept()
  183. if err != nil {
  184. return err
  185. }
  186. if closed != 0 {
  187. return nil
  188. }
  189. }
  190. }
  191. func (p *TSimpleServer) Serve() error {
  192. p.logger = fallbackLogger(p.logger)
  193. err := p.Listen()
  194. if err != nil {
  195. return err
  196. }
  197. p.AcceptLoop()
  198. return nil
  199. }
  200. func (p *TSimpleServer) Stop() error {
  201. p.mu.Lock()
  202. defer p.mu.Unlock()
  203. if atomic.LoadInt32(&p.closed) != 0 {
  204. return nil
  205. }
  206. atomic.StoreInt32(&p.closed, 1)
  207. p.serverTransport.Interrupt()
  208. p.wg.Wait()
  209. return nil
  210. }
  211. // If err is actually EOF, return nil, otherwise return err as-is.
  212. func treatEOFErrorsAsNil(err error) error {
  213. if err == nil {
  214. return nil
  215. }
  216. if errors.Is(err, io.EOF) {
  217. return nil
  218. }
  219. var te TTransportException
  220. if errors.As(err, &te) && te.TypeId() == END_OF_FILE {
  221. return nil
  222. }
  223. return err
  224. }
  225. func (p *TSimpleServer) processRequests(client TTransport) (err error) {
  226. defer func() {
  227. err = treatEOFErrorsAsNil(err)
  228. }()
  229. processor := p.processorFactory.GetProcessor(client)
  230. inputTransport, err := p.inputTransportFactory.GetTransport(client)
  231. if err != nil {
  232. return err
  233. }
  234. inputProtocol := p.inputProtocolFactory.GetProtocol(inputTransport)
  235. var outputTransport TTransport
  236. var outputProtocol TProtocol
  237. // for THeaderProtocol, we must use the same protocol instance for
  238. // input and output so that the response is in the same dialect that
  239. // the server detected the request was in.
  240. headerProtocol, ok := inputProtocol.(*THeaderProtocol)
  241. if ok {
  242. outputProtocol = inputProtocol
  243. } else {
  244. oTrans, err := p.outputTransportFactory.GetTransport(client)
  245. if err != nil {
  246. return err
  247. }
  248. outputTransport = oTrans
  249. outputProtocol = p.outputProtocolFactory.GetProtocol(outputTransport)
  250. }
  251. if inputTransport != nil {
  252. defer inputTransport.Close()
  253. }
  254. if outputTransport != nil {
  255. defer outputTransport.Close()
  256. }
  257. for {
  258. if atomic.LoadInt32(&p.closed) != 0 {
  259. return nil
  260. }
  261. ctx := SetResponseHelper(
  262. defaultCtx,
  263. TResponseHelper{
  264. THeaderResponseHelper: NewTHeaderResponseHelper(outputProtocol),
  265. },
  266. )
  267. if headerProtocol != nil {
  268. // We need to call ReadFrame here, otherwise we won't
  269. // get any headers on the AddReadTHeaderToContext call.
  270. //
  271. // ReadFrame is safe to be called multiple times so it
  272. // won't break when it's called again later when we
  273. // actually start to read the message.
  274. if err := headerProtocol.ReadFrame(ctx); err != nil {
  275. return err
  276. }
  277. ctx = AddReadTHeaderToContext(ctx, headerProtocol.GetReadHeaders())
  278. ctx = SetWriteHeaderList(ctx, p.forwardHeaders)
  279. }
  280. ok, err := processor.Process(ctx, inputProtocol, outputProtocol)
  281. if errors.Is(err, ErrAbandonRequest) {
  282. return client.Close()
  283. }
  284. if errors.As(err, new(TTransportException)) && err != nil {
  285. return err
  286. }
  287. var tae TApplicationException
  288. if errors.As(err, &tae) && tae.TypeId() == UNKNOWN_METHOD {
  289. continue
  290. }
  291. if !ok {
  292. break
  293. }
  294. }
  295. return nil
  296. }