From 63e7470e12744dc734511f945615e1f45ce9e00f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dario=20Casta=C3=B1=C3=A9?= Date: Wed, 27 Nov 2024 18:17:45 +0100 Subject: [PATCH] fix(ddtrace/tracer): refactor DogStatsD URL resolution using agent-reported StatsD port (#2995) --- ddtrace/tracer/option.go | 80 +++++++++--------- ddtrace/tracer/option_test.go | 149 ++++++++++++++++++---------------- 2 files changed, 120 insertions(+), 109 deletions(-) diff --git a/ddtrace/tracer/option.go b/ddtrace/tracer/option.go index f4075a3c2d..dbffa546df 100644 --- a/ddtrace/tracer/option.go +++ b/ddtrace/tracer/option.go @@ -106,6 +106,9 @@ var ( // Replaced in tests defaultSocketDSD = "/var/run/datadog/dsd.socket" + // defaultStatsdPort specifies the default port to use for connecting to the statsd server. + defaultStatsdPort = "8125" + // defaultMaxTagsHeaderLen specifies the default maximum length of the X-Datadog-Tags header value. defaultMaxTagsHeaderLen = 128 ) @@ -528,9 +531,7 @@ func newConfig(opts ...StartOption) *config { } // if using stdout or traces are disabled, agent is disabled agentDisabled := c.logToStdout || !c.enabled.current - ignoreStatsdPort := c.agent.ignore // preserve the value of c.agent.ignore when testing c.agent = loadAgentFeatures(agentDisabled, c.agentURL, c.httpClient) - c.agent.ignore = ignoreStatsdPort info, ok := debug.ReadBuildInfo() if !ok { c.loadContribIntegrations([]*debug.Module{}) @@ -539,25 +540,7 @@ func newConfig(opts ...StartOption) *config { } if c.statsdClient == nil { // configure statsd client - addr := c.dogstatsdAddr - if addr == "" { - // no config defined address; use defaults - addr = defaultDogstatsdAddr() - } - if agentport := c.agent.StatsdPort; agentport > 0 && !c.agent.ignore { - // the agent reported a non-standard port - host, _, err := net.SplitHostPort(addr) - // Use agent-reported address if it differs from the user-defined TCP-based protocol URI - if err == nil && host != "unix" { - // we have a valid host:port address; replace the port because - // the agent knows better - if host == "" { - host = defaultHostname - } - addr = net.JoinHostPort(host, strconv.Itoa(agentport)) - } - // not a valid TCP address, leave it as it is (could be a socket connection) - } + addr := resolveDogstatsdAddr(c) globalconfig.SetDogstatsdAddr(addr) c.dogstatsdAddr = addr } @@ -579,6 +562,44 @@ func newConfig(opts ...StartOption) *config { return c } +// resolveDogstatsdAddr resolves the Dogstatsd address to use, based on the user-defined +// address and the agent-reported port. If the agent reports a port, it will be used +// instead of the user-defined address' port. UDS paths are honored regardless of the +// agent-reported port. +func resolveDogstatsdAddr(c *config) string { + addr := c.dogstatsdAddr + if addr == "" { + // no config defined address; use host and port from env vars + // or default to localhost:8125 if not set + addr = defaultDogstatsdAddr() + } + agentport := c.agent.StatsdPort + if agentport == 0 { + // the agent didn't report a port; use the already resolved address as + // features are loaded from the trace-agent, which might be not running + return addr + } + // the agent reported a port + host, _, err := net.SplitHostPort(addr) + if err != nil { + // parsing the address failed; use the already resolved address as is + return addr + } + if host == "unix" { + // no need to change the address because it's a UDS connection + // and these don't have ports + return addr + } + if host == "" { + // no host was provided; use the default hostname + host = defaultHostname + } + // use agent-reported address if it differs from the user-defined TCP-based protocol URI + // we have a valid host:port address; replace the port because the agent knows better + addr = net.JoinHostPort(host, strconv.Itoa(agentport)) + return addr +} + func newStatsdClient(c *config) (internal.StatsdClient, error) { if c.statsdClient != nil { return c.statsdClient, nil @@ -616,7 +637,7 @@ func defaultDogstatsdAddr() string { // socket exists and user didn't specify otherwise via env vars return "unix://" + defaultSocketDSD } - host, port := defaultHostname, "8125" + host, port := defaultHostname, defaultStatsdPort if envHost != "" { host = envHost } @@ -651,9 +672,6 @@ type agentFeatures struct { // featureFlags specifies all the feature flags reported by the trace-agent. featureFlags map[string]struct{} - // ignore indicates that we should ignore the agent in favor of user set values. - // It should only be used during testing. - ignore bool // peerTags specifies precursor tags to aggregate stats on when client stats is enabled peerTags []string @@ -684,9 +702,6 @@ func loadAgentFeatures(agentDisabled bool, agentURL *url.URL, httpClient *http.C return } defer resp.Body.Close() - type agentConfig struct { - DefaultEnv string `json:"default_env"` - } type infoResponse struct { Endpoints []string `json:"endpoints"` ClientDropP0s bool `json:"client_drop_p0s"` @@ -822,15 +837,6 @@ func WithFeatureFlags(feats ...string) StartOption { } } -// withIgnoreAgent allows tests to ignore the agent running in CI so that we can -// properly test user set StatsdPort. -// This should only be used during testing. -func withIgnoreAgent(ignore bool) StartOption { - return func(c *config) { - c.agent.ignore = ignore - } -} - // WithLogger sets logger as the tracer's error printer. // Diagnostic and startup tracer logs are prefixed to simplify the search within logs. // If JSON logging format is required, it's possible to wrap tracer logs using an existing JSON logger with this diff --git a/ddtrace/tracer/option_test.go b/ddtrace/tracer/option_test.go index 4fcac958d0..d915cb45b7 100644 --- a/ddtrace/tracer/option_test.go +++ b/ddtrace/tracer/option_test.go @@ -66,8 +66,14 @@ func testStatsd(t *testing.T, cfg *config, addr string) { func TestStatsdUDPConnect(t *testing.T) { t.Setenv("DD_DOGSTATSD_PORT", "8111") - testStatsd(t, newConfig(withIgnoreAgent(true)), net.JoinHostPort(defaultHostname, "8111")) - cfg := newConfig(withIgnoreAgent(true)) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + // We simulate the agent not being able to provide the statsd port + w.WriteHeader(http.StatusInternalServerError) + })) + defer srv.Close() + + cfg := newConfig(WithAgentAddr(strings.TrimPrefix(srv.URL, "http://"))) + testStatsd(t, cfg, net.JoinHostPort(defaultHostname, "8111")) addr := net.JoinHostPort(defaultHostname, "8111") client, err := newStatsdClient(cfg) @@ -132,7 +138,7 @@ func TestAutoDetectStatsd(t *testing.T) { defer conn.Close() conn.SetDeadline(time.Now().Add(5 * time.Second)) - cfg := newConfig(WithAgentTimeout(2), withIgnoreAgent(true)) + cfg := newConfig(WithAgentTimeout(2)) statsd, err := newStatsdClient(cfg) require.NoError(t, err) defer statsd.Close() @@ -151,7 +157,14 @@ func TestAutoDetectStatsd(t *testing.T) { t.Run("env", func(t *testing.T) { t.Setenv("DD_DOGSTATSD_PORT", "8111") - testStatsd(t, newConfig(withIgnoreAgent(true)), net.JoinHostPort(defaultHostname, "8111")) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + // We simulate the agent not being able to provide the statsd port + w.WriteHeader(http.StatusInternalServerError) + })) + defer srv.Close() + + cfg := newConfig(WithAgentAddr(strings.TrimPrefix(srv.URL, "http://"))) + testStatsd(t, cfg, net.JoinHostPort(defaultHostname, "8111")) }) t.Run("agent", func(t *testing.T) { @@ -160,7 +173,7 @@ func TestAutoDetectStatsd(t *testing.T) { w.Write([]byte(`{"endpoints": [], "config": {"statsd_port":0}}`)) })) defer srv.Close() - cfg := newConfig(WithAgentAddr(strings.TrimPrefix(srv.URL, "http://")), WithAgentTimeout(2), withIgnoreAgent(true)) + cfg := newConfig(WithAgentAddr(strings.TrimPrefix(srv.URL, "http://")), WithAgentTimeout(2)) testStatsd(t, cfg, net.JoinHostPort(defaultHostname, "8125")) }) @@ -214,7 +227,7 @@ func TestLoadAgentFeatures(t *testing.T) { w.Write([]byte(`{"endpoints":["/v0.6/stats"],"feature_flags":["a","b"],"client_drop_p0s":true,"config": {"statsd_port":8999}}`)) })) defer srv.Close() - cfg := newConfig(WithAgentAddr(strings.TrimPrefix(srv.URL, "http://")), WithAgentTimeout(2), withIgnoreAgent(true)) + cfg := newConfig(WithAgentAddr(strings.TrimPrefix(srv.URL, "http://")), WithAgentTimeout(2)) assert.True(t, cfg.agent.DropP0s) assert.Equal(t, cfg.agent.StatsdPort, 8999) assert.EqualValues(t, cfg.agent.featureFlags, map[string]struct{}{ @@ -232,7 +245,7 @@ func TestLoadAgentFeatures(t *testing.T) { w.Write([]byte(`{"endpoints":["/v0.6/stats"],"client_drop_p0s":true,"config":{"statsd_port":8999}}`)) })) defer srv.Close() - cfg := newConfig(WithAgentAddr(strings.TrimPrefix(srv.URL, "http://")), WithAgentTimeout(2), withIgnoreAgent(true)) + cfg := newConfig(WithAgentAddr(strings.TrimPrefix(srv.URL, "http://")), WithAgentTimeout(2)) assert.True(t, cfg.agent.DropP0s) assert.True(t, cfg.agent.Stats) assert.Equal(t, 8999, cfg.agent.StatsdPort) @@ -450,101 +463,102 @@ func TestTracerOptionsDefaults(t *testing.T) { }) t.Run("dogstatsd", func(t *testing.T) { - t.Skip("TODO: fix test that fails only in CI") - t.Run("default", func(t *testing.T) { - tracer := newTracer(WithAgentTimeout(2)) - defer tracer.Stop() - c := tracer.config - assert.Equal(t, c.dogstatsdAddr, "localhost:8125") - assert.Equal(t, globalconfig.DogstatsdAddr(), "localhost:8125") - }) + // Simulate the agent (assuming no concurrency at all) + var fail bool + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + if fail { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.Write([]byte(`{"endpoints":["/v0.6/stats"],"feature_flags":["a","b"],"client_drop_p0s":true,"config": {"statsd_port":8125}}`)) + })) + defer srv.Close() - t.Run("default:ignore", func(t *testing.T) { - tracer := newTracer(WithAgentTimeout(2), withIgnoreAgent(true)) - defer tracer.Stop() - c := tracer.config - assert.Equal(t, c.dogstatsdAddr, "localhost:8125") - assert.Equal(t, globalconfig.DogstatsdAddr(), "localhost:8125") - }) + opts := []StartOption{ + WithAgentAddr(strings.TrimPrefix(srv.URL, "http://")), + } - t.Run("env-host", func(t *testing.T) { - t.Setenv("DD_AGENT_HOST", "my-host") - tracer := newTracer(WithAgentTimeout(2)) + t.Run("default", func(t *testing.T) { + tracer := newTracer(opts...) defer tracer.Stop() c := tracer.config - assert.Equal(t, c.dogstatsdAddr, "my-host:8125") - assert.Equal(t, globalconfig.DogstatsdAddr(), "my-host:8125") + assert.Equal(t, "localhost:8125", c.dogstatsdAddr) + assert.Equal(t, "localhost:8125", globalconfig.DogstatsdAddr()) }) - t.Run("env-host:ignore", func(t *testing.T) { + t.Run("env-host", func(t *testing.T) { t.Setenv("DD_AGENT_HOST", "my-host") - tracer := newTracer(WithAgentTimeout(2), withIgnoreAgent(true)) + tracer := newTracer(opts...) defer tracer.Stop() c := tracer.config - assert.Equal(t, c.dogstatsdAddr, "my-host:8125") - assert.Equal(t, globalconfig.DogstatsdAddr(), "my-host:8125") + assert.Equal(t, "my-host:8125", c.dogstatsdAddr) + assert.Equal(t, "my-host:8125", globalconfig.DogstatsdAddr()) }) t.Run("env-port", func(t *testing.T) { t.Setenv("DD_DOGSTATSD_PORT", "123") - tracer := newTracer(WithAgentTimeout(2)) + tracer := newTracer(opts...) defer tracer.Stop() c := tracer.config assert.Equal(t, "localhost:8125", c.dogstatsdAddr) assert.Equal(t, "localhost:8125", globalconfig.DogstatsdAddr()) }) - t.Run("env-port:ignore", func(t *testing.T) { + t.Run("env-port: agent not available", func(t *testing.T) { t.Setenv("DD_DOGSTATSD_PORT", "123") - tracer := newTracer(WithAgentTimeout(2), withIgnoreAgent(true)) + fail = true + tracer := newTracer(opts...) defer tracer.Stop() c := tracer.config assert.Equal(t, "localhost:123", c.dogstatsdAddr) assert.Equal(t, "localhost:123", globalconfig.DogstatsdAddr()) + fail = false }) t.Run("env-both", func(t *testing.T) { t.Setenv("DD_AGENT_HOST", "my-host") t.Setenv("DD_DOGSTATSD_PORT", "123") - tracer := newTracer(WithAgentTimeout(2)) + tracer := newTracer(opts...) defer tracer.Stop() c := tracer.config - assert.Equal(t, c.dogstatsdAddr, "my-host:123") - assert.Equal(t, globalconfig.DogstatsdAddr(), "my-host:123") + assert.Equal(t, "my-host:8125", c.dogstatsdAddr) + assert.Equal(t, "my-host:8125", globalconfig.DogstatsdAddr()) }) - t.Run("env-both:ignore", func(t *testing.T) { + t.Run("env-both: agent not available", func(t *testing.T) { t.Setenv("DD_AGENT_HOST", "my-host") t.Setenv("DD_DOGSTATSD_PORT", "123") - tracer := newTracer(WithAgentTimeout(2), withIgnoreAgent(true)) + fail = true + tracer := newTracer(opts...) defer tracer.Stop() c := tracer.config - assert.Equal(t, c.dogstatsdAddr, "my-host:123") - assert.Equal(t, globalconfig.DogstatsdAddr(), "my-host:123") + assert.Equal(t, "my-host:123", c.dogstatsdAddr) + assert.Equal(t, "my-host:123", globalconfig.DogstatsdAddr()) + fail = false }) t.Run("option", func(t *testing.T) { - tracer := newTracer(WithDogstatsdAddress("10.1.0.12:4002")) - defer tracer.Stop() - c := tracer.config - assert.Equal(t, c.dogstatsdAddr, "10.1.0.12:8125") - assert.Equal(t, globalconfig.DogstatsdAddr(), "10.1.0.12:8125") - }) - - t.Run("option:ignore", func(t *testing.T) { - tracer := newTracer(WithDogstatsdAddress("10.1.0.12:4002"), withIgnoreAgent(true)) + o := make([]StartOption, len(opts)) + copy(o, opts) + o = append(o, WithDogstatsdAddress("10.1.0.12:4002")) + tracer := newTracer(o...) defer tracer.Stop() c := tracer.config - assert.Equal(t, c.dogstatsdAddr, "10.1.0.12:4002") - assert.Equal(t, globalconfig.DogstatsdAddr(), "10.1.0.12:4002") + assert.Equal(t, "10.1.0.12:8125", c.dogstatsdAddr) + assert.Equal(t, "10.1.0.12:8125", globalconfig.DogstatsdAddr()) }) - t.Run("env-env", func(t *testing.T) { - t.Setenv("DD_ENV", "testEnv") - tracer := newTracer(WithAgentTimeout(2)) + t.Run("option: agent not available", func(t *testing.T) { + o := make([]StartOption, len(opts)) + copy(o, opts) + fail = true + o = append(o, WithDogstatsdAddress("10.1.0.12:4002")) + tracer := newTracer(o...) defer tracer.Stop() c := tracer.config - assert.Equal(t, "testEnv", c.env) + assert.Equal(t, "10.1.0.12:4002", c.dogstatsdAddr) + assert.Equal(t, "10.1.0.12:4002", globalconfig.DogstatsdAddr()) + fail = false }) t.Run("uds", func(t *testing.T) { @@ -558,26 +572,17 @@ func TestTracerOptionsDefaults(t *testing.T) { tracer := newTracer(WithDogstatsdAddress("unix://" + addr)) defer tracer.Stop() c := tracer.config - assert.NotNil(c) assert.Equal("unix://"+addr, c.dogstatsdAddr) assert.Equal("unix://"+addr, globalconfig.DogstatsdAddr()) }) + }) - t.Run("uds:ignore", func(t *testing.T) { - assert := assert.New(t) - dir, err := os.MkdirTemp("", "socket") - if err != nil { - t.Fatal("Failed to create socket") - } - addr := filepath.Join(dir, "dsd.socket") - defer os.RemoveAll(addr) - tracer := newTracer(WithDogstatsdAddress("unix://"+addr), withIgnoreAgent(true)) - defer tracer.Stop() - c := tracer.config - assert.NotNil(c) - assert.Equal("unix://"+addr, c.dogstatsdAddr) - assert.Equal("unix://"+addr, globalconfig.DogstatsdAddr()) - }) + t.Run("env-env", func(t *testing.T) { + t.Setenv("DD_ENV", "testEnv") + tracer := newTracer(WithAgentTimeout(2)) + defer tracer.Stop() + c := tracer.config + assert.Equal(t, "testEnv", c.env) }) t.Run("env-agentAddr", func(t *testing.T) {