Skip to content

Commit

Permalink
Adding the option to send telemetry to another endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
hush-hush committed Jun 25, 2020
1 parent 2db9a6d commit f740328
Show file tree
Hide file tree
Showing 7 changed files with 358 additions and 210 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
## Changes

[//]: # (comment: Don't forget to update statsd/statsd.go:clientVersionTelemetryTag when releasing a new version)
[//]: # (comment: Don't forget to update statsd/telemetry.go:clientVersionTelemetryTag when releasing a new version)

# 3.7.2 / 2020-06-16

Expand Down
14 changes: 12 additions & 2 deletions statsd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var (
DefaultWriteTimeoutUDS = 1 * time.Millisecond
// DefaultTelemetry is the default value for the Telemetry option
DefaultTelemetry = true
// DefaultReceivingingMode is the default behavior when sending metrics
// DefaultReceivingMode is the default behavior when sending metrics
DefaultReceivingMode = MutexMode
// DefaultChannelModeBufferSize is the default size of the channel holding incoming metrics
DefaultChannelModeBufferSize = 4096
Expand Down Expand Up @@ -95,6 +95,8 @@ type Options struct {
AggregationFlushInterval time.Duration
// [beta] Aggregation enables/disables client side aggregation
Aggregation bool
// TelemetryAddr specify a different endpoint for telemetry metrics.
TelemetryAddr string
}

func resolveOptions(options []Option) (*Options, error) {
Expand Down Expand Up @@ -220,7 +222,7 @@ func WithChannelMode() Option {
}
}

// WithMutexModeMode will use mutex to receive metrics
// WithMutexMode will use mutex to receive metrics
func WithMutexMode() Option {
return func(o *Options) error {
o.ReceiveMode = MutexMode
Expand Down Expand Up @@ -259,3 +261,11 @@ func WithoutClientSideAggregation() Option {
return nil
}
}

// WithTelemetryAddr specify a different address for telemetry metrics.
func WithTelemetryAddr(addr string) Option {
return func(o *Options) error {
o.TelemetryAddr = addr
return nil
}
}
30 changes: 30 additions & 0 deletions statsd/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ func TestDefaultOptions(t *testing.T) {
assert.Equal(t, options.SenderQueueSize, DefaultSenderQueueSize)
assert.Equal(t, options.WriteTimeoutUDS, DefaultWriteTimeoutUDS)
assert.Equal(t, options.Telemetry, DefaultTelemetry)
assert.Equal(t, options.ReceiveMode, DefaultReceivingMode)
assert.Equal(t, options.ChannelModeBufferSize, DefaultChannelModeBufferSize)
assert.Equal(t, options.AggregationFlushInterval, DefaultAggregationFlushInterval)
assert.Equal(t, options.Aggregation, DefaultAggregation)
assert.Zero(t, options.TelemetryAddr)
}

func TestOptions(t *testing.T) {
Expand All @@ -33,6 +38,9 @@ func TestOptions(t *testing.T) {
testBufferShardCount := 28
testSenderQueueSize := 64
testWriteTimeoutUDS := 1 * time.Minute
testChannelBufferSize := 500
testAggregationWindow := 10 * time.Second
testTelemetryAddr := "localhost:1234"

options, err := resolveOptions([]Option{
WithNamespace(testNamespace),
Expand All @@ -45,6 +53,11 @@ func TestOptions(t *testing.T) {
WithSenderQueueSize(testSenderQueueSize),
WithWriteTimeoutUDS(testWriteTimeoutUDS),
WithoutTelemetry(),
WithChannelMode(),
WithChannelModeBufferSize(testChannelBufferSize),
WithAggregationInterval(testAggregationWindow),
WithClientSideAggregation(),
WithTelemetryAddr(testTelemetryAddr),
})

assert.NoError(t, err)
Expand All @@ -58,8 +71,25 @@ func TestOptions(t *testing.T) {
assert.Equal(t, options.SenderQueueSize, testSenderQueueSize)
assert.Equal(t, options.WriteTimeoutUDS, testWriteTimeoutUDS)
assert.Equal(t, options.Telemetry, false)
assert.Equal(t, options.ReceiveMode, ChannelMode)
assert.Equal(t, options.ChannelModeBufferSize, testChannelBufferSize)
assert.Equal(t, options.AggregationFlushInterval, testAggregationWindow)
assert.Equal(t, options.Aggregation, true)
assert.Equal(t, options.TelemetryAddr, testTelemetryAddr)
}

func TestResetOptions(t *testing.T) {
options, err := resolveOptions([]Option{
WithChannelMode(),
WithMutexMode(),
WithClientSideAggregation(),
WithoutClientSideAggregation(),
})

assert.NoError(t, err)
assert.Equal(t, options.ReceiveMode, MutexMode)
assert.Equal(t, options.Aggregation, false)
}
func TestOptionsNamespaceWithoutDot(t *testing.T) {
testNamespace := "datadog"

Expand Down
164 changes: 50 additions & 114 deletions statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,6 @@ agent configuration file datadog.yaml.
*/
const DefaultMaxAgentPayloadSize = 8192

/*
TelemetryInterval is the interval at which telemetry will be sent by the client.
*/
const TelemetryInterval = 10 * time.Second

/*
clientTelemetryTag is a tag identifying this specific client.
*/
var clientTelemetryTag = "client:go"

/*
clientVersionTelemetryTag is a tag identifying this specific client version.
*/
var clientVersionTelemetryTag = "client_version:3.7.2"

/*
UnixAddressPrefix holds the prefix to use to enable Unix Domain Socket
traffic instead of UDP.
Expand Down Expand Up @@ -205,20 +190,18 @@ type Client struct {
// Tags are global tags to be added to every statsd call
Tags []string
// skipErrors turns off error passing and allows UDS to emulate UDP behaviour
SkipErrors bool
flushTime time.Duration
bufferPool *bufferPool
buffer *statsdBuffer
metrics *ClientMetrics
telemetryTags []string
stop chan struct{}
wg sync.WaitGroup
bufferShards []*worker
closerLock sync.Mutex
receiveMode ReceivingMode
agg *aggregator
options []Option
addrOption string
SkipErrors bool
flushTime time.Duration
metrics *ClientMetrics
telemetry *telemetryClient
stop chan struct{}
wg sync.WaitGroup
workers []*worker
closerLock sync.Mutex
receiveMode ReceivingMode
agg *aggregator
options []Option
addrOption string
}

// ClientMetrics contains metrics about the client
Expand All @@ -233,42 +216,29 @@ type ClientMetrics struct {
// https://golang.org/doc/faq#guarantee_satisfies_interface
var _ ClientInterface = &Client{}

func resolveAddr(addr string) (statsdWriter, string, error) {
if !strings.HasPrefix(addr, UnixAddressPrefix) {
w, err := newUDPWriter(addr)
return w, "udp", err
}

w, err := newUDSWriter(addr[len(UnixAddressPrefix):])
return w, "uds", err
}

// New returns a pointer to a new Client given an addr in the format "hostname:port" or
// "unix:///path/to/socket".
func New(addr string, options ...Option) (*Client, error) {
var w statsdWriter
o, err := resolveOptions(options)
if err != nil {
return nil, err
}

var writerType string
optimalPayloadSize := OptimalUDPPayloadSize
defaultBufferPoolSize := DefaultUDPBufferPoolSize
if !strings.HasPrefix(addr, UnixAddressPrefix) {
w, err = newUDPWriter(addr)
writerType = "udp"
} else {
// FIXME: The agent has a performance pitfall preventing us from using better defaults here.
// Once it's fixed, use `DefaultMaxAgentPayloadSize` and `DefaultUDSBufferPoolSize` instead.
optimalPayloadSize = OptimalUDPPayloadSize
defaultBufferPoolSize = DefaultUDPBufferPoolSize
w, err = newUDSWriter(addr[len(UnixAddressPrefix):])
writerType = "uds"
}
w, writerType, err := resolveAddr(addr)
if err != nil {
return nil, err
}

if o.MaxBytesPerPayload == 0 {
o.MaxBytesPerPayload = optimalPayloadSize
}
if o.BufferPoolSize == 0 {
o.BufferPoolSize = defaultBufferPoolSize
}
if o.SenderQueueSize == 0 {
o.SenderQueueSize = defaultBufferPoolSize
}
client, err := newWithWriter(w, o, writerType)
if err == nil {
client.options = append(client.options, options...)
Expand Down Expand Up @@ -321,8 +291,8 @@ func newWithWriter(w statsdWriter, o *Options, writerName string) (*Client, erro
}
}

c.telemetryTags = append(c.Tags, clientTelemetryTag, clientVersionTelemetryTag, "client_transport:"+writerName)

// FIXME: The agent has a performance pitfall preventing us from using better defaults here.
// Once it's fixed, use `DefaultMaxAgentPayloadSize` and `DefaultUDSBufferPoolSize` instead.
if o.MaxBytesPerPayload == 0 {
o.MaxBytesPerPayload = OptimalUDPPayloadSize
}
Expand All @@ -333,31 +303,39 @@ func newWithWriter(w statsdWriter, o *Options, writerName string) (*Client, erro
o.SenderQueueSize = DefaultUDPBufferPoolSize
}

c.receiveMode = o.ReceiveMode
c.bufferPool = newBufferPool(o.BufferPoolSize, o.MaxBytesPerPayload, o.MaxMessagesPerPayload)
c.buffer = c.bufferPool.borrowBuffer()
c.sender = newSender(w, o.SenderQueueSize, c.bufferPool)
bufferPool := newBufferPool(o.BufferPoolSize, o.MaxBytesPerPayload, o.MaxMessagesPerPayload)
c.sender = newSender(w, o.SenderQueueSize, bufferPool)
for i := 0; i < o.BufferShardCount; i++ {
w := newWorker(c.bufferPool, c.sender)
c.bufferShards = append(c.bufferShards, w)
w := newWorker(bufferPool, c.sender)
c.workers = append(c.workers, w)
if c.receiveMode == ChannelMode {
w.startReceivingMetric(o.ChannelModeBufferSize) // TODO make it configurable
w.startReceivingMetric(o.ChannelModeBufferSize)
}
}

c.receiveMode = o.ReceiveMode
c.flushTime = o.BufferFlushInterval
c.stop = make(chan struct{}, 1)

c.wg.Add(1)
go func() {
defer c.wg.Done()
c.watch()
}()
c.wg.Add(1)
go func() {
defer c.wg.Done()
if o.Telemetry {
c.telemetry()

if o.TelemetryAddr == "" {
c.telemetry = NewTelemetryClient(&c, writerName)
} else {
var err error
c.telemetry, err = NewTelemetryClientWithCustomAddr(&c, writerName, o.TelemetryAddr, bufferPool)
if err != nil {
return nil, err
}
}()
}

if o.Telemetry {
c.telemetry.run(&c.wg, c.stop)
}
return &c, nil
}

Expand All @@ -384,7 +362,7 @@ func (c *Client) watch() {
for {
select {
case <-ticker.C:
for _, w := range c.bufferShards {
for _, w := range c.workers {
w.flush()
}
case <-c.stop:
Expand All @@ -394,56 +372,14 @@ func (c *Client) watch() {
}
}

func (c *Client) telemetry() {
ticker := time.NewTicker(TelemetryInterval)
for {
select {
case <-ticker.C:
for _, m := range c.flushTelemetry() {
c.send(m)
}
case <-c.stop:
ticker.Stop()
return
}
}
}

// flushTelemetry returns Telemetry metrics to be flushed. It's its own function to ease testing.
func (c *Client) flushTelemetry() []metric {
m := []metric{}

// same as Count but without global namespace
telemetryCount := func(name string, value int64) {
m = append(m, metric{metricType: count, name: name, ivalue: value, tags: c.telemetryTags, rate: 1})
}

clientMetrics := c.FlushTelemetryMetrics()
telemetryCount("datadog.dogstatsd.client.metrics", int64(clientMetrics.TotalMetrics))
telemetryCount("datadog.dogstatsd.client.events", int64(clientMetrics.TotalEvents))
telemetryCount("datadog.dogstatsd.client.service_checks", int64(clientMetrics.TotalServiceChecks))
telemetryCount("datadog.dogstatsd.client.metric_dropped_on_receive", int64(clientMetrics.TotalDroppedOnReceive))

senderMetrics := c.sender.flushTelemetryMetrics()
telemetryCount("datadog.dogstatsd.client.packets_sent", int64(senderMetrics.TotalSentPayloads))
telemetryCount("datadog.dogstatsd.client.bytes_sent", int64(senderMetrics.TotalSentBytes))
telemetryCount("datadog.dogstatsd.client.packets_dropped", int64(senderMetrics.TotalDroppedPayloads))
telemetryCount("datadog.dogstatsd.client.bytes_dropped", int64(senderMetrics.TotalDroppedBytes))
telemetryCount("datadog.dogstatsd.client.packets_dropped_queue", int64(senderMetrics.TotalDroppedPayloadsQueueFull))
telemetryCount("datadog.dogstatsd.client.bytes_dropped_queue", int64(senderMetrics.TotalDroppedBytesQueueFull))
telemetryCount("datadog.dogstatsd.client.packets_dropped_writer", int64(senderMetrics.TotalDroppedPayloadsWriter))
telemetryCount("datadog.dogstatsd.client.bytes_dropped_writer", int64(senderMetrics.TotalDroppedBytesWriter))
return m
}

// Flush forces a flush of all the queued dogstatsd payloads
// This method is blocking and will not return until everything is sent
// through the network
func (c *Client) Flush() error {
if c == nil {
return ErrNoClient
}
for _, w := range c.bufferShards {
for _, w := range c.workers {
w.flush()
}
c.sender.flush()
Expand All @@ -468,7 +404,7 @@ func (c *Client) send(m metric) error {
m.namespace = c.Namespace

h := hashString32(m.name)
worker := c.bufferShards[h%uint32(len(c.bufferShards))]
worker := c.workers[h%uint32(len(c.workers))]

if c.receiveMode == ChannelMode {
select {
Expand Down Expand Up @@ -609,7 +545,7 @@ func (c *Client) Close() error {
close(c.stop)

if c.receiveMode == ChannelMode {
for _, w := range c.bufferShards {
for _, w := range c.workers {
w.stopReceivingMetric()
}
}
Expand Down
Loading

0 comments on commit f740328

Please sign in to comment.