Skip to content

Commit

Permalink
fix(ddtrace/tracer): refactor DogStatsD URL resolution using agent-re…
Browse files Browse the repository at this point in the history
…ported StatsD port (#2995)
  • Loading branch information
darccio committed Nov 27, 2024
1 parent ec6fbb1 commit 9917768
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 61 deletions.
79 changes: 43 additions & 36 deletions ddtrace/tracer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ var (
// Replaced in tests
defaultSocketDSD = "/var/run/datadog/dsd.socket"

// defaultStatsdPort specifies the default port to use for connecting to the statsd server.
defaultStatsdPort = "8125"

// defaultMaxTagsHeaderLen specifies the default maximum length of the X-Datadog-Tags header value.
defaultMaxTagsHeaderLen = 128
)
Expand Down Expand Up @@ -528,9 +531,7 @@ func newConfig(opts ...StartOption) *config {
}
// if using stdout or traces are disabled, agent is disabled
agentDisabled := c.logToStdout || !c.enabled.current
ignoreStatsdPort := c.agent.ignore // preserve the value of c.agent.ignore when testing
c.agent = loadAgentFeatures(agentDisabled, c.agentURL, c.httpClient)
c.agent.ignore = ignoreStatsdPort
info, ok := debug.ReadBuildInfo()
if !ok {
c.loadContribIntegrations([]*debug.Module{})
Expand All @@ -539,24 +540,7 @@ func newConfig(opts ...StartOption) *config {
}
if c.statsdClient == nil {
// configure statsd client
addr := c.dogstatsdAddr
if addr == "" {
// no config defined address; use defaults
addr = defaultDogstatsdAddr()
}
if agentport := c.agent.StatsdPort; agentport > 0 && !c.agent.ignore {
// the agent reported a non-standard port
host, _, err := net.SplitHostPort(addr)
if err == nil {
// we have a valid host:port address; replace the port because
// the agent knows better
if host == "" {
host = defaultHostname
}
addr = net.JoinHostPort(host, strconv.Itoa(agentport))
}
// not a valid TCP address, leave it as it is (could be a socket connection)
}
addr := resolveDogstatsdAddr(c)
globalconfig.SetDogstatsdAddr(addr)
c.dogstatsdAddr = addr
}
Expand All @@ -578,6 +562,44 @@ func newConfig(opts ...StartOption) *config {
return c
}

// resolveDogstatsdAddr returns the DogStatsD address the tracer should use.
//
// It starts from the user-defined address (c.dogstatsdAddr) and falls back to
// defaultDogstatsdAddr when none is configured. If the agent reported a StatsD
// port and the address is a host:port pair, the port is replaced by the
// agent-reported one. Addresses that cannot be parsed as host:port, and
// unix-socket URIs (unix://, unixgram://, unixstream://), are returned
// unchanged because they carry no port to override.
func resolveDogstatsdAddr(c *config) string {
	addr := c.dogstatsdAddr
	if addr == "" {
		// no user-defined address; let defaultDogstatsdAddr pick one
		// (default socket, env vars or the built-in host and port)
		addr = defaultDogstatsdAddr()
	}
	agentport := c.agent.StatsdPort
	if agentport <= 0 {
		// the agent didn't report a usable port; keep the already resolved
		// address, as features are loaded from the trace-agent, which might
		// not be running
		return addr
	}
	// the agent reported a port
	host, _, err := net.SplitHostPort(addr)
	if err != nil {
		// not a host:port pair; use the already resolved address as is
		return addr
	}
	switch host {
	case "unix", "unixgram", "unixstream":
		// net.SplitHostPort splits "unix:///path" at the scheme's colon, so a
		// socket URI shows up here with the scheme as host. Socket
		// connections have no port, so the address must not be rewritten.
		return addr
	case "":
		// no host was provided (e.g. ":8125"); use the default hostname
		host = defaultHostname
	}
	// valid host:port address; replace the port because the agent knows better
	return net.JoinHostPort(host, strconv.Itoa(agentport))
}

func newStatsdClient(c *config) (internal.StatsdClient, error) {
if c.statsdClient != nil {
return c.statsdClient, nil
Expand Down Expand Up @@ -615,7 +637,7 @@ func defaultDogstatsdAddr() string {
// socket exists and user didn't specify otherwise via env vars
return "unix://" + defaultSocketDSD
}
host, port := defaultHostname, "8125"
host, port := defaultHostname, defaultStatsdPort
if envHost != "" {
host = envHost
}
Expand Down Expand Up @@ -650,9 +672,6 @@ type agentFeatures struct {
// featureFlags specifies all the feature flags reported by the trace-agent.
featureFlags map[string]struct{}

// ignore indicates that we should ignore the agent in favor of user set values.
// It should only be used during testing.
ignore bool
// peerTags specifies precursor tags to aggregate stats on when client stats is enabled
peerTags []string

Expand Down Expand Up @@ -683,9 +702,6 @@ func loadAgentFeatures(agentDisabled bool, agentURL *url.URL, httpClient *http.C
return
}
defer resp.Body.Close()
type agentConfig struct {
DefaultEnv string `json:"default_env"`
}
type infoResponse struct {
Endpoints []string `json:"endpoints"`
ClientDropP0s bool `json:"client_drop_p0s"`
Expand Down Expand Up @@ -821,15 +837,6 @@ func WithFeatureFlags(feats ...string) StartOption {
}
}

// withIgnoreAgent allows tests to ignore the agent running in CI so that we can
// properly test user set StatsdPort.
// This should only be used during testing.
func withIgnoreAgent(ignore bool) StartOption {
return func(c *config) {
c.agent.ignore = ignore
}
}

// WithLogger sets logger as the tracer's error printer.
// Diagnostic and startup tracer logs are prefixed to simplify the search within logs.
// If JSON logging format is required, it's possible to wrap tracer logs using an existing JSON logger with this
Expand Down
118 changes: 93 additions & 25 deletions ddtrace/tracer/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,14 @@ func testStatsd(t *testing.T, cfg *config, addr string) {

func TestStatsdUDPConnect(t *testing.T) {
t.Setenv("DD_DOGSTATSD_PORT", "8111")
testStatsd(t, newConfig(withIgnoreAgent(true)), net.JoinHostPort(defaultHostname, "8111"))
cfg := newConfig(withIgnoreAgent(true))
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
// We simulate the agent not being able to provide the statsd port
w.WriteHeader(http.StatusInternalServerError)
}))
defer srv.Close()

cfg := newConfig(WithAgentAddr(strings.TrimPrefix(srv.URL, "http://")))
testStatsd(t, cfg, net.JoinHostPort(defaultHostname, "8111"))
addr := net.JoinHostPort(defaultHostname, "8111")

client, err := newStatsdClient(cfg)
Expand Down Expand Up @@ -132,7 +138,7 @@ func TestAutoDetectStatsd(t *testing.T) {
defer conn.Close()
conn.SetDeadline(time.Now().Add(5 * time.Second))

cfg := newConfig(WithAgentTimeout(2), withIgnoreAgent(true))
cfg := newConfig(WithAgentTimeout(2))
statsd, err := newStatsdClient(cfg)
require.NoError(t, err)
defer statsd.Close()
Expand All @@ -151,7 +157,14 @@ func TestAutoDetectStatsd(t *testing.T) {

t.Run("env", func(t *testing.T) {
t.Setenv("DD_DOGSTATSD_PORT", "8111")
testStatsd(t, newConfig(withIgnoreAgent(true)), net.JoinHostPort(defaultHostname, "8111"))
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
// We simulate the agent not being able to provide the statsd port
w.WriteHeader(http.StatusInternalServerError)
}))
defer srv.Close()

cfg := newConfig(WithAgentAddr(strings.TrimPrefix(srv.URL, "http://")))
testStatsd(t, cfg, net.JoinHostPort(defaultHostname, "8111"))
})

t.Run("agent", func(t *testing.T) {
Expand All @@ -160,7 +173,7 @@ func TestAutoDetectStatsd(t *testing.T) {
w.Write([]byte(`{"endpoints": [], "config": {"statsd_port":0}}`))
}))
defer srv.Close()
cfg := newConfig(WithAgentAddr(strings.TrimPrefix(srv.URL, "http://")), WithAgentTimeout(2), withIgnoreAgent(true))
cfg := newConfig(WithAgentAddr(strings.TrimPrefix(srv.URL, "http://")), WithAgentTimeout(2))
testStatsd(t, cfg, net.JoinHostPort(defaultHostname, "8125"))
})

Expand Down Expand Up @@ -214,7 +227,7 @@ func TestLoadAgentFeatures(t *testing.T) {
w.Write([]byte(`{"endpoints":["/v0.6/stats"],"feature_flags":["a","b"],"client_drop_p0s":true,"config": {"statsd_port":8999}}`))
}))
defer srv.Close()
cfg := newConfig(WithAgentAddr(strings.TrimPrefix(srv.URL, "http://")), WithAgentTimeout(2), withIgnoreAgent(true))
cfg := newConfig(WithAgentAddr(strings.TrimPrefix(srv.URL, "http://")), WithAgentTimeout(2))
assert.True(t, cfg.agent.DropP0s)
assert.Equal(t, cfg.agent.StatsdPort, 8999)
assert.EqualValues(t, cfg.agent.featureFlags, map[string]struct{}{
Expand All @@ -232,7 +245,7 @@ func TestLoadAgentFeatures(t *testing.T) {
w.Write([]byte(`{"endpoints":["/v0.6/stats"],"client_drop_p0s":true,"config":{"statsd_port":8999}}`))
}))
defer srv.Close()
cfg := newConfig(WithAgentAddr(strings.TrimPrefix(srv.URL, "http://")), WithAgentTimeout(2), withIgnoreAgent(true))
cfg := newConfig(WithAgentAddr(strings.TrimPrefix(srv.URL, "http://")), WithAgentTimeout(2))
assert.True(t, cfg.agent.DropP0s)
assert.True(t, cfg.agent.Stats)
assert.Equal(t, 8999, cfg.agent.StatsdPort)
Expand Down Expand Up @@ -450,57 +463,104 @@ func TestTracerOptionsDefaults(t *testing.T) {
})

