reporter.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. // Copyright (c) 2017 Uber Technologies, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package jaeger
  15. import (
  16. "fmt"
  17. "sync"
  18. "sync/atomic"
  19. "time"
  20. "github.com/opentracing/opentracing-go"
  21. "github.com/uber/jaeger-client-go/internal/reporterstats"
  22. "github.com/uber/jaeger-client-go/log"
  23. )
// Reporter is called by the tracer when a span is completed to report the span to the tracing collector.
//
// This file provides several implementations: a no-op reporter, a logging reporter,
// an in-memory reporter, a composite reporter and a remote (queue-backed) reporter.
type Reporter interface {
	// Report submits a new span to collectors, possibly asynchronously and/or with buffering.
	// If the reporter is processing Span asynchronously then it needs to Retain() the span,
	// and then Release() it when no longer needed, to avoid span data corruption.
	Report(span *Span)

	// Close does a clean shutdown of the reporter, flushing any traces that may be buffered in memory.
	Close()
}
  33. // ------------------------------
  34. type nullReporter struct{}
  35. // NewNullReporter creates a no-op reporter that ignores all reported spans.
  36. func NewNullReporter() Reporter {
  37. return &nullReporter{}
  38. }
  39. // Report implements Report() method of Reporter by doing nothing.
  40. func (r *nullReporter) Report(span *Span) {
  41. // no-op
  42. }
  43. // Close implements Close() method of Reporter by doing nothing.
  44. func (r *nullReporter) Close() {
  45. // no-op
  46. }
  47. // ------------------------------
  48. type loggingReporter struct {
  49. logger Logger
  50. }
  51. // NewLoggingReporter creates a reporter that logs all reported spans to provided logger.
  52. func NewLoggingReporter(logger Logger) Reporter {
  53. return &loggingReporter{logger}
  54. }
  55. // Report implements Report() method of Reporter by logging the span to the logger.
  56. func (r *loggingReporter) Report(span *Span) {
  57. r.logger.Infof("Reporting span %+v", span)
  58. }
  59. // Close implements Close() method of Reporter by doing nothing.
  60. func (r *loggingReporter) Close() {
  61. // no-op
  62. }
  63. // ------------------------------
  64. // InMemoryReporter is used for testing, and simply collects spans in memory.
  65. type InMemoryReporter struct {
  66. spans []opentracing.Span
  67. lock sync.Mutex
  68. }
  69. // NewInMemoryReporter creates a reporter that stores spans in memory.
  70. // NOTE: the Tracer should be created with options.PoolSpans = false.
  71. func NewInMemoryReporter() *InMemoryReporter {
  72. return &InMemoryReporter{
  73. spans: make([]opentracing.Span, 0, 10),
  74. }
  75. }
  76. // Report implements Report() method of Reporter by storing the span in the buffer.
  77. func (r *InMemoryReporter) Report(span *Span) {
  78. r.lock.Lock()
  79. // Need to retain the span otherwise it will be released
  80. r.spans = append(r.spans, span.Retain())
  81. r.lock.Unlock()
  82. }
// Close implements Close() method of Reporter by delegating to Reset,
// which releases every accumulated span and empties the buffer.
func (r *InMemoryReporter) Close() {
	r.Reset()
}
  87. // SpansSubmitted returns the number of spans accumulated in the buffer.
  88. func (r *InMemoryReporter) SpansSubmitted() int {
  89. r.lock.Lock()
  90. defer r.lock.Unlock()
  91. return len(r.spans)
  92. }
  93. // GetSpans returns accumulated spans as a copy of the buffer.
  94. func (r *InMemoryReporter) GetSpans() []opentracing.Span {
  95. r.lock.Lock()
  96. defer r.lock.Unlock()
  97. copied := make([]opentracing.Span, len(r.spans))
  98. copy(copied, r.spans)
  99. return copied
  100. }
  101. // Reset clears all accumulated spans.
  102. func (r *InMemoryReporter) Reset() {
  103. r.lock.Lock()
  104. defer r.lock.Unlock()
  105. // Before reset the collection need to release Span memory
  106. for _, span := range r.spans {
  107. span.(*Span).Release()
  108. }
  109. r.spans = r.spans[:0]
  110. }
  111. // ------------------------------
  112. type compositeReporter struct {
  113. reporters []Reporter
  114. }
  115. // NewCompositeReporter creates a reporter that ignores all reported spans.
  116. func NewCompositeReporter(reporters ...Reporter) Reporter {
  117. return &compositeReporter{reporters: reporters}
  118. }
  119. // Report implements Report() method of Reporter by delegating to each underlying reporter.
  120. func (r *compositeReporter) Report(span *Span) {
  121. for _, reporter := range r.reporters {
  122. reporter.Report(span)
  123. }
  124. }
  125. // Close implements Close() method of Reporter by closing each underlying reporter.
  126. func (r *compositeReporter) Close() {
  127. for _, reporter := range r.reporters {
  128. reporter.Close()
  129. }
  130. }
  131. // ------------- REMOTE REPORTER -----------------
