udp_client.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. // Copyright (c) 2017 Uber Technologies, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package utils
  15. import (
  16. "context"
  17. "errors"
  18. "fmt"
  19. "io"
  20. "net"
  21. "time"
  22. "github.com/uber/jaeger-client-go/log"
  23. "github.com/uber/jaeger-client-go/thrift"
  24. "github.com/uber/jaeger-client-go/thrift-gen/agent"
  25. "github.com/uber/jaeger-client-go/thrift-gen/jaeger"
  26. "github.com/uber/jaeger-client-go/thrift-gen/zipkincore"
  27. )
  28. // UDPPacketMaxLength is the max size of UDP packet we want to send, synced with jaeger-agent
  29. const UDPPacketMaxLength = 65000
  30. // AgentClientUDP is a UDP client to Jaeger agent that implements agent.Agent interface.
  31. type AgentClientUDP struct {
  32. agent.Agent
  33. io.Closer
  34. connUDP udpConn
  35. client *agent.AgentClient
  36. maxPacketSize int // max size of datagram in bytes
  37. thriftBuffer *thrift.TMemoryBuffer // buffer used to calculate byte size of a span
  38. }
  39. type udpConn interface {
  40. Write([]byte) (int, error)
  41. SetWriteBuffer(int) error
  42. Close() error
  43. }
  44. // AgentClientUDPParams allows specifying options for initializing an AgentClientUDP. An instance of this struct should
  45. // be passed to NewAgentClientUDPWithParams.
  46. type AgentClientUDPParams struct {
  47. HostPort string
  48. MaxPacketSize int
  49. Logger log.Logger
  50. DisableAttemptReconnecting bool
  51. AttemptReconnectInterval time.Duration
  52. }
  53. // NewAgentClientUDPWithParams creates a client that sends spans to Jaeger Agent over UDP.
  54. func NewAgentClientUDPWithParams(params AgentClientUDPParams) (*AgentClientUDP, error) {
  55. // validate hostport
  56. if _, _, err := net.SplitHostPort(params.HostPort); err != nil {
  57. return nil, err
  58. }
  59. if params.MaxPacketSize == 0 {
  60. params.MaxPacketSize = UDPPacketMaxLength
  61. }
  62. if params.Logger == nil {
  63. params.Logger = log.StdLogger
  64. }
  65. if !params.DisableAttemptReconnecting && params.AttemptReconnectInterval == 0 {
  66. params.AttemptReconnectInterval = time.Second * 30
  67. }
  68. thriftBuffer := thrift.NewTMemoryBufferLen(params.MaxPacketSize)
  69. protocolFactory := thrift.NewTCompactProtocolFactory()
  70. client := agent.NewAgentClientFactory(thriftBuffer, protocolFactory)
  71. var connUDP udpConn
  72. var err error
  73. if params.DisableAttemptReconnecting {
  74. destAddr, err := net.ResolveUDPAddr("udp", params.HostPort)
  75. if err != nil {
  76. return nil, err
  77. }
  78. connUDP, err = net.DialUDP(destAddr.Network(), nil, destAddr)
  79. if err != nil {
  80. return nil, err
  81. }
  82. } else {
  83. // host is hostname, setup resolver loop in case host record changes during operation
  84. connUDP, err = newReconnectingUDPConn(params.HostPort, params.AttemptReconnectInterval, net.ResolveUDPAddr, net.DialUDP, params.Logger)
  85. if err != nil {
  86. return nil, err
  87. }
  88. }
  89. if err := connUDP.SetWriteBuffer(params.MaxPacketSize); err != nil {
  90. return nil, err
  91. }
  92. return &AgentClientUDP{
  93. connUDP: connUDP,
  94. client: client,
  95. maxPacketSize: params.MaxPacketSize,
  96. thriftBuffer: thriftBuffer,
  97. }, nil
  98. }
  99. // NewAgentClientUDP creates a client that sends spans to Jaeger Agent over UDP.
  100. func NewAgentClientUDP(hostPort string, maxPacketSize int) (*AgentClientUDP, error) {
  101. return NewAgentClientUDPWithParams(AgentClientUDPParams{
  102. HostPort: hostPort,
  103. MaxPacketSize: maxPacketSize,
  104. })
  105. }
  106. // EmitZipkinBatch implements EmitZipkinBatch() of Agent interface
  107. func (a *AgentClientUDP) EmitZipkinBatch(context.Context, []*zipkincore.Span) error {
  108. return errors.New("Not implemented")
  109. }
  110. // EmitBatch implements EmitBatch() of Agent interface
  111. func (a *AgentClientUDP) EmitBatch(ctx context.Context, batch *jaeger.Batch) error {
  112. a.thriftBuffer.Reset()
  113. if err := a.client.EmitBatch(ctx, batch); err != nil {
  114. return err
  115. }
  116. if a.thriftBuffer.Len() > a.maxPacketSize {
  117. return fmt.Errorf("data does not fit within one UDP packet; size %d, max %d, spans %d",
  118. a.thriftBuffer.Len(), a.maxPacketSize, len(batch.Spans))
  119. }
  120. _, err := a.connUDP.Write(a.thriftBuffer.Bytes())
  121. return err
  122. }
  123. // Close implements Close() of io.Closer and closes the underlying UDP connection.
  124. func (a *AgentClientUDP) Close() error {
  125. return a.connUDP.Close()
  126. }