header_transport.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing,
  13. * software distributed under the License is distributed on an
  14. * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. * KIND, either express or implied. See the License for the
  16. * specific language governing permissions and limitations
  17. * under the License.
  18. */
  19. package thrift
  20. import (
  21. "bufio"
  22. "bytes"
  23. "compress/zlib"
  24. "context"
  25. "encoding/binary"
  26. "errors"
  27. "fmt"
  28. "io"
  29. "io/ioutil"
  30. )
  31. // Size in bytes for 32-bit ints.
  32. const size32 = 4
  33. type headerMeta struct {
  34. MagicFlags uint32
  35. SequenceID int32
  36. HeaderLength uint16
  37. }
  38. const headerMetaSize = 10
  39. type clientType int
  40. const (
  41. clientUnknown clientType = iota
  42. clientHeaders
  43. clientFramedBinary
  44. clientUnframedBinary
  45. clientFramedCompact
  46. clientUnframedCompact
  47. )
  48. // Constants defined in THeader format:
  49. // https://github.com/apache/thrift/blob/master/doc/specs/HeaderFormat.md
  50. const (
  51. THeaderHeaderMagic uint32 = 0x0fff0000
  52. THeaderHeaderMask uint32 = 0xffff0000
  53. THeaderFlagsMask uint32 = 0x0000ffff
  54. THeaderMaxFrameSize uint32 = 0x3fffffff
  55. )
  56. // THeaderMap is the type of the header map in THeader transport.
  57. type THeaderMap map[string]string
  58. // THeaderProtocolID is the wrapped protocol id used in THeader.
  59. type THeaderProtocolID int32
  60. // Supported THeaderProtocolID values.
  61. const (
  62. THeaderProtocolBinary THeaderProtocolID = 0x00
  63. THeaderProtocolCompact THeaderProtocolID = 0x02
  64. THeaderProtocolDefault = THeaderProtocolBinary
  65. )
  66. // Declared globally to avoid repetitive allocations, not really used.
  67. var globalMemoryBuffer = NewTMemoryBuffer()
  68. // Validate checks whether the THeaderProtocolID is a valid/supported one.
  69. func (id THeaderProtocolID) Validate() error {
  70. _, err := id.GetProtocol(globalMemoryBuffer)
  71. return err
  72. }
  73. // GetProtocol gets the corresponding TProtocol from the wrapped protocol id.
  74. func (id THeaderProtocolID) GetProtocol(trans TTransport) (TProtocol, error) {
  75. switch id {
  76. default:
  77. return nil, NewTApplicationException(
  78. INVALID_PROTOCOL,
  79. fmt.Sprintf("THeader protocol id %d not supported", id),
  80. )
  81. case THeaderProtocolBinary:
  82. return NewTBinaryProtocolTransport(trans), nil
  83. case THeaderProtocolCompact:
  84. return NewTCompactProtocol(trans), nil
  85. }
  86. }
  87. // THeaderTransformID defines the numeric id of the transform used.
  88. type THeaderTransformID int32
  89. // THeaderTransformID values.
  90. //
  91. // Values not defined here are not currently supported, namely HMAC and Snappy.
  92. const (
  93. TransformNone THeaderTransformID = iota // 0, no special handling
  94. TransformZlib // 1, zlib
  95. )
  96. var supportedTransformIDs = map[THeaderTransformID]bool{
  97. TransformNone: true,
  98. TransformZlib: true,
  99. }
  100. // TransformReader is an io.ReadCloser that handles transforms reading.
  101. type TransformReader struct {
  102. io.Reader
  103. closers []io.Closer
  104. }
  105. var _ io.ReadCloser = (*TransformReader)(nil)
  106. // NewTransformReaderWithCapacity initializes a TransformReader with expected
  107. // closers capacity.
  108. //
  109. // If you don't know the closers capacity beforehand, just use
  110. //
  111. // &TransformReader{Reader: baseReader}
  112. //
  113. // instead would be sufficient.
  114. func NewTransformReaderWithCapacity(baseReader io.Reader, capacity int) *TransformReader {
  115. return &TransformReader{
  116. Reader: baseReader,
  117. closers: make([]io.Closer, 0, capacity),
  118. }
  119. }
  120. // Close calls the underlying closers in appropriate order,
  121. // stops at and returns the first error encountered.
  122. func (tr *TransformReader) Close() error {
  123. // Call closers in reversed order
  124. for i := len(tr.closers) - 1; i >= 0; i-- {
  125. if err := tr.closers[i].Close(); err != nil {
  126. return err
  127. }
  128. }
  129. return nil
  130. }
  131. // AddTransform adds a transform.
  132. func (tr *TransformReader) AddTransform(id THeaderTransformID) error {
  133. switch id {
  134. default:
  135. return NewTApplicationException(
  136. INVALID_TRANSFORM,
  137. fmt.Sprintf("THeaderTransformID %d not supported", id),
  138. )
  139. case TransformNone:
  140. // no-op
  141. case TransformZlib:
  142. readCloser, err := zlib.NewReader(tr.Reader)
  143. if err != nil {
  144. return err
  145. }
  146. tr.Reader = readCloser
  147. tr.closers = append(tr.closers, readCloser)
  148. }
  149. return nil
  150. }
  151. // TransformWriter is an io.WriteCloser that handles transforms writing.
  152. type TransformWriter struct {
  153. io.Writer
  154. closers []io.Closer
  155. }
  156. var _ io.WriteCloser = (*TransformWriter)(nil)
  157. // NewTransformWriter creates a new TransformWriter with base writer and transforms.
  158. func NewTransformWriter(baseWriter io.Writer, transforms []THeaderTransformID) (io.WriteCloser, error) {
  159. writer := &TransformWriter{
  160. Writer: baseWriter,
  161. closers: make([]io.Closer, 0, len(transforms)),
  162. }
  163. for _, id := range transforms {
  164. if err := writer.AddTransform(id); err != nil {
  165. return nil, err
  166. }
  167. }
  168. return writer, nil
  169. }
  170. // Close calls the underlying closers in appropriate order,
  171. // stops at and returns the first error encountered.
  172. func (tw *TransformWriter) Close() error {
  173. // Call closers in reversed order
  174. for i := len(tw.closers) - 1; i >= 0; i-- {
  175. if err := tw.closers[i].Close(); err != nil {
  176. return err
  177. }
  178. }
  179. return nil
  180. }
  181. // AddTransform adds a transform.
  182. func (tw *TransformWriter) AddTransform(id THeaderTransformID) error {
  183. switch id {
  184. default:
  185. return NewTApplicationException(
  186. INVALID_TRANSFORM,
  187. fmt.Sprintf("THeaderTransformID %d not supported", id),
  188. )
  189. case TransformNone:
  190. // no-op
  191. case TransformZlib:
  192. writeCloser := zlib.NewWriter(tw.Writer)
  193. tw.Writer = writeCloser
  194. tw.closers = append(tw.closers, writeCloser)
  195. }
  196. return nil
  197. }
  198. // THeaderInfoType is the type id of the info headers.
  199. type THeaderInfoType int32
  200. // Supported THeaderInfoType values.
  201. const (
  202. _ THeaderInfoType = iota // Skip 0
  203. InfoKeyValue // 1
  204. // Rest of the info types are not supported.
  205. )
  206. // THeaderTransport is a Transport mode that implements THeader.
  207. //
  208. // Note that THeaderTransport handles frame and zlib by itself,
  209. // so the underlying transport should be a raw socket transports (TSocket or TSSLSocket),
  210. // instead of rich transports like TZlibTransport or TFramedTransport.
  211. type THeaderTransport struct {
  212. SequenceID int32
  213. Flags uint32
  214. transport TTransport
  215. // THeaderMap for read and write
  216. readHeaders THeaderMap
  217. writeHeaders THeaderMap
  218. // Reading related variables.
  219. reader *bufio.Reader
  220. // When frame is detected, we read the frame fully into frameBuffer.
  221. frameBuffer bytes.Buffer
  222. // When it's non-nil, Read should read from frameReader instead of
  223. // reader, and EOF error indicates end of frame instead of end of all
  224. // transport.
  225. frameReader io.ReadCloser
  226. // Writing related variables
  227. writeBuffer bytes.Buffer
  228. writeTransforms []THeaderTransformID
  229. clientType clientType
  230. protocolID THeaderProtocolID
  231. cfg *TConfiguration
  232. // buffer is used in the following scenarios to avoid repetitive
  233. // allocations, while 4 is big enough for all those scenarios:
  234. //
  235. // * header padding (max size 4)
  236. // * write the frame size (size 4)
  237. buffer [4]byte
  238. }
  239. var _ TTransport = (*THeaderTransport)(nil)
  240. // Deprecated: Use NewTHeaderTransportConf instead.
  241. func NewTHeaderTransport(trans TTransport) *THeaderTransport {
  242. return NewTHeaderTransportConf(trans, &TConfiguration{
  243. noPropagation: true,
  244. })
  245. }
  246. // NewTHeaderTransportConf creates THeaderTransport from the
  247. // underlying transport, with given TConfiguration attached.
  248. //
  249. // If trans is already a *THeaderTransport, it will be returned as is,
  250. // but with TConfiguration overridden by the value passed in.
  251. //
  252. // The protocol ID in TConfiguration is only useful for client transports.
  253. // For servers,
  254. // the protocol ID will be overridden again to the one set by the client,
  255. // to ensure that servers always speak the same dialect as the client.
  256. func NewTHeaderTransportConf(trans TTransport, conf *TConfiguration) *THeaderTransport {
  257. if ht, ok := trans.(*THeaderTransport); ok {
  258. ht.SetTConfiguration(conf)
  259. return ht
  260. }
  261. PropagateTConfiguration(trans, conf)
  262. return &THeaderTransport{
  263. transport: trans,
  264. reader: bufio.NewReader(trans),
  265. writeHeaders: make(THeaderMap),
  266. protocolID: conf.GetTHeaderProtocolID(),
  267. cfg: conf,
  268. }
  269. }
  270. // Open calls the underlying transport's Open function.
  271. func (t *THeaderTransport) Open() error {
  272. return t.transport.Open()
  273. }
  274. // IsOpen calls the underlying transport's IsOpen function.
  275. func (t *THeaderTransport) IsOpen() bool {
  276. return t.transport.IsOpen()
  277. }
  278. // ReadFrame tries to read the frame header, guess the client type, and handle
  279. // unframed clients.
  280. func (t *THeaderTransport) ReadFrame(ctx context.Context) error {
  281. if !t.needReadFrame() {
  282. // No need to read frame, skipping.
  283. return nil
  284. }
  285. // Peek and handle the first 32 bits.
  286. // They could either be the length field of a framed message,
  287. // or the first bytes of an unframed message.
  288. var buf []byte
  289. var err error
  290. // This is also usually the first read from a connection,
  291. // so handle retries around socket timeouts.
  292. _, deadlineSet := ctx.Deadline()
  293. for {
  294. buf, err = t.reader.Peek(size32)
  295. if deadlineSet && isTimeoutError(err) && ctx.Err() == nil {
  296. // This is I/O timeout and we still have time,
  297. // continue trying
  298. continue
  299. }
  300. // For anything else, do not retry
  301. break
  302. }
  303. if err != nil {
  304. return err
  305. }
  306. frameSize := binary.BigEndian.Uint32(buf)
  307. if frameSize&VERSION_MASK == VERSION_1 {
  308. t.clientType = clientUnframedBinary
  309. return nil
  310. }
  311. if buf[0] == COMPACT_PROTOCOL_ID && buf[1]&COMPACT_VERSION_MASK == COMPACT_VERSION {
  312. t.clientType = clientUnframedCompact
  313. return nil
  314. }
  315. // At this point it should be a framed message,
  316. // sanity check on frameSize then discard the peeked part.
  317. if frameSize > THeaderMaxFrameSize || frameSize > uint32(t.cfg.GetMaxFrameSize()) {
  318. return NewTProtocolExceptionWithType(
  319. SIZE_LIMIT,
  320. errors.New("frame too large"),
  321. )
  322. }
  323. t.reader.Discard(size32)
  324. // Read the frame fully into frameBuffer.
  325. _, err = io.CopyN(&t.frameBuffer, t.reader, int64(frameSize))
  326. if err != nil {
  327. return err
  328. }
  329. t.frameReader = ioutil.NopCloser(&t.frameBuffer)
  330. // Peek and handle the next 32 bits.
  331. buf = t.frameBuffer.Bytes()[:size32]
  332. version := binary.BigEndian.Uint32(buf)
  333. if version&THeaderHeaderMask == THeaderHeaderMagic {
  334. t.clientType = clientHeaders
  335. return t.parseHeaders(ctx, frameSize)
  336. }
  337. if version&VERSION_MASK == VERSION_1 {
  338. t.clientType = clientFramedBinary
  339. return nil
  340. }
  341. if buf[0] == COMPACT_PROTOCOL_ID && buf[1]&COMPACT_VERSION_MASK == COMPACT_VERSION {
  342. t.clientType = clientFramedCompact
  343. return nil
  344. }
  345. if err := t.endOfFrame(); err != nil {
  346. return err
  347. }
  348. return NewTProtocolExceptionWithType(
  349. NOT_IMPLEMENTED,
  350. errors.New("unsupported client transport type"),
  351. )
  352. }
  353. // endOfFrame does end of frame handling.
  354. //
  355. // It closes frameReader, and also resets frame related states.
  356. func (t *THeaderTransport) endOfFrame() error {
  357. defer func() {
  358. t.frameBuffer.Reset()
  359. t.frameReader = nil
  360. }()
  361. return t.frameReader.Close()
  362. }
  363. func (t *THeaderTransport) parseHeaders(ctx context.Context, frameSize uint32) error {
  364. if t.clientType != clientHeaders {
  365. return nil
  366. }
  367. var err error
  368. var meta headerMeta
  369. if err = binary.Read(&t.frameBuffer, binary.BigEndian, &meta); err != nil {
  370. return err
  371. }
  372. frameSize -= headerMetaSize
  373. t.Flags = meta.MagicFlags & THeaderFlagsMask
  374. t.SequenceID = meta.SequenceID
  375. headerLength := int64(meta.HeaderLength) * 4
  376. if int64(frameSize) < headerLength {
  377. return NewTProtocolExceptionWithType(
  378. SIZE_LIMIT,
  379. errors.New("header size is larger than the whole frame"),
  380. )
  381. }
  382. headerBuf := NewTMemoryBuffer()
  383. _, err = io.CopyN(headerBuf, &t.frameBuffer, headerLength)
  384. if err != nil {
  385. return err
  386. }
  387. hp := NewTCompactProtocol(headerBuf)
  388. hp.SetTConfiguration(t.cfg)
  389. // At this point the header is already read into headerBuf,
  390. // and t.frameBuffer starts from the actual payload.
  391. protoID, err := hp.readVarint32()
  392. if err != nil {
  393. return err
  394. }
  395. t.protocolID = THeaderProtocolID(protoID)
  396. var transformCount int32
  397. transformCount, err = hp.readVarint32()
  398. if err != nil {
  399. return err
  400. }
  401. if transformCount > 0 {
  402. reader := NewTransformReaderWithCapacity(
  403. &t.frameBuffer,
  404. int(transformCount),
  405. )
  406. t.frameReader = reader
  407. transformIDs := make([]THeaderTransformID, transformCount)
  408. for i := 0; i < int(transformCount); i++ {
  409. id, err := hp.readVarint32()
  410. if err != nil {
  411. return err
  412. }
  413. transformIDs[i] = THeaderTransformID(id)
  414. }
  415. // The transform IDs on the wire was added based on the order of
  416. // writing, so on the reading side we need to reverse the order.
  417. for i := transformCount - 1; i >= 0; i-- {
  418. id := transformIDs[i]
  419. if err := reader.AddTransform(id); err != nil {
  420. return err
  421. }
  422. }
  423. }
  424. // The info part does not use the transforms yet, so it's
  425. // important to continue using headerBuf.
  426. headers := make(THeaderMap)
  427. for {
  428. infoType, err := hp.readVarint32()
  429. if errors.Is(err, io.EOF) {
  430. break
  431. }
  432. if err != nil {
  433. return err
  434. }
  435. if THeaderInfoType(infoType) == InfoKeyValue {
  436. count, err := hp.readVarint32()
  437. if err != nil {
  438. return err
  439. }
  440. for i := 0; i < int(count); i++ {
  441. key, err := hp.ReadString(ctx)
  442. if err != nil {
  443. return err
  444. }
  445. value, err := hp.ReadString(ctx)
  446. if err != nil {
  447. return err
  448. }
  449. headers[key] = value
  450. }
  451. } else {
  452. // Skip reading info section on the first
  453. // unsupported info type.
  454. break
  455. }
  456. }
  457. t.readHeaders = headers
  458. return nil
  459. }
  460. func (t *THeaderTransport) needReadFrame() bool {
  461. if t.clientType == clientUnknown {
  462. // This is a new connection that's never read before.
  463. return true
  464. }
  465. if t.isFramed() && t.frameReader == nil {
  466. // We just finished the last frame.
  467. return true
  468. }
  469. return false
  470. }
  471. func (t *THeaderTransport) Read(p []byte) (read int, err error) {
  472. // Here using context.Background instead of a context passed in is safe.
  473. // First is that there's no way to pass context into this function.
  474. // Then, 99% of the case when calling this Read frame is already read
  475. // into frameReader. ReadFrame here is more of preventing bugs that
  476. // didn't call ReadFrame before calling Read.
  477. err = t.ReadFrame(context.Background())
  478. if err != nil {
  479. return
  480. }
  481. if t.frameReader != nil {
  482. read, err = t.frameReader.Read(p)
  483. if err == nil && t.frameBuffer.Len() <= 0 {
  484. // the last Read finished the frame, do endOfFrame
  485. // handling here.
  486. err = t.endOfFrame()
  487. } else if err == io.EOF {
  488. err = t.endOfFrame()
  489. if err != nil {
  490. return
  491. }
  492. if read == 0 {
  493. // Try to read the next frame when we hit EOF
  494. // (end of frame) immediately.
  495. // When we got here, it means the last read
  496. // finished the previous frame, but didn't
  497. // do endOfFrame handling yet.
  498. // We have to read the next frame here,
  499. // as otherwise we would return 0 and nil,
  500. // which is a case not handled well by most
  501. // protocol implementations.
  502. return t.Read(p)
  503. }
  504. }
  505. return
  506. }
  507. return t.reader.Read(p)
  508. }
  509. // Write writes data to the write buffer.
  510. //
  511. // You need to call Flush to actually write them to the transport.
  512. func (t *THeaderTransport) Write(p []byte) (int, error) {
  513. return t.writeBuffer.Write(p)
  514. }
  515. // Flush writes the appropriate header and the write buffer to the underlying transport.
  516. func (t *THeaderTransport) Flush(ctx context.Context) error {
  517. if t.writeBuffer.Len() == 0 {
  518. return nil
  519. }
  520. defer t.writeBuffer.Reset()
  521. switch t.clientType {
  522. default:
  523. fallthrough
  524. case clientUnknown:
  525. t.clientType = clientHeaders
  526. fallthrough
  527. case clientHeaders:
  528. headers := NewTMemoryBuffer()
  529. hp := NewTCompactProtocol(headers)
  530. hp.SetTConfiguration(t.cfg)
  531. if _, err := hp.writeVarint32(int32(t.protocolID)); err != nil {
  532. return NewTTransportExceptionFromError(err)
  533. }
  534. if _, err := hp.writeVarint32(int32(len(t.writeTransforms))); err != nil {
  535. return NewTTransportExceptionFromError(err)
  536. }
  537. for _, transform := range t.writeTransforms {
  538. if _, err := hp.writeVarint32(int32(transform)); err != nil {
  539. return NewTTransportExceptionFromError(err)
  540. }
  541. }
  542. if len(t.writeHeaders) > 0 {
  543. if _, err := hp.writeVarint32(int32(InfoKeyValue)); err != nil {
  544. return NewTTransportExceptionFromError(err)
  545. }
  546. if _, err := hp.writeVarint32(int32(len(t.writeHeaders))); err != nil {
  547. return NewTTransportExceptionFromError(err)
  548. }
  549. for key, value := range t.writeHeaders {
  550. if err := hp.WriteString(ctx, key); err != nil {
  551. return NewTTransportExceptionFromError(err)
  552. }
  553. if err := hp.WriteString(ctx, value); err != nil {
  554. return NewTTransportExceptionFromError(err)
  555. }
  556. }
  557. }
  558. padding := 4 - headers.Len()%4
  559. if padding < 4 {
  560. buf := t.buffer[:padding]
  561. for i := range buf {
  562. buf[i] = 0
  563. }
  564. if _, err := headers.Write(buf); err != nil {
  565. return NewTTransportExceptionFromError(err)
  566. }
  567. }
  568. var payload bytes.Buffer
  569. meta := headerMeta{
  570. MagicFlags: THeaderHeaderMagic + t.Flags&THeaderFlagsMask,
  571. SequenceID: t.SequenceID,
  572. HeaderLength: uint16(headers.Len() / 4),
  573. }
  574. if err := binary.Write(&payload, binary.BigEndian, meta); err != nil {
  575. return NewTTransportExceptionFromError(err)
  576. }
  577. if _, err := io.Copy(&payload, headers); err != nil {
  578. return NewTTransportExceptionFromError(err)
  579. }
  580. writer, err := NewTransformWriter(&payload, t.writeTransforms)
  581. if err != nil {
  582. return NewTTransportExceptionFromError(err)
  583. }
  584. if _, err := io.Copy(writer, &t.writeBuffer); err != nil {
  585. return NewTTransportExceptionFromError(err)
  586. }
  587. if err := writer.Close(); err != nil {
  588. return NewTTransportExceptionFromError(err)
  589. }
  590. // First write frame length
  591. buf := t.buffer[:size32]
  592. binary.BigEndian.PutUint32(buf, uint32(payload.Len()))
  593. if _, err := t.transport.Write(buf); err != nil {
  594. return NewTTransportExceptionFromError(err)
  595. }
  596. // Then write the payload
  597. if _, err := io.Copy(t.transport, &payload); err != nil {
  598. return NewTTransportExceptionFromError(err)
  599. }
  600. case clientFramedBinary, clientFramedCompact:
  601. buf := t.buffer[:size32]
  602. binary.BigEndian.PutUint32(buf, uint32(t.writeBuffer.Len()))
  603. if _, err := t.transport.Write(buf); err != nil {
  604. return NewTTransportExceptionFromError(err)
  605. }
  606. fallthrough
  607. case clientUnframedBinary, clientUnframedCompact:
  608. if _, err := io.Copy(t.transport, &t.writeBuffer); err != nil {
  609. return NewTTransportExceptionFromError(err)
  610. }
  611. }
  612. select {
  613. default:
  614. case <-ctx.Done():
  615. return NewTTransportExceptionFromError(ctx.Err())
  616. }
  617. return t.transport.Flush(ctx)
  618. }
  619. // Close closes the transport, along with its underlying transport.
  620. func (t *THeaderTransport) Close() error {
  621. if err := t.Flush(context.Background()); err != nil {
  622. return err
  623. }
  624. return t.transport.Close()
  625. }
  626. // RemainingBytes calls underlying transport's RemainingBytes.
  627. //
  628. // Even in framed cases, because of all the possible compression transforms
  629. // involved, the remaining frame size is likely to be different from the actual
  630. // remaining readable bytes, so we don't bother to keep tracking the remaining
  631. // frame size by ourselves and just use the underlying transport's
  632. // RemainingBytes directly.
  633. func (t *THeaderTransport) RemainingBytes() uint64 {
  634. return t.transport.RemainingBytes()
  635. }
  636. // GetReadHeaders returns the THeaderMap read from transport.
  637. func (t *THeaderTransport) GetReadHeaders() THeaderMap {
  638. return t.readHeaders
  639. }
  640. // SetWriteHeader sets a header for write.
  641. func (t *THeaderTransport) SetWriteHeader(key, value string) {
  642. t.writeHeaders[key] = value
  643. }
  644. // ClearWriteHeaders clears all write headers previously set.
  645. func (t *THeaderTransport) ClearWriteHeaders() {
  646. t.writeHeaders = make(THeaderMap)
  647. }
  648. // AddTransform add a transform for writing.
  649. func (t *THeaderTransport) AddTransform(transform THeaderTransformID) error {
  650. if !supportedTransformIDs[transform] {
  651. return NewTProtocolExceptionWithType(
  652. NOT_IMPLEMENTED,
  653. fmt.Errorf("THeaderTransformID %d not supported", transform),
  654. )
  655. }
  656. t.writeTransforms = append(t.writeTransforms, transform)
  657. return nil
  658. }
  659. // Protocol returns the wrapped protocol id used in this THeaderTransport.
  660. func (t *THeaderTransport) Protocol() THeaderProtocolID {
  661. switch t.clientType {
  662. default:
  663. return t.protocolID
  664. case clientFramedBinary, clientUnframedBinary:
  665. return THeaderProtocolBinary
  666. case clientFramedCompact, clientUnframedCompact:
  667. return THeaderProtocolCompact
  668. }
  669. }
  670. func (t *THeaderTransport) isFramed() bool {
  671. switch t.clientType {
  672. default:
  673. return false
  674. case clientHeaders, clientFramedBinary, clientFramedCompact:
  675. return true
  676. }
  677. }
  678. // SetTConfiguration implements TConfigurationSetter.
  679. func (t *THeaderTransport) SetTConfiguration(cfg *TConfiguration) {
  680. PropagateTConfiguration(t.transport, cfg)
  681. t.cfg = cfg
  682. }
  683. // THeaderTransportFactory is a TTransportFactory implementation to create
  684. // THeaderTransport.
  685. //
  686. // It also implements TConfigurationSetter.
  687. type THeaderTransportFactory struct {
  688. // The underlying factory, could be nil.
  689. Factory TTransportFactory
  690. cfg *TConfiguration
  691. }
  692. // Deprecated: Use NewTHeaderTransportFactoryConf instead.
  693. func NewTHeaderTransportFactory(factory TTransportFactory) TTransportFactory {
  694. return NewTHeaderTransportFactoryConf(factory, &TConfiguration{
  695. noPropagation: true,
  696. })
  697. }
  698. // NewTHeaderTransportFactoryConf creates a new *THeaderTransportFactory with
  699. // the given *TConfiguration.
  700. func NewTHeaderTransportFactoryConf(factory TTransportFactory, conf *TConfiguration) TTransportFactory {
  701. return &THeaderTransportFactory{
  702. Factory: factory,
  703. cfg: conf,
  704. }
  705. }
  706. // GetTransport implements TTransportFactory.
  707. func (f *THeaderTransportFactory) GetTransport(trans TTransport) (TTransport, error) {
  708. if f.Factory != nil {
  709. t, err := f.Factory.GetTransport(trans)
  710. if err != nil {
  711. return nil, err
  712. }
  713. return NewTHeaderTransportConf(t, f.cfg), nil
  714. }
  715. return NewTHeaderTransportConf(trans, f.cfg), nil
  716. }
  717. // SetTConfiguration implements TConfigurationSetter.
  718. func (f *THeaderTransportFactory) SetTConfiguration(cfg *TConfiguration) {
  719. PropagateTConfiguration(f.Factory, f.cfg)
  720. f.cfg = cfg
  721. }
  722. var (
  723. _ TConfigurationSetter = (*THeaderTransportFactory)(nil)
  724. _ TConfigurationSetter = (*THeaderTransport)(nil)
  725. )