// reporterQueueItemType tags the kind of item travelling through remoteReporter's queue.
type reporterQueueItemType int

const (
	// defaultQueueSize is the queue capacity NewRemoteReporter uses when the
	// queueSize option is not positive.
	defaultQueueSize = 100
	// defaultBufferFlushInterval is the flush period NewRemoteReporter uses when the
	// bufferFlushInterval option is not positive.
	defaultBufferFlushInterval = 1 * time.Second

	// reporterQueueItemSpan marks a queue item that carries a span.
	// NOTE: iota is the index of the ConstSpec within this const block, so this
	// constant has the value 2 (and reporterQueueItemClose the value 3), not 0.
	// The zero value of reporterQueueItemType therefore matches neither constant.
	reporterQueueItemSpan reporterQueueItemType = iota
	// reporterQueueItemClose marks a queue item that asks the queue processor to
	// do a final flush and stop.
	reporterQueueItemClose
)
// reporterQueueItem is the unit of work sent over remoteReporter.queue.
type reporterQueueItem struct {
	itemType reporterQueueItemType // selects how processQueue handles the item
	span     *Span                 // set by Report for reporterQueueItemSpan items; already Retain()-ed
	close    *sync.WaitGroup       // set by sendCloseEvent for reporterQueueItemClose items; Done() is called after the final flush
}
  144. // reporterStats implements reporterstats.ReporterStats.
  145. type reporterStats struct {
  146. droppedCount int64 // provided to Transports to report data loss to the backend
  147. }
  148. // SpansDroppedFromQueue implements reporterstats.ReporterStats.
  149. func (r *reporterStats) SpansDroppedFromQueue() int64 {
  150. return atomic.LoadInt64(&r.droppedCount)
  151. }
  152. func (r *reporterStats) incDroppedCount() {
  153. atomic.AddInt64(&r.droppedCount, 1)
  154. }
