diff --git a/CHANGELOG.md b/CHANGELOG.md index 82d440fa5..f3669f5cd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ ## Changes -[//]: # (comment: Don't forget to update statsd/statsd.go:clientVersionTelemetryTag when releasing a new version) +[//]: # (comment: Don't forget to update statsd/telemetry.go:clientVersionTelemetryTag when releasing a new version) # 3.7.2 / 2020-06-16 diff --git a/statsd/options.go b/statsd/options.go index e3c4eaedf..189f7f7c6 100644 --- a/statsd/options.go +++ b/statsd/options.go @@ -27,7 +27,7 @@ var ( DefaultWriteTimeoutUDS = 1 * time.Millisecond // DefaultTelemetry is the default value for the Telemetry option DefaultTelemetry = true - // DefaultReceivingingMode is the default behavior when sending metrics + // DefaultReceivingMode is the default behavior when sending metrics DefaultReceivingMode = MutexMode // DefaultChannelModeBufferSize is the default size of the channel holding incoming metrics DefaultChannelModeBufferSize = 4096 @@ -95,6 +95,8 @@ type Options struct { AggregationFlushInterval time.Duration // [beta] Aggregation enables/disables client side aggregation Aggregation bool + // TelemetryAddr specify a different endpoint for telemetry metrics. + TelemetryAddr string } func resolveOptions(options []Option) (*Options, error) { @@ -220,7 +222,7 @@ func WithChannelMode() Option { } } -// WithMutexModeMode will use mutex to receive metrics +// WithMutexMode will use mutex to receive metrics func WithMutexMode() Option { return func(o *Options) error { o.ReceiveMode = MutexMode @@ -259,3 +261,11 @@ func WithoutClientSideAggregation() Option { return nil } } + +// WithTelemetryAddr specify a different address for telemetry metrics. +func WithTelemetryAddr(addr string) Option { + return func(o *Options) error { + o.TelemetryAddr = addr + return nil + } +} diff --git a/statsd/options_test.go b/statsd/options_test.go index 2622855d2..0f97d951c 100644 --- a/statsd/options_test.go +++ b/statsd/options_test.go @@ -21,6 +21,11 @@ func TestDefaultOptions(t *testing.T) { assert.Equal(t, options.SenderQueueSize, DefaultSenderQueueSize) assert.Equal(t, options.WriteTimeoutUDS, DefaultWriteTimeoutUDS) assert.Equal(t, options.Telemetry, DefaultTelemetry) + assert.Equal(t, options.ReceiveMode, DefaultReceivingMode) + assert.Equal(t, options.ChannelModeBufferSize, DefaultChannelModeBufferSize) + assert.Equal(t, options.AggregationFlushInterval, DefaultAggregationFlushInterval) + assert.Equal(t, options.Aggregation, DefaultAggregation) + assert.Zero(t, options.TelemetryAddr) } func TestOptions(t *testing.T) { @@ -33,6 +38,9 @@ func TestOptions(t *testing.T) { testBufferShardCount := 28 testSenderQueueSize := 64 testWriteTimeoutUDS := 1 * time.Minute + testChannelBufferSize := 500 + testAggregationWindow := 10 * time.Second + testTelemetryAddr := "localhost:1234" options, err := resolveOptions([]Option{ WithNamespace(testNamespace), @@ -45,6 +53,11 @@ func TestOptions(t *testing.T) { WithSenderQueueSize(testSenderQueueSize), WithWriteTimeoutUDS(testWriteTimeoutUDS), WithoutTelemetry(), + WithChannelMode(), + WithChannelModeBufferSize(testChannelBufferSize), + WithAggregationInterval(testAggregationWindow), + WithClientSideAggregation(), + WithTelemetryAddr(testTelemetryAddr), }) assert.NoError(t, err) @@ -58,8 +71,25 @@ func TestOptions(t *testing.T) { assert.Equal(t, options.SenderQueueSize, testSenderQueueSize) assert.Equal(t, options.WriteTimeoutUDS, testWriteTimeoutUDS) assert.Equal(t, options.Telemetry, false) + assert.Equal(t, options.ReceiveMode, ChannelMode) + assert.Equal(t, options.ChannelModeBufferSize, testChannelBufferSize) + assert.Equal(t, options.AggregationFlushInterval, testAggregationWindow) + assert.Equal(t, options.Aggregation, true) + assert.Equal(t, options.TelemetryAddr, testTelemetryAddr) } +func TestResetOptions(t *testing.T) { + options, err := resolveOptions([]Option{ + WithChannelMode(), + WithMutexMode(), + WithClientSideAggregation(), + WithoutClientSideAggregation(), + }) + + assert.NoError(t, err) + assert.Equal(t, options.ReceiveMode, MutexMode) + assert.Equal(t, options.Aggregation, false) +} func TestOptionsNamespaceWithoutDot(t *testing.T) { testNamespace := "datadog" diff --git a/statsd/statsd.go b/statsd/statsd.go index 31a4ee439..b77bfbcba 100644 --- a/statsd/statsd.go +++ b/statsd/statsd.go @@ -61,21 +61,6 @@ agent configuration file datadog.yaml. */ const DefaultMaxAgentPayloadSize = 8192 -/* -TelemetryInterval is the interval at which telemetry will be sent by the client. -*/ -const TelemetryInterval = 10 * time.Second - -/* -clientTelemetryTag is a tag identifying this specific client. -*/ -var clientTelemetryTag = "client:go" - -/* -clientVersionTelemetryTag is a tag identifying this specific client version. -*/ -var clientVersionTelemetryTag = "client_version:3.7.2" - /* UnixAddressPrefix holds the prefix to use to enable Unix Domain Socket traffic instead of UDP. @@ -205,20 +190,18 @@ type Client struct { // Tags are global tags to be added to every statsd call Tags []string // skipErrors turns off error passing and allows UDS to emulate UDP behaviour - SkipErrors bool - flushTime time.Duration - bufferPool *bufferPool - buffer *statsdBuffer - metrics *ClientMetrics - telemetryTags []string - stop chan struct{} - wg sync.WaitGroup - bufferShards []*worker - closerLock sync.Mutex - receiveMode ReceivingMode - agg *aggregator - options []Option - addrOption string + SkipErrors bool + flushTime time.Duration + metrics *ClientMetrics + telemetry *telemetryClient + stop chan struct{} + wg sync.WaitGroup + workers []*worker + closerLock sync.Mutex + receiveMode ReceivingMode + agg *aggregator + options []Option + addrOption string } // ClientMetrics contains metrics about the client @@ -233,42 +216,29 @@ type ClientMetrics struct { // https://golang.org/doc/faq#guarantee_satisfies_interface var _ ClientInterface = &Client{} +func resolveAddr(addr string) (statsdWriter, string, error) { + if !strings.HasPrefix(addr, UnixAddressPrefix) { + w, err := newUDPWriter(addr) + return w, "udp", err + } + + w, err := newUDSWriter(addr[len(UnixAddressPrefix):]) + return w, "uds", err +} + // New returns a pointer to a new Client given an addr in the format "hostname:port" or // "unix:///path/to/socket". func New(addr string, options ...Option) (*Client, error) { - var w statsdWriter o, err := resolveOptions(options) if err != nil { return nil, err } - var writerType string - optimalPayloadSize := OptimalUDPPayloadSize - defaultBufferPoolSize := DefaultUDPBufferPoolSize - if !strings.HasPrefix(addr, UnixAddressPrefix) { - w, err = newUDPWriter(addr) - writerType = "udp" - } else { - // FIXME: The agent has a performance pitfall preventing us from using better defaults here. - // Once it's fixed, use `DefaultMaxAgentPayloadSize` and `DefaultUDSBufferPoolSize` instead. - optimalPayloadSize = OptimalUDPPayloadSize - defaultBufferPoolSize = DefaultUDPBufferPoolSize - w, err = newUDSWriter(addr[len(UnixAddressPrefix):]) - writerType = "uds" - } + w, writerType, err := resolveAddr(addr) if err != nil { return nil, err } - if o.MaxBytesPerPayload == 0 { - o.MaxBytesPerPayload = optimalPayloadSize - } - if o.BufferPoolSize == 0 { - o.BufferPoolSize = defaultBufferPoolSize - } - if o.SenderQueueSize == 0 { - o.SenderQueueSize = defaultBufferPoolSize - } client, err := newWithWriter(w, o, writerType) if err == nil { client.options = append(client.options, options...) @@ -321,8 +291,8 @@ func newWithWriter(w statsdWriter, o *Options, writerName string) (*Client, erro } } - c.telemetryTags = append(c.Tags, clientTelemetryTag, clientVersionTelemetryTag, "client_transport:"+writerName) - + // FIXME: The agent has a performance pitfall preventing us from using better defaults here. + // Once it's fixed, use `DefaultMaxAgentPayloadSize` and `DefaultUDSBufferPoolSize` instead. if o.MaxBytesPerPayload == 0 { o.MaxBytesPerPayload = OptimalUDPPayloadSize } @@ -333,31 +303,39 @@ func newWithWriter(w statsdWriter, o *Options, writerName string) (*Client, erro o.SenderQueueSize = DefaultUDPBufferPoolSize } - c.receiveMode = o.ReceiveMode - c.bufferPool = newBufferPool(o.BufferPoolSize, o.MaxBytesPerPayload, o.MaxMessagesPerPayload) - c.buffer = c.bufferPool.borrowBuffer() - c.sender = newSender(w, o.SenderQueueSize, c.bufferPool) + bufferPool := newBufferPool(o.BufferPoolSize, o.MaxBytesPerPayload, o.MaxMessagesPerPayload) + c.sender = newSender(w, o.SenderQueueSize, bufferPool) for i := 0; i < o.BufferShardCount; i++ { - w := newWorker(c.bufferPool, c.sender) - c.bufferShards = append(c.bufferShards, w) + w := newWorker(bufferPool, c.sender) + c.workers = append(c.workers, w) if c.receiveMode == ChannelMode { - w.startReceivingMetric(o.ChannelModeBufferSize) // TODO make it configurable + w.startReceivingMetric(o.ChannelModeBufferSize) } } + + c.receiveMode = o.ReceiveMode c.flushTime = o.BufferFlushInterval c.stop = make(chan struct{}, 1) + c.wg.Add(1) go func() { defer c.wg.Done() c.watch() }() - c.wg.Add(1) - go func() { - defer c.wg.Done() - if o.Telemetry { - c.telemetry() + + if o.TelemetryAddr == "" { + c.telemetry = NewTelemetryClient(&c, writerName) + } else { + var err error + c.telemetry, err = NewTelemetryClientWithCustomAddr(&c, writerName, o.TelemetryAddr, bufferPool) + if err != nil { + return nil, err } - }() + } + + if o.Telemetry { + c.telemetry.run(&c.wg, c.stop) + } return &c, nil } @@ -384,7 +362,7 @@ func (c *Client) watch() { for { select { case <-ticker.C: - for _, w := range c.bufferShards { + for _, w := range c.workers { w.flush() } case <-c.stop: @@ -394,48 +372,6 @@ func (c *Client) watch() { } } -func (c *Client) telemetry() { - ticker := time.NewTicker(TelemetryInterval) - for { - select { - case <-ticker.C: - for _, m := range c.flushTelemetry() { - c.send(m) - } - case <-c.stop: - ticker.Stop() - return - } - } -} - -// flushTelemetry returns Telemetry metrics to be flushed. It's its own function to ease testing. -func (c *Client) flushTelemetry() []metric { - m := []metric{} - - // same as Count but without global namespace - telemetryCount := func(name string, value int64) { - m = append(m, metric{metricType: count, name: name, ivalue: value, tags: c.telemetryTags, rate: 1}) - } - - clientMetrics := c.FlushTelemetryMetrics() - telemetryCount("datadog.dogstatsd.client.metrics", int64(clientMetrics.TotalMetrics)) - telemetryCount("datadog.dogstatsd.client.events", int64(clientMetrics.TotalEvents)) - telemetryCount("datadog.dogstatsd.client.service_checks", int64(clientMetrics.TotalServiceChecks)) - telemetryCount("datadog.dogstatsd.client.metric_dropped_on_receive", int64(clientMetrics.TotalDroppedOnReceive)) - - senderMetrics := c.sender.flushTelemetryMetrics() - telemetryCount("datadog.dogstatsd.client.packets_sent", int64(senderMetrics.TotalSentPayloads)) - telemetryCount("datadog.dogstatsd.client.bytes_sent", int64(senderMetrics.TotalSentBytes)) - telemetryCount("datadog.dogstatsd.client.packets_dropped", int64(senderMetrics.TotalDroppedPayloads)) - telemetryCount("datadog.dogstatsd.client.bytes_dropped", int64(senderMetrics.TotalDroppedBytes)) - telemetryCount("datadog.dogstatsd.client.packets_dropped_queue", int64(senderMetrics.TotalDroppedPayloadsQueueFull)) - telemetryCount("datadog.dogstatsd.client.bytes_dropped_queue", int64(senderMetrics.TotalDroppedBytesQueueFull)) - telemetryCount("datadog.dogstatsd.client.packets_dropped_writer", int64(senderMetrics.TotalDroppedPayloadsWriter)) - telemetryCount("datadog.dogstatsd.client.bytes_dropped_writer", int64(senderMetrics.TotalDroppedBytesWriter)) - return m -} - // Flush forces a flush of all the queued dogstatsd payloads // This method is blocking and will not return until everything is sent // through the network @@ -443,7 +379,7 @@ func (c *Client) Flush() error { if c == nil { return ErrNoClient } - for _, w := range c.bufferShards { + for _, w := range c.workers { w.flush() } c.sender.flush() @@ -468,7 +404,7 @@ func (c *Client) send(m metric) error { m.namespace = c.Namespace h := hashString32(m.name) - worker := c.bufferShards[h%uint32(len(c.bufferShards))] + worker := c.workers[h%uint32(len(c.workers))] if c.receiveMode == ChannelMode { select { @@ -609,7 +545,7 @@ func (c *Client) Close() error { close(c.stop) if c.receiveMode == ChannelMode { - for _, w := range c.bufferShards { + for _, w := range c.workers { w.stopReceivingMetric() } } diff --git a/statsd/statsd_test.go b/statsd/statsd_test.go index 1a9eb2b92..39be63896 100644 --- a/statsd/statsd_test.go +++ b/statsd/statsd_test.go @@ -4,7 +4,6 @@ import ( "fmt" "io" "net" - "os" "testing" "time" @@ -28,95 +27,20 @@ func (statsdWriterWrapper) Write(p []byte) (n int, err error) { func TestCustomWriterBufferConfiguration(t *testing.T) { client, err := NewWithWriter(statsdWriterWrapper{}) - if err != nil { - t.Fatal(err) - } + require.Nil(t, err) defer client.Close() - assert.Equal(t, OptimalUDPPayloadSize, client.bufferPool.bufferMaxSize) - assert.Equal(t, DefaultUDPBufferPoolSize, cap(client.bufferPool.pool)) + assert.Equal(t, OptimalUDPPayloadSize, client.sender.pool.bufferMaxSize) + assert.Equal(t, DefaultUDPBufferPoolSize, cap(client.sender.pool.pool)) assert.Equal(t, DefaultUDPBufferPoolSize, cap(client.sender.queue)) } -func testTelemetry(t *testing.T, client *Client, expectedTelemetryTags []string) { - client.Gauge("Gauge", 21, nil, 1) - client.Count("Count", 21, nil, 1) - client.Histogram("Histogram", 21, nil, 1) - client.Distribution("Distribution", 21, nil, 1) - client.Decr("Decr", nil, 1) - client.Incr("Incr", nil, 1) - client.Set("Set", "value", nil, 1) - client.Timing("Timing", 21, nil, 1) - client.TimeInMilliseconds("TimeInMilliseconds", 21, nil, 1) - client.SimpleEvent("hello", "world") - client.SimpleServiceCheck("hello", Warn) - - metrics := client.flushTelemetry() - - expectedMetricsName := map[string]int64{ - "datadog.dogstatsd.client.metrics": 9, - "datadog.dogstatsd.client.events": 1, - "datadog.dogstatsd.client.service_checks": 1, - "datadog.dogstatsd.client.metric_dropped_on_receive": 0, - "datadog.dogstatsd.client.packets_sent": 0, - "datadog.dogstatsd.client.bytes_sent": 0, - "datadog.dogstatsd.client.packets_dropped": 0, - "datadog.dogstatsd.client.bytes_dropped": 0, - "datadog.dogstatsd.client.packets_dropped_queue": 0, - "datadog.dogstatsd.client.bytes_dropped_queue": 0, - "datadog.dogstatsd.client.packets_dropped_writer": 0, - "datadog.dogstatsd.client.bytes_dropped_writer": 0, - } - - assert.Equal(t, len(expectedMetricsName), len(metrics)) - for _, m := range metrics { - expectedValue, found := expectedMetricsName[m.name] - if !found { - assert.Fail(t, fmt.Sprintf("Unknown metrics: %s", m.name)) - } - - assert.Equal(t, expectedValue, m.ivalue, fmt.Sprintf("wrong ivalue for '%s'", m.name)) - assert.Equal(t, count, m.metricType, fmt.Sprintf("wrong metricTypefor '%s'", m.name)) - assert.Equal(t, expectedTelemetryTags, m.tags, fmt.Sprintf("wrong tags for '%s'", m.name)) - assert.Equal(t, float64(1), m.rate, fmt.Sprintf("wrong rate for '%s'", m.name)) - } -} - -func TestTelemetry(t *testing.T) { - // disabling autoflush of the telemetry - client, err := New("localhost:8125", WithoutTelemetry()) - if err != nil { - t.Fatal(err) - } - - expectedTelemetryTags := []string{clientTelemetryTag, clientVersionTelemetryTag, "client_transport:udp"} - testTelemetry(t, client, expectedTelemetryTags) -} - -func TestTelemetryWithGlobalTags(t *testing.T) { - // disabling autoflush of the telemetry - os.Setenv("DD_ENV", "test") - defer os.Unsetenv("DD_ENV") - - client, err := New("localhost:8125", WithoutTelemetry(), WithTags([]string{"tag1", "tag2"})) - if err != nil { - t.Fatal(err) - } - - expectedTelemetryTags := []string{"tag1", "tag2", "env:test", clientTelemetryTag, clientVersionTelemetryTag, "client_transport:udp"} - testTelemetry(t, client, expectedTelemetryTags) -} - func getTestServer(t *testing.T, addr string) *net.UDPConn { udpAddr, err := net.ResolveUDPAddr("udp", addr) - if err != nil { - require.Failf(t, "could not resolve udp '%s': %s", addr, err.Error()) - } + require.Nil(t, err, fmt.Sprintf("could not resolve udp '%s': %s", addr, err)) server, err := net.ListenUDP("udp", udpAddr) - if err != nil { - require.Failf(t, "Could not listen to UDP addr: %s", err.Error()) - } + require.Nil(t, err, fmt.Sprintf("Could not listen to UDP addr: %s", err)) return server } @@ -127,15 +51,12 @@ func TestFlushOnClose(t *testing.T) { defer server.Close() client, err := New(addr) - if err != nil { - t.Fatalf("failed to create client: %s", err) - } + require.Nil(t, err, fmt.Sprintf("failed to create client: %s", err)) client.Count("name", 1, []string{"tag"}, 1) - if err := client.Close(); err != nil { - t.Fatalf("failed to close client: %s", err) - } + err = client.Close() + require.Nil(t, err, fmt.Sprintf("failed to close client: %s", err)) readDone := make(chan struct{}) n := 0 @@ -158,9 +79,7 @@ func TestCloneWithExtraOptions(t *testing.T) { addr := "localhost:1201" client, err := New(addr, WithTags([]string{"tag1", "tag2"})) - if err != nil { - t.Fatalf("failed to create client: %s", err) - } + require.Nil(t, err, fmt.Sprintf("failed to create client: %s", err)) assert.Equal(t, client.Tags, []string{"tag1", "tag2"}) assert.Equal(t, client.Namespace, "") @@ -169,9 +88,7 @@ func TestCloneWithExtraOptions(t *testing.T) { assert.Len(t, client.options, 1) cloneClient, err := CloneWithExtraOptions(client, WithNamespace("test"), WithChannelMode()) - if err != nil { - t.Fatalf("failed to clone client: %s", err) - } + require.Nil(t, err, fmt.Sprintf("failed to clone client: %s", err)) assert.Equal(t, cloneClient.Tags, []string{"tag1", "tag2"}) assert.Equal(t, cloneClient.Namespace, "test.") diff --git a/statsd/telemetry.go b/statsd/telemetry.go new file mode 100644 index 000000000..63ecd6627 --- /dev/null +++ b/statsd/telemetry.go @@ -0,0 +1,116 @@ +package statsd + +import ( + "fmt" + "sync" + "time" +) + +/* +TelemetryInterval is the interval at which telemetry will be sent by the client. +*/ +const TelemetryInterval = 10 * time.Second + +/* +clientTelemetryTag is a tag identifying this specific client. +*/ +var clientTelemetryTag = "client:go" + +/* +clientVersionTelemetryTag is a tag identifying this specific client version. +*/ +var clientVersionTelemetryTag = "client_version:3.7.2" + +type telemetryClient struct { + c *Client + tags []string + namespace string + sender *sender + worker *worker +} + +func NewTelemetryClient(c *Client, transport string) *telemetryClient { + return &telemetryClient{ + c: c, + tags: append(c.Tags, clientTelemetryTag, clientVersionTelemetryTag, "client_transport:"+transport), + namespace: c.Namespace, + } +} + +func NewTelemetryClientWithCustomAddr(c *Client, transport string, telemetryAddr string, pool *bufferPool) (*telemetryClient, error) { + telemetryWriter, _, err := resolveAddr(telemetryAddr) + if err != nil { + return nil, fmt.Errorf("Could not resolve telemetry address: %v", err) + } + + t := NewTelemetryClient(c, transport) + + // creating a custom sender/worker with 1 worker in + // mutex mode for the telemetry that share the same + // bufferPool + t.sender = newSender(telemetryWriter, DefaultUDPBufferPoolSize, pool) + t.worker = newWorker(pool, t.sender) + return t, nil +} + +func (t *telemetryClient) run(wg *sync.WaitGroup, stop chan struct{}) { + wg.Add(1) + go func() { + defer wg.Done() + ticker := time.NewTicker(TelemetryInterval) + for { + select { + case <-ticker.C: + t.sendTelemetry() + case <-stop: + ticker.Stop() + if t.sender != nil { + t.sender.close() + } + return + } + } + }() +} + +func (t *telemetryClient) sendTelemetry() { + for _, m := range t.flush() { + if t.worker != nil { + m.namespace = t.namespace + t.worker.processMetric(m) + } else { + t.c.send(m) + } + } + + if t.worker != nil { + t.worker.flush() + } +} + +// flushTelemetry returns Telemetry metrics to be flushed. It's its own function to ease testing. +func (t *telemetryClient) flush() []metric { + m := []metric{} + + // same as Count but without global namespace + telemetryCount := func(name string, value int64) { + m = append(m, metric{metricType: count, name: name, ivalue: value, tags: t.tags, rate: 1}) + } + + clientMetrics := t.c.FlushTelemetryMetrics() + telemetryCount("datadog.dogstatsd.client.metrics", int64(clientMetrics.TotalMetrics)) + telemetryCount("datadog.dogstatsd.client.events", int64(clientMetrics.TotalEvents)) + telemetryCount("datadog.dogstatsd.client.service_checks", int64(clientMetrics.TotalServiceChecks)) + telemetryCount("datadog.dogstatsd.client.metric_dropped_on_receive", int64(clientMetrics.TotalDroppedOnReceive)) + + senderMetrics := t.c.sender.flushTelemetryMetrics() + telemetryCount("datadog.dogstatsd.client.packets_sent", int64(senderMetrics.TotalSentPayloads)) + telemetryCount("datadog.dogstatsd.client.bytes_sent", int64(senderMetrics.TotalSentBytes)) + telemetryCount("datadog.dogstatsd.client.packets_dropped", int64(senderMetrics.TotalDroppedPayloads)) + telemetryCount("datadog.dogstatsd.client.bytes_dropped", int64(senderMetrics.TotalDroppedBytes)) + telemetryCount("datadog.dogstatsd.client.packets_dropped_queue", int64(senderMetrics.TotalDroppedPayloadsQueueFull)) + telemetryCount("datadog.dogstatsd.client.bytes_dropped_queue", int64(senderMetrics.TotalDroppedBytesQueueFull)) + telemetryCount("datadog.dogstatsd.client.packets_dropped_writer", int64(senderMetrics.TotalDroppedPayloadsWriter)) + telemetryCount("datadog.dogstatsd.client.bytes_dropped_writer", int64(senderMetrics.TotalDroppedBytesWriter)) + return m +} diff --git a/statsd/telemetry_test.go b/statsd/telemetry_test.go new file mode 100644 index 000000000..bde5cbd3a --- /dev/null +++ b/statsd/telemetry_test.go @@ -0,0 +1,139 @@ +package statsd + +import ( + "fmt" + "io" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var basicExpectedTags = []string{clientTelemetryTag, clientVersionTelemetryTag, "client_transport:test_transport"} + +func TestNewTelemetry(t *testing.T) { + client, err := New("localhost:8125", WithoutTelemetry(), WithNamespace("test_namespace")) + require.Nil(t, err) + + telemetry := NewTelemetryClient(client, "test_transport") + assert.NotNil(t, telemetry) + + assert.Equal(t, telemetry.c, client) + assert.Equal(t, telemetry.namespace, "test_namespace.") + assert.Equal(t, telemetry.tags, basicExpectedTags) + assert.Nil(t, telemetry.sender) + assert.Nil(t, telemetry.worker) +} + +func submitTestMetrics(c *Client) { + c.Gauge("Gauge", 21, nil, 1) + c.Count("Count", 21, nil, 1) + c.Histogram("Histogram", 21, nil, 1) + c.Distribution("Distribution", 21, nil, 1) + c.Decr("Decr", nil, 1) + c.Incr("Incr", nil, 1) + c.Set("Set", "value", nil, 1) + c.Timing("Timing", 21, nil, 1) + c.TimeInMilliseconds("TimeInMilliseconds", 21, nil, 1) + c.SimpleEvent("hello", "world") + c.SimpleServiceCheck("hello", Warn) +} + +func testTelemetry(t *testing.T, telemetry *telemetryClient, expectedTelemetryTags []string) { + assert.NotNil(t, telemetry) + + submitTestMetrics(telemetry.c) + metrics := telemetry.flush() + + expectedMetricsName := map[string]int64{ + "datadog.dogstatsd.client.metrics": 9, + "datadog.dogstatsd.client.events": 1, + "datadog.dogstatsd.client.service_checks": 1, + "datadog.dogstatsd.client.metric_dropped_on_receive": 0, + "datadog.dogstatsd.client.packets_sent": 0, + "datadog.dogstatsd.client.bytes_sent": 0, + "datadog.dogstatsd.client.packets_dropped": 0, + "datadog.dogstatsd.client.bytes_dropped": 0, + "datadog.dogstatsd.client.packets_dropped_queue": 0, + "datadog.dogstatsd.client.bytes_dropped_queue": 0, + "datadog.dogstatsd.client.packets_dropped_writer": 0, + "datadog.dogstatsd.client.bytes_dropped_writer": 0, + } + + assert.Equal(t, len(expectedMetricsName), len(metrics)) + for _, m := range metrics { + expectedValue, found := expectedMetricsName[m.name] + assert.True(t, found, fmt.Sprintf("Unknown metrics: %s", m.name)) + + assert.Equal(t, expectedValue, m.ivalue, fmt.Sprintf("wrong ivalue for '%s'", m.name)) + assert.Equal(t, count, m.metricType, fmt.Sprintf("wrong metricTypefor '%s'", m.name)) + assert.Equal(t, expectedTelemetryTags, m.tags, fmt.Sprintf("wrong tags for '%s'", m.name)) + assert.Equal(t, float64(1), m.rate, fmt.Sprintf("wrong rate for '%s'", m.name)) + } +} + +func TestTelemetry(t *testing.T) { + // disabling autoflush of the telemetry + client, err := New("localhost:8125", WithoutTelemetry()) + require.Nil(t, err) + + telemetry := NewTelemetryClient(client, "test_transport") + testTelemetry(t, telemetry, basicExpectedTags) +} + +func TestTelemetryWithGlobalTags(t *testing.T) { + os.Setenv("DD_ENV", "test") + defer os.Unsetenv("DD_ENV") + + // disabling autoflush of the telemetry + client, err := New("localhost:8125", WithoutTelemetry(), WithTags([]string{"tag1", "tag2"})) + require.Nil(t, err) + + telemetry := NewTelemetryClient(client, "test_transport") + + expectedTelemetryTags := append([]string{"tag1", "tag2", "env:test"}, basicExpectedTags...) + testTelemetry(t, telemetry, expectedTelemetryTags) +} + +func TestTelemetryCustomAddr(t *testing.T) { + buffer := make([]byte, 4096) + addr := "localhost:1201" + server := getTestServer(t, addr) + defer server.Close() + + client, err := New("localhost:9876", WithTelemetryAddr(addr), WithNamespace("test_namespace")) + require.Nil(t, err, fmt.Sprintf("failed to create client: %s", err)) + readDone := make(chan struct{}) + n := 0 + go func() { + n, _ = io.ReadAtLeast(server, buffer, 1) + close(readDone) + }() + + submitTestMetrics(client) + client.telemetry.sendTelemetry() + + select { + case <-readDone: + case <-time.After(2 * time.Second): + require.Fail(t, "No data was flush on Close") + } + + result := string(buffer[:n]) + + expectedPayload := "test_namespace.datadog.dogstatsd.client.metrics:9|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" + + "test_namespace.datadog.dogstatsd.client.events:1|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" + + "test_namespace.datadog.dogstatsd.client.service_checks:1|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" + + "test_namespace.datadog.dogstatsd.client.metric_dropped_on_receive:0|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" + + "test_namespace.datadog.dogstatsd.client.packets_sent:0|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" + + "test_namespace.datadog.dogstatsd.client.bytes_sent:0|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" + + "test_namespace.datadog.dogstatsd.client.packets_dropped:0|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" + + "test_namespace.datadog.dogstatsd.client.bytes_dropped:0|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" + + "test_namespace.datadog.dogstatsd.client.packets_dropped_queue:0|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" + + "test_namespace.datadog.dogstatsd.client.bytes_dropped_queue:0|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" + + "test_namespace.datadog.dogstatsd.client.packets_dropped_writer:0|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp\n" + + "test_namespace.datadog.dogstatsd.client.bytes_dropped_writer:0|c|#client:go," + clientVersionTelemetryTag + ",client_transport:udp" + assert.Equal(t, expectedPayload, result) +}