Skip to content
This repository has been archived by the owner on Aug 30, 2019. It is now read-only.

Commit

Permalink
all: protect statsd.Client for sync access (mostly tests)
Browse files Browse the repository at this point in the history
  • Loading branch information
gbbr committed Oct 22, 2018
1 parent 8720fdf commit 29985e7
Show file tree
Hide file tree
Showing 14 changed files with 77 additions and 71 deletions.
2 changes: 1 addition & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (r *HTTPReceiver) logStats() {
accStats := info.NewReceiverStats()

for now := range time.Tick(10 * time.Second) {
statsd.Client.Gauge("datadog.trace_agent.heartbeat", 1, nil, 1)
statsd.Client().Gauge("datadog.trace_agent.heartbeat", 1, nil, 1)

// We update accStats with the new stats we collected
accStats.Acc(r.Stats)
Expand Down
8 changes: 4 additions & 4 deletions api/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type traceResponse struct {
// HTTPFormatError is used for payload format errors
func HTTPFormatError(tags []string, w http.ResponseWriter) {
tags = append(tags, "error:format-error")
statsd.Client.Count(receiverErrorKey, 1, tags, 1)
statsd.Client().Count(receiverErrorKey, 1, tags, 1)
http.Error(w, "format-error", http.StatusUnsupportedMediaType)
}

Expand All @@ -42,15 +42,15 @@ func HTTPDecodingError(err error, tags []string, w http.ResponseWriter) {
}

tags = append(tags, fmt.Sprintf("error:%s", errtag))
statsd.Client.Count(receiverErrorKey, 1, tags, 1)
statsd.Client().Count(receiverErrorKey, 1, tags, 1)

http.Error(w, msg, status)
}

// HTTPEndpointNotSupported is for payloads getting sent to a wrong endpoint
func HTTPEndpointNotSupported(tags []string, w http.ResponseWriter) {
tags = append(tags, "error:unsupported-endpoint")
statsd.Client.Count(receiverErrorKey, 1, tags, 1)
statsd.Client().Count(receiverErrorKey, 1, tags, 1)
http.Error(w, "unsupported-endpoint", http.StatusInternalServerError)
}

Expand All @@ -69,7 +69,7 @@ func HTTPRateByService(w http.ResponseWriter, dynConf *config.DynamicConfig) {
encoder := json.NewEncoder(w)
if err := encoder.Encode(response); err != nil {
tags := []string{"error:response-error"}
statsd.Client.Count(receiverErrorKey, 1, tags, 1)
statsd.Client().Count(receiverErrorKey, 1, tags, 1)
return
}
}
2 changes: 1 addition & 1 deletion cmd/trace-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func (a *Agent) watchdog() {
a.Receiver.PreSampler.SetError(err)

preSamplerStats := a.Receiver.PreSampler.Stats()
statsd.Client.Gauge("datadog.trace_agent.presampler_rate", preSamplerStats.Rate, nil, 1)
statsd.Client().Gauge("datadog.trace_agent.presampler_rate", preSamplerStats.Rate, nil, 1)
info.UpdatePreSampler(*preSamplerStats)
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/trace-agent/concentrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,10 @@ func (c *Concentrator) flushNow(now int64) []model.StatsBucket {

log.Debugf("flushing bucket %d", ts)
for _, d := range bucket.Distributions {
statsd.Client.Histogram("datadog.trace_agent.distribution.len", float64(d.Summary.N), nil, 1)
statsd.Client().Histogram("datadog.trace_agent.distribution.len", float64(d.Summary.N), nil, 1)
}
for _, d := range bucket.ErrDistributions {
statsd.Client.Histogram("datadog.trace_agent.err_distribution.len", float64(d.Summary.N), nil, 1)
statsd.Client().Histogram("datadog.trace_agent.err_distribution.len", float64(d.Summary.N), nil, 1)
}
sb = append(sb, bucket)
delete(c.buckets, ts)
Expand Down
7 changes: 5 additions & 2 deletions cmd/trace-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

_ "net/http/pprof"

dogstatsd "github.com/DataDog/datadog-go/statsd"
log "github.com/cihub/seelog"

"github.com/DataDog/datadog-agent/pkg/pidfile"
Expand Down Expand Up @@ -138,13 +139,15 @@ func runAgent(ctx context.Context) {
}

// Initialize dogstatsd client
err = statsd.Configure(cfg, []string{"version:" + info.Version})
client, err := dogstatsd.New(fmt.Sprintf("%s:%d", cfg.StatsdHost, cfg.StatsdPort))
if err != nil {
osutil.Exitf("cannot configure dogstatsd: %v", err)
}
client.Tags = []string{"version:" + info.Version}
statsd.SetClient(client)

// count the number of times the agent started
statsd.Client.Count("datadog.trace_agent.started", 1, nil, 1)
statsd.Client().Count("datadog.trace_agent.started", 1, nil, 1)

// Seed rand
rand.Seed(time.Now().UTC().UnixNano())
Expand Down
30 changes: 15 additions & 15 deletions info/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,21 +134,21 @@ func (ts *TagStats) publish() {
// Publish the stats
tags := ts.Tags.toArray()

statsd.Client.Count("datadog.trace_agent.receiver.trace", tracesReceived, tags, 1)
statsd.Client.Count("datadog.trace_agent.receiver.traces_received", tracesReceived, tags, 1)
statsd.Client.Count("datadog.trace_agent.receiver.traces_dropped", tracesDropped, tags, 1)
statsd.Client.Count("datadog.trace_agent.receiver.traces_filtered", tracesFiltered, tags, 1)
statsd.Client.Count("datadog.trace_agent.receiver.traces_priority", tracesPriorityNone, append(tags, "priority:none"), 1)
statsd.Client.Count("datadog.trace_agent.receiver.traces_priority", tracesPriorityNeg, append(tags, "priority:neg"), 1)
statsd.Client.Count("datadog.trace_agent.receiver.traces_priority", tracesPriority0, append(tags, "priority:0"), 1)
statsd.Client.Count("datadog.trace_agent.receiver.traces_priority", tracesPriority1, append(tags, "priority:1"), 1)
statsd.Client.Count("datadog.trace_agent.receiver.traces_priority", tracesPriority2, append(tags, "priority:2"), 1)
statsd.Client.Count("datadog.trace_agent.receiver.traces_bytes", tracesBytes, tags, 1)
statsd.Client.Count("datadog.trace_agent.receiver.spans_received", spansReceived, tags, 1)
statsd.Client.Count("datadog.trace_agent.receiver.spans_dropped", spansDropped, tags, 1)
statsd.Client.Count("datadog.trace_agent.receiver.spans_filtered", spansFiltered, tags, 1)
statsd.Client.Count("datadog.trace_agent.receiver.services_received", servicesReceived, tags, 1)
statsd.Client.Count("datadog.trace_agent.receiver.services_bytes", servicesBytes, tags, 1)
statsd.Client().Count("datadog.trace_agent.receiver.trace", tracesReceived, tags, 1)
statsd.Client().Count("datadog.trace_agent.receiver.traces_received", tracesReceived, tags, 1)
statsd.Client().Count("datadog.trace_agent.receiver.traces_dropped", tracesDropped, tags, 1)
statsd.Client().Count("datadog.trace_agent.receiver.traces_filtered", tracesFiltered, tags, 1)
statsd.Client().Count("datadog.trace_agent.receiver.traces_priority", tracesPriorityNone, append(tags, "priority:none"), 1)
statsd.Client().Count("datadog.trace_agent.receiver.traces_priority", tracesPriorityNeg, append(tags, "priority:neg"), 1)
statsd.Client().Count("datadog.trace_agent.receiver.traces_priority", tracesPriority0, append(tags, "priority:0"), 1)
statsd.Client().Count("datadog.trace_agent.receiver.traces_priority", tracesPriority1, append(tags, "priority:1"), 1)
statsd.Client().Count("datadog.trace_agent.receiver.traces_priority", tracesPriority2, append(tags, "priority:2"), 1)
statsd.Client().Count("datadog.trace_agent.receiver.traces_bytes", tracesBytes, tags, 1)
statsd.Client().Count("datadog.trace_agent.receiver.spans_received", spansReceived, tags, 1)
statsd.Client().Count("datadog.trace_agent.receiver.spans_dropped", spansDropped, tags, 1)
statsd.Client().Count("datadog.trace_agent.receiver.spans_filtered", spansFiltered, tags, 1)
statsd.Client().Count("datadog.trace_agent.receiver.services_received", servicesReceived, tags, 1)
statsd.Client().Count("datadog.trace_agent.receiver.services_bytes", servicesBytes, tags, 1)
}

// Stats holds the metrics that will be reported every 10s by the agent.
Expand Down
31 changes: 17 additions & 14 deletions statsd/statsd.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package statsd

import (
"fmt"
"sync"

"github.com/DataDog/datadog-go/statsd"
"github.com/DataDog/datadog-trace-agent/config"
)

// StatsClient represents a client capable of sending stats to some stat endpoint.
Expand All @@ -14,17 +13,21 @@ type StatsClient interface {
Histogram(name string, value float64, tags []string, rate float64) error
}

// Client is a global Statsd client. When a client is configured via Configure,
// that becomes the new global Statsd client in the package.
var Client StatsClient = (*statsd.Client)(nil)
var (
mu sync.RWMutex
client StatsClient = (*statsd.Client)(nil)
)

// Client returns the global StatsClient.
func Client() StatsClient {
mu.RLock()
defer mu.RUnlock()
return client
}

// Configure creates a statsd client for the given agent's configuration, using the specified global tags.
func Configure(conf *config.AgentConfig, tags []string) error {
client, err := statsd.New(fmt.Sprintf("%s:%d", conf.StatsdHost, conf.StatsdPort))
if err != nil {
return err
}
client.Tags = tags
Client = client
return nil
// SetClient sets the global StatsClient.
func SetClient(c StatsClient) {
mu.Lock()
defer mu.Unlock()
client = c
}
2 changes: 1 addition & 1 deletion watchdog/logonpanic.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func LogOnPanic() {
errMsg := fmt.Sprintf("%v", err)
logMsg := "Unexpected panic: " + errMsg + "\n" + stacktrace

statsd.Client.Gauge("datadog.trace_agent.panic", 1, []string{
statsd.Client().Gauge("datadog.trace_agent.panic", 1, []string{
"err:" + shortErrMsg(errMsg),
}, 1)

Expand Down
12 changes: 6 additions & 6 deletions writer/service_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (w *ServiceWriter) Run() {
log.Infof("flushed service payload; url:%s, time:%s, size:%d bytes", url, event.SendStats.SendTime,
len(event.Payload.Bytes))
tags := []string{"url:" + url}
statsd.Client.Gauge("datadog.trace_agent.service_writer.flush_duration",
statsd.Client().Gauge("datadog.trace_agent.service_writer.flush_duration",
event.SendStats.SendTime.Seconds(), tags, 1)
atomic.AddInt64(&w.stats.Payloads, 1)
case SenderFailureEvent:
Expand Down Expand Up @@ -162,11 +162,11 @@ func (w *ServiceWriter) updateInfo() {
swInfo.Retries = atomic.SwapInt64(&w.stats.Retries, 0)

// TODO(gbbr): Scope these stats per endpoint (see (config.AgentConfig).AdditionalEndpoints))
statsd.Client.Count("datadog.trace_agent.service_writer.payloads", int64(swInfo.Payloads), nil, 1)
statsd.Client.Count("datadog.trace_agent.service_writer.services", int64(swInfo.Services), nil, 1)
statsd.Client.Count("datadog.trace_agent.service_writer.bytes", int64(swInfo.Bytes), nil, 1)
statsd.Client.Count("datadog.trace_agent.service_writer.retries", int64(swInfo.Retries), nil, 1)
statsd.Client.Count("datadog.trace_agent.service_writer.errors", int64(swInfo.Errors), nil, 1)
statsd.Client().Count("datadog.trace_agent.service_writer.payloads", int64(swInfo.Payloads), nil, 1)
statsd.Client().Count("datadog.trace_agent.service_writer.services", int64(swInfo.Services), nil, 1)
statsd.Client().Count("datadog.trace_agent.service_writer.bytes", int64(swInfo.Bytes), nil, 1)
statsd.Client().Count("datadog.trace_agent.service_writer.retries", int64(swInfo.Retries), nil, 1)
statsd.Client().Count("datadog.trace_agent.service_writer.errors", int64(swInfo.Errors), nil, 1)

info.UpdateServiceWriterInfo(swInfo)
}
6 changes: 3 additions & 3 deletions writer/service_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,10 @@ func testServiceWriter() (*ServiceWriter, chan model.ServicesMetadata, *testEndp
testEndpoint := &testEndpoint{}
serviceWriter.BaseWriter.payloadSender.setEndpoint(testEndpoint)
testStatsClient := &testutil.TestStatsClient{}
originalClient := statsd.Client
statsd.Client = testStatsClient
originalClient := statsd.Client()
statsd.SetClient(testStatsClient)

return serviceWriter, serviceChannel, testEndpoint, testStatsClient, func() {
statsd.Client = originalClient
statsd.SetClient(originalClient)
}
}
14 changes: 7 additions & 7 deletions writer/stats_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func (w *StatsWriter) monitor() {
log.Infof("flushed stat payload; url: %s, time:%s, size:%d bytes", url, e.SendStats.SendTime,
len(e.Payload.Bytes))
tags := []string{"url:" + url}
statsd.Client.Gauge("datadog.trace_agent.stats_writer.flush_duration",
statsd.Client().Gauge("datadog.trace_agent.stats_writer.flush_duration",
e.SendStats.SendTime.Seconds(), tags, 1)
atomic.AddInt64(&w.info.Payloads, 1)
case SenderFailureEvent:
Expand Down Expand Up @@ -293,12 +293,12 @@ func (w *StatsWriter) monitor() {
swInfo.Errors = atomic.SwapInt64(&w.info.Errors, 0)

// TODO(gbbr): Scope these stats per endpoint (see (config.AgentConfig).AdditionalEndpoints))
statsd.Client.Count("datadog.trace_agent.stats_writer.payloads", int64(swInfo.Payloads), nil, 1)
statsd.Client.Count("datadog.trace_agent.stats_writer.stats_buckets", int64(swInfo.StatsBuckets), nil, 1)
statsd.Client.Count("datadog.trace_agent.stats_writer.bytes", int64(swInfo.Bytes), nil, 1)
statsd.Client.Count("datadog.trace_agent.stats_writer.retries", int64(swInfo.Retries), nil, 1)
statsd.Client.Count("datadog.trace_agent.stats_writer.splits", int64(swInfo.Splits), nil, 1)
statsd.Client.Count("datadog.trace_agent.stats_writer.errors", int64(swInfo.Errors), nil, 1)
statsd.Client().Count("datadog.trace_agent.stats_writer.payloads", int64(swInfo.Payloads), nil, 1)
statsd.Client().Count("datadog.trace_agent.stats_writer.stats_buckets", int64(swInfo.StatsBuckets), nil, 1)
statsd.Client().Count("datadog.trace_agent.stats_writer.bytes", int64(swInfo.Bytes), nil, 1)
statsd.Client().Count("datadog.trace_agent.stats_writer.retries", int64(swInfo.Retries), nil, 1)
statsd.Client().Count("datadog.trace_agent.stats_writer.splits", int64(swInfo.Splits), nil, 1)
statsd.Client().Count("datadog.trace_agent.stats_writer.errors", int64(swInfo.Errors), nil, 1)

info.UpdateStatsWriterInfo(swInfo)
}
Expand Down
6 changes: 3 additions & 3 deletions writer/stats_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,10 +401,10 @@ func testStatsWriter() (*StatsWriter, chan []model.StatsBucket, *testEndpoint, *
testEndpoint := &testEndpoint{}
statsWriter.BaseWriter.payloadSender.setEndpoint(testEndpoint)
testStatsClient := &testutil.TestStatsClient{}
originalClient := statsd.Client
statsd.Client = testStatsClient
originalClient := statsd.Client()
statsd.SetClient(testStatsClient)

return statsWriter, statsChannel, testEndpoint, testStatsClient, func() {
statsd.Client = originalClient
statsd.SetClient(originalClient)
}
}
18 changes: 9 additions & 9 deletions writer/trace_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (w *TraceWriter) Run() {
log.Infof("flushed trace payload to the API, time:%s, size:%d bytes", event.SendStats.SendTime,
len(event.Payload.Bytes))
tags := []string{"url:" + event.SendStats.Host}
statsd.Client.Gauge("datadog.trace_agent.trace_writer.flush_duration",
statsd.Client().Gauge("datadog.trace_agent.trace_writer.flush_duration",
event.SendStats.SendTime.Seconds(), tags, 1)
atomic.AddInt64(&w.stats.Payloads, 1)
case SenderFailureEvent:
Expand Down Expand Up @@ -281,14 +281,14 @@ func (w *TraceWriter) updateInfo() {
twInfo.Errors = atomic.SwapInt64(&w.stats.Errors, 0)
twInfo.SingleMaxSpans = atomic.SwapInt64(&w.stats.SingleMaxSpans, 0)

statsd.Client.Count("datadog.trace_agent.trace_writer.payloads", int64(twInfo.Payloads), nil, 1)
statsd.Client.Count("datadog.trace_agent.trace_writer.traces", int64(twInfo.Traces), nil, 1)
statsd.Client.Count("datadog.trace_agent.trace_writer.transactions", int64(twInfo.Transactions), nil, 1)
statsd.Client.Count("datadog.trace_agent.trace_writer.spans", int64(twInfo.Spans), nil, 1)
statsd.Client.Count("datadog.trace_agent.trace_writer.bytes", int64(twInfo.Bytes), nil, 1)
statsd.Client.Count("datadog.trace_agent.trace_writer.retries", int64(twInfo.Retries), nil, 1)
statsd.Client.Count("datadog.trace_agent.trace_writer.errors", int64(twInfo.Errors), nil, 1)
statsd.Client.Count("datadog.trace_agent.trace_writer.single_max_spans", int64(twInfo.SingleMaxSpans), nil, 1)
statsd.Client().Count("datadog.trace_agent.trace_writer.payloads", int64(twInfo.Payloads), nil, 1)
statsd.Client().Count("datadog.trace_agent.trace_writer.traces", int64(twInfo.Traces), nil, 1)
statsd.Client().Count("datadog.trace_agent.trace_writer.transactions", int64(twInfo.Transactions), nil, 1)
statsd.Client().Count("datadog.trace_agent.trace_writer.spans", int64(twInfo.Spans), nil, 1)
statsd.Client().Count("datadog.trace_agent.trace_writer.bytes", int64(twInfo.Bytes), nil, 1)
statsd.Client().Count("datadog.trace_agent.trace_writer.retries", int64(twInfo.Retries), nil, 1)
statsd.Client().Count("datadog.trace_agent.trace_writer.errors", int64(twInfo.Errors), nil, 1)
statsd.Client().Count("datadog.trace_agent.trace_writer.single_max_spans", int64(twInfo.SingleMaxSpans), nil, 1)

info.UpdateTraceWriterInfo(twInfo)
}
6 changes: 3 additions & 3 deletions writer/trace_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,11 +350,11 @@ func testTraceWriter() (*TraceWriter, chan *SampledTrace, *testEndpoint, *testut
testEndpoint := &testEndpoint{}
traceWriter.BaseWriter.payloadSender.setEndpoint(testEndpoint)
testStatsClient := &testutil.TestStatsClient{}
originalClient := statsd.Client
statsd.Client = testStatsClient
originalClient := statsd.Client()
statsd.SetClient(testStatsClient)

return traceWriter, payloadChannel, testEndpoint, testStatsClient, func() {
statsd.Client = originalClient
statsd.SetClient(originalClient)
}
}

Expand Down

0 comments on commit 29985e7

Please sign in to comment.