header_protocol.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing,
  13. * software distributed under the License is distributed on an
  14. * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. * KIND, either express or implied. See the License for the
  16. * specific language governing permissions and limitations
  17. * under the License.
  18. */
  19. package thrift
  20. import (
  21. "context"
  22. "errors"
  23. )
  24. // THeaderProtocol is a thrift protocol that implements THeader:
  25. // https://github.com/apache/thrift/blob/master/doc/specs/HeaderFormat.md
  26. //
  27. // It supports either binary or compact protocol as the wrapped protocol.
  28. //
  29. // Most of the THeader handlings are happening inside THeaderTransport.
  30. type THeaderProtocol struct {
  31. transport *THeaderTransport
  32. // Will be initialized on first read/write.
  33. protocol TProtocol
  34. cfg *TConfiguration
  35. }
  36. // Deprecated: Use NewTHeaderProtocolConf instead.
  37. func NewTHeaderProtocol(trans TTransport) *THeaderProtocol {
  38. return newTHeaderProtocolConf(trans, &TConfiguration{
  39. noPropagation: true,
  40. })
  41. }
  42. // NewTHeaderProtocolConf creates a new THeaderProtocol from the underlying
  43. // transport with given TConfiguration.
  44. //
  45. // The passed in transport will be wrapped with THeaderTransport.
  46. //
  47. // Note that THeaderTransport handles frame and zlib by itself,
  48. // so the underlying transport should be a raw socket transports (TSocket or TSSLSocket),
  49. // instead of rich transports like TZlibTransport or TFramedTransport.
  50. func NewTHeaderProtocolConf(trans TTransport, conf *TConfiguration) *THeaderProtocol {
  51. return newTHeaderProtocolConf(trans, conf)
  52. }
  53. func newTHeaderProtocolConf(trans TTransport, cfg *TConfiguration) *THeaderProtocol {
  54. t := NewTHeaderTransportConf(trans, cfg)
  55. p, _ := t.cfg.GetTHeaderProtocolID().GetProtocol(t)
  56. PropagateTConfiguration(p, cfg)
  57. return &THeaderProtocol{
  58. transport: t,
  59. protocol: p,
  60. cfg: cfg,
  61. }
  62. }
  63. type tHeaderProtocolFactory struct {
  64. cfg *TConfiguration
  65. }
  66. func (f tHeaderProtocolFactory) GetProtocol(trans TTransport) TProtocol {
  67. return newTHeaderProtocolConf(trans, f.cfg)
  68. }
  69. func (f *tHeaderProtocolFactory) SetTConfiguration(cfg *TConfiguration) {
  70. f.cfg = cfg
  71. }
  72. // Deprecated: Use NewTHeaderProtocolFactoryConf instead.
  73. func NewTHeaderProtocolFactory() TProtocolFactory {
  74. return NewTHeaderProtocolFactoryConf(&TConfiguration{
  75. noPropagation: true,
  76. })
  77. }
  78. // NewTHeaderProtocolFactoryConf creates a factory for THeader with given
  79. // TConfiguration.
  80. func NewTHeaderProtocolFactoryConf(conf *TConfiguration) TProtocolFactory {
  81. return tHeaderProtocolFactory{
  82. cfg: conf,
  83. }
  84. }
  85. // Transport returns the underlying transport.
  86. //
  87. // It's guaranteed to be of type *THeaderTransport.
  88. func (p *THeaderProtocol) Transport() TTransport {
  89. return p.transport
  90. }
  91. // GetReadHeaders returns the THeaderMap read from transport.
  92. func (p *THeaderProtocol) GetReadHeaders() THeaderMap {
  93. return p.transport.GetReadHeaders()
  94. }
  95. // SetWriteHeader sets a header for write.
  96. func (p *THeaderProtocol) SetWriteHeader(key, value string) {
  97. p.transport.SetWriteHeader(key, value)
  98. }
  99. // ClearWriteHeaders clears all write headers previously set.
  100. func (p *THeaderProtocol) ClearWriteHeaders() {
  101. p.transport.ClearWriteHeaders()
  102. }
  103. // AddTransform add a transform for writing.
  104. func (p *THeaderProtocol) AddTransform(transform THeaderTransformID) error {
  105. return p.transport.AddTransform(transform)
  106. }
  107. func (p *THeaderProtocol) Flush(ctx context.Context) error {
  108. return p.transport.Flush(ctx)
  109. }
  110. func (p *THeaderProtocol) WriteMessageBegin(ctx context.Context, name string, typeID TMessageType, seqID int32) error {
  111. newProto, err := p.transport.Protocol().GetProtocol(p.transport)
  112. if err != nil {
  113. return err
  114. }
  115. PropagateTConfiguration(newProto, p.cfg)
  116. p.protocol = newProto
  117. p.transport.SequenceID = seqID
  118. return p.protocol.WriteMessageBegin(ctx, name, typeID, seqID)
  119. }
  120. func (p *THeaderProtocol) WriteMessageEnd(ctx context.Context) error {
  121. if err := p.protocol.WriteMessageEnd(ctx); err != nil {
  122. return err
  123. }
  124. return p.transport.Flush(ctx)
  125. }
  126. func (p *THeaderProtocol) WriteStructBegin(ctx context.Context, name string) error {
  127. return p.protocol.WriteStructBegin(ctx, name)
  128. }
  129. func (p *THeaderProtocol) WriteStructEnd(ctx context.Context) error {
  130. return p.protocol.WriteStructEnd(ctx)
  131. }
  132. func (p *THeaderProtocol) WriteFieldBegin(ctx context.Context, name string, typeID TType, id int16) error {
  133. return p.protocol.WriteFieldBegin(ctx, name, typeID, id)
  134. }
  135. func (p *THeaderProtocol) WriteFieldEnd(ctx context.Context) error {
  136. return p.protocol.WriteFieldEnd(ctx)
  137. }
  138. func (p *THeaderProtocol) WriteFieldStop(ctx context.Context) error {
  139. return p.protocol.WriteFieldStop(ctx)
  140. }
  141. func (p *THeaderProtocol) WriteMapBegin(ctx context.Context, keyType TType, valueType TType, size int) error {
  142. return p.protocol.WriteMapBegin(ctx, keyType, valueType, size)
  143. }
  144. func (p *THeaderProtocol) WriteMapEnd(ctx context.Context) error {
  145. return p.protocol.WriteMapEnd(ctx)
  146. }
  147. func (p *THeaderProtocol) WriteListBegin(ctx context.Context, elemType TType, size int) error {
  148. return p.protocol.WriteListBegin(ctx, elemType, size)
  149. }
  150. func (p *THeaderProtocol) WriteListEnd(ctx context.Context) error {
  151. return p.protocol.WriteListEnd(ctx)
  152. }
  153. func (p *THeaderProtocol) WriteSetBegin(ctx context.Context, elemType TType, size int) error {
  154. return p.protocol.WriteSetBegin(ctx, elemType, size)
  155. }
  156. func (p *THeaderProtocol) WriteSetEnd(ctx context.Context) error {
  157. return p.protocol.WriteSetEnd(ctx)
  158. }
  159. func (p *THeaderProtocol) WriteBool(ctx context.Context, value bool) error {
  160. return p.protocol.WriteBool(ctx, value)
  161. }
  162. func (p *THeaderProtocol) WriteByte(ctx context.Context, value int8) error {
  163. return p.protocol.WriteByte(ctx, value)
  164. }
  165. func (p *THeaderProtocol) WriteI16(ctx context.Context, value int16) error {
  166. return p.protocol.WriteI16(ctx, value)
  167. }
  168. func (p *THeaderProtocol) WriteI32(ctx context.Context, value int32) error {
  169. return p.protocol.WriteI32(ctx, value)
  170. }
  171. func (p *THeaderProtocol) WriteI64(ctx context.Context, value int64) error {
  172. return p.protocol.WriteI64(ctx, value)
  173. }
  174. func (p *THeaderProtocol) WriteDouble(ctx context.Context, value float64) error {
  175. return p.protocol.WriteDouble(ctx, value)
  176. }
  177. func (p *THeaderProtocol) WriteString(ctx context.Context, value string) error {
  178. return p.protocol.WriteString(ctx, value)
  179. }
  180. func (p *THeaderProtocol) WriteBinary(ctx context.Context, value []byte) error {
  181. return p.protocol.WriteBinary(ctx, value)
  182. }
  183. // ReadFrame calls underlying THeaderTransport's ReadFrame function.
  184. func (p *THeaderProtocol) ReadFrame(ctx context.Context) error {
  185. return p.transport.ReadFrame(ctx)
  186. }
  187. func (p *THeaderProtocol) ReadMessageBegin(ctx context.Context) (name string, typeID TMessageType, seqID int32, err error) {
  188. if err = p.transport.ReadFrame(ctx); err != nil {
  189. return
  190. }
  191. var newProto TProtocol
  192. newProto, err = p.transport.Protocol().GetProtocol(p.transport)
  193. if err != nil {
  194. var tAppExc TApplicationException
  195. if !errors.As(err, &tAppExc) {
  196. return
  197. }
  198. if e := p.protocol.WriteMessageBegin(ctx, "", EXCEPTION, seqID); e != nil {
  199. return
  200. }
  201. if e := tAppExc.Write(ctx, p.protocol); e != nil {
  202. return
  203. }
  204. if e := p.protocol.WriteMessageEnd(ctx); e != nil {
  205. return
  206. }
  207. if e := p.transport.Flush(ctx); e != nil {
  208. return
  209. }
  210. return
  211. }
  212. PropagateTConfiguration(newProto, p.cfg)
  213. p.protocol = newProto
  214. return p.protocol.ReadMessageBegin(ctx)
  215. }
  216. func (p *THeaderProtocol) ReadMessageEnd(ctx context.Context) error {
  217. return p.protocol.ReadMessageEnd(ctx)
  218. }
  219. func (p *THeaderProtocol) ReadStructBegin(ctx context.Context) (name string, err error) {
  220. return p.protocol.ReadStructBegin(ctx)
  221. }
  222. func (p *THeaderProtocol) ReadStructEnd(ctx context.Context) error {
  223. return p.protocol.ReadStructEnd(ctx)
  224. }
  225. func (p *THeaderProtocol) ReadFieldBegin(ctx context.Context) (name string, typeID TType, id int16, err error) {
  226. return p.protocol.ReadFieldBegin(ctx)
  227. }
  228. func (p *THeaderProtocol) ReadFieldEnd(ctx context.Context) error {
  229. return p.protocol.ReadFieldEnd(ctx)
  230. }
  231. func (p *THeaderProtocol) ReadMapBegin(ctx context.Context) (keyType TType, valueType TType, size int, err error) {
  232. return p.protocol.ReadMapBegin(ctx)
  233. }
  234. func (p *THeaderProtocol) ReadMapEnd(ctx context.Context) error {
  235. return p.protocol.ReadMapEnd(ctx)
  236. }
  237. func (p *THeaderProtocol) ReadListBegin(ctx context.Context) (elemType TType, size int, err error) {
  238. return p.protocol.ReadListBegin(ctx)
  239. }
  240. func (p *THeaderProtocol) ReadListEnd(ctx context.Context) error {
  241. return p.protocol.ReadListEnd(ctx)
  242. }
  243. func (p *THeaderProtocol) ReadSetBegin(ctx context.Context) (elemType TType, size int, err error) {
  244. return p.protocol.ReadSetBegin(ctx)
  245. }
  246. func (p *THeaderProtocol) ReadSetEnd(ctx context.Context) error {
  247. return p.protocol.ReadSetEnd(ctx)
  248. }
  249. func (p *THeaderProtocol) ReadBool(ctx context.Context) (value bool, err error) {
  250. return p.protocol.ReadBool(ctx)
  251. }
  252. func (p *THeaderProtocol) ReadByte(ctx context.Context) (value int8, err error) {
  253. return p.protocol.ReadByte(ctx)
  254. }
  255. func (p *THeaderProtocol) ReadI16(ctx context.Context) (value int16, err error) {
  256. return p.protocol.ReadI16(ctx)
  257. }
  258. func (p *THeaderProtocol) ReadI32(ctx context.Context) (value int32, err error) {
  259. return p.protocol.ReadI32(ctx)
  260. }
  261. func (p *THeaderProtocol) ReadI64(ctx context.Context) (value int64, err error) {
  262. return p.protocol.ReadI64(ctx)
  263. }
  264. func (p *THeaderProtocol) ReadDouble(ctx context.Context) (value float64, err error) {
  265. return p.protocol.ReadDouble(ctx)
  266. }
  267. func (p *THeaderProtocol) ReadString(ctx context.Context) (value string, err error) {
  268. return p.protocol.ReadString(ctx)
  269. }
  270. func (p *THeaderProtocol) ReadBinary(ctx context.Context) (value []byte, err error) {
  271. return p.protocol.ReadBinary(ctx)
  272. }
  273. func (p *THeaderProtocol) Skip(ctx context.Context, fieldType TType) error {
  274. return p.protocol.Skip(ctx, fieldType)
  275. }
  276. // SetTConfiguration implements TConfigurationSetter.
  277. func (p *THeaderProtocol) SetTConfiguration(cfg *TConfiguration) {
  278. PropagateTConfiguration(p.transport, cfg)
  279. PropagateTConfiguration(p.protocol, cfg)
  280. p.cfg = cfg
  281. }
  282. var (
  283. _ TConfigurationSetter = (*tHeaderProtocolFactory)(nil)
  284. _ TConfigurationSetter = (*THeaderProtocol)(nil)
  285. )