Skip to content

Commit

Permalink
Fix DD_AGENT_HOST variable support
Browse files Browse the repository at this point in the history
DD_AGENT_HOST did not support UDS or NamedPipe
  • Loading branch information
hush-hush committed Mar 31, 2021
1 parent 7e21371 commit 0ead22c
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 165 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ Find a list of all the available options for your DogStatsD Client in the [Datad

### Supported environment variables

* If the `addr` parameter is empty, the client uses the `DD_AGENT_HOST` and (optionally) the `DD_DOGSTATSD_PORT` environment variables to build a target address.
* If the `addr` parameter is empty, the client uses the `DD_AGENT_HOST` environment variables to build a target address.
Example: `DD_AGENT_HOST=127.0.0.1:8125` for UDP, `DD_AGENT_HOST=unix:///path/to/socket` for UDS and `DD_AGENT_HOST=\\.\pipe\my_windows_pipe` for Windows
* If the `DD_ENTITY_ID` environment variable is found, its value is injected as a global `dd.internal.entity_id` tag. The Datadog Agent uses this tag to insert container tags into the metrics. To avoid overwriting this global tag, only `append` to the `c.Tags` slice.

To enable origin detection and set the `DD_ENTITY_ID` environment variable, add the following lines to your application manifest:
Expand Down
123 changes: 65 additions & 58 deletions statsd/pipe_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,94 +10,103 @@ import (
"time"

"github.com/Microsoft/go-winio"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func createNamedPipe(t *testing.T) (string, *os.File, net.Listener) {
f, err := ioutil.TempFile("", "test-pipe-")
require.Nil(t, err)

pipepath := WindowsPipeAddressPrefix + f.Name()
ln, err := winio.ListenPipe(pipepath, &winio.PipeConfig{
SecurityDescriptor: "D:AI(A;;GA;;;WD)",
InputBufferSize: 1_000_000,
})
if err != nil {
os.Remove(f.Name())
t.Fatal(err)
}
return pipepath, f, ln
}

// acceptOne accepts one single connection from ln, reads 512 bytes from it
// and sends it to the out channel, afterwards closing the connection.
func acceptOne(t *testing.T, ln net.Listener, out chan string) {
conn, err := ln.Accept()
if err != nil {
t.Fatal(err)
}
require.Nil(t, err)

buf := make([]byte, 512)
n, err := conn.Read(buf)
if err != nil {
t.Fatal(err)
}
require.Nil(t, err)

conn.Close()
out <- string(buf[:n])
}

func TestPipeWriter(t *testing.T) {
f, err := ioutil.TempFile("", "test-pipe-")
if err != nil {
t.Fatal(err)
}
pipepath, f, ln := createNamedPipe(t)
defer os.Remove(f.Name())
pipepath := WindowsPipeAddressPrefix + f.Name()
ln, err := winio.ListenPipe(pipepath, &winio.PipeConfig{
SecurityDescriptor: "D:AI(A;;GA;;;WD)",
InputBufferSize: 1_000_000,
})
if err != nil {
t.Fatal(err)
}

out := make(chan string)
go acceptOne(t, ln, out)

client, err := New(pipepath)
if err != nil {
t.Fatal(err)
}
if err := client.Gauge("metric", 1, []string{"key:val"}, 1); err != nil {
t.Fatal(err)
}
require.Nil(t, err)

err = client.Gauge("metric", 1, []string{"key:val"}, 1)
require.Nil(t, err)

got := <-out
if exp := "metric:1|g|#key:val"; got != exp {
t.Fatalf("Expected %q, got %q", exp, got)
}
assert.Equal(t, got, "metric:1|g|#key:val")
}

func TestPipeWriterEnv(t *testing.T) {
pipepath, f, ln := createNamedPipe(t)
defer os.Remove(f.Name())

out := make(chan string)
go acceptOne(t, ln, out)

os.Setenv(agentHostEnvVarName, pipepath)
defer os.Unsetenv(agentHostEnvVarName)

client, err := New("")
require.Nil(t, err)

err = client.Gauge("metric", 1, []string{"key:val"}, 1)
require.Nil(t, err)

got := <-out
assert.Equal(t, got, "metric:1|g|#key:val")
}

func TestPipeWriterReconnect(t *testing.T) {
f, err := ioutil.TempFile("", "test-pipe-")
if err != nil {
t.Fatal(err)
}
pipepath, f, ln := createNamedPipe(t)
defer os.Remove(f.Name())
pipepath := WindowsPipeAddressPrefix + f.Name()
ln, err := winio.ListenPipe(pipepath, &winio.PipeConfig{
SecurityDescriptor: "D:AI(A;;GA;;;WD)",
InputBufferSize: 1_000_000,
})
if err != nil {
t.Fatalf("Listen: %s", err)
}

out := make(chan string)
go acceptOne(t, ln, out)
client, err := New(pipepath)
if err != nil {
t.Fatalf("New: %s", err)
}
require.Nil(t, err)

// first attempt works, then connection closes
if err := client.Gauge("metric", 1, []string{"key:val"}, 1); err != nil {
t.Fatalf("Failed to send gauge: %s", err)
}
err = client.Gauge("metric", 1, []string{"key:val"}, 1)
require.Nil(t, err, "Failed to send gauge: %s", err)

timeout := time.After(1 * time.Second)
select {
case got := <-out:
if exp := "metric:1|g|#key:val"; got != exp {
t.Fatalf("Expected %q, got %q", exp, got)
}
assert.Equal(t, got, "metric:1|g|#key:val")
case <-timeout:
t.Fatal("timeout1")
t.Fatal("timeout receiving the first metric")
}

// second attempt fails by attempting the same connection
go acceptOne(t, ln, out)
if err := client.Gauge("metric", 2, []string{"key:val"}, 1); err != nil {
t.Fatalf("Failed to send second gauge: %s", err)
}
err = client.Gauge("metric", 2, []string{"key:val"}, 1)
require.Nil(t, err, "Failed to send second gauge: %s", err)

timeout = time.After(100 * time.Millisecond)
select {
case <-out:
Expand All @@ -108,15 +117,13 @@ func TestPipeWriterReconnect(t *testing.T) {

// subsequent attempts succeed with new connection
for n := 0; n < 3; n++ {
if err := client.Gauge("metric", 3, []string{"key:val"}, 1); err != nil {
t.Fatalf("Failed to send second gauge: %s", err)
}
err = client.Gauge("metric", 3, []string{"key:val"}, 1)
require.Nil(t, err, "Failed to send second gauge: %s", err)

timeout = time.After(500 * time.Millisecond)
select {
case got := <-out:
if exp := "metric:3|g|#key:val"; got != exp {
t.Fatalf("Expected %q, got %q", exp, got)
}
assert.Equal(t, got, "metric:3|g|#key:val")
return
case <-timeout:
continue
Expand Down
39 changes: 37 additions & 2 deletions statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ statsd is based on go-statsd-client.
package statsd

import (
"errors"
"fmt"
"os"
"strings"
Expand Down Expand Up @@ -61,6 +62,12 @@ traffic instead of UDP.
*/
const WindowsPipeAddressPrefix = `\\.\pipe\`

const (
agentHostEnvVarName = "DD_AGENT_HOST"
agentPortEnvVarName = "DD_DOGSTATSD_PORT"
defaultUDPPort = "8125"
)

/*
ddEnvTagsMapping is a mapping of each "DD_" prefixed environment variable
to a specific tag name.
Expand Down Expand Up @@ -228,7 +235,35 @@ type ClientMetrics struct {
// https://golang.org/doc/faq#guarantee_satisfies_interface
var _ ClientInterface = &Client{}

func resolveAddr(addr string) (statsdWriter, string, error) {
func resolveAddr(addr string) string {
envPort := ""
if addr == "" {
addr = os.Getenv(agentHostEnvVarName)
envPort = os.Getenv(agentPortEnvVarName)
}

if addr == "" {
return ""
}

if !strings.HasPrefix(addr, WindowsPipeAddressPrefix) && !strings.HasPrefix(addr, UnixAddressPrefix) {
if !strings.Contains(addr, ":") {
if envPort != "" {
addr = fmt.Sprintf("%s:%s", addr, envPort)
} else {
addr = fmt.Sprintf("%s:%s", addr, defaultUDPPort)
}
}
}
return addr
}

func createWriter(addr string) (statsdWriter, string, error) {
addr = resolveAddr(addr)
if addr == "" {
return nil, "", errors.New("No address passed and autodetection from environment failed")
}

switch {
case strings.HasPrefix(addr, WindowsPipeAddressPrefix):
w, err := newWindowsPipeWriter(addr)
Expand All @@ -250,7 +285,7 @@ func New(addr string, options ...Option) (*Client, error) {
return nil, err
}

w, writerType, err := resolveAddr(addr)
w, writerType, err := createWriter(addr)
if err != nil {
return nil, err
}
Expand Down
48 changes: 48 additions & 0 deletions statsd/statsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"io"
"net"
"os"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -179,3 +180,50 @@ func TestCloneWithExtraOptions(t *testing.T) {
assert.Equal(t, cloneClient.addrOption, addr)
assert.Len(t, cloneClient.options, 3)
}

func TestResolveAddressFromEnvironment(t *testing.T) {
hostInitialValue, hostInitiallySet := os.LookupEnv(agentHostEnvVarName)
if hostInitiallySet {
defer os.Setenv(agentHostEnvVarName, hostInitialValue)
} else {
defer os.Unsetenv(agentHostEnvVarName)
}
portInitialValue, portInitiallySet := os.LookupEnv(agentPortEnvVarName)
if portInitiallySet {
defer os.Setenv(agentPortEnvVarName, portInitialValue)
} else {
defer os.Unsetenv(agentPortEnvVarName)
}

for _, tc := range []struct {
name string
addrParam string
hostEnv string
portEnv string
expectedAddr string
}{
{"UPD Nominal case", "127.0.0.1:1234", "", "", "127.0.0.1:1234"},
{"UPD Parameter overrides environment", "127.0.0.1:8125", "10.12.16.9", "1234", "127.0.0.1:8125"},
{"UPD Host and port passed as env", "", "10.12.16.9", "1234", "10.12.16.9:1234"},
{"UPD Host env, default port", "", "10.12.16.9", "", "10.12.16.9:8125"},
{"UPD Host passed, ignore env port", "10.12.16.9", "", "1234", "10.12.16.9:8125"},

{"UDS socket passed", "unix://test/path.socket", "", "", "unix://test/path.socket"},
{"UDS socket env", "", "unix://test/path.socket", "", "unix://test/path.socket"},
{"UDS socket env with port", "", "unix://test/path.socket", "8125", "unix://test/path.socket"},

{"Pipe passed", "\\\\.\\pipe\\my_pipe", "", "", "\\\\.\\pipe\\my_pipe"},
{"Pipe env", "", "\\\\.\\pipe\\my_pipe", "", "\\\\.\\pipe\\my_pipe"},
{"Pipe env with port", "", "\\\\.\\pipe\\my_pipe", "8125", "\\\\.\\pipe\\my_pipe"},

{"No autodetection failed", "", "", "", ""},
} {
t.Run(tc.name, func(t *testing.T) {
os.Setenv(agentHostEnvVarName, tc.hostEnv)
os.Setenv(agentPortEnvVarName, tc.portEnv)

addr := resolveAddr(tc.addrParam)
assert.Equal(t, tc.expectedAddr, addr)
})
}
}
2 changes: 1 addition & 1 deletion statsd/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func newTelemetryClient(c *Client, transport string, devMode bool) *telemetryCli
}

func newTelemetryClientWithCustomAddr(c *Client, transport string, devMode bool, telemetryAddr string, pool *bufferPool) (*telemetryClient, error) {
telemetryWriter, _, err := resolveAddr(telemetryAddr)
telemetryWriter, _, err := createWriter(telemetryAddr)
if err != nil {
return nil, fmt.Errorf("Could not resolve telemetry address: %v", err)
}
Expand Down
33 changes: 0 additions & 33 deletions statsd/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,17 @@ package statsd

import (
"errors"
"fmt"
"net"
"os"
"time"
)

const (
autoHostEnvName = "DD_AGENT_HOST"
autoPortEnvName = "DD_DOGSTATSD_PORT"
defaultUDPPort = "8125"
)

// udpWriter is an internal class wrapping around management of UDP connection
type udpWriter struct {
conn net.Conn
}

// New returns a pointer to a new udpWriter given an addr in the format "hostname:port".
func newUDPWriter(addr string) (*udpWriter, error) {
if addr == "" {
addr = addressFromEnvironment()
}
if addr == "" {
return nil, errors.New("No address passed and autodetection from environment failed")
}

udpAddr, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
return nil, err
Expand All @@ -53,21 +38,3 @@ func (w *udpWriter) Write(data []byte) (int, error) {
func (w *udpWriter) Close() error {
return w.conn.Close()
}

func (w *udpWriter) remoteAddr() net.Addr {
return w.conn.RemoteAddr()
}

func addressFromEnvironment() string {
autoHost := os.Getenv(autoHostEnvName)
if autoHost == "" {
return ""
}

autoPort := os.Getenv(autoPortEnvName)
if autoPort == "" {
autoPort = defaultUDPPort
}

return fmt.Sprintf("%s:%s", autoHost, autoPort)
}
Loading

0 comments on commit 0ead22c

Please sign in to comment.