t.Run("dogstatsd", func(t *testing.T) {
// Simulate the agent (assuming no concurrency at all)
var fail bool
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
if fail {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Write([]byte(`{"endpoints":["/v0.6/stats"],"feature_flags":["a","b"],"client_drop_p0s":true,"config": {"statsd_port":8125}}`))
}))
defer srv.Close()

opts := []StartOption{
WithAgentAddr(strings.TrimPrefix(srv.URL, "http://")),
}

t.Run("default", func(t *testing.T) {
tracer := newTracer(WithAgentTimeout(2), withIgnoreAgent(true))
tracer := newTracer(opts...)
defer tracer.Stop()
c := tracer.config
assert.Equal(t, c.dogstatsdAddr, "localhost:8125")
assert.Equal(t, globalconfig.DogstatsdAddr(), "localhost:8125")
assert.Equal(t, "localhost:8125", c.dogstatsdAddr)
assert.Equal(t, "localhost:8125", globalconfig.DogstatsdAddr())
})

t.Run("env-host", func(t *testing.T) {
t.Setenv("DD_AGENT_HOST", "my-host")
tracer := newTracer(WithAgentTimeout(2), withIgnoreAgent(true))
tracer := newTracer(opts...)
defer tracer.Stop()
c := tracer.config
assert.Equal(t, c.dogstatsdAddr, "my-host:8125")
assert.Equal(t, globalconfig.DogstatsdAddr(), "my-host:8125")
assert.Equal(t, "my-host:8125", c.dogstatsdAddr)
assert.Equal(t, "my-host:8125", globalconfig.DogstatsdAddr())
})

