123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 |
- /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
- package thrift
- import (
- "context"
- "sync"
- )
- type TSerializer struct {
- Transport *TMemoryBuffer
- Protocol TProtocol
- }
- type TStruct interface {
- Write(ctx context.Context, p TProtocol) error
- Read(ctx context.Context, p TProtocol) error
- }
- func NewTSerializer() *TSerializer {
- transport := NewTMemoryBufferLen(1024)
- protocol := NewTBinaryProtocolTransport(transport)
- return &TSerializer{
- Transport: transport,
- Protocol: protocol,
- }
- }
- func (t *TSerializer) WriteString(ctx context.Context, msg TStruct) (s string, err error) {
- t.Transport.Reset()
- if err = msg.Write(ctx, t.Protocol); err != nil {
- return
- }
- if err = t.Protocol.Flush(ctx); err != nil {
- return
- }
- if err = t.Transport.Flush(ctx); err != nil {
- return
- }
- return t.Transport.String(), nil
- }
- func (t *TSerializer) Write(ctx context.Context, msg TStruct) (b []byte, err error) {
- t.Transport.Reset()
- if err = msg.Write(ctx, t.Protocol); err != nil {
- return
- }
- if err = t.Protocol.Flush(ctx); err != nil {
- return
- }
- if err = t.Transport.Flush(ctx); err != nil {
- return
- }
- b = append(b, t.Transport.Bytes()...)
- return
- }
- // TSerializerPool is the thread-safe version of TSerializer, it uses resource
- // pool of TSerializer under the hood.
- //
- // It must be initialized with either NewTSerializerPool or
- // NewTSerializerPoolSizeFactory.
- type TSerializerPool struct {
- pool sync.Pool
- }
- // NewTSerializerPool creates a new TSerializerPool.
- //
- // NewTSerializer can be used as the arg here.
- func NewTSerializerPool(f func() *TSerializer) *TSerializerPool {
- return &TSerializerPool{
- pool: sync.Pool{
- New: func() interface{} {
- return f()
- },
- },
- }
- }
- // NewTSerializerPoolSizeFactory creates a new TSerializerPool with the given
- // size and protocol factory.
- //
- // Note that the size is not the limit. The TMemoryBuffer underneath can grow
- // larger than that. It just dictates the initial size.
- func NewTSerializerPoolSizeFactory(size int, factory TProtocolFactory) *TSerializerPool {
- return &TSerializerPool{
- pool: sync.Pool{
- New: func() interface{} {
- transport := NewTMemoryBufferLen(size)
- protocol := factory.GetProtocol(transport)
- return &TSerializer{
- Transport: transport,
- Protocol: protocol,
- }
- },
- },
- }
- }
- func (t *TSerializerPool) WriteString(ctx context.Context, msg TStruct) (string, error) {
- s := t.pool.Get().(*TSerializer)
- defer t.pool.Put(s)
- return s.WriteString(ctx, msg)
- }
- func (t *TSerializerPool) Write(ctx context.Context, msg TStruct) ([]byte, error) {
- s := t.pool.Get().(*TSerializer)
- defer t.pool.Put(s)
- return s.Write(ctx, msg)
- }
|