From b4093b6de0f3feff7a41c347d74e1366ac1dafca Mon Sep 17 00:00:00 2001 From: "Duong (Yoon)" <47346352+DDuongNguyen@users.noreply.github.com> Date: Wed, 28 Feb 2024 15:39:58 -0500 Subject: [PATCH] Removing start method in favor of on start hook in dogstatsd server component (#22584) Removing start method in favor of on start hook in dogstatsd server component Co-authored-by: gh123man --- cmd/agent/subcommands/run/command.go | 22 +---- cmd/agent/subcommands/run/command_windows.go | 2 +- .../settings/runtime_settings_test.go | 8 +- cmd/dogstatsd/subcommands/start/command.go | 7 -- .../demultiplexerimpl/demultiplexer.go | 2 + .../demultiplexer_fake_sampler_mock.go | 6 +- .../demultiplexerimpl/demultiplexer_mock.go | 6 +- comp/dogstatsd/server/component.go | 8 -- comp/dogstatsd/server/server.go | 50 ++++++---- comp/dogstatsd/server/server_bench_test.go | 7 +- comp/dogstatsd/server/server_test.go | 97 +++++++------------ comp/dogstatsd/server/serverless.go | 41 ++++++++ pkg/serverless/metrics/metric.go | 9 +- pkg/serverless/metrics/metric_test.go | 5 +- 14 files changed, 135 insertions(+), 135 deletions(-) create mode 100644 comp/dogstatsd/server/serverless.go diff --git a/cmd/agent/subcommands/run/command.go b/cmd/agent/subcommands/run/command.go index b0da4d42ca8a1..7add71abbb5e4 100644 --- a/cmd/agent/subcommands/run/command.go +++ b/cmd/agent/subcommands/run/command.go @@ -214,7 +214,7 @@ func run(log log.Component, collector collector.Component, ) error { defer func() { - stopAgent(cliParams, server, agentAPI) + stopAgent(cliParams, agentAPI) }() // prepare go runtime @@ -332,7 +332,7 @@ func getSharedFxOption() fx.Option { statusimpl.Module(), authtokenimpl.Module(), apiimpl.Module(), - + demultiplexerimpl.Module(), dogstatsd.Bundle(), otelcol.Bundle(), rctelemetryreporterimpl.Module(), @@ -368,7 +368,6 @@ func getSharedFxOption() fx.Option { params.EnableNoAggregationPipeline = config.GetBool("dogstatsd_no_aggregation_pipeline") return params }), - demultiplexerimpl.Module(), orchestratorForwarderImpl.Module(), fx.Supply(orchestratorForwarderImpl.NewDefaultParams()), eventplatformimpl.Module(), @@ -596,16 +595,6 @@ func startAgent( demultiplexer.AddAgentStartupTelemetry(version.AgentVersion) - // start dogstatsd - if pkgconfig.Datadog.GetBool("use_dogstatsd") { - err := server.Start(demultiplexer) - if err != nil { - log.Errorf("Could not start dogstatsd: %s", err) - } else { - log.Debugf("dogstatsd started") - } - } - // load and run all configs in AD common.AC.LoadAndRun(ctx) @@ -634,12 +623,12 @@ func startAgent( } // StopAgentWithDefaults is a temporary way for other packages to use stopAgent. -func StopAgentWithDefaults(server dogstatsdServer.Component, agentAPI internalAPI.Component) { - stopAgent(&cliParams{GlobalParams: &command.GlobalParams{}}, server, agentAPI) +func StopAgentWithDefaults(agentAPI internalAPI.Component) { + stopAgent(&cliParams{GlobalParams: &command.GlobalParams{}}, agentAPI) } // stopAgent Tears down the agent process -func stopAgent(cliParams *cliParams, server dogstatsdServer.Component, agentAPI internalAPI.Component) { +func stopAgent(cliParams *cliParams, agentAPI internalAPI.Component) { // retrieve the agent health before stopping the components // GetReadyNonBlocking has a 100ms timeout to avoid blocking health, err := health.GetReadyNonBlocking() @@ -654,7 +643,6 @@ func stopAgent(cliParams *cliParams, server dogstatsdServer.Component, agentAPI pkglog.Errorf("Error shutting down expvar server: %v", err) } } - server.Stop() if common.AC != nil { common.AC.Stop() } diff --git a/cmd/agent/subcommands/run/command_windows.go b/cmd/agent/subcommands/run/command_windows.go index a54f6f8c5ae21..c95d246588155 100644 --- a/cmd/agent/subcommands/run/command_windows.go +++ b/cmd/agent/subcommands/run/command_windows.go @@ -112,7 +112,7 @@ func StartAgentWithDefaults(ctxChan <-chan context.Context) (<-chan error, error collector collector.Component, ) error { - defer StopAgentWithDefaults(server, agentAPI) + defer StopAgentWithDefaults(agentAPI) err := startAgent( &cliParams{GlobalParams: &command.GlobalParams{}}, diff --git a/cmd/agent/subcommands/run/internal/settings/runtime_settings_test.go b/cmd/agent/subcommands/run/internal/settings/runtime_settings_test.go index 84965c9dcfdb3..8373d477dad60 100644 --- a/cmd/agent/subcommands/run/internal/settings/runtime_settings_test.go +++ b/cmd/agent/subcommands/run/internal/settings/runtime_settings_test.go @@ -9,7 +9,6 @@ import ( "testing" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "go.uber.org/fx" "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer" @@ -41,16 +40,11 @@ func TestDogstatsdMetricsStats(t *testing.T) { fx.Supply(server.Params{ Serverless: false, }), + demultiplexerimpl.MockModule(), dogstatsd.Bundle(), defaultforwarder.MockModule(), - demultiplexerimpl.MockModule(), )) - demux := deps.Demultiplexer - deps.Server.Start(demux) - - require.Nil(t, err) - s := DsdStatsRuntimeSetting{ ServerDebug: deps.Debug, } diff --git a/cmd/dogstatsd/subcommands/start/command.go b/cmd/dogstatsd/subcommands/start/command.go index e6aa500dbbf6d..f829a9d02f47c 100644 --- a/cmd/dogstatsd/subcommands/start/command.go +++ b/cmd/dogstatsd/subcommands/start/command.go @@ -276,11 +276,6 @@ func RunDogstatsd(ctx context.Context, cliParams *CLIParams, config config.Compo demultiplexer.AddAgentStartupTelemetry(version.AgentVersion) - err = components.DogstatsdServer.Start(demultiplexer) - if err != nil { - log.Criticalf("Unable to start dogstatsd: %s", err) - return - } return } @@ -325,8 +320,6 @@ func StopAgent(cancel context.CancelFunc, components *DogstatsdComponents) { } } - components.DogstatsdServer.Stop() - pkglog.Info("See ya!") pkglog.Flush() } diff --git a/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer.go b/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer.go index afc23b43c6f06..c063bcdd338ba 100644 --- a/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer.go +++ b/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer.go @@ -59,6 +59,7 @@ type provides struct { DiagnosticSenderManager diagnosesendermanager.Component SenderManager sender.SenderManager StatusProvider status.InformationProvider + AggregatorDemultiplexer aggregator.Demultiplexer } func newDemultiplexer(deps dependencies) (provides, error) { @@ -94,6 +95,7 @@ func newDemultiplexer(deps dependencies) (provides, error) { StatusProvider: status.NewInformationProvider(demultiplexerStatus{ Log: deps.Log, }), + AggregatorDemultiplexer: demultiplexer, }, nil } diff --git a/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer_fake_sampler_mock.go b/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer_fake_sampler_mock.go index 490e34ee5212f..5f934bf3304b7 100644 --- a/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer_fake_sampler_mock.go +++ b/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer_fake_sampler_mock.go @@ -22,7 +22,11 @@ import ( // FakeSamplerMockModule defines the fx options for FakeSamplerMock. func FakeSamplerMockModule() fxutil.Module { return fxutil.Component( - fx.Provide(newFakeSamplerMock)) + fx.Provide(newFakeSamplerMock), + fx.Provide(func(demux demultiplexerComp.FakeSamplerMock) aggregator.Demultiplexer { + return demux + }), + ) } type fakeSamplerMockDependencies struct { diff --git a/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer_mock.go b/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer_mock.go index 82b8636298139..c7ad28c9a8230 100644 --- a/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer_mock.go +++ b/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer_mock.go @@ -22,7 +22,11 @@ import ( // MockModule defines the fx options for this component. func MockModule() fxutil.Module { return fxutil.Component( - fx.Provide(newMock)) + fx.Provide(newMock), + fx.Provide(func(demux demultiplexerComp.Component) aggregator.Demultiplexer { + return demux + }), + ) } type mock struct { diff --git a/comp/dogstatsd/server/component.go b/comp/dogstatsd/server/component.go index f909780c8aa2f..9ffb2ce924afa 100644 --- a/comp/dogstatsd/server/component.go +++ b/comp/dogstatsd/server/component.go @@ -9,7 +9,6 @@ package server import ( "time" - "github.com/DataDog/datadog-agent/pkg/aggregator" "github.com/DataDog/datadog-agent/pkg/util/fxutil" "go.uber.org/fx" ) @@ -18,13 +17,6 @@ import ( // Component is the component type. type Component interface { - - // Start starts the dogstatsd server - Start(demultiplexer aggregator.Demultiplexer) error - - // Stop stops the dogstatsd server - Stop() - // IsRunning returns true if the server is running IsRunning() bool diff --git a/comp/dogstatsd/server/server.go b/comp/dogstatsd/server/server.go index a0d9f5699db81..3a9744f62e0b2 100644 --- a/comp/dogstatsd/server/server.go +++ b/comp/dogstatsd/server/server.go @@ -19,13 +19,11 @@ import ( configComponent "github.com/DataDog/datadog-agent/comp/core/config" logComponent "github.com/DataDog/datadog-agent/comp/core/log" - logComponentImpl "github.com/DataDog/datadog-agent/comp/core/log/logimpl" "github.com/DataDog/datadog-agent/comp/dogstatsd/listeners" "github.com/DataDog/datadog-agent/comp/dogstatsd/mapper" "github.com/DataDog/datadog-agent/comp/dogstatsd/packets" "github.com/DataDog/datadog-agent/comp/dogstatsd/replay" serverdebug "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug" - "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug/serverdebugimpl" "github.com/DataDog/datadog-agent/pkg/aggregator" "github.com/DataDog/datadog-agent/pkg/config" "github.com/DataDog/datadog-agent/pkg/metrics" @@ -67,6 +65,10 @@ var ( type dependencies struct { fx.In + Lc fx.Lifecycle + + Demultiplexer aggregator.Demultiplexer + Log logComponent.Component Config configComponent.Component Debug serverdebug.Component @@ -187,19 +189,22 @@ func initTelemetry(cfg config.Reader, logger logComponent.Component) { packets.InitTelemetry(get("telemetry.dogstatsd.listeners_channel_latency_buckets")) } -// TODO: (components) - remove once serverless is an FX app -// -//nolint:revive // TODO(AML) Fix revive linter -func NewServerlessServer() Component { - return newServerCompat(config.Datadog, logComponentImpl.NewTemporaryLoggerWithoutInit(), replay.NewServerlessTrafficCapture(), serverdebugimpl.NewServerlessServerDebug(), true) -} - // TODO: (components) - merge with newServerCompat once NewServerlessServer is removed func newServer(deps dependencies) Component { - return newServerCompat(deps.Config, deps.Log, deps.Replay, deps.Debug, deps.Params.Serverless) + s := newServerCompat(deps.Config, deps.Log, deps.Replay, deps.Debug, deps.Params.Serverless, deps.Demultiplexer) + + if config.Datadog.GetBool("use_dogstatsd") { + deps.Lc.Append(fx.Hook{ + OnStart: s.startHook, + OnStop: s.stop, + }) + } + + return s + } -func newServerCompat(cfg config.Reader, log logComponent.Component, capture replay.Component, debug serverdebug.Component, serverless bool) Component { +func newServerCompat(cfg config.Reader, log logComponent.Component, capture replay.Component, debug serverdebug.Component, serverless bool, demux aggregator.Demultiplexer) *server { // This needs to be done after the configuration is loaded once.Do(func() { initTelemetry(cfg, log) }) @@ -272,7 +277,7 @@ func newServerCompat(cfg config.Reader, log logComponent.Component, capture repl sharedPacketPool: nil, sharedPacketPoolManager: nil, sharedFloat64List: newFloat64ListPool(), - demultiplexer: nil, + demultiplexer: demux, listeners: nil, stopChan: make(chan bool), serverlessFlushChan: make(chan bool), @@ -301,13 +306,22 @@ func newServerCompat(cfg config.Reader, log logComponent.Component, capture repl originOptOutEnabled: cfg.GetBool("dogstatsd_origin_optout_enabled"), }, } + return s } -func (s *server) Start(demultiplexer aggregator.Demultiplexer) error { +func (s *server) startHook(context context.Context) error { - // TODO: (components) - DI this into Server when Demultiplexer is made into a component - s.demultiplexer = demultiplexer + err := s.start(context) + if err != nil { + s.log.Errorf("Could not start dogstatsd: %s", err) + } else { + s.log.Debug("dogstatsd started") + } + return nil +} + +func (s *server) start(context.Context) error { packetsChannel := make(chan packets.Packets, s.config.GetInt("dogstatsd_queue_size")) tmpListeners := make([]listeners.StatsdListener, 0, 2) @@ -444,9 +458,9 @@ func (s *server) Start(demultiplexer aggregator.Demultiplexer) error { return nil } -func (s *server) Stop() { +func (s *server) stop(context.Context) error { if !s.IsRunning() { - return + return nil } close(s.stopChan) for _, l := range s.listeners { @@ -460,6 +474,8 @@ func (s *server) Stop() { } s.health.Deregister() //nolint:errcheck s.Started = false + + return nil } func (s *server) IsRunning() bool { diff --git a/comp/dogstatsd/server/server_bench_test.go b/comp/dogstatsd/server/server_bench_test.go index 9c6ab5cba10b1..6394f74c64f23 100644 --- a/comp/dogstatsd/server/server_bench_test.go +++ b/comp/dogstatsd/server/server_bench_test.go @@ -37,8 +37,7 @@ func benchParsePackets(b *testing.B, rawPacket []byte) { pkgconfig.SetupLogger("", "off", "", "", false, true, false) demux := deps.Demultiplexer - _ = s.Start(demux) - defer s.Stop() + defer demux.Stop(false) done := make(chan struct{}) go func() { @@ -85,8 +84,6 @@ func BenchmarkPbarseMetricMessage(b *testing.B) { pkgconfig.SetupLogger("", "off", "", "", false, true, false) demux := deps.Demultiplexer - _ = s.Start(demux) - defer s.Stop() done := make(chan struct{}) go func() { @@ -138,8 +135,6 @@ func benchmarkMapperControl(b *testing.B, yaml string) { pkgconfig.SetupLogger("", "off", "", "", false, true, false) demux := deps.Demultiplexer - _ = s.Start(demux) - defer s.Stop() done := make(chan struct{}) go func() { diff --git a/comp/dogstatsd/server/server_test.go b/comp/dogstatsd/server/server_test.go index e45caa1c16773..454bdf36a5136 100644 --- a/comp/dogstatsd/server/server_test.go +++ b/comp/dogstatsd/server/server_test.go @@ -8,6 +8,7 @@ package server import ( + "context" "fmt" "net" "sort" @@ -23,13 +24,11 @@ import ( "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer/demultiplexerimpl" "github.com/DataDog/datadog-agent/comp/core" configComponent "github.com/DataDog/datadog-agent/comp/core/config" - "github.com/DataDog/datadog-agent/comp/core/hostname/hostnameimpl" "github.com/DataDog/datadog-agent/comp/core/log" - "github.com/DataDog/datadog-agent/comp/core/log/logimpl" "github.com/DataDog/datadog-agent/comp/dogstatsd/listeners" "github.com/DataDog/datadog-agent/comp/dogstatsd/replay" + serverdebug "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug" "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug/serverdebugimpl" - "github.com/DataDog/datadog-agent/pkg/aggregator" "github.com/DataDog/datadog-agent/pkg/config" "github.com/DataDog/datadog-agent/pkg/metrics" "github.com/DataDog/datadog-agent/pkg/util/fxutil" @@ -38,10 +37,13 @@ import ( type serverDeps struct { fx.In - Server Component Config configComponent.Component Log log.Component Demultiplexer demultiplexer.FakeSamplerMock + Replay replay.Component + Debug serverdebug.Component + + Server Component } func fulfillDeps(t testing.TB) serverDeps { @@ -58,8 +60,8 @@ func fulfillDepsWithConfigOverrideAndFeatures(t testing.TB, overrides map[string }), fx.Supply(Params{Serverless: false}), replay.MockModule(), - Module(), demultiplexerimpl.FakeSamplerMockModule(), + Module(), )) } @@ -76,21 +78,18 @@ func fulfillDepsWithConfigYaml(t testing.TB, yaml string) serverDeps { }), fx.Supply(Params{Serverless: false}), replay.MockModule(), - Module(), demultiplexerimpl.FakeSamplerMockModule(), + Module(), )) } func TestNewServer(t *testing.T) { cfg := make(map[string]interface{}) - cfg["dogstatsd_port"] = listeners.RandomPortName deps := fulfillDepsWithConfigOverride(t, cfg) - demux := createDemultiplexer(t) - requireStart(t, deps.Server, demux) + requireStart(t, deps.Server) - deps.Server.Stop() } func TestStopServer(t *testing.T) { @@ -100,13 +99,16 @@ func TestStopServer(t *testing.T) { deps := fulfillDepsWithConfigOverride(t, cfg) - demux := createDemultiplexer(t) - requireStart(t, deps.Server, demux) - deps.Server.Stop() + s := newServerCompat(deps.Config, deps.Log, deps.Replay, deps.Debug, false, deps.Demultiplexer) + s.start(context.TODO()) + requireStart(t, s) + + s.stop(context.TODO()) // check that the port can be bound, try for 100 ms - address, err := net.ResolveUDPAddr("udp", deps.Server.UDPLocalAddr()) + address, err := net.ResolveUDPAddr("udp", s.UDPLocalAddr()) require.NoError(t, err, "cannot resolve address") + for i := 0; i < 10; i++ { var conn net.Conn conn, err = net.ListenUDP("udp", address) @@ -153,8 +155,6 @@ func TestUDPReceive(t *testing.T) { deps := fulfillDepsWithConfigOverride(t, cfg) demux := deps.Demultiplexer - requireStart(t, deps.Server, demux) - defer deps.Server.Stop() conn, err := net.Dial("udp", deps.Server.UDPLocalAddr()) require.NoError(t, err, "cannot connect to DSD socket") @@ -434,9 +434,7 @@ func TestUDPForward(t *testing.T) { defer pc.Close() - demux := deps.Demultiplexer - requireStart(t, deps.Server, demux) - defer deps.Server.Stop() + requireStart(t, deps.Server) conn, err := net.Dial("udp", deps.Server.UDPLocalAddr()) require.NoError(t, err, "cannot connect to DSD socket") @@ -465,9 +463,8 @@ func TestHistToDist(t *testing.T) { deps := fulfillDepsWithConfigOverride(t, cfg) - demux := createDemultiplexer(t) - requireStart(t, deps.Server, demux) - defer deps.Server.Stop() + demux := deps.Demultiplexer + requireStart(t, deps.Server) conn, err := net.Dial("udp", deps.Server.UDPLocalAddr()) require.NoError(t, err, "cannot connect to DSD socket") @@ -552,7 +549,7 @@ func TestE2EParsing(t *testing.T) { deps := fulfillDepsWithConfigOverride(t, cfg) demux := deps.Demultiplexer - requireStart(t, deps.Server, demux) + requireStart(t, deps.Server) conn, err := net.Dial("udp", deps.Server.UDPLocalAddr()) require.NoError(t, err, "cannot connect to DSD socket") @@ -565,14 +562,13 @@ func TestE2EParsing(t *testing.T) { assert.Equal(t, 0, len(timedSamples)) demux.Reset() demux.Stop(false) - deps.Server.Stop() // EOL enabled cfg["dogstatsd_eol_required"] = []string{"udp"} deps = fulfillDepsWithConfigOverride(t, cfg) demux = deps.Demultiplexer - requireStart(t, deps.Server, demux) + requireStart(t, deps.Server) conn, err = net.Dial("udp", deps.Server.UDPLocalAddr()) require.NoError(t, err, "cannot connect to DSD socket") @@ -583,7 +579,6 @@ func TestE2EParsing(t *testing.T) { samples, timedSamples = demux.WaitForSamples(time.Second * 2) require.Equal(t, 1, len(samples)) assert.Equal(t, 0, len(timedSamples)) - deps.Server.Stop() demux.Reset() } @@ -594,9 +589,8 @@ func TestExtraTags(t *testing.T) { deps := fulfillDepsWithConfigOverrideAndFeatures(t, cfg, []config.Feature{config.EKSFargate}) - demux := createDemultiplexer(t) - requireStart(t, deps.Server, demux) - defer deps.Server.Stop() + demux := deps.Demultiplexer + requireStart(t, deps.Server) conn, err := net.Dial("udp", deps.Server.UDPLocalAddr()) require.NoError(t, err, "cannot connect to DSD socket") @@ -623,9 +617,8 @@ func TestStaticTags(t *testing.T) { deps := fulfillDepsWithConfigOverrideAndFeatures(t, cfg, []config.Feature{config.EKSFargate}) - demux := createDemultiplexer(t) - requireStart(t, deps.Server, demux) - defer deps.Server.Stop() + demux := deps.Demultiplexer + requireStart(t, deps.Server) conn, err := net.Dial("udp", deps.Server.UDPLocalAddr()) require.NoError(t, err, "cannot connect to DSD socket") @@ -659,8 +652,7 @@ func TestNoMappingsConfig(t *testing.T) { samples := []metrics.MetricSample{} - demux := deps.Demultiplexer - requireStart(t, s, demux) + requireStart(t, s) assert.Nil(t, s.mapper) @@ -770,8 +762,7 @@ dogstatsd_mapper_profiles: cw.SetWithoutSource("dogstatsd_port", listeners.RandomPortName) - demux := deps.Demultiplexer - requireStart(t, s, demux) + requireStart(t, s) assert.Equal(t, deps.Config.Get("dogstatsd_mapper_cache_size"), scenario.expectedCacheSize, "Case `%s` failed. cache_size `%s` should be `%s`", scenario.name, deps.Config.Get("dogstatsd_mapper_cache_size"), scenario.expectedCacheSize) @@ -791,7 +782,6 @@ dogstatsd_mapper_profiles: sort.Strings(sample.Tags) } assert.Equal(t, scenario.expectedSamples, actualSamples, "Case `%s` failed. `%s` should be `%s`", scenario.name, actualSamples, scenario.expectedSamples) - s.Stop() }) } } @@ -804,32 +794,24 @@ func TestNewServerExtraTags(t *testing.T) { deps := fulfillDepsWithConfigOverride(t, cfg) s := deps.Server.(*server) - demux := deps.Demultiplexer - requireStart(t, s, demux) + requireStart(t, s) require.Len(s.extraTags, 0, "no tags should have been read") - s.Stop() - demux.Stop(false) // when the extraTags parameter isn't used, the DogStatsD server is not reading this env var cfg["tags"] = "hello:world" deps = fulfillDepsWithConfigOverride(t, cfg) s = deps.Server.(*server) - demux = deps.Demultiplexer - requireStart(t, s, demux) + requireStart(t, s) require.Len(s.extraTags, 0, "no tags should have been read") - s.Stop() - demux.Stop(false) // when the extraTags parameter isn't used, the DogStatsD server is automatically reading this env var for extra tags cfg["dogstatsd_tags"] = "hello:world extra:tags" deps = fulfillDepsWithConfigOverride(t, cfg) s = deps.Server.(*server) - demux = deps.Demultiplexer - requireStart(t, s, demux) + requireStart(t, s) require.Len(s.extraTags, 2, "two tags should have been read") require.Equal(s.extraTags[0], "extra:tags", "the tag extra:tags should be set") require.Equal(s.extraTags[1], "hello:world", "the tag hello:world should be set") - s.Stop() } func TestProcessedMetricsOrigin(t *testing.T) { @@ -842,8 +824,8 @@ func TestProcessedMetricsOrigin(t *testing.T) { s := deps.Server.(*server) assert := assert.New(t) - demux := deps.Demultiplexer - requireStart(t, s, demux) + s.start(context.TODO()) + requireStart(t, s) s.Stop() @@ -922,8 +904,7 @@ func testContainerIDParsing(t *testing.T, cfg map[string]interface{}) { deps := fulfillDeps(t) s := deps.Server.(*server) assert := assert.New(t) - requireStart(t, s, deps.Demultiplexer) - s.Stop() + requireStart(t, s) parser := newParser(deps.Config, newFloat64ListPool(), 1) parser.dsdOriginEnabled = true @@ -964,8 +945,7 @@ func testOriginOptout(t *testing.T, cfg map[string]interface{}, enabled bool) { s := deps.Server.(*server) assert := assert.New(t) - requireStart(t, s, deps.Demultiplexer) - s.Stop() + requireStart(t, s) parser := newParser(deps.Config, newFloat64ListPool(), 1) parser.dsdOriginEnabled = true @@ -1013,14 +993,7 @@ func TestOriginOptout(t *testing.T) { } } -func requireStart(t *testing.T, s Component, demux aggregator.Demultiplexer) { - err := s.Start(demux) - require.NoError(t, err, "cannot start DSD") +func requireStart(t *testing.T, s Component) { assert.NotNil(t, s) assert.True(t, s.IsRunning()) } - -func createDemultiplexer(t *testing.T) demultiplexer.FakeSamplerMock { - return fxutil.Test[demultiplexer.FakeSamplerMock](t, logimpl.MockModule(), - demultiplexerimpl.FakeSamplerMockModule(), hostnameimpl.MockModule()) -} diff --git a/comp/dogstatsd/server/serverless.go b/comp/dogstatsd/server/serverless.go new file mode 100644 index 0000000000000..fd8705508e837 --- /dev/null +++ b/comp/dogstatsd/server/serverless.go @@ -0,0 +1,41 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +// Package server implements a component to run the dogstatsd server +package server + +import ( + "context" + + logComponentImpl "github.com/DataDog/datadog-agent/comp/core/log/logimpl" + "github.com/DataDog/datadog-agent/comp/dogstatsd/replay" + "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug/serverdebugimpl" + "github.com/DataDog/datadog-agent/pkg/aggregator" + "github.com/DataDog/datadog-agent/pkg/config" +) + +// team: agent-metrics-logs + +// ServerlessDogstatsd is the interface for the serverless dogstatsd server. +type ServerlessDogstatsd interface { + Component + Stop() +} + +//nolint:revive // TODO(AML) Fix revive linter +func NewServerlessServer(demux aggregator.Demultiplexer) (ServerlessDogstatsd, error) { + s := newServerCompat(config.Datadog, logComponentImpl.NewTemporaryLoggerWithoutInit(), replay.NewServerlessTrafficCapture(), serverdebugimpl.NewServerlessServerDebug(), true, demux) + + err := s.start(context.TODO()) + if err != nil { + return nil, err + } + + return s, nil +} + +func (s *server) Stop() { + _ = s.stop(context.TODO()) +} diff --git a/pkg/serverless/metrics/metric.go b/pkg/serverless/metrics/metric.go index d21e3fd74bf7e..9abaaaf6078e4 100644 --- a/pkg/serverless/metrics/metric.go +++ b/pkg/serverless/metrics/metric.go @@ -20,7 +20,7 @@ import ( // ServerlessMetricAgent represents the DogStatsD server and the aggregator type ServerlessMetricAgent struct { - dogStatsDServer dogstatsdServer.Component + dogStatsDServer dogstatsdServer.ServerlessDogstatsd tags []string Demux aggregator.Demultiplexer @@ -42,7 +42,7 @@ type MultipleEndpointConfig interface { // DogStatsDFactory allows create a new DogStatsD server type DogStatsDFactory interface { - NewServer(aggregator.Demultiplexer) (dogstatsdServer.Component, error) + NewServer(aggregator.Demultiplexer) (dogstatsdServer.ServerlessDogstatsd, error) } const ( @@ -56,9 +56,8 @@ func (m *MetricConfig) GetMultipleEndpoints() (map[string][]string, error) { } // NewServer returns a running DogStatsD server -func (m *MetricDogStatsD) NewServer(demux aggregator.Demultiplexer) (dogstatsdServer.Component, error) { - s := dogstatsdServer.NewServerlessServer() - return s, s.Start(demux) +func (m *MetricDogStatsD) NewServer(demux aggregator.Demultiplexer) (dogstatsdServer.ServerlessDogstatsd, error) { + return dogstatsdServer.NewServerlessServer(demux) } // Start starts the DogStatsD agent diff --git a/pkg/serverless/metrics/metric_test.go b/pkg/serverless/metrics/metric_test.go index 0cacfcd836d5a..5f6cb380047b1 100644 --- a/pkg/serverless/metrics/metric_test.go +++ b/pkg/serverless/metrics/metric_test.go @@ -70,7 +70,7 @@ func TestStartInvalidConfig(t *testing.T) { type MetricDogStatsDMocked struct{} //nolint:revive // TODO(SERV) Fix revive linter -func (m *MetricDogStatsDMocked) NewServer(demux aggregator.Demultiplexer) (dogstatsdServer.Component, error) { +func (m *MetricDogStatsDMocked) NewServer(demux aggregator.Demultiplexer) (dogstatsdServer.ServerlessDogstatsd, error) { return nil, fmt.Errorf("error") } @@ -205,8 +205,7 @@ func TestRaceFlushVersusParsePacket(t *testing.T) { demux := aggregator.InitAndStartServerlessDemultiplexer(nil, time.Second*1000) - s := dogstatsdServer.NewServerlessServer() - err = s.Start(demux) + s, err := dogstatsdServer.NewServerlessServer(demux) require.NoError(t, err, "cannot start DSD") defer s.Stop()