From 86dab7cf210a986cb2e5d8ea6497476444f4c6ed Mon Sep 17 00:00:00 2001 From: andrewqian2001datadog Date: Wed, 23 Oct 2024 09:41:00 -0400 Subject: [PATCH] [AMLII-1994] Improve expected_tags_duration for metrics (#29634) Co-authored-by: Esther Kim --- comp/core/tagger/README.md | 1 - comp/core/tagger/common/entity_id_builder.go | 2 - .../collectors/workloadmeta_extract.go | 13 -- .../collectors/workloadmeta_main.go | 1 - .../collectors/catalog-core/options.go | 2 - .../collectors/catalog-less/options.go | 2 - .../collectors/catalog/all_options.go | 2 - .../collectors/internal/host/host.go | 116 ---------------- .../collectors/internal/host/host_test.go | 128 ------------------ comp/core/workloadmeta/def/types.go | 1 - pkg/aggregator/demultiplexer.go | 12 +- pkg/aggregator/demultiplexer_agent.go | 10 +- pkg/aggregator/demultiplexer_serverless.go | 20 +-- pkg/aggregator/host_tag_provider.go | 55 ++++++++ pkg/aggregator/host_tag_provider_test.go | 73 ++++++++++ .../no_aggregation_stream_worker.go | 7 +- ...nfigurable-host-tags-c71fb5597d20bffe.yaml | 11 ++ 17 files changed, 170 insertions(+), 286 deletions(-) delete mode 100644 comp/core/workloadmeta/collectors/internal/host/host.go delete mode 100644 comp/core/workloadmeta/collectors/internal/host/host_test.go create mode 100644 pkg/aggregator/host_tag_provider.go create mode 100644 pkg/aggregator/host_tag_provider_test.go create mode 100644 releasenotes/notes/fix-configurable-host-tags-c71fb5597d20bffe.yaml diff --git a/comp/core/tagger/README.md b/comp/core/tagger/README.md index 88ad5d8229623..774a2fcaeecf2 100644 --- a/comp/core/tagger/README.md +++ b/comp/core/tagger/README.md @@ -53,7 +53,6 @@ Tagger entities are identified by a string-typed ID, with one of the following f | workloadmeta.KindContainer | `container_id://` | | workloadmeta.KindContainerImageMetadata | `container_image_metadata://` | | workloadmeta.KindECSTask | `ecs_task://` | -| workloadmeta.KindHost | `host://` | | workloadmeta.KindKubernetesDeployment | `deployment:///` | | workloadmeta.KindKubernetesMetadata | `kubernetes_metadata://///` (`` is empty in cluster-scoped objects) | | workloadmeta.KindKubernetesPod | `kubernetes_pod_uid://` | diff --git a/comp/core/tagger/common/entity_id_builder.go b/comp/core/tagger/common/entity_id_builder.go index 93774664bc7bf..798faa8df3fc0 100644 --- a/comp/core/tagger/common/entity_id_builder.go +++ b/comp/core/tagger/common/entity_id_builder.go @@ -27,8 +27,6 @@ func BuildTaggerEntityID(entityID workloadmeta.EntityID) types.EntityID { return types.NewEntityID(types.Process, entityID.ID) case workloadmeta.KindKubernetesDeployment: return types.NewEntityID(types.KubernetesDeployment, entityID.ID) - case workloadmeta.KindHost: - return types.NewEntityID(types.Host, entityID.ID) case workloadmeta.KindKubernetesMetadata: return types.NewEntityID(types.KubernetesMetadata, entityID.ID) default: diff --git a/comp/core/tagger/taggerimpl/collectors/workloadmeta_extract.go b/comp/core/tagger/taggerimpl/collectors/workloadmeta_extract.go index dcbba45836ccf..5e8bd77951d3a 100644 --- a/comp/core/tagger/taggerimpl/collectors/workloadmeta_extract.go +++ b/comp/core/tagger/taggerimpl/collectors/workloadmeta_extract.go @@ -142,8 +142,6 @@ func (c *WorkloadMetaCollector) processEvents(evBundle workloadmeta.EventBundle) tagInfos = append(tagInfos, c.handleECSTask(ev)...) case workloadmeta.KindContainerImageMetadata: tagInfos = append(tagInfos, c.handleContainerImage(ev)...) 
- case workloadmeta.KindHost: - tagInfos = append(tagInfos, c.handleHostTags(ev)...) case workloadmeta.KindKubernetesMetadata: tagInfos = append(tagInfos, c.handleKubeMetadata(ev)...) case workloadmeta.KindProcess: @@ -298,17 +296,6 @@ func (c *WorkloadMetaCollector) handleContainerImage(ev workloadmeta.Event) []*t } } -func (c *WorkloadMetaCollector) handleHostTags(ev workloadmeta.Event) []*types.TagInfo { - hostTags := ev.Entity.(*workloadmeta.HostTags) - return []*types.TagInfo{ - { - Source: hostSource, - EntityID: types.NewEntityID("internal", "global-entity-id"), - LowCardTags: hostTags.HostTags, - }, - } -} - func (c *WorkloadMetaCollector) labelsToTags(labels map[string]string, tags *taglist.TagList) { // standard tags from labels c.extractFromMapWithFn(labels, standardDockerLabels, tags.AddStandard) diff --git a/comp/core/tagger/taggerimpl/collectors/workloadmeta_main.go b/comp/core/tagger/taggerimpl/collectors/workloadmeta_main.go index fe561a0ca3481..23c5973e363e4 100644 --- a/comp/core/tagger/taggerimpl/collectors/workloadmeta_main.go +++ b/comp/core/tagger/taggerimpl/collectors/workloadmeta_main.go @@ -34,7 +34,6 @@ const ( containerSource = workloadmetaCollectorName + "-" + string(workloadmeta.KindContainer) containerImageSource = workloadmetaCollectorName + "-" + string(workloadmeta.KindContainerImageMetadata) processSource = workloadmetaCollectorName + "-" + string(workloadmeta.KindProcess) - hostSource = workloadmetaCollectorName + "-" + string(workloadmeta.KindHost) kubeMetadataSource = workloadmetaCollectorName + "-" + string(workloadmeta.KindKubernetesMetadata) deploymentSource = workloadmetaCollectorName + "-" + string(workloadmeta.KindKubernetesDeployment) diff --git a/comp/core/workloadmeta/collectors/catalog-core/options.go b/comp/core/workloadmeta/collectors/catalog-core/options.go index 4738f78796a0a..ab0370acfffdb 100644 --- a/comp/core/workloadmeta/collectors/catalog-core/options.go +++ b/comp/core/workloadmeta/collectors/catalog-core/options.go @@ -17,7 +17,6 @@ import ( "github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/docker" "github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/ecs" "github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/ecsfargate" - "github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/host" "github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/kubeapiserver" "github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/kubelet" "github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/kubemetadata" @@ -39,7 +38,6 @@ func getCollectorOptions() []fx.Option { kubemetadata.GetFxOptions(), podman.GetFxOptions(), remoteprocesscollector.GetFxOptions(), - host.GetFxOptions(), process.GetFxOptions(), } } diff --git a/comp/core/workloadmeta/collectors/catalog-less/options.go b/comp/core/workloadmeta/collectors/catalog-less/options.go index 2f0feda6b9b34..515460e179a1e 100644 --- a/comp/core/workloadmeta/collectors/catalog-less/options.go +++ b/comp/core/workloadmeta/collectors/catalog-less/options.go @@ -17,7 +17,6 @@ import ( "github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/docker" "github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/ecs" "github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/ecsfargate" - "github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/host" 
"github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/kubeapiserver" "github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/kubelet" "github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/kubemetadata" @@ -41,6 +40,5 @@ func getCollectorOptions() []fx.Option { remoteworkloadmeta.GetFxOptions(), fx.Supply(remoteworkloadmeta.Params{}), processcollector.GetFxOptions(), - host.GetFxOptions(), } } diff --git a/comp/core/workloadmeta/collectors/catalog/all_options.go b/comp/core/workloadmeta/collectors/catalog/all_options.go index 616552b889217..2865a451a9fd1 100644 --- a/comp/core/workloadmeta/collectors/catalog/all_options.go +++ b/comp/core/workloadmeta/collectors/catalog/all_options.go @@ -22,7 +22,6 @@ import ( "github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/docker" "github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/ecs" "github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/ecsfargate" - "github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/host" "github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/kubeapiserver" "github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/kubelet" "github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/internal/kubemetadata" @@ -46,6 +45,5 @@ func getCollectorOptions() []fx.Option { remoteworkloadmeta.GetFxOptions(), remoteWorkloadmetaParams(), processcollector.GetFxOptions(), - host.GetFxOptions(), } } diff --git a/comp/core/workloadmeta/collectors/internal/host/host.go b/comp/core/workloadmeta/collectors/internal/host/host.go deleted file mode 100644 index 99864ce288002..0000000000000 --- a/comp/core/workloadmeta/collectors/internal/host/host.go +++ /dev/null @@ -1,116 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2016-present Datadog, Inc. - -// Package host implements the host tag Workloadmeta collector. 
-package host - -import ( - "context" - - "github.com/benbjohnson/clock" - "go.uber.org/fx" - - "github.com/DataDog/datadog-agent/comp/core/config" - workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def" - hostMetadataUtils "github.com/DataDog/datadog-agent/comp/metadata/host/hostimpl/hosttags" - "github.com/DataDog/datadog-agent/pkg/util/log" -) - -const id = "host" - -type dependencies struct { - fx.In - - Config config.Component -} - -type collector struct { - store workloadmeta.Component - catalog workloadmeta.AgentType - config config.Component - clock clock.Clock - timeoutTimer *clock.Timer -} - -// GetFxOptions returns the FX framework options for the collector -func GetFxOptions() fx.Option { - return fx.Provide(NewCollector) -} - -// NewCollector returns a new host collector provider and an error -func NewCollector(deps dependencies) (workloadmeta.CollectorProvider, error) { - return workloadmeta.CollectorProvider{ - Collector: &collector{ - catalog: workloadmeta.NodeAgent | workloadmeta.ProcessAgent, - config: deps.Config, - clock: clock.New(), - }, - }, nil -} - -func (c *collector) Start(_ context.Context, store workloadmeta.Component) error { - - c.store = store - - duration := c.config.GetDuration("expected_tags_duration") - if duration <= 0 { - return nil - } - - log.Debugf("Adding host tags to metrics for %v", duration) - c.timeoutTimer = c.clock.Timer(duration) - - return nil -} - -func (c *collector) Pull(ctx context.Context) error { - // Feature is disabled or timeout has previously occurred - if c.timeoutTimer == nil { - return nil - } - - // Timeout reached - expire any host tags in the store - if c.resetTimerIfTimedOut() { - c.store.Notify(makeEvent([]string{})) - return nil - } - - tags := hostMetadataUtils.Get(ctx, false, c.config).System - c.store.Notify(makeEvent(tags)) - return nil -} - -func (c *collector) GetID() string { - return id -} - -func (c *collector) GetTargetCatalog() workloadmeta.AgentType { - return c.catalog -} - -func (c *collector) resetTimerIfTimedOut() bool { - select { - case <-c.timeoutTimer.C: - c.timeoutTimer = nil - return true - default: - return false - } -} - -func makeEvent(tags []string) []workloadmeta.CollectorEvent { - return []workloadmeta.CollectorEvent{ - { - Type: workloadmeta.EventTypeSet, - Source: workloadmeta.SourceHost, - Entity: &workloadmeta.HostTags{ - EntityID: workloadmeta.EntityID{ - Kind: workloadmeta.KindHost, - ID: id, - }, - HostTags: tags, - }, - }} -} diff --git a/comp/core/workloadmeta/collectors/internal/host/host_test.go b/comp/core/workloadmeta/collectors/internal/host/host_test.go deleted file mode 100644 index a7b49a782cb74..0000000000000 --- a/comp/core/workloadmeta/collectors/internal/host/host_test.go +++ /dev/null @@ -1,128 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2016-present Datadog, Inc. - -// Package host implements the host tag Workloadmeta collector. 
-package host - -import ( - "context" - "sync" - "testing" - "time" - - "github.com/benbjohnson/clock" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/fx" - - "github.com/DataDog/datadog-agent/comp/core" - "github.com/DataDog/datadog-agent/comp/core/config" - workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def" - workloadmetafxmock "github.com/DataDog/datadog-agent/comp/core/workloadmeta/fx-mock" - workloadmetamock "github.com/DataDog/datadog-agent/comp/core/workloadmeta/mock" - "github.com/DataDog/datadog-agent/pkg/util/fxutil" -) - -type testDeps struct { - fx.In - - Config config.Component - Wml workloadmetamock.Mock -} - -func TestHostCollector(t *testing.T) { - expectedTags := []string{"tag1:value1", "tag2", "tag3"} - ctx := context.TODO() - - overrides := map[string]interface{}{ - "tags": expectedTags, - "expected_tags_duration": "10m", - } - - deps := fxutil.Test[testDeps](t, fx.Options( - fx.Replace(config.MockParams{Overrides: overrides}), - core.MockBundle(), - fx.Supply(context.Background()), - workloadmetafxmock.MockModule(workloadmeta.NewParams()), - )) - - mockClock := clock.NewMock() - c := collector{ - config: deps.Config, - clock: mockClock, - } - - err := c.Start(ctx, deps.Wml) - require.NoError(t, err) - - expectedWorkloadmetaEvents := []workloadmeta.Event{ - { - // Event generated by the first Pull() call - Type: workloadmeta.EventTypeSet, - Entity: &workloadmeta.HostTags{ - EntityID: workloadmeta.EntityID{ - Kind: workloadmeta.KindHost, - ID: "host", - }, - HostTags: expectedTags, - }, - }, - { - // Event generated by the second Pull() call after more than - // "config.expected_tags_duration" has passed - Type: workloadmeta.EventTypeSet, - Entity: &workloadmeta.HostTags{ - EntityID: workloadmeta.EntityID{ - Kind: workloadmeta.KindHost, - ID: "host", - }, - HostTags: []string{}, - }, - }, - } - - // Create a subscriber in a different goroutine and check that it receives - // the expected events - var wg sync.WaitGroup - wg.Add(1) - go func() { - assertExpectedEventsAreReceived(t, deps.Wml, 10*time.Second, expectedWorkloadmetaEvents) - wg.Done() - }() - - err = c.Pull(ctx) - require.NoError(t, err) - - mockClock.Add(11 * time.Minute) // Notice that this is more than the expected_tags_duration defined above - mockClock.WaitForAllTimers() - - err = c.Pull(ctx) - require.NoError(t, err) - - wg.Wait() -} - -func assertExpectedEventsAreReceived(t *testing.T, wlmeta workloadmeta.Component, timeout time.Duration, expectedEvents []workloadmeta.Event) { - eventChan := wlmeta.Subscribe( - "host-test", - workloadmeta.NormalPriority, - workloadmeta.NewFilterBuilder().AddKind(workloadmeta.KindHost).Build(), - ) - defer wlmeta.Unsubscribe(eventChan) - - var receivedEvents []workloadmeta.Event - - for len(receivedEvents) < len(expectedEvents) { - select { - case eventBundle := <-eventChan: - eventBundle.Acknowledge() - receivedEvents = append(receivedEvents, eventBundle.Events...) 
- case <-time.After(timeout): - require.Fail(t, "timed out waiting for event") - } - } - - assert.ElementsMatch(t, expectedEvents, receivedEvents) -} diff --git a/comp/core/workloadmeta/def/types.go b/comp/core/workloadmeta/def/types.go index 5bef5126a10b5..9d50f7f88ed78 100644 --- a/comp/core/workloadmeta/def/types.go +++ b/comp/core/workloadmeta/def/types.go @@ -47,7 +47,6 @@ const ( KindECSTask Kind = "ecs_task" KindContainerImageMetadata Kind = "container_image_metadata" KindProcess Kind = "process" - KindHost Kind = "host" ) // Source is the source name of an entity. diff --git a/pkg/aggregator/demultiplexer.go b/pkg/aggregator/demultiplexer.go index 5a9b76c2b5811..7990ce4973e37 100644 --- a/pkg/aggregator/demultiplexer.go +++ b/pkg/aggregator/demultiplexer.go @@ -10,6 +10,7 @@ import ( pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" "github.com/DataDog/datadog-agent/pkg/metrics" + "github.com/DataDog/datadog-agent/pkg/tagset" "github.com/DataDog/datadog-agent/pkg/aggregator/sender" agentruntime "github.com/DataDog/datadog-agent/pkg/runtime" @@ -94,19 +95,23 @@ func createIterableMetrics( serializer serializer.MetricSerializer, logPayloads bool, isServerless bool, + hostTagProvider *HostTagProvider, ) (*metrics.IterableSeries, *metrics.IterableSketches) { var series *metrics.IterableSeries var sketches *metrics.IterableSketches - + hostTags := hostTagProvider.GetHostTags() if serializer.AreSeriesEnabled() { series = metrics.NewIterableSeries(func(se *metrics.Serie) { if logPayloads { log.Debugf("Flushing serie: %s", se) } + + if hostTags != nil { + se.Tags = tagset.CombineCompositeTagsAndSlice(se.Tags, hostTagProvider.GetHostTags()) + } tagsetTlm.updateHugeSerieTelemetry(se) }, flushAndSerializeInParallel.BufferSize, flushAndSerializeInParallel.ChannelSize) } - if serializer.AreSketchesEnabled() { sketches = metrics.NewIterableSketches(func(sketch *metrics.SketchSeries) { if logPayloads { @@ -115,6 +120,9 @@ func createIterableMetrics( if isServerless { log.DebugfServerless("Sending sketches payload : %s", sketch.String()) } + if hostTags != nil { + sketch.Tags = tagset.CombineCompositeTagsAndSlice(sketch.Tags, hostTagProvider.GetHostTags()) + } tagsetTlm.updateHugeSketchesTelemetry(sketch) }, flushAndSerializeInParallel.BufferSize, flushAndSerializeInParallel.ChannelSize) } diff --git a/pkg/aggregator/demultiplexer_agent.go b/pkg/aggregator/demultiplexer_agent.go index fcd44e572e45c..914e544e91c2b 100644 --- a/pkg/aggregator/demultiplexer_agent.go +++ b/pkg/aggregator/demultiplexer_agent.go @@ -63,6 +63,8 @@ type AgentDemultiplexer struct { senders *senders + hostTagProvider *HostTagProvider + // sharded statsd time samplers statsd } @@ -159,7 +161,6 @@ func initAgentDemultiplexer( bufferSize := pkgconfigsetup.Datadog().GetInt("aggregator_buffer_size") metricSamplePool := metrics.NewMetricSamplePool(MetricSamplePoolBatchSize, utils.IsTelemetryEnabled(pkgconfigsetup.Datadog())) - _, statsdPipelinesCount := GetDogStatsDWorkerAndPipelineCount() log.Debug("the Demultiplexer will use", statsdPipelinesCount, "pipelines") @@ -190,7 +191,6 @@ func initAgentDemultiplexer( } // -- - demux := &AgentDemultiplexer{ log: log, options: options, @@ -208,7 +208,8 @@ func initAgentDemultiplexer( noAggSerializer: noAggSerializer, }, - senders: newSenders(agg), + hostTagProvider: NewHostTagProvider(), + senders: newSenders(agg), // statsd time samplers statsd: statsd{ @@ -400,8 +401,7 @@ func (d *AgentDemultiplexer) flushToSerializer(start time.Time, waitForSerialize } logPayloads := 
pkgconfigsetup.Datadog().GetBool("log_payloads") - series, sketches := createIterableMetrics(d.aggregator.flushAndSerializeInParallel, d.sharedSerializer, logPayloads, false) - + series, sketches := createIterableMetrics(d.aggregator.flushAndSerializeInParallel, d.sharedSerializer, logPayloads, false, d.hostTagProvider) metrics.Serialize( series, sketches, diff --git a/pkg/aggregator/demultiplexer_serverless.go b/pkg/aggregator/demultiplexer_serverless.go index 97573531f5a9d..686ce5e6c096f 100644 --- a/pkg/aggregator/demultiplexer_serverless.go +++ b/pkg/aggregator/demultiplexer_serverless.go @@ -37,6 +37,8 @@ type ServerlessDemultiplexer struct { flushAndSerializeInParallel FlushAndSerializeInParallel + hostTagProvider *HostTagProvider + *senders } @@ -55,14 +57,14 @@ func InitAndStartServerlessDemultiplexer(keysPerDomain map[string][]string, forw statsdWorker := newTimeSamplerWorker(statsdSampler, DefaultFlushInterval, bufferSize, metricSamplePool, flushAndSerializeInParallel, tagsStore) demux := &ServerlessDemultiplexer{ - log: logger, - forwarder: forwarder, - statsdSampler: statsdSampler, - statsdWorker: statsdWorker, - serializer: serializer, - metricSamplePool: metricSamplePool, - flushLock: &sync.Mutex{}, - + log: logger, + forwarder: forwarder, + statsdSampler: statsdSampler, + statsdWorker: statsdWorker, + serializer: serializer, + metricSamplePool: metricSamplePool, + flushLock: &sync.Mutex{}, + hostTagProvider: NewHostTagProvider(), flushAndSerializeInParallel: flushAndSerializeInParallel, } @@ -105,7 +107,7 @@ func (d *ServerlessDemultiplexer) ForceFlushToSerializer(start time.Time, waitFo defer d.flushLock.Unlock() logPayloads := pkgconfigsetup.Datadog().GetBool("log_payloads") - series, sketches := createIterableMetrics(d.flushAndSerializeInParallel, d.serializer, logPayloads, true) + series, sketches := createIterableMetrics(d.flushAndSerializeInParallel, d.serializer, logPayloads, true, d.hostTagProvider) metrics.Serialize( series, diff --git a/pkg/aggregator/host_tag_provider.go b/pkg/aggregator/host_tag_provider.go new file mode 100644 index 0000000000000..4158756977cc9 --- /dev/null +++ b/pkg/aggregator/host_tag_provider.go @@ -0,0 +1,55 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//nolint:revive +package aggregator + +import ( + "context" + "sync" + + hostMetadataUtils "github.com/DataDog/datadog-agent/comp/metadata/host/hostimpl/hosttags" + pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" + "github.com/DataDog/datadog-agent/pkg/util/log" + + "github.com/benbjohnson/clock" +) + +type HostTagProvider struct { + hostTags []string + sync.RWMutex +} + +func NewHostTagProvider() *HostTagProvider { + return newHostTagProviderWithClock(clock.New()) +} + +func newHostTagProviderWithClock(clock clock.Clock) *HostTagProvider { + p := &HostTagProvider{ + hostTags: nil, + } + + duration := pkgconfigsetup.Datadog().GetDuration("expected_tags_duration") + log.Debugf("Adding host tags to metrics for %v", duration) + if pkgconfigsetup.Datadog().GetDuration("expected_tags_duration") > 0 { + p.hostTags = append([]string{}, hostMetadataUtils.Get(context.TODO(), false, pkgconfigsetup.Datadog()).System...) 
+ expectedTagsDeadline := pkgconfigsetup.StartTime.Add(duration) + clock.AfterFunc(expectedTagsDeadline.Sub(clock.Now()), func() { + p.Lock() + defer p.Unlock() + p.hostTags = nil + log.Debugf("host tags for metrics have expired") + }) + } + + return p +} + +func (p *HostTagProvider) GetHostTags() []string { + p.RLock() + defer p.RUnlock() + + return p.hostTags +} diff --git a/pkg/aggregator/host_tag_provider_test.go b/pkg/aggregator/host_tag_provider_test.go new file mode 100644 index 0000000000000..b8b56bc50a044 --- /dev/null +++ b/pkg/aggregator/host_tag_provider_test.go @@ -0,0 +1,73 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package aggregator + +import ( + "testing" + "time" + + "github.com/benbjohnson/clock" + "github.com/stretchr/testify/assert" + + configmock "github.com/DataDog/datadog-agent/pkg/config/mock" + pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" +) + +// TestHostTagProviderNoExpiration checks that tags are not expired when expected_tags_duration is 0 +func TestExpectedTagDurationNotSet(t *testing.T) { + + mockConfig := configmock.New(t) + + tags := []string{"tag1:value1", "tag2:value2", "tag3:value3"} + mockConfig.SetWithoutSource("tags", tags) + defer mockConfig.SetWithoutSource("tags", nil) + + // Setting expected_tags_duration to 0 (no host tags should be added) + mockConfig.SetWithoutSource("expected_tags_duration", "0") + + p := NewHostTagProvider() + + tagList := p.GetHostTags() + + assert.Equal(t, 0, len(tagList)) +} + +// TestHostTagProviderExpectedTags verifies that the tags are returned correctly and then return nil after the expected duration +func TestHostTagProviderExpectedTags(t *testing.T) { + mockConfig := configmock.New(t) + + mockClock := clock.NewMock() + + oldStartTime := pkgconfigsetup.StartTime + pkgconfigsetup.StartTime = mockClock.Now() + defer func() { + pkgconfigsetup.StartTime = oldStartTime + }() + + // Define and set the expected tags + hosttags := []string{"tag1:value1", "tag2:value2", "tag3:value3"} + mockConfig.SetWithoutSource("tags", hosttags) + defer mockConfig.SetWithoutSource("tags", nil) + + // Set the expected tags expiration duration to 5 seconds + expectedTagsDuration := 5 * time.Second + mockConfig.SetWithoutSource("expected_tags_duration", "5s") + defer mockConfig.SetWithoutSource("expected_tags_duration", "0") + + p := newHostTagProviderWithClock(mockClock) + + tagList := p.GetHostTags() + + // Verify that the tags are returned correctly before expiration + assert.Equal(t, hosttags, tagList) + + // Simulate time passing for the expected duration (5 seconds) + mockClock.Add(expectedTagsDuration) + + // Verify that after the expiration time, the tags are no longer returned (nil) + assert.Nil(t, p.GetHostTags()) + +} diff --git a/pkg/aggregator/no_aggregation_stream_worker.go b/pkg/aggregator/no_aggregation_stream_worker.go index 38324f7d7dd01..2569067934bd1 100644 --- a/pkg/aggregator/no_aggregation_stream_worker.go +++ b/pkg/aggregator/no_aggregation_stream_worker.go @@ -49,6 +49,8 @@ type noAggregationStreamWorker struct { samplesChan chan metrics.MetricSampleBatch stopChan chan trigger + hostTagProvider *HostTagProvider + logThrottling util.SimpleThrottler } @@ -95,6 +97,7 @@ func newNoAggregationStreamWorker(maxMetricsPerPayload int, _ *metrics.MetricSam stopChan: make(chan trigger), 
 		samplesChan:     make(chan metrics.MetricSampleBatch, pkgconfigsetup.Datadog().GetInt("dogstatsd_queue_size")),
+		hostTagProvider: NewHostTagProvider(),
 		// warning for the unsupported metric types should appear maximum 200 times
 		// every 5 minutes.
 		logThrottling: util.NewSimpleThrottler(200, 5*time.Minute, "Pausing the unsupported metric type warning message for 5m"),
@@ -145,7 +148,7 @@ func (w *noAggregationStreamWorker) run() {
 	ticker := time.NewTicker(noAggWorkerStreamCheckFrequency)
 	defer ticker.Stop()
 	logPayloads := pkgconfigsetup.Datadog().GetBool("log_payloads")
-	w.seriesSink, w.sketchesSink = createIterableMetrics(w.flushConfig, w.serializer, logPayloads, false)
+	w.seriesSink, w.sketchesSink = createIterableMetrics(w.flushConfig, w.serializer, logPayloads, false, w.hostTagProvider)

 	stopped := false
 	var stopBlockChan chan struct{}
@@ -246,7 +249,7 @@ func (w *noAggregationStreamWorker) run() {
 				break
 			}

-			w.seriesSink, w.sketchesSink = createIterableMetrics(w.flushConfig, w.serializer, logPayloads, false)
+			w.seriesSink, w.sketchesSink = createIterableMetrics(w.flushConfig, w.serializer, logPayloads, false, w.hostTagProvider)
 		}

 		if stopBlockChan != nil {
diff --git a/releasenotes/notes/fix-configurable-host-tags-c71fb5597d20bffe.yaml b/releasenotes/notes/fix-configurable-host-tags-c71fb5597d20bffe.yaml
new file mode 100644
index 0000000000000..02b6d7d69f2c1
--- /dev/null
+++ b/releasenotes/notes/fix-configurable-host-tags-c71fb5597d20bffe.yaml
@@ -0,0 +1,11 @@
+# Each section from every release note is combined when the
+# CHANGELOG.rst is rendered. So the text needs to be worded so that
+# it does not depend on any information only available in another
+# section. This may mean repeating some details, but each section
+# must be readable independently of the other.
+#
+# Each section note must be formatted as reStructuredText.
+---
+fixes:
+  - |
+    Fixes the handling of host tags configured with ``expected_tags_duration``: the tags are now appended at flush time instead of being attached to each metric, so a metric's context hash no longer changes when the tags expire and the aggregator no longer mistakes it for a new metric.
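
For reference, the following is a minimal, self-contained Go sketch of the expiration pattern that the new HostTagProvider in pkg/aggregator/host_tag_provider.go relies on. It is illustrative only: it swaps the agent's configuration lookup (the "tags" and "expected_tags_duration" settings) and the benbjohnson/clock timer for plain function arguments and time.AfterFunc, so the names and constructor signature below are assumptions for the sketch, not the agent's real API.

package main

import (
	"fmt"
	"sync"
	"time"
)

// hostTagProvider caches host tags once and clears them after the configured
// duration, so flush-time code can append them to outgoing payloads without
// ever changing the tag set (and hence the context hash) of a metric.
type hostTagProvider struct {
	mu       sync.RWMutex
	hostTags []string
}

// newHostTagProvider is illustrative: the real provider reads "tags" and
// "expected_tags_duration" from the agent configuration instead of arguments.
func newHostTagProvider(tags []string, expectedTagsDuration time.Duration) *hostTagProvider {
	p := &hostTagProvider{}
	if expectedTagsDuration > 0 {
		p.hostTags = append([]string{}, tags...)
		time.AfterFunc(expectedTagsDuration, func() {
			p.mu.Lock()
			defer p.mu.Unlock()
			p.hostTags = nil // expired: flush-time callers simply stop appending host tags
		})
	}
	return p
}

func (p *hostTagProvider) GetHostTags() []string {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.hostTags
}

func main() {
	p := newHostTagProvider([]string{"env:prod", "team:core"}, 2*time.Second)

	fmt.Println(p.GetHostTags()) // [env:prod team:core] while the duration has not elapsed
	time.Sleep(3 * time.Second)
	fmt.Println(p.GetHostTags()) // [] (nil) once the expected tags duration has passed
}

Until the deadline passes, flush-time code (createIterableMetrics in the patch above) appends the cached tags to every outgoing serie and sketch; afterwards GetHostTags returns nil and nothing extra is appended, so the tags stored on a metric's context, and therefore its context hash, never change.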