span.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503
  1. // Copyright (c) 2017-2018 Uber Technologies, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package jaeger
  15. import (
  16. "sync"
  17. "sync/atomic"
  18. "time"
  19. "github.com/opentracing/opentracing-go"
  20. "github.com/opentracing/opentracing-go/ext"
  21. "github.com/opentracing/opentracing-go/log"
  22. )
  23. // Span implements opentracing.Span
  24. type Span struct {
  25. // referenceCounter used to increase the lifetime of
  26. // the object before return it into the pool.
  27. referenceCounter int32
  28. sync.RWMutex
  29. tracer *Tracer
  30. // TODO: (breaking change) change to use a pointer
  31. context SpanContext
  32. // The name of the "operation" this span is an instance of.
  33. // Known as a "span name" in some implementations.
  34. operationName string
  35. // firstInProcess, if true, indicates that this span is the root of the (sub)tree
  36. // of spans in the current process. In other words it's true for the root spans,
  37. // and the ingress spans when the process joins another trace.
  38. firstInProcess bool
  39. // startTime is the timestamp indicating when the span began, with microseconds precision.
  40. startTime time.Time
  41. // duration returns duration of the span with microseconds precision.
  42. // Zero value means duration is unknown.
  43. duration time.Duration
  44. // tags attached to this span
  45. tags []Tag
  46. // The span's "micro-log"
  47. logs []opentracing.LogRecord
  48. // The number of logs dropped because of MaxLogsPerSpan.
  49. numDroppedLogs int
  50. // references for this span
  51. references []Reference
  52. observer ContribSpanObserver
  53. }
  54. // Tag is a simple key value wrapper.
  55. // TODO (breaking change) deprecate in the next major release, use opentracing.Tag instead.
  56. type Tag struct {
  57. key string
  58. value interface{}
  59. }
  60. // NewTag creates a new Tag.
  61. // TODO (breaking change) deprecate in the next major release, use opentracing.Tag instead.
  62. func NewTag(key string, value interface{}) Tag {
  63. return Tag{key: key, value: value}
  64. }
  65. // SetOperationName sets or changes the operation name.
  66. func (s *Span) SetOperationName(operationName string) opentracing.Span {
  67. s.Lock()
  68. s.operationName = operationName
  69. ctx := s.context
  70. s.Unlock()
  71. if !ctx.isSamplingFinalized() {
  72. decision := s.tracer.sampler.OnSetOperationName(s, operationName)
  73. s.applySamplingDecision(decision, true)
  74. }
  75. s.observer.OnSetOperationName(operationName)
  76. return s
  77. }
  78. // SetTag implements SetTag() of opentracing.Span
  79. func (s *Span) SetTag(key string, value interface{}) opentracing.Span {
  80. return s.setTagInternal(key, value, true)
  81. }
  82. func (s *Span) setTagInternal(key string, value interface{}, lock bool) opentracing.Span {
  83. var ctx SpanContext
  84. var operationName string
  85. if lock {
  86. ctx = s.SpanContext()
  87. operationName = s.OperationName()
  88. } else {
  89. ctx = s.context
  90. operationName = s.operationName
  91. }
  92. s.observer.OnSetTag(key, value)
  93. if key == string(ext.SamplingPriority) && !setSamplingPriority(ctx.samplingState, operationName, s.tracer, value) {
  94. return s
  95. }
  96. if !ctx.isSamplingFinalized() {
  97. decision := s.tracer.sampler.OnSetTag(s, key, value)
  98. s.applySamplingDecision(decision, lock)
  99. }
  100. if ctx.isWriteable() {
  101. if lock {
  102. s.Lock()
  103. defer s.Unlock()
  104. }
  105. s.appendTagNoLocking(key, value)
  106. }
  107. return s
  108. }
  109. // SpanContext returns span context
  110. func (s *Span) SpanContext() SpanContext {
  111. s.Lock()
  112. defer s.Unlock()
  113. return s.context
  114. }
  115. // StartTime returns span start time
  116. func (s *Span) StartTime() time.Time {
  117. s.Lock()
  118. defer s.Unlock()
  119. return s.startTime
  120. }
  121. // Duration returns span duration
  122. func (s *Span) Duration() time.Duration {
  123. s.Lock()
  124. defer s.Unlock()
  125. return s.duration
  126. }
  127. // Tags returns tags for span
  128. func (s *Span) Tags() opentracing.Tags {
  129. s.Lock()
  130. defer s.Unlock()
  131. var result = make(opentracing.Tags, len(s.tags))
  132. for _, tag := range s.tags {
  133. result[tag.key] = tag.value
  134. }
  135. return result
  136. }
  137. // Logs returns micro logs for span
  138. func (s *Span) Logs() []opentracing.LogRecord {
  139. s.Lock()
  140. defer s.Unlock()
  141. logs := append([]opentracing.LogRecord(nil), s.logs...)
  142. if s.numDroppedLogs != 0 {
  143. fixLogs(logs, s.numDroppedLogs)
  144. }
  145. return logs
  146. }
  147. // References returns references for this span
  148. func (s *Span) References() []opentracing.SpanReference {
  149. s.Lock()
  150. defer s.Unlock()
  151. if s.references == nil || len(s.references) == 0 {
  152. return nil
  153. }
  154. result := make([]opentracing.SpanReference, len(s.references))
  155. for i, r := range s.references {
  156. result[i] = opentracing.SpanReference{Type: r.Type, ReferencedContext: r.Context}
  157. }
  158. return result
  159. }
  160. func (s *Span) appendTagNoLocking(key string, value interface{}) {
  161. s.tags = append(s.tags, Tag{key: key, value: value})
  162. }
  163. // LogFields implements opentracing.Span API
  164. func (s *Span) LogFields(fields ...log.Field) {
  165. s.Lock()
  166. defer s.Unlock()
  167. if !s.context.IsSampled() {
  168. return
  169. }
  170. s.logFieldsNoLocking(fields...)
  171. }
  172. // this function should only be called while holding a Write lock
  173. func (s *Span) logFieldsNoLocking(fields ...log.Field) {
  174. lr := opentracing.LogRecord{
  175. Fields: fields,
  176. Timestamp: time.Now(),
  177. }
  178. s.appendLogNoLocking(lr)
  179. }
  180. // LogKV implements opentracing.Span API
  181. func (s *Span) LogKV(alternatingKeyValues ...interface{}) {
  182. s.RLock()
  183. sampled := s.context.IsSampled()
  184. s.RUnlock()
  185. if !sampled {
  186. return
  187. }
  188. fields, err := log.InterleavedKVToFields(alternatingKeyValues...)
  189. if err != nil {
  190. s.LogFields(log.Error(err), log.String("function", "LogKV"))
  191. return
  192. }
  193. s.LogFields(fields...)
  194. }
  195. // LogEvent implements opentracing.Span API
  196. func (s *Span) LogEvent(event string) {
  197. s.Log(opentracing.LogData{Event: event})
  198. }
  199. // LogEventWithPayload implements opentracing.Span API
  200. func (s *Span) LogEventWithPayload(event string, payload interface{}) {
  201. s.Log(opentracing.LogData{Event: event, Payload: payload})
  202. }
  203. // Log implements opentracing.Span API
  204. func (s *Span) Log(ld opentracing.LogData) {
  205. s.Lock()
  206. defer s.Unlock()
  207. if s.context.IsSampled() {
  208. if ld.Timestamp.IsZero() {
  209. ld.Timestamp = s.tracer.timeNow()
  210. }
  211. s.appendLogNoLocking(ld.ToLogRecord())
  212. }
  213. }
  214. // this function should only be called while holding a Write lock
  215. func (s *Span) appendLogNoLocking(lr opentracing.LogRecord) {
  216. maxLogs := s.tracer.options.maxLogsPerSpan
  217. if maxLogs == 0 || len(s.logs) < maxLogs {
  218. s.logs = append(s.logs, lr)
  219. return
  220. }
  221. // We have too many logs. We don't touch the first numOld logs; we treat the
  222. // rest as a circular buffer and overwrite the oldest log among those.
  223. numOld := (maxLogs - 1) / 2
  224. numNew := maxLogs - numOld
  225. s.logs[numOld+s.numDroppedLogs%numNew] = lr
  226. s.numDroppedLogs++
  227. }
  228. // rotateLogBuffer rotates the records in the buffer: records 0 to pos-1 move at
  229. // the end (i.e. pos circular left shifts).
  230. func rotateLogBuffer(buf []opentracing.LogRecord, pos int) {
  231. // This algorithm is described in:
  232. // http://www.cplusplus.com/reference/algorithm/rotate
  233. for first, middle, next := 0, pos, pos; first != middle; {
  234. buf[first], buf[next] = buf[next], buf[first]
  235. first++
  236. next++
  237. if next == len(buf) {
  238. next = middle
  239. } else if first == middle {
  240. middle = next
  241. }
  242. }
  243. }
  244. func fixLogs(logs []opentracing.LogRecord, numDroppedLogs int) {
  245. // We dropped some log events, which means that we used part of Logs as a
  246. // circular buffer (see appendLog). De-circularize it.
  247. numOld := (len(logs) - 1) / 2
  248. numNew := len(logs) - numOld
  249. rotateLogBuffer(logs[numOld:], numDroppedLogs%numNew)
  250. // Replace the log in the middle (the oldest "new" log) with information
  251. // about the dropped logs. This means that we are effectively dropping one
  252. // more "new" log.
  253. numDropped := numDroppedLogs + 1
  254. logs[numOld] = opentracing.LogRecord{
  255. // Keep the timestamp of the last dropped event.
  256. Timestamp: logs[numOld].Timestamp,
  257. Fields: []log.Field{
  258. log.String("event", "dropped Span logs"),
  259. log.Int("dropped_log_count", numDropped),
  260. log.String("component", "jaeger-client"),
  261. },
  262. }
  263. }
  264. func (s *Span) fixLogsIfDropped() {
  265. if s.numDroppedLogs == 0 {
  266. return
  267. }
  268. fixLogs(s.logs, s.numDroppedLogs)
  269. s.numDroppedLogs = 0
  270. }
  271. // SetBaggageItem implements SetBaggageItem() of opentracing.SpanContext.
  272. // The call is proxied via tracer.baggageSetter to allow policies to be applied
  273. // before allowing to set/replace baggage keys.
  274. // The setter eventually stores a new SpanContext with extended baggage:
  275. //
  276. // span.context = span.context.WithBaggageItem(key, value)
  277. //
  278. // See SpanContext.WithBaggageItem() for explanation why it's done this way.
  279. func (s *Span) SetBaggageItem(key, value string) opentracing.Span {
  280. s.Lock()
  281. defer s.Unlock()
  282. s.tracer.setBaggage(s, key, value)
  283. return s
  284. }
  285. // BaggageItem implements BaggageItem() of opentracing.SpanContext
  286. func (s *Span) BaggageItem(key string) string {
  287. s.RLock()
  288. defer s.RUnlock()
  289. return s.context.baggage[key]
  290. }
  291. // Finish implements opentracing.Span API
  292. // After finishing the Span object it returns back to the allocator unless the reporter retains it again,
  293. // so after that, the Span object should no longer be used because it won't be valid anymore.
  294. func (s *Span) Finish() {
  295. s.FinishWithOptions(opentracing.FinishOptions{})
  296. }
  297. // FinishWithOptions implements opentracing.Span API
  298. func (s *Span) FinishWithOptions(options opentracing.FinishOptions) {
  299. if options.FinishTime.IsZero() {
  300. options.FinishTime = s.tracer.timeNow()
  301. }
  302. s.observer.OnFinish(options)
  303. s.Lock()
  304. s.duration = options.FinishTime.Sub(s.startTime)
  305. ctx := s.context
  306. s.Unlock()
  307. if !ctx.isSamplingFinalized() {
  308. decision := s.tracer.sampler.OnFinishSpan(s)
  309. s.applySamplingDecision(decision, true)
  310. }
  311. if ctx.IsSampled() {
  312. s.Lock()
  313. s.fixLogsIfDropped()
  314. if len(options.LogRecords) > 0 || len(options.BulkLogData) > 0 {
  315. // Note: bulk logs are not subject to maxLogsPerSpan limit
  316. if options.LogRecords != nil {
  317. s.logs = append(s.logs, options.LogRecords...)
  318. }
  319. for _, ld := range options.BulkLogData {
  320. s.logs = append(s.logs, ld.ToLogRecord())
  321. }
  322. }
  323. s.Unlock()
  324. }
  325. // call reportSpan even for non-sampled traces, to return span to the pool
  326. // and update metrics counter
  327. s.tracer.reportSpan(s)
  328. }
  329. // Context implements opentracing.Span API
  330. func (s *Span) Context() opentracing.SpanContext {
  331. s.Lock()
  332. defer s.Unlock()
  333. return s.context
  334. }
  335. // Tracer implements opentracing.Span API
  336. func (s *Span) Tracer() opentracing.Tracer {
  337. return s.tracer
  338. }
  339. func (s *Span) String() string {
  340. s.RLock()
  341. defer s.RUnlock()
  342. return s.context.String()
  343. }
  344. // OperationName allows retrieving current operation name.
  345. func (s *Span) OperationName() string {
  346. s.RLock()
  347. defer s.RUnlock()
  348. return s.operationName
  349. }
  350. // Retain increases object counter to increase the lifetime of the object
  351. func (s *Span) Retain() *Span {
  352. atomic.AddInt32(&s.referenceCounter, 1)
  353. return s
  354. }
  355. // Release decrements object counter and return to the
  356. // allocator manager when counter will below zero
  357. func (s *Span) Release() {
  358. if atomic.AddInt32(&s.referenceCounter, -1) == -1 {
  359. s.tracer.spanAllocator.Put(s)
  360. }
  361. }
  362. // reset span state and release unused data
  363. func (s *Span) reset() {
  364. s.firstInProcess = false
  365. s.context = emptyContext
  366. s.operationName = ""
  367. s.tracer = nil
  368. s.startTime = time.Time{}
  369. s.duration = 0
  370. s.observer = nil
  371. atomic.StoreInt32(&s.referenceCounter, 0)
  372. // Note: To reuse memory we can save the pointers on the heap
  373. s.tags = s.tags[:0]
  374. s.logs = s.logs[:0]
  375. s.numDroppedLogs = 0
  376. s.references = s.references[:0]
  377. }
  378. func (s *Span) serviceName() string {
  379. return s.tracer.serviceName
  380. }
  381. func (s *Span) applySamplingDecision(decision SamplingDecision, lock bool) {
  382. var ctx SpanContext
  383. if lock {
  384. ctx = s.SpanContext()
  385. } else {
  386. ctx = s.context
  387. }
  388. if !decision.Retryable {
  389. ctx.samplingState.setFinal()
  390. }
  391. if decision.Sample {
  392. ctx.samplingState.setSampled()
  393. if len(decision.Tags) > 0 {
  394. if lock {
  395. s.Lock()
  396. defer s.Unlock()
  397. }
  398. for _, tag := range decision.Tags {
  399. s.appendTagNoLocking(tag.key, tag.value)
  400. }
  401. }
  402. }
  403. }
  404. // setSamplingPriority returns true if the flag was updated successfully, false otherwise.
  405. // The behavior of setSamplingPriority is surprising
  406. // If noDebugFlagOnForcedSampling is set
  407. // setSamplingPriority(..., 1) always sets only flagSampled
  408. // If noDebugFlagOnForcedSampling is unset, and isDebugAllowed passes
  409. // setSamplingPriority(..., 1) sets both flagSampled and flagDebug
  410. // However,
  411. // setSamplingPriority(..., 0) always only resets flagSampled
  412. //
  413. // This means that doing a setSamplingPriority(..., 1) followed by setSamplingPriority(..., 0) can
  414. // leave flagDebug set
  415. func setSamplingPriority(state *samplingState, operationName string, tracer *Tracer, value interface{}) bool {
  416. val, ok := value.(uint16)
  417. if !ok {
  418. return false
  419. }
  420. if val == 0 {
  421. state.unsetSampled()
  422. state.setFinal()
  423. return true
  424. }
  425. if tracer.options.noDebugFlagOnForcedSampling {
  426. state.setSampled()
  427. state.setFinal()
  428. return true
  429. } else if tracer.isDebugAllowed(operationName) {
  430. state.setDebugAndSampled()
  431. state.setFinal()
  432. return true
  433. }
  434. return false
  435. }
  436. // EnableFirehose enables firehose flag on the span context
  437. func EnableFirehose(s *Span) {
  438. s.Lock()
  439. defer s.Unlock()
  440. s.context.samplingState.setFirehose()
  441. }