zipkin_thrift_span.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. // Copyright (c) 2017 Uber Technologies, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package jaeger
  15. import (
  16. "encoding/binary"
  17. "fmt"
  18. "time"
  19. "github.com/opentracing/opentracing-go/ext"
  20. "github.com/uber/jaeger-client-go/internal/spanlog"
  21. z "github.com/uber/jaeger-client-go/thrift-gen/zipkincore"
  22. "github.com/uber/jaeger-client-go/utils"
  23. )
  24. const (
  25. // Zipkin UI does not work well with non-string tag values
  26. allowPackedNumbers = false
  27. )
  28. var specialTagHandlers = map[string]func(*zipkinSpan, interface{}){
  29. string(ext.SpanKind): setSpanKind,
  30. string(ext.PeerHostIPv4): setPeerIPv4,
  31. string(ext.PeerPort): setPeerPort,
  32. string(ext.PeerService): setPeerService,
  33. TracerIPTagKey: removeTag,
  34. }
  35. // BuildZipkinThrift builds thrift span based on internal span.
  36. // TODO: (breaking change) move to transport/zipkin and make private.
  37. func BuildZipkinThrift(s *Span) *z.Span {
  38. span := &zipkinSpan{Span: s}
  39. span.handleSpecialTags()
  40. parentID := int64(span.context.parentID)
  41. var ptrParentID *int64
  42. if parentID != 0 {
  43. ptrParentID = &parentID
  44. }
  45. traceIDHigh := int64(span.context.traceID.High)
  46. var ptrTraceIDHigh *int64
  47. if traceIDHigh != 0 {
  48. ptrTraceIDHigh = &traceIDHigh
  49. }
  50. timestamp := utils.TimeToMicrosecondsSinceEpochInt64(span.startTime)
  51. duration := span.duration.Nanoseconds() / int64(time.Microsecond)
  52. endpoint := &z.Endpoint{
  53. ServiceName: span.tracer.serviceName,
  54. Ipv4: int32(span.tracer.hostIPv4)}
  55. thriftSpan := &z.Span{
  56. TraceID: int64(span.context.traceID.Low),
  57. TraceIDHigh: ptrTraceIDHigh,
  58. ID: int64(span.context.spanID),
  59. ParentID: ptrParentID,
  60. Name: span.operationName,
  61. Timestamp: &timestamp,
  62. Duration: &duration,
  63. Debug: span.context.IsDebug(),
  64. Annotations: buildAnnotations(span, endpoint),
  65. BinaryAnnotations: buildBinaryAnnotations(span, endpoint)}
  66. return thriftSpan
  67. }
  68. func buildAnnotations(span *zipkinSpan, endpoint *z.Endpoint) []*z.Annotation {
  69. // automatically adding 2 Zipkin CoreAnnotations
  70. annotations := make([]*z.Annotation, 0, 2+len(span.logs))
  71. var startLabel, endLabel string
  72. if span.spanKind == string(ext.SpanKindRPCClientEnum) {
  73. startLabel, endLabel = z.CLIENT_SEND, z.CLIENT_RECV
  74. } else if span.spanKind == string(ext.SpanKindRPCServerEnum) {
  75. startLabel, endLabel = z.SERVER_RECV, z.SERVER_SEND
  76. }
  77. if !span.startTime.IsZero() && startLabel != "" {
  78. start := &z.Annotation{
  79. Timestamp: utils.TimeToMicrosecondsSinceEpochInt64(span.startTime),
  80. Value: startLabel,
  81. Host: endpoint}
  82. annotations = append(annotations, start)
  83. if span.duration != 0 {
  84. endTs := span.startTime.Add(span.duration)
  85. end := &z.Annotation{
  86. Timestamp: utils.TimeToMicrosecondsSinceEpochInt64(endTs),
  87. Value: endLabel,
  88. Host: endpoint}
  89. annotations = append(annotations, end)
  90. }
  91. }
  92. for _, log := range span.logs {
  93. anno := &z.Annotation{
  94. Timestamp: utils.TimeToMicrosecondsSinceEpochInt64(log.Timestamp),
  95. Host: endpoint}
  96. if content, err := spanlog.MaterializeWithJSON(log.Fields); err == nil {
  97. anno.Value = truncateString(string(content), span.tracer.options.maxTagValueLength)
  98. } else {
  99. anno.Value = err.Error()
  100. }
  101. annotations = append(annotations, anno)
  102. }
  103. return annotations
  104. }
  105. func buildBinaryAnnotations(span *zipkinSpan, endpoint *z.Endpoint) []*z.BinaryAnnotation {
  106. // automatically adding local component or server/client address tag, and client version
  107. annotations := make([]*z.BinaryAnnotation, 0, 2+len(span.tags))
  108. if span.peerDefined() && span.isRPC() {
  109. peer := z.Endpoint{
  110. Ipv4: span.peer.Ipv4,
  111. Port: span.peer.Port,
  112. ServiceName: span.peer.ServiceName}
  113. label := z.CLIENT_ADDR
  114. if span.isRPCClient() {
  115. label = z.SERVER_ADDR
  116. }
  117. anno := &z.BinaryAnnotation{
  118. Key: label,
  119. Value: []byte{1},
  120. AnnotationType: z.AnnotationType_BOOL,
  121. Host: &peer}
  122. annotations = append(annotations, anno)
  123. }
  124. if !span.isRPC() {
  125. componentName := endpoint.ServiceName
  126. for _, tag := range span.tags {
  127. if tag.key == string(ext.Component) {
  128. componentName = stringify(tag.value)
  129. break
  130. }
  131. }
  132. local := &z.BinaryAnnotation{
  133. Key: z.LOCAL_COMPONENT,
  134. Value: []byte(componentName),
  135. AnnotationType: z.AnnotationType_STRING,
  136. Host: endpoint}
  137. annotations = append(annotations, local)
  138. }
  139. for _, tag := range span.tags {
  140. // "Special tags" are already handled by this point, we'd be double reporting the
  141. // tags if we don't skip here
  142. if _, ok := specialTagHandlers[tag.key]; ok {
  143. continue
  144. }
  145. if anno := buildBinaryAnnotation(tag.key, tag.value, span.tracer.options.maxTagValueLength, nil); anno != nil {
  146. annotations = append(annotations, anno)
  147. }
  148. }
  149. return annotations
  150. }
  151. func buildBinaryAnnotation(key string, val interface{}, maxTagValueLength int, endpoint *z.Endpoint) *z.BinaryAnnotation {
  152. bann := &z.BinaryAnnotation{Key: key, Host: endpoint}
  153. if value, ok := val.(string); ok {
  154. bann.Value = []byte(truncateString(value, maxTagValueLength))
  155. bann.AnnotationType = z.AnnotationType_STRING
  156. } else if value, ok := val.([]byte); ok {
  157. if len(value) > maxTagValueLength {
  158. value = value[:maxTagValueLength]
  159. }
  160. bann.Value = value
  161. bann.AnnotationType = z.AnnotationType_BYTES
  162. } else if value, ok := val.(int32); ok && allowPackedNumbers {
  163. bann.Value = int32ToBytes(value)
  164. bann.AnnotationType = z.AnnotationType_I32
  165. } else if value, ok := val.(int64); ok && allowPackedNumbers {
  166. bann.Value = int64ToBytes(value)
  167. bann.AnnotationType = z.AnnotationType_I64
  168. } else if value, ok := val.(int); ok && allowPackedNumbers {
  169. bann.Value = int64ToBytes(int64(value))
  170. bann.AnnotationType = z.AnnotationType_I64
  171. } else if value, ok := val.(bool); ok {
  172. bann.Value = []byte{boolToByte(value)}
  173. bann.AnnotationType = z.AnnotationType_BOOL
  174. } else {
  175. value := stringify(val)
  176. bann.Value = []byte(truncateString(value, maxTagValueLength))
  177. bann.AnnotationType = z.AnnotationType_STRING
  178. }
  179. return bann
  180. }
  181. func stringify(value interface{}) string {
  182. if s, ok := value.(string); ok {
  183. return s
  184. }
  185. return fmt.Sprintf("%+v", value)
  186. }
  187. func truncateString(value string, maxLength int) string {
  188. // we ignore the problem of utf8 runes possibly being sliced in the middle,
  189. // as it is rather expensive to iterate through each tag just to find rune
  190. // boundaries.
  191. if len(value) > maxLength {
  192. return value[:maxLength]
  193. }
  194. return value
  195. }
  196. func boolToByte(b bool) byte {
  197. if b {
  198. return 1
  199. }
  200. return 0
  201. }
  202. // int32ToBytes converts int32 to bytes.
  203. func int32ToBytes(i int32) []byte {
  204. buf := make([]byte, 4)
  205. binary.BigEndian.PutUint32(buf, uint32(i))
  206. return buf
  207. }
  208. // int64ToBytes converts int64 to bytes.
  209. func int64ToBytes(i int64) []byte {
  210. buf := make([]byte, 8)
  211. binary.BigEndian.PutUint64(buf, uint64(i))
  212. return buf
  213. }
  214. type zipkinSpan struct {
  215. *Span
  216. // peer points to the peer service participating in this span,
  217. // e.g. the Client if this span is a server span,
  218. // or Server if this span is a client span
  219. peer struct {
  220. Ipv4 int32
  221. Port int16
  222. ServiceName string
  223. }
  224. // used to distinguish local vs. RPC Server vs. RPC Client spans
  225. spanKind string
  226. }
  227. func (s *zipkinSpan) handleSpecialTags() {
  228. s.Lock()
  229. defer s.Unlock()
  230. if s.firstInProcess {
  231. // append the process tags
  232. s.tags = append(s.tags, s.tracer.tags...)
  233. }
  234. filteredTags := make([]Tag, 0, len(s.tags))
  235. for _, tag := range s.tags {
  236. if handler, ok := specialTagHandlers[tag.key]; ok {
  237. handler(s, tag.value)
  238. } else {
  239. filteredTags = append(filteredTags, tag)
  240. }
  241. }
  242. s.tags = filteredTags
  243. }
  244. func setSpanKind(s *zipkinSpan, value interface{}) {
  245. if val, ok := value.(string); ok {
  246. s.spanKind = val
  247. return
  248. }
  249. if val, ok := value.(ext.SpanKindEnum); ok {
  250. s.spanKind = string(val)
  251. }
  252. }
  253. func setPeerIPv4(s *zipkinSpan, value interface{}) {
  254. if val, ok := value.(string); ok {
  255. if ip, err := utils.ParseIPToUint32(val); err == nil {
  256. s.peer.Ipv4 = int32(ip)
  257. return
  258. }
  259. }
  260. if val, ok := value.(uint32); ok {
  261. s.peer.Ipv4 = int32(val)
  262. return
  263. }
  264. if val, ok := value.(int32); ok {
  265. s.peer.Ipv4 = val
  266. }
  267. }
  268. func setPeerPort(s *zipkinSpan, value interface{}) {
  269. if val, ok := value.(string); ok {
  270. if port, err := utils.ParsePort(val); err == nil {
  271. s.peer.Port = int16(port)
  272. return
  273. }
  274. }
  275. if val, ok := value.(uint16); ok {
  276. s.peer.Port = int16(val)
  277. return
  278. }
  279. if val, ok := value.(int); ok {
  280. s.peer.Port = int16(val)
  281. }
  282. }
  283. func setPeerService(s *zipkinSpan, value interface{}) {
  284. if val, ok := value.(string); ok {
  285. s.peer.ServiceName = val
  286. }
  287. }
  288. func removeTag(s *zipkinSpan, value interface{}) {}
  289. func (s *zipkinSpan) peerDefined() bool {
  290. return s.peer.ServiceName != "" || s.peer.Ipv4 != 0 || s.peer.Port != 0
  291. }
  292. func (s *zipkinSpan) isRPC() bool {
  293. s.RLock()
  294. defer s.RUnlock()
  295. return s.spanKind == string(ext.SpanKindRPCClientEnum) || s.spanKind == string(ext.SpanKindRPCServerEnum)
  296. }
  297. func (s *zipkinSpan) isRPCClient() bool {
  298. s.RLock()
  299. defer s.RUnlock()
  300. return s.spanKind == string(ext.SpanKindRPCClientEnum)
  301. }