reconnecting_udp_conn.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. // Copyright (c) 2020 The Jaeger Authors.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package utils
  15. import (
  16. "fmt"
  17. "net"
  18. "sync"
  19. "sync/atomic"
  20. "time"
  21. "github.com/uber/jaeger-client-go/log"
  22. )
  23. // reconnectingUDPConn is an implementation of udpConn that resolves hostPort every resolveTimeout, if the resolved address is
  24. // different than the current conn then the new address is dialed and the conn is swapped.
  25. type reconnectingUDPConn struct {
  26. hostPort string
  27. resolveFunc resolveFunc
  28. dialFunc dialFunc
  29. logger log.Logger
  30. bufferBytes int64
  31. connMtx sync.RWMutex
  32. conn *net.UDPConn
  33. destAddr *net.UDPAddr
  34. closeChan chan struct{}
  35. }
  36. type resolveFunc func(network string, hostPort string) (*net.UDPAddr, error)
  37. type dialFunc func(network string, laddr, raddr *net.UDPAddr) (*net.UDPConn, error)
  38. // newReconnectingUDPConn returns a new udpConn that resolves hostPort every resolveTimeout, if the resolved address is
  39. // different than the current conn then the new address is dialed and the conn is swapped.
  40. func newReconnectingUDPConn(hostPort string, resolveTimeout time.Duration, resolveFunc resolveFunc, dialFunc dialFunc, logger log.Logger) (*reconnectingUDPConn, error) {
  41. conn := &reconnectingUDPConn{
  42. hostPort: hostPort,
  43. resolveFunc: resolveFunc,
  44. dialFunc: dialFunc,
  45. logger: logger,
  46. closeChan: make(chan struct{}),
  47. }
  48. if err := conn.attemptResolveAndDial(); err != nil {
  49. logger.Error(fmt.Sprintf("failed resolving destination address on connection startup, with err: %q. retrying in %s", err.Error(), resolveTimeout))
  50. }
  51. go conn.reconnectLoop(resolveTimeout)
  52. return conn, nil
  53. }
  54. func (c *reconnectingUDPConn) reconnectLoop(resolveTimeout time.Duration) {
  55. ticker := time.NewTicker(resolveTimeout)
  56. defer ticker.Stop()
  57. for {
  58. select {
  59. case <-c.closeChan:
  60. return
  61. case <-ticker.C:
  62. if err := c.attemptResolveAndDial(); err != nil {
  63. c.logger.Error(err.Error())
  64. }
  65. }
  66. }
  67. }
  68. func (c *reconnectingUDPConn) attemptResolveAndDial() error {
  69. newAddr, err := c.resolveFunc("udp", c.hostPort)
  70. if err != nil {
  71. return fmt.Errorf("failed to resolve new addr for host %q, with err: %w", c.hostPort, err)
  72. }
  73. c.connMtx.RLock()
  74. curAddr := c.destAddr
  75. c.connMtx.RUnlock()
  76. // dont attempt dial if an addr was successfully dialed previously and, resolved addr is the same as current conn
  77. if curAddr != nil && newAddr.String() == curAddr.String() {
  78. return nil
  79. }
  80. if err := c.attemptDialNewAddr(newAddr); err != nil {
  81. return fmt.Errorf("failed to dial newly resolved addr '%s', with err: %w", newAddr, err)
  82. }
  83. return nil
  84. }
  85. func (c *reconnectingUDPConn) attemptDialNewAddr(newAddr *net.UDPAddr) error {
  86. connUDP, err := c.dialFunc(newAddr.Network(), nil, newAddr)
  87. if err != nil {
  88. return err
  89. }
  90. if bufferBytes := int(atomic.LoadInt64(&c.bufferBytes)); bufferBytes != 0 {
  91. if err = connUDP.SetWriteBuffer(bufferBytes); err != nil {
  92. return err
  93. }
  94. }
  95. c.connMtx.Lock()
  96. c.destAddr = newAddr
  97. // store prev to close later
  98. prevConn := c.conn
  99. c.conn = connUDP
  100. c.connMtx.Unlock()
  101. if prevConn != nil {
  102. return prevConn.Close()
  103. }
  104. return nil
  105. }
  106. // Write calls net.udpConn.Write, if it fails an attempt is made to connect to a new addr, if that succeeds the write is retried before returning
  107. func (c *reconnectingUDPConn) Write(b []byte) (int, error) {
  108. var bytesWritten int
  109. var err error
  110. c.connMtx.RLock()
  111. if c.conn == nil {
  112. // if connection is not initialized indicate this with err in order to hook into retry logic
  113. err = fmt.Errorf("UDP connection not yet initialized, an address has not been resolved")
  114. } else {
  115. bytesWritten, err = c.conn.Write(b)
  116. }
  117. c.connMtx.RUnlock()
  118. if err == nil {
  119. return bytesWritten, nil
  120. }
  121. // attempt to resolve and dial new address in case that's the problem, if resolve and dial succeeds, try write again
  122. if reconnErr := c.attemptResolveAndDial(); reconnErr == nil {
  123. c.connMtx.RLock()
  124. defer c.connMtx.RUnlock()
  125. return c.conn.Write(b)
  126. }
  127. // return original error if reconn fails
  128. return bytesWritten, err
  129. }
  130. // Close stops the reconnectLoop, then closes the connection via net.udpConn 's implementation
  131. func (c *reconnectingUDPConn) Close() error {
  132. close(c.closeChan)
  133. // acquire rw lock before closing conn to ensure calls to Write drain
  134. c.connMtx.Lock()
  135. defer c.connMtx.Unlock()
  136. if c.conn != nil {
  137. return c.conn.Close()
  138. }
  139. return nil
  140. }
  141. // SetWriteBuffer defers to the net.udpConn SetWriteBuffer implementation wrapped with a RLock. if no conn is currently held
  142. // and SetWriteBuffer is called store bufferBytes to be set for new conns
  143. func (c *reconnectingUDPConn) SetWriteBuffer(bytes int) error {
  144. var err error
  145. c.connMtx.RLock()
  146. if c.conn != nil {
  147. err = c.conn.SetWriteBuffer(bytes)
  148. }
  149. c.connMtx.RUnlock()
  150. if err == nil {
  151. atomic.StoreInt64(&c.bufferBytes, int64(bytes))
  152. }
  153. return err
  154. }