123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503 |
- // Copyright (c) 2017-2018 Uber Technologies, Inc.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package jaeger
- import (
- "sync"
- "sync/atomic"
- "time"
- "github.com/opentracing/opentracing-go"
- "github.com/opentracing/opentracing-go/ext"
- "github.com/opentracing/opentracing-go/log"
- )
// Span implements opentracing.Span.
type Span struct {
	// referenceCounter extends the lifetime of the object: Retain increments
	// it, Release decrements it, and the span is handed back to the tracer's
	// span allocator once the counter drops to -1.
	referenceCounter int32

	// RWMutex is taken by the Span methods before they read or modify the
	// mutable fields below.
	sync.RWMutex

	// tracer is the Tracer this span belongs to; the methods use it for the
	// sampler, the clock (timeNow), the options and for reporting the span.
	tracer *Tracer

	// TODO: (breaking change) change to use a pointer
	context SpanContext

	// The name of the "operation" this span is an instance of.
	// Known as a "span name" in some implementations.
	operationName string

	// firstInProcess, if true, indicates that this span is the root of the (sub)tree
	// of spans in the current process. In other words it's true for the root spans,
	// and the ingress spans when the process joins another trace.
	firstInProcess bool

	// startTime is the timestamp indicating when the span began, with microseconds precision.
	startTime time.Time

	// duration is the duration of the span with microseconds precision.
	// Zero value means duration is unknown.
	duration time.Duration

	// tags attached to this span
	tags []Tag

	// The span's "micro-log"
	logs []opentracing.LogRecord

	// The number of logs dropped because of MaxLogsPerSpan.
	numDroppedLogs int

	// references for this span
	references []Reference

	// observer is notified when the operation name changes, when a tag is
	// set and when the span finishes.
	observer ContribSpanObserver
}
// Tag pairs a tag key with its (arbitrarily typed) value.
// TODO (breaking change) deprecate in the next major release, use opentracing.Tag instead.
type Tag struct {
	key   string
	value interface{}
}

