sampler_remote.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. // Copyright (c) 2017 Uber Technologies, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package jaeger
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "io/ioutil"
  19. "net/http"
  20. "net/url"
  21. "sync"
  22. "sync/atomic"
  23. "time"
  24. "github.com/uber/jaeger-client-go/log"
  25. "github.com/uber/jaeger-client-go/thrift-gen/sampling"
  26. )
  27. const (
  28. defaultRemoteSamplingTimeout = 10 * time.Second
  29. defaultSamplingRefreshInterval = time.Minute
  30. )
  31. // SamplingStrategyFetcher is used to fetch sampling strategy updates from remote server.
  32. type SamplingStrategyFetcher interface {
  33. Fetch(service string) ([]byte, error)
  34. }
  35. // SamplingStrategyParser is used to parse sampling strategy updates. The output object
  36. // should be of the type that is recognized by the SamplerUpdaters.
  37. type SamplingStrategyParser interface {
  38. Parse(response []byte) (interface{}, error)
  39. }
  40. // SamplerUpdater is used by RemotelyControlledSampler to apply sampling strategies,
  41. // retrieved from remote config server, to the current sampler. The updater can modify
  42. // the sampler in-place if sampler supports it, or create a new one.
  43. //
  44. // If the strategy does not contain configuration for the sampler in question,
  45. // updater must return modifiedSampler=nil to give other updaters a chance to inspect
  46. // the sampling strategy response.
  47. //
  48. // RemotelyControlledSampler invokes the updaters while holding a lock on the main sampler.
  49. type SamplerUpdater interface {
  50. Update(sampler SamplerV2, strategy interface{}) (modified SamplerV2, err error)
  51. }