// remoteReporter is the Reporter returned by NewRemoteReporter. Report() places spans
// on a buffered queue; a background goroutine (processQueue) drains the queue into the sender.
type remoteReporter struct {
	// These fields must be first in the struct because `sync/atomic` expects 64-bit alignment.
	// Cf. https://github.com/uber/jaeger-client-go/issues/155, https://goo.gl/zW7dgq
	queueLength int64 // used to update metrics.Gauge
	closed      int64 // 0 - not closed, 1 - closed

	// reporterOptions is embedded so that logger, metrics, queueSize and
	// bufferFlushInterval are reachable directly on the reporter.
	reporterOptions
	sender        Transport              // destination for spans taken off the queue
	queue         chan reporterQueueItem // buffered channel of span and close items
	reporterStats *reporterStats         // dropped-span counter, shared with the sender when it accepts one
}
  165. // NewRemoteReporter creates a new reporter that sends spans out of process by means of Sender.
  166. // Calls to Report(Span) return immediately (side effect: if internal buffer is full the span is dropped).
  167. // Periodically the transport buffer is flushed even if it hasn't reached max packet size.
  168. // Calls to Close() block until all spans reported prior to the call to Close are flushed.
  169. func NewRemoteReporter(sender Transport, opts ...ReporterOption) Reporter {
  170. options := reporterOptions{}
  171. for _, option := range opts {
  172. option(&options)
  173. }
  174. if options.bufferFlushInterval <= 0 {
  175. options.bufferFlushInterval = defaultBufferFlushInterval
  176. }
  177. if options.logger == nil {
  178. options.logger = log.NullLogger
  179. }
  180. if options.metrics == nil {
  181. options.metrics = NewNullMetrics()
  182. }
  183. if options.queueSize <= 0 {
  184. options.queueSize = defaultQueueSize
  185. }
  186. reporter := &remoteReporter{
  187. reporterOptions: options,
  188. sender: sender,
  189. queue: make(chan reporterQueueItem, options.queueSize),
  190. reporterStats: new(reporterStats),
  191. }
  192. if receiver, ok := sender.(reporterstats.Receiver); ok {
  193. receiver.SetReporterStats(reporter.reporterStats)
  194. }
  195. go reporter.processQueue()
  196. return reporter
  197. }
  198. // Report implements Report() method of Reporter.
  199. // It passes the span to a background go-routine for submission to Jaeger backend.
  200. // If the internal queue is full, the span is dropped and metrics.ReporterDropped counter is incremented.
  201. // If Report() is called after the reporter has been Close()-ed, the additional spans will not be
  202. // sent to the backend, but the metrics.ReporterDropped counter may not reflect them correctly,
  203. // because some of them may still be successfully added to the queue.
  204. func (r *remoteReporter) Report(span *Span) {
  205. select {
  206. // Need to retain the span otherwise it will be released
  207. case r.queue <- reporterQueueItem{itemType: reporterQueueItemSpan, span: span.Retain()}:
  208. atomic.AddInt64(&r.queueLength, 1)
  209. default:
  210. r.metrics.ReporterDropped.Inc(1)
  211. r.reporterStats.incDroppedCount()
  212. }
  213. }
  214. // Close implements Close() method of Reporter by waiting for the queue to be drained.
  215. func (r *remoteReporter) Close() {
  216. r.logger.Debugf("closing reporter")
  217. if swapped := atomic.CompareAndSwapInt64(&r.closed, 0, 1); !swapped {
  218. r.logger.Error("Repeated attempt to close the reporter is ignored")
  219. return
  220. }
  221. r.sendCloseEvent()
  222. _ = r.sender.Close()
  223. }
  224. func (r *remoteReporter) sendCloseEvent() {
  225. wg := &sync.WaitGroup{}
  226. wg.Add(1)
  227. item := reporterQueueItem{itemType: reporterQueueItemClose, close: wg}
  228. r.queue <- item // if the queue is full we will block until there is space
  229. atomic.AddInt64(&r.queueLength, 1)
  230. wg.Wait()
  231. }
  232. // processQueue reads spans from the queue, converts them to Thrift, and stores them in an internal buffer.
  233. // When the buffer length reaches batchSize, it is flushed by submitting the accumulated spans to Jaeger.
  234. // Buffer also gets flushed automatically every batchFlushInterval seconds, just in case the tracer stopped
  235. // reporting new spans.
  236. func (r *remoteReporter) processQueue() {
  237. // flush causes the Sender to flush its accumulated spans and clear the buffer
  238. flush := func() {
  239. if flushed, err := r.sender.Flush(); err != nil {
  240. r.metrics.ReporterFailure.Inc(int64(flushed))
  241. r.logger.Error(fmt.Sprintf("failed to flush Jaeger spans to server: %s", err.Error()))
  242. } else if flushed > 0 {
  243. r.metrics.ReporterSuccess.Inc(int64(flushed))
  244. }
  245. }
  246. timer := time.NewTicker(r.bufferFlushInterval)
  247. for {
  248. select {
  249. case <-timer.C:
  250. flush()
  251. case item := <-r.queue:
  252. atomic.AddInt64(&r.queueLength, -1)
  253. switch item.itemType {
  254. case reporterQueueItemSpan:
  255. span := item.span
  256. if flushed, err := r.sender.Append(span); err != nil {
  257. r.metrics.ReporterFailure.Inc(int64(flushed))
  258. r.logger.Error(fmt.Sprintf("error reporting Jaeger span %q: %s", span.OperationName(), err.Error()))
  259. } else if flushed > 0 {
  260. r.metrics.ReporterSuccess.Inc(int64(flushed))
  261. // to reduce the number of gauge stats, we only emit queue length on flush
  262. r.metrics.ReporterQueueLength.Update(atomic.LoadInt64(&r.queueLength))
  263. r.logger.Debugf("flushed %d spans", flushed)
  264. }
  265. span.Release()
  266. case reporterQueueItemClose:
  267. timer.Stop()
  268. flush()
  269. item.close.Done()
  270. return
  271. }
  272. }
  273. }
  274. }