propagation.go 10 KB


  1. // Copyright (c) 2017 Uber Technologies, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package jaeger
  15. import (
  16. "bytes"
  17. "encoding/binary"
  18. "fmt"
  19. "io"
  20. "log"
  21. "net/url"
  22. "strings"
  23. "sync"
  24. opentracing "github.com/opentracing/opentracing-go"
  25. )
  26. // Injector is responsible for injecting SpanContext instances in a manner suitable
  27. // for propagation via a format-specific "carrier" object. Typically the
  28. // injection will take place across an RPC boundary, but message queues and
  29. // other IPC mechanisms are also reasonable places to use an Injector.
  30. type Injector interface {
  31. // Inject takes `SpanContext` and injects it into `carrier`. The actual type
  32. // of `carrier` depends on the `format` passed to `Tracer.Inject()`.
  33. //
  34. // Implementations may return opentracing.ErrInvalidCarrier or any other
  35. // implementation-specific error if injection fails.
  36. Inject(ctx SpanContext, carrier interface{}) error
  37. }
  38. // Extractor is responsible for extracting SpanContext instances from a
  39. // format-specific "carrier" object. Typically the extraction will take place
  40. // on the server side of an RPC boundary, but message queues and other IPC
  41. // mechanisms are also reasonable places to use an Extractor.
  42. type Extractor interface {
  43. // Extract decodes a SpanContext instance from the given `carrier`,
  44. // or (nil, opentracing.ErrSpanContextNotFound) if no context could
  45. // be found in the `carrier`.
  46. Extract(carrier interface{}) (SpanContext, error)
  47. }
  48. // TextMapPropagator is a combined Injector and Extractor for TextMap format
  49. type TextMapPropagator struct {
  50. headerKeys *HeadersConfig
  51. metrics Metrics
  52. encodeValue func(string) string
  53. decodeValue func(string) string
  54. }
  55. // NewTextMapPropagator creates a combined Injector and Extractor for TextMap format
  56. func NewTextMapPropagator(headerKeys *HeadersConfig, metrics Metrics) *TextMapPropagator {
  57. return &TextMapPropagator{
  58. headerKeys: headerKeys,
  59. metrics: metrics,
  60. encodeValue: func(val string) string {
  61. return val
  62. },
  63. decodeValue: func(val string) string {
  64. return val
  65. },
  66. }
  67. }
  68. // NewHTTPHeaderPropagator creates a combined Injector and Extractor for HTTPHeaders format
  69. func NewHTTPHeaderPropagator(headerKeys *HeadersConfig, metrics Metrics) *TextMapPropagator {
  70. return &TextMapPropagator{
  71. headerKeys: headerKeys,
  72. metrics: metrics,
  73. encodeValue: func(val string) string {
  74. return url.QueryEscape(val)
  75. },
  76. decodeValue: func(val string) string {
  77. // ignore decoding errors, cannot do anything about them
  78. if v, err := url.QueryUnescape(val); err == nil {
  79. return v
  80. }
  81. return val
  82. },
  83. }
  84. }
  85. // BinaryPropagator is a combined Injector and Extractor for Binary format
  86. type BinaryPropagator struct {
  87. tracer *Tracer
  88. buffers sync.Pool
  89. }
  90. // NewBinaryPropagator creates a combined Injector and Extractor for Binary format
  91. func NewBinaryPropagator(tracer *Tracer) *BinaryPropagator {
  92. return &BinaryPropagator{
  93. tracer: tracer,
  94. buffers: sync.Pool{New: func() interface{} { return &bytes.Buffer{} }},
  95. }
  96. }
  97. // Inject implements Injector of TextMapPropagator
  98. func (p *TextMapPropagator) Inject(
  99. sc SpanContext,
  100. abstractCarrier interface{},
  101. ) error {
  102. textMapWriter, ok := abstractCarrier.(opentracing.TextMapWriter)
  103. if !ok {
  104. return opentracing.ErrInvalidCarrier
  105. }
  106. // Do not encode the string with trace context to avoid accidental double-encoding
  107. // if people are using opentracing < 0.10.0. Our colon-separated representation
  108. // of the trace context is already safe for HTTP headers.
  109. textMapWriter.Set(p.headerKeys.TraceContextHeaderName, sc.String())
  110. for k, v := range sc.baggage {
  111. safeKey := p.addBaggageKeyPrefix(k)
  112. safeVal := p.encodeValue(v)
  113. textMapWriter.Set(safeKey, safeVal)
  114. }
  115. return nil
  116. }
  117. // Extract implements Extractor of TextMapPropagator
  118. func (p *TextMapPropagator) Extract(abstractCarrier interface{}) (SpanContext, error) {
  119. textMapReader, ok := abstractCarrier.(opentracing.TextMapReader)
  120. if !ok {
  121. return emptyContext, opentracing.ErrInvalidCarrier
  122. }
  123. var ctx SpanContext
  124. var baggage map[string]string
  125. err := textMapReader.ForeachKey(func(rawKey, value string) error {
  126. key := strings.ToLower(rawKey) // TODO not necessary for plain TextMap
  127. if key == p.headerKeys.TraceContextHeaderName {
  128. var err error
  129. safeVal := p.decodeValue(value)
  130. if ctx, err = ContextFromString(safeVal); err != nil {
  131. return err
  132. }
  133. } else if key == p.headerKeys.JaegerDebugHeader {
  134. ctx.debugID = p.decodeValue(value)
  135. } else if key == p.headerKeys.JaegerBaggageHeader {
  136. if baggage == nil {
  137. baggage = make(map[string]string)
  138. }
  139. for k, v := range p.parseCommaSeparatedMap(value) {
  140. baggage[k] = v
  141. }
  142. } else if strings.HasPrefix(key, p.headerKeys.TraceBaggageHeaderPrefix) {
  143. if baggage == nil {
  144. baggage = make(map[string]string)
  145. }
  146. safeKey := p.removeBaggageKeyPrefix(key)
  147. safeVal := p.decodeValue(value)
  148. baggage[safeKey] = safeVal
  149. }
  150. return nil
  151. })
  152. if err != nil {
  153. p.metrics.DecodingErrors.Inc(1)
  154. return emptyContext, err
  155. }
  156. if !ctx.traceID.IsValid() && ctx.debugID == "" && len(baggage) == 0 {
  157. return emptyContext, opentracing.ErrSpanContextNotFound
  158. }
  159. ctx.baggage = baggage
  160. return ctx, nil
  161. }
  162. // Inject implements Injector of BinaryPropagator
  163. func (p *BinaryPropagator) Inject(
  164. sc SpanContext,
  165. abstractCarrier interface{},
  166. ) error {
  167. carrier, ok := abstractCarrier.(io.Writer)
  168. if !ok {
  169. return opentracing.ErrInvalidCarrier
  170. }
  171. // Handle the tracer context
  172. if err := binary.Write(carrier, binary.BigEndian, sc.traceID); err != nil {
  173. return err
  174. }
  175. if err := binary.Write(carrier, binary.BigEndian, sc.spanID); err != nil {
  176. return err
  177. }
  178. if err := binary.Write(carrier, binary.BigEndian, sc.parentID); err != nil {
  179. return err
  180. }
  181. if err := binary.Write(carrier, binary.BigEndian, sc.samplingState.flags()); err != nil {
  182. return err
  183. }
  184. // Handle the baggage items
  185. if err := binary.Write(carrier, binary.BigEndian, int32(len(sc.baggage))); err != nil {
  186. return err
  187. }
  188. for k, v := range sc.baggage {
  189. if err := binary.Write(carrier, binary.BigEndian, int32(len(k))); err != nil {
  190. return err
  191. }
  192. io.WriteString(carrier, k)
  193. if err := binary.Write(carrier, binary.BigEndian, int32(len(v))); err != nil {
  194. return err
  195. }
  196. io.WriteString(carrier, v)
  197. }
  198. return nil
  199. }
  200. // W3C limits https://github.com/w3c/baggage/blob/master/baggage/HTTP_HEADER_FORMAT.md#limits
  201. const (
  202. maxBinaryBaggage = 180
  203. maxBinaryNameValueLen = 4096
  204. )
  205. // Extract implements Extractor of BinaryPropagator
  206. func (p *BinaryPropagator) Extract(abstractCarrier interface{}) (SpanContext, error) {
  207. carrier, ok := abstractCarrier.(io.Reader)
  208. if !ok {
  209. return emptyContext, opentracing.ErrInvalidCarrier
  210. }
  211. var ctx SpanContext
  212. ctx.samplingState = &samplingState{}
  213. if err := binary.Read(carrier, binary.BigEndian, &ctx.traceID); err != nil {
  214. return emptyContext, opentracing.ErrSpanContextCorrupted
  215. }
  216. if err := binary.Read(carrier, binary.BigEndian, &ctx.spanID); err != nil {
  217. return emptyContext, opentracing.ErrSpanContextCorrupted
  218. }
  219. if err := binary.Read(carrier, binary.BigEndian, &ctx.parentID); err != nil {
  220. return emptyContext, opentracing.ErrSpanContextCorrupted
  221. }
  222. var flags byte
  223. if err := binary.Read(carrier, binary.BigEndian, &flags); err != nil {
  224. return emptyContext, opentracing.ErrSpanContextCorrupted
  225. }
  226. ctx.samplingState.setFlags(flags)
  227. // Handle the baggage items
  228. var numBaggage int32
  229. if err := binary.Read(carrier, binary.BigEndian, &numBaggage); err != nil {
  230. return emptyContext, opentracing.ErrSpanContextCorrupted
  231. }
  232. if numBaggage > maxBinaryBaggage {
  233. return emptyContext, opentracing.ErrSpanContextCorrupted
  234. }
  235. if iNumBaggage := int(numBaggage); iNumBaggage > 0 {
  236. ctx.baggage = make(map[string]string, iNumBaggage)
  237. buf := p.buffers.Get().(*bytes.Buffer)
  238. defer p.buffers.Put(buf)
  239. var keyLen, valLen int32
  240. for i := 0; i < iNumBaggage; i++ {
  241. if err := binary.Read(carrier, binary.BigEndian, &keyLen); err != nil {
  242. return emptyContext, opentracing.ErrSpanContextCorrupted
  243. }
  244. buf.Reset()
  245. buf.Grow(int(keyLen))
  246. if n, err := io.CopyN(buf, carrier, int64(keyLen)); err != nil || int32(n) != keyLen {
  247. return emptyContext, opentracing.ErrSpanContextCorrupted
  248. }
  249. key := buf.String()
  250. if err := binary.Read(carrier, binary.BigEndian, &valLen); err != nil {
  251. return emptyContext, opentracing.ErrSpanContextCorrupted
  252. }
  253. if keyLen+valLen > maxBinaryNameValueLen {
  254. return emptyContext, opentracing.ErrSpanContextCorrupted
  255. }
  256. buf.Reset()
  257. buf.Grow(int(valLen))
  258. if n, err := io.CopyN(buf, carrier, int64(valLen)); err != nil || int32(n) != valLen {
  259. return emptyContext, opentracing.ErrSpanContextCorrupted
  260. }
  261. ctx.baggage[key] = buf.String()
  262. }
  263. }
  264. return ctx, nil
  265. }
  266. // Converts a comma separated key value pair list into a map
  267. // e.g. key1=value1, key2=value2, key3 = value3
  268. // is converted to map[string]string { "key1" : "value1",
  269. // "key2" : "value2",
  270. // "key3" : "value3" }
  271. func (p *TextMapPropagator) parseCommaSeparatedMap(value string) map[string]string {
  272. baggage := make(map[string]string)
  273. value, err := url.QueryUnescape(value)
  274. if err != nil {
  275. log.Printf("Unable to unescape %s, %v", value, err)
  276. return baggage
  277. }
  278. for _, kvpair := range strings.Split(value, ",") {
  279. kv := strings.Split(strings.TrimSpace(kvpair), "=")
  280. if len(kv) == 2 {
  281. baggage[strings.TrimSpace(kv[0])] = kv[1]
  282. } else {
  283. log.Printf("Malformed value passed in for %s", p.headerKeys.JaegerBaggageHeader)
  284. }
  285. }
  286. return baggage
  287. }
  288. // Converts a baggage item key into an http header format,
  289. // by prepending TraceBaggageHeaderPrefix and encoding the key string
  290. func (p *TextMapPropagator) addBaggageKeyPrefix(key string) string {
  291. // TODO encodeBaggageKeyAsHeader add caching and escaping
  292. return fmt.Sprintf("%v%v", p.headerKeys.TraceBaggageHeaderPrefix, key)
  293. }
  294. func (p *TextMapPropagator) removeBaggageKeyPrefix(key string) string {
  295. // TODO decodeBaggageHeaderKey add caching and escaping
  296. return key[len(p.headerKeys.TraceBaggageHeaderPrefix):]
  297. }