tracer.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493
  1. // Copyright (c) 2017-2018 Uber Technologies, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package jaeger
  15. import (
  16. "fmt"
  17. "io"
  18. "math/rand"
  19. "os"
  20. "reflect"
  21. "strconv"
  22. "sync"
  23. "time"
  24. "github.com/opentracing/opentracing-go"
  25. "github.com/opentracing/opentracing-go/ext"
  26. "github.com/uber/jaeger-client-go/internal/baggage"
  27. "github.com/uber/jaeger-client-go/internal/throttler"
  28. "github.com/uber/jaeger-client-go/log"
  29. "github.com/uber/jaeger-client-go/utils"
  30. )
  // Tracer implements opentracing.Tracer.
  //
  // Instances are created by NewTracer, which applies the caller's TracerOptions
  // first and then fills in defaults for the fields that are still unset.
  type Tracer struct {
  	// serviceName is the service name passed to NewTracer; it is also stored in process.Service.
  	serviceName string
  	hostIPv4    uint32 // this is for zipkin endpoint conversion

  	// sampler is consulted (OnCreateSpan) for new spans whose sampling is not yet finalized.
  	sampler SamplerV2
  	// reporter receives finished spans that are sampled and is closed by Close.
  	reporter Reporter
  	// metrics holds the counters updated when spans are started and finished.
  	metrics Metrics
  	// logger defaults to log.NullLogger when no option sets it.
  	logger log.DebugLogger

  	// timeNow supplies the start time of spans created without an explicit
  	// StartTime; defaults to time.Now.
  	timeNow func() time.Time
  	// randomNumber generates the raw random values behind trace/span IDs and
  	// the process UUID; defaults to a generator backed by pooled math/rand sources.
  	randomNumber func() uint64

  	options struct {
  		gen128Bit bool // whether to generate 128bit trace IDs
  		// zipkinSharedRPCSpan makes an RPC-server span reuse the span ID and
  		// parent ID of its parent instead of getting a new span ID.
  		zipkinSharedRPCSpan  bool
  		highTraceIDGenerator func() uint64 // custom high trace ID generator
  		// maxTagValueLength is replaced by DefaultMaxTagValueLength when left at 0.
  		maxTagValueLength           int
  		noDebugFlagOnForcedSampling bool
  		maxLogsPerSpan              int
  		// more options to come
  	}
  	// allocator of Span objects
  	spanAllocator SpanAllocator

  	// injectors and extractors are keyed by propagation format.
  	injectors  map[interface{}]Injector
  	extractors map[interface{}]Extractor

  	// observer is notified (OnStartSpan) of every span this tracer starts.
  	observer compositeObserver

  	// tags are the tracer-level tags; NewTracer appends the client version,
  	// hostname and IP tags to whatever the options provided.
  	tags []Tag
  	// process describes this tracer instance: service name, a random UUID and tags.
  	process Process

  	// baggageRestrictionManager is optional; when nil, NewTracer builds the
  	// baggageSetter from a default restriction manager instead.
  	baggageRestrictionManager baggage.RestrictionManager
  	baggageSetter             *baggageSetter
  	// debugThrottler decides (IsAllowed) whether a debug-ID request may force
  	// sampling for an operation; defaults to throttler.DefaultThrottler{}.
  	debugThrottler throttler.Throttler
  }
  61. // NewTracer creates Tracer implementation that reports tracing to Jaeger.
  62. // The returned io.Closer can be used in shutdown hooks to ensure that the internal
  63. // queue of the Reporter is drained and all buffered spans are submitted to collectors.
  64. // TODO (breaking change) return *Tracer only, without closer.
  65. func NewTracer(
  66. serviceName string,
  67. sampler Sampler,
  68. reporter Reporter,
  69. options ...TracerOption,
  70. ) (opentracing.Tracer, io.Closer) {
  71. t := &Tracer{
  72. serviceName: serviceName,
  73. sampler: samplerV1toV2(sampler),
  74. reporter: reporter,
  75. injectors: make(map[interface{}]Injector),
  76. extractors: make(map[interface{}]Extractor),
  77. metrics: *NewNullMetrics(),
  78. spanAllocator: simpleSpanAllocator{},
  79. }
  80. for _, option := range options {
  81. option(t)
  82. }
  83. // register default injectors/extractors unless they are already provided via options
  84. textPropagator := NewTextMapPropagator(getDefaultHeadersConfig(), t.metrics)
  85. t.addCodec(opentracing.TextMap, textPropagator, textPropagator)
  86. httpHeaderPropagator := NewHTTPHeaderPropagator(getDefaultHeadersConfig(), t.metrics)
  87. t.addCodec(opentracing.HTTPHeaders, httpHeaderPropagator, httpHeaderPropagator)
  88. binaryPropagator := NewBinaryPropagator(t)
  89. t.addCodec(opentracing.Binary, binaryPropagator, binaryPropagator)
  90. // TODO remove after TChannel supports OpenTracing
  91. interopPropagator := &jaegerTraceContextPropagator{tracer: t}
  92. t.addCodec(SpanContextFormat, interopPropagator, interopPropagator)
  93. zipkinPropagator := &zipkinPropagator{tracer: t}
  94. t.addCodec(ZipkinSpanFormat, zipkinPropagator, zipkinPropagator)
  95. if t.baggageRestrictionManager != nil {
  96. t.baggageSetter = newBaggageSetter(t.baggageRestrictionManager, &t.metrics)
  97. } else {
  98. t.baggageSetter = newBaggageSetter(baggage.NewDefaultRestrictionManager(0), &t.metrics)
  99. }
  100. if t.debugThrottler == nil {
  101. t.debugThrottler = throttler.DefaultThrottler{}
  102. }
  103. if t.randomNumber == nil {
  104. seedGenerator := utils.NewRand(time.Now().UnixNano())
  105. pool := sync.Pool{
  106. New: func() interface{} {
  107. return rand.NewSource(seedGenerator.Int63())
  108. },
  109. }
  110. t.randomNumber = func() uint64 {
  111. generator := pool.Get().(rand.Source)
  112. number := uint64(generator.Int63())
  113. pool.Put(generator)
  114. return number
  115. }
  116. }
  117. if t.timeNow == nil {
  118. t.timeNow = time.Now
  119. }
  120. if t.logger == nil {
  121. t.logger = log.NullLogger
  122. }
  123. // Set tracer-level tags
  124. t.tags = append(t.tags, Tag{key: JaegerClientVersionTagKey, value: JaegerClientVersion})
  125. if hostname, err := os.Hostname(); err == nil {
  126. t.tags = append(t.tags, Tag{key: TracerHostnameTagKey, value: hostname})
  127. }
  128. if ipval, ok := t.getTag(TracerIPTagKey); ok {
  129. ipv4, err := utils.ParseIPToUint32(ipval.(string))
  130. if err != nil {
  131. t.hostIPv4 = 0
  132. t.logger.Error("Unable to convert the externally provided ip to uint32: " + err.Error())
  133. } else {
  134. t.hostIPv4 = ipv4
  135. }
  136. } else if ip, err := utils.HostIP(); err == nil {
  137. t.tags = append(t.tags, Tag{key: TracerIPTagKey, value: ip.String()})
  138. t.hostIPv4 = utils.PackIPAsUint32(ip)
  139. } else {
  140. t.logger.Error("Unable to determine this host's IP address: " + err.Error())
  141. }
  142. if t.options.gen128Bit {
  143. if t.options.highTraceIDGenerator == nil {
  144. t.options.highTraceIDGenerator = t.randomNumber
  145. }
  146. } else if t.options.highTraceIDGenerator != nil {
  147. t.logger.Error("Overriding high trace ID generator but not generating " +
  148. "128 bit trace IDs, consider enabling the \"Gen128Bit\" option")
  149. }
  150. if t.options.maxTagValueLength == 0 {
  151. t.options.maxTagValueLength = DefaultMaxTagValueLength
  152. }
  153. t.process = Process{
  154. Service: serviceName,
  155. UUID: strconv.FormatUint(t.randomNumber(), 16),
  156. Tags: t.tags,
  157. }
  158. if throttler, ok := t.debugThrottler.(ProcessSetter); ok {
  159. throttler.SetProcess(t.process)
  160. }
  161. return t, t
  162. }
  163. // addCodec adds registers injector and extractor for given propagation format if not already defined.
  164. func (t *Tracer) addCodec(format interface{}, injector Injector, extractor Extractor) {
  165. if _, ok := t.injectors[format]; !ok {
  166. t.injectors[format] = injector
  167. }
  168. if _, ok := t.extractors[format]; !ok {
  169. t.extractors[format] = extractor
  170. }
  171. }
  172. // StartSpan implements StartSpan() method of opentracing.Tracer.
  173. func (t *Tracer) StartSpan(
  174. operationName string,
  175. options ...opentracing.StartSpanOption,
  176. ) opentracing.Span {
  177. sso := opentracing.StartSpanOptions{}
  178. for _, o := range options {
  179. o.Apply(&sso)
  180. }
  181. return t.startSpanWithOptions(operationName, sso)
  182. }
  183. func (t *Tracer) startSpanWithOptions(
  184. operationName string,
  185. options opentracing.StartSpanOptions,
  186. ) opentracing.Span {
  187. if options.StartTime.IsZero() {
  188. options.StartTime = t.timeNow()
  189. }
  190. // Predicate whether the given span context is an empty reference
  191. // or may be used as parent / debug ID / baggage items source
  192. isEmptyReference := func(ctx SpanContext) bool {
  193. return !ctx.IsValid() && !ctx.isDebugIDContainerOnly() && len(ctx.baggage) == 0
  194. }
  195. var references []Reference
  196. var parent SpanContext
  197. var hasParent bool // need this because `parent` is a value, not reference
  198. var ctx SpanContext
  199. var isSelfRef bool
  200. for _, ref := range options.References {
  201. ctxRef, ok := ref.ReferencedContext.(SpanContext)
  202. if !ok {
  203. t.logger.Error(fmt.Sprintf(
  204. "Reference contains invalid type of SpanReference: %s",
  205. reflect.ValueOf(ref.ReferencedContext)))
  206. continue
  207. }
  208. if isEmptyReference(ctxRef) {
  209. continue
  210. }
  211. if ref.Type == selfRefType {
  212. isSelfRef = true
  213. ctx = ctxRef
  214. continue
  215. }
  216. if ctxRef.IsValid() {
  217. // we don't want empty context that contains only debug-id or baggage
  218. references = append(references, Reference{Type: ref.Type, Context: ctxRef})
  219. }
  220. if !hasParent {
  221. parent = ctxRef
  222. hasParent = ref.Type == opentracing.ChildOfRef
  223. }
  224. }
  225. if !hasParent && !isEmptyReference(parent) {
  226. // If ChildOfRef wasn't found but a FollowFromRef exists, use the context from
  227. // the FollowFromRef as the parent
  228. hasParent = true
  229. }
  230. rpcServer := false
  231. if v, ok := options.Tags[ext.SpanKindRPCServer.Key]; ok {
  232. rpcServer = (v == ext.SpanKindRPCServerEnum || v == string(ext.SpanKindRPCServerEnum))
  233. }
  234. var internalTags []Tag
  235. newTrace := false
  236. if !isSelfRef {
  237. if !hasParent || !parent.IsValid() {
  238. newTrace = true
  239. ctx.traceID.Low = t.randomID()
  240. if t.options.gen128Bit {
  241. ctx.traceID.High = t.options.highTraceIDGenerator()
  242. }
  243. ctx.spanID = SpanID(ctx.traceID.Low)
  244. ctx.parentID = 0
  245. ctx.samplingState = &samplingState{
  246. localRootSpan: ctx.spanID,
  247. }
  248. if hasParent && parent.isDebugIDContainerOnly() && t.isDebugAllowed(operationName) {
  249. ctx.samplingState.setDebugAndSampled()
  250. internalTags = append(internalTags, Tag{key: JaegerDebugHeader, value: parent.debugID})
  251. }
  252. } else {
  253. ctx.traceID = parent.traceID
  254. if rpcServer && t.options.zipkinSharedRPCSpan {
  255. // Support Zipkin's one-span-per-RPC model
  256. ctx.spanID = parent.spanID
  257. ctx.parentID = parent.parentID
  258. } else {
  259. ctx.spanID = SpanID(t.randomID())
  260. ctx.parentID = parent.spanID
  261. }
  262. ctx.samplingState = parent.samplingState
  263. if parent.remote {
  264. ctx.samplingState.setFinal()
  265. ctx.samplingState.localRootSpan = ctx.spanID
  266. }
  267. }
  268. if hasParent {
  269. // copy baggage items
  270. if l := len(parent.baggage); l > 0 {
  271. ctx.baggage = make(map[string]string, len(parent.baggage))
  272. for k, v := range parent.baggage {
  273. ctx.baggage[k] = v
  274. }
  275. }
  276. }
  277. }
  278. sp := t.newSpan()
  279. sp.context = ctx
  280. sp.tracer = t
  281. sp.operationName = operationName
  282. sp.startTime = options.StartTime
  283. sp.duration = 0
  284. sp.references = references
  285. sp.firstInProcess = rpcServer || sp.context.parentID == 0
  286. if !sp.context.isSamplingFinalized() {
  287. decision := t.sampler.OnCreateSpan(sp)
  288. sp.applySamplingDecision(decision, false)
  289. }
  290. sp.observer = t.observer.OnStartSpan(sp, operationName, options)
  291. if tagsTotalLength := len(options.Tags) + len(internalTags); tagsTotalLength > 0 {
  292. if sp.tags == nil || cap(sp.tags) < tagsTotalLength {
  293. sp.tags = make([]Tag, 0, tagsTotalLength)
  294. }
  295. sp.tags = append(sp.tags, internalTags...)
  296. for k, v := range options.Tags {
  297. sp.setTagInternal(k, v, false)
  298. }
  299. }
  300. t.emitNewSpanMetrics(sp, newTrace)
  301. return sp
  302. }
  303. // Inject implements Inject() method of opentracing.Tracer
  304. func (t *Tracer) Inject(ctx opentracing.SpanContext, format interface{}, carrier interface{}) error {
  305. c, ok := ctx.(SpanContext)
  306. if !ok {
  307. return opentracing.ErrInvalidSpanContext
  308. }
  309. if injector, ok := t.injectors[format]; ok {
  310. return injector.Inject(c, carrier)
  311. }
  312. return opentracing.ErrUnsupportedFormat
  313. }
  314. // Extract implements Extract() method of opentracing.Tracer
  315. func (t *Tracer) Extract(
  316. format interface{},
  317. carrier interface{},
  318. ) (opentracing.SpanContext, error) {
  319. if extractor, ok := t.extractors[format]; ok {
  320. spanCtx, err := extractor.Extract(carrier)
  321. if err != nil {
  322. return nil, err // ensure returned spanCtx is nil
  323. }
  324. spanCtx.remote = true
  325. return spanCtx, nil
  326. }
  327. return nil, opentracing.ErrUnsupportedFormat
  328. }
  329. // Close releases all resources used by the Tracer and flushes any remaining buffered spans.
  330. func (t *Tracer) Close() error {
  331. t.logger.Debugf("closing tracer")
  332. t.reporter.Close()
  333. t.sampler.Close()
  334. if mgr, ok := t.baggageRestrictionManager.(io.Closer); ok {
  335. _ = mgr.Close()
  336. }
  337. if throttler, ok := t.debugThrottler.(io.Closer); ok {
  338. _ = throttler.Close()
  339. }
  340. return nil
  341. }
  342. // Tags returns a slice of tracer-level tags.
  343. func (t *Tracer) Tags() []opentracing.Tag {
  344. tags := make([]opentracing.Tag, len(t.tags))
  345. for i, tag := range t.tags {
  346. tags[i] = opentracing.Tag{Key: tag.key, Value: tag.value}
  347. }
  348. return tags
  349. }
  350. // getTag returns the value of specific tag, if not exists, return nil.
  351. // TODO only used by tests, move there.
  352. func (t *Tracer) getTag(key string) (interface{}, bool) {
  353. for _, tag := range t.tags {
  354. if tag.key == key {
  355. return tag.value, true
  356. }
  357. }
  358. return nil, false
  359. }
  360. // newSpan returns an instance of a clean Span object.
  361. // If options.PoolSpans is true, the spans are retrieved from an object pool.
  362. func (t *Tracer) newSpan() *Span {
  363. return t.spanAllocator.Get()
  364. }
  365. // emitNewSpanMetrics generates metrics on the number of started spans and traces.
  366. // newTrace param: we cannot simply check for parentID==0 because in Zipkin model the
  367. // server-side RPC span has the exact same trace/span/parent IDs as the
  368. // calling client-side span, but obviously the server side span is
  369. // no longer a root span of the trace.
  370. func (t *Tracer) emitNewSpanMetrics(sp *Span, newTrace bool) {
  371. if !sp.context.isSamplingFinalized() {
  372. t.metrics.SpansStartedDelayedSampling.Inc(1)
  373. if newTrace {
  374. t.metrics.TracesStartedDelayedSampling.Inc(1)
  375. }
  376. // joining a trace is not possible, because sampling decision inherited from upstream is final
  377. } else if sp.context.IsSampled() {
  378. t.metrics.SpansStartedSampled.Inc(1)
  379. if newTrace {
  380. t.metrics.TracesStartedSampled.Inc(1)
  381. } else if sp.firstInProcess {
  382. t.metrics.TracesJoinedSampled.Inc(1)
  383. }
  384. } else {
  385. t.metrics.SpansStartedNotSampled.Inc(1)
  386. if newTrace {
  387. t.metrics.TracesStartedNotSampled.Inc(1)
  388. } else if sp.firstInProcess {
  389. t.metrics.TracesJoinedNotSampled.Inc(1)
  390. }
  391. }
  392. }
  393. func (t *Tracer) reportSpan(sp *Span) {
  394. ctx := sp.SpanContext()
  395. if !ctx.isSamplingFinalized() {
  396. t.metrics.SpansFinishedDelayedSampling.Inc(1)
  397. } else if ctx.IsSampled() {
  398. t.metrics.SpansFinishedSampled.Inc(1)
  399. } else {
  400. t.metrics.SpansFinishedNotSampled.Inc(1)
  401. }
  402. // Note: if the reporter is processing Span asynchronously then it needs to Retain() the span,
  403. // and then Release() it when no longer needed.
  404. // Otherwise, the span may be reused for another trace and its data may be overwritten.
  405. if ctx.IsSampled() {
  406. t.reporter.Report(sp)
  407. }
  408. sp.Release()
  409. }
  410. // randomID generates a random trace/span ID, using tracer.random() generator.
  411. // It never returns 0.
  412. func (t *Tracer) randomID() uint64 {
  413. val := t.randomNumber()
  414. for val == 0 {
  415. val = t.randomNumber()
  416. }
  417. return val
  418. }
  419. // (NB) span must hold the lock before making this call
  420. func (t *Tracer) setBaggage(sp *Span, key, value string) {
  421. t.baggageSetter.setBaggage(sp, key, value)
  422. }
  423. // (NB) span must hold the lock before making this call
  424. func (t *Tracer) isDebugAllowed(operation string) bool {
  425. return t.debugThrottler.IsAllowed(operation)
  426. }
  427. // Sampler returns the sampler given to the tracer at creation.
  428. func (t *Tracer) Sampler() SamplerV2 {
  429. return t.sampler
  430. }
  431. // SelfRef creates an opentracing compliant SpanReference from a jaeger
  432. // SpanContext. This is a factory function in order to encapsulate jaeger specific
  433. // types.
  434. func SelfRef(ctx SpanContext) opentracing.SpanReference {
  435. return opentracing.SpanReference{
  436. Type: selfRefType,
  437. ReferencedContext: ctx,
  438. }
  439. }