serializer.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing,
  13. * software distributed under the License is distributed on an
  14. * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. * KIND, either express or implied. See the License for the
  16. * specific language governing permissions and limitations
  17. * under the License.
  18. */
  19. package thrift
  20. import (
  21. "context"
  22. "sync"
  23. )
  24. type TSerializer struct {
  25. Transport *TMemoryBuffer
  26. Protocol TProtocol
  27. }
  28. type TStruct interface {
  29. Write(ctx context.Context, p TProtocol) error
  30. Read(ctx context.Context, p TProtocol) error
  31. }
  32. func NewTSerializer() *TSerializer {
  33. transport := NewTMemoryBufferLen(1024)
  34. protocol := NewTBinaryProtocolTransport(transport)
  35. return &TSerializer{
  36. Transport: transport,
  37. Protocol: protocol,
  38. }
  39. }
  40. func (t *TSerializer) WriteString(ctx context.Context, msg TStruct) (s string, err error) {
  41. t.Transport.Reset()
  42. if err = msg.Write(ctx, t.Protocol); err != nil {
  43. return
  44. }
  45. if err = t.Protocol.Flush(ctx); err != nil {
  46. return
  47. }
  48. if err = t.Transport.Flush(ctx); err != nil {
  49. return
  50. }
  51. return t.Transport.String(), nil
  52. }
  53. func (t *TSerializer) Write(ctx context.Context, msg TStruct) (b []byte, err error) {
  54. t.Transport.Reset()
  55. if err = msg.Write(ctx, t.Protocol); err != nil {
  56. return
  57. }
  58. if err = t.Protocol.Flush(ctx); err != nil {
  59. return
  60. }
  61. if err = t.Transport.Flush(ctx); err != nil {
  62. return
  63. }
  64. b = append(b, t.Transport.Bytes()...)
  65. return
  66. }
  67. // TSerializerPool is the thread-safe version of TSerializer, it uses resource
  68. // pool of TSerializer under the hood.
  69. //
  70. // It must be initialized with either NewTSerializerPool or
  71. // NewTSerializerPoolSizeFactory.
  72. type TSerializerPool struct {
  73. pool sync.Pool
  74. }
  75. // NewTSerializerPool creates a new TSerializerPool.
  76. //
  77. // NewTSerializer can be used as the arg here.
  78. func NewTSerializerPool(f func() *TSerializer) *TSerializerPool {
  79. return &TSerializerPool{
  80. pool: sync.Pool{
  81. New: func() interface{} {
  82. return f()
  83. },
  84. },
  85. }
  86. }
  87. // NewTSerializerPoolSizeFactory creates a new TSerializerPool with the given
  88. // size and protocol factory.
  89. //
  90. // Note that the size is not the limit. The TMemoryBuffer underneath can grow
  91. // larger than that. It just dictates the initial size.
  92. func NewTSerializerPoolSizeFactory(size int, factory TProtocolFactory) *TSerializerPool {
  93. return &TSerializerPool{
  94. pool: sync.Pool{
  95. New: func() interface{} {
  96. transport := NewTMemoryBufferLen(size)
  97. protocol := factory.GetProtocol(transport)
  98. return &TSerializer{
  99. Transport: transport,
  100. Protocol: protocol,
  101. }
  102. },
  103. },
  104. }
  105. }
  106. func (t *TSerializerPool) WriteString(ctx context.Context, msg TStruct) (string, error) {
  107. s := t.pool.Get().(*TSerializer)
  108. defer t.pool.Put(s)
  109. return s.WriteString(ctx, msg)
  110. }
  111. func (t *TSerializerPool) Write(ctx context.Context, msg TStruct) ([]byte, error) {
  112. s := t.pool.Get().(*TSerializer)
  113. defer t.pool.Put(s)
  114. return s.Write(ctx, msg)
  115. }