// NewTag builds a Tag from the given key and value.
// TODO (breaking change) deprecate in the next major release, use opentracing.Tag instead.
func NewTag(key string, value interface{}) Tag {
	var tag Tag
	tag.key = key
	tag.value = value
	return tag
}
- // SetOperationName sets or changes the operation name.
- func (s *Span) SetOperationName(operationName string) opentracing.Span {
- s.Lock()
- s.operationName = operationName
- ctx := s.context
- s.Unlock()
- if !ctx.isSamplingFinalized() {
- decision := s.tracer.sampler.OnSetOperationName(s, operationName)
- s.applySamplingDecision(decision, true)
- }
- s.observer.OnSetOperationName(operationName)
- return s
- }
- // SetTag implements SetTag() of opentracing.Span
- func (s *Span) SetTag(key string, value interface{}) opentracing.Span {
- return s.setTagInternal(key, value, true)
- }
- func (s *Span) setTagInternal(key string, value interface{}, lock bool) opentracing.Span {
- var ctx SpanContext
- var operationName string
- if lock {
- ctx = s.SpanContext()
- operationName = s.OperationName()
- } else {
- ctx = s.context
- operationName = s.operationName
- }
- s.observer.OnSetTag(key, value)
- if key == string(ext.SamplingPriority) && !setSamplingPriority(ctx.samplingState, operationName, s.tracer, value) {
- return s
- }
- if !ctx.isSamplingFinalized() {
- decision := s.tracer.sampler.OnSetTag(s, key, value)
- s.applySamplingDecision(decision, lock)
- }
- if ctx.isWriteable() {
- if lock {
- s.Lock()
- defer s.Unlock()
- }
- s.appendTagNoLocking(key, value)
- }
- return s
- }
- // SpanContext returns span context
- func (s *Span) SpanContext() SpanContext {
- s.Lock()
- defer s.Unlock()
- return s.context
- }
- // StartTime returns span start time
- func (s *Span) StartTime() time.Time {
- s.Lock()
- defer s.Unlock()
- return s.startTime
- }
- // Duration returns span duration
- func (s *Span) Duration() time.Duration {
- s.Lock()
- defer s.Unlock()
- return s.duration
- }
- // Tags returns tags for span
- func (s *Span) Tags() opentracing.Tags {
- s.Lock()
- defer s.Unlock()
- var result = make(opentracing.Tags, len(s.tags))
- for _, tag := range s.tags {
- result[tag.key] = tag.value
- }
- return result
- }
- // Logs returns micro logs for span
- func (s *Span) Logs() []opentracing.LogRecord {
- s.Lock()
- defer s.Unlock()
- logs := append([]opentracing.LogRecord(nil), s.logs...)
- if s.numDroppedLogs != 0 {
- fixLogs(logs, s.numDroppedLogs)
- }
- return logs
- }
- // References returns references for this span
- func (s *Span) References() []opentracing.SpanReference {
- s.Lock()
- defer s.Unlock()
- if s.references == nil || len(s.references) == 0 {
- return nil
- }
- result := make([]opentracing.SpanReference, len(s.references))
- for i, r := range s.references {
- result[i] = opentracing.SpanReference{Type: r.Type, ReferencedContext: r.Context}
- }
- return result
- }
- func (s *Span) appendTagNoLocking(key string, value interface{}) {
- s.tags = append(s.tags, Tag{key: key, value: value})
- }
- // LogFields implements opentracing.Span API
- func (s *Span) LogFields(fields ...log.Field) {
- s.Lock()
- defer s.Unlock()
- if !s.context.IsSampled() {
- return
- }
- s.logFieldsNoLocking(fields...)
- }
- // this function should only be called while holding a Write lock
- func (s *Span) logFieldsNoLocking(fields ...log.Field) {
- lr := opentracing.LogRecord{
- Fields: fields,
- Timestamp: time.Now(),
- }
- s.appendLogNoLocking(lr)
- }
- // LogKV implements opentracing.Span API
- func (s *Span) LogKV(alternatingKeyValues ...interface{}) {
- s.RLock()
- sampled := s.context.IsSampled()
- s.RUnlock()
- if !sampled {
- return
- }
- fields, err := log.InterleavedKVToFields(alternatingKeyValues...)
- if err != nil {
- s.LogFields(log.Error(err), log.String("function", "LogKV"))
- return
- }
- s.LogFields(fields...)
- }
- // LogEvent implements opentracing.Span API
- func (s *Span) LogEvent(event string) {
- s.Log(opentracing.LogData{Event: event})
- }
- // LogEventWithPayload implements opentracing.Span API
- func (s *Span) LogEventWithPayload(event string, payload interface{}) {
- s.Log(opentracing.LogData{Event: event, Payload: payload})
- }
- // Log implements opentracing.Span API
- func (s *Span) Log(ld opentracing.LogData) {
- s.Lock()
- defer s.Unlock()
- if s.context.IsSampled() {
- if ld.Timestamp.IsZero() {
- ld.Timestamp = s.tracer.timeNow()
- }
- s.appendLogNoLocking(ld.ToLogRecord())
- }
- }
- // this function should only be called while holding a Write lock
- func (s *Span) appendLogNoLocking(lr opentracing.LogRecord) {
- maxLogs := s.tracer.options.maxLogsPerSpan
- if maxLogs == 0 || len(s.logs) < maxLogs {
- s.logs = append(s.logs, lr)
- return
- }
- // We have too many logs. We don't touch the first numOld logs; we treat the
- // rest as a circular buffer and overwrite the oldest log among those.
- numOld := (maxLogs - 1) / 2
- numNew := maxLogs - numOld
- s.logs[numOld+s.numDroppedLogs%numNew] = lr
- s.numDroppedLogs++
- }
- // rotateLogBuffer rotates the records in the buffer: records 0 to pos-1 move at
- // the end (i.e. pos circular left shifts).
- func rotateLogBuffer(buf []opentracing.LogRecord, pos int) {
- // This algorithm is described in:
- // http://www.cplusplus.com/reference/algorithm/rotate
- for first, middle, next := 0, pos, pos; first != middle; {
- buf[first], buf[next] = buf[next], buf[first]
- first++
- next++
- if next == len(buf) {
- next = middle
- } else if first == middle {
- middle = next
- }
- }
- }
- func fixLogs(logs []opentracing.LogRecord, numDroppedLogs int) {
- // We dropped some log events, which means that we used part of Logs as a
- // circular buffer (see appendLog). De-circularize it.
- numOld := (len(logs) - 1) / 2
- numNew := len(logs) - numOld
- rotateLogBuffer(logs[numOld:], numDroppedLogs%numNew)
- // Replace the log in the middle (the oldest "new" log) with information
- // about the dropped logs. This means that we are effectively dropping one
- // more "new" log.
- numDropped := numDroppedLogs + 1
- logs[numOld] = opentracing.LogRecord{
- // Keep the timestamp of the last dropped event.
- Timestamp: logs[numOld].Timestamp,
- Fields: []log.Field{
- log.String("event", "dropped Span logs"),
- log.Int("dropped_log_count", numDropped),
- log.String("component", "jaeger-client"),
- },
- }
- }
- func (s *Span) fixLogsIfDropped() {
- if s.numDroppedLogs == 0 {
- return
- }
- fixLogs(s.logs, s.numDroppedLogs)
- s.numDroppedLogs = 0
- }
- // SetBaggageItem implements SetBaggageItem() of opentracing.SpanContext.
- // The call is proxied via tracer.baggageSetter to allow policies to be applied
- // before allowing to set/replace baggage keys.
- // The setter eventually stores a new SpanContext with extended baggage:
- //
- // span.context = span.context.WithBaggageItem(key, value)
- //
- // See SpanContext.WithBaggageItem() for explanation why it's done this way.
- func (s *Span) SetBaggageItem(key, value string) opentracing.Span {
- s.Lock()
- defer s.Unlock()
- s.tracer.setBaggage(s, key, value)
- return s
- }
- // BaggageItem implements BaggageItem() of opentracing.SpanContext
- func (s *Span) BaggageItem(key string) string {
- s.RLock()
- defer s.RUnlock()
- return s.context.baggage[key]
- }
- // Finish implements opentracing.Span API
- // After finishing the Span object it returns back to the allocator unless the reporter retains it again,
- // so after that, the Span object should no longer be used because it won't be valid anymore.
- func (s *Span) Finish() {
- s.FinishWithOptions(opentracing.FinishOptions{})
- }
- // FinishWithOptions implements opentracing.Span API
- func (s *Span) FinishWithOptions(options opentracing.FinishOptions) {
- if options.FinishTime.IsZero() {
- options.FinishTime = s.tracer.timeNow()
- }
- s.observer.OnFinish(options)
- s.Lock()
- s.duration = options.FinishTime.Sub(s.startTime)
- ctx := s.context
- s.Unlock()
- if !ctx.isSamplingFinalized() {
- decision := s.tracer.sampler.OnFinishSpan(s)
- s.applySamplingDecision(decision, true)
- }
- if ctx.IsSampled() {
- s.Lock()
- s.fixLogsIfDropped()
- if len(options.LogRecords) > 0 || len(options.BulkLogData) > 0 {
- // Note: bulk logs are not subject to maxLogsPerSpan limit
- if options.LogRecords != nil {
- s.logs = append(s.logs, options.LogRecords...)
- }
- for _, ld := range options.BulkLogData {
- s.logs = append(s.logs, ld.ToLogRecord())
- }
- }
- s.Unlock()
- }
- // call reportSpan even for non-sampled traces, to return span to the pool
- // and update metrics counter
- s.tracer.reportSpan(s)
- }
- // Context implements opentracing.Span API
- func (s *Span) Context() opentracing.SpanContext {
- s.Lock()
- defer s.Unlock()
- return s.context
- }
- // Tracer implements opentracing.Span API
- func (s *Span) Tracer() opentracing.Tracer {
- return s.tracer
- }
- func (s *Span) String() string {
- s.RLock()
- defer s.RUnlock()
- return s.context.String()
- }
- // OperationName allows retrieving current operation name.
- func (s *Span) OperationName() string {
- s.RLock()
- defer s.RUnlock()
- return s.operationName
- }
- // Retain increases object counter to increase the lifetime of the object
- func (s *Span) Retain() *Span {
- atomic.AddInt32(&s.referenceCounter, 1)
- return s
- }
- // Release decrements object counter and return to the
- // allocator manager when counter will below zero
- func (s *Span) Release() {
- if atomic.AddInt32(&s.referenceCounter, -1) == -1 {
- s.tracer.spanAllocator.Put(s)
- }
- }
- // reset span state and release unused data
- func (s *Span) reset() {
- s.firstInProcess = false
- s.context = emptyContext
- s.operationName = ""
- s.tracer = nil
- s.startTime = time.Time{}
- s.duration = 0
- s.observer = nil
- atomic.StoreInt32(&s.referenceCounter, 0)
- // Note: To reuse memory we can save the pointers on the heap
- s.tags = s.tags[:0]
- s.logs = s.logs[:0]
- s.numDroppedLogs = 0
- s.references = s.references[:0]
- }
- func (s *Span) serviceName() string {
- return s.tracer.serviceName
- }
- func (s *Span) applySamplingDecision(decision SamplingDecision, lock bool) {
- var ctx SpanContext
- if lock {
- ctx = s.SpanContext()
- } else {
- ctx = s.context
- }
- if !decision.Retryable {
- ctx.samplingState.setFinal()
- }
- if decision.Sample {
- ctx.samplingState.setSampled()
- if len(decision.Tags) > 0 {
- if lock {
- s.Lock()
- defer s.Unlock()
- }
- for _, tag := range decision.Tags {
- s.appendTagNoLocking(tag.key, tag.value)
- }
- }
- }
- }
- // setSamplingPriority returns true if the flag was updated successfully, false otherwise.
- // The behavior of setSamplingPriority is surprising
- // If noDebugFlagOnForcedSampling is set
- // setSamplingPriority(..., 1) always sets only flagSampled
- // If noDebugFlagOnForcedSampling is unset, and isDebugAllowed passes
- // setSamplingPriority(..., 1) sets both flagSampled and flagDebug
- // However,
- // setSamplingPriority(..., 0) always only resets flagSampled
- //
- // This means that doing a setSamplingPriority(..., 1) followed by setSamplingPriority(..., 0) can
- // leave flagDebug set
- func setSamplingPriority(state *samplingState, operationName string, tracer *Tracer, value interface{}) bool {
- val, ok := value.(uint16)
- if !ok {
- return false
- }
- if val == 0 {
- state.unsetSampled()
- state.setFinal()
- return true
- }
- if tracer.options.noDebugFlagOnForcedSampling {
- state.setSampled()
- state.setFinal()
- return true
- } else if tracer.isDebugAllowed(operationName) {
- state.setDebugAndSampled()
- state.setFinal()
- return true
- }
- return false
- }
- // EnableFirehose enables firehose flag on the span context
- func EnableFirehose(s *Span) {
- s.Lock()
- defer s.Unlock()
- s.context.samplingState.setFirehose()
- }
|