From b80f86c8411b2bef986860e2498637027f39e113 Mon Sep 17 00:00:00 2001 From: Gustavo Caso Date: Thu, 28 Nov 2024 14:48:37 +0100 Subject: [PATCH] Revert "[ASCII-2562] Migrate forward eventplatform component to the latest file structure" (#31563) --- cmd/agent/subcommands/run/command.go | 4 +- cmd/agent/subcommands/snmp/command.go | 6 +- .../subcommands/run/command.go | 4 +- .../subcommands/start/command.go | 4 +- cmd/dogstatsd/subcommands/start/command.go | 4 +- cmd/process-agent/command/main_common.go | 4 +- cmd/process-agent/subcommands/check/check.go | 4 +- cmd/serverless-init/metric/metric_test.go | 3 +- comp/agent/bundle_test.go | 4 +- comp/aggregator/bundle_test.go | 4 +- .../demultiplexerimpl/demultiplexer.go | 2 +- .../demultiplexer_fake_sampler_mock.go | 4 +- .../demultiplexerimpl/status_test.go | 4 +- .../test_agent_demultiplexer.go | 11 +- .../sendermanager.go | 5 +- .../collectorimpl/collector_demux_test.go | 3 +- .../eventplatform/{def => }/component.go | 8 +- .../{mock => }/component_mock.go | 7 +- .../epforwarder.go | 64 ++++++++--- .../eventplatformimpl/epforwarder_mock.go | 27 +++++ .../eventplatformimpl/epforwarder_mockgen.go | 101 ++++++++++++++++++ .../eventplatform/eventplatformimpl/params.go | 22 ++++ comp/forwarder/eventplatform/fx-mock/fx.go | 21 ---- comp/forwarder/eventplatform/fx-noop/fx.go | 21 ---- comp/forwarder/eventplatform/fx/fx.go | 17 --- .../forwarder/eventplatform/impl-noop/noop.go | 39 ------- .../eventplatform/mock/epforwarder_mock.go | 36 ------- .../eventplatform/mock/epforwarder_mockgen.go | 77 ------------- .../eventplatformreceiverimpl/format.go | 2 +- comp/ndmtmp/bundle_test.go | 7 +- comp/ndmtmp/forwarder/component.go | 4 +- comp/ndmtmp/forwarder/component_mock.go | 4 +- .../forwarder/forwarderimpl/forwarder.go | 5 +- .../forwarder/forwarderimpl/forwarder_mock.go | 9 +- comp/netflow/flowaggregator/aggregator.go | 6 +- .../netflow/flowaggregator/aggregator_test.go | 22 ++-- comp/netflow/server/integration_test.go | 7 +- comp/netflow/testutil/testutil.go | 2 +- comp/networkpath/bundle_test.go | 4 +- .../npcollectorimpl/npcollector.go | 6 +- .../npcollectorimpl/npcollector_test.go | 6 +- .../npcollectorimpl/npcollector_testutils.go | 11 +- .../npcollectorimpl/npcollectorcomp.go | 30 +++--- comp/process/bundle_test.go | 4 +- comp/snmpscan/impl/devicescan.go | 5 +- comp/snmpscan/impl/snmpscan.go | 9 +- .../forwarder/forwarderimpl/forwarder.go | 2 +- .../forwarder/forwarderimpl/forwarder_test.go | 2 +- comp/snmptraps/status/statusimpl/status.go | 2 +- .../status/statusimpl/status_test.go | 2 +- pkg/aggregator/aggregator.go | 23 +++- pkg/aggregator/aggregator_test.go | 2 +- pkg/aggregator/check_sampler_bench_test.go | 6 +- pkg/aggregator/demultiplexer_agent.go | 6 +- pkg/aggregator/demultiplexer_agent_test.go | 6 +- pkg/aggregator/demultiplexer_mock.go | 6 +- pkg/aggregator/demultiplexer_test.go | 66 ++++++++---- pkg/aggregator/mocksender/mocksender.go | 10 +- pkg/aggregator/sender_test.go | 20 ++-- pkg/cli/subcommands/check/command.go | 7 +- .../corechecks/containerimage/processor.go | 2 +- .../containerimage/processor_test.go | 2 +- .../containerlifecycle/processor.go | 2 +- .../cisco-sdwan/report/metadata.go | 2 +- .../corechecks/networkpath/networkpath.go | 2 +- pkg/collector/corechecks/sbom/processor.go | 2 +- .../corechecks/sbom/processor_test.go | 2 +- .../corechecks/servicediscovery/events.go | 2 +- .../internal/report/report_device_metadata.go | 2 +- pkg/diagnose/runner.go | 4 +- .../invocationlifecycle/lifecycle_test.go | 3 +- pkg/serverless/logs/logs_test.go | 3 +- .../metrics/enhanced_metrics_test.go | 3 +- tasks/components.py | 1 + 74 files changed, 445 insertions(+), 400 deletions(-) rename comp/forwarder/eventplatform/{def => }/component.go (91%) rename comp/forwarder/eventplatform/{mock => }/component_mock.go (60%) rename comp/forwarder/eventplatform/{impl => eventplatformimpl}/epforwarder.go (90%) create mode 100644 comp/forwarder/eventplatform/eventplatformimpl/epforwarder_mock.go create mode 100644 comp/forwarder/eventplatform/eventplatformimpl/epforwarder_mockgen.go create mode 100644 comp/forwarder/eventplatform/eventplatformimpl/params.go delete mode 100644 comp/forwarder/eventplatform/fx-mock/fx.go delete mode 100644 comp/forwarder/eventplatform/fx-noop/fx.go delete mode 100644 comp/forwarder/eventplatform/fx/fx.go delete mode 100644 comp/forwarder/eventplatform/impl-noop/noop.go delete mode 100644 comp/forwarder/eventplatform/mock/epforwarder_mock.go delete mode 100644 comp/forwarder/eventplatform/mock/epforwarder_mockgen.go diff --git a/cmd/agent/subcommands/run/command.go b/cmd/agent/subcommands/run/command.go index 88c47aa47ef3a..9c126f014aab2 100644 --- a/cmd/agent/subcommands/run/command.go +++ b/cmd/agent/subcommands/run/command.go @@ -89,7 +89,7 @@ import ( dogstatsdStatusimpl "github.com/DataDog/datadog-agent/comp/dogstatsd/status/statusimpl" "github.com/DataDog/datadog-agent/comp/forwarder" "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" - eventplatformfx "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/fx" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatformreceiver/eventplatformreceiverimpl" orchestratorForwarderImpl "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl" langDetectionCl "github.com/DataDog/datadog-agent/comp/languagedetection/client" @@ -424,7 +424,7 @@ func getSharedFxOption() fx.Option { langDetectionClimpl.Module(), metadata.Bundle(), orchestratorForwarderImpl.Module(orchestratorForwarderImpl.NewDefaultParams()), - eventplatformfx.Module(), + eventplatformimpl.Module(eventplatformimpl.NewDefaultParams()), eventplatformreceiverimpl.Module(), // injecting the shared Serializer to FX until we migrate it to a proper component. This allows other diff --git a/cmd/agent/subcommands/snmp/command.go b/cmd/agent/subcommands/snmp/command.go index 516818867c618..4dba953403985 100644 --- a/cmd/agent/subcommands/snmp/command.go +++ b/cmd/agent/subcommands/snmp/command.go @@ -23,7 +23,7 @@ import ( nooptagger "github.com/DataDog/datadog-agent/comp/core/tagger/fx-noop" "github.com/DataDog/datadog-agent/comp/forwarder" "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" - eventplatformfx "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/fx" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatformreceiver/eventplatformreceiverimpl" "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl" haagentfx "github.com/DataDog/datadog-agent/comp/haagent/fx" @@ -97,7 +97,7 @@ func Commands(globalParams *command.GlobalParams) []*cobra.Command { demultiplexerimpl.Module(demultiplexerimpl.NewDefaultParams()), forwarder.Bundle(defaultforwarder.NewParams(defaultforwarder.WithFeatures(defaultforwarder.CoreFeatures))), orchestratorimpl.Module(orchestratorimpl.NewDefaultParams()), - eventplatformfx.Module(), + eventplatformimpl.Module(eventplatformimpl.NewDefaultParams()), compressionfx.Module(), nooptagger.Module(), eventplatformreceiverimpl.Module(), @@ -160,7 +160,7 @@ func Commands(globalParams *command.GlobalParams) []*cobra.Command { aggregator.Bundle(demultiplexerimpl.NewDefaultParams()), orchestratorimpl.Module(orchestratorimpl.NewDefaultParams()), forwarder.Bundle(defaultforwarder.NewParams(defaultforwarder.WithFeatures(defaultforwarder.CoreFeatures))), - eventplatformfx.Module(), + eventplatformimpl.Module(eventplatformimpl.NewDefaultParams()), eventplatformreceiverimpl.Module(), compressionfx.Module(), nooptagger.Module(), diff --git a/cmd/cluster-agent-cloudfoundry/subcommands/run/command.go b/cmd/cluster-agent-cloudfoundry/subcommands/run/command.go index 9aa1076c8ec4a..b16485b36f9e4 100644 --- a/cmd/cluster-agent-cloudfoundry/subcommands/run/command.go +++ b/cmd/cluster-agent-cloudfoundry/subcommands/run/command.go @@ -46,7 +46,7 @@ import ( workloadmetafx "github.com/DataDog/datadog-agent/comp/core/workloadmeta/fx" "github.com/DataDog/datadog-agent/comp/forwarder" "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" - eventplatformfxnoop "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/fx-noop" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatformreceiver/eventplatformreceiverimpl" orchestratorForwarderImpl "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl" haagentfx "github.com/DataDog/datadog-agent/comp/haagent/fx" @@ -89,7 +89,7 @@ func Commands(globalParams *command.GlobalParams) []*cobra.Command { compressionfx.Module(), demultiplexerimpl.Module(demultiplexerimpl.NewDefaultParams()), orchestratorForwarderImpl.Module(orchestratorForwarderImpl.NewDisabledParams()), - eventplatformfxnoop.Module(), + eventplatformimpl.Module(eventplatformimpl.NewDisabledParams()), eventplatformreceiverimpl.Module(), // setup workloadmeta diff --git a/cmd/cluster-agent/subcommands/start/command.go b/cmd/cluster-agent/subcommands/start/command.go index 98a671dad950d..f85419780b657 100644 --- a/cmd/cluster-agent/subcommands/start/command.go +++ b/cmd/cluster-agent/subcommands/start/command.go @@ -58,7 +58,7 @@ import ( workloadmetafx "github.com/DataDog/datadog-agent/comp/core/workloadmeta/fx" "github.com/DataDog/datadog-agent/comp/forwarder" "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" - eventplatformfxnoop "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/fx-noop" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatformreceiver/eventplatformreceiverimpl" orchestratorForwarderImpl "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl" haagentfx "github.com/DataDog/datadog-agent/comp/haagent/fx" @@ -141,7 +141,7 @@ func Commands(globalParams *command.GlobalParams) []*cobra.Command { compressionfx.Module(), demultiplexerimpl.Module(demultiplexerimpl.NewDefaultParams()), orchestratorForwarderImpl.Module(orchestratorForwarderImpl.NewDefaultParams()), - eventplatformfxnoop.Module(), + eventplatformimpl.Module(eventplatformimpl.NewDisabledParams()), eventplatformreceiverimpl.Module(), // setup workloadmeta wmcatalog.GetCatalog(), diff --git a/cmd/dogstatsd/subcommands/start/command.go b/cmd/dogstatsd/subcommands/start/command.go index ea7fe59c8051a..245ff9139caff 100644 --- a/cmd/dogstatsd/subcommands/start/command.go +++ b/cmd/dogstatsd/subcommands/start/command.go @@ -40,7 +40,7 @@ import ( dogstatsdServer "github.com/DataDog/datadog-agent/comp/dogstatsd/server" "github.com/DataDog/datadog-agent/comp/forwarder" "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" - eventplatformfxnoop "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/fx-noop" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatformreceiver/eventplatformreceiverimpl" orchestratorForwarderImpl "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl" haagentfx "github.com/DataDog/datadog-agent/comp/haagent/fx" @@ -149,7 +149,7 @@ func RunDogstatsdFct(cliParams *CLIParams, defaultConfPath string, defaultLogFil )), secretsimpl.Module(), orchestratorForwarderImpl.Module(orchestratorForwarderImpl.NewDisabledParams()), - eventplatformfxnoop.Module(), + eventplatformimpl.Module(eventplatformimpl.NewDisabledParams()), eventplatformreceiverimpl.Module(), hostnameimpl.Module(), localTaggerfx.Module(tagger.Params{}), diff --git a/cmd/process-agent/command/main_common.go b/cmd/process-agent/command/main_common.go index 5c8d983e90b95..188c9684fd30b 100644 --- a/cmd/process-agent/command/main_common.go +++ b/cmd/process-agent/command/main_common.go @@ -38,7 +38,7 @@ import ( workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def" workloadmetafx "github.com/DataDog/datadog-agent/comp/core/workloadmeta/fx" compstatsd "github.com/DataDog/datadog-agent/comp/dogstatsd/statsd" - eventplatformfx "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/fx" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatformreceiver/eventplatformreceiverimpl" hostMetadataUtils "github.com/DataDog/datadog-agent/comp/metadata/host/hostimpl/utils" "github.com/DataDog/datadog-agent/comp/networkpath" @@ -134,7 +134,7 @@ func runApp(ctx context.Context, globalParams *GlobalParams) error { process.Bundle(), eventplatformreceiverimpl.Module(), - eventplatformfx.Module(), + eventplatformimpl.Module(eventplatformimpl.NewDefaultParams()), // Provides the rdnssquerier module rdnsquerierfx.Module(), diff --git a/cmd/process-agent/subcommands/check/check.go b/cmd/process-agent/subcommands/check/check.go index 5f88b17cb11ef..ad2181d4ac90c 100644 --- a/cmd/process-agent/subcommands/check/check.go +++ b/cmd/process-agent/subcommands/check/check.go @@ -32,7 +32,7 @@ import ( wmcatalog "github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/catalog" workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def" workloadmetafx "github.com/DataDog/datadog-agent/comp/core/workloadmeta/fx" - eventplatformfx "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/fx" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatformreceiver/eventplatformreceiverimpl" hostMetadataUtils "github.com/DataDog/datadog-agent/comp/metadata/host/hostimpl/utils" "github.com/DataDog/datadog-agent/comp/networkpath/npcollector" @@ -127,7 +127,7 @@ func MakeCommand(globalParamsGetter func() *command.GlobalParams, name string, a // Provide eventplatformimpl module eventplatformreceiverimpl.Module(), - eventplatformfx.Module(), + eventplatformimpl.Module(eventplatformimpl.NewDefaultParams()), // Provide rdnsquerier module rdnsquerierfx.Module(), diff --git a/cmd/serverless-init/metric/metric_test.go b/cmd/serverless-init/metric/metric_test.go index 874e10c13316d..facd4acabca0c 100644 --- a/cmd/serverless-init/metric/metric_test.go +++ b/cmd/serverless-init/metric/metric_test.go @@ -14,6 +14,7 @@ import ( "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer" "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer/demultiplexerimpl" + "github.com/DataDog/datadog-agent/comp/core/hostname/hostnameimpl" log "github.com/DataDog/datadog-agent/comp/core/log/def" logmock "github.com/DataDog/datadog-agent/comp/core/log/mock" compressionmock "github.com/DataDog/datadog-agent/comp/serializer/compression/fx-mock" @@ -64,5 +65,5 @@ func TestAddShutdownMetric(t *testing.T) { } func createDemultiplexer(t *testing.T) demultiplexer.FakeSamplerMock { - return fxutil.Test[demultiplexer.FakeSamplerMock](t, fx.Provide(func() log.Component { return logmock.New(t) }), compressionmock.MockModule(), demultiplexerimpl.FakeSamplerMockModule()) + return fxutil.Test[demultiplexer.FakeSamplerMock](t, fx.Provide(func() log.Component { return logmock.New(t) }), compressionmock.MockModule(), demultiplexerimpl.FakeSamplerMockModule(), hostnameimpl.MockModule()) } diff --git a/comp/agent/bundle_test.go b/comp/agent/bundle_test.go index b392cba7ca396..8a1820206ee81 100644 --- a/comp/agent/bundle_test.go +++ b/comp/agent/bundle_test.go @@ -14,7 +14,7 @@ import ( workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def" workloadmetafxmock "github.com/DataDog/datadog-agent/comp/core/workloadmeta/fx-mock" "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" - eventplatformmock "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/fx-mock" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl" compressionmock "github.com/DataDog/datadog-agent/comp/serializer/compression/fx-mock" "github.com/DataDog/datadog-agent/pkg/util/fxutil" @@ -27,7 +27,7 @@ func TestBundleDependencies(t *testing.T) { compressionmock.MockModule(), defaultforwarder.MockModule(), orchestratorimpl.MockModule(), - eventplatformmock.MockModule(), + eventplatformimpl.MockModule(), demultiplexerimpl.Module(demultiplexerimpl.NewDefaultParams()), workloadmetafxmock.MockModule(workloadmeta.NewParams()), ) diff --git a/comp/aggregator/bundle_test.go b/comp/aggregator/bundle_test.go index 0b4b33bae168d..2a38d5a0ac603 100644 --- a/comp/aggregator/bundle_test.go +++ b/comp/aggregator/bundle_test.go @@ -12,7 +12,7 @@ import ( "github.com/DataDog/datadog-agent/comp/core" nooptagger "github.com/DataDog/datadog-agent/comp/core/tagger/fx-noop" "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" - eventplatformmock "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/fx-mock" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" orchestratorForwarderImpl "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl" haagentmock "github.com/DataDog/datadog-agent/comp/haagent/mock" compressionmock "github.com/DataDog/datadog-agent/comp/serializer/compression/fx-mock" @@ -25,7 +25,7 @@ func TestBundleDependencies(t *testing.T) { compressionmock.MockModule(), defaultforwarder.MockModule(), orchestratorForwarderImpl.MockModule(), - eventplatformmock.MockModule(), + eventplatformimpl.MockModule(), nooptagger.Module(), haagentmock.Module(), ) diff --git a/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer.go b/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer.go index 0b4dba5fafbb9..b6b1dd7d56310 100644 --- a/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer.go +++ b/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer.go @@ -18,7 +18,7 @@ import ( "github.com/DataDog/datadog-agent/comp/core/status" tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def" "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" orchestratorforwarder "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator" haagent "github.com/DataDog/datadog-agent/comp/haagent/def" compression "github.com/DataDog/datadog-agent/comp/serializer/compression/def" diff --git a/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer_fake_sampler_mock.go b/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer_fake_sampler_mock.go index e74314cf03f41..53e3beb5f77d6 100644 --- a/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer_fake_sampler_mock.go +++ b/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer_fake_sampler_mock.go @@ -14,6 +14,7 @@ import ( "go.uber.org/fx" demultiplexerComp "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer" + "github.com/DataDog/datadog-agent/comp/core/hostname" log "github.com/DataDog/datadog-agent/comp/core/log/def" compression "github.com/DataDog/datadog-agent/comp/serializer/compression/def" "github.com/DataDog/datadog-agent/pkg/aggregator" @@ -34,6 +35,7 @@ type fakeSamplerMockDependencies struct { fx.In Lc fx.Lifecycle Log log.Component + Hostname hostname.Component Compressor compression.Component } @@ -54,7 +56,7 @@ func (f *fakeSamplerMock) Stop(flush bool) { } func newFakeSamplerMock(deps fakeSamplerMockDependencies) demultiplexerComp.FakeSamplerMock { - demux := initTestAgentDemultiplexerWithFlushInterval(deps.Log, deps.Compressor, time.Hour) + demux := initTestAgentDemultiplexerWithFlushInterval(deps.Log, deps.Hostname, deps.Compressor, time.Hour) mock := &fakeSamplerMock{ TestAgentDemultiplexer: demux, } diff --git a/comp/aggregator/demultiplexer/demultiplexerimpl/status_test.go b/comp/aggregator/demultiplexer/demultiplexerimpl/status_test.go index c22c827126590..0e60ca002a79e 100644 --- a/comp/aggregator/demultiplexer/demultiplexerimpl/status_test.go +++ b/comp/aggregator/demultiplexer/demultiplexerimpl/status_test.go @@ -18,7 +18,7 @@ import ( tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def" taggerMock "github.com/DataDog/datadog-agent/comp/core/tagger/mock" "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" - eventplatformmock "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/fx-mock" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl" haagentmock "github.com/DataDog/datadog-agent/comp/haagent/mock" compressionmock "github.com/DataDog/datadog-agent/comp/serializer/compression/fx-mock" @@ -64,7 +64,7 @@ func TestStatusOutPut(t *testing.T) { defaultforwarder.MockModule(), haagentmock.Module(), orchestratorimpl.MockModule(), - eventplatformmock.MockModule(), + eventplatformimpl.MockModule(), fx.Provide(func() tagger.Component { return mockTagger }), diff --git a/comp/aggregator/demultiplexer/demultiplexerimpl/test_agent_demultiplexer.go b/comp/aggregator/demultiplexer/demultiplexerimpl/test_agent_demultiplexer.go index f33ab36b0bf08..431bcdb8c06e2 100644 --- a/comp/aggregator/demultiplexer/demultiplexerimpl/test_agent_demultiplexer.go +++ b/comp/aggregator/demultiplexer/demultiplexerimpl/test_agent_demultiplexer.go @@ -12,11 +12,12 @@ import ( "sync" "time" + "github.com/DataDog/datadog-agent/comp/core/hostname" log "github.com/DataDog/datadog-agent/comp/core/log/def" noopimpl "github.com/DataDog/datadog-agent/comp/core/tagger/impl-noop" "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" - eventplatformock "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/mock" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" haagentmock "github.com/DataDog/datadog-agent/comp/haagent/mock" compression "github.com/DataDog/datadog-agent/comp/serializer/compression/def" "github.com/DataDog/datadog-agent/pkg/aggregator" @@ -67,7 +68,7 @@ func (a *TestAgentDemultiplexer) AggregateSample(sample metrics.MetricSample) { } // GetEventPlatformForwarder returns a event platform forwarder -func (a *TestAgentDemultiplexer) GetEventPlatformForwarder() eventplatform.Component { +func (a *TestAgentDemultiplexer) GetEventPlatformForwarder() (eventplatform.Forwarder, error) { return a.AgentDemultiplexer.GetEventPlatformForwarder() } @@ -175,7 +176,7 @@ func (a *TestAgentDemultiplexer) Reset() { } // initTestAgentDemultiplexerWithFlushInterval inits a TestAgentDemultiplexer with the given flush interval. -func initTestAgentDemultiplexerWithFlushInterval(log log.Component, compressor compression.Component, flushInterval time.Duration) *TestAgentDemultiplexer { +func initTestAgentDemultiplexerWithFlushInterval(log log.Component, hostname hostname.Component, compressor compression.Component, flushInterval time.Duration) *TestAgentDemultiplexer { opts := aggregator.DefaultAgentDemultiplexerOptions() opts.FlushInterval = flushInterval opts.DontStartForwarders = true @@ -184,7 +185,7 @@ func initTestAgentDemultiplexerWithFlushInterval(log log.Component, compressor c sharedForwarderOptions := defaultforwarder.NewOptions(pkgconfigsetup.Datadog(), log, nil) sharedForwarder := defaultforwarder.NewDefaultForwarder(pkgconfigsetup.Datadog(), log, sharedForwarderOptions) orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{}) - eventPlatformForwarder := eventplatformock.NewMock() + eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(hostname)) demux := aggregator.InitAndStartAgentDemultiplexer(log, sharedForwarder, &orchestratorForwarder, opts, eventPlatformForwarder, haagentmock.NewMockHaAgent(), compressor, noopimpl.NewComponent(), "hostname") return NewTestAgentDemultiplexer(demux) } diff --git a/comp/aggregator/diagnosesendermanager/diagnosesendermanagerimpl/sendermanager.go b/comp/aggregator/diagnosesendermanager/diagnosesendermanagerimpl/sendermanager.go index afb0a6c869105..9e3c058314c79 100644 --- a/comp/aggregator/diagnosesendermanager/diagnosesendermanagerimpl/sendermanager.go +++ b/comp/aggregator/diagnosesendermanager/diagnosesendermanagerimpl/sendermanager.go @@ -17,7 +17,8 @@ import ( log "github.com/DataDog/datadog-agent/comp/core/log/def" tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def" "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" - eventplatformnoop "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/impl-noop" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" haagent "github.com/DataDog/datadog-agent/comp/haagent/def" compression "github.com/DataDog/datadog-agent/comp/serializer/compression/def" "github.com/DataDog/datadog-agent/pkg/aggregator" @@ -73,7 +74,7 @@ func (sender *diagnoseSenderManager) LazyGetSenderManager() (sender.SenderManage haAgent := sender.deps.HaAgent forwarder := defaultforwarder.NewDefaultForwarder(config, log, defaultforwarder.NewOptions(config, log, nil)) orchestratorForwarder := optional.NewOptionPtr[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{}) - eventPlatformForwarder := eventplatformnoop.NewComponent() + eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(sender.deps.Hostname)) senderManager = aggregator.InitAndStartAgentDemultiplexer( log, forwarder, diff --git a/comp/collector/collector/collectorimpl/collector_demux_test.go b/comp/collector/collector/collectorimpl/collector_demux_test.go index b68f7a91e546a..ed91be6ef1ade 100644 --- a/comp/collector/collector/collectorimpl/collector_demux_test.go +++ b/comp/collector/collector/collectorimpl/collector_demux_test.go @@ -27,6 +27,7 @@ import ( "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer" "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer/demultiplexerimpl" + "github.com/DataDog/datadog-agent/comp/core/hostname/hostnameimpl" "github.com/DataDog/datadog-agent/pkg/aggregator/sender" "github.com/DataDog/datadog-agent/pkg/collector/check/stub" "github.com/DataDog/datadog-agent/pkg/metrics" @@ -81,7 +82,7 @@ func (s *SenderManagerProxy) GetDefaultSender() (sender.Sender, error) { } func (suite *CollectorDemuxTestSuite) SetupTest() { - suite.demux = fxutil.Test[demultiplexer.FakeSamplerMock](suite.T(), fx.Provide(func() log.Component { return logmock.New(suite.T()) }), compressionmock.MockModule(), demultiplexerimpl.FakeSamplerMockModule()) + suite.demux = fxutil.Test[demultiplexer.FakeSamplerMock](suite.T(), fx.Provide(func() log.Component { return logmock.New(suite.T()) }), compressionmock.MockModule(), demultiplexerimpl.FakeSamplerMockModule(), hostnameimpl.MockModule()) suite.SenderManagerMock = NewSenderManagerMock(suite.demux) suite.c = newCollector(fxutil.Test[dependencies](suite.T(), core.MockBundle(), diff --git a/comp/forwarder/eventplatform/def/component.go b/comp/forwarder/eventplatform/component.go similarity index 91% rename from comp/forwarder/eventplatform/def/component.go rename to comp/forwarder/eventplatform/component.go index 1132f6afed912..65bb1c26d4b21 100644 --- a/comp/forwarder/eventplatform/def/component.go +++ b/comp/forwarder/eventplatform/component.go @@ -36,10 +36,14 @@ const ( EventTypeServiceDiscovery = "service-discovery" ) -//go:generate mockgen -source=../def/$GOFILE -package=mock -destination=../mock/epforwarder_mockgen.go - // Component is the interface of the event platform forwarder component. type Component interface { + // Get the forwarder instance if it exists. + Get() (Forwarder, bool) +} + +// Forwarder is the interface of the event platform forwarder. +type Forwarder interface { SendEventPlatformEvent(e *message.Message, eventType string) error SendEventPlatformEventBlocking(e *message.Message, eventType string) error Purge() map[string][]*message.Message diff --git a/comp/forwarder/eventplatform/mock/component_mock.go b/comp/forwarder/eventplatform/component_mock.go similarity index 60% rename from comp/forwarder/eventplatform/mock/component_mock.go rename to comp/forwarder/eventplatform/component_mock.go index e662cec2c630b..c8cf64fe4d5e3 100644 --- a/comp/forwarder/eventplatform/mock/component_mock.go +++ b/comp/forwarder/eventplatform/component_mock.go @@ -3,12 +3,11 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2023-present Datadog, Inc. -// Package mock provides the mock interafce for the event platform component. -package mock +//go:build test -import eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" +package eventplatform // Mock implements mock-specific methods. type Mock interface { - eventplatform.Component + Component } diff --git a/comp/forwarder/eventplatform/impl/epforwarder.go b/comp/forwarder/eventplatform/eventplatformimpl/epforwarder.go similarity index 90% rename from comp/forwarder/eventplatform/impl/epforwarder.go rename to comp/forwarder/eventplatform/eventplatformimpl/epforwarder.go index 18042c15df4aa..9c4fb0d451ed5 100644 --- a/comp/forwarder/eventplatform/impl/epforwarder.go +++ b/comp/forwarder/eventplatform/eventplatformimpl/epforwarder.go @@ -13,11 +13,13 @@ import ( "strings" "sync" + "go.uber.org/fx" + configcomp "github.com/DataDog/datadog-agent/comp/core/config" "github.com/DataDog/datadog-agent/comp/core/hostname/hostnameinterface" - compdef "github.com/DataDog/datadog-agent/comp/def" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatformreceiver" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatformreceiver/eventplatformreceiverimpl" "github.com/DataDog/datadog-agent/comp/logs/agent/config" "github.com/DataDog/datadog-agent/pkg/config/model" pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" @@ -28,10 +30,19 @@ import ( "github.com/DataDog/datadog-agent/pkg/logs/message" "github.com/DataDog/datadog-agent/pkg/logs/metrics" "github.com/DataDog/datadog-agent/pkg/logs/sender" + "github.com/DataDog/datadog-agent/pkg/util/fxutil" "github.com/DataDog/datadog-agent/pkg/util/log" + "github.com/DataDog/datadog-agent/pkg/util/optional" "github.com/DataDog/datadog-agent/pkg/util/startstop" ) +//go:generate mockgen -source=$GOFILE -package=$GOPACKAGE -destination=epforwarder_mockgen.go + +// Module defines the fx options for this component. +func Module(params Params) fxutil.Module { + return fxutil.Component(fx.Provide(newEventPlatformForwarder), fx.Supply(params)) +} + const ( eventTypeDBMSamples = "dbm-samples" eventTypeDBMMetrics = "dbm-metrics" @@ -319,14 +330,14 @@ func (s *defaultEventPlatformForwarder) Purge() map[string][]*message.Message { return result } -func (s *defaultEventPlatformForwarder) start() { +func (s *defaultEventPlatformForwarder) Start() { s.destinationsCtx.Start() for _, p := range s.pipelines { p.Start() } } -func (s *defaultEventPlatformForwarder) stop() { +func (s *defaultEventPlatformForwarder) Stop() { log.Debugf("shutting down event platform forwarder") stopper := startstop.NewParallelStopper() for _, p := range s.pipelines { @@ -478,28 +489,51 @@ func newDefaultEventPlatformForwarder(config model.Reader, eventPlatformReceiver } } -// Requires defined the eventplatform requirements -type Requires struct { - compdef.In +type dependencies struct { + fx.In + Params Params Config configcomp.Component - Lc compdef.Lifecycle + Lc fx.Lifecycle EventPlatformReceiver eventplatformreceiver.Component Hostname hostnameinterface.Component } -// NewComponent creates a new EventPlatformForwarder -func NewComponent(reqs Requires) eventplatform.Component { - forwarder := newDefaultEventPlatformForwarder(reqs.Config, reqs.EventPlatformReceiver) +// newEventPlatformForwarder creates a new EventPlatformForwarder +func newEventPlatformForwarder(deps dependencies) eventplatform.Component { + var forwarder *defaultEventPlatformForwarder - reqs.Lc.Append(compdef.Hook{ + if deps.Params.UseNoopEventPlatformForwarder { + forwarder = newNoopEventPlatformForwarder(deps.Hostname) + } else if deps.Params.UseEventPlatformForwarder { + forwarder = newDefaultEventPlatformForwarder(deps.Config, deps.EventPlatformReceiver) + } + if forwarder == nil { + return optional.NewNoneOptionPtr[eventplatform.Forwarder]() + } + deps.Lc.Append(fx.Hook{ OnStart: func(context.Context) error { - forwarder.start() + forwarder.Start() return nil }, OnStop: func(context.Context) error { - forwarder.stop() + forwarder.Stop() return nil }, }) - return forwarder + return optional.NewOptionPtr[eventplatform.Forwarder](forwarder) +} + +// NewNoopEventPlatformForwarder returns the standard event platform forwarder with sending disabled, meaning events +// will build up in each pipeline channel without being forwarded to the intake +func NewNoopEventPlatformForwarder(hostname hostnameinterface.Component) eventplatform.Forwarder { + return newNoopEventPlatformForwarder(hostname) +} + +func newNoopEventPlatformForwarder(hostname hostnameinterface.Component) *defaultEventPlatformForwarder { + f := newDefaultEventPlatformForwarder(pkgconfigsetup.Datadog(), eventplatformreceiverimpl.NewReceiver(hostname).Comp) + // remove the senders + for _, p := range f.pipelines { + p.strategy = nil + } + return f } diff --git a/comp/forwarder/eventplatform/eventplatformimpl/epforwarder_mock.go b/comp/forwarder/eventplatform/eventplatformimpl/epforwarder_mock.go new file mode 100644 index 0000000000000..2d4d6d296ca7e --- /dev/null +++ b/comp/forwarder/eventplatform/eventplatformimpl/epforwarder_mock.go @@ -0,0 +1,27 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build test + +package eventplatformimpl + +import ( + "github.com/DataDog/datadog-agent/comp/core/hostname" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" + "github.com/DataDog/datadog-agent/pkg/util/fxutil" + "github.com/DataDog/datadog-agent/pkg/util/optional" + "go.uber.org/fx" +) + +// MockModule defines the fx options for the mock component. +func MockModule() fxutil.Module { + return fxutil.Component( + fx.Provide(newMockComponent), + ) +} + +func newMockComponent(hostname hostname.Component) eventplatform.Component { + return optional.NewOptionPtr[eventplatform.Forwarder](NewNoopEventPlatformForwarder(hostname)) +} diff --git a/comp/forwarder/eventplatform/eventplatformimpl/epforwarder_mockgen.go b/comp/forwarder/eventplatform/eventplatformimpl/epforwarder_mockgen.go new file mode 100644 index 0000000000000..2c196bdad5aa5 --- /dev/null +++ b/comp/forwarder/eventplatform/eventplatformimpl/epforwarder_mockgen.go @@ -0,0 +1,101 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: epforwarder.go + +// Package eventplatformimpl is a generated GoMock package. +package eventplatformimpl + +import ( + reflect "reflect" + + message "github.com/DataDog/datadog-agent/pkg/logs/message" + gomock "github.com/golang/mock/gomock" +) + +// MockEventPlatformForwarder is a mock of EventPlatformForwarder interface. +type MockEventPlatformForwarder struct { + ctrl *gomock.Controller + recorder *MockEventPlatformForwarderMockRecorder +} + +// MockEventPlatformForwarderMockRecorder is the mock recorder for MockEventPlatformForwarder. +type MockEventPlatformForwarderMockRecorder struct { + mock *MockEventPlatformForwarder +} + +// NewMockEventPlatformForwarder creates a new mock instance. +func NewMockEventPlatformForwarder(ctrl *gomock.Controller) *MockEventPlatformForwarder { + mock := &MockEventPlatformForwarder{ctrl: ctrl} + mock.recorder = &MockEventPlatformForwarderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockEventPlatformForwarder) EXPECT() *MockEventPlatformForwarderMockRecorder { + return m.recorder +} + +// Purge mocks base method. +func (m *MockEventPlatformForwarder) Purge() map[string][]*message.Message { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Purge") + ret0, _ := ret[0].(map[string][]*message.Message) + return ret0 +} + +// Purge indicates an expected call of Purge. +func (mr *MockEventPlatformForwarderMockRecorder) Purge() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Purge", reflect.TypeOf((*MockEventPlatformForwarder)(nil).Purge)) +} + +// SendEventPlatformEvent mocks base method. +func (m *MockEventPlatformForwarder) SendEventPlatformEvent(e *message.Message, eventType string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendEventPlatformEvent", e, eventType) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendEventPlatformEvent indicates an expected call of SendEventPlatformEvent. +func (mr *MockEventPlatformForwarderMockRecorder) SendEventPlatformEvent(e, eventType interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendEventPlatformEvent", reflect.TypeOf((*MockEventPlatformForwarder)(nil).SendEventPlatformEvent), e, eventType) +} + +// SendEventPlatformEventBlocking mocks base method. +func (m *MockEventPlatformForwarder) SendEventPlatformEventBlocking(e *message.Message, eventType string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendEventPlatformEventBlocking", e, eventType) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendEventPlatformEventBlocking indicates an expected call of SendEventPlatformEventBlocking. +func (mr *MockEventPlatformForwarderMockRecorder) SendEventPlatformEventBlocking(e, eventType interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendEventPlatformEventBlocking", reflect.TypeOf((*MockEventPlatformForwarder)(nil).SendEventPlatformEventBlocking), e, eventType) +} + +// Start mocks base method. +func (m *MockEventPlatformForwarder) Start() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Start") +} + +// Start indicates an expected call of Start. +func (mr *MockEventPlatformForwarderMockRecorder) Start() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockEventPlatformForwarder)(nil).Start)) +} + +// Stop mocks base method. +func (m *MockEventPlatformForwarder) Stop() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Stop") +} + +// Stop indicates an expected call of Stop. +func (mr *MockEventPlatformForwarderMockRecorder) Stop() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockEventPlatformForwarder)(nil).Stop)) +} diff --git a/comp/forwarder/eventplatform/eventplatformimpl/params.go b/comp/forwarder/eventplatform/eventplatformimpl/params.go new file mode 100644 index 0000000000000..108f9e4b228d7 --- /dev/null +++ b/comp/forwarder/eventplatform/eventplatformimpl/params.go @@ -0,0 +1,22 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2023-present Datadog, Inc. + +package eventplatformimpl + +// Params defines the parameters for the event platform forwarder. +type Params struct { + UseNoopEventPlatformForwarder bool + UseEventPlatformForwarder bool +} + +// NewDefaultParams returns the default parameters for the event platform forwarder. +func NewDefaultParams() Params { + return Params{UseEventPlatformForwarder: true, UseNoopEventPlatformForwarder: false} +} + +// NewDisabledParams returns the disabled parameters for the event platform forwarder. +func NewDisabledParams() Params { + return Params{UseEventPlatformForwarder: false, UseNoopEventPlatformForwarder: false} +} diff --git a/comp/forwarder/eventplatform/fx-mock/fx.go b/comp/forwarder/eventplatform/fx-mock/fx.go deleted file mode 100644 index eb837f131e500..0000000000000 --- a/comp/forwarder/eventplatform/fx-mock/fx.go +++ /dev/null @@ -1,21 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2024-present Datadog, Inc. - -// Package fx provides the fxmock module for the rdnsquerier component -package fx - -import ( - eventplatformmock "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/mock" - "github.com/DataDog/datadog-agent/pkg/util/fxutil" -) - -// MockModule defines the fx options for the mock component. -func MockModule() fxutil.Module { - return fxutil.Component( - fxutil.ProvideComponentConstructor( - eventplatformmock.NewMock, - ), - ) -} diff --git a/comp/forwarder/eventplatform/fx-noop/fx.go b/comp/forwarder/eventplatform/fx-noop/fx.go deleted file mode 100644 index 76e656b46ef53..0000000000000 --- a/comp/forwarder/eventplatform/fx-noop/fx.go +++ /dev/null @@ -1,21 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2024-present Datadog, Inc. - -// Package fx provides the fxnoop module for the rdnsquerier component -package fx - -import ( - eventplatformnoop "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/impl-noop" - "github.com/DataDog/datadog-agent/pkg/util/fxutil" -) - -// Module defines the fx options for the mock component. -func Module() fxutil.Module { - return fxutil.Component( - fxutil.ProvideComponentConstructor( - eventplatformnoop.NewComponent, - ), - ) -} diff --git a/comp/forwarder/eventplatform/fx/fx.go b/comp/forwarder/eventplatform/fx/fx.go deleted file mode 100644 index 74e287dbb79cb..0000000000000 --- a/comp/forwarder/eventplatform/fx/fx.go +++ /dev/null @@ -1,17 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2016-present Datadog, Inc. - -// Package fx provides the fx module for the event platform component. -package fx - -import ( - eventplatformimpl "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/impl" - "github.com/DataDog/datadog-agent/pkg/util/fxutil" -) - -// Module defines the fx options for this component. -func Module() fxutil.Module { - return fxutil.Component(fxutil.ProvideComponentConstructor(eventplatformimpl.NewComponent)) -} diff --git a/comp/forwarder/eventplatform/impl-noop/noop.go b/comp/forwarder/eventplatform/impl-noop/noop.go deleted file mode 100644 index 7d6ed39c04940..0000000000000 --- a/comp/forwarder/eventplatform/impl-noop/noop.go +++ /dev/null @@ -1,39 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2016-present Datadog, Inc. - -// Package eventplatformimpl contains the logic for the noop forwarding component -package eventplatformimpl - -import ( - "errors" - - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" - "github.com/DataDog/datadog-agent/pkg/logs/message" -) - -type noopForwarder struct { -} - -// SendEventPlatformEvent sends messages to the event platform intake. -// SendEventPlatformEvent will drop messages and return an error if the input channel is already full. -func (s *noopForwarder) SendEventPlatformEvent(*message.Message, string) error { - return errors.New("noop forwarder does not support SendEventPlatformEvent") -} - -// SendEventPlatformEventBlocking sends messages to the event platform intake. -// SendEventPlatformEventBlocking will block if the input channel is already full. -func (s *noopForwarder) SendEventPlatformEventBlocking(*message.Message, string) error { - return errors.New("noop forwarder does not support SendEventPlatformEventBlocking") -} - -// Purge clears out all pipeline channels, returning a map of eventType to list of messages in that were removed from each channel -func (s *noopForwarder) Purge() map[string][]*message.Message { - return map[string][]*message.Message{} -} - -// NewComponent creates a new EventPlatformForwarder -func NewComponent() eventplatform.Component { - return &noopForwarder{} -} diff --git a/comp/forwarder/eventplatform/mock/epforwarder_mock.go b/comp/forwarder/eventplatform/mock/epforwarder_mock.go deleted file mode 100644 index 659a458a61a00..0000000000000 --- a/comp/forwarder/eventplatform/mock/epforwarder_mock.go +++ /dev/null @@ -1,36 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2016-present Datadog, Inc. - -package mock - -import ( - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" - message "github.com/DataDog/datadog-agent/pkg/logs/message" -) - -type mock struct { -} - -// SendEventPlatformEvent sends messages to the event platform intake. -// SendEventPlatformEvent will drop messages and return an error if the input channel is already full. -func (s *mock) SendEventPlatformEvent(*message.Message, string) error { - return nil -} - -// SendEventPlatformEventBlocking sends messages to the event platform intake. -// SendEventPlatformEventBlocking will block if the input channel is already full. -func (s *mock) SendEventPlatformEventBlocking(*message.Message, string) error { - return nil -} - -// Purge clears out all pipeline channels, returning a map of eventType to list of messages in that were removed from each channel -func (s *mock) Purge() map[string][]*message.Message { - return map[string][]*message.Message{} -} - -// NewMock returns a new mock component. -func NewMock() eventplatform.Component { - return &mock{} -} diff --git a/comp/forwarder/eventplatform/mock/epforwarder_mockgen.go b/comp/forwarder/eventplatform/mock/epforwarder_mockgen.go deleted file mode 100644 index e1dabb87b6592..0000000000000 --- a/comp/forwarder/eventplatform/mock/epforwarder_mockgen.go +++ /dev/null @@ -1,77 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: ../def/component.go - -// Package mock is a generated GoMock package. -package mock - -import ( - reflect "reflect" - - message "github.com/DataDog/datadog-agent/pkg/logs/message" - gomock "github.com/golang/mock/gomock" -) - -// MockComponent is a mock of Component interface. -type MockComponent struct { - ctrl *gomock.Controller - recorder *MockComponentMockRecorder -} - -// MockComponentMockRecorder is the mock recorder for MockComponent. -type MockComponentMockRecorder struct { - mock *MockComponent -} - -// NewMockComponent creates a new mock instance. -func NewMockComponent(ctrl *gomock.Controller) *MockComponent { - mock := &MockComponent{ctrl: ctrl} - mock.recorder = &MockComponentMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockComponent) EXPECT() *MockComponentMockRecorder { - return m.recorder -} - -// Purge mocks base method. -func (m *MockComponent) Purge() map[string][]*message.Message { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Purge") - ret0, _ := ret[0].(map[string][]*message.Message) - return ret0 -} - -// Purge indicates an expected call of Purge. -func (mr *MockComponentMockRecorder) Purge() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Purge", reflect.TypeOf((*MockComponent)(nil).Purge)) -} - -// SendEventPlatformEvent mocks base method. -func (m *MockComponent) SendEventPlatformEvent(e *message.Message, eventType string) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SendEventPlatformEvent", e, eventType) - ret0, _ := ret[0].(error) - return ret0 -} - -// SendEventPlatformEvent indicates an expected call of SendEventPlatformEvent. -func (mr *MockComponentMockRecorder) SendEventPlatformEvent(e, eventType interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendEventPlatformEvent", reflect.TypeOf((*MockComponent)(nil).SendEventPlatformEvent), e, eventType) -} - -// SendEventPlatformEventBlocking mocks base method. -func (m *MockComponent) SendEventPlatformEventBlocking(e *message.Message, eventType string) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SendEventPlatformEventBlocking", e, eventType) - ret0, _ := ret[0].(error) - return ret0 -} - -// SendEventPlatformEventBlocking indicates an expected call of SendEventPlatformEventBlocking. -func (mr *MockComponentMockRecorder) SendEventPlatformEventBlocking(e, eventType interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendEventPlatformEventBlocking", reflect.TypeOf((*MockComponent)(nil).SendEventPlatformEventBlocking), e, eventType) -} diff --git a/comp/forwarder/eventplatformreceiver/eventplatformreceiverimpl/format.go b/comp/forwarder/eventplatformreceiver/eventplatformreceiverimpl/format.go index e3529345a94ad..940757de88d16 100644 --- a/comp/forwarder/eventplatformreceiver/eventplatformreceiverimpl/format.go +++ b/comp/forwarder/eventplatformreceiver/eventplatformreceiverimpl/format.go @@ -15,7 +15,7 @@ import ( "github.com/DataDog/agent-payload/v5/sbom" "google.golang.org/protobuf/proto" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" "github.com/DataDog/datadog-agent/pkg/logs/message" ) diff --git a/comp/ndmtmp/bundle_test.go b/comp/ndmtmp/bundle_test.go index 57d0999b2d2a1..c0bd689899558 100644 --- a/comp/ndmtmp/bundle_test.go +++ b/comp/ndmtmp/bundle_test.go @@ -8,15 +8,14 @@ package ndmtmp import ( "testing" - "go.uber.org/fx" - "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer/demultiplexerimpl" "github.com/DataDog/datadog-agent/comp/core" "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" - eventplatformmock "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/fx-mock" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" orchestratorForwarderImpl "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl" ddagg "github.com/DataDog/datadog-agent/pkg/aggregator" "github.com/DataDog/datadog-agent/pkg/util/fxutil" + "go.uber.org/fx" ) func TestBundleDependencies(t *testing.T) { @@ -24,7 +23,7 @@ func TestBundleDependencies(t *testing.T) { demultiplexerimpl.MockModule(), orchestratorForwarderImpl.MockModule(), defaultforwarder.MockModule(), - eventplatformmock.MockModule(), + eventplatformimpl.MockModule(), core.MockBundle(), ) } diff --git a/comp/ndmtmp/forwarder/component.go b/comp/ndmtmp/forwarder/component.go index fe0dbcdfe9d2e..e8f535198e85b 100644 --- a/comp/ndmtmp/forwarder/component.go +++ b/comp/ndmtmp/forwarder/component.go @@ -7,12 +7,12 @@ package forwarder import ( - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" ) // team: ndm-core // Component is the component type. type Component interface { - eventplatform.Component + eventplatform.Forwarder } diff --git a/comp/ndmtmp/forwarder/component_mock.go b/comp/ndmtmp/forwarder/component_mock.go index 9043d58474584..5df873226a3f1 100644 --- a/comp/ndmtmp/forwarder/component_mock.go +++ b/comp/ndmtmp/forwarder/component_mock.go @@ -6,11 +6,11 @@ // Package forwarder exposes the event platform forwarder for netflow. package forwarder -import "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/mock" +import "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" // MockComponent is the type for mock components. // It is a gomock-generated mock of EventPlatformForwarder. type MockComponent interface { Component - EXPECT() *mock.MockComponentMockRecorder + EXPECT() *eventplatformimpl.MockEventPlatformForwarderMockRecorder } diff --git a/comp/ndmtmp/forwarder/forwarderimpl/forwarder.go b/comp/ndmtmp/forwarder/forwarderimpl/forwarder.go index 2652a75a1a413..a34fe908b1222 100644 --- a/comp/ndmtmp/forwarder/forwarderimpl/forwarder.go +++ b/comp/ndmtmp/forwarder/forwarderimpl/forwarder.go @@ -7,11 +7,10 @@ package forwarderimpl import ( - "go.uber.org/fx" - "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer" "github.com/DataDog/datadog-agent/comp/ndmtmp/forwarder" "github.com/DataDog/datadog-agent/pkg/util/fxutil" + "go.uber.org/fx" ) // Module defines the fx options for this component. @@ -20,6 +19,6 @@ func Module() fxutil.Module { fx.Provide(getForwarder)) } -func getForwarder(agg demultiplexer.Component) forwarder.Component { +func getForwarder(agg demultiplexer.Component) (forwarder.Component, error) { return agg.GetEventPlatformForwarder() } diff --git a/comp/ndmtmp/forwarder/forwarderimpl/forwarder_mock.go b/comp/ndmtmp/forwarder/forwarderimpl/forwarder_mock.go index d6e2be374edc3..a7efc1b2f8d2f 100644 --- a/comp/ndmtmp/forwarder/forwarderimpl/forwarder_mock.go +++ b/comp/ndmtmp/forwarder/forwarderimpl/forwarder_mock.go @@ -10,17 +10,16 @@ package forwarderimpl import ( "testing" - "github.com/golang/mock/gomock" - "go.uber.org/fx" - - eventplatformmock "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/mock" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" "github.com/DataDog/datadog-agent/comp/ndmtmp/forwarder" "github.com/DataDog/datadog-agent/pkg/util/fxutil" + "github.com/golang/mock/gomock" + "go.uber.org/fx" ) func getMockForwarder(t testing.TB) forwarder.MockComponent { ctrl := gomock.NewController(t) - return eventplatformmock.NewMockComponent(ctrl) + return eventplatformimpl.NewMockEventPlatformForwarder(ctrl) } // MockModule defines a component with a mock forwarder diff --git a/comp/netflow/flowaggregator/aggregator.go b/comp/netflow/flowaggregator/aggregator.go index 8c64ab30a3cc8..93afc304748b8 100644 --- a/comp/netflow/flowaggregator/aggregator.go +++ b/comp/netflow/flowaggregator/aggregator.go @@ -17,7 +17,7 @@ import ( "go.uber.org/atomic" log "github.com/DataDog/datadog-agent/comp/core/log/def" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" "github.com/DataDog/datadog-agent/comp/netflow/format" rdnsquerier "github.com/DataDog/datadog-agent/comp/rdnsquerier/def" "github.com/DataDog/datadog-agent/pkg/aggregator/sender" @@ -41,7 +41,7 @@ type FlowAggregator struct { rollupTrackerRefreshInterval time.Duration flowAcc *flowAccumulator sender sender.Sender - epForwarder eventplatform.Component + epForwarder eventplatform.Forwarder stopChan chan struct{} flushLoopDone chan struct{} runDone chan struct{} @@ -78,7 +78,7 @@ var maxNegativeSequenceDiffToReset = map[common.FlowType]int{ } // NewFlowAggregator returns a new FlowAggregator -func NewFlowAggregator(sender sender.Sender, epForwarder eventplatform.Component, config *config.NetflowConfig, hostname string, logger log.Component, rdnsQuerier rdnsquerier.Component) *FlowAggregator { +func NewFlowAggregator(sender sender.Sender, epForwarder eventplatform.Forwarder, config *config.NetflowConfig, hostname string, logger log.Component, rdnsQuerier rdnsquerier.Component) *FlowAggregator { flushInterval := time.Duration(config.AggregatorFlushInterval) * time.Second flowContextTTL := time.Duration(config.AggregatorFlowContextTTL) * time.Second rollupTrackerRefreshInterval := time.Duration(config.AggregatorRollupTrackerRefreshInterval) * time.Second diff --git a/comp/netflow/flowaggregator/aggregator_test.go b/comp/netflow/flowaggregator/aggregator_test.go index 25a233880264b..181915ab3f628 100644 --- a/comp/netflow/flowaggregator/aggregator_test.go +++ b/comp/netflow/flowaggregator/aggregator_test.go @@ -30,7 +30,7 @@ import ( "github.com/stretchr/testify/require" logmock "github.com/DataDog/datadog-agent/comp/core/log/mock" - eventplatformmock "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/mock" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" "github.com/DataDog/datadog-agent/pkg/aggregator/mocksender" "github.com/DataDog/datadog-agent/pkg/logs/message" "github.com/DataDog/datadog-agent/pkg/util/fxutil" @@ -87,7 +87,7 @@ func TestAggregator(t *testing.T) { TCPFlags: 19, EtherType: uint32(0x0800), } - epForwarder := eventplatformmock.NewMockComponent(gomock.NewController(t)) + epForwarder := eventplatformimpl.NewMockEventPlatformForwarder(gomock.NewController(t)) // language=json event := []byte(` @@ -246,7 +246,7 @@ func TestAggregator_withMockPayload(t *testing.T) { }, } ctrl := gomock.NewController(t) - epForwarder := eventplatformmock.NewMockComponent(ctrl) + epForwarder := eventplatformimpl.NewMockEventPlatformForwarder(ctrl) testutil.ExpectNetflow5Payloads(t, epForwarder) @@ -363,7 +363,7 @@ func TestFlowAggregator_flush_submitCollectorMetrics_error(t *testing.T) { } ctrl := gomock.NewController(t) - epForwarder := eventplatformmock.NewMockComponent(ctrl) + epForwarder := eventplatformimpl.NewMockEventPlatformForwarder(ctrl) aggregator := NewFlowAggregator(sender, epForwarder, &conf, "my-hostname", logger, rdnsQuerier) aggregator.goflowPrometheusGatherer = prometheus.GathererFunc(func() ([]*promClient.MetricFamily, error) { @@ -402,7 +402,7 @@ func TestFlowAggregator_submitCollectorMetrics(t *testing.T) { } ctrl := gomock.NewController(t) - epForwarder := eventplatformmock.NewMockComponent(ctrl) + epForwarder := eventplatformimpl.NewMockEventPlatformForwarder(ctrl) logger := logmock.New(t) rdnsQuerier := fxutil.Test[rdnsquerier.Component](t, rdnsquerierfxmock.MockModule()) @@ -479,7 +479,7 @@ func TestFlowAggregator_submitCollectorMetrics_error(t *testing.T) { } ctrl := gomock.NewController(t) - epForwarder := eventplatformmock.NewMockComponent(ctrl) + epForwarder := eventplatformimpl.NewMockEventPlatformForwarder(ctrl) logger := logmock.New(t) rdnsQuerier := fxutil.Test[rdnsquerier.Component](t, rdnsquerierfxmock.MockModule()) @@ -514,7 +514,7 @@ func TestFlowAggregator_sendExporterMetadata_multiplePayloads(t *testing.T) { } ctrl := gomock.NewController(t) - epForwarder := eventplatformmock.NewMockComponent(ctrl) + epForwarder := eventplatformimpl.NewMockEventPlatformForwarder(ctrl) logger := logmock.New(t) rdnsQuerier := fxutil.Test[rdnsquerier.Component](t, rdnsquerierfxmock.MockModule()) @@ -599,7 +599,7 @@ func TestFlowAggregator_sendExporterMetadata_noPayloads(t *testing.T) { } ctrl := gomock.NewController(t) - epForwarder := eventplatformmock.NewMockComponent(ctrl) + epForwarder := eventplatformimpl.NewMockEventPlatformForwarder(ctrl) logger := logmock.New(t) rdnsQuerier := fxutil.Test[rdnsquerier.Component](t, rdnsquerierfxmock.MockModule()) @@ -632,7 +632,7 @@ func TestFlowAggregator_sendExporterMetadata_invalidIPIgnored(t *testing.T) { } ctrl := gomock.NewController(t) - epForwarder := eventplatformmock.NewMockComponent(ctrl) + epForwarder := eventplatformimpl.NewMockEventPlatformForwarder(ctrl) logger := logmock.New(t) rdnsQuerier := fxutil.Test[rdnsquerier.Component](t, rdnsquerierfxmock.MockModule()) @@ -717,7 +717,7 @@ func TestFlowAggregator_sendExporterMetadata_multipleNamespaces(t *testing.T) { } ctrl := gomock.NewController(t) - epForwarder := eventplatformmock.NewMockComponent(ctrl) + epForwarder := eventplatformimpl.NewMockEventPlatformForwarder(ctrl) logger := logmock.New(t) rdnsQuerier := fxutil.Test[rdnsquerier.Component](t, rdnsquerierfxmock.MockModule()) @@ -821,7 +821,7 @@ func TestFlowAggregator_sendExporterMetadata_singleExporterIpWithMultipleFlowTyp } ctrl := gomock.NewController(t) - epForwarder := eventplatformmock.NewMockComponent(ctrl) + epForwarder := eventplatformimpl.NewMockEventPlatformForwarder(ctrl) logger := logmock.New(t) rdnsQuerier := fxutil.Test[rdnsquerier.Component](t, rdnsquerierfxmock.MockModule()) diff --git a/comp/netflow/server/integration_test.go b/comp/netflow/server/integration_test.go index 8fc5a8e9aed36..5945207807db6 100644 --- a/comp/netflow/server/integration_test.go +++ b/comp/netflow/server/integration_test.go @@ -13,15 +13,14 @@ import ( "testing" "time" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" + "github.com/DataDog/datadog-agent/comp/netflow/goflowlib" + "github.com/DataDog/datadog-agent/comp/netflow/goflowlib/netflowstate" "github.com/netsampler/goflow2/decoders/netflow/templates" "github.com/netsampler/goflow2/utils" "github.com/sirupsen/logrus" "go.uber.org/atomic" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" - "github.com/DataDog/datadog-agent/comp/netflow/goflowlib" - "github.com/DataDog/datadog-agent/comp/netflow/goflowlib/netflowstate" - "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" diff --git a/comp/netflow/testutil/testutil.go b/comp/netflow/testutil/testutil.go index 1bc3ad9242331..dedee4f79f427 100644 --- a/comp/netflow/testutil/testutil.go +++ b/comp/netflow/testutil/testutil.go @@ -25,7 +25,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/logs/message" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" "github.com/DataDog/datadog-agent/comp/ndmtmp/forwarder" "github.com/DataDog/datadog-agent/comp/netflow/payload" ) diff --git a/comp/networkpath/bundle_test.go b/comp/networkpath/bundle_test.go index 49e160d2b5ef4..6eb6868e8acba 100644 --- a/comp/networkpath/bundle_test.go +++ b/comp/networkpath/bundle_test.go @@ -9,7 +9,7 @@ import ( "testing" "github.com/DataDog/datadog-agent/comp/core" - eventplatformmock "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/fx-mock" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" rdnsquerier "github.com/DataDog/datadog-agent/comp/rdnsquerier/fx-mock" "github.com/DataDog/datadog-agent/pkg/util/fxutil" ) @@ -17,7 +17,7 @@ import ( func TestBundleDependencies(t *testing.T) { fxutil.TestBundle(t, Bundle(), core.MockBundle(), - eventplatformmock.MockModule(), + eventplatformimpl.MockModule(), rdnsquerier.MockModule(), ) } diff --git a/comp/networkpath/npcollector/npcollectorimpl/npcollector.go b/comp/networkpath/npcollector/npcollectorimpl/npcollector.go index eb4ded2939a94..4ee787e567320 100644 --- a/comp/networkpath/npcollector/npcollectorimpl/npcollector.go +++ b/comp/networkpath/npcollector/npcollectorimpl/npcollector.go @@ -19,7 +19,7 @@ import ( log "github.com/DataDog/datadog-agent/comp/core/log/def" telemetryComp "github.com/DataDog/datadog-agent/comp/core/telemetry" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" "github.com/DataDog/datadog-agent/comp/networkpath/npcollector/npcollectorimpl/common" "github.com/DataDog/datadog-agent/comp/networkpath/npcollector/npcollectorimpl/pathteststore" rdnsquerier "github.com/DataDog/datadog-agent/comp/rdnsquerier/def" @@ -43,7 +43,7 @@ type npCollectorImpl struct { collectorConfigs *collectorConfigs // Deps - epForwarder eventplatform.Component + epForwarder eventplatform.Forwarder logger log.Component metricSender metricsender.MetricSender statsdClient ddgostatsd.ClientInterface @@ -84,7 +84,7 @@ func newNoopNpCollectorImpl() *npCollectorImpl { } } -func newNpCollectorImpl(epForwarder eventplatform.Component, collectorConfigs *collectorConfigs, logger log.Component, telemetrycomp telemetryComp.Component, rdnsquerier rdnsquerier.Component) *npCollectorImpl { +func newNpCollectorImpl(epForwarder eventplatform.Forwarder, collectorConfigs *collectorConfigs, logger log.Component, telemetrycomp telemetryComp.Component, rdnsquerier rdnsquerier.Component) *npCollectorImpl { logger.Infof("New NpCollector (workers=%d timeout=%d max_ttl=%d input_chan_size=%d processing_chan_size=%d pathtest_contexts_limit=%d pathtest_ttl=%s pathtest_interval=%s flush_interval=%s reverse_dns_enabled=%t reverse_dns_timeout=%d)", collectorConfigs.workers, collectorConfigs.timeout, diff --git a/comp/networkpath/npcollector/npcollectorimpl/npcollector_test.go b/comp/networkpath/npcollector/npcollectorimpl/npcollector_test.go index b774c71b84f5a..390ce1c8684f4 100644 --- a/comp/networkpath/npcollector/npcollectorimpl/npcollector_test.go +++ b/comp/networkpath/npcollector/npcollectorimpl/npcollector_test.go @@ -24,8 +24,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/DataDog/datadog-agent/comp/core/telemetry" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" - eventplatformmock "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/mock" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" "github.com/DataDog/datadog-agent/comp/networkpath/npcollector/npcollectorimpl/common" "github.com/DataDog/datadog-agent/comp/networkpath/npcollector/npcollectorimpl/pathteststore" rdnsquerier "github.com/DataDog/datadog-agent/comp/rdnsquerier/def" @@ -91,7 +91,7 @@ func Test_NpCollector_runningAndProcessing(t *testing.T) { stats := &teststatsd.Client{} - mockEpForwarder := eventplatformmock.NewMockComponent(gomock.NewController(t)) + mockEpForwarder := eventplatformimpl.NewMockEventPlatformForwarder(gomock.NewController(t)) npCollector.epForwarder = mockEpForwarder app.RequireStart() diff --git a/comp/networkpath/npcollector/npcollectorimpl/npcollector_testutils.go b/comp/networkpath/npcollector/npcollectorimpl/npcollector_testutils.go index 3075c06bc75ce..16d777c11fd79 100644 --- a/comp/networkpath/npcollector/npcollectorimpl/npcollector_testutils.go +++ b/comp/networkpath/npcollector/npcollectorimpl/npcollector_testutils.go @@ -15,18 +15,17 @@ import ( "time" model "github.com/DataDog/agent-payload/v5/process" - "github.com/stretchr/testify/require" - "go.uber.org/fx" - "go.uber.org/fx/fxtest" - "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer/demultiplexerimpl" "github.com/DataDog/datadog-agent/comp/core" "github.com/DataDog/datadog-agent/comp/core/config" "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" - eventplatformmock "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/fx-mock" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" "github.com/DataDog/datadog-agent/comp/ndmtmp/forwarder/forwarderimpl" "github.com/DataDog/datadog-agent/comp/networkpath/npcollector" rdnsqueriermock "github.com/DataDog/datadog-agent/comp/rdnsquerier/fx-mock" + "github.com/stretchr/testify/require" + "go.uber.org/fx" + "go.uber.org/fx/fxtest" ) // MockTimeNow mocks time.Now @@ -44,7 +43,7 @@ var testOptions = fx.Options( demultiplexerimpl.MockModule(), defaultforwarder.MockModule(), core.MockBundle(), - eventplatformmock.MockModule(), + eventplatformimpl.MockModule(), rdnsqueriermock.MockModule(), ) diff --git a/comp/networkpath/npcollector/npcollectorimpl/npcollectorcomp.go b/comp/networkpath/npcollector/npcollectorimpl/npcollectorcomp.go index 31fa00d16ad86..01c0a42691ed9 100644 --- a/comp/networkpath/npcollector/npcollectorimpl/npcollectorcomp.go +++ b/comp/networkpath/npcollector/npcollectorimpl/npcollectorcomp.go @@ -13,7 +13,7 @@ import ( "github.com/DataDog/datadog-agent/comp/core/config" log "github.com/DataDog/datadog-agent/comp/core/log/def" "github.com/DataDog/datadog-agent/comp/core/telemetry" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" "github.com/DataDog/datadog-agent/comp/networkpath/npcollector" rdnsquerier "github.com/DataDog/datadog-agent/comp/rdnsquerier/def" nooprdnsquerier "github.com/DataDog/datadog-agent/comp/rdnsquerier/impl-none" @@ -61,17 +61,23 @@ func newNpCollector(deps dependencies) provides { rdnsQuerier = nooprdnsquerier.NewNone().Comp } - collector = newNpCollectorImpl(deps.EpForwarder, configs, deps.Logger, deps.Telemetry, rdnsQuerier) - deps.Lc.Append(fx.Hook{ - // No need for OnStart hook since NpCollector.Init() will be called by clients when needed. - OnStart: func(context.Context) error { - return collector.start() - }, - OnStop: func(context.Context) error { - collector.stop() - return nil - }, - }) + epForwarder, ok := deps.EpForwarder.Get() + if !ok { + deps.Logger.Errorf("Error getting EpForwarder") + collector = newNoopNpCollectorImpl() + } else { + collector = newNpCollectorImpl(epForwarder, configs, deps.Logger, deps.Telemetry, rdnsQuerier) + deps.Lc.Append(fx.Hook{ + // No need for OnStart hook since NpCollector.Init() will be called by clients when needed. + OnStart: func(context.Context) error { + return collector.start() + }, + OnStop: func(context.Context) error { + collector.stop() + return nil + }, + }) + } } else { deps.Logger.Debugf("Network Path Collector disabled") collector = newNoopNpCollectorImpl() diff --git a/comp/process/bundle_test.go b/comp/process/bundle_test.go index 8ebfe77be895f..591dce9f63cfb 100644 --- a/comp/process/bundle_test.go +++ b/comp/process/bundle_test.go @@ -23,7 +23,7 @@ import ( workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def" workloadmetafx "github.com/DataDog/datadog-agent/comp/core/workloadmeta/fx" "github.com/DataDog/datadog-agent/comp/dogstatsd/statsd" - eventplatformfx "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/fx" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatformreceiver/eventplatformreceiverimpl" "github.com/DataDog/datadog-agent/comp/networkpath/npcollector/npcollectorimpl" "github.com/DataDog/datadog-agent/comp/process/runner" @@ -96,7 +96,7 @@ func TestBundleOneShot(t *testing.T) { core.MockBundle(), workloadmetafx.Module(workloadmeta.NewParams()), eventplatformreceiverimpl.Module(), - eventplatformfx.Module(), + eventplatformimpl.Module(eventplatformimpl.NewDefaultParams()), rdnsquerier.MockModule(), npcollectorimpl.Module(), statsd.MockModule(), diff --git a/comp/snmpscan/impl/devicescan.go b/comp/snmpscan/impl/devicescan.go index cb0505108a848..cef73d6aee845 100644 --- a/comp/snmpscan/impl/devicescan.go +++ b/comp/snmpscan/impl/devicescan.go @@ -9,12 +9,11 @@ import ( "encoding/json" "time" - "github.com/gosnmp/gosnmp" - - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" "github.com/DataDog/datadog-agent/pkg/logs/message" "github.com/DataDog/datadog-agent/pkg/networkdevice/metadata" "github.com/DataDog/datadog-agent/pkg/snmp/gosnmplib" + "github.com/gosnmp/gosnmp" ) func (s snmpScannerImpl) RunDeviceScan(snmpConnection *gosnmp.GoSNMP, deviceNamespace string, deviceIPAddress string) error { diff --git a/comp/snmpscan/impl/snmpscan.go b/comp/snmpscan/impl/snmpscan.go index 835866508cae1..1beb572ac27c0 100644 --- a/comp/snmpscan/impl/snmpscan.go +++ b/comp/snmpscan/impl/snmpscan.go @@ -10,7 +10,7 @@ import ( "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer" log "github.com/DataDog/datadog-agent/comp/core/log/def" compdef "github.com/DataDog/datadog-agent/comp/def" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" rcclienttypes "github.com/DataDog/datadog-agent/comp/remote-config/rcclient/types" snmpscan "github.com/DataDog/datadog-agent/comp/snmpscan/def" ) @@ -30,7 +30,10 @@ type Provides struct { // NewComponent creates a new snmpscan component func NewComponent(reqs Requires) (Provides, error) { - forwarder := reqs.Demultiplexer.GetEventPlatformForwarder() + forwarder, err := reqs.Demultiplexer.GetEventPlatformForwarder() + if err != nil { + return Provides{}, err + } scanner := snmpScannerImpl{ log: reqs.Logger, epforwarder: forwarder, @@ -43,5 +46,5 @@ func NewComponent(reqs Requires) (Provides, error) { type snmpScannerImpl struct { log log.Component - epforwarder eventplatform.Component + epforwarder eventplatform.Forwarder } diff --git a/comp/snmptraps/forwarder/forwarderimpl/forwarder.go b/comp/snmptraps/forwarder/forwarderimpl/forwarder.go index c34d5809eaac4..52f0c5113078a 100644 --- a/comp/snmptraps/forwarder/forwarderimpl/forwarder.go +++ b/comp/snmptraps/forwarder/forwarderimpl/forwarder.go @@ -14,7 +14,7 @@ import ( "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer" log "github.com/DataDog/datadog-agent/comp/core/log/def" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" "github.com/DataDog/datadog-agent/comp/snmptraps/config" "github.com/DataDog/datadog-agent/comp/snmptraps/formatter" "github.com/DataDog/datadog-agent/comp/snmptraps/forwarder" diff --git a/comp/snmptraps/forwarder/forwarderimpl/forwarder_test.go b/comp/snmptraps/forwarder/forwarderimpl/forwarder_test.go index 1aa47f244e21b..8bf5df680100b 100644 --- a/comp/snmptraps/forwarder/forwarderimpl/forwarder_test.go +++ b/comp/snmptraps/forwarder/forwarderimpl/forwarder_test.go @@ -14,7 +14,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/fx" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" "github.com/DataDog/datadog-agent/comp/snmptraps/config/configimpl" "github.com/DataDog/datadog-agent/comp/snmptraps/formatter" "github.com/DataDog/datadog-agent/comp/snmptraps/formatter/formatterimpl" diff --git a/comp/snmptraps/status/statusimpl/status.go b/comp/snmptraps/status/statusimpl/status.go index 01649bd2be17c..0fd411f2615ae 100644 --- a/comp/snmptraps/status/statusimpl/status.go +++ b/comp/snmptraps/status/statusimpl/status.go @@ -15,7 +15,7 @@ import ( "go.uber.org/fx" "github.com/DataDog/datadog-agent/comp/core/status" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" trapsStatus "github.com/DataDog/datadog-agent/comp/snmptraps/status" "github.com/DataDog/datadog-agent/pkg/util/fxutil" ) diff --git a/comp/snmptraps/status/statusimpl/status_test.go b/comp/snmptraps/status/statusimpl/status_test.go index 342512a463097..ffa1fac2c5469 100644 --- a/comp/snmptraps/status/statusimpl/status_test.go +++ b/comp/snmptraps/status/statusimpl/status_test.go @@ -14,7 +14,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" _ "github.com/DataDog/datadog-agent/pkg/aggregator" ) diff --git a/pkg/aggregator/aggregator.go b/pkg/aggregator/aggregator.go index 96225d1fed599..e8ac4fd425d13 100644 --- a/pkg/aggregator/aggregator.go +++ b/pkg/aggregator/aggregator.go @@ -7,6 +7,7 @@ package aggregator import ( + "errors" "expvar" "fmt" "sync" @@ -14,7 +15,7 @@ import ( tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def" "github.com/DataDog/datadog-agent/comp/core/tagger/types" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" haagent "github.com/DataDog/datadog-agent/comp/haagent/def" "github.com/DataDog/datadog-agent/pkg/aggregator/internal/tags" checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id" @@ -409,8 +410,12 @@ func (agg *BufferedAggregator) GetBufferedChannels() (chan []*event.Event, chan } // GetEventPlatformForwarder returns a event platform forwarder -func (agg *BufferedAggregator) GetEventPlatformForwarder() eventplatform.Component { - return agg.eventPlatformForwarder +func (agg *BufferedAggregator) GetEventPlatformForwarder() (eventplatform.Forwarder, error) { + forwarder, found := agg.eventPlatformForwarder.Get() + if !found { + return nil, errors.New("event platform forwarder not initialized") + } + return forwarder, nil } func (agg *BufferedAggregator) registerSender(id checkid.ID) error { @@ -459,9 +464,13 @@ func (agg *BufferedAggregator) handleSenderBucket(checkBucket senderHistogramBuc } func (agg *BufferedAggregator) handleEventPlatformEvent(event senderEventPlatformEvent) error { + forwarder, found := agg.eventPlatformForwarder.Get() + if !found { + return errors.New("event platform forwarder not initialized") + } m := message.NewMessage(event.rawEvent, nil, "", 0) // eventPlatformForwarder is threadsafe so no locking needed here - return agg.eventPlatformForwarder.SendEventPlatformEvent(m, event.eventType) + return forwarder.SendEventPlatformEvent(m, event.eventType) } // addServiceCheck adds the service check to the slice of current service checks @@ -680,7 +689,11 @@ func (agg *BufferedAggregator) GetEvents() event.Events { // GetEventPlatformEvents grabs the event platform events from the queue and clears them. // Note that this works only if using the 'noop' event platform forwarder func (agg *BufferedAggregator) GetEventPlatformEvents() map[string][]*message.Message { - return agg.eventPlatformForwarder.Purge() + forwarder, found := agg.eventPlatformForwarder.Get() + if !found { + return nil + } + return forwarder.Purge() } func (agg *BufferedAggregator) sendEvents(start time.Time, events event.Events) { diff --git a/pkg/aggregator/aggregator_test.go b/pkg/aggregator/aggregator_test.go index 2e3f5dddc2682..a60e3055f2206 100644 --- a/pkg/aggregator/aggregator_test.go +++ b/pkg/aggregator/aggregator_test.go @@ -25,7 +25,7 @@ import ( taggerMock "github.com/DataDog/datadog-agent/comp/core/tagger/mock" "github.com/DataDog/datadog-agent/comp/core/tagger/types" "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" orchestratorforwarder "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator" haagentmock "github.com/DataDog/datadog-agent/comp/haagent/mock" compressionmock "github.com/DataDog/datadog-agent/comp/serializer/compression/fx-mock" diff --git a/pkg/aggregator/check_sampler_bench_test.go b/pkg/aggregator/check_sampler_bench_test.go index 9a562434c955b..1e3f5eae3fd8f 100644 --- a/pkg/aggregator/check_sampler_bench_test.go +++ b/pkg/aggregator/check_sampler_bench_test.go @@ -15,7 +15,8 @@ import ( log "github.com/DataDog/datadog-agent/comp/core/log/def" "github.com/DataDog/datadog-agent/comp/core/tagger/mock" "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" - eventplatformock "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/mock" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" haagentmock "github.com/DataDog/datadog-agent/comp/haagent/mock" //nolint:revive // TODO(AML) Fix revive linter @@ -51,8 +52,9 @@ func benchmarkAddBucket(bucketValue int64, b *testing.B) { options.DontStartForwarders = true sharedForwarder := forwarder.NewDefaultForwarder(pkgconfigsetup.Datadog(), deps.Log, forwarderOpts) orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{}) + eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(deps.Hostname)) haAgent := haagentmock.NewMockHaAgent() - demux := InitAndStartAgentDemultiplexer(deps.Log, sharedForwarder, &orchestratorForwarder, options, eventplatformock.NewMock(), haAgent, deps.Compressor, taggerComponent, "hostname") + demux := InitAndStartAgentDemultiplexer(deps.Log, sharedForwarder, &orchestratorForwarder, options, eventPlatformForwarder, haAgent, deps.Compressor, taggerComponent, "hostname") defer demux.Stop(true) checkSampler := newCheckSampler(1, true, true, 1000, tags.NewStore(true, "bench"), checkid.ID("hello:world:1234"), taggerComponent) diff --git a/pkg/aggregator/demultiplexer_agent.go b/pkg/aggregator/demultiplexer_agent.go index de52762a92294..a519383eea73e 100644 --- a/pkg/aggregator/demultiplexer_agent.go +++ b/pkg/aggregator/demultiplexer_agent.go @@ -15,7 +15,7 @@ import ( log "github.com/DataDog/datadog-agent/comp/core/log/def" tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def" forwarder "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" orchestratorforwarder "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator" haagent "github.com/DataDog/datadog-agent/comp/haagent/def" compression "github.com/DataDog/datadog-agent/comp/serializer/compression/def" @@ -39,7 +39,7 @@ type DemultiplexerWithAggregator interface { // AggregateCheckSample adds check sample sent by a check from one of the collectors into a check sampler pipeline. AggregateCheckSample(sample metrics.MetricSample) Options() AgentDemultiplexerOptions - GetEventPlatformForwarder() eventplatform.Component + GetEventPlatformForwarder() (eventplatform.Forwarder, error) GetEventsAndServiceChecksChannels() (chan []*event.Event, chan []*servicecheck.ServiceCheck) DumpDogstatsdContexts(io.Writer) error } @@ -471,7 +471,7 @@ func (d *AgentDemultiplexer) GetEventsAndServiceChecksChannels() (chan []*event. } // GetEventPlatformForwarder returns underlying events and service checks channels. -func (d *AgentDemultiplexer) GetEventPlatformForwarder() eventplatform.Component { +func (d *AgentDemultiplexer) GetEventPlatformForwarder() (eventplatform.Forwarder, error) { return d.aggregator.GetEventPlatformForwarder() } diff --git a/pkg/aggregator/demultiplexer_agent_test.go b/pkg/aggregator/demultiplexer_agent_test.go index 079c80c2e2ebd..ce59d5ea8943f 100644 --- a/pkg/aggregator/demultiplexer_agent_test.go +++ b/pkg/aggregator/demultiplexer_agent_test.go @@ -19,8 +19,8 @@ import ( tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def" "github.com/DataDog/datadog-agent/comp/core/tagger/mock" "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" - eventplatformmock "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/fx-mock" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" orchestratorforwarder "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator" "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl" haagent "github.com/DataDog/datadog-agent/comp/haagent/def" @@ -170,7 +170,7 @@ func createDemultiplexerAgentTestDeps(t *testing.T) DemultiplexerAgentTestDeps { defaultforwarder.MockModule(), core.MockBundle(), orchestratorimpl.MockModule(), - eventplatformmock.MockModule(), + eventplatformimpl.MockModule(), haagentmock.Module(), compressionmock.MockModule(), fx.Provide(func() tagger.Component { return taggerComponent }), diff --git a/pkg/aggregator/demultiplexer_mock.go b/pkg/aggregator/demultiplexer_mock.go index 84108c6cd9b92..6d79e009338b3 100644 --- a/pkg/aggregator/demultiplexer_mock.go +++ b/pkg/aggregator/demultiplexer_mock.go @@ -14,7 +14,8 @@ import ( log "github.com/DataDog/datadog-agent/comp/core/log/def" nooptagger "github.com/DataDog/datadog-agent/comp/core/tagger/impl-noop" "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" - eventplatformock "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/mock" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" haagent "github.com/DataDog/datadog-agent/comp/haagent/def" compression "github.com/DataDog/datadog-agent/comp/serializer/compression/def" "github.com/DataDog/datadog-agent/pkg/util/optional" @@ -33,5 +34,6 @@ type TestDeps struct { // InitAndStartAgentDemultiplexerForTest initializes an aggregator for tests. func InitAndStartAgentDemultiplexerForTest(deps TestDeps, options AgentDemultiplexerOptions, hostname string) *AgentDemultiplexer { orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{}) - return InitAndStartAgentDemultiplexer(deps.Log, deps.SharedForwarder, &orchestratorForwarder, options, eventplatformock.NewMock(), deps.HaAgent, deps.Compressor, nooptagger.NewComponent(), hostname) + eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(deps.Hostname)) + return InitAndStartAgentDemultiplexer(deps.Log, deps.SharedForwarder, &orchestratorForwarder, options, eventPlatformForwarder, deps.HaAgent, deps.Compressor, nooptagger.NewComponent(), hostname) } diff --git a/pkg/aggregator/demultiplexer_test.go b/pkg/aggregator/demultiplexer_test.go index 5173b61f3ef15..72ed3464eb346 100644 --- a/pkg/aggregator/demultiplexer_test.go +++ b/pkg/aggregator/demultiplexer_test.go @@ -16,8 +16,8 @@ import ( "github.com/DataDog/datadog-agent/comp/core" nooptagger "github.com/DataDog/datadog-agent/comp/core/tagger/impl-noop" "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" - eventplatformfx "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/fx" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatformreceiver/eventplatformreceiverimpl" orchestratorForwarder "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator" orchestratorForwarderImpl "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl" @@ -43,7 +43,7 @@ func TestDemuxIsSetAsGlobalInstance(t *testing.T) { require := require.New(t) opts := demuxTestOptions() - deps := createDemuxDeps(t, opts) + deps := createDemuxDeps(t, opts, eventplatformimpl.NewDefaultParams()) demux := deps.Demultiplexer require.NotNil(demux) @@ -59,12 +59,39 @@ func TestDemuxForwardersCreated(t *testing.T) { opts := demuxTestOptions() - deps := createDemuxDeps(t, opts) + deps := createDemuxDeps(t, opts, eventplatformimpl.NewDefaultParams()) demux := deps.Demultiplexer require.NotNil(demux) - require.NotNil(deps.EventPlatformFwd) - _, found := deps.OrchestratorFwd.Get() + _, found := deps.EventPlatformFwd.Get() + require.True(found) + _, found = deps.OrchestratorFwd.Get() + require.Equal(orchestratorForwarderSupport, found) + require.NotNil(deps.SharedForwarder) + demux.Stop(false) + + // options no event platform forwarder + + opts = demuxTestOptions() + deps = createDemuxDeps(t, opts, eventplatformimpl.Params{UseEventPlatformForwarder: false}) + demux = deps.Demultiplexer + require.NotNil(demux) + _, found = deps.EventPlatformFwd.Get() + require.False(found) + _, found = deps.OrchestratorFwd.Get() + require.Equal(orchestratorForwarderSupport, found) + require.NotNil(deps.SharedForwarder) + demux.Stop(false) + + // options noop event platform forwarder + + opts = demuxTestOptions() + deps = createDemuxDeps(t, opts, eventplatformimpl.Params{UseNoopEventPlatformForwarder: true}) + demux = deps.Demultiplexer + require.NotNil(demux) + _, found = deps.EventPlatformFwd.Get() + require.True(found) + _, found = deps.OrchestratorFwd.Get() require.Equal(orchestratorForwarderSupport, found) require.NotNil(deps.SharedForwarder) demux.Stop(false) @@ -87,10 +114,11 @@ func TestDemuxForwardersCreated(t *testing.T) { // needed feature above, we should have an orchestrator forwarder instantiated now opts = demuxTestOptions() - deps = createDemuxDeps(t, opts) + deps = createDemuxDeps(t, opts, eventplatformimpl.NewDefaultParams()) demux = deps.Demultiplexer require.NotNil(demux) - require.NotNil(deps.EventPlatformFwd) + _, found = deps.EventPlatformFwd.Get() + require.True(found) require.NotNil(deps.SharedForwarder) demux.Stop(false) @@ -98,10 +126,11 @@ func TestDemuxForwardersCreated(t *testing.T) { opts = demuxTestOptions() params := orchestratorForwarderImpl.NewDisabledParams() - deps = createDemuxDepsWithOrchestratorFwd(t, opts, params) + deps = createDemuxDepsWithOrchestratorFwd(t, opts, params, eventplatformimpl.NewDefaultParams()) demux = deps.Demultiplexer require.NotNil(demux) - require.NotNil(deps.EventPlatformFwd) + _, found = deps.EventPlatformFwd.Get() + require.True(found) _, found = deps.OrchestratorFwd.Get() require.False(found) require.NotNil(deps.SharedForwarder) @@ -111,10 +140,11 @@ func TestDemuxForwardersCreated(t *testing.T) { opts = demuxTestOptions() params = orchestratorForwarderImpl.NewNoopParams() - deps = createDemuxDepsWithOrchestratorFwd(t, opts, params) + deps = createDemuxDepsWithOrchestratorFwd(t, opts, params, eventplatformimpl.NewDefaultParams()) demux = deps.Demultiplexer require.NotNil(demux) - require.NotNil(deps.EventPlatformFwd) + _, found = deps.EventPlatformFwd.Get() + require.True(found) _, found = deps.OrchestratorFwd.Get() require.True(found) require.NotNil(deps.SharedForwarder) @@ -127,7 +157,7 @@ func TestDemuxSerializerCreated(t *testing.T) { // default options should have created all forwarders opts := demuxTestOptions() - deps := createDemuxDeps(t, opts) + deps := createDemuxDeps(t, opts, eventplatformimpl.NewDefaultParams()) demux := deps.Demultiplexer require.NotNil(demux) @@ -143,7 +173,7 @@ func TestDemuxFlushAggregatorToSerializer(t *testing.T) { opts := demuxTestOptions() opts.FlushInterval = time.Hour - deps := createDemuxDeps(t, opts) + deps := createDemuxDeps(t, opts, eventplatformimpl.NewDefaultParams()) demux := initAgentDemultiplexer(deps.Log, deps.SharedForwarder, deps.OrchestratorFwd, opts, deps.EventPlatformFwd, deps.HaAgent, deps.Compressor, nooptagger.NewComponent(), "") demux.Aggregator().tlmContainerTagsEnabled = false require.NotNil(demux) @@ -243,8 +273,8 @@ func TestGetDogStatsDWorkerAndPipelineCount(t *testing.T) { assert.Equal(4, pipelines) } -func createDemuxDeps(t *testing.T, opts AgentDemultiplexerOptions) aggregatorDeps { - return createDemuxDepsWithOrchestratorFwd(t, opts, orchestratorForwarderImpl.NewDefaultParams()) +func createDemuxDeps(t *testing.T, opts AgentDemultiplexerOptions, eventPlatformParams eventplatformimpl.Params) aggregatorDeps { + return createDemuxDepsWithOrchestratorFwd(t, opts, orchestratorForwarderImpl.NewDefaultParams(), eventPlatformParams) } type internalDemutiplexerDeps struct { @@ -258,12 +288,12 @@ func createDemuxDepsWithOrchestratorFwd( t *testing.T, opts AgentDemultiplexerOptions, orchestratorParams orchestratorForwarderImpl.Params, -) aggregatorDeps { + eventPlatformParams eventplatformimpl.Params) aggregatorDeps { modules := fx.Options( defaultforwarder.MockModule(), core.MockBundle(), orchestratorForwarderImpl.Module(orchestratorParams), - eventplatformfx.Module(), + eventplatformimpl.Module(eventPlatformParams), eventplatformreceiverimpl.Module(), compressionmock.MockModule(), haagentmock.Module(), diff --git a/pkg/aggregator/mocksender/mocksender.go b/pkg/aggregator/mocksender/mocksender.go index 340298442247f..d20e0a3d19081 100644 --- a/pkg/aggregator/mocksender/mocksender.go +++ b/pkg/aggregator/mocksender/mocksender.go @@ -12,10 +12,13 @@ import ( "github.com/stretchr/testify/mock" + "github.com/DataDog/datadog-agent/comp/core/hostname/hostnameimpl" logimpl "github.com/DataDog/datadog-agent/comp/core/log/impl" - nooptagger "github.com/DataDog/datadog-agent/comp/core/tagger/impl-noop" "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" - eventplatformock "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/mock" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" + + nooptagger "github.com/DataDog/datadog-agent/comp/core/tagger/impl-noop" haagentmock "github.com/DataDog/datadog-agent/comp/haagent/mock" compressionmock "github.com/DataDog/datadog-agent/comp/serializer/compression/fx-mock" "github.com/DataDog/datadog-agent/pkg/aggregator" @@ -39,8 +42,9 @@ func CreateDefaultDemultiplexer() *aggregator.AgentDemultiplexer { log := logimpl.NewTemporaryLoggerWithoutInit() sharedForwarder := defaultforwarder.NewDefaultForwarder(pkgconfigsetup.Datadog(), log, defaultforwarder.NewOptions(pkgconfigsetup.Datadog(), log, nil)) orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{}) + eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(hostnameimpl.NewHostnameService())) taggerComponent := nooptagger.NewComponent() - return aggregator.InitAndStartAgentDemultiplexer(log, sharedForwarder, &orchestratorForwarder, opts, eventplatformock.NewMock(), haagentmock.NewMockHaAgent(), compressionmock.NewMockCompressor(), taggerComponent, "") + return aggregator.InitAndStartAgentDemultiplexer(log, sharedForwarder, &orchestratorForwarder, opts, eventPlatformForwarder, haagentmock.NewMockHaAgent(), compressionmock.NewMockCompressor(), taggerComponent, "") } // NewMockSenderWithSenderManager returns a functional mocked Sender for testing diff --git a/pkg/aggregator/sender_test.go b/pkg/aggregator/sender_test.go index ae791c41b02d1..994cc96969d1c 100644 --- a/pkg/aggregator/sender_test.go +++ b/pkg/aggregator/sender_test.go @@ -21,7 +21,8 @@ import ( log "github.com/DataDog/datadog-agent/comp/core/log/def" nooptagger "github.com/DataDog/datadog-agent/comp/core/tagger/impl-noop" "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" - eventplatformock "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/mock" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" haagentmock "github.com/DataDog/datadog-agent/comp/haagent/mock" compressionmock "github.com/DataDog/datadog-agent/comp/serializer/compression/fx-mock" checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id" @@ -53,11 +54,12 @@ func initSender(id checkid.ID, defaultHostname string) (s senderWithChans) { return s } -func testDemux(log log.Component) *AgentDemultiplexer { +func testDemux(log log.Component, hostname hostname.Component) *AgentDemultiplexer { opts := DefaultAgentDemultiplexerOptions() opts.DontStartForwarders = true orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{}) - demux := initAgentDemultiplexer(log, NewForwarderTest(log), &orchestratorForwarder, opts, eventplatformock.NewMock(), haagentmock.NewMockHaAgent(), compressionmock.NewMockCompressor(), nooptagger.NewComponent(), defaultHostname) + eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(hostname)) + demux := initAgentDemultiplexer(log, NewForwarderTest(log), &orchestratorForwarder, opts, eventPlatformForwarder, haagentmock.NewMockHaAgent(), compressionmock.NewMockCompressor(), nooptagger.NewComponent(), defaultHostname) return demux } @@ -84,7 +86,7 @@ func TestGetDefaultSenderReturnsSameSender(t *testing.T) { // this test not using anything global // - deps := fxutil.Test[SenderTestDeps](t, core.MockBundle()) - demux := testDemux(deps.Log) + demux := testDemux(deps.Log, deps.Hostname) aggregatorInstance := demux.Aggregator() go aggregatorInstance.run() defer aggregatorInstance.Stop() @@ -104,7 +106,7 @@ func TestGetSenderWithDifferentIDsReturnsDifferentCheckSamplers(t *testing.T) { // this test not using anything global // - deps := fxutil.Test[SenderTestDeps](t, core.MockBundle()) - demux := testDemux(deps.Log) + demux := testDemux(deps.Log, deps.Hostname) aggregatorInstance := demux.Aggregator() go aggregatorInstance.run() @@ -134,7 +136,7 @@ func TestGetSenderWithSameIDsReturnsSameSender(t *testing.T) { // - deps := fxutil.Test[SenderTestDeps](t, core.MockBundle()) - demux := testDemux(deps.Log) + demux := testDemux(deps.Log, deps.Hostname) aggregatorInstance := demux.Aggregator() go aggregatorInstance.run() defer aggregatorInstance.Stop() @@ -157,7 +159,7 @@ func TestDestroySender(t *testing.T) { // - deps := fxutil.Test[SenderTestDeps](t, core.MockBundle()) - demux := testDemux(deps.Log) + demux := testDemux(deps.Log, deps.Hostname) aggregatorInstance := demux.Aggregator() go aggregatorInstance.run() defer aggregatorInstance.Stop() @@ -187,7 +189,7 @@ func TestGetAndSetSender(t *testing.T) { // - deps := fxutil.Test[SenderTestDeps](t, core.MockBundle()) - demux := testDemux(deps.Log) + demux := testDemux(deps.Log, deps.Hostname) itemChan := make(chan senderItem, 10) serviceCheckChan := make(chan servicecheck.ServiceCheck, 10) @@ -210,7 +212,7 @@ func TestGetSenderDefaultHostname(t *testing.T) { // - deps := fxutil.Test[SenderTestDeps](t, core.MockBundle()) - demux := testDemux(deps.Log) + demux := testDemux(deps.Log, deps.Hostname) aggregatorInstance := demux.Aggregator() go aggregatorInstance.run() diff --git a/pkg/cli/subcommands/check/command.go b/pkg/cli/subcommands/check/command.go index 8c98dcd853086..d914682340504 100644 --- a/pkg/cli/subcommands/check/command.go +++ b/pkg/cli/subcommands/check/command.go @@ -58,7 +58,7 @@ import ( "github.com/DataDog/datadog-agent/comp/dogstatsd/server" "github.com/DataDog/datadog-agent/comp/forwarder" "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" - eventplatformnoopfx "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/fx-noop" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatformreceiver/eventplatformreceiverimpl" orchestratorForwarderImpl "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl" haagentfx "github.com/DataDog/datadog-agent/comp/haagent/fx" @@ -155,6 +155,9 @@ func MakeCommand(globalParamsGetter func() GlobalParams) *cobra.Command { cliParams.cmd = cmd cliParams.args = args + eventplatforParams := eventplatformimpl.NewDefaultParams() + eventplatforParams.UseNoopEventPlatformForwarder = true + disableCmdPort() return fxutil.OneShot(run, fx.Supply(cliParams), @@ -184,7 +187,7 @@ func MakeCommand(globalParamsGetter func() GlobalParams) *cobra.Command { // Initializing the aggregator with a flush interval of 0 (to disable the flush goroutines) demultiplexerimpl.Module(demultiplexerimpl.NewDefaultParams(demultiplexerimpl.WithFlushInterval(0))), orchestratorForwarderImpl.Module(orchestratorForwarderImpl.NewNoopParams()), - eventplatformnoopfx.Module(), + eventplatformimpl.Module(eventplatforParams), eventplatformreceiverimpl.Module(), fx.Supply( status.Params{ diff --git a/pkg/collector/corechecks/containerimage/processor.go b/pkg/collector/corechecks/containerimage/processor.go index d1f9161975fd9..d34bd492b1cb1 100644 --- a/pkg/collector/corechecks/containerimage/processor.go +++ b/pkg/collector/corechecks/containerimage/processor.go @@ -13,7 +13,7 @@ import ( tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def" "github.com/DataDog/datadog-agent/comp/core/tagger/types" workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" "github.com/DataDog/datadog-agent/pkg/aggregator/sender" queue "github.com/DataDog/datadog-agent/pkg/util/aggregatingqueue" "github.com/DataDog/datadog-agent/pkg/util/hostname" diff --git a/pkg/collector/corechecks/containerimage/processor_test.go b/pkg/collector/corechecks/containerimage/processor_test.go index ec4ac078d8a26..b6c89960d30fc 100644 --- a/pkg/collector/corechecks/containerimage/processor_test.go +++ b/pkg/collector/corechecks/containerimage/processor_test.go @@ -20,7 +20,7 @@ import ( taggerMock "github.com/DataDog/datadog-agent/comp/core/tagger/mock" workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" "github.com/DataDog/datadog-agent/pkg/aggregator/mocksender" "github.com/DataDog/datadog-agent/pkg/util/hostname" "github.com/DataDog/datadog-agent/pkg/util/pointer" diff --git a/pkg/collector/corechecks/containerlifecycle/processor.go b/pkg/collector/corechecks/containerlifecycle/processor.go index 113819babbf2d..2f514329c091c 100644 --- a/pkg/collector/corechecks/containerlifecycle/processor.go +++ b/pkg/collector/corechecks/containerlifecycle/processor.go @@ -13,7 +13,7 @@ import ( "github.com/DataDog/agent-payload/v5/contlcycle" workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" "github.com/DataDog/datadog-agent/pkg/aggregator/sender" types "github.com/DataDog/datadog-agent/pkg/containerlifecycle" "github.com/DataDog/datadog-agent/pkg/util/log" diff --git a/pkg/collector/corechecks/network-devices/cisco-sdwan/report/metadata.go b/pkg/collector/corechecks/network-devices/cisco-sdwan/report/metadata.go index bb86c815ff918..4de17c9626f00 100644 --- a/pkg/collector/corechecks/network-devices/cisco-sdwan/report/metadata.go +++ b/pkg/collector/corechecks/network-devices/cisco-sdwan/report/metadata.go @@ -9,7 +9,7 @@ import ( "encoding/json" "time" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" devicemetadata "github.com/DataDog/datadog-agent/pkg/networkdevice/metadata" "github.com/DataDog/datadog-agent/pkg/util/log" ) diff --git a/pkg/collector/corechecks/networkpath/networkpath.go b/pkg/collector/corechecks/networkpath/networkpath.go index 3b48ef65235e0..28bef69f2bbca 100644 --- a/pkg/collector/corechecks/networkpath/networkpath.go +++ b/pkg/collector/corechecks/networkpath/networkpath.go @@ -15,7 +15,7 @@ import ( "github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration" telemetryComp "github.com/DataDog/datadog-agent/comp/core/telemetry" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" "github.com/DataDog/datadog-agent/pkg/aggregator/sender" "github.com/DataDog/datadog-agent/pkg/collector/check" core "github.com/DataDog/datadog-agent/pkg/collector/corechecks" diff --git a/pkg/collector/corechecks/sbom/processor.go b/pkg/collector/corechecks/sbom/processor.go index b612aebbc6d52..77c1e54a796e4 100644 --- a/pkg/collector/corechecks/sbom/processor.go +++ b/pkg/collector/corechecks/sbom/processor.go @@ -16,7 +16,7 @@ import ( tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def" "github.com/DataDog/datadog-agent/comp/core/tagger/types" workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" "github.com/DataDog/datadog-agent/pkg/aggregator/sender" pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" diff --git a/pkg/collector/corechecks/sbom/processor_test.go b/pkg/collector/corechecks/sbom/processor_test.go index bea2b359d42c4..4fcc1c3ba770e 100644 --- a/pkg/collector/corechecks/sbom/processor_test.go +++ b/pkg/collector/corechecks/sbom/processor_test.go @@ -31,7 +31,7 @@ import ( workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def" workloadmetafxmock "github.com/DataDog/datadog-agent/comp/core/workloadmeta/fx-mock" workloadmetamock "github.com/DataDog/datadog-agent/comp/core/workloadmeta/mock" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" "github.com/DataDog/datadog-agent/pkg/aggregator/mocksender" configmock "github.com/DataDog/datadog-agent/pkg/config/mock" sbomscanner "github.com/DataDog/datadog-agent/pkg/sbom/scanner" diff --git a/pkg/collector/corechecks/servicediscovery/events.go b/pkg/collector/corechecks/servicediscovery/events.go index b5e136f8cf23a..5cba3f89243ac 100644 --- a/pkg/collector/corechecks/servicediscovery/events.go +++ b/pkg/collector/corechecks/servicediscovery/events.go @@ -11,7 +11,7 @@ import ( "github.com/DataDog/datadog-agent/comp/core/hostname" "github.com/DataDog/datadog-agent/comp/core/hostname/hostnameimpl" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" "github.com/DataDog/datadog-agent/pkg/aggregator/sender" pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" "github.com/DataDog/datadog-agent/pkg/util/log" diff --git a/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go b/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go index 76d6f55117c26..ff8d90912efb3 100644 --- a/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go +++ b/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go @@ -13,7 +13,7 @@ import ( "strings" "time" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/def" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" "github.com/DataDog/datadog-agent/pkg/util" "github.com/DataDog/datadog-agent/pkg/util/log" diff --git a/pkg/diagnose/runner.go b/pkg/diagnose/runner.go index 303a520ddde4b..5d7f2ee88d1db 100644 --- a/pkg/diagnose/runner.go +++ b/pkg/diagnose/runner.go @@ -20,7 +20,7 @@ import ( "github.com/DataDog/datadog-agent/comp/core/secrets" tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def" workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def" - eventplatform "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/impl" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl" integrations "github.com/DataDog/datadog-agent/comp/logs/integrations/def" "github.com/DataDog/datadog-agent/pkg/aggregator/sender" "github.com/DataDog/datadog-agent/pkg/api/util" @@ -524,7 +524,7 @@ func RegisterConnectivityAutodiscovery(catalog *diagnosis.Catalog) { // RegisterConnectivityDatadogEventPlatform registers the connectivity-datadog-event-platform diagnose suite. func RegisterConnectivityDatadogEventPlatform(catalog *diagnosis.Catalog) { - catalog.Register("connectivity-datadog-event-platform", eventplatform.Diagnose) + catalog.Register("connectivity-datadog-event-platform", eventplatformimpl.Diagnose) } // RegisterPortConflict registers the port-conflict diagnose suite. diff --git a/pkg/serverless/invocationlifecycle/lifecycle_test.go b/pkg/serverless/invocationlifecycle/lifecycle_test.go index 7c4e90a02a7ab..3eefd8dc51bbd 100644 --- a/pkg/serverless/invocationlifecycle/lifecycle_test.go +++ b/pkg/serverless/invocationlifecycle/lifecycle_test.go @@ -15,6 +15,7 @@ import ( "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer" "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer/demultiplexerimpl" + "github.com/DataDog/datadog-agent/comp/core/hostname/hostnameimpl" log "github.com/DataDog/datadog-agent/comp/core/log/def" logmock "github.com/DataDog/datadog-agent/comp/core/log/mock" compressionmock "github.com/DataDog/datadog-agent/comp/serializer/compression/fx-mock" @@ -1382,5 +1383,5 @@ func getEventFromFile(filename string) []byte { } func createDemultiplexer(t *testing.T) demultiplexer.FakeSamplerMock { - return fxutil.Test[demultiplexer.FakeSamplerMock](t, fx.Provide(func() log.Component { return logmock.New(t) }), compressionmock.MockModule(), demultiplexerimpl.FakeSamplerMockModule()) + return fxutil.Test[demultiplexer.FakeSamplerMock](t, fx.Provide(func() log.Component { return logmock.New(t) }), compressionmock.MockModule(), demultiplexerimpl.FakeSamplerMockModule(), hostnameimpl.MockModule()) } diff --git a/pkg/serverless/logs/logs_test.go b/pkg/serverless/logs/logs_test.go index 95b14e1f8d271..6a8f0d30d3cb0 100644 --- a/pkg/serverless/logs/logs_test.go +++ b/pkg/serverless/logs/logs_test.go @@ -22,6 +22,7 @@ import ( "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer" "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer/demultiplexerimpl" + "github.com/DataDog/datadog-agent/comp/core/hostname/hostnameimpl" log "github.com/DataDog/datadog-agent/comp/core/log/def" logmock "github.com/DataDog/datadog-agent/comp/core/log/mock" "github.com/DataDog/datadog-agent/comp/logs/agent/config" @@ -1472,5 +1473,5 @@ func TestMultipleStartLogCollection(t *testing.T) { } func createDemultiplexer(t *testing.T) demultiplexer.FakeSamplerMock { - return fxutil.Test[demultiplexer.FakeSamplerMock](t, fx.Provide(func() log.Component { return logmock.New(t) }), compressionmock.MockModule(), demultiplexerimpl.FakeSamplerMockModule()) + return fxutil.Test[demultiplexer.FakeSamplerMock](t, fx.Provide(func() log.Component { return logmock.New(t) }), compressionmock.MockModule(), demultiplexerimpl.FakeSamplerMockModule(), hostnameimpl.MockModule()) } diff --git a/pkg/serverless/metrics/enhanced_metrics_test.go b/pkg/serverless/metrics/enhanced_metrics_test.go index 4de32deeef01d..ea6619861f726 100644 --- a/pkg/serverless/metrics/enhanced_metrics_test.go +++ b/pkg/serverless/metrics/enhanced_metrics_test.go @@ -17,6 +17,7 @@ import ( "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer" "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer/demultiplexerimpl" + "github.com/DataDog/datadog-agent/comp/core/hostname/hostnameimpl" log "github.com/DataDog/datadog-agent/comp/core/log/def" logmock "github.com/DataDog/datadog-agent/comp/core/log/mock" "github.com/DataDog/datadog-agent/pkg/metrics" @@ -833,5 +834,5 @@ func TestSendFailoverReasonMetric(t *testing.T) { } func createDemultiplexer(t *testing.T) demultiplexer.FakeSamplerMock { - return fxutil.Test[demultiplexer.FakeSamplerMock](t, fx.Provide(func() log.Component { return logmock.New(t) }), compressionmock.MockModule(), demultiplexerimpl.FakeSamplerMockModule()) + return fxutil.Test[demultiplexer.FakeSamplerMock](t, fx.Provide(func() log.Component { return logmock.New(t) }), compressionmock.MockModule(), demultiplexerimpl.FakeSamplerMockModule(), hostnameimpl.MockModule()) } diff --git a/tasks/components.py b/tasks/components.py index f80c62b00573a..c534d63c56b83 100644 --- a/tasks/components.py +++ b/tasks/components.py @@ -109,6 +109,7 @@ def has_type_component(content) -> bool: 'comp/dogstatsd/serverDebug/serverdebugimpl', 'comp/dogstatsd/status/statusimpl', 'comp/etw/impl', + 'comp/forwarder/eventplatform/eventplatformimpl', 'comp/forwarder/eventplatformreceiver/eventplatformreceiverimpl', 'comp/forwarder/orchestrator/orchestratorimpl', 'comp/languagedetection/client/clientimpl',