binary_protocol.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing,
  13. * software distributed under the License is distributed on an
  14. * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. * KIND, either express or implied. See the License for the
  16. * specific language governing permissions and limitations
  17. * under the License.
  18. */
  19. package thrift
  20. import (
  21. "bytes"
  22. "context"
  23. "encoding/binary"
  24. "errors"
  25. "fmt"
  26. "io"
  27. "math"
  28. )
  29. type TBinaryProtocol struct {
  30. trans TRichTransport
  31. origTransport TTransport
  32. cfg *TConfiguration
  33. buffer [64]byte
  34. }
  35. type TBinaryProtocolFactory struct {
  36. cfg *TConfiguration
  37. }
  38. // Deprecated: Use NewTBinaryProtocolConf instead.
  39. func NewTBinaryProtocolTransport(t TTransport) *TBinaryProtocol {
  40. return NewTBinaryProtocolConf(t, &TConfiguration{
  41. noPropagation: true,
  42. })
  43. }
  44. // Deprecated: Use NewTBinaryProtocolConf instead.
  45. func NewTBinaryProtocol(t TTransport, strictRead, strictWrite bool) *TBinaryProtocol {
  46. return NewTBinaryProtocolConf(t, &TConfiguration{
  47. TBinaryStrictRead: &strictRead,
  48. TBinaryStrictWrite: &strictWrite,
  49. noPropagation: true,
  50. })
  51. }
  52. func NewTBinaryProtocolConf(t TTransport, conf *TConfiguration) *TBinaryProtocol {
  53. PropagateTConfiguration(t, conf)
  54. p := &TBinaryProtocol{
  55. origTransport: t,
  56. cfg: conf,
  57. }
  58. if et, ok := t.(TRichTransport); ok {
  59. p.trans = et
  60. } else {
  61. p.trans = NewTRichTransport(t)
  62. }
  63. return p
  64. }
  65. // Deprecated: Use NewTBinaryProtocolFactoryConf instead.
  66. func NewTBinaryProtocolFactoryDefault() *TBinaryProtocolFactory {
  67. return NewTBinaryProtocolFactoryConf(&TConfiguration{
  68. noPropagation: true,
  69. })
  70. }
  71. // Deprecated: Use NewTBinaryProtocolFactoryConf instead.
  72. func NewTBinaryProtocolFactory(strictRead, strictWrite bool) *TBinaryProtocolFactory {
  73. return NewTBinaryProtocolFactoryConf(&TConfiguration{
  74. TBinaryStrictRead: &strictRead,
  75. TBinaryStrictWrite: &strictWrite,
  76. noPropagation: true,
  77. })
  78. }
  79. func NewTBinaryProtocolFactoryConf(conf *TConfiguration) *TBinaryProtocolFactory {
  80. return &TBinaryProtocolFactory{
  81. cfg: conf,
  82. }
  83. }
  84. func (p *TBinaryProtocolFactory) GetProtocol(t TTransport) TProtocol {
  85. return NewTBinaryProtocolConf(t, p.cfg)
  86. }
  87. func (p *TBinaryProtocolFactory) SetTConfiguration(conf *TConfiguration) {
  88. p.cfg = conf
  89. }
  90. /**
  91. * Writing Methods
  92. */
  93. func (p *TBinaryProtocol) WriteMessageBegin(ctx context.Context, name string, typeId TMessageType, seqId int32) error {
  94. if p.cfg.GetTBinaryStrictWrite() {
  95. version := uint32(VERSION_1) | uint32(typeId)
  96. e := p.WriteI32(ctx, int32(version))
  97. if e != nil {
  98. return e
  99. }
  100. e = p.WriteString(ctx, name)
  101. if e != nil {
  102. return e
  103. }
  104. e = p.WriteI32(ctx, seqId)
  105. return e
  106. } else {
  107. e := p.WriteString(ctx, name)
  108. if e != nil {
  109. return e
  110. }
  111. e = p.WriteByte(ctx, int8(typeId))
  112. if e != nil {
  113. return e
  114. }
  115. e = p.WriteI32(ctx, seqId)
  116. return e
  117. }
  118. return nil
  119. }
  120. func (p *TBinaryProtocol) WriteMessageEnd(ctx context.Context) error {
  121. return nil
  122. }
  123. func (p *TBinaryProtocol) WriteStructBegin(ctx context.Context, name string) error {
  124. return nil
  125. }
  126. func (p *TBinaryProtocol) WriteStructEnd(ctx context.Context) error {
  127. return nil
  128. }
  129. func (p *TBinaryProtocol) WriteFieldBegin(ctx context.Context, name string, typeId TType, id int16) error {
  130. e := p.WriteByte(ctx, int8(typeId))
  131. if e != nil {
  132. return e
  133. }
  134. e = p.WriteI16(ctx, id)
  135. return e
  136. }
  137. func (p *TBinaryProtocol) WriteFieldEnd(ctx context.Context) error {
  138. return nil
  139. }
  140. func (p *TBinaryProtocol) WriteFieldStop(ctx context.Context) error {
  141. e := p.WriteByte(ctx, STOP)
  142. return e
  143. }
  144. func (p *TBinaryProtocol) WriteMapBegin(ctx context.Context, keyType TType, valueType TType, size int) error {
  145. e := p.WriteByte(ctx, int8(keyType))
  146. if e != nil {
  147. return e
  148. }
  149. e = p.WriteByte(ctx, int8(valueType))
  150. if e != nil {
  151. return e
  152. }
  153. e = p.WriteI32(ctx, int32(size))
  154. return e
  155. }
  156. func (p *TBinaryProtocol) WriteMapEnd(ctx context.Context) error {
  157. return nil
  158. }
  159. func (p *TBinaryProtocol) WriteListBegin(ctx context.Context, elemType TType, size int) error {
  160. e := p.WriteByte(ctx, int8(elemType))
  161. if e != nil {
  162. return e
  163. }
  164. e = p.WriteI32(ctx, int32(size))
  165. return e
  166. }
  167. func (p *TBinaryProtocol) WriteListEnd(ctx context.Context) error {
  168. return nil
  169. }
  170. func (p *TBinaryProtocol) WriteSetBegin(ctx context.Context, elemType TType, size int) error {
  171. e := p.WriteByte(ctx, int8(elemType))
  172. if e != nil {
  173. return e
  174. }
  175. e = p.WriteI32(ctx, int32(size))
  176. return e
  177. }
  178. func (p *TBinaryProtocol) WriteSetEnd(ctx context.Context) error {
  179. return nil
  180. }
  181. func (p *TBinaryProtocol) WriteBool(ctx context.Context, value bool) error {
  182. if value {
  183. return p.WriteByte(ctx, 1)
  184. }
  185. return p.WriteByte(ctx, 0)
  186. }
  187. func (p *TBinaryProtocol) WriteByte(ctx context.Context, value int8) error {
  188. e := p.trans.WriteByte(byte(value))
  189. return NewTProtocolException(e)
  190. }
  191. func (p *TBinaryProtocol) WriteI16(ctx context.Context, value int16) error {
  192. v := p.buffer[0:2]
  193. binary.BigEndian.PutUint16(v, uint16(value))
  194. _, e := p.trans.Write(v)
  195. return NewTProtocolException(e)
  196. }
  197. func (p *TBinaryProtocol) WriteI32(ctx context.Context, value int32) error {
  198. v := p.buffer[0:4]
  199. binary.BigEndian.PutUint32(v, uint32(value))
  200. _, e := p.trans.Write(v)
  201. return NewTProtocolException(e)
  202. }
  203. func (p *TBinaryProtocol) WriteI64(ctx context.Context, value int64) error {
  204. v := p.buffer[0:8]
  205. binary.BigEndian.PutUint64(v, uint64(value))
  206. _, err := p.trans.Write(v)
  207. return NewTProtocolException(err)
  208. }
  209. func (p *TBinaryProtocol) WriteDouble(ctx context.Context, value float64) error {
  210. return p.WriteI64(ctx, int64(math.Float64bits(value)))
  211. }
  212. func (p *TBinaryProtocol) WriteString(ctx context.Context, value string) error {
  213. e := p.WriteI32(ctx, int32(len(value)))
  214. if e != nil {
  215. return e
  216. }
  217. _, err := p.trans.WriteString(value)
  218. return NewTProtocolException(err)
  219. }
  220. func (p *TBinaryProtocol) WriteBinary(ctx context.Context, value []byte) error {
  221. e := p.WriteI32(ctx, int32(len(value)))
  222. if e != nil {
  223. return e
  224. }
  225. _, err := p.trans.Write(value)
  226. return NewTProtocolException(err)
  227. }
  228. /**
  229. * Reading methods
  230. */
  231. func (p *TBinaryProtocol) ReadMessageBegin(ctx context.Context) (name string, typeId TMessageType, seqId int32, err error) {
  232. size, e := p.ReadI32(ctx)
  233. if e != nil {
  234. return "", typeId, 0, NewTProtocolException(e)
  235. }
  236. if size < 0 {
  237. typeId = TMessageType(size & 0x0ff)
  238. version := int64(int64(size) & VERSION_MASK)
  239. if version != VERSION_1 {
  240. return name, typeId, seqId, NewTProtocolExceptionWithType(BAD_VERSION, fmt.Errorf("Bad version in ReadMessageBegin"))
  241. }
  242. name, e = p.ReadString(ctx)
  243. if e != nil {
  244. return name, typeId, seqId, NewTProtocolException(e)
  245. }
  246. seqId, e = p.ReadI32(ctx)
  247. if e != nil {
  248. return name, typeId, seqId, NewTProtocolException(e)
  249. }
  250. return name, typeId, seqId, nil
  251. }
  252. if p.cfg.GetTBinaryStrictRead() {
  253. return name, typeId, seqId, NewTProtocolExceptionWithType(BAD_VERSION, fmt.Errorf("Missing version in ReadMessageBegin"))
  254. }
  255. name, e2 := p.readStringBody(size)
  256. if e2 != nil {
  257. return name, typeId, seqId, e2
  258. }
  259. b, e3 := p.ReadByte(ctx)
  260. if e3 != nil {
  261. return name, typeId, seqId, e3
  262. }
  263. typeId = TMessageType(b)
  264. seqId, e4 := p.ReadI32(ctx)
  265. if e4 != nil {
  266. return name, typeId, seqId, e4
  267. }
  268. return name, typeId, seqId, nil
  269. }
  270. func (p *TBinaryProtocol) ReadMessageEnd(ctx context.Context) error {
  271. return nil
  272. }
  273. func (p *TBinaryProtocol) ReadStructBegin(ctx context.Context) (name string, err error) {
  274. return
  275. }
  276. func (p *TBinaryProtocol) ReadStructEnd(ctx context.Context) error {
  277. return nil
  278. }
  279. func (p *TBinaryProtocol) ReadFieldBegin(ctx context.Context) (name string, typeId TType, seqId int16, err error) {
  280. t, err := p.ReadByte(ctx)
  281. typeId = TType(t)
  282. if err != nil {
  283. return name, typeId, seqId, err
  284. }
  285. if t != STOP {
  286. seqId, err = p.ReadI16(ctx)
  287. }
  288. return name, typeId, seqId, err
  289. }
  290. func (p *TBinaryProtocol) ReadFieldEnd(ctx context.Context) error {
  291. return nil
  292. }
  293. var invalidDataLength = NewTProtocolExceptionWithType(INVALID_DATA, errors.New("Invalid data length"))
  294. func (p *TBinaryProtocol) ReadMapBegin(ctx context.Context) (kType, vType TType, size int, err error) {
  295. k, e := p.ReadByte(ctx)
  296. if e != nil {
  297. err = NewTProtocolException(e)
  298. return
  299. }
  300. kType = TType(k)
  301. v, e := p.ReadByte(ctx)
  302. if e != nil {
  303. err = NewTProtocolException(e)
  304. return
  305. }
  306. vType = TType(v)
  307. size32, e := p.ReadI32(ctx)
  308. if e != nil {
  309. err = NewTProtocolException(e)
  310. return
  311. }
  312. if size32 < 0 {
  313. err = invalidDataLength
  314. return
  315. }
  316. size = int(size32)
  317. return kType, vType, size, nil
  318. }
  319. func (p *TBinaryProtocol) ReadMapEnd(ctx context.Context) error {
  320. return nil
  321. }
  322. func (p *TBinaryProtocol) ReadListBegin(ctx context.Context) (elemType TType, size int, err error) {
  323. b, e := p.ReadByte(ctx)
  324. if e != nil {
  325. err = NewTProtocolException(e)
  326. return
  327. }
  328. elemType = TType(b)
  329. size32, e := p.ReadI32(ctx)
  330. if e != nil {
  331. err = NewTProtocolException(e)
  332. return
  333. }
  334. if size32 < 0 {
  335. err = invalidDataLength
  336. return
  337. }
  338. size = int(size32)
  339. return
  340. }
  341. func (p *TBinaryProtocol) ReadListEnd(ctx context.Context) error {
  342. return nil
  343. }
  344. func (p *TBinaryProtocol) ReadSetBegin(ctx context.Context) (elemType TType, size int, err error) {
  345. b, e := p.ReadByte(ctx)
  346. if e != nil {
  347. err = NewTProtocolException(e)
  348. return
  349. }
  350. elemType = TType(b)
  351. size32, e := p.ReadI32(ctx)
  352. if e != nil {
  353. err = NewTProtocolException(e)
  354. return
  355. }
  356. if size32 < 0 {
  357. err = invalidDataLength
  358. return
  359. }
  360. size = int(size32)
  361. return elemType, size, nil
  362. }
  363. func (p *TBinaryProtocol) ReadSetEnd(ctx context.Context) error {
  364. return nil
  365. }
  366. func (p *TBinaryProtocol) ReadBool(ctx context.Context) (bool, error) {
  367. b, e := p.ReadByte(ctx)
  368. v := true
  369. if b != 1 {
  370. v = false
  371. }
  372. return v, e
  373. }
  374. func (p *TBinaryProtocol) ReadByte(ctx context.Context) (int8, error) {
  375. v, err := p.trans.ReadByte()
  376. return int8(v), err
  377. }
  378. func (p *TBinaryProtocol) ReadI16(ctx context.Context) (value int16, err error) {
  379. buf := p.buffer[0:2]
  380. err = p.readAll(ctx, buf)
  381. value = int16(binary.BigEndian.Uint16(buf))
  382. return value, err
  383. }
  384. func (p *TBinaryProtocol) ReadI32(ctx context.Context) (value int32, err error) {
  385. buf := p.buffer[0:4]
  386. err = p.readAll(ctx, buf)
  387. value = int32(binary.BigEndian.Uint32(buf))
  388. return value, err
  389. }
  390. func (p *TBinaryProtocol) ReadI64(ctx context.Context) (value int64, err error) {
  391. buf := p.buffer[0:8]
  392. err = p.readAll(ctx, buf)
  393. value = int64(binary.BigEndian.Uint64(buf))
  394. return value, err
  395. }
  396. func (p *TBinaryProtocol) ReadDouble(ctx context.Context) (value float64, err error) {
  397. buf := p.buffer[0:8]
  398. err = p.readAll(ctx, buf)
  399. value = math.Float64frombits(binary.BigEndian.Uint64(buf))
  400. return value, err
  401. }
  402. func (p *TBinaryProtocol) ReadString(ctx context.Context) (value string, err error) {
  403. size, e := p.ReadI32(ctx)
  404. if e != nil {
  405. return "", e
  406. }
  407. err = checkSizeForProtocol(size, p.cfg)
  408. if err != nil {
  409. return
  410. }
  411. if size < 0 {
  412. err = invalidDataLength
  413. return
  414. }
  415. if size == 0 {
  416. return "", nil
  417. }
  418. if size < int32(len(p.buffer)) {
  419. // Avoid allocation on small reads
  420. buf := p.buffer[:size]
  421. read, e := io.ReadFull(p.trans, buf)
  422. return string(buf[:read]), NewTProtocolException(e)
  423. }
  424. return p.readStringBody(size)
  425. }
  426. func (p *TBinaryProtocol) ReadBinary(ctx context.Context) ([]byte, error) {
  427. size, e := p.ReadI32(ctx)
  428. if e != nil {
  429. return nil, e
  430. }
  431. if err := checkSizeForProtocol(size, p.cfg); err != nil {
  432. return nil, err
  433. }
  434. buf, err := safeReadBytes(size, p.trans)
  435. return buf, NewTProtocolException(err)
  436. }
  437. func (p *TBinaryProtocol) Flush(ctx context.Context) (err error) {
  438. return NewTProtocolException(p.trans.Flush(ctx))
  439. }
  440. func (p *TBinaryProtocol) Skip(ctx context.Context, fieldType TType) (err error) {
  441. return SkipDefaultDepth(ctx, p, fieldType)
  442. }
  443. func (p *TBinaryProtocol) Transport() TTransport {
  444. return p.origTransport
  445. }
  446. func (p *TBinaryProtocol) readAll(ctx context.Context, buf []byte) (err error) {
  447. var read int
  448. _, deadlineSet := ctx.Deadline()
  449. for {
  450. read, err = io.ReadFull(p.trans, buf)
  451. if deadlineSet && read == 0 && isTimeoutError(err) && ctx.Err() == nil {
  452. // This is I/O timeout without anything read,
  453. // and we still have time left, keep retrying.
  454. continue
  455. }
  456. // For anything else, don't retry
  457. break
  458. }
  459. return NewTProtocolException(err)
  460. }
  461. func (p *TBinaryProtocol) readStringBody(size int32) (value string, err error) {
  462. buf, err := safeReadBytes(size, p.trans)
  463. return string(buf), NewTProtocolException(err)
  464. }
  465. func (p *TBinaryProtocol) SetTConfiguration(conf *TConfiguration) {
  466. PropagateTConfiguration(p.trans, conf)
  467. PropagateTConfiguration(p.origTransport, conf)
  468. p.cfg = conf
  469. }
  470. var (
  471. _ TConfigurationSetter = (*TBinaryProtocolFactory)(nil)
  472. _ TConfigurationSetter = (*TBinaryProtocol)(nil)
  473. )
  // safeReadBytes is shared between TBinaryProtocol and TCompactProtocol.
  //
  // It reads size bytes from trans by copying into a growing buffer rather
  // than allocating size bytes up front, so an absurdly large size (typically
  // from a malformed message) does not trigger a huge allocation.
  //
  // A negative size returns (nil, nil). Otherwise the bytes actually read are
  // returned together with the error reported by io.CopyN.
  func safeReadBytes(size int32, trans io.Reader) ([]byte, error) {
  	if size < 0 {
  		return nil, nil
  	}
  	var sink bytes.Buffer
  	_, copyErr := io.CopyN(&sink, trans, int64(size))
  	return sink.Bytes(), copyErr
  }
  485. }