From 29985e7f6eecf6abd81c5d5c9fa7808c3c70411a Mon Sep 17 00:00:00 2001 From: Gabriel Aszalos Date: Mon, 22 Oct 2018 11:44:40 +0200 Subject: [PATCH] all: protect statsd.Client for sync access (mostly tests) --- api/api.go | 2 +- api/responses.go | 8 ++++---- cmd/trace-agent/agent.go | 2 +- cmd/trace-agent/concentrator.go | 4 ++-- cmd/trace-agent/main.go | 7 +++++-- info/stats.go | 30 +++++++++++++++--------------- statsd/statsd.go | 31 +++++++++++++++++-------------- watchdog/logonpanic.go | 2 +- writer/service_writer.go | 12 ++++++------ writer/service_writer_test.go | 6 +++--- writer/stats_writer.go | 14 +++++++------- writer/stats_writer_test.go | 6 +++--- writer/trace_writer.go | 18 +++++++++--------- writer/trace_writer_test.go | 6 +++--- 14 files changed, 77 insertions(+), 71 deletions(-) diff --git a/api/api.go b/api/api.go index 39df717d8..dac9fc63c 100644 --- a/api/api.go +++ b/api/api.go @@ -305,7 +305,7 @@ func (r *HTTPReceiver) logStats() { accStats := info.NewReceiverStats() for now := range time.Tick(10 * time.Second) { - statsd.Client.Gauge("datadog.trace_agent.heartbeat", 1, nil, 1) + statsd.Client().Gauge("datadog.trace_agent.heartbeat", 1, nil, 1) // We update accStats with the new stats we collected accStats.Acc(r.Stats) diff --git a/api/responses.go b/api/responses.go index e78760d45..641ea41ea 100644 --- a/api/responses.go +++ b/api/responses.go @@ -25,7 +25,7 @@ type traceResponse struct { // HTTPFormatError is used for payload format errors func HTTPFormatError(tags []string, w http.ResponseWriter) { tags = append(tags, "error:format-error") - statsd.Client.Count(receiverErrorKey, 1, tags, 1) + statsd.Client().Count(receiverErrorKey, 1, tags, 1) http.Error(w, "format-error", http.StatusUnsupportedMediaType) } @@ -42,7 +42,7 @@ func HTTPDecodingError(err error, tags []string, w http.ResponseWriter) { } tags = append(tags, fmt.Sprintf("error:%s", errtag)) - statsd.Client.Count(receiverErrorKey, 1, tags, 1) + statsd.Client().Count(receiverErrorKey, 1, tags, 1) http.Error(w, msg, status) } @@ -50,7 +50,7 @@ func HTTPDecodingError(err error, tags []string, w http.ResponseWriter) { // HTTPEndpointNotSupported is for payloads getting sent to a wrong endpoint func HTTPEndpointNotSupported(tags []string, w http.ResponseWriter) { tags = append(tags, "error:unsupported-endpoint") - statsd.Client.Count(receiverErrorKey, 1, tags, 1) + statsd.Client().Count(receiverErrorKey, 1, tags, 1) http.Error(w, "unsupported-endpoint", http.StatusInternalServerError) } @@ -69,7 +69,7 @@ func HTTPRateByService(w http.ResponseWriter, dynConf *config.DynamicConfig) { encoder := json.NewEncoder(w) if err := encoder.Encode(response); err != nil { tags := []string{"error:response-error"} - statsd.Client.Count(receiverErrorKey, 1, tags, 1) + statsd.Client().Count(receiverErrorKey, 1, tags, 1) return } } diff --git a/cmd/trace-agent/agent.go b/cmd/trace-agent/agent.go index 0f9f612f4..2b593714e 100644 --- a/cmd/trace-agent/agent.go +++ b/cmd/trace-agent/agent.go @@ -337,7 +337,7 @@ func (a *Agent) watchdog() { a.Receiver.PreSampler.SetError(err) preSamplerStats := a.Receiver.PreSampler.Stats() - statsd.Client.Gauge("datadog.trace_agent.presampler_rate", preSamplerStats.Rate, nil, 1) + statsd.Client().Gauge("datadog.trace_agent.presampler_rate", preSamplerStats.Rate, nil, 1) info.UpdatePreSampler(*preSamplerStats) } diff --git a/cmd/trace-agent/concentrator.go b/cmd/trace-agent/concentrator.go index 409b99487..e7c627ca8 100644 --- a/cmd/trace-agent/concentrator.go +++ b/cmd/trace-agent/concentrator.go @@ -156,10 +156,10 @@ func (c *Concentrator) flushNow(now int64) []model.StatsBucket { log.Debugf("flushing bucket %d", ts) for _, d := range bucket.Distributions { - statsd.Client.Histogram("datadog.trace_agent.distribution.len", float64(d.Summary.N), nil, 1) + statsd.Client().Histogram("datadog.trace_agent.distribution.len", float64(d.Summary.N), nil, 1) } for _, d := range bucket.ErrDistributions { - statsd.Client.Histogram("datadog.trace_agent.err_distribution.len", float64(d.Summary.N), nil, 1) + statsd.Client().Histogram("datadog.trace_agent.err_distribution.len", float64(d.Summary.N), nil, 1) } sb = append(sb, bucket) delete(c.buckets, ts) diff --git a/cmd/trace-agent/main.go b/cmd/trace-agent/main.go index 4317b6db4..53cbd60f4 100644 --- a/cmd/trace-agent/main.go +++ b/cmd/trace-agent/main.go @@ -14,6 +14,7 @@ import ( _ "net/http/pprof" + dogstatsd "github.com/DataDog/datadog-go/statsd" log "github.com/cihub/seelog" "github.com/DataDog/datadog-agent/pkg/pidfile" @@ -138,13 +139,15 @@ func runAgent(ctx context.Context) { } // Initialize dogstatsd client - err = statsd.Configure(cfg, []string{"version:" + info.Version}) + client, err := dogstatsd.New(fmt.Sprintf("%s:%d", cfg.StatsdHost, cfg.StatsdPort)) if err != nil { osutil.Exitf("cannot configure dogstatsd: %v", err) } + client.Tags = []string{"version:" + info.Version} + statsd.SetClient(client) // count the number of times the agent started - statsd.Client.Count("datadog.trace_agent.started", 1, nil, 1) + statsd.Client().Count("datadog.trace_agent.started", 1, nil, 1) // Seed rand rand.Seed(time.Now().UTC().UnixNano()) diff --git a/info/stats.go b/info/stats.go index c95116628..e8abb7dff 100644 --- a/info/stats.go +++ b/info/stats.go @@ -134,21 +134,21 @@ func (ts *TagStats) publish() { // Publish the stats tags := ts.Tags.toArray() - statsd.Client.Count("datadog.trace_agent.receiver.trace", tracesReceived, tags, 1) - statsd.Client.Count("datadog.trace_agent.receiver.traces_received", tracesReceived, tags, 1) - statsd.Client.Count("datadog.trace_agent.receiver.traces_dropped", tracesDropped, tags, 1) - statsd.Client.Count("datadog.trace_agent.receiver.traces_filtered", tracesFiltered, tags, 1) - statsd.Client.Count("datadog.trace_agent.receiver.traces_priority", tracesPriorityNone, append(tags, "priority:none"), 1) - statsd.Client.Count("datadog.trace_agent.receiver.traces_priority", tracesPriorityNeg, append(tags, "priority:neg"), 1) - statsd.Client.Count("datadog.trace_agent.receiver.traces_priority", tracesPriority0, append(tags, "priority:0"), 1) - statsd.Client.Count("datadog.trace_agent.receiver.traces_priority", tracesPriority1, append(tags, "priority:1"), 1) - statsd.Client.Count("datadog.trace_agent.receiver.traces_priority", tracesPriority2, append(tags, "priority:2"), 1) - statsd.Client.Count("datadog.trace_agent.receiver.traces_bytes", tracesBytes, tags, 1) - statsd.Client.Count("datadog.trace_agent.receiver.spans_received", spansReceived, tags, 1) - statsd.Client.Count("datadog.trace_agent.receiver.spans_dropped", spansDropped, tags, 1) - statsd.Client.Count("datadog.trace_agent.receiver.spans_filtered", spansFiltered, tags, 1) - statsd.Client.Count("datadog.trace_agent.receiver.services_received", servicesReceived, tags, 1) - statsd.Client.Count("datadog.trace_agent.receiver.services_bytes", servicesBytes, tags, 1) + statsd.Client().Count("datadog.trace_agent.receiver.trace", tracesReceived, tags, 1) + statsd.Client().Count("datadog.trace_agent.receiver.traces_received", tracesReceived, tags, 1) + statsd.Client().Count("datadog.trace_agent.receiver.traces_dropped", tracesDropped, tags, 1) + statsd.Client().Count("datadog.trace_agent.receiver.traces_filtered", tracesFiltered, tags, 1) + statsd.Client().Count("datadog.trace_agent.receiver.traces_priority", tracesPriorityNone, append(tags, "priority:none"), 1) + statsd.Client().Count("datadog.trace_agent.receiver.traces_priority", tracesPriorityNeg, append(tags, "priority:neg"), 1) + statsd.Client().Count("datadog.trace_agent.receiver.traces_priority", tracesPriority0, append(tags, "priority:0"), 1) + statsd.Client().Count("datadog.trace_agent.receiver.traces_priority", tracesPriority1, append(tags, "priority:1"), 1) + statsd.Client().Count("datadog.trace_agent.receiver.traces_priority", tracesPriority2, append(tags, "priority:2"), 1) + statsd.Client().Count("datadog.trace_agent.receiver.traces_bytes", tracesBytes, tags, 1) + statsd.Client().Count("datadog.trace_agent.receiver.spans_received", spansReceived, tags, 1) + statsd.Client().Count("datadog.trace_agent.receiver.spans_dropped", spansDropped, tags, 1) + statsd.Client().Count("datadog.trace_agent.receiver.spans_filtered", spansFiltered, tags, 1) + statsd.Client().Count("datadog.trace_agent.receiver.services_received", servicesReceived, tags, 1) + statsd.Client().Count("datadog.trace_agent.receiver.services_bytes", servicesBytes, tags, 1) } // Stats holds the metrics that will be reported every 10s by the agent. diff --git a/statsd/statsd.go b/statsd/statsd.go index 16d6745d1..c20f0152f 100644 --- a/statsd/statsd.go +++ b/statsd/statsd.go @@ -1,10 +1,9 @@ package statsd import ( - "fmt" + "sync" "github.com/DataDog/datadog-go/statsd" - "github.com/DataDog/datadog-trace-agent/config" ) // StatsClient represents a client capable of sending stats to some stat endpoint. @@ -14,17 +13,21 @@ type StatsClient interface { Histogram(name string, value float64, tags []string, rate float64) error } -// Client is a global Statsd client. When a client is configured via Configure, -// that becomes the new global Statsd client in the package. -var Client StatsClient = (*statsd.Client)(nil) +var ( + mu sync.RWMutex + client StatsClient = (*statsd.Client)(nil) +) + +// Client returns the global StatsClient. +func Client() StatsClient { + mu.RLock() + defer mu.RUnlock() + return client +} -// Configure creates a statsd client for the given agent's configuration, using the specified global tags. -func Configure(conf *config.AgentConfig, tags []string) error { - client, err := statsd.New(fmt.Sprintf("%s:%d", conf.StatsdHost, conf.StatsdPort)) - if err != nil { - return err - } - client.Tags = tags - Client = client - return nil +// SetClient sets the global StatsClient. +func SetClient(c StatsClient) { + mu.Lock() + defer mu.Unlock() + client = c } diff --git a/watchdog/logonpanic.go b/watchdog/logonpanic.go index 2d93a9c3d..902b1e2bb 100644 --- a/watchdog/logonpanic.go +++ b/watchdog/logonpanic.go @@ -31,7 +31,7 @@ func LogOnPanic() { errMsg := fmt.Sprintf("%v", err) logMsg := "Unexpected panic: " + errMsg + "\n" + stacktrace - statsd.Client.Gauge("datadog.trace_agent.panic", 1, []string{ + statsd.Client().Gauge("datadog.trace_agent.panic", 1, []string{ "err:" + shortErrMsg(errMsg), }, 1) diff --git a/writer/service_writer.go b/writer/service_writer.go index 6c3bfee3c..96e770e38 100644 --- a/writer/service_writer.go +++ b/writer/service_writer.go @@ -75,7 +75,7 @@ func (w *ServiceWriter) Run() { log.Infof("flushed service payload; url:%s, time:%s, size:%d bytes", url, event.SendStats.SendTime, len(event.Payload.Bytes)) tags := []string{"url:" + url} - statsd.Client.Gauge("datadog.trace_agent.service_writer.flush_duration", + statsd.Client().Gauge("datadog.trace_agent.service_writer.flush_duration", event.SendStats.SendTime.Seconds(), tags, 1) atomic.AddInt64(&w.stats.Payloads, 1) case SenderFailureEvent: @@ -162,11 +162,11 @@ func (w *ServiceWriter) updateInfo() { swInfo.Retries = atomic.SwapInt64(&w.stats.Retries, 0) // TODO(gbbr): Scope these stats per endpoint (see (config.AgentConfig).AdditionalEndpoints)) - statsd.Client.Count("datadog.trace_agent.service_writer.payloads", int64(swInfo.Payloads), nil, 1) - statsd.Client.Count("datadog.trace_agent.service_writer.services", int64(swInfo.Services), nil, 1) - statsd.Client.Count("datadog.trace_agent.service_writer.bytes", int64(swInfo.Bytes), nil, 1) - statsd.Client.Count("datadog.trace_agent.service_writer.retries", int64(swInfo.Retries), nil, 1) - statsd.Client.Count("datadog.trace_agent.service_writer.errors", int64(swInfo.Errors), nil, 1) + statsd.Client().Count("datadog.trace_agent.service_writer.payloads", int64(swInfo.Payloads), nil, 1) + statsd.Client().Count("datadog.trace_agent.service_writer.services", int64(swInfo.Services), nil, 1) + statsd.Client().Count("datadog.trace_agent.service_writer.bytes", int64(swInfo.Bytes), nil, 1) + statsd.Client().Count("datadog.trace_agent.service_writer.retries", int64(swInfo.Retries), nil, 1) + statsd.Client().Count("datadog.trace_agent.service_writer.errors", int64(swInfo.Errors), nil, 1) info.UpdateServiceWriterInfo(swInfo) } diff --git a/writer/service_writer_test.go b/writer/service_writer_test.go index 54bd39bee..5679adda0 100644 --- a/writer/service_writer_test.go +++ b/writer/service_writer_test.go @@ -206,10 +206,10 @@ func testServiceWriter() (*ServiceWriter, chan model.ServicesMetadata, *testEndp testEndpoint := &testEndpoint{} serviceWriter.BaseWriter.payloadSender.setEndpoint(testEndpoint) testStatsClient := &testutil.TestStatsClient{} - originalClient := statsd.Client - statsd.Client = testStatsClient + originalClient := statsd.Client() + statsd.SetClient(testStatsClient) return serviceWriter, serviceChannel, testEndpoint, testStatsClient, func() { - statsd.Client = originalClient + statsd.SetClient(originalClient) } } diff --git a/writer/stats_writer.go b/writer/stats_writer.go index c6bef46d7..bb1d6a32c 100644 --- a/writer/stats_writer.go +++ b/writer/stats_writer.go @@ -265,7 +265,7 @@ func (w *StatsWriter) monitor() { log.Infof("flushed stat payload; url: %s, time:%s, size:%d bytes", url, e.SendStats.SendTime, len(e.Payload.Bytes)) tags := []string{"url:" + url} - statsd.Client.Gauge("datadog.trace_agent.stats_writer.flush_duration", + statsd.Client().Gauge("datadog.trace_agent.stats_writer.flush_duration", e.SendStats.SendTime.Seconds(), tags, 1) atomic.AddInt64(&w.info.Payloads, 1) case SenderFailureEvent: @@ -293,12 +293,12 @@ func (w *StatsWriter) monitor() { swInfo.Errors = atomic.SwapInt64(&w.info.Errors, 0) // TODO(gbbr): Scope these stats per endpoint (see (config.AgentConfig).AdditionalEndpoints)) - statsd.Client.Count("datadog.trace_agent.stats_writer.payloads", int64(swInfo.Payloads), nil, 1) - statsd.Client.Count("datadog.trace_agent.stats_writer.stats_buckets", int64(swInfo.StatsBuckets), nil, 1) - statsd.Client.Count("datadog.trace_agent.stats_writer.bytes", int64(swInfo.Bytes), nil, 1) - statsd.Client.Count("datadog.trace_agent.stats_writer.retries", int64(swInfo.Retries), nil, 1) - statsd.Client.Count("datadog.trace_agent.stats_writer.splits", int64(swInfo.Splits), nil, 1) - statsd.Client.Count("datadog.trace_agent.stats_writer.errors", int64(swInfo.Errors), nil, 1) + statsd.Client().Count("datadog.trace_agent.stats_writer.payloads", int64(swInfo.Payloads), nil, 1) + statsd.Client().Count("datadog.trace_agent.stats_writer.stats_buckets", int64(swInfo.StatsBuckets), nil, 1) + statsd.Client().Count("datadog.trace_agent.stats_writer.bytes", int64(swInfo.Bytes), nil, 1) + statsd.Client().Count("datadog.trace_agent.stats_writer.retries", int64(swInfo.Retries), nil, 1) + statsd.Client().Count("datadog.trace_agent.stats_writer.splits", int64(swInfo.Splits), nil, 1) + statsd.Client().Count("datadog.trace_agent.stats_writer.errors", int64(swInfo.Errors), nil, 1) info.UpdateStatsWriterInfo(swInfo) } diff --git a/writer/stats_writer_test.go b/writer/stats_writer_test.go index 8bae38fdf..4426dac83 100644 --- a/writer/stats_writer_test.go +++ b/writer/stats_writer_test.go @@ -401,10 +401,10 @@ func testStatsWriter() (*StatsWriter, chan []model.StatsBucket, *testEndpoint, * testEndpoint := &testEndpoint{} statsWriter.BaseWriter.payloadSender.setEndpoint(testEndpoint) testStatsClient := &testutil.TestStatsClient{} - originalClient := statsd.Client - statsd.Client = testStatsClient + originalClient := statsd.Client() + statsd.SetClient(testStatsClient) return statsWriter, statsChannel, testEndpoint, testStatsClient, func() { - statsd.Client = originalClient + statsd.SetClient(originalClient) } } diff --git a/writer/trace_writer.go b/writer/trace_writer.go index 0a3fa2a7a..6e3391a77 100644 --- a/writer/trace_writer.go +++ b/writer/trace_writer.go @@ -95,7 +95,7 @@ func (w *TraceWriter) Run() { log.Infof("flushed trace payload to the API, time:%s, size:%d bytes", event.SendStats.SendTime, len(event.Payload.Bytes)) tags := []string{"url:" + event.SendStats.Host} - statsd.Client.Gauge("datadog.trace_agent.trace_writer.flush_duration", + statsd.Client().Gauge("datadog.trace_agent.trace_writer.flush_duration", event.SendStats.SendTime.Seconds(), tags, 1) atomic.AddInt64(&w.stats.Payloads, 1) case SenderFailureEvent: @@ -281,14 +281,14 @@ func (w *TraceWriter) updateInfo() { twInfo.Errors = atomic.SwapInt64(&w.stats.Errors, 0) twInfo.SingleMaxSpans = atomic.SwapInt64(&w.stats.SingleMaxSpans, 0) - statsd.Client.Count("datadog.trace_agent.trace_writer.payloads", int64(twInfo.Payloads), nil, 1) - statsd.Client.Count("datadog.trace_agent.trace_writer.traces", int64(twInfo.Traces), nil, 1) - statsd.Client.Count("datadog.trace_agent.trace_writer.transactions", int64(twInfo.Transactions), nil, 1) - statsd.Client.Count("datadog.trace_agent.trace_writer.spans", int64(twInfo.Spans), nil, 1) - statsd.Client.Count("datadog.trace_agent.trace_writer.bytes", int64(twInfo.Bytes), nil, 1) - statsd.Client.Count("datadog.trace_agent.trace_writer.retries", int64(twInfo.Retries), nil, 1) - statsd.Client.Count("datadog.trace_agent.trace_writer.errors", int64(twInfo.Errors), nil, 1) - statsd.Client.Count("datadog.trace_agent.trace_writer.single_max_spans", int64(twInfo.SingleMaxSpans), nil, 1) + statsd.Client().Count("datadog.trace_agent.trace_writer.payloads", int64(twInfo.Payloads), nil, 1) + statsd.Client().Count("datadog.trace_agent.trace_writer.traces", int64(twInfo.Traces), nil, 1) + statsd.Client().Count("datadog.trace_agent.trace_writer.transactions", int64(twInfo.Transactions), nil, 1) + statsd.Client().Count("datadog.trace_agent.trace_writer.spans", int64(twInfo.Spans), nil, 1) + statsd.Client().Count("datadog.trace_agent.trace_writer.bytes", int64(twInfo.Bytes), nil, 1) + statsd.Client().Count("datadog.trace_agent.trace_writer.retries", int64(twInfo.Retries), nil, 1) + statsd.Client().Count("datadog.trace_agent.trace_writer.errors", int64(twInfo.Errors), nil, 1) + statsd.Client().Count("datadog.trace_agent.trace_writer.single_max_spans", int64(twInfo.SingleMaxSpans), nil, 1) info.UpdateTraceWriterInfo(twInfo) } diff --git a/writer/trace_writer_test.go b/writer/trace_writer_test.go index d04b5b58c..2d4e09ec2 100644 --- a/writer/trace_writer_test.go +++ b/writer/trace_writer_test.go @@ -350,11 +350,11 @@ func testTraceWriter() (*TraceWriter, chan *SampledTrace, *testEndpoint, *testut testEndpoint := &testEndpoint{} traceWriter.BaseWriter.payloadSender.setEndpoint(testEndpoint) testStatsClient := &testutil.TestStatsClient{} - originalClient := statsd.Client - statsd.Client = testStatsClient + originalClient := statsd.Client() + statsd.SetClient(testStatsClient) return traceWriter, payloadChannel, testEndpoint, testStatsClient, func() { - statsd.Client = originalClient + statsd.SetClient(originalClient) } }