123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322 |
- // Copyright (c) 2017 Uber Technologies, Inc.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package jaeger
- import (
- "fmt"
- "sync"
- "sync/atomic"
- "time"
- "github.com/opentracing/opentracing-go"
- "github.com/uber/jaeger-client-go/internal/reporterstats"
- "github.com/uber/jaeger-client-go/log"
- )
- // Reporter is called by the tracer when a span is completed to report the span to the tracing collector.
- type Reporter interface {
- // Report submits a new span to collectors, possibly asynchronously and/or with buffering.
- // If the reporter is processing Span asynchronously then it needs to Retain() the span,
- // and then Release() it when no longer needed, to avoid span data corruption.
- Report(span *Span)
- // Close does a clean shutdown of the reporter, flushing any traces that may be buffered in memory.
- Close()
- }
- // ------------------------------
- type nullReporter struct{}
- // NewNullReporter creates a no-op reporter that ignores all reported spans.
- func NewNullReporter() Reporter {
- return &nullReporter{}
- }
- // Report implements Report() method of Reporter by doing nothing.
- func (r *nullReporter) Report(span *Span) {
- // no-op
- }
- // Close implements Close() method of Reporter by doing nothing.
- func (r *nullReporter) Close() {
- // no-op
- }
- // ------------------------------
- type loggingReporter struct {
- logger Logger
- }
- // NewLoggingReporter creates a reporter that logs all reported spans to provided logger.
- func NewLoggingReporter(logger Logger) Reporter {
- return &loggingReporter{logger}
- }
- // Report implements Report() method of Reporter by logging the span to the logger.
- func (r *loggingReporter) Report(span *Span) {
- r.logger.Infof("Reporting span %+v", span)
- }
- // Close implements Close() method of Reporter by doing nothing.
- func (r *loggingReporter) Close() {
- // no-op
- }
- // ------------------------------
- // InMemoryReporter is used for testing, and simply collects spans in memory.
- type InMemoryReporter struct {
- spans []opentracing.Span
- lock sync.Mutex
- }
- // NewInMemoryReporter creates a reporter that stores spans in memory.
- // NOTE: the Tracer should be created with options.PoolSpans = false.
- func NewInMemoryReporter() *InMemoryReporter {
- return &InMemoryReporter{
- spans: make([]opentracing.Span, 0, 10),
- }
- }
- // Report implements Report() method of Reporter by storing the span in the buffer.
- func (r *InMemoryReporter) Report(span *Span) {
- r.lock.Lock()
- // Need to retain the span otherwise it will be released
- r.spans = append(r.spans, span.Retain())
- r.lock.Unlock()
- }
- // Close implements Close() method of Reporter
- func (r *InMemoryReporter) Close() {
- r.Reset()
- }
- // SpansSubmitted returns the number of spans accumulated in the buffer.
- func (r *InMemoryReporter) SpansSubmitted() int {
- r.lock.Lock()
- defer r.lock.Unlock()
- return len(r.spans)
- }
- // GetSpans returns accumulated spans as a copy of the buffer.
- func (r *InMemoryReporter) GetSpans() []opentracing.Span {
- r.lock.Lock()
- defer r.lock.Unlock()
- copied := make([]opentracing.Span, len(r.spans))
- copy(copied, r.spans)
- return copied
- }
- // Reset clears all accumulated spans.
- func (r *InMemoryReporter) Reset() {
- r.lock.Lock()
- defer r.lock.Unlock()
- // Before reset the collection need to release Span memory
- for _, span := range r.spans {
- span.(*Span).Release()
- }
- r.spans = r.spans[:0]
- }
- // ------------------------------
- type compositeReporter struct {
- reporters []Reporter
- }
- // NewCompositeReporter creates a reporter that ignores all reported spans.
- func NewCompositeReporter(reporters ...Reporter) Reporter {
- return &compositeReporter{reporters: reporters}
- }
- // Report implements Report() method of Reporter by delegating to each underlying reporter.
- func (r *compositeReporter) Report(span *Span) {
- for _, reporter := range r.reporters {
- reporter.Report(span)
- }
- }
- // Close implements Close() method of Reporter by closing each underlying reporter.
- func (r *compositeReporter) Close() {
- for _, reporter := range r.reporters {
- reporter.Close()
- }
- }
- // ------------- REMOTE REPORTER -----------------
- type reporterQueueItemType int
- const (
- defaultQueueSize = 100
- defaultBufferFlushInterval = 1 * time.Second
- reporterQueueItemSpan reporterQueueItemType = iota
- reporterQueueItemClose
- )
- type reporterQueueItem struct {
- itemType reporterQueueItemType
- span *Span
- close *sync.WaitGroup
- }
- // reporterStats implements reporterstats.ReporterStats.
- type reporterStats struct {
- droppedCount int64 // provided to Transports to report data loss to the backend
- }
- // SpansDroppedFromQueue implements reporterstats.ReporterStats.
- func (r *reporterStats) SpansDroppedFromQueue() int64 {
- return atomic.LoadInt64(&r.droppedCount)
- }
- func (r *reporterStats) incDroppedCount() {
- atomic.AddInt64(&r.droppedCount, 1)
- }
- type remoteReporter struct {
- // These fields must be first in the struct because `sync/atomic` expects 64-bit alignment.
- // Cf. https://github.com/uber/jaeger-client-go/issues/155, https://goo.gl/zW7dgq
- queueLength int64 // used to update metrics.Gauge
- closed int64 // 0 - not closed, 1 - closed
- reporterOptions
- sender Transport
- queue chan reporterQueueItem
- reporterStats *reporterStats
- }
- // NewRemoteReporter creates a new reporter that sends spans out of process by means of Sender.
- // Calls to Report(Span) return immediately (side effect: if internal buffer is full the span is dropped).
- // Periodically the transport buffer is flushed even if it hasn't reached max packet size.
- // Calls to Close() block until all spans reported prior to the call to Close are flushed.
- func NewRemoteReporter(sender Transport, opts ...ReporterOption) Reporter {
- options := reporterOptions{}
- for _, option := range opts {
- option(&options)
- }
- if options.bufferFlushInterval <= 0 {
- options.bufferFlushInterval = defaultBufferFlushInterval
- }
- if options.logger == nil {
- options.logger = log.NullLogger
- }
- if options.metrics == nil {
- options.metrics = NewNullMetrics()
- }
- if options.queueSize <= 0 {
- options.queueSize = defaultQueueSize
- }
- reporter := &remoteReporter{
- reporterOptions: options,
- sender: sender,
- queue: make(chan reporterQueueItem, options.queueSize),
- reporterStats: new(reporterStats),
- }
- if receiver, ok := sender.(reporterstats.Receiver); ok {
- receiver.SetReporterStats(reporter.reporterStats)
- }
- go reporter.processQueue()
- return reporter
- }
- // Report implements Report() method of Reporter.
- // It passes the span to a background go-routine for submission to Jaeger backend.
- // If the internal queue is full, the span is dropped and metrics.ReporterDropped counter is incremented.
- // If Report() is called after the reporter has been Close()-ed, the additional spans will not be
- // sent to the backend, but the metrics.ReporterDropped counter may not reflect them correctly,
- // because some of them may still be successfully added to the queue.
- func (r *remoteReporter) Report(span *Span) {
- select {
- // Need to retain the span otherwise it will be released
- case r.queue <- reporterQueueItem{itemType: reporterQueueItemSpan, span: span.Retain()}:
- atomic.AddInt64(&r.queueLength, 1)
- default:
- r.metrics.ReporterDropped.Inc(1)
- r.reporterStats.incDroppedCount()
- }
- }
- // Close implements Close() method of Reporter by waiting for the queue to be drained.
- func (r *remoteReporter) Close() {
- r.logger.Debugf("closing reporter")
- if swapped := atomic.CompareAndSwapInt64(&r.closed, 0, 1); !swapped {
- r.logger.Error("Repeated attempt to close the reporter is ignored")
- return
- }
- r.sendCloseEvent()
- _ = r.sender.Close()
- }
- func (r *remoteReporter) sendCloseEvent() {
- wg := &sync.WaitGroup{}
- wg.Add(1)
- item := reporterQueueItem{itemType: reporterQueueItemClose, close: wg}
- r.queue <- item // if the queue is full we will block until there is space
- atomic.AddInt64(&r.queueLength, 1)
- wg.Wait()
- }
- // processQueue reads spans from the queue, converts them to Thrift, and stores them in an internal buffer.
- // When the buffer length reaches batchSize, it is flushed by submitting the accumulated spans to Jaeger.
- // Buffer also gets flushed automatically every batchFlushInterval seconds, just in case the tracer stopped
- // reporting new spans.
- func (r *remoteReporter) processQueue() {
- // flush causes the Sender to flush its accumulated spans and clear the buffer
- flush := func() {
- if flushed, err := r.sender.Flush(); err != nil {
- r.metrics.ReporterFailure.Inc(int64(flushed))
- r.logger.Error(fmt.Sprintf("failed to flush Jaeger spans to server: %s", err.Error()))
- } else if flushed > 0 {
- r.metrics.ReporterSuccess.Inc(int64(flushed))
- }
- }
- timer := time.NewTicker(r.bufferFlushInterval)
- for {
- select {
- case <-timer.C:
- flush()
- case item := <-r.queue:
- atomic.AddInt64(&r.queueLength, -1)
- switch item.itemType {
- case reporterQueueItemSpan:
- span := item.span
- if flushed, err := r.sender.Append(span); err != nil {
- r.metrics.ReporterFailure.Inc(int64(flushed))
- r.logger.Error(fmt.Sprintf("error reporting Jaeger span %q: %s", span.OperationName(), err.Error()))
- } else if flushed > 0 {
- r.metrics.ReporterSuccess.Inc(int64(flushed))
- // to reduce the number of gauge stats, we only emit queue length on flush
- r.metrics.ReporterQueueLength.Update(atomic.LoadInt64(&r.queueLength))
- r.logger.Debugf("flushed %d spans", flushed)
- }
- span.Release()
- case reporterQueueItemClose:
- timer.Stop()
- flush()
- item.close.Done()
- return
- }
- }
- }
- }
|