diff --git a/comp/core/agenttelemetry/impl/agenttelemetry_test.go b/comp/core/agenttelemetry/impl/agenttelemetry_test.go index 43371e0c7b0c2..d982800caddd5 100644 --- a/comp/core/agenttelemetry/impl/agenttelemetry_test.go +++ b/comp/core/agenttelemetry/impl/agenttelemetry_test.go @@ -28,6 +28,7 @@ import ( "github.com/DataDog/datadog-agent/comp/core/telemetry" "github.com/DataDog/datadog-agent/comp/core/telemetry/telemetryimpl" "github.com/DataDog/datadog-agent/pkg/util/fxutil" + "github.com/DataDog/zstd" ) // HTTP client mock @@ -139,12 +140,14 @@ func makeLogMock(t *testing.T) log.Component { return logmock.New(t) } -func makeSenderImpl(t *testing.T, c string) sender { +func makeSenderImpl(t *testing.T, cl client, c string) sender { o := convertYamlStrToMap(t, c) cfg := makeCfgMock(t, o) log := makeLogMock(t) - client := newClientMock() - sndr, err := newSenderImpl(cfg, log, client) + if cl == nil { + cl = newClientMock() + } + sndr, err := newSenderImpl(cfg, log, cl) assert.NoError(t, err) return sndr } @@ -552,10 +555,10 @@ func TestNoTagSpecifiedAggregationHistogram(t *testing.T) { // setup and initiate atel tel := makeTelMock(t) buckets := []float64{10, 100, 1000, 10000} - gauge := tel.NewHistogram("bar", "zoo", []string{"tag1", "tag2", "tag3"}, "", buckets) - gauge.WithTags(map[string]string{"tag1": "a1", "tag2": "b1", "tag3": "c1"}).Observe(1001) - gauge.WithTags(map[string]string{"tag1": "a2", "tag2": "b2", "tag3": "c2"}).Observe(1002) - gauge.WithTags(map[string]string{"tag1": "a3", "tag2": "b3", "tag3": "c3"}).Observe(1003) + hist := tel.NewHistogram("bar", "zoo", []string{"tag1", "tag2", "tag3"}, "", buckets) + hist.WithTags(map[string]string{"tag1": "a1", "tag2": "b1", "tag3": "c1"}).Observe(1001) + hist.WithTags(map[string]string{"tag1": "a2", "tag2": "b2", "tag3": "c2"}).Observe(1002) + hist.WithTags(map[string]string{"tag1": "a3", "tag2": "b3", "tag3": "c3"}).Observe(1003) o := convertYamlStrToMap(t, c) s := &senderMock{} @@ -714,7 +717,7 @@ func TestTwoProfilesOnTheSameScheduleGenerateSinglePayload(t *testing.T) { counter2.AddWithTags(20, map[string]string{"tag1": "a1", "tag2": "b1", "tag3": "c1"}) o := convertYamlStrToMap(t, c) - s := makeSenderImpl(t, c) + s := makeSenderImpl(t, nil, c) r := newRunnerMock() a := getTestAtel(t, tel, o, s, nil, r) require.True(t, a.enabled) @@ -751,7 +754,7 @@ func TestOneProfileWithOneMetricMultipleContextsGenerateTwoPayloads(t *testing.T counter1.AddWithTags(20, map[string]string{"tag1": "a2", "tag2": "b2", "tag3": "c2"}) o := convertYamlStrToMap(t, c) - s := makeSenderImpl(t, c) + s := makeSenderImpl(t, nil, c) r := newRunnerMock() a := getTestAtel(t, tel, o, s, nil, r) require.True(t, a.enabled) @@ -827,7 +830,7 @@ func TestOneProfileWithTwoMetricGenerateSinglePayloads(t *testing.T) { counter2.AddWithTags(20, map[string]string{"tag1": "a1", "tag2": "b1", "tag3": "c1"}) o := convertYamlStrToMap(t, c) - s := makeSenderImpl(t, c) + s := makeSenderImpl(t, nil, c) r := newRunnerMock() a := getTestAtel(t, tel, o, s, nil, r) require.True(t, a.enabled) @@ -849,7 +852,7 @@ func TestSenderConfigNoConfig(t *testing.T) { agent_telemetry: enabled: true ` - sndr := makeSenderImpl(t, c) + sndr := makeSenderImpl(t, nil, c) url := buildURL(sndr.(*senderImpl).endpoints.Main) assert.Equal(t, "https://instrumentation-telemetry-intake.datadoghq.com/api/v2/apmtelemetry", url) @@ -877,7 +880,7 @@ func TestSenderConfigOnlySites(t *testing.T) { for _, tt := range tests { c := fmt.Sprintf(ctemp, tt.site) - sndr := makeSenderImpl(t, c) + sndr := makeSenderImpl(t, nil, c) url := buildURL(sndr.(*senderImpl).endpoints.Main) assert.Equal(t, tt.testURL, url) } @@ -894,7 +897,7 @@ func TestSenderConfigAdditionalEndpoint(t *testing.T) { - api_key: bar host: instrumentation-telemetry-intake.us5.datadoghq.com ` - sndr := makeSenderImpl(t, c) + sndr := makeSenderImpl(t, nil, c) assert.NotNil(t, sndr) assert.Len(t, sndr.(*senderImpl).endpoints.Endpoints, 2) @@ -913,7 +916,7 @@ func TestSenderConfigPartialDDUrl(t *testing.T) { enabled: true dd_url: instrumentation-telemetry-intake.us5.datadoghq.com. ` - sndr := makeSenderImpl(t, c) + sndr := makeSenderImpl(t, nil, c) assert.NotNil(t, sndr) assert.Len(t, sndr.(*senderImpl).endpoints.Endpoints, 1) @@ -930,7 +933,7 @@ func TestSenderConfigFullDDUrl(t *testing.T) { enabled: true dd_url: https://instrumentation-telemetry-intake.us5.datadoghq.com. ` - sndr := makeSenderImpl(t, c) + sndr := makeSenderImpl(t, nil, c) assert.NotNil(t, sndr) assert.Len(t, sndr.(*senderImpl).endpoints.Endpoints, 1) @@ -950,7 +953,7 @@ func TestSenderConfigDDUrlWithAdditionalEndpoints(t *testing.T) { - api_key: bar host: instrumentation-telemetry-intake.us3.datadoghq.com. ` - sndr := makeSenderImpl(t, c) + sndr := makeSenderImpl(t, nil, c) assert.NotNil(t, sndr) assert.Len(t, sndr.(*senderImpl).endpoints.Endpoints, 2) @@ -970,7 +973,7 @@ func TestSenderConfigDDUrlWithEmptyAdditionalPoint(t *testing.T) { dd_url: instrumentation-telemetry-intake.us5.datadoghq.com. additional_endpoints: ` - sndr := makeSenderImpl(t, c) + sndr := makeSenderImpl(t, nil, c) assert.NotNil(t, sndr) assert.Len(t, sndr.(*senderImpl).endpoints.Endpoints, 1) @@ -1010,7 +1013,7 @@ func TestGetAsJSONScrub(t *testing.T) { counter3.AddWithTags(11, map[string]string{"text": "test"}) o := convertYamlStrToMap(t, c) - s := makeSenderImpl(t, c) + s := makeSenderImpl(t, nil, c) r := newRunnerMock() a := getTestAtel(t, tel, o, s, nil, r) require.True(t, a.enabled) @@ -1058,7 +1061,7 @@ func TestAdjustPrometheusCounterValueMultipleTags(t *testing.T) { // setup and initiate atel tel := makeTelMock(t) o := convertYamlStrToMap(t, c) - s := makeSenderImpl(t, c) + s := makeSenderImpl(t, nil, c) r := newRunnerMock() a := getTestAtel(t, tel, o, s, nil, r) require.True(t, a.enabled) @@ -1167,7 +1170,7 @@ func TestAdjustPrometheusCounterValueMultipleTagValues(t *testing.T) { // setup and initiate atel tel := makeTelMock(t) o := convertYamlStrToMap(t, c) - s := makeSenderImpl(t, c) + s := makeSenderImpl(t, nil, c) r := newRunnerMock() a := getTestAtel(t, tel, o, s, nil, r) require.True(t, a.enabled) @@ -1241,7 +1244,7 @@ func TestAdjustPrometheusCounterValueTagless(t *testing.T) { // setup and initiate atel tel := makeTelMock(t) o := convertYamlStrToMap(t, c) - s := makeSenderImpl(t, c) + s := makeSenderImpl(t, nil, c) r := newRunnerMock() a := getTestAtel(t, tel, o, s, nil, r) require.True(t, a.enabled) @@ -1348,7 +1351,7 @@ func TestHistogramFloatUpperBoundNormalization(t *testing.T) { // setup and initiate atel tel := makeTelMock(t) o := convertYamlStrToMap(t, c) - s := makeSenderImpl(t, c) + s := makeSenderImpl(t, nil, c) r := newRunnerMock() a := getTestAtel(t, tel, o, s, nil, r) require.True(t, a.enabled) @@ -1474,7 +1477,7 @@ func TestHistogramFloatUpperBoundNormalizationWithTags(t *testing.T) { // setup and initiate atel tel := makeTelMock(t) o := convertYamlStrToMap(t, c) - s := makeSenderImpl(t, c) + s := makeSenderImpl(t, nil, c) r := newRunnerMock() a := getTestAtel(t, tel, o, s, nil, r) require.True(t, a.enabled) @@ -1591,7 +1594,7 @@ func TestHistogramFloatUpperBoundNormalizationWithMultivalueTags(t *testing.T) { // setup and initiate atel tel := makeTelMock(t) o := convertYamlStrToMap(t, c) - s := makeSenderImpl(t, c) + s := makeSenderImpl(t, nil, c) r := newRunnerMock() a := getTestAtel(t, tel, o, s, nil, r) require.True(t, a.enabled) @@ -1841,7 +1844,7 @@ func TestHistogramPercentile(t *testing.T) { // setup and initiate atel tel := makeTelMock(t) o := convertYamlStrToMap(t, c) - s := makeSenderImpl(t, c) + s := makeSenderImpl(t, nil, c) r := newRunnerMock() a := getTestAtel(t, tel, o, s, nil, r) require.True(t, a.enabled) @@ -1895,3 +1898,72 @@ func TestHistogramPercentile(t *testing.T) { assert.Equal(t, 20.0, *metric.P95) assert.Equal(t, 20.0, *metric.P99) } + +func TestUsingPayloadCompressionInAgentTelemetrySender(t *testing.T) { + // Run with compression (by default default) + var cfg1 = ` + agent_telemetry: + enabled: true + profiles: + - name: xxx + metric: + metrics: + - name: foo.bar + ` + + tel := makeTelMock(t) + hist := tel.NewHistogram("foo", "bar", nil, "", []float64{1, 2, 5, 100}) + hist.Observe(1) + hist.Observe(5) + hist.Observe(6) + hist.Observe(100) + + // setup and initiate atel + o1 := convertYamlStrToMap(t, cfg1) + cl1 := newClientMock() + s1 := makeSenderImpl(t, cl1, cfg1) + r1 := newRunnerMock() + a1 := getTestAtel(t, tel, o1, s1, cl1, r1) + require.True(t, a1.enabled) + + // run the runner to trigger the telemetry report + a1.start() + r1.(*runnerMock).run() + assert.True(t, len(cl1.(*clientMock).body) > 0) + + // Run without compression + var cfg2 = ` + agent_telemetry: + use_compression: false + enabled: true + profiles: + - name: xxx + metric: + metrics: + - name: foo.bar + aggregate_tags: + ` + + // setup and initiate atel + o2 := convertYamlStrToMap(t, cfg2) + cl2 := newClientMock() + s2 := makeSenderImpl(t, cl2, cfg2) + r2 := newRunnerMock() + a2 := getTestAtel(t, tel, o2, s2, cl2, r2) + require.True(t, a2.enabled) + + // run the runner to trigger the telemetry report + a2.start() + r2.(*runnerMock).run() + assert.True(t, len(cl2.(*clientMock).body) > 0) + decompressBody, err := zstd.Decompress(nil, cl1.(*clientMock).body) + require.NoError(t, err) + require.NotZero(t, len(decompressBody)) + + // we cannot compare body (time stamp different and internal + // bucket serialization, but success above and significant size differences + // should be suffient + compressBodyLen := len(cl1.(*clientMock).body) + nonCompressBodyLen := len(cl2.(*clientMock).body) + assert.True(t, float64(nonCompressBodyLen)/float64(compressBodyLen) > 1.5) +} diff --git a/comp/core/agenttelemetry/impl/sender.go b/comp/core/agenttelemetry/impl/sender.go index f489667dc8eb8..b7b62f460b933 100644 --- a/comp/core/agenttelemetry/impl/sender.go +++ b/comp/core/agenttelemetry/impl/sender.go @@ -27,6 +27,7 @@ import ( httputils "github.com/DataDog/datadog-agent/pkg/util/http" "github.com/DataDog/datadog-agent/pkg/util/scrubber" "github.com/DataDog/datadog-agent/pkg/version" + "github.com/DataDog/zstd" ) const ( @@ -58,7 +59,9 @@ type senderImpl struct { cfgComp config.Reader logComp log.Component - client client + compress bool + compressionLevel int + client client endpoints *logconfig.Endpoints @@ -211,9 +214,12 @@ func newSenderImpl( cfgComp: cfgComp, logComp: logComp, - client: client, - endpoints: endpoints, - agentVersion: agentVersion.GetNumberAndPre(), + compress: cfgComp.GetBool("agent_telemetry.use_compression"), + compressionLevel: cfgComp.GetInt("agent_telemetry.compression_level"), + client: client, + endpoints: endpoints, + agentVersion: agentVersion.GetNumberAndPre(), + // pre-fill parts of payload which are not changing during run-time payloadTemplate: Payload{ APIVersion: "v2", @@ -370,11 +376,24 @@ func (s *senderImpl) flushSession(ss *senderSession) error { return fmt.Errorf("failed to marshal agent telemetry payload: %w", err) } - reqBody, err := scrubber.ScrubJSON(payloadJSON) + reqBodyRaw, err := scrubber.ScrubJSON(payloadJSON) if err != nil { return fmt.Errorf("failed to scrubl agent telemetry payload: %w", err) } + // Try to compress the payload if needed + reqBody := reqBodyRaw + compressed := false + if s.compress { + reqBodyCompressed, err2 := zstd.CompressLevel(nil, reqBodyRaw, s.compressionLevel) + if err2 == nil { + compressed = true + reqBody = reqBodyCompressed + } else { + s.logComp.Errorf("Failed to compress agent telemetry payload: %v", err) + } + } + // Send the payload to all endpoints var errs error reqType := payloads.RequestType @@ -386,7 +405,7 @@ func (s *senderImpl) flushSession(ss *senderSession) error { errs = errors.Join(errs, err) continue } - s.addHeaders(req, reqType, ep.GetAPIKey(), bodyLen) + s.addHeaders(req, reqType, ep.GetAPIKey(), bodyLen, compressed) resp, err := s.client.Do(req.WithContext(ss.cancelCtx)) if err != nil { errs = errors.Join(errs, err) @@ -436,7 +455,7 @@ func (s *senderImpl) sendAgentMetricPayloads(ss *senderSession, metrics []*agent } } -func (s *senderImpl) addHeaders(req *http.Request, requesttype, apikey, bodylen string) { +func (s *senderImpl) addHeaders(req *http.Request, requesttype, apikey, bodylen string, compressed bool) { req.Header.Add("DD-Api-Key", apikey) req.Header.Add("Content-Type", "application/json") req.Header.Add("Content-Length", bodylen) @@ -446,4 +465,8 @@ func (s *senderImpl) addHeaders(req *http.Request, requesttype, apikey, bodylen req.Header.Add("DD-Telemetry-Product-Version", s.agentVersion) // Not clear how to acquire that. Appears that EVP adds it automatically req.Header.Add("datadog-container-id", "") + + if compressed { + req.Header.Set("Content-Encoding", "zstd") + } } diff --git a/releasenotes/notes/agent-tel-gzip-bba8a51c1aa3ba2f.yaml b/releasenotes/notes/agent-tel-gzip-bba8a51c1aa3ba2f.yaml new file mode 100644 index 0000000000000..ca28c715ecaa1 --- /dev/null +++ b/releasenotes/notes/agent-tel-gzip-bba8a51c1aa3ba2f.yaml @@ -0,0 +1,11 @@ +# Each section from every release note are combined when the +# CHANGELOG.rst is rendered. So the text needs to be worded so that +# it does not depend on any information only available in another +# section. This may mean repeating some details, but each section +# must be readable independently of the other. +# +# Each section note must be formatted as reStructuredText. +--- +enhancements: + - | + Use HTTP zstd compression for the Agent telemetry payloads.