t.Run("env-port", func(t *testing.T) {
t.Setenv("DD_DOGSTATSD_PORT", "123")
tracer := newTracer(WithAgentTimeout(2), withIgnoreAgent(true))
tracer := newTracer(opts...)
defer tracer.Stop()
c := tracer.config
assert.Equal(t, "localhost:8125", c.dogstatsdAddr)
assert.Equal(t, "localhost:8125", globalconfig.DogstatsdAddr())
})

t.Run("env-port: agent not available", func(t *testing.T) {
t.Setenv("DD_DOGSTATSD_PORT", "123")
fail = true
tracer := newTracer(opts...)
defer tracer.Stop()
c := tracer.config
assert.Equal(t, "localhost:123", c.dogstatsdAddr)
assert.Equal(t, "localhost:123", globalconfig.DogstatsdAddr())
fail = false
})

t.Run("env-both", func(t *testing.T) {
t.Setenv("DD_AGENT_HOST", "my-host")
t.Setenv("DD_DOGSTATSD_PORT", "123")
tracer := newTracer(WithAgentTimeout(2), withIgnoreAgent(true))
tracer := newTracer(opts...)
defer tracer.Stop()
c := tracer.config
assert.Equal(t, c.dogstatsdAddr, "my-host:123")
assert.Equal(t, globalconfig.DogstatsdAddr(), "my-host:123")
assert.Equal(t, "my-host:8125", c.dogstatsdAddr)
assert.Equal(t, "my-host:8125", globalconfig.DogstatsdAddr())
})