// RemotelyControlledSampler is a delegating sampler that polls a remote server
// for the appropriate sampling strategy, constructs a corresponding sampler and
// delegates to it for sampling decisions.
//
// The sampling callbacks read the delegate under the embedded RWMutex's read
// lock; strategy updates and setSampler take the write lock.
type RemotelyControlledSampler struct {
	// These fields must be first in the struct because `sync/atomic` expects 64-bit alignment.
	// Cf. https://github.com/uber/jaeger-client-go/issues/155, https://goo.gl/zW7dgq
	closed int64 // 0 - not closed, 1 - closed; flipped by Close via atomic CAS

	sync.RWMutex // used to serialize access to samplerOptions.sampler

	// samplerOptions is embedded so its fields (sampler, logger, metrics, updaters,
	// samplingFetcher, samplingParser, samplingRefreshInterval) are accessed
	// directly on the receiver.
	samplerOptions

	// serviceName is passed to the strategy fetcher on every update.
	serviceName string

	// doneChan carries Close's stop request to the polling goroutine, which calls
	// Done on the received WaitGroup before exiting.
	doneChan chan *sync.WaitGroup
}
  64. // NewRemotelyControlledSampler creates a sampler that periodically pulls
  65. // the sampling strategy from an HTTP sampling server (e.g. jaeger-agent).
  66. func NewRemotelyControlledSampler(
  67. serviceName string,
  68. opts ...SamplerOption,
  69. ) *RemotelyControlledSampler {
  70. options := new(samplerOptions).applyOptionsAndDefaults(opts...)
  71. sampler := &RemotelyControlledSampler{
  72. samplerOptions: *options,
  73. serviceName: serviceName,
  74. doneChan: make(chan *sync.WaitGroup),
  75. }
  76. go sampler.pollController()
  77. return sampler
  78. }
  79. // IsSampled implements IsSampled() of Sampler.
  80. // TODO (breaking change) remove when Sampler V1 is removed
  81. func (s *RemotelyControlledSampler) IsSampled(id TraceID, operation string) (bool, []Tag) {
  82. return false, nil
  83. }
  84. // OnCreateSpan implements OnCreateSpan of SamplerV2.
  85. func (s *RemotelyControlledSampler) OnCreateSpan(span *Span) SamplingDecision {
  86. s.RLock()
  87. defer s.RUnlock()
  88. return s.sampler.OnCreateSpan(span)
  89. }
  90. // OnSetOperationName implements OnSetOperationName of SamplerV2.
  91. func (s *RemotelyControlledSampler) OnSetOperationName(span *Span, operationName string) SamplingDecision {
  92. s.RLock()
  93. defer s.RUnlock()
  94. return s.sampler.OnSetOperationName(span, operationName)
  95. }
  96. // OnSetTag implements OnSetTag of SamplerV2.
  97. func (s *RemotelyControlledSampler) OnSetTag(span *Span, key string, value interface{}) SamplingDecision {
  98. s.RLock()
  99. defer s.RUnlock()
  100. return s.sampler.OnSetTag(span, key, value)
  101. }
  102. // OnFinishSpan implements OnFinishSpan of SamplerV2.
  103. func (s *RemotelyControlledSampler) OnFinishSpan(span *Span) SamplingDecision {
  104. s.RLock()
  105. defer s.RUnlock()
  106. return s.sampler.OnFinishSpan(span)
  107. }
  108. // Close implements Close() of Sampler.
  109. func (s *RemotelyControlledSampler) Close() {
  110. if swapped := atomic.CompareAndSwapInt64(&s.closed, 0, 1); !swapped {
  111. s.logger.Error("Repeated attempt to close the sampler is ignored")
  112. return
  113. }
  114. var wg sync.WaitGroup
  115. wg.Add(1)
  116. s.doneChan <- &wg
  117. wg.Wait()
  118. }
  119. // Equal implements Equal() of Sampler.
  120. func (s *RemotelyControlledSampler) Equal(other Sampler) bool {
  121. // NB The Equal() function is expensive and will be removed. See PerOperationSampler.Equal() for
  122. // more information.
  123. return false
  124. }
  125. func (s *RemotelyControlledSampler) pollController() {
  126. ticker := time.NewTicker(s.samplingRefreshInterval)
  127. defer ticker.Stop()
  128. s.pollControllerWithTicker(ticker)
  129. }
  130. func (s *RemotelyControlledSampler) pollControllerWithTicker(ticker *time.Ticker) {
  131. for {
  132. select {
  133. case <-ticker.C:
  134. s.UpdateSampler()
  135. case wg := <-s.doneChan:
  136. wg.Done()
  137. return
  138. }
  139. }
  140. }
  141. // Sampler returns the currently active sampler.
  142. func (s *RemotelyControlledSampler) Sampler() SamplerV2 {
  143. s.RLock()
  144. defer s.RUnlock()
  145. return s.sampler
  146. }
  147. func (s *RemotelyControlledSampler) setSampler(sampler SamplerV2) {
  148. s.Lock()
  149. defer s.Unlock()
  150. s.sampler = sampler
  151. }
  152. // UpdateSampler forces the sampler to fetch sampling strategy from backend server.
  153. // This function is called automatically on a timer, but can also be safely called manually, e.g. from tests.
  154. func (s *RemotelyControlledSampler) UpdateSampler() {
  155. res, err := s.samplingFetcher.Fetch(s.serviceName)
  156. if err != nil {
  157. s.metrics.SamplerQueryFailure.Inc(1)
  158. s.logger.Infof("failed to fetch sampling strategy: %v", err)
  159. return
  160. }
  161. strategy, err := s.samplingParser.Parse(res)
  162. if err != nil {
  163. s.metrics.SamplerUpdateFailure.Inc(1)
  164. s.logger.Infof("failed to parse sampling strategy response: %v", err)
  165. return
  166. }
  167. s.Lock()
  168. defer s.Unlock()
  169. s.metrics.SamplerRetrieved.Inc(1)
  170. if err := s.updateSamplerViaUpdaters(strategy); err != nil {
  171. s.metrics.SamplerUpdateFailure.Inc(1)
  172. s.logger.Infof("failed to handle sampling strategy response %+v. Got error: %v", res, err)
  173. return
  174. }
  175. s.metrics.SamplerUpdated.Inc(1)
  176. }
  177. // NB: this function should only be called while holding a Write lock
  178. func (s *RemotelyControlledSampler) updateSamplerViaUpdaters(strategy interface{}) error {
  179. for _, updater := range s.updaters {
  180. sampler, err := updater.Update(s.sampler, strategy)
  181. if err != nil {
  182. return err
  183. }
  184. if sampler != nil {
  185. s.logger.Debugf("sampler updated: %+v", sampler)
  186. s.sampler = sampler
  187. return nil
  188. }
  189. }
  190. return fmt.Errorf("unsupported sampling strategy %+v", strategy)
  191. }
  192. // -----------------------
  193. // ProbabilisticSamplerUpdater is used by RemotelyControlledSampler to parse sampling configuration.
  194. type ProbabilisticSamplerUpdater struct{}
  195. // Update implements Update of SamplerUpdater.
  196. func (u *ProbabilisticSamplerUpdater) Update(sampler SamplerV2, strategy interface{}) (SamplerV2, error) {
  197. type response interface {
  198. GetProbabilisticSampling() *sampling.ProbabilisticSamplingStrategy
  199. }
  200. var _ response = new(sampling.SamplingStrategyResponse) // sanity signature check
  201. if resp, ok := strategy.(response); ok {
  202. if probabilistic := resp.GetProbabilisticSampling(); probabilistic != nil {
  203. if ps, ok := sampler.(*ProbabilisticSampler); ok {
  204. if err := ps.Update(probabilistic.SamplingRate); err != nil {
  205. return nil, err
  206. }
  207. return sampler, nil
  208. }
  209. return newProbabilisticSampler(probabilistic.SamplingRate), nil
  210. }
  211. }
  212. return nil, nil
  213. }
  214. // -----------------------
  215. // RateLimitingSamplerUpdater is used by RemotelyControlledSampler to parse sampling configuration.
  216. type RateLimitingSamplerUpdater struct{}
  217. // Update implements Update of SamplerUpdater.
  218. func (u *RateLimitingSamplerUpdater) Update(sampler SamplerV2, strategy interface{}) (SamplerV2, error) {
  219. type response interface {
  220. GetRateLimitingSampling() *sampling.RateLimitingSamplingStrategy
  221. }
  222. var _ response = new(sampling.SamplingStrategyResponse) // sanity signature check
  223. if resp, ok := strategy.(response); ok {
  224. if rateLimiting := resp.GetRateLimitingSampling(); rateLimiting != nil {
  225. rateLimit := float64(rateLimiting.MaxTracesPerSecond)
  226. if rl, ok := sampler.(*RateLimitingSampler); ok {
  227. rl.Update(rateLimit)
  228. return rl, nil
  229. }
  230. return NewRateLimitingSampler(rateLimit), nil
  231. }
  232. }
  233. return nil, nil
  234. }
  235. // -----------------------
  236. // AdaptiveSamplerUpdater is used by RemotelyControlledSampler to parse sampling configuration.
  237. // Fields have the same meaning as in PerOperationSamplerParams.
  238. type AdaptiveSamplerUpdater struct {
  239. MaxOperations int
  240. OperationNameLateBinding bool
  241. }
  242. // Update implements Update of SamplerUpdater.
  243. func (u *AdaptiveSamplerUpdater) Update(sampler SamplerV2, strategy interface{}) (SamplerV2, error) {
  244. type response interface {
  245. GetOperationSampling() *sampling.PerOperationSamplingStrategies
  246. }
  247. var _ response = new(sampling.SamplingStrategyResponse) // sanity signature check
  248. if p, ok := strategy.(response); ok {
  249. if operations := p.GetOperationSampling(); operations != nil {
  250. if as, ok := sampler.(*PerOperationSampler); ok {
  251. as.update(operations)
  252. return as, nil
  253. }
  254. return NewPerOperationSampler(PerOperationSamplerParams{
  255. MaxOperations: u.MaxOperations,
  256. OperationNameLateBinding: u.OperationNameLateBinding,
  257. Strategies: operations,
  258. }), nil
  259. }
  260. }
  261. return nil, nil
  262. }
  263. // -----------------------
  264. type httpSamplingStrategyFetcher struct {
  265. serverURL string
  266. logger log.DebugLogger
  267. httpClient http.Client
  268. }
  269. func newHTTPSamplingStrategyFetcher(serverURL string, logger log.DebugLogger) *httpSamplingStrategyFetcher {
  270. customTransport := http.DefaultTransport.(*http.Transport).Clone()
  271. customTransport.ResponseHeaderTimeout = defaultRemoteSamplingTimeout
  272. return &httpSamplingStrategyFetcher{
  273. serverURL: serverURL,
  274. logger: logger,
  275. httpClient: http.Client{
  276. Transport: customTransport,
  277. },
  278. }
  279. }
  280. func (f *httpSamplingStrategyFetcher) Fetch(serviceName string) ([]byte, error) {
  281. v := url.Values{}
  282. v.Set("service", serviceName)
  283. uri := f.serverURL + "?" + v.Encode()
  284. resp, err := f.httpClient.Get(uri)
  285. if err != nil {
  286. return nil, err
  287. }
  288. defer func() {
  289. if err := resp.Body.Close(); err != nil {
  290. f.logger.Error(fmt.Sprintf("failed to close HTTP response body: %+v", err))
  291. }
  292. }()
  293. body, err := ioutil.ReadAll(resp.Body)
  294. if err != nil {
  295. return nil, err
  296. }
  297. if resp.StatusCode >= 400 {
  298. return nil, fmt.Errorf("StatusCode: %d, Body: %s", resp.StatusCode, body)
  299. }
  300. return body, nil
  301. }
  302. // -----------------------
  303. type samplingStrategyParser struct{}
  304. func (p *samplingStrategyParser) Parse(response []byte) (interface{}, error) {
  305. strategy := new(sampling.SamplingStrategyResponse)
  306. if err := json.Unmarshal(response, strategy); err != nil {
  307. return nil, err
  308. }
  309. return strategy, nil
  310. }