transport_udp.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. // Copyright (c) 2017 Uber Technologies, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package jaeger
  15. import (
  16. "context"
  17. "errors"
  18. "fmt"
  19. "github.com/uber/jaeger-client-go/internal/reporterstats"
  20. "github.com/uber/jaeger-client-go/log"
  21. "github.com/uber/jaeger-client-go/thrift"
  22. j "github.com/uber/jaeger-client-go/thrift-gen/jaeger"
  23. "github.com/uber/jaeger-client-go/utils"
  24. )
  25. // Empirically obtained constant for how many bytes in the message are used for envelope.
  26. // The total datagram size is:
  27. // sizeof(Span) * numSpans + processByteSize + emitBatchOverhead <= maxPacketSize
  28. //
  29. // Note that due to the use of Compact Thrift protocol, overhead grows with the number of spans
  30. // in the batch, because the length of the list is encoded as varint32, as well as SeqId.
  31. //
  32. // There is a unit test `TestEmitBatchOverhead` that validates this number, it fails at <68.
  33. const emitBatchOverhead = 70
  34. var errSpanTooLarge = errors.New("span is too large")
  35. type udpSender struct {
  36. client *utils.AgentClientUDP
  37. maxPacketSize int // max size of datagram in bytes
  38. maxSpanBytes int // max number of bytes to record spans (excluding envelope) in the datagram
  39. byteBufferSize int // current number of span bytes accumulated in the buffer
  40. spanBuffer []*j.Span // spans buffered before a flush
  41. thriftBuffer *thrift.TMemoryBuffer // buffer used to calculate byte size of a span
  42. thriftProtocol thrift.TProtocol
  43. process *j.Process
  44. processByteSize int
  45. // reporterStats provides access to stats that are only known to Reporter
  46. reporterStats reporterstats.ReporterStats
  47. // The following counters are always non-negative, but we need to send them in signed i64 Thrift fields,
  48. // so we keep them as signed. At 10k QPS, overflow happens in about 300 million years.
  49. batchSeqNo int64
  50. tooLargeDroppedSpans int64
  51. failedToEmitSpans int64
  52. }
  53. // UDPTransportParams allows specifying options for initializing a UDPTransport. An instance of this struct should
  54. // be passed to NewUDPTransportWithParams.
  55. type UDPTransportParams struct {
  56. utils.AgentClientUDPParams
  57. }
  58. // NewUDPTransportWithParams creates a reporter that submits spans to jaeger-agent.
  59. // TODO: (breaking change) move to transport/ package.
  60. func NewUDPTransportWithParams(params UDPTransportParams) (Transport, error) {
  61. if len(params.HostPort) == 0 {
  62. params.HostPort = fmt.Sprintf("%s:%d", DefaultUDPSpanServerHost, DefaultUDPSpanServerPort)
  63. }
  64. if params.Logger == nil {
  65. params.Logger = log.StdLogger
  66. }
  67. if params.MaxPacketSize == 0 {
  68. params.MaxPacketSize = utils.UDPPacketMaxLength
  69. }
  70. protocolFactory := thrift.NewTCompactProtocolFactory()
  71. // Each span is first written to thriftBuffer to determine its size in bytes.
  72. thriftBuffer := thrift.NewTMemoryBufferLen(params.MaxPacketSize)
  73. thriftProtocol := protocolFactory.GetProtocol(thriftBuffer)
  74. client, err := utils.NewAgentClientUDPWithParams(params.AgentClientUDPParams)
  75. if err != nil {
  76. return nil, err
  77. }
  78. return &udpSender{
  79. client: client,
  80. maxSpanBytes: params.MaxPacketSize - emitBatchOverhead,
  81. thriftBuffer: thriftBuffer,
  82. thriftProtocol: thriftProtocol,
  83. }, nil
  84. }
  85. // NewUDPTransport creates a reporter that submits spans to jaeger-agent.
  86. // TODO: (breaking change) move to transport/ package.
  87. func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) {
  88. return NewUDPTransportWithParams(UDPTransportParams{
  89. AgentClientUDPParams: utils.AgentClientUDPParams{
  90. HostPort: hostPort,
  91. MaxPacketSize: maxPacketSize,
  92. },
  93. })
  94. }
  95. // SetReporterStats implements reporterstats.Receiver.
  96. func (s *udpSender) SetReporterStats(rs reporterstats.ReporterStats) {
  97. s.reporterStats = rs
  98. }
  99. func (s *udpSender) calcSizeOfSerializedThrift(thriftStruct thrift.TStruct) int {
  100. s.thriftBuffer.Reset()
  101. _ = thriftStruct.Write(context.Background(), s.thriftProtocol)
  102. return s.thriftBuffer.Len()
  103. }
  104. func (s *udpSender) Append(span *Span) (int, error) {
  105. if s.process == nil {
  106. s.process = BuildJaegerProcessThrift(span)
  107. s.processByteSize = s.calcSizeOfSerializedThrift(s.process)
  108. s.byteBufferSize += s.processByteSize
  109. }
  110. jSpan := BuildJaegerThrift(span)
  111. spanSize := s.calcSizeOfSerializedThrift(jSpan)
  112. if spanSize > s.maxSpanBytes {
  113. s.tooLargeDroppedSpans++
  114. return 1, errSpanTooLarge
  115. }
  116. s.byteBufferSize += spanSize
  117. if s.byteBufferSize <= s.maxSpanBytes {
  118. s.spanBuffer = append(s.spanBuffer, jSpan)
  119. if s.byteBufferSize < s.maxSpanBytes {
  120. return 0, nil
  121. }
  122. return s.Flush()
  123. }
  124. // the latest span did not fit in the buffer
  125. n, err := s.Flush()
  126. s.spanBuffer = append(s.spanBuffer, jSpan)
  127. s.byteBufferSize = spanSize + s.processByteSize
  128. return n, err
  129. }
  130. func (s *udpSender) Flush() (int, error) {
  131. n := len(s.spanBuffer)
  132. if n == 0 {
  133. return 0, nil
  134. }
  135. s.batchSeqNo++
  136. batchSeqNo := int64(s.batchSeqNo)
  137. err := s.client.EmitBatch(context.Background(), &j.Batch{
  138. Process: s.process,
  139. Spans: s.spanBuffer,
  140. SeqNo: &batchSeqNo,
  141. Stats: s.makeStats(),
  142. })
  143. s.resetBuffers()
  144. if err != nil {
  145. s.failedToEmitSpans += int64(n)
  146. }
  147. return n, err
  148. }
  149. func (s *udpSender) Close() error {
  150. return s.client.Close()
  151. }
  152. func (s *udpSender) resetBuffers() {
  153. for i := range s.spanBuffer {
  154. s.spanBuffer[i] = nil
  155. }
  156. s.spanBuffer = s.spanBuffer[:0]
  157. s.byteBufferSize = s.processByteSize
  158. }
  159. func (s *udpSender) makeStats() *j.ClientStats {
  160. var dropped int64
  161. if s.reporterStats != nil {
  162. dropped = s.reporterStats.SpansDroppedFromQueue()
  163. }
  164. return &j.ClientStats{
  165. FullQueueDroppedSpans: dropped,
  166. TooLargeDroppedSpans: s.tooLargeDroppedSpans,
  167. FailedToEmitSpans: s.failedToEmitSpans,
  168. }
  169. }