t.Run("env-env", func(t *testing.T) {
t.Setenv("DD_ENV", "testEnv")
tracer := newTracer(WithAgentTimeout(2), withIgnoreAgent(true))
t.Run("env-both: agent not available", func(t *testing.T) {
t.Setenv("DD_AGENT_HOST", "my-host")
t.Setenv("DD_DOGSTATSD_PORT", "123")
fail = true
tracer := newTracer(opts...)
defer tracer.Stop()
c := tracer.config
assert.Equal(t, "testEnv", c.env)
assert.Equal(t, "my-host:123", c.dogstatsdAddr)
assert.Equal(t, "my-host:123", globalconfig.DogstatsdAddr())
fail = false
})

t.Run("option", func(t *testing.T) {
tracer := newTracer(WithDogstatsdAddress("10.1.0.12:4002"), withIgnoreAgent(true))
o := make([]StartOption, len(opts))
copy(o, opts)
o = append(o, WithDogstatsdAddress("10.1.0.12:4002"))
tracer := newTracer(o...)
defer tracer.Stop()
c := tracer.config
assert.Equal(t, c.dogstatsdAddr, "10.1.0.12:4002")
assert.Equal(t, globalconfig.DogstatsdAddr(), "10.1.0.12:4002")
assert.Equal(t, "10.1.0.12:8125", c.dogstatsdAddr)
assert.Equal(t, "10.1.0.12:8125", globalconfig.DogstatsdAddr())
})

t.Run("option: agent not available", func(t *testing.T) {
o := make([]StartOption, len(opts))
copy(o, opts)
fail = true
o = append(o, WithDogstatsdAddress("10.1.0.12:4002"))
tracer := newTracer(o...)
defer tracer.Stop()
c := tracer.config
assert.Equal(t, "10.1.0.12:4002", c.dogstatsdAddr)
assert.Equal(t, "10.1.0.12:4002", globalconfig.DogstatsdAddr())
fail = false
})

t.Run("uds", func(t *testing.T) {
assert := assert.New(t)
dir, err := os.MkdirTemp("", "socket")
Expand All @@ -509,14 +569,22 @@ func TestTracerOptionsDefaults(t *testing.T) {
}
addr := filepath.Join(dir, "dsd.socket")
defer os.RemoveAll(addr)
tracer := newTracer(WithDogstatsdAddress("unix://"+addr), withIgnoreAgent(true))
tracer := newTracer(WithDogstatsdAddress("unix://" + addr))
defer tracer.Stop()
c := tracer.config
assert.Equal("unix://"+addr, c.dogstatsdAddr)
assert.Equal("unix://"+addr, globalconfig.DogstatsdAddr())
})
})

t.Run("env-env", func(t *testing.T) {
t.Setenv("DD_ENV", "testEnv")
tracer := newTracer(WithAgentTimeout(2))
defer tracer.Stop()
c := tracer.config
assert.Equal(t, "testEnv", c.env)
})

t.Run("env-agentAddr", func(t *testing.T) {
t.Setenv("DD_AGENT_HOST", "trace-agent")
tracer := newTracer(WithAgentTimeout(2))
Expand Down

0 comments on commit 9917768

Please sign in to comment.