diff --git a/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer.go b/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer.go index 11e6b774524a7b..88ff50f819d875 100644 --- a/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer.go +++ b/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer.go @@ -20,6 +20,7 @@ import ( "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" orchestratorforwarder "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator" + haagent "github.com/DataDog/datadog-agent/comp/haagent/def" "github.com/DataDog/datadog-agent/comp/serializer/compression" "github.com/DataDog/datadog-agent/pkg/aggregator" "github.com/DataDog/datadog-agent/pkg/aggregator/sender" @@ -42,6 +43,7 @@ type dependencies struct { SharedForwarder defaultforwarder.Component OrchestratorForwarder orchestratorforwarder.Component EventPlatformForwarder eventplatform.Component + HaAgent haagent.Component Compressor compression.Component Tagger tagger.Component @@ -86,6 +88,7 @@ func newDemultiplexer(deps dependencies) (provides, error) { deps.OrchestratorForwarder, options, deps.EventPlatformForwarder, + deps.HaAgent, deps.Compressor, deps.Tagger, hostnameDetected, diff --git a/comp/aggregator/demultiplexer/demultiplexerimpl/test_agent_demultiplexer.go b/comp/aggregator/demultiplexer/demultiplexerimpl/test_agent_demultiplexer.go index a227f78af631b0..6db4b02911a46d 100644 --- a/comp/aggregator/demultiplexer/demultiplexerimpl/test_agent_demultiplexer.go +++ b/comp/aggregator/demultiplexer/demultiplexerimpl/test_agent_demultiplexer.go @@ -185,6 +185,6 @@ func initTestAgentDemultiplexerWithFlushInterval(log log.Component, hostname hos sharedForwarder := defaultforwarder.NewDefaultForwarder(pkgconfigsetup.Datadog(), log, sharedForwarderOptions) orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{}) eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(hostname)) - demux := aggregator.InitAndStartAgentDemultiplexer(log, sharedForwarder, &orchestratorForwarder, opts, eventPlatformForwarder, compressor, noopimpl.NewTaggerClient(), "hostname") + demux := aggregator.InitAndStartAgentDemultiplexer(log, sharedForwarder, &orchestratorForwarder, opts, eventPlatformForwarder, nil, compressor, noopimpl.NewTaggerClient(), "hostname") return NewTestAgentDemultiplexer(demux) } diff --git a/comp/aggregator/diagnosesendermanager/diagnosesendermanagerimpl/sendermanager.go b/comp/aggregator/diagnosesendermanager/diagnosesendermanagerimpl/sendermanager.go index 8ac9a86d7ec271..04a828e1493447 100644 --- a/comp/aggregator/diagnosesendermanager/diagnosesendermanagerimpl/sendermanager.go +++ b/comp/aggregator/diagnosesendermanager/diagnosesendermanagerimpl/sendermanager.go @@ -72,15 +72,7 @@ func (sender *diagnoseSenderManager) LazyGetSenderManager() (sender.SenderManage forwarder := defaultforwarder.NewDefaultForwarder(config, log, defaultforwarder.NewOptions(config, log, nil)) orchestratorForwarder := optional.NewOptionPtr[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{}) eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(sender.deps.Hostname)) - senderManager = aggregator.InitAndStartAgentDemultiplexer( - log, - forwarder, - orchestratorForwarder, - opts, - eventPlatformForwarder, - sender.deps.Compressor, - sender.deps.Tagger, - hostnameDetected) + senderManager = aggregator.InitAndStartAgentDemultiplexer(log, forwarder, orchestratorForwarder, opts, eventPlatformForwarder, nil, sender.deps.Compressor, sender.deps.Tagger, hostnameDetected) sender.senderManager.Set(senderManager) return senderManager, nil diff --git a/pkg/aggregator/aggregator.go b/pkg/aggregator/aggregator.go index f7201808048785..d3451e4eeba75e 100644 --- a/pkg/aggregator/aggregator.go +++ b/pkg/aggregator/aggregator.go @@ -16,6 +16,7 @@ import ( "github.com/DataDog/datadog-agent/comp/core/tagger" "github.com/DataDog/datadog-agent/comp/core/tagger/types" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" + haagent "github.com/DataDog/datadog-agent/comp/haagent/def" "github.com/DataDog/datadog-agent/pkg/aggregator/internal/tags" checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id" "github.com/DataDog/datadog-agent/pkg/config/model" @@ -252,6 +253,7 @@ type BufferedAggregator struct { flushMutex sync.Mutex // to start multiple flushes in parallel serializer serializer.MetricSerializer eventPlatformForwarder eventplatform.Component + haAgent haagent.Component hostname string hostnameUpdate chan string hostnameUpdateDone chan struct{} // signals that the hostname update is finished @@ -283,7 +285,7 @@ func NewFlushAndSerializeInParallel(config model.Config) FlushAndSerializeInPara } // NewBufferedAggregator instantiates a BufferedAggregator -func NewBufferedAggregator(s serializer.MetricSerializer, eventPlatformForwarder eventplatform.Component, tagger tagger.Component, hostname string, flushInterval time.Duration) *BufferedAggregator { +func NewBufferedAggregator(s serializer.MetricSerializer, eventPlatformForwarder eventplatform.Component, haAgent haagent.Component, tagger tagger.Component, hostname string, flushInterval time.Duration) *BufferedAggregator { bufferSize := pkgconfigsetup.Datadog().GetInt("aggregator_buffer_size") agentName := flavor.GetFlavor() @@ -326,6 +328,7 @@ func NewBufferedAggregator(s serializer.MetricSerializer, eventPlatformForwarder flushInterval: flushInterval, serializer: s, eventPlatformForwarder: eventPlatformForwarder, + haAgent: haAgent, hostname: hostname, hostnameUpdate: make(chan string), hostnameUpdateDone: make(chan struct{}), @@ -861,8 +864,9 @@ func (agg *BufferedAggregator) tags(withVersion bool) []string { tags = append(tags, "package_version:"+version.AgentPackageVersion) } } - if haagent.IsEnabled() { - tags = append(tags, "ha_agent_group:"+haagent.GetGroup()) + // TODO: remove "agg.haAgent != nil" + if agg.haAgent != nil && agg.haAgent.Enabled() { + tags = append(tags, "agent_group:"+agg.haAgent.GetGroup()) } // nil to empty string // This is expected by other components/tests diff --git a/pkg/aggregator/aggregator_test.go b/pkg/aggregator/aggregator_test.go index 2b51ce784d230e..bd0adff5a45305 100644 --- a/pkg/aggregator/aggregator_test.go +++ b/pkg/aggregator/aggregator_test.go @@ -147,7 +147,7 @@ func TestAddServiceCheckDefaultValues(t *testing.T) { s := &MockSerializerIterableSerie{} taggerComponent := fxutil.Test[tagger.Mock](t, taggerimpl.MockModule()) - agg := NewBufferedAggregator(s, nil, taggerComponent, "resolved-hostname", DefaultFlushInterval) + agg := NewBufferedAggregator(s, nil, nil, taggerComponent, "resolved-hostname", DefaultFlushInterval) agg.addServiceCheck(servicecheck.ServiceCheck{ // leave Host and Ts fields blank @@ -180,7 +180,7 @@ func TestAddEventDefaultValues(t *testing.T) { s := &MockSerializerIterableSerie{} taggerComponent := fxutil.Test[tagger.Mock](t, taggerimpl.MockModule()) - agg := NewBufferedAggregator(s, nil, taggerComponent, "resolved-hostname", DefaultFlushInterval) + agg := NewBufferedAggregator(s, nil, nil, taggerComponent, "resolved-hostname", DefaultFlushInterval) agg.addEvent(event.Event{ // only populate required fields @@ -230,7 +230,7 @@ func TestDefaultData(t *testing.T) { s := &MockSerializerIterableSerie{} taggerComponent := fxutil.Test[tagger.Mock](t, taggerimpl.MockModule()) - agg := NewBufferedAggregator(s, nil, taggerComponent, "hostname", DefaultFlushInterval) + agg := NewBufferedAggregator(s, nil, nil, taggerComponent, "hostname", DefaultFlushInterval) start := time.Now() @@ -586,7 +586,7 @@ func TestTags(t *testing.T) { taggerComponent := fxutil.Test[tagger.Mock](t, taggerimpl.MockModule()) - agg := NewBufferedAggregator(nil, nil, taggerComponent, tt.hostname, time.Second) + agg := NewBufferedAggregator(nil, nil, nil, taggerComponent, tt.hostname, time.Second) agg.agentTags = tt.agentTags agg.globalTags = tt.globalTags assert.ElementsMatch(t, tt.want, agg.tags(tt.withVersion)) @@ -620,7 +620,7 @@ func TestAddDJMRecurrentSeries(t *testing.T) { s := &MockSerializerIterableSerie{} // NewBufferedAggregator with DJM enable will create a new recurrentSeries taggerComponent := fxutil.Test[tagger.Mock](t, taggerimpl.MockModule()) - NewBufferedAggregator(s, nil, taggerComponent, "hostname", DefaultFlushInterval) + NewBufferedAggregator(s, nil, nil, taggerComponent, "hostname", DefaultFlushInterval) expectedRecurrentSeries := metrics.Series{&metrics.Serie{ Name: "datadog.djm.agent_host", diff --git a/pkg/aggregator/check_sampler_bench_test.go b/pkg/aggregator/check_sampler_bench_test.go index 9f001097d583c4..720b60791b6c8a 100644 --- a/pkg/aggregator/check_sampler_bench_test.go +++ b/pkg/aggregator/check_sampler_bench_test.go @@ -52,7 +52,7 @@ func benchmarkAddBucket(bucketValue int64, b *testing.B) { sharedForwarder := forwarder.NewDefaultForwarder(pkgconfigsetup.Datadog(), deps.Log, forwarderOpts) orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{}) eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(deps.Hostname)) - demux := InitAndStartAgentDemultiplexer(deps.Log, sharedForwarder, &orchestratorForwarder, options, eventPlatformForwarder, deps.Compressor, taggerComponent, "hostname") + demux := InitAndStartAgentDemultiplexer(deps.Log, sharedForwarder, &orchestratorForwarder, options, eventPlatformForwarder, nil, deps.Compressor, taggerComponent, "hostname") defer demux.Stop(true) checkSampler := newCheckSampler(1, true, true, 1000, tags.NewStore(true, "bench"), checkid.ID("hello:world:1234"), taggerComponent) diff --git a/pkg/aggregator/demultiplexer_agent.go b/pkg/aggregator/demultiplexer_agent.go index bc2e424b331990..bcec20c855a843 100644 --- a/pkg/aggregator/demultiplexer_agent.go +++ b/pkg/aggregator/demultiplexer_agent.go @@ -17,6 +17,7 @@ import ( forwarder "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" orchestratorforwarder "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator" + haagent "github.com/DataDog/datadog-agent/comp/haagent/def" "github.com/DataDog/datadog-agent/comp/serializer/compression" "github.com/DataDog/datadog-agent/pkg/aggregator/internal/tags" "github.com/DataDog/datadog-agent/pkg/aggregator/sender" @@ -119,29 +120,13 @@ type dataOutputs struct { // InitAndStartAgentDemultiplexer creates a new Demultiplexer and runs what's necessary // in goroutines. As of today, only the embedded BufferedAggregator needs a separate goroutine. // In the future, goroutines will be started for the event platform forwarder and/or orchestrator forwarder. -func InitAndStartAgentDemultiplexer( - log log.Component, - sharedForwarder forwarder.Forwarder, - orchestratorForwarder orchestratorforwarder.Component, - options AgentDemultiplexerOptions, - eventPlatformForwarder eventplatform.Component, - compressor compression.Component, - tagger tagger.Component, - hostname string) *AgentDemultiplexer { - demux := initAgentDemultiplexer(log, sharedForwarder, orchestratorForwarder, options, eventPlatformForwarder, compressor, tagger, hostname) +func InitAndStartAgentDemultiplexer(log log.Component, sharedForwarder forwarder.Forwarder, orchestratorForwarder orchestratorforwarder.Component, options AgentDemultiplexerOptions, eventPlatformForwarder eventplatform.Component, haAgent haagent.Component, compressor compression.Component, tagger tagger.Component, hostname string) *AgentDemultiplexer { + demux := initAgentDemultiplexer(log, sharedForwarder, orchestratorForwarder, options, eventPlatformForwarder, haAgent, compressor, tagger, hostname) go demux.run() return demux } -func initAgentDemultiplexer( - log log.Component, - sharedForwarder forwarder.Forwarder, - orchestratorForwarder orchestratorforwarder.Component, - options AgentDemultiplexerOptions, - eventPlatformForwarder eventplatform.Component, - compressor compression.Component, - tagger tagger.Component, - hostname string) *AgentDemultiplexer { +func initAgentDemultiplexer(log log.Component, sharedForwarder forwarder.Forwarder, orchestratorForwarder orchestratorforwarder.Component, options AgentDemultiplexerOptions, eventPlatformForwarder eventplatform.Component, haAgent haagent.Component, compressor compression.Component, tagger tagger.Component, hostname string) *AgentDemultiplexer { // prepare the multiple forwarders // ------------------------------- if pkgconfigsetup.Datadog().GetBool("telemetry.enabled") && pkgconfigsetup.Datadog().GetBool("telemetry.dogstatsd_origin") && !pkgconfigsetup.Datadog().GetBool("aggregator_use_tags_store") { @@ -157,7 +142,7 @@ func initAgentDemultiplexer( // prepare the embedded aggregator // -- - agg := NewBufferedAggregator(sharedSerializer, eventPlatformForwarder, tagger, hostname, options.FlushInterval) + agg := NewBufferedAggregator(sharedSerializer, eventPlatformForwarder, haAgent, tagger, hostname, options.FlushInterval) // statsd samplers // --------------- diff --git a/pkg/aggregator/demultiplexer_agent_test.go b/pkg/aggregator/demultiplexer_agent_test.go index 13d75d0cfaa568..b1110d50c7b983 100644 --- a/pkg/aggregator/demultiplexer_agent_test.go +++ b/pkg/aggregator/demultiplexer_agent_test.go @@ -65,7 +65,7 @@ func TestDemuxNoAggOptionDisabled(t *testing.T) { opts := demuxTestOptions() deps := createDemultiplexerAgentTestDeps(t) - demux := initAgentDemultiplexer(deps.Log, NewForwarderTest(deps.Log), deps.OrchestratorFwd, opts, deps.EventPlatform, deps.Compressor, deps.Tagger, "") + demux := initAgentDemultiplexer(deps.Log, NewForwarderTest(deps.Log), deps.OrchestratorFwd, opts, deps.EventPlatform, nil, deps.Compressor, deps.Tagger, "") batch := testDemuxSamples(t) @@ -87,7 +87,7 @@ func TestDemuxNoAggOptionEnabled(t *testing.T) { mockSerializer.On("AreSketchesEnabled").Return(true) opts.EnableNoAggregationPipeline = true deps := createDemultiplexerAgentTestDeps(t) - demux := initAgentDemultiplexer(deps.Log, NewForwarderTest(deps.Log), deps.OrchestratorFwd, opts, deps.EventPlatform, deps.Compressor, deps.Tagger, "") + demux := initAgentDemultiplexer(deps.Log, NewForwarderTest(deps.Log), deps.OrchestratorFwd, opts, deps.EventPlatform, nil, deps.Compressor, deps.Tagger, "") demux.statsd.noAggStreamWorker.serializer = mockSerializer // the no agg pipeline will use our mocked serializer go demux.run() diff --git a/pkg/aggregator/demultiplexer_mock.go b/pkg/aggregator/demultiplexer_mock.go index 4e9f6ea5c8d118..a601f4f65bbcac 100644 --- a/pkg/aggregator/demultiplexer_mock.go +++ b/pkg/aggregator/demultiplexer_mock.go @@ -33,5 +33,5 @@ type TestDeps struct { func InitAndStartAgentDemultiplexerForTest(deps TestDeps, options AgentDemultiplexerOptions, hostname string) *AgentDemultiplexer { orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{}) eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(deps.Hostname)) - return InitAndStartAgentDemultiplexer(deps.Log, deps.SharedForwarder, &orchestratorForwarder, options, eventPlatformForwarder, deps.Compressor, nooptagger.NewTaggerClient(), hostname) + return InitAndStartAgentDemultiplexer(deps.Log, deps.SharedForwarder, &orchestratorForwarder, options, eventPlatformForwarder, nil, deps.Compressor, nooptagger.NewTaggerClient(), hostname) } diff --git a/pkg/aggregator/demultiplexer_test.go b/pkg/aggregator/demultiplexer_test.go index 03e668898d9060..6ca3083052a00e 100644 --- a/pkg/aggregator/demultiplexer_test.go +++ b/pkg/aggregator/demultiplexer_test.go @@ -173,7 +173,7 @@ func TestDemuxFlushAggregatorToSerializer(t *testing.T) { opts := demuxTestOptions() opts.FlushInterval = time.Hour deps := createDemuxDeps(t, opts, eventplatformimpl.NewDefaultParams()) - demux := initAgentDemultiplexer(deps.Log, deps.SharedForwarder, deps.OrchestratorFwd, opts, deps.EventPlatformFwd, deps.Compressor, nooptagger.NewTaggerClient(), "") + demux := initAgentDemultiplexer(deps.Log, deps.SharedForwarder, deps.OrchestratorFwd, opts, deps.EventPlatformFwd, nil, deps.Compressor, nooptagger.NewTaggerClient(), "") demux.Aggregator().tlmContainerTagsEnabled = false require.NotNil(demux) require.NotNil(demux.aggregator) @@ -300,7 +300,7 @@ func createDemuxDepsWithOrchestratorFwd( return aggregatorDeps{ TestDeps: deps.TestDeps, - Demultiplexer: InitAndStartAgentDemultiplexer(deps.Log, deps.SharedForwarder, deps.OrchestratorForwarder, opts, deps.Eventplatform, deps.Compressor, nooptagger.NewTaggerClient(), ""), + Demultiplexer: InitAndStartAgentDemultiplexer(deps.Log, deps.SharedForwarder, deps.OrchestratorForwarder, opts, deps.Eventplatform, nil, deps.Compressor, nooptagger.NewTaggerClient(), ""), OrchestratorFwd: deps.OrchestratorForwarder, EventPlatformFwd: deps.Eventplatform, } diff --git a/pkg/aggregator/mocksender/mocksender.go b/pkg/aggregator/mocksender/mocksender.go index f1f9155430f8c5..13c1a3aeca8df7 100644 --- a/pkg/aggregator/mocksender/mocksender.go +++ b/pkg/aggregator/mocksender/mocksender.go @@ -43,7 +43,7 @@ func CreateDefaultDemultiplexer() *aggregator.AgentDemultiplexer { orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{}) eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(hostnameimpl.NewHostnameService())) taggerComponent := nooptagger.NewTaggerClient() - return aggregator.InitAndStartAgentDemultiplexer(log, sharedForwarder, &orchestratorForwarder, opts, eventPlatformForwarder, compressionimpl.NewMockCompressor(), taggerComponent, "") + return aggregator.InitAndStartAgentDemultiplexer(log, sharedForwarder, &orchestratorForwarder, opts, eventPlatformForwarder, nil, compressionimpl.NewMockCompressor(), taggerComponent, "") } diff --git a/pkg/aggregator/sender_test.go b/pkg/aggregator/sender_test.go index 19b6a01b74c476..fa4cf49752e710 100644 --- a/pkg/aggregator/sender_test.go +++ b/pkg/aggregator/sender_test.go @@ -58,7 +58,7 @@ func testDemux(log log.Component, hostname hostname.Component) *AgentDemultiplex opts.DontStartForwarders = true orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{}) eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(hostname)) - demux := initAgentDemultiplexer(log, NewForwarderTest(log), &orchestratorForwarder, opts, eventPlatformForwarder, compressionimpl.NewMockCompressor(), nooptagger.NewTaggerClient(), defaultHostname) + demux := initAgentDemultiplexer(log, NewForwarderTest(log), &orchestratorForwarder, opts, eventPlatformForwarder, nil, compressionimpl.NewMockCompressor(), nooptagger.NewTaggerClient(), defaultHostname) return demux } diff --git a/pkg/collector/corechecks/snmp/integration_profile_bundle_test.go b/pkg/collector/corechecks/snmp/integration_profile_bundle_test.go index 519e8a5ee8e7f5..081a8b3974f32c 100644 --- a/pkg/collector/corechecks/snmp/integration_profile_bundle_test.go +++ b/pkg/collector/corechecks/snmp/integration_profile_bundle_test.go @@ -25,7 +25,7 @@ import ( func TestProfileBundleJsonZip(t *testing.T) { timeNow = common.MockTimeNow - aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewTaggerClient(), "", 1*time.Hour) + aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewTaggerClient(), "", 1*time.Hour) invalidPath, _ := filepath.Abs(filepath.Join("internal", "test", "zipprofiles.d")) pkgconfigsetup.Datadog().SetWithoutSource("confd_path", invalidPath) diff --git a/pkg/collector/corechecks/snmp/integration_profile_metadata_test.go b/pkg/collector/corechecks/snmp/integration_profile_metadata_test.go index 22cb6e3f26c88c..811841e8885fef 100644 --- a/pkg/collector/corechecks/snmp/integration_profile_metadata_test.go +++ b/pkg/collector/corechecks/snmp/integration_profile_metadata_test.go @@ -32,7 +32,7 @@ import ( func TestProfileMetadata_f5(t *testing.T) { timeNow = common.MockTimeNow - aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewTaggerClient(), "", 1*time.Hour) + aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewTaggerClient(), "", 1*time.Hour) invalidPath, _ := filepath.Abs(filepath.Join("internal", "test", "metadata.d")) pkgconfigsetup.Datadog().SetWithoutSource("confd_path", invalidPath) diff --git a/pkg/collector/corechecks/snmp/integration_topology_test.go b/pkg/collector/corechecks/snmp/integration_topology_test.go index 65915af08a0050..cad47f7e7d1d48 100644 --- a/pkg/collector/corechecks/snmp/integration_topology_test.go +++ b/pkg/collector/corechecks/snmp/integration_topology_test.go @@ -32,7 +32,7 @@ import ( func TestTopologyPayload_LLDP(t *testing.T) { timeNow = common.MockTimeNow - aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewTaggerClient(), "", 1*time.Hour) + aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewTaggerClient(), "", 1*time.Hour) invalidPath, _ := filepath.Abs(filepath.Join("internal", "test", "metadata.d")) pkgconfigsetup.Datadog().SetWithoutSource("confd_path", invalidPath) @@ -734,7 +734,7 @@ profiles: func TestTopologyPayload_CDP(t *testing.T) { timeNow = common.MockTimeNow - aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewTaggerClient(), "", 1*time.Hour) + aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewTaggerClient(), "", 1*time.Hour) invalidPath, _ := filepath.Abs(filepath.Join("internal", "test", "metadata.d")) pkgconfigsetup.Datadog().SetWithoutSource("confd_path", invalidPath) @@ -1427,7 +1427,7 @@ profiles: // we have different data for LLDP and CDP to test that we're only using LLDP to build the links func TestTopologyPayload_LLDP_CDP(t *testing.T) { timeNow = common.MockTimeNow - aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewTaggerClient(), "", 1*time.Hour) + aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewTaggerClient(), "", 1*time.Hour) invalidPath, _ := filepath.Abs(filepath.Join("internal", "test", "metadata.d")) pkgconfigsetup.Datadog().SetWithoutSource("confd_path", invalidPath) diff --git a/pkg/collector/corechecks/snmp/internal/checkconfig/config_test.go b/pkg/collector/corechecks/snmp/internal/checkconfig/config_test.go index cffed1f43f099c..318b250afa3434 100644 --- a/pkg/collector/corechecks/snmp/internal/checkconfig/config_test.go +++ b/pkg/collector/corechecks/snmp/internal/checkconfig/config_test.go @@ -25,7 +25,7 @@ import ( func TestConfigurations(t *testing.T) { profile.SetConfdPathAndCleanProfiles() - aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewTaggerClient(), "", 1*time.Hour) + aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewTaggerClient(), "", 1*time.Hour) // language=yaml rawInstanceConfig := []byte(` @@ -326,7 +326,7 @@ profiles: func TestInlineProfileConfiguration(t *testing.T) { profile.SetConfdPathAndCleanProfiles() - aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewTaggerClient(), "", 1*time.Hour) + aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewTaggerClient(), "", 1*time.Hour) // language=yaml rawInstanceConfig := []byte(` diff --git a/pkg/collector/corechecks/systemd/systemd_test.go b/pkg/collector/corechecks/systemd/systemd_test.go index e473b47a8875a3..65eadccd0d733e 100644 --- a/pkg/collector/corechecks/systemd/systemd_test.go +++ b/pkg/collector/corechecks/systemd/systemd_test.go @@ -1087,7 +1087,7 @@ unit_names: func TestCheckID(t *testing.T) { check1 := newCheck() check2 := newCheck() - aggregator.NewBufferedAggregator(nil, nil, nooptagger.NewTaggerClient(), "", 1*time.Hour) + aggregator.NewBufferedAggregator(nil, nil, nil, nooptagger.NewTaggerClient(), "", 1*time.Hour) // language=yaml rawInstanceConfig1 := []byte(` diff --git a/test/benchmarks/kubernetes_state/main.go b/test/benchmarks/kubernetes_state/main.go index 08b83d358855e7..a1ea844898a9ba 100644 --- a/test/benchmarks/kubernetes_state/main.go +++ b/test/benchmarks/kubernetes_state/main.go @@ -207,7 +207,7 @@ func main() { * As it has a `nil` serializer, it will panic if it tries to flush the metrics. * That’s why we need a big enough flush interval */ - aggregator.NewBufferedAggregator(nil, "", 1*time.Hour) + aggregator.NewBufferedAggregator(nil, "", nil, 1*time.Hour) /* * Wait for informers to get populated