From ef48776409edf07f452f8bcadacac3d4650d411e Mon Sep 17 00:00:00 2001 From: yoon nguyen Date: Wed, 31 Jan 2024 16:04:52 -0500 Subject: [PATCH 01/16] dogstatsd on agent --- cmd/agent/subcommands/run/command.go | 5 +- .../settings/runtime_settings_test.go | 4 +- cmd/dogstatsd/subcommands/start/command.go | 2 +- comp/dogstatsd/server/component.go | 5 +- comp/dogstatsd/server/server.go | 29 ++++--- comp/dogstatsd/server/server_bench_test.go | 9 ++- comp/dogstatsd/server/server_test.go | 12 ++- pkg/serverless/metrics/metric.go | 8 +- pkg/serverless/metrics/metric_test.go | 76 +++++++++---------- 9 files changed, 83 insertions(+), 67 deletions(-) diff --git a/cmd/agent/subcommands/run/command.go b/cmd/agent/subcommands/run/command.go index 335802e7d19587..06121bdbb53e06 100644 --- a/cmd/agent/subcommands/run/command.go +++ b/cmd/agent/subcommands/run/command.go @@ -28,7 +28,6 @@ import ( "github.com/DataDog/datadog-agent/cmd/agent/common/misconfig" "github.com/DataDog/datadog-agent/cmd/agent/common/path" "github.com/DataDog/datadog-agent/cmd/agent/common/signals" - global "github.com/DataDog/datadog-agent/cmd/agent/dogstatsd" "github.com/DataDog/datadog-agent/cmd/agent/gui" "github.com/DataDog/datadog-agent/cmd/agent/subcommands/run/internal/clcrunnerapi" "github.com/DataDog/datadog-agent/cmd/manager" @@ -588,8 +587,8 @@ func startAgent( // start dogstatsd if pkgconfig.Datadog.GetBool("use_dogstatsd") { - global.DSD = server - err := server.Start(demultiplexer) + // global.DSD = server + // err := server.Start(demultiplexer) if err != nil { log.Errorf("Could not start dogstatsd: %s", err) } else { diff --git a/cmd/agent/subcommands/run/internal/settings/runtime_settings_test.go b/cmd/agent/subcommands/run/internal/settings/runtime_settings_test.go index e78c27b194041a..3ed49ea6c23e04 100644 --- a/cmd/agent/subcommands/run/internal/settings/runtime_settings_test.go +++ b/cmd/agent/subcommands/run/internal/settings/runtime_settings_test.go @@ -46,9 +46,9 @@ func TestDogstatsdMetricsStats(t *testing.T) { demultiplexerimpl.MockModule(), )) - demux := deps.Demultiplexer + // demux := deps.Demultiplexer global.DSD = deps.Server - deps.Server.Start(demux) + // deps.Server.Start(demux) require.Nil(t, err) diff --git a/cmd/dogstatsd/subcommands/start/command.go b/cmd/dogstatsd/subcommands/start/command.go index 499d913cbcdd19..f45377fe0d0c8c 100644 --- a/cmd/dogstatsd/subcommands/start/command.go +++ b/cmd/dogstatsd/subcommands/start/command.go @@ -268,7 +268,7 @@ func RunDogstatsd(ctx context.Context, cliParams *CLIParams, config config.Compo demultiplexer.AddAgentStartupTelemetry(version.AgentVersion) - err = components.DogstatsdServer.Start(demultiplexer) + // err = components.DogstatsdServer.Start(demultiplexer) if err != nil { log.Criticalf("Unable to start dogstatsd: %s", err) return diff --git a/comp/dogstatsd/server/component.go b/comp/dogstatsd/server/component.go index f909780c8aa2f3..1c72e53f6bca86 100644 --- a/comp/dogstatsd/server/component.go +++ b/comp/dogstatsd/server/component.go @@ -9,7 +9,6 @@ package server import ( "time" - "github.com/DataDog/datadog-agent/pkg/aggregator" "github.com/DataDog/datadog-agent/pkg/util/fxutil" "go.uber.org/fx" ) @@ -19,8 +18,8 @@ import ( // Component is the component type. type Component interface { - // Start starts the dogstatsd server - Start(demultiplexer aggregator.Demultiplexer) error + // // Start starts the dogstatsd server + // Start(demultiplexer aggregator.Demultiplexer) error // Stop stops the dogstatsd server Stop() diff --git a/comp/dogstatsd/server/server.go b/comp/dogstatsd/server/server.go index a0d9f5699db81f..a7ebb42bbd7682 100644 --- a/comp/dogstatsd/server/server.go +++ b/comp/dogstatsd/server/server.go @@ -17,15 +17,14 @@ import ( "go.uber.org/fx" + demultiplexer "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer" configComponent "github.com/DataDog/datadog-agent/comp/core/config" logComponent "github.com/DataDog/datadog-agent/comp/core/log" - logComponentImpl "github.com/DataDog/datadog-agent/comp/core/log/logimpl" "github.com/DataDog/datadog-agent/comp/dogstatsd/listeners" "github.com/DataDog/datadog-agent/comp/dogstatsd/mapper" "github.com/DataDog/datadog-agent/comp/dogstatsd/packets" "github.com/DataDog/datadog-agent/comp/dogstatsd/replay" serverdebug "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug" - "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug/serverdebugimpl" "github.com/DataDog/datadog-agent/pkg/aggregator" "github.com/DataDog/datadog-agent/pkg/config" "github.com/DataDog/datadog-agent/pkg/metrics" @@ -67,6 +66,10 @@ var ( type dependencies struct { fx.In + Lc fx.Lifecycle + + Demultiplexer demultiplexer.Component + Log logComponent.Component Config configComponent.Component Debug serverdebug.Component @@ -190,16 +193,16 @@ func initTelemetry(cfg config.Reader, logger logComponent.Component) { // TODO: (components) - remove once serverless is an FX app // //nolint:revive // TODO(AML) Fix revive linter -func NewServerlessServer() Component { - return newServerCompat(config.Datadog, logComponentImpl.NewTemporaryLoggerWithoutInit(), replay.NewServerlessTrafficCapture(), serverdebugimpl.NewServerlessServerDebug(), true) -} +// func NewServerlessServer() Component { +// return newServerCompat(config.Datadog, logComponentImpl.NewTemporaryLoggerWithoutInit(), replay.NewServerlessTrafficCapture(), serverdebugimpl.NewServerlessServerDebug(), true) +// } // TODO: (components) - merge with newServerCompat once NewServerlessServer is removed func newServer(deps dependencies) Component { - return newServerCompat(deps.Config, deps.Log, deps.Replay, deps.Debug, deps.Params.Serverless) + return newServerCompat(deps.Config, deps.Log, deps.Replay, deps.Debug, deps.Params.Serverless, deps.Lc, deps.Demultiplexer) } -func newServerCompat(cfg config.Reader, log logComponent.Component, capture replay.Component, debug serverdebug.Component, serverless bool) Component { +func newServerCompat(cfg config.Reader, log logComponent.Component, capture replay.Component, debug serverdebug.Component, serverless bool, lc fx.Lifecycle, demux demultiplexer.Component) Component { // This needs to be done after the configuration is loaded once.Do(func() { initTelemetry(cfg, log) }) @@ -272,7 +275,7 @@ func newServerCompat(cfg config.Reader, log logComponent.Component, capture repl sharedPacketPool: nil, sharedPacketPoolManager: nil, sharedFloat64List: newFloat64ListPool(), - demultiplexer: nil, + demultiplexer: demux, listeners: nil, stopChan: make(chan bool), serverlessFlushChan: make(chan bool), @@ -301,13 +304,19 @@ func newServerCompat(cfg config.Reader, log logComponent.Component, capture repl originOptOutEnabled: cfg.GetBool("dogstatsd_origin_optout_enabled"), }, } + + lc.Append(fx.Hook{ + OnStart: s.start, + OnStop: nil, + }) + return s } -func (s *server) Start(demultiplexer aggregator.Demultiplexer) error { +func (s *server) start(context.Context) error { // TODO: (components) - DI this into Server when Demultiplexer is made into a component - s.demultiplexer = demultiplexer + // s.demultiplexer = demultiplexer packetsChannel := make(chan packets.Packets, s.config.GetInt("dogstatsd_queue_size")) tmpListeners := make([]listeners.StatsdListener, 0, 2) diff --git a/comp/dogstatsd/server/server_bench_test.go b/comp/dogstatsd/server/server_bench_test.go index 9c6ab5cba10b1f..0abfac56b8ed95 100644 --- a/comp/dogstatsd/server/server_bench_test.go +++ b/comp/dogstatsd/server/server_bench_test.go @@ -37,7 +37,12 @@ func benchParsePackets(b *testing.B, rawPacket []byte) { pkgconfig.SetupLogger("", "off", "", "", false, true, false) demux := deps.Demultiplexer +<<<<<<< Updated upstream _ = s.Start(demux) +======= + defer demux.Stop(false) + // _ = s.Start(demux) +>>>>>>> Stashed changes defer s.Stop() done := make(chan struct{}) @@ -85,7 +90,7 @@ func BenchmarkPbarseMetricMessage(b *testing.B) { pkgconfig.SetupLogger("", "off", "", "", false, true, false) demux := deps.Demultiplexer - _ = s.Start(demux) + // _ = s.Start(demux) defer s.Stop() done := make(chan struct{}) @@ -138,7 +143,7 @@ func benchmarkMapperControl(b *testing.B, yaml string) { pkgconfig.SetupLogger("", "off", "", "", false, true, false) demux := deps.Demultiplexer - _ = s.Start(demux) + // _ = s.Start(demux) defer s.Stop() done := make(chan struct{}) diff --git a/comp/dogstatsd/server/server_test.go b/comp/dogstatsd/server/server_test.go index b3b035fa24de46..f84c69fd6b24ed 100644 --- a/comp/dogstatsd/server/server_test.go +++ b/comp/dogstatsd/server/server_test.go @@ -37,6 +37,7 @@ import ( type serverDeps struct { fx.In + Lc fx.Lifecycle Server Component Config configComponent.Component Log log.Component @@ -1012,9 +1013,14 @@ func TestOriginOptout(t *testing.T) { } } -func requireStart(t *testing.T, s Component, demux aggregator.Demultiplexer) { - err := s.Start(demux) - require.NoError(t, err, "cannot start DSD") +func requireStart(t *testing.T, s Component, demux aggregator.Demultiplexer, sd serverDeps) { + // err := s.start(demux) + // require.NoError(t, _, "cannot start DSD") + sd.Lc.Append(fx.Hook{ + OnStart: s.start, + OnStop: nil, + }) + assert.NotNil(t, s) assert.True(t, s.IsRunning()) } diff --git a/pkg/serverless/metrics/metric.go b/pkg/serverless/metrics/metric.go index d21e3fd74bf7e6..3f6bb4661d8124 100644 --- a/pkg/serverless/metrics/metric.go +++ b/pkg/serverless/metrics/metric.go @@ -56,10 +56,10 @@ func (m *MetricConfig) GetMultipleEndpoints() (map[string][]string, error) { } // NewServer returns a running DogStatsD server -func (m *MetricDogStatsD) NewServer(demux aggregator.Demultiplexer) (dogstatsdServer.Component, error) { - s := dogstatsdServer.NewServerlessServer() - return s, s.Start(demux) -} +// func (m *MetricDogStatsD) NewServer(demux aggregator.Demultiplexer) (dogstatsdServer.Component, error) { +// s := dogstatsdServer.NewServerlessServer() +// return s, s.Start(demux) +// } // Start starts the DogStatsD agent func (c *ServerlessMetricAgent) Start(forwarderTimeout time.Duration, multipleEndpointConfig MultipleEndpointConfig, dogstatFactory DogStatsDFactory) { diff --git a/pkg/serverless/metrics/metric_test.go b/pkg/serverless/metrics/metric_test.go index 0cacfcd836d5a6..ee6cf4469a9130 100644 --- a/pkg/serverless/metrics/metric_test.go +++ b/pkg/serverless/metrics/metric_test.go @@ -13,12 +13,10 @@ import ( "net/http" "os" "strconv" - "sync" "testing" "time" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" dogstatsdServer "github.com/DataDog/datadog-agent/comp/dogstatsd/server" "github.com/DataDog/datadog-agent/pkg/aggregator" @@ -198,40 +196,40 @@ func getAvailableUDPPort() (int, error) { return portInt, nil } -func TestRaceFlushVersusParsePacket(t *testing.T) { - port, err := getAvailableUDPPort() - require.NoError(t, err) - config.Datadog.SetDefault("dogstatsd_port", port) - - demux := aggregator.InitAndStartServerlessDemultiplexer(nil, time.Second*1000) - - s := dogstatsdServer.NewServerlessServer() - err = s.Start(demux) - require.NoError(t, err, "cannot start DSD") - defer s.Stop() - - url := fmt.Sprintf("127.0.0.1:%d", config.Datadog.GetInt("dogstatsd_port")) - conn, err := net.Dial("udp", url) - require.NoError(t, err, "cannot connect to DSD socket") - defer conn.Close() - - finish := &sync.WaitGroup{} - finish.Add(2) - - go func(wg *sync.WaitGroup) { - for i := 0; i < 1000; i++ { - conn.Write([]byte("daemon:666|g|#sometag1:somevalue1,sometag2:somevalue2")) - time.Sleep(10 * time.Nanosecond) - } - finish.Done() - }(finish) - - go func(wg *sync.WaitGroup) { - for i := 0; i < 1000; i++ { - s.ServerlessFlush(time.Second * 10) - } - finish.Done() - }(finish) - - finish.Wait() -} +// func TestRaceFlushVersusParsePacket(t *testing.T) { +// port, err := getAvailableUDPPort() +// require.NoError(t, err) +// config.Datadog.SetDefault("dogstatsd_port", port) + +// demux := aggregator.InitAndStartServerlessDemultiplexer(nil, time.Second*1000) + +// s := dogstatsdServer.NewServerlessServer() +// err = s.Start(demux) +// require.NoError(t, err, "cannot start DSD") +// defer s.Stop() + +// url := fmt.Sprintf("127.0.0.1:%d", config.Datadog.GetInt("dogstatsd_port")) +// conn, err := net.Dial("udp", url) +// require.NoError(t, err, "cannot connect to DSD socket") +// defer conn.Close() + +// finish := &sync.WaitGroup{} +// finish.Add(2) + +// go func(wg *sync.WaitGroup) { +// for i := 0; i < 1000; i++ { +// conn.Write([]byte("daemon:666|g|#sometag1:somevalue1,sometag2:somevalue2")) +// time.Sleep(10 * time.Nanosecond) +// } +// finish.Done() +// }(finish) + +// go func(wg *sync.WaitGroup) { +// for i := 0; i < 1000; i++ { +// s.ServerlessFlush(time.Second * 10) +// } +// finish.Done() +// }(finish) + +// finish.Wait() +// } From 3a9d7e346017e41d246233a5b7644c219b960293 Mon Sep 17 00:00:00 2001 From: yoon nguyen Date: Mon, 5 Feb 2024 14:47:48 -0500 Subject: [PATCH 02/16] fixing test --- comp/dogstatsd/server/component.go | 2 +- comp/dogstatsd/server/server.go | 19 +++-- comp/dogstatsd/server/server_bench_test.go | 10 +-- comp/dogstatsd/server/server_test.go | 92 +++++++++------------- pkg/serverless/metrics/metric.go | 10 ++- 5 files changed, 62 insertions(+), 71 deletions(-) diff --git a/comp/dogstatsd/server/component.go b/comp/dogstatsd/server/component.go index 1c72e53f6bca86..1919f55486b16b 100644 --- a/comp/dogstatsd/server/component.go +++ b/comp/dogstatsd/server/component.go @@ -22,7 +22,7 @@ type Component interface { // Start(demultiplexer aggregator.Demultiplexer) error // Stop stops the dogstatsd server - Stop() + // Stop() // IsRunning returns true if the server is running IsRunning() bool diff --git a/comp/dogstatsd/server/server.go b/comp/dogstatsd/server/server.go index a7ebb42bbd7682..e19a4d28cdb2f6 100644 --- a/comp/dogstatsd/server/server.go +++ b/comp/dogstatsd/server/server.go @@ -20,11 +20,13 @@ import ( demultiplexer "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer" configComponent "github.com/DataDog/datadog-agent/comp/core/config" logComponent "github.com/DataDog/datadog-agent/comp/core/log" + logComponentImpl "github.com/DataDog/datadog-agent/comp/core/log/logimpl" "github.com/DataDog/datadog-agent/comp/dogstatsd/listeners" "github.com/DataDog/datadog-agent/comp/dogstatsd/mapper" "github.com/DataDog/datadog-agent/comp/dogstatsd/packets" "github.com/DataDog/datadog-agent/comp/dogstatsd/replay" serverdebug "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug" + "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug/serverdebugimpl" "github.com/DataDog/datadog-agent/pkg/aggregator" "github.com/DataDog/datadog-agent/pkg/config" "github.com/DataDog/datadog-agent/pkg/metrics" @@ -193,9 +195,12 @@ func initTelemetry(cfg config.Reader, logger logComponent.Component) { // TODO: (components) - remove once serverless is an FX app // //nolint:revive // TODO(AML) Fix revive linter -// func NewServerlessServer() Component { -// return newServerCompat(config.Datadog, logComponentImpl.NewTemporaryLoggerWithoutInit(), replay.NewServerlessTrafficCapture(), serverdebugimpl.NewServerlessServerDebug(), true) -// } +func NewServerlessServer(deps dependencies) Component { + + // start the dogstatsd server here before new: + + return newServerCompat(config.Datadog, logComponentImpl.NewTemporaryLoggerWithoutInit(), replay.NewServerlessTrafficCapture(), serverdebugimpl.NewServerlessServerDebug(), true, deps.Lc, deps.Demultiplexer) +} // TODO: (components) - merge with newServerCompat once NewServerlessServer is removed func newServer(deps dependencies) Component { @@ -307,7 +312,7 @@ func newServerCompat(cfg config.Reader, log logComponent.Component, capture repl lc.Append(fx.Hook{ OnStart: s.start, - OnStop: nil, + OnStop: s.stop, }) return s @@ -453,9 +458,9 @@ func (s *server) start(context.Context) error { return nil } -func (s *server) Stop() { +func (s *server) stop(context.Context) error { if !s.IsRunning() { - return + return nil } close(s.stopChan) for _, l := range s.listeners { @@ -469,6 +474,8 @@ func (s *server) Stop() { } s.health.Deregister() //nolint:errcheck s.Started = false + + return nil } func (s *server) IsRunning() bool { diff --git a/comp/dogstatsd/server/server_bench_test.go b/comp/dogstatsd/server/server_bench_test.go index 0abfac56b8ed95..8bde002b06ffa0 100644 --- a/comp/dogstatsd/server/server_bench_test.go +++ b/comp/dogstatsd/server/server_bench_test.go @@ -37,13 +37,9 @@ func benchParsePackets(b *testing.B, rawPacket []byte) { pkgconfig.SetupLogger("", "off", "", "", false, true, false) demux := deps.Demultiplexer -<<<<<<< Updated upstream - _ = s.Start(demux) -======= defer demux.Stop(false) // _ = s.Start(demux) ->>>>>>> Stashed changes - defer s.Stop() + // defer s.Stop() done := make(chan struct{}) go func() { @@ -91,7 +87,7 @@ func BenchmarkPbarseMetricMessage(b *testing.B) { demux := deps.Demultiplexer // _ = s.Start(demux) - defer s.Stop() + // defer s.Stop() done := make(chan struct{}) go func() { @@ -144,7 +140,7 @@ func benchmarkMapperControl(b *testing.B, yaml string) { demux := deps.Demultiplexer // _ = s.Start(demux) - defer s.Stop() + // defer s.Stop() done := make(chan struct{}) go func() { diff --git a/comp/dogstatsd/server/server_test.go b/comp/dogstatsd/server/server_test.go index f84c69fd6b24ed..b656d9b61cc0e1 100644 --- a/comp/dogstatsd/server/server_test.go +++ b/comp/dogstatsd/server/server_test.go @@ -28,7 +28,6 @@ import ( "github.com/DataDog/datadog-agent/comp/dogstatsd/listeners" "github.com/DataDog/datadog-agent/comp/dogstatsd/replay" "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug/serverdebugimpl" - "github.com/DataDog/datadog-agent/pkg/aggregator" "github.com/DataDog/datadog-agent/pkg/config" "github.com/DataDog/datadog-agent/pkg/metrics" "github.com/DataDog/datadog-agent/pkg/util/fxutil" @@ -37,11 +36,10 @@ import ( type serverDeps struct { fx.In - Lc fx.Lifecycle - Server Component Config configComponent.Component Log log.Component Demultiplexer demultiplexer.FakeSamplerMock + Server Component } func fulfillDeps(t testing.TB) serverDeps { @@ -58,8 +56,9 @@ func fulfillDepsWithConfigOverrideAndFeatures(t testing.TB, overrides map[string }), fx.Supply(Params{Serverless: false}), replay.MockModule(), - Module(), demultiplexerimpl.FakeSamplerMockModule(), + demultiplexerimpl.MockModule(), + Module(), )) } @@ -76,21 +75,20 @@ func fulfillDepsWithConfigYaml(t testing.TB, yaml string) serverDeps { }), fx.Supply(Params{Serverless: false}), replay.MockModule(), - Module(), demultiplexerimpl.FakeSamplerMockModule(), + demultiplexerimpl.MockModule(), + Module(), )) } func TestNewServer(t *testing.T) { cfg := make(map[string]interface{}) - cfg["dogstatsd_port"] = listeners.RandomPortName deps := fulfillDepsWithConfigOverride(t, cfg) - demux := createDemultiplexer(t) - requireStart(t, deps.Server, demux) + // demux := createDemultiplexer(t) + requireStart(t, deps.Server) - deps.Server.Stop() } func TestStopServer(t *testing.T) { @@ -100,13 +98,12 @@ func TestStopServer(t *testing.T) { deps := fulfillDepsWithConfigOverride(t, cfg) - demux := createDemultiplexer(t) - requireStart(t, deps.Server, demux) - deps.Server.Stop() + requireStart(t, deps.Server) // check that the port can be bound, try for 100 ms address, err := net.ResolveUDPAddr("udp", deps.Server.UDPLocalAddr()) require.NoError(t, err, "cannot resolve address") + for i := 0; i < 10; i++ { var conn net.Conn conn, err = net.ListenUDP("udp", address) @@ -116,6 +113,7 @@ func TestStopServer(t *testing.T) { } time.Sleep(10 * time.Millisecond) } + require.NoError(t, err, "port is not available, it should be") } @@ -153,8 +151,7 @@ func TestUDPReceive(t *testing.T) { deps := fulfillDepsWithConfigOverride(t, cfg) demux := deps.Demultiplexer - requireStart(t, deps.Server, demux) - defer deps.Server.Stop() + requireStart(t, deps.Server) conn, err := net.Dial("udp", deps.Server.UDPLocalAddr()) require.NoError(t, err, "cannot connect to DSD socket") @@ -434,9 +431,8 @@ func TestUDPForward(t *testing.T) { defer pc.Close() - demux := deps.Demultiplexer - requireStart(t, deps.Server, demux) - defer deps.Server.Stop() + // demux := deps.Demultiplexer + requireStart(t, deps.Server) conn, err := net.Dial("udp", deps.Server.UDPLocalAddr()) require.NoError(t, err, "cannot connect to DSD socket") @@ -466,8 +462,7 @@ func TestHistToDist(t *testing.T) { deps := fulfillDepsWithConfigOverride(t, cfg) demux := createDemultiplexer(t) - requireStart(t, deps.Server, demux) - defer deps.Server.Stop() + requireStart(t, deps.Server) conn, err := net.Dial("udp", deps.Server.UDPLocalAddr()) require.NoError(t, err, "cannot connect to DSD socket") @@ -552,7 +547,7 @@ func TestE2EParsing(t *testing.T) { deps := fulfillDepsWithConfigOverride(t, cfg) demux := deps.Demultiplexer - requireStart(t, deps.Server, demux) + requireStart(t, deps.Server) conn, err := net.Dial("udp", deps.Server.UDPLocalAddr()) require.NoError(t, err, "cannot connect to DSD socket") @@ -565,14 +560,13 @@ func TestE2EParsing(t *testing.T) { assert.Equal(t, 0, len(timedSamples)) demux.Reset() demux.Stop(false) - deps.Server.Stop() // EOL enabled cfg["dogstatsd_eol_required"] = []string{"udp"} deps = fulfillDepsWithConfigOverride(t, cfg) demux = deps.Demultiplexer - requireStart(t, deps.Server, demux) + requireStart(t, deps.Server) conn, err = net.Dial("udp", deps.Server.UDPLocalAddr()) require.NoError(t, err, "cannot connect to DSD socket") @@ -583,7 +577,6 @@ func TestE2EParsing(t *testing.T) { samples, timedSamples = demux.WaitForSamples(time.Second * 2) require.Equal(t, 1, len(samples)) assert.Equal(t, 0, len(timedSamples)) - deps.Server.Stop() demux.Reset() } @@ -595,8 +588,7 @@ func TestExtraTags(t *testing.T) { deps := fulfillDepsWithConfigOverrideAndFeatures(t, cfg, []config.Feature{config.EKSFargate}) demux := createDemultiplexer(t) - requireStart(t, deps.Server, demux) - defer deps.Server.Stop() + requireStart(t, deps.Server) conn, err := net.Dial("udp", deps.Server.UDPLocalAddr()) require.NoError(t, err, "cannot connect to DSD socket") @@ -624,8 +616,7 @@ func TestStaticTags(t *testing.T) { deps := fulfillDepsWithConfigOverrideAndFeatures(t, cfg, []config.Feature{config.EKSFargate}) demux := createDemultiplexer(t) - requireStart(t, deps.Server, demux) - defer deps.Server.Stop() + requireStart(t, deps.Server) conn, err := net.Dial("udp", deps.Server.UDPLocalAddr()) require.NoError(t, err, "cannot connect to DSD socket") @@ -659,8 +650,8 @@ func TestNoMappingsConfig(t *testing.T) { samples := []metrics.MetricSample{} - demux := deps.Demultiplexer - requireStart(t, s, demux) + // demux := deps.Demultiplexer + requireStart(t, s) assert.Nil(t, s.mapper) @@ -770,8 +761,8 @@ dogstatsd_mapper_profiles: cw.SetWithoutSource("dogstatsd_port", listeners.RandomPortName) - demux := deps.Demultiplexer - requireStart(t, s, demux) + // demux := deps.Demultiplexer + requireStart(t, s) assert.Equal(t, deps.Config.Get("dogstatsd_mapper_cache_size"), scenario.expectedCacheSize, "Case `%s` failed. cache_size `%s` should be `%s`", scenario.name, deps.Config.Get("dogstatsd_mapper_cache_size"), scenario.expectedCacheSize) @@ -791,7 +782,7 @@ dogstatsd_mapper_profiles: sort.Strings(sample.Tags) } assert.Equal(t, scenario.expectedSamples, actualSamples, "Case `%s` failed. `%s` should be `%s`", scenario.name, actualSamples, scenario.expectedSamples) - s.Stop() + // s.Stop() }) } } @@ -805,9 +796,9 @@ func TestNewServerExtraTags(t *testing.T) { deps := fulfillDepsWithConfigOverride(t, cfg) s := deps.Server.(*server) demux := deps.Demultiplexer - requireStart(t, s, demux) + requireStart(t, s) require.Len(s.extraTags, 0, "no tags should have been read") - s.Stop() + // s.Stop() demux.Stop(false) // when the extraTags parameter isn't used, the DogStatsD server is not reading this env var @@ -815,9 +806,9 @@ func TestNewServerExtraTags(t *testing.T) { deps = fulfillDepsWithConfigOverride(t, cfg) s = deps.Server.(*server) demux = deps.Demultiplexer - requireStart(t, s, demux) + requireStart(t, s) require.Len(s.extraTags, 0, "no tags should have been read") - s.Stop() + // s.Stop() demux.Stop(false) // when the extraTags parameter isn't used, the DogStatsD server is automatically reading this env var for extra tags @@ -825,11 +816,11 @@ func TestNewServerExtraTags(t *testing.T) { deps = fulfillDepsWithConfigOverride(t, cfg) s = deps.Server.(*server) demux = deps.Demultiplexer - requireStart(t, s, demux) + requireStart(t, s) require.Len(s.extraTags, 2, "two tags should have been read") require.Equal(s.extraTags[0], "extra:tags", "the tag extra:tags should be set") require.Equal(s.extraTags[1], "hello:world", "the tag hello:world should be set") - s.Stop() + // s.Stop() } func TestProcessedMetricsOrigin(t *testing.T) { @@ -842,10 +833,10 @@ func TestProcessedMetricsOrigin(t *testing.T) { s := deps.Server.(*server) assert := assert.New(t) - demux := deps.Demultiplexer - requireStart(t, s, demux) + // demux := deps.Demultiplexer + requireStart(t, s) - s.Stop() + // s.Stop() assert.Len(s.cachedOriginCounters, 0, "this cache must be empty") assert.Len(s.cachedOrder, 0, "this cache list must be empty") @@ -922,8 +913,8 @@ func testContainerIDParsing(t *testing.T, cfg map[string]interface{}) { deps := fulfillDeps(t) s := deps.Server.(*server) assert := assert.New(t) - requireStart(t, s, deps.Demultiplexer) - s.Stop() + requireStart(t, s) + // s.Stop() parser := newParser(deps.Config, newFloat64ListPool(), 1) parser.dsdOriginEnabled = true @@ -964,8 +955,8 @@ func testOriginOptout(t *testing.T, cfg map[string]interface{}, enabled bool) { s := deps.Server.(*server) assert := assert.New(t) - requireStart(t, s, deps.Demultiplexer) - s.Stop() + // requireStart(t, s, deps.Demultiplexer) + // s.Stop() parser := newParser(deps.Config, newFloat64ListPool(), 1) parser.dsdOriginEnabled = true @@ -1013,14 +1004,9 @@ func TestOriginOptout(t *testing.T) { } } -func requireStart(t *testing.T, s Component, demux aggregator.Demultiplexer, sd serverDeps) { - // err := s.start(demux) - // require.NoError(t, _, "cannot start DSD") - sd.Lc.Append(fx.Hook{ - OnStart: s.start, - OnStop: nil, - }) - +func requireStart(t *testing.T, s Component) { + // err := s.Start(demux) + // require.NoError(t, err, "cannot start DSD") assert.NotNil(t, s) assert.True(t, s.IsRunning()) } diff --git a/pkg/serverless/metrics/metric.go b/pkg/serverless/metrics/metric.go index 3f6bb4661d8124..9ed3e6ef966d14 100644 --- a/pkg/serverless/metrics/metric.go +++ b/pkg/serverless/metrics/metric.go @@ -56,10 +56,12 @@ func (m *MetricConfig) GetMultipleEndpoints() (map[string][]string, error) { } // NewServer returns a running DogStatsD server -// func (m *MetricDogStatsD) NewServer(demux aggregator.Demultiplexer) (dogstatsdServer.Component, error) { -// s := dogstatsdServer.NewServerlessServer() -// return s, s.Start(demux) -// } +func (m *MetricDogStatsD) NewServer(demux aggregator.Demultiplexer) (dogstatsdServer.Component, error) { + s := dogstatsdServer.NewServerlessServer() + + // call start inside newserverlessserver + return s, s.Start(demux) +} // Start starts the DogStatsD agent func (c *ServerlessMetricAgent) Start(forwarderTimeout time.Duration, multipleEndpointConfig MultipleEndpointConfig, dogstatFactory DogStatsDFactory) { From 66d19f404d14efe22e7b0b39633f7b4f087585e4 Mon Sep 17 00:00:00 2001 From: yoon nguyen Date: Wed, 7 Feb 2024 11:25:51 -0500 Subject: [PATCH 03/16] serverless --- cmd/agent/subcommands/run/command.go | 3 +- comp/dogstatsd/server/component.go | 20 ++++++++---- comp/dogstatsd/server/server.go | 25 ++++++++------ comp/dogstatsd/server/server_test.go | 49 ++++++++++++++++------------ 4 files changed, 58 insertions(+), 39 deletions(-) diff --git a/cmd/agent/subcommands/run/command.go b/cmd/agent/subcommands/run/command.go index 06121bdbb53e06..3055b75c0cd271 100644 --- a/cmd/agent/subcommands/run/command.go +++ b/cmd/agent/subcommands/run/command.go @@ -28,6 +28,7 @@ import ( "github.com/DataDog/datadog-agent/cmd/agent/common/misconfig" "github.com/DataDog/datadog-agent/cmd/agent/common/path" "github.com/DataDog/datadog-agent/cmd/agent/common/signals" + global "github.com/DataDog/datadog-agent/cmd/agent/dogstatsd" "github.com/DataDog/datadog-agent/cmd/agent/gui" "github.com/DataDog/datadog-agent/cmd/agent/subcommands/run/internal/clcrunnerapi" "github.com/DataDog/datadog-agent/cmd/manager" @@ -587,7 +588,7 @@ func startAgent( // start dogstatsd if pkgconfig.Datadog.GetBool("use_dogstatsd") { - // global.DSD = server + global.DSD = server // err := server.Start(demultiplexer) if err != nil { log.Errorf("Could not start dogstatsd: %s", err) diff --git a/comp/dogstatsd/server/component.go b/comp/dogstatsd/server/component.go index 1919f55486b16b..a4fdbcde57e0d0 100644 --- a/comp/dogstatsd/server/component.go +++ b/comp/dogstatsd/server/component.go @@ -7,6 +7,8 @@ package server import ( + "context" + "fmt" "time" "github.com/DataDog/datadog-agent/pkg/util/fxutil" @@ -17,13 +19,6 @@ import ( // Component is the component type. type Component interface { - - // // Start starts the dogstatsd server - // Start(demultiplexer aggregator.Demultiplexer) error - - // Stop stops the dogstatsd server - // Stop() - // IsRunning returns true if the server is running IsRunning() bool @@ -40,6 +35,12 @@ type Component interface { UDPLocalAddr() string } +// ServerlessDogstatsd is the interface for the serverless dogstatsd server. +type ServerlessDogstatsd interface { + Component + Stop() +} + // Mock implements mock-specific methods. type Mock interface { Component @@ -56,3 +57,8 @@ func MockModule() fxutil.Module { return fxutil.Component( fx.Provide(newMock)) } + +func (s *server) Stop() { + _ = s.stop(context.TODO()) + fmt.Printf("Stopping serverless dogstatsd \n") +} diff --git a/comp/dogstatsd/server/server.go b/comp/dogstatsd/server/server.go index e19a4d28cdb2f6..d351adeefe34f4 100644 --- a/comp/dogstatsd/server/server.go +++ b/comp/dogstatsd/server/server.go @@ -195,19 +195,29 @@ func initTelemetry(cfg config.Reader, logger logComponent.Component) { // TODO: (components) - remove once serverless is an FX app // //nolint:revive // TODO(AML) Fix revive linter -func NewServerlessServer(deps dependencies) Component { +func NewServerlessServer(deps dependencies) ServerlessDogstatsd { - // start the dogstatsd server here before new: + s := newServerCompat(config.Datadog, logComponentImpl.NewTemporaryLoggerWithoutInit(), replay.NewServerlessTrafficCapture(), serverdebugimpl.NewServerlessServerDebug(), true, deps.Demultiplexer) - return newServerCompat(config.Datadog, logComponentImpl.NewTemporaryLoggerWithoutInit(), replay.NewServerlessTrafficCapture(), serverdebugimpl.NewServerlessServerDebug(), true, deps.Lc, deps.Demultiplexer) + s.start(context.TODO()) + + return s } // TODO: (components) - merge with newServerCompat once NewServerlessServer is removed func newServer(deps dependencies) Component { - return newServerCompat(deps.Config, deps.Log, deps.Replay, deps.Debug, deps.Params.Serverless, deps.Lc, deps.Demultiplexer) + s := newServerCompat(deps.Config, deps.Log, deps.Replay, deps.Debug, deps.Params.Serverless, deps.Demultiplexer) + + deps.Lc.Append(fx.Hook{ + OnStart: s.start, + OnStop: s.stop, + }) + + return s + } -func newServerCompat(cfg config.Reader, log logComponent.Component, capture replay.Component, debug serverdebug.Component, serverless bool, lc fx.Lifecycle, demux demultiplexer.Component) Component { +func newServerCompat(cfg config.Reader, log logComponent.Component, capture replay.Component, debug serverdebug.Component, serverless bool, demux demultiplexer.Component) *server { // This needs to be done after the configuration is loaded once.Do(func() { initTelemetry(cfg, log) }) @@ -310,11 +320,6 @@ func newServerCompat(cfg config.Reader, log logComponent.Component, capture repl }, } - lc.Append(fx.Hook{ - OnStart: s.start, - OnStop: s.stop, - }) - return s } diff --git a/comp/dogstatsd/server/server_test.go b/comp/dogstatsd/server/server_test.go index b656d9b61cc0e1..7a0e420ab63748 100644 --- a/comp/dogstatsd/server/server_test.go +++ b/comp/dogstatsd/server/server_test.go @@ -8,6 +8,7 @@ package server import ( + "context" "fmt" "net" "sort" @@ -27,6 +28,7 @@ import ( "github.com/DataDog/datadog-agent/comp/core/log/logimpl" "github.com/DataDog/datadog-agent/comp/dogstatsd/listeners" "github.com/DataDog/datadog-agent/comp/dogstatsd/replay" + serverdebug "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug" "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug/serverdebugimpl" "github.com/DataDog/datadog-agent/pkg/config" "github.com/DataDog/datadog-agent/pkg/metrics" @@ -39,7 +41,11 @@ type serverDeps struct { Config configComponent.Component Log log.Component Demultiplexer demultiplexer.FakeSamplerMock - Server Component + Demux demultiplexer.Component + Replay replay.Component + Debug serverdebug.Component + + Server Component } func fulfillDeps(t testing.TB) serverDeps { @@ -98,10 +104,17 @@ func TestStopServer(t *testing.T) { deps := fulfillDepsWithConfigOverride(t, cfg) - requireStart(t, deps.Server) + s := newServerCompat(deps.Config, deps.Log, deps.Replay, deps.Debug, false, deps.Demux) + s.start(context.Background()) + // manually create the server and stop the server for this test. + // newServerCompat () => + // start the server + // stop the server + + // requireStart(t, deps.Server) // check that the port can be bound, try for 100 ms - address, err := net.ResolveUDPAddr("udp", deps.Server.UDPLocalAddr()) + address, err := net.ResolveUDPAddr("udp", s.UDPLocalAddr()) require.NoError(t, err, "cannot resolve address") for i := 0; i < 10; i++ { @@ -113,7 +126,9 @@ func TestStopServer(t *testing.T) { } time.Sleep(10 * time.Millisecond) } + s.Stop() + time.Sleep(100 * time.Millisecond) require.NoError(t, err, "port is not available, it should be") } @@ -461,7 +476,7 @@ func TestHistToDist(t *testing.T) { deps := fulfillDepsWithConfigOverride(t, cfg) - demux := createDemultiplexer(t) + // demux := createDemultiplexer(t) requireStart(t, deps.Server) conn, err := net.Dial("udp", deps.Server.UDPLocalAddr()) @@ -471,7 +486,7 @@ func TestHistToDist(t *testing.T) { // Test metric conn.Write([]byte("daemon:666|h|#sometag1:somevalue1,sometag2:somevalue2")) time.Sleep(time.Millisecond * 200) // give some time to the socket write/read - samples, timedSamples := demux.WaitForSamples(time.Second * 2) + samples, timedSamples := deps.Demultiplexer.WaitForSamples(time.Second * 10) require.Equal(t, 2, len(samples)) require.Equal(t, 0, len(timedSamples)) histMetric := samples[0] @@ -485,7 +500,7 @@ func TestHistToDist(t *testing.T) { assert.Equal(t, distMetric.Name, "dist.daemon") assert.EqualValues(t, distMetric.Value, 666.0) assert.Equal(t, metrics.DistributionType, distMetric.Mtype) - demux.Reset() + deps.Demultiplexer.Reset() } func TestScanLines(t *testing.T) { @@ -782,7 +797,7 @@ dogstatsd_mapper_profiles: sort.Strings(sample.Tags) } assert.Equal(t, scenario.expectedSamples, actualSamples, "Case `%s` failed. `%s` should be `%s`", scenario.name, actualSamples, scenario.expectedSamples) - // s.Stop() + }) } } @@ -795,32 +810,29 @@ func TestNewServerExtraTags(t *testing.T) { deps := fulfillDepsWithConfigOverride(t, cfg) s := deps.Server.(*server) - demux := deps.Demultiplexer + // demux := deps.Demultiplexer requireStart(t, s) require.Len(s.extraTags, 0, "no tags should have been read") - // s.Stop() - demux.Stop(false) + // demux.Stop(false) // when the extraTags parameter isn't used, the DogStatsD server is not reading this env var cfg["tags"] = "hello:world" deps = fulfillDepsWithConfigOverride(t, cfg) s = deps.Server.(*server) - demux = deps.Demultiplexer + // demux = deps.Demultiplexer requireStart(t, s) require.Len(s.extraTags, 0, "no tags should have been read") - // s.Stop() - demux.Stop(false) + // demux.Stop(false) // when the extraTags parameter isn't used, the DogStatsD server is automatically reading this env var for extra tags cfg["dogstatsd_tags"] = "hello:world extra:tags" deps = fulfillDepsWithConfigOverride(t, cfg) s = deps.Server.(*server) - demux = deps.Demultiplexer + // demux = deps.Demultiplexer requireStart(t, s) require.Len(s.extraTags, 2, "two tags should have been read") require.Equal(s.extraTags[0], "extra:tags", "the tag extra:tags should be set") require.Equal(s.extraTags[1], "hello:world", "the tag hello:world should be set") - // s.Stop() } func TestProcessedMetricsOrigin(t *testing.T) { @@ -833,11 +845,8 @@ func TestProcessedMetricsOrigin(t *testing.T) { s := deps.Server.(*server) assert := assert.New(t) - // demux := deps.Demultiplexer requireStart(t, s) - // s.Stop() - assert.Len(s.cachedOriginCounters, 0, "this cache must be empty") assert.Len(s.cachedOrder, 0, "this cache list must be empty") @@ -914,7 +923,6 @@ func testContainerIDParsing(t *testing.T, cfg map[string]interface{}) { s := deps.Server.(*server) assert := assert.New(t) requireStart(t, s) - // s.Stop() parser := newParser(deps.Config, newFloat64ListPool(), 1) parser.dsdOriginEnabled = true @@ -955,8 +963,7 @@ func testOriginOptout(t *testing.T, cfg map[string]interface{}, enabled bool) { s := deps.Server.(*server) assert := assert.New(t) - // requireStart(t, s, deps.Demultiplexer) - // s.Stop() + // requireStart(t, s, deps.Demultiplexer parser := newParser(deps.Config, newFloat64ListPool(), 1) parser.dsdOriginEnabled = true From 6cce5effb59913c43174c2c5f8d415dbe3b3f8bc Mon Sep 17 00:00:00 2001 From: yoon nguyen Date: Wed, 7 Feb 2024 14:45:40 -0500 Subject: [PATCH 04/16] unfinished test + metric agent --- comp/dogstatsd/server/component.go | 2 - comp/dogstatsd/server/server.go | 6 +-- comp/dogstatsd/server/server_test.go | 37 ++++++------- pkg/serverless/metrics/metric.go | 3 +- pkg/serverless/metrics/metric_test.go | 76 ++++++++++++++------------- 5 files changed, 58 insertions(+), 66 deletions(-) diff --git a/comp/dogstatsd/server/component.go b/comp/dogstatsd/server/component.go index a4fdbcde57e0d0..593e12a74cc085 100644 --- a/comp/dogstatsd/server/component.go +++ b/comp/dogstatsd/server/component.go @@ -8,7 +8,6 @@ package server import ( "context" - "fmt" "time" "github.com/DataDog/datadog-agent/pkg/util/fxutil" @@ -60,5 +59,4 @@ func MockModule() fxutil.Module { func (s *server) Stop() { _ = s.stop(context.TODO()) - fmt.Printf("Stopping serverless dogstatsd \n") } diff --git a/comp/dogstatsd/server/server.go b/comp/dogstatsd/server/server.go index d351adeefe34f4..39a5b4817e316f 100644 --- a/comp/dogstatsd/server/server.go +++ b/comp/dogstatsd/server/server.go @@ -199,7 +199,7 @@ func NewServerlessServer(deps dependencies) ServerlessDogstatsd { s := newServerCompat(config.Datadog, logComponentImpl.NewTemporaryLoggerWithoutInit(), replay.NewServerlessTrafficCapture(), serverdebugimpl.NewServerlessServerDebug(), true, deps.Demultiplexer) - s.start(context.TODO()) + s.Start(context.TODO()) return s } @@ -209,7 +209,7 @@ func newServer(deps dependencies) Component { s := newServerCompat(deps.Config, deps.Log, deps.Replay, deps.Debug, deps.Params.Serverless, deps.Demultiplexer) deps.Lc.Append(fx.Hook{ - OnStart: s.start, + OnStart: s.Start, OnStop: s.stop, }) @@ -323,7 +323,7 @@ func newServerCompat(cfg config.Reader, log logComponent.Component, capture repl return s } -func (s *server) start(context.Context) error { +func (s *server) Start(context.Context) error { // TODO: (components) - DI this into Server when Demultiplexer is made into a component // s.demultiplexer = demultiplexer diff --git a/comp/dogstatsd/server/server_test.go b/comp/dogstatsd/server/server_test.go index 7a0e420ab63748..cfb10f4078d486 100644 --- a/comp/dogstatsd/server/server_test.go +++ b/comp/dogstatsd/server/server_test.go @@ -92,7 +92,6 @@ func TestNewServer(t *testing.T) { cfg["dogstatsd_port"] = listeners.RandomPortName deps := fulfillDepsWithConfigOverride(t, cfg) - // demux := createDemultiplexer(t) requireStart(t, deps.Server) } @@ -105,13 +104,10 @@ func TestStopServer(t *testing.T) { deps := fulfillDepsWithConfigOverride(t, cfg) s := newServerCompat(deps.Config, deps.Log, deps.Replay, deps.Debug, false, deps.Demux) - s.start(context.Background()) - // manually create the server and stop the server for this test. - // newServerCompat () => - // start the server - // stop the server + s.Start(context.TODO()) + requireStart(t, s) - // requireStart(t, deps.Server) + s.Stop() // check that the port can be bound, try for 100 ms address, err := net.ResolveUDPAddr("udp", s.UDPLocalAddr()) @@ -126,9 +122,6 @@ func TestStopServer(t *testing.T) { } time.Sleep(10 * time.Millisecond) } - s.Stop() - - time.Sleep(100 * time.Millisecond) require.NoError(t, err, "port is not available, it should be") } @@ -166,14 +159,18 @@ func TestUDPReceive(t *testing.T) { deps := fulfillDepsWithConfigOverride(t, cfg) demux := deps.Demultiplexer - requireStart(t, deps.Server) + s := newServerCompat(deps.Config, deps.Log, deps.Replay, deps.Debug, false, deps.Demux) + s.Start(context.TODO()) + requireStart(t, s) + defer s.Stop() - conn, err := net.Dial("udp", deps.Server.UDPLocalAddr()) + conn, err := net.Dial("udp", s.UDPLocalAddr()) require.NoError(t, err, "cannot connect to DSD socket") defer conn.Close() // Test metric conn.Write([]byte("daemon:666|g|#sometag1:somevalue1,sometag2:somevalue2")) + time.Sleep(10 * time.Second) samples, timedSamples := demux.WaitForSamples(time.Second * 2) require.Len(t, samples, 1) require.Len(t, timedSamples, 0) @@ -810,25 +807,20 @@ func TestNewServerExtraTags(t *testing.T) { deps := fulfillDepsWithConfigOverride(t, cfg) s := deps.Server.(*server) - // demux := deps.Demultiplexer requireStart(t, s) require.Len(s.extraTags, 0, "no tags should have been read") - // demux.Stop(false) // when the extraTags parameter isn't used, the DogStatsD server is not reading this env var cfg["tags"] = "hello:world" deps = fulfillDepsWithConfigOverride(t, cfg) s = deps.Server.(*server) - // demux = deps.Demultiplexer requireStart(t, s) require.Len(s.extraTags, 0, "no tags should have been read") - // demux.Stop(false) // when the extraTags parameter isn't used, the DogStatsD server is automatically reading this env var for extra tags cfg["dogstatsd_tags"] = "hello:world extra:tags" deps = fulfillDepsWithConfigOverride(t, cfg) s = deps.Server.(*server) - // demux = deps.Demultiplexer requireStart(t, s) require.Len(s.extraTags, 2, "two tags should have been read") require.Equal(s.extraTags[0], "extra:tags", "the tag extra:tags should be set") @@ -842,10 +834,13 @@ func TestProcessedMetricsOrigin(t *testing.T) { cfg["dogstatsd_origin_optout_enabled"] = enabled deps := fulfillDepsWithConfigOverride(t, cfg) - s := deps.Server.(*server) - assert := assert.New(t) - requireStart(t, s) + s := newServerCompat(deps.Config, deps.Log, deps.Replay, deps.Debug, false, deps.Demux) + s.Start(context.TODO()) + s.Stop() + // requireStart(t, s) + + assert := assert.New(t) assert.Len(s.cachedOriginCounters, 0, "this cache must be empty") assert.Len(s.cachedOrder, 0, "this cache list must be empty") @@ -1012,8 +1007,6 @@ func TestOriginOptout(t *testing.T) { } func requireStart(t *testing.T, s Component) { - // err := s.Start(demux) - // require.NoError(t, err, "cannot start DSD") assert.NotNil(t, s) assert.True(t, s.IsRunning()) } diff --git a/pkg/serverless/metrics/metric.go b/pkg/serverless/metrics/metric.go index 9ed3e6ef966d14..82fc146e424c3a 100644 --- a/pkg/serverless/metrics/metric.go +++ b/pkg/serverless/metrics/metric.go @@ -59,8 +59,7 @@ func (m *MetricConfig) GetMultipleEndpoints() (map[string][]string, error) { func (m *MetricDogStatsD) NewServer(demux aggregator.Demultiplexer) (dogstatsdServer.Component, error) { s := dogstatsdServer.NewServerlessServer() - // call start inside newserverlessserver - return s, s.Start(demux) + return s } // Start starts the DogStatsD agent diff --git a/pkg/serverless/metrics/metric_test.go b/pkg/serverless/metrics/metric_test.go index ee6cf4469a9130..1fca6380c019b6 100644 --- a/pkg/serverless/metrics/metric_test.go +++ b/pkg/serverless/metrics/metric_test.go @@ -13,10 +13,12 @@ import ( "net/http" "os" "strconv" + "sync" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" dogstatsdServer "github.com/DataDog/datadog-agent/comp/dogstatsd/server" "github.com/DataDog/datadog-agent/pkg/aggregator" @@ -196,40 +198,40 @@ func getAvailableUDPPort() (int, error) { return portInt, nil } -// func TestRaceFlushVersusParsePacket(t *testing.T) { -// port, err := getAvailableUDPPort() -// require.NoError(t, err) -// config.Datadog.SetDefault("dogstatsd_port", port) - -// demux := aggregator.InitAndStartServerlessDemultiplexer(nil, time.Second*1000) - -// s := dogstatsdServer.NewServerlessServer() -// err = s.Start(demux) -// require.NoError(t, err, "cannot start DSD") -// defer s.Stop() - -// url := fmt.Sprintf("127.0.0.1:%d", config.Datadog.GetInt("dogstatsd_port")) -// conn, err := net.Dial("udp", url) -// require.NoError(t, err, "cannot connect to DSD socket") -// defer conn.Close() - -// finish := &sync.WaitGroup{} -// finish.Add(2) - -// go func(wg *sync.WaitGroup) { -// for i := 0; i < 1000; i++ { -// conn.Write([]byte("daemon:666|g|#sometag1:somevalue1,sometag2:somevalue2")) -// time.Sleep(10 * time.Nanosecond) -// } -// finish.Done() -// }(finish) - -// go func(wg *sync.WaitGroup) { -// for i := 0; i < 1000; i++ { -// s.ServerlessFlush(time.Second * 10) -// } -// finish.Done() -// }(finish) - -// finish.Wait() -// } +func TestRaceFlushVersusParsePacket(t *testing.T) { + port, err := getAvailableUDPPort() + require.NoError(t, err) + config.Datadog.SetDefault("dogstatsd_port", port) + + demux := aggregator.InitAndStartServerlessDemultiplexer(nil, time.Second*1000) + + s := dogstatsdServer.NewServerlessServer() + // err = s.Start() + require.NoError(t, err, "cannot start DSD") + defer s.Stop() + + url := fmt.Sprintf("127.0.0.1:%d", config.Datadog.GetInt("dogstatsd_port")) + conn, err := net.Dial("udp", url) + require.NoError(t, err, "cannot connect to DSD socket") + defer conn.Close() + + finish := &sync.WaitGroup{} + finish.Add(2) + + go func(wg *sync.WaitGroup) { + for i := 0; i < 1000; i++ { + conn.Write([]byte("daemon:666|g|#sometag1:somevalue1,sometag2:somevalue2")) + time.Sleep(10 * time.Nanosecond) + } + finish.Done() + }(finish) + + go func(wg *sync.WaitGroup) { + for i := 0; i < 1000; i++ { + s.ServerlessFlush(time.Second * 10) + } + finish.Done() + }(finish) + + finish.Wait() +} From 317e15b2f31a11582f327c4733ee3578680225f3 Mon Sep 17 00:00:00 2001 From: yoon nguyen Date: Thu, 8 Feb 2024 15:41:46 -0500 Subject: [PATCH 05/16] fixed test --- .../demultiplexer_fake_sampler_mock.go | 6 +++- comp/dogstatsd/server/server.go | 20 +++++++------ comp/dogstatsd/server/server_test.go | 30 +++++++------------ pkg/serverless/metrics/metric.go | 7 +++-- 4 files changed, 32 insertions(+), 31 deletions(-) diff --git a/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer_fake_sampler_mock.go b/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer_fake_sampler_mock.go index 41ef8d079ec2b6..eb45f7e5100581 100644 --- a/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer_fake_sampler_mock.go +++ b/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer_fake_sampler_mock.go @@ -21,7 +21,11 @@ import ( // FakeSamplerMockModule defines the fx options for FakeSamplerMock. func FakeSamplerMockModule() fxutil.Module { return fxutil.Component( - fx.Provide(newFakeSamplerMock)) + fx.Provide(newFakeSamplerMock), + fx.Provide(func(demux demultiplexerComp.FakeSamplerMock) aggregator.Demultiplexer { + return demux + }), + ) } type fakeSamplerMockDependencies struct { diff --git a/comp/dogstatsd/server/server.go b/comp/dogstatsd/server/server.go index 39a5b4817e316f..4789ade925ee77 100644 --- a/comp/dogstatsd/server/server.go +++ b/comp/dogstatsd/server/server.go @@ -17,7 +17,6 @@ import ( "go.uber.org/fx" - demultiplexer "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer" configComponent "github.com/DataDog/datadog-agent/comp/core/config" logComponent "github.com/DataDog/datadog-agent/comp/core/log" logComponentImpl "github.com/DataDog/datadog-agent/comp/core/log/logimpl" @@ -70,7 +69,7 @@ type dependencies struct { Lc fx.Lifecycle - Demultiplexer demultiplexer.Component + Demultiplexer aggregator.Demultiplexer Log logComponent.Component Config configComponent.Component @@ -195,13 +194,16 @@ func initTelemetry(cfg config.Reader, logger logComponent.Component) { // TODO: (components) - remove once serverless is an FX app // //nolint:revive // TODO(AML) Fix revive linter -func NewServerlessServer(deps dependencies) ServerlessDogstatsd { +func NewServerlessServer(demux aggregator.Demultiplexer) (ServerlessDogstatsd, error) { - s := newServerCompat(config.Datadog, logComponentImpl.NewTemporaryLoggerWithoutInit(), replay.NewServerlessTrafficCapture(), serverdebugimpl.NewServerlessServerDebug(), true, deps.Demultiplexer) + s := newServerCompat(config.Datadog, logComponentImpl.NewTemporaryLoggerWithoutInit(), replay.NewServerlessTrafficCapture(), serverdebugimpl.NewServerlessServerDebug(), true, demux) - s.Start(context.TODO()) + err := s.start(context.TODO()) + if err != nil { + return nil, err + } - return s + return s, nil } // TODO: (components) - merge with newServerCompat once NewServerlessServer is removed @@ -209,7 +211,7 @@ func newServer(deps dependencies) Component { s := newServerCompat(deps.Config, deps.Log, deps.Replay, deps.Debug, deps.Params.Serverless, deps.Demultiplexer) deps.Lc.Append(fx.Hook{ - OnStart: s.Start, + OnStart: s.start, OnStop: s.stop, }) @@ -217,7 +219,7 @@ func newServer(deps dependencies) Component { } -func newServerCompat(cfg config.Reader, log logComponent.Component, capture replay.Component, debug serverdebug.Component, serverless bool, demux demultiplexer.Component) *server { +func newServerCompat(cfg config.Reader, log logComponent.Component, capture replay.Component, debug serverdebug.Component, serverless bool, demux aggregator.Demultiplexer) *server { // This needs to be done after the configuration is loaded once.Do(func() { initTelemetry(cfg, log) }) @@ -323,7 +325,7 @@ func newServerCompat(cfg config.Reader, log logComponent.Component, capture repl return s } -func (s *server) Start(context.Context) error { +func (s *server) start(context.Context) error { // TODO: (components) - DI this into Server when Demultiplexer is made into a component // s.demultiplexer = demultiplexer diff --git a/comp/dogstatsd/server/server_test.go b/comp/dogstatsd/server/server_test.go index cfb10f4078d486..1c89bc91185816 100644 --- a/comp/dogstatsd/server/server_test.go +++ b/comp/dogstatsd/server/server_test.go @@ -41,7 +41,6 @@ type serverDeps struct { Config configComponent.Component Log log.Component Demultiplexer demultiplexer.FakeSamplerMock - Demux demultiplexer.Component Replay replay.Component Debug serverdebug.Component @@ -63,7 +62,6 @@ func fulfillDepsWithConfigOverrideAndFeatures(t testing.TB, overrides map[string fx.Supply(Params{Serverless: false}), replay.MockModule(), demultiplexerimpl.FakeSamplerMockModule(), - demultiplexerimpl.MockModule(), Module(), )) } @@ -82,7 +80,6 @@ func fulfillDepsWithConfigYaml(t testing.TB, yaml string) serverDeps { fx.Supply(Params{Serverless: false}), replay.MockModule(), demultiplexerimpl.FakeSamplerMockModule(), - demultiplexerimpl.MockModule(), Module(), )) } @@ -103,11 +100,11 @@ func TestStopServer(t *testing.T) { deps := fulfillDepsWithConfigOverride(t, cfg) - s := newServerCompat(deps.Config, deps.Log, deps.Replay, deps.Debug, false, deps.Demux) - s.Start(context.TODO()) + s := newServerCompat(deps.Config, deps.Log, deps.Replay, deps.Debug, false, deps.Demultiplexer) + s.start(context.TODO()) requireStart(t, s) - s.Stop() + s.stop(context.TODO()) // check that the port can be bound, try for 100 ms address, err := net.ResolveUDPAddr("udp", s.UDPLocalAddr()) @@ -159,12 +156,8 @@ func TestUDPReceive(t *testing.T) { deps := fulfillDepsWithConfigOverride(t, cfg) demux := deps.Demultiplexer - s := newServerCompat(deps.Config, deps.Log, deps.Replay, deps.Debug, false, deps.Demux) - s.Start(context.TODO()) - requireStart(t, s) - defer s.Stop() - conn, err := net.Dial("udp", s.UDPLocalAddr()) + conn, err := net.Dial("udp", deps.Server.UDPLocalAddr()) require.NoError(t, err, "cannot connect to DSD socket") defer conn.Close() @@ -473,7 +466,6 @@ func TestHistToDist(t *testing.T) { deps := fulfillDepsWithConfigOverride(t, cfg) - // demux := createDemultiplexer(t) requireStart(t, deps.Server) conn, err := net.Dial("udp", deps.Server.UDPLocalAddr()) @@ -599,7 +591,7 @@ func TestExtraTags(t *testing.T) { deps := fulfillDepsWithConfigOverrideAndFeatures(t, cfg, []config.Feature{config.EKSFargate}) - demux := createDemultiplexer(t) + demux := deps.Demultiplexer requireStart(t, deps.Server) conn, err := net.Dial("udp", deps.Server.UDPLocalAddr()) @@ -627,7 +619,7 @@ func TestStaticTags(t *testing.T) { deps := fulfillDepsWithConfigOverrideAndFeatures(t, cfg, []config.Feature{config.EKSFargate}) - demux := createDemultiplexer(t) + demux := deps.Demultiplexer requireStart(t, deps.Server) conn, err := net.Dial("udp", deps.Server.UDPLocalAddr()) @@ -834,13 +826,13 @@ func TestProcessedMetricsOrigin(t *testing.T) { cfg["dogstatsd_origin_optout_enabled"] = enabled deps := fulfillDepsWithConfigOverride(t, cfg) + s := deps.Server.(*server) + assert := assert.New(t) - s := newServerCompat(deps.Config, deps.Log, deps.Replay, deps.Debug, false, deps.Demux) - s.Start(context.TODO()) - s.Stop() - // requireStart(t, s) + s.start(context.TODO()) + requireStart(t, s) - assert := assert.New(t) + s.Stop() assert.Len(s.cachedOriginCounters, 0, "this cache must be empty") assert.Len(s.cachedOrder, 0, "this cache list must be empty") diff --git a/pkg/serverless/metrics/metric.go b/pkg/serverless/metrics/metric.go index 82fc146e424c3a..b7fb79619157b9 100644 --- a/pkg/serverless/metrics/metric.go +++ b/pkg/serverless/metrics/metric.go @@ -57,9 +57,12 @@ func (m *MetricConfig) GetMultipleEndpoints() (map[string][]string, error) { // NewServer returns a running DogStatsD server func (m *MetricDogStatsD) NewServer(demux aggregator.Demultiplexer) (dogstatsdServer.Component, error) { - s := dogstatsdServer.NewServerlessServer() + s, err := dogstatsdServer.NewServerlessServer(demux) + if err != nil { + return nil, err + } - return s + return s, nil } // Start starts the DogStatsD agent From 7af9b59c897717e2016ed4ec26c39fa6156af86c Mon Sep 17 00:00:00 2001 From: yoon nguyen Date: Thu, 8 Feb 2024 16:15:09 -0500 Subject: [PATCH 06/16] lint --- cmd/agent/subcommands/run/command.go | 4 +--- .../run/internal/settings/runtime_settings_test.go | 2 -- cmd/dogstatsd/subcommands/start/command.go | 3 --- 3 files changed, 1 insertion(+), 8 deletions(-) diff --git a/cmd/agent/subcommands/run/command.go b/cmd/agent/subcommands/run/command.go index 3055b75c0cd271..fcc59a0a51dcbc 100644 --- a/cmd/agent/subcommands/run/command.go +++ b/cmd/agent/subcommands/run/command.go @@ -421,7 +421,7 @@ func startAgent( ) if jmxLoggerSetupErr != nil { - return fmt.Errorf("Error while setting up logging, exiting: %v", jmxLoggerSetupErr) + return fmt.Errorf(" Error while setting up logging, exiting: %v", jmxLoggerSetupErr) } if flavor.GetFlavor() == flavor.IotAgent { @@ -589,7 +589,6 @@ func startAgent( // start dogstatsd if pkgconfig.Datadog.GetBool("use_dogstatsd") { global.DSD = server - // err := server.Start(demultiplexer) if err != nil { log.Errorf("Could not start dogstatsd: %s", err) } else { @@ -645,7 +644,6 @@ func stopAgent(cliParams *cliParams, server dogstatsdServer.Component, agentAPI pkglog.Errorf("Error shutting down expvar server: %v", err) } } - server.Stop() if common.AC != nil { common.AC.Stop() } diff --git a/cmd/agent/subcommands/run/internal/settings/runtime_settings_test.go b/cmd/agent/subcommands/run/internal/settings/runtime_settings_test.go index 3ed49ea6c23e04..124e9365cbdb84 100644 --- a/cmd/agent/subcommands/run/internal/settings/runtime_settings_test.go +++ b/cmd/agent/subcommands/run/internal/settings/runtime_settings_test.go @@ -46,9 +46,7 @@ func TestDogstatsdMetricsStats(t *testing.T) { demultiplexerimpl.MockModule(), )) - // demux := deps.Demultiplexer global.DSD = deps.Server - // deps.Server.Start(demux) require.Nil(t, err) diff --git a/cmd/dogstatsd/subcommands/start/command.go b/cmd/dogstatsd/subcommands/start/command.go index f45377fe0d0c8c..4d020d5b7bc5ae 100644 --- a/cmd/dogstatsd/subcommands/start/command.go +++ b/cmd/dogstatsd/subcommands/start/command.go @@ -268,7 +268,6 @@ func RunDogstatsd(ctx context.Context, cliParams *CLIParams, config config.Compo demultiplexer.AddAgentStartupTelemetry(version.AgentVersion) - // err = components.DogstatsdServer.Start(demultiplexer) if err != nil { log.Criticalf("Unable to start dogstatsd: %s", err) return @@ -317,8 +316,6 @@ func StopAgent(cancel context.CancelFunc, components *DogstatsdComponents) { } } - components.DogstatsdServer.Stop() - pkglog.Info("See ya!") pkglog.Flush() } From 7291ab7c35eeaffaf9578035fd2812b9b878ca2e Mon Sep 17 00:00:00 2001 From: yoon nguyen Date: Mon, 12 Feb 2024 16:46:46 -0500 Subject: [PATCH 07/16] fixed test + agent start --- cmd/agent/subcommands/run/command.go | 3 +-- .../run/internal/settings/runtime_settings_test.go | 7 +------ .../demultiplexer/demultiplexerimpl/demultiplexer.go | 5 ++++- comp/core/bundle_mock.go | 5 ----- comp/dogstatsd/bundle.go | 6 ++++++ 5 files changed, 12 insertions(+), 14 deletions(-) diff --git a/cmd/agent/subcommands/run/command.go b/cmd/agent/subcommands/run/command.go index fac6ccde0bdd90..a261fa7e82c1ae 100644 --- a/cmd/agent/subcommands/run/command.go +++ b/cmd/agent/subcommands/run/command.go @@ -329,7 +329,7 @@ func getSharedFxOption() fx.Option { dogstatsdStatusimpl.Module(), statusimpl.Module(), apiimpl.Module(), - + demultiplexerimpl.Module(), dogstatsd.Bundle(), otelcol.Bundle(), rcclient.Module(), @@ -362,7 +362,6 @@ func getSharedFxOption() fx.Option { params.EnableNoAggregationPipeline = config.GetBool("dogstatsd_no_aggregation_pipeline") return params }), - demultiplexerimpl.Module(), orchestratorForwarderImpl.Module(), fx.Supply(orchestratorForwarderImpl.NewDefaultParams()), eventplatformimpl.Module(), diff --git a/cmd/agent/subcommands/run/internal/settings/runtime_settings_test.go b/cmd/agent/subcommands/run/internal/settings/runtime_settings_test.go index 85e188f1c913ae..8373d477dad602 100644 --- a/cmd/agent/subcommands/run/internal/settings/runtime_settings_test.go +++ b/cmd/agent/subcommands/run/internal/settings/runtime_settings_test.go @@ -9,7 +9,6 @@ import ( "testing" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "go.uber.org/fx" "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer" @@ -40,16 +39,12 @@ func TestDogstatsdMetricsStats(t *testing.T) { fx.Supply(core.BundleParams{}), fx.Supply(server.Params{ Serverless: false, - }), + }), demultiplexerimpl.MockModule(), dogstatsd.Bundle(), defaultforwarder.MockModule(), )) - // demux := deps.Demultiplexer - // deps.Server.Start(demux) - require.Nil(t, err) - s := DsdStatsRuntimeSetting{ ServerDebug: deps.Debug, } diff --git a/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer.go b/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer.go index 530a848fdcb07b..325bc4848dcc3e 100644 --- a/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer.go +++ b/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer.go @@ -27,7 +27,10 @@ import ( // Module defines the fx options for this component. func Module() fxutil.Module { return fxutil.Component( - fx.Provide(newDemultiplexer)) + fx.Provide(newDemultiplexer), + fx.Provide(func(demux demultiplexerComp.Component) aggregator.Demultiplexer { + return demux + })) } type dependencies struct { diff --git a/comp/core/bundle_mock.go b/comp/core/bundle_mock.go index 13a7f1f5ca380a..c004d426325cff 100644 --- a/comp/core/bundle_mock.go +++ b/comp/core/bundle_mock.go @@ -16,13 +16,11 @@ package core import ( - demultiplexerComp "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer" "github.com/DataDog/datadog-agent/comp/core/config" "github.com/DataDog/datadog-agent/comp/core/hostname/hostnameimpl" "github.com/DataDog/datadog-agent/comp/core/log/logimpl" "github.com/DataDog/datadog-agent/comp/core/sysprobeconfig/sysprobeconfigimpl" "github.com/DataDog/datadog-agent/comp/core/telemetry" - "github.com/DataDog/datadog-agent/pkg/aggregator" "github.com/DataDog/datadog-agent/pkg/util/fxutil" "go.uber.org/fx" ) @@ -32,9 +30,6 @@ import ( // MakeMockBundle returns a core bundle with a customized set of fx.Option including sane defaults. func MakeMockBundle(logParams, logger fx.Option) fxutil.BundleOptions { return fxutil.Bundle( - fx.Provide(func(demux demultiplexerComp.FakeSamplerMock) aggregator.Demultiplexer { - return demux - }), fx.Provide(func(params BundleParams) config.Params { return params.ConfigParams }), config.MockModule(), logParams, diff --git a/comp/dogstatsd/bundle.go b/comp/dogstatsd/bundle.go index ffd727345a00dd..a371d06a5c6988 100644 --- a/comp/dogstatsd/bundle.go +++ b/comp/dogstatsd/bundle.go @@ -6,11 +6,14 @@ package dogstatsd //nolint:revive // TODO(AML) Fix revive linter import ( + demultiplexerComp "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer" "github.com/DataDog/datadog-agent/comp/dogstatsd/replay" "github.com/DataDog/datadog-agent/comp/dogstatsd/server" "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug/serverdebugimpl" "github.com/DataDog/datadog-agent/comp/dogstatsd/statsd" + "github.com/DataDog/datadog-agent/pkg/aggregator" "github.com/DataDog/datadog-agent/pkg/util/fxutil" + "go.uber.org/fx" ) // team: agent-metrics-logs @@ -18,6 +21,9 @@ import ( // Bundle defines the fx options for this bundle. func Bundle() fxutil.BundleOptions { return fxutil.Bundle( + fx.Provide(func(demux demultiplexerComp.Component) aggregator.Demultiplexer { + return demux + }), serverdebugimpl.Module(), replay.Module(), server.Module()) From 438db33dbdac0862d8c7b778059aa9b8b7a1c492 Mon Sep 17 00:00:00 2001 From: yoon nguyen Date: Tue, 13 Feb 2024 10:19:45 -0500 Subject: [PATCH 08/16] lint --- cmd/agent/subcommands/run/command.go | 2 +- comp/dogstatsd/server/server_test.go | 10 ++++------ pkg/serverless/metrics/metric.go | 4 +++- pkg/serverless/metrics/metric_test.go | 3 +-- 4 files changed, 9 insertions(+), 10 deletions(-) diff --git a/cmd/agent/subcommands/run/command.go b/cmd/agent/subcommands/run/command.go index a261fa7e82c1ae..07947171a104ec 100644 --- a/cmd/agent/subcommands/run/command.go +++ b/cmd/agent/subcommands/run/command.go @@ -639,7 +639,7 @@ func StopAgentWithDefaults(server dogstatsdServer.Component, agentAPI internalAP } // stopAgent Tears down the agent process -func stopAgent(cliParams *cliParams, server dogstatsdServer.Component, agentAPI internalAPI.Component) { +func stopAgent(cliParams *cliParams, _ dogstatsdServer.Component, agentAPI internalAPI.Component) { // retrieve the agent health before stopping the components // GetReadyNonBlocking has a 100ms timeout to avoid blocking health, err := health.GetReadyNonBlocking() diff --git a/comp/dogstatsd/server/server_test.go b/comp/dogstatsd/server/server_test.go index 1566e3844f6519..2ed6f5fb5f65e1 100644 --- a/comp/dogstatsd/server/server_test.go +++ b/comp/dogstatsd/server/server_test.go @@ -24,9 +24,7 @@ import ( "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer/demultiplexerimpl" "github.com/DataDog/datadog-agent/comp/core" configComponent "github.com/DataDog/datadog-agent/comp/core/config" - "github.com/DataDog/datadog-agent/comp/core/hostname/hostnameimpl" "github.com/DataDog/datadog-agent/comp/core/log" - "github.com/DataDog/datadog-agent/comp/core/log/logimpl" "github.com/DataDog/datadog-agent/comp/dogstatsd/listeners" "github.com/DataDog/datadog-agent/comp/dogstatsd/replay" serverdebug "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug" @@ -1004,7 +1002,7 @@ func requireStart(t *testing.T, s Component) { assert.True(t, s.IsRunning()) } -func createDemultiplexer(t *testing.T) demultiplexer.FakeSamplerMock { - return fxutil.Test[demultiplexer.FakeSamplerMock](t, logimpl.MockModule(), - demultiplexerimpl.FakeSamplerMockModule(), hostnameimpl.MockModule()) -} +// func createDemultiplexer(t *testing.T) demultiplexer.FakeSamplerMock { +// return fxutil.Test[demultiplexer.FakeSamplerMock](t, logimpl.MockModule(), +// demultiplexerimpl.FakeSamplerMockModule(), hostnameimpl.MockModule()) +// } diff --git a/pkg/serverless/metrics/metric.go b/pkg/serverless/metrics/metric.go index b7fb79619157b9..0d7b66a92794b8 100644 --- a/pkg/serverless/metrics/metric.go +++ b/pkg/serverless/metrics/metric.go @@ -109,7 +109,9 @@ func (c *ServerlessMetricAgent) Flush() { // Stop stops the DogStatsD server func (c *ServerlessMetricAgent) Stop() { if c.IsReady() { - c.dogStatsDServer.Stop() + if serverlessServer, ok := c.dogStatsDServer.(dogstatsdServer.ServerlessDogstatsd); ok { + serverlessServer.Stop() + } } } diff --git a/pkg/serverless/metrics/metric_test.go b/pkg/serverless/metrics/metric_test.go index 1fca6380c019b6..3db8bd4b1a3f3f 100644 --- a/pkg/serverless/metrics/metric_test.go +++ b/pkg/serverless/metrics/metric_test.go @@ -205,8 +205,7 @@ func TestRaceFlushVersusParsePacket(t *testing.T) { demux := aggregator.InitAndStartServerlessDemultiplexer(nil, time.Second*1000) - s := dogstatsdServer.NewServerlessServer() - // err = s.Start() + s, err := dogstatsdServer.NewServerlessServer(demux) require.NoError(t, err, "cannot start DSD") defer s.Stop() From 4f2c355c338aadcf596d8b4dff51383d50001a29 Mon Sep 17 00:00:00 2001 From: yoon nguyen Date: Tue, 13 Feb 2024 15:45:53 -0500 Subject: [PATCH 09/16] adjusted to comments --- cmd/agent/subcommands/run/command.go | 9 ++-- .../demultiplexerimpl/demultiplexer.go | 11 +++-- comp/dogstatsd/bundle.go | 6 --- comp/dogstatsd/server/component.go | 11 ----- comp/dogstatsd/server/server.go | 18 -------- comp/dogstatsd/server/server_bench_test.go | 4 -- comp/dogstatsd/server/server_test.go | 17 ++------ comp/dogstatsd/server/serverless.go | 43 +++++++++++++++++++ pkg/serverless/metrics/metric.go | 15 ++----- pkg/serverless/metrics/metric_test.go | 2 +- 10 files changed, 61 insertions(+), 75 deletions(-) create mode 100644 comp/dogstatsd/server/serverless.go diff --git a/cmd/agent/subcommands/run/command.go b/cmd/agent/subcommands/run/command.go index 07947171a104ec..7b46f91e742056 100644 --- a/cmd/agent/subcommands/run/command.go +++ b/cmd/agent/subcommands/run/command.go @@ -212,7 +212,7 @@ func run(log log.Component, collector collector.Component, ) error { defer func() { - stopAgent(cliParams, server, agentAPI) + stopAgent(cliParams, agentAPI) }() // prepare go runtime @@ -427,7 +427,7 @@ func startAgent( ) if jmxLoggerSetupErr != nil { - return fmt.Errorf(" Error while setting up logging, exiting: %v", jmxLoggerSetupErr) + return fmt.Errorf("Error while setting up logging, exiting: %v", jmxLoggerSetupErr) } if flavor.GetFlavor() == flavor.IotAgent { @@ -598,7 +598,6 @@ func startAgent( // start dogstatsd if pkgconfig.Datadog.GetBool("use_dogstatsd") { - // err := server.Start(demultiplexer) if err != nil { log.Errorf("Could not start dogstatsd: %s", err) } else { @@ -635,11 +634,11 @@ func startAgent( // StopAgentWithDefaults is a temporary way for other packages to use stopAgent. func StopAgentWithDefaults(server dogstatsdServer.Component, agentAPI internalAPI.Component) { - stopAgent(&cliParams{GlobalParams: &command.GlobalParams{}}, server, agentAPI) + stopAgent(&cliParams{GlobalParams: &command.GlobalParams{}}, agentAPI) } // stopAgent Tears down the agent process -func stopAgent(cliParams *cliParams, _ dogstatsdServer.Component, agentAPI internalAPI.Component) { +func stopAgent(cliParams *cliParams, agentAPI internalAPI.Component) { // retrieve the agent health before stopping the components // GetReadyNonBlocking has a 100ms timeout to avoid blocking health, err := health.GetReadyNonBlocking() diff --git a/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer.go b/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer.go index 325bc4848dcc3e..ee434d1fdc17f6 100644 --- a/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer.go +++ b/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer.go @@ -27,10 +27,7 @@ import ( // Module defines the fx options for this component. func Module() fxutil.Module { return fxutil.Component( - fx.Provide(newDemultiplexer), - fx.Provide(func(demux demultiplexerComp.Component) aggregator.Demultiplexer { - return demux - })) + fx.Provide(newDemultiplexer)) } type dependencies struct { @@ -59,8 +56,9 @@ type provides struct { // implements diagnosesendermanager.Component). This has the nice consequence of preventing having // demultiplexerimpl.Module and diagnosesendermanagerimpl.Module in the same fx.App because there would // be two ways to create diagnosesendermanager.Component. - SenderManager diagnosesendermanager.Component - StatusProvider status.InformationProvider + SenderManager diagnosesendermanager.Component + StatusProvider status.InformationProvider + AggregatorDemultiplexer aggregator.Demultiplexer } func newDemultiplexer(deps dependencies) (provides, error) { @@ -95,6 +93,7 @@ func newDemultiplexer(deps dependencies) (provides, error) { StatusProvider: status.NewInformationProvider(demultiplexerStatus{ Log: deps.Log, }), + AggregatorDemultiplexer: demultiplexer, }, nil } diff --git a/comp/dogstatsd/bundle.go b/comp/dogstatsd/bundle.go index a371d06a5c6988..ffd727345a00dd 100644 --- a/comp/dogstatsd/bundle.go +++ b/comp/dogstatsd/bundle.go @@ -6,14 +6,11 @@ package dogstatsd //nolint:revive // TODO(AML) Fix revive linter import ( - demultiplexerComp "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer" "github.com/DataDog/datadog-agent/comp/dogstatsd/replay" "github.com/DataDog/datadog-agent/comp/dogstatsd/server" "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug/serverdebugimpl" "github.com/DataDog/datadog-agent/comp/dogstatsd/statsd" - "github.com/DataDog/datadog-agent/pkg/aggregator" "github.com/DataDog/datadog-agent/pkg/util/fxutil" - "go.uber.org/fx" ) // team: agent-metrics-logs @@ -21,9 +18,6 @@ import ( // Bundle defines the fx options for this bundle. func Bundle() fxutil.BundleOptions { return fxutil.Bundle( - fx.Provide(func(demux demultiplexerComp.Component) aggregator.Demultiplexer { - return demux - }), serverdebugimpl.Module(), replay.Module(), server.Module()) diff --git a/comp/dogstatsd/server/component.go b/comp/dogstatsd/server/component.go index 593e12a74cc085..9ffb2ce924afae 100644 --- a/comp/dogstatsd/server/component.go +++ b/comp/dogstatsd/server/component.go @@ -7,7 +7,6 @@ package server import ( - "context" "time" "github.com/DataDog/datadog-agent/pkg/util/fxutil" @@ -34,12 +33,6 @@ type Component interface { UDPLocalAddr() string } -// ServerlessDogstatsd is the interface for the serverless dogstatsd server. -type ServerlessDogstatsd interface { - Component - Stop() -} - // Mock implements mock-specific methods. type Mock interface { Component @@ -56,7 +49,3 @@ func MockModule() fxutil.Module { return fxutil.Component( fx.Provide(newMock)) } - -func (s *server) Stop() { - _ = s.stop(context.TODO()) -} diff --git a/comp/dogstatsd/server/server.go b/comp/dogstatsd/server/server.go index 4789ade925ee77..1b7f021800e47e 100644 --- a/comp/dogstatsd/server/server.go +++ b/comp/dogstatsd/server/server.go @@ -19,13 +19,11 @@ import ( configComponent "github.com/DataDog/datadog-agent/comp/core/config" logComponent "github.com/DataDog/datadog-agent/comp/core/log" - logComponentImpl "github.com/DataDog/datadog-agent/comp/core/log/logimpl" "github.com/DataDog/datadog-agent/comp/dogstatsd/listeners" "github.com/DataDog/datadog-agent/comp/dogstatsd/mapper" "github.com/DataDog/datadog-agent/comp/dogstatsd/packets" "github.com/DataDog/datadog-agent/comp/dogstatsd/replay" serverdebug "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug" - "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug/serverdebugimpl" "github.com/DataDog/datadog-agent/pkg/aggregator" "github.com/DataDog/datadog-agent/pkg/config" "github.com/DataDog/datadog-agent/pkg/metrics" @@ -191,21 +189,6 @@ func initTelemetry(cfg config.Reader, logger logComponent.Component) { packets.InitTelemetry(get("telemetry.dogstatsd.listeners_channel_latency_buckets")) } -// TODO: (components) - remove once serverless is an FX app -// -//nolint:revive // TODO(AML) Fix revive linter -func NewServerlessServer(demux aggregator.Demultiplexer) (ServerlessDogstatsd, error) { - - s := newServerCompat(config.Datadog, logComponentImpl.NewTemporaryLoggerWithoutInit(), replay.NewServerlessTrafficCapture(), serverdebugimpl.NewServerlessServerDebug(), true, demux) - - err := s.start(context.TODO()) - if err != nil { - return nil, err - } - - return s, nil -} - // TODO: (components) - merge with newServerCompat once NewServerlessServer is removed func newServer(deps dependencies) Component { s := newServerCompat(deps.Config, deps.Log, deps.Replay, deps.Debug, deps.Params.Serverless, deps.Demultiplexer) @@ -328,7 +311,6 @@ func newServerCompat(cfg config.Reader, log logComponent.Component, capture repl func (s *server) start(context.Context) error { // TODO: (components) - DI this into Server when Demultiplexer is made into a component - // s.demultiplexer = demultiplexer packetsChannel := make(chan packets.Packets, s.config.GetInt("dogstatsd_queue_size")) tmpListeners := make([]listeners.StatsdListener, 0, 2) diff --git a/comp/dogstatsd/server/server_bench_test.go b/comp/dogstatsd/server/server_bench_test.go index 8bde002b06ffa0..ffc8678524b173 100644 --- a/comp/dogstatsd/server/server_bench_test.go +++ b/comp/dogstatsd/server/server_bench_test.go @@ -38,8 +38,6 @@ func benchParsePackets(b *testing.B, rawPacket []byte) { demux := deps.Demultiplexer defer demux.Stop(false) - // _ = s.Start(demux) - // defer s.Stop() done := make(chan struct{}) go func() { @@ -139,8 +137,6 @@ func benchmarkMapperControl(b *testing.B, yaml string) { pkgconfig.SetupLogger("", "off", "", "", false, true, false) demux := deps.Demultiplexer - // _ = s.Start(demux) - // defer s.Stop() done := make(chan struct{}) go func() { diff --git a/comp/dogstatsd/server/server_test.go b/comp/dogstatsd/server/server_test.go index 2ed6f5fb5f65e1..454bdf36a5136d 100644 --- a/comp/dogstatsd/server/server_test.go +++ b/comp/dogstatsd/server/server_test.go @@ -162,7 +162,6 @@ func TestUDPReceive(t *testing.T) { // Test metric conn.Write([]byte("daemon:666|g|#sometag1:somevalue1,sometag2:somevalue2")) - time.Sleep(10 * time.Second) samples, timedSamples := demux.WaitForSamples(time.Second * 2) require.Len(t, samples, 1) require.Len(t, timedSamples, 0) @@ -435,7 +434,6 @@ func TestUDPForward(t *testing.T) { defer pc.Close() - // demux := deps.Demultiplexer requireStart(t, deps.Server) conn, err := net.Dial("udp", deps.Server.UDPLocalAddr()) @@ -465,6 +463,7 @@ func TestHistToDist(t *testing.T) { deps := fulfillDepsWithConfigOverride(t, cfg) + demux := deps.Demultiplexer requireStart(t, deps.Server) conn, err := net.Dial("udp", deps.Server.UDPLocalAddr()) @@ -474,7 +473,7 @@ func TestHistToDist(t *testing.T) { // Test metric conn.Write([]byte("daemon:666|h|#sometag1:somevalue1,sometag2:somevalue2")) time.Sleep(time.Millisecond * 200) // give some time to the socket write/read - samples, timedSamples := deps.Demultiplexer.WaitForSamples(time.Second * 10) + samples, timedSamples := demux.WaitForSamples(time.Second * 2) require.Equal(t, 2, len(samples)) require.Equal(t, 0, len(timedSamples)) histMetric := samples[0] @@ -488,7 +487,7 @@ func TestHistToDist(t *testing.T) { assert.Equal(t, distMetric.Name, "dist.daemon") assert.EqualValues(t, distMetric.Value, 666.0) assert.Equal(t, metrics.DistributionType, distMetric.Mtype) - deps.Demultiplexer.Reset() + demux.Reset() } func TestScanLines(t *testing.T) { @@ -653,7 +652,6 @@ func TestNoMappingsConfig(t *testing.T) { samples := []metrics.MetricSample{} - // demux := deps.Demultiplexer requireStart(t, s) assert.Nil(t, s.mapper) @@ -764,7 +762,6 @@ dogstatsd_mapper_profiles: cw.SetWithoutSource("dogstatsd_port", listeners.RandomPortName) - // demux := deps.Demultiplexer requireStart(t, s) assert.Equal(t, deps.Config.Get("dogstatsd_mapper_cache_size"), scenario.expectedCacheSize, "Case `%s` failed. cache_size `%s` should be `%s`", scenario.name, deps.Config.Get("dogstatsd_mapper_cache_size"), scenario.expectedCacheSize) @@ -785,7 +782,6 @@ dogstatsd_mapper_profiles: sort.Strings(sample.Tags) } assert.Equal(t, scenario.expectedSamples, actualSamples, "Case `%s` failed. `%s` should be `%s`", scenario.name, actualSamples, scenario.expectedSamples) - }) } } @@ -949,7 +945,7 @@ func testOriginOptout(t *testing.T, cfg map[string]interface{}, enabled bool) { s := deps.Server.(*server) assert := assert.New(t) - // requireStart(t, s, deps.Demultiplexer + requireStart(t, s) parser := newParser(deps.Config, newFloat64ListPool(), 1) parser.dsdOriginEnabled = true @@ -1001,8 +997,3 @@ func requireStart(t *testing.T, s Component) { assert.NotNil(t, s) assert.True(t, s.IsRunning()) } - -// func createDemultiplexer(t *testing.T) demultiplexer.FakeSamplerMock { -// return fxutil.Test[demultiplexer.FakeSamplerMock](t, logimpl.MockModule(), -// demultiplexerimpl.FakeSamplerMockModule(), hostnameimpl.MockModule()) -// } diff --git a/comp/dogstatsd/server/serverless.go b/comp/dogstatsd/server/serverless.go new file mode 100644 index 00000000000000..63cf3f3addcf5c --- /dev/null +++ b/comp/dogstatsd/server/serverless.go @@ -0,0 +1,43 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +// Package server implements a component to run the dogstatsd server +package server + +import ( + "context" + + logComponentImpl "github.com/DataDog/datadog-agent/comp/core/log/logimpl" + "github.com/DataDog/datadog-agent/comp/dogstatsd/replay" + "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug/serverdebugimpl" + "github.com/DataDog/datadog-agent/pkg/aggregator" + "github.com/DataDog/datadog-agent/pkg/config" +) + +// team: agent-metrics-logs + +// ServerlessDogstatsd is the interface for the serverless dogstatsd server. +type ServerlessDogstatsd interface { + Component + Stop() +} + +// TODO: (components) - remove once serverless is an FX app +// +//nolint:revive // TODO(AML) Fix revive linter +func NewServerlessServer(demux aggregator.Demultiplexer) (ServerlessDogstatsd, error) { + s := newServerCompat(config.Datadog, logComponentImpl.NewTemporaryLoggerWithoutInit(), replay.NewServerlessTrafficCapture(), serverdebugimpl.NewServerlessServerDebug(), true, demux) + + err := s.start(context.TODO()) + if err != nil { + return nil, err + } + + return s, nil +} + +func (s *server) Stop() { + _ = s.stop(context.TODO()) +} diff --git a/pkg/serverless/metrics/metric.go b/pkg/serverless/metrics/metric.go index 0d7b66a92794b8..d1ec3c46b7dfa4 100644 --- a/pkg/serverless/metrics/metric.go +++ b/pkg/serverless/metrics/metric.go @@ -42,7 +42,7 @@ type MultipleEndpointConfig interface { // DogStatsDFactory allows create a new DogStatsD server type DogStatsDFactory interface { - NewServer(aggregator.Demultiplexer) (dogstatsdServer.Component, error) + NewServer(aggregator.Demultiplexer) (dogstatsdServer.ServerlessDogstatsd, error) } const ( @@ -56,13 +56,8 @@ func (m *MetricConfig) GetMultipleEndpoints() (map[string][]string, error) { } // NewServer returns a running DogStatsD server -func (m *MetricDogStatsD) NewServer(demux aggregator.Demultiplexer) (dogstatsdServer.Component, error) { - s, err := dogstatsdServer.NewServerlessServer(demux) - if err != nil { - return nil, err - } - - return s, nil +func (m *MetricDogStatsD) NewServer(demux aggregator.Demultiplexer) (dogstatsdServer.ServerlessDogstatsd, error) { + return dogstatsdServer.NewServerlessServer(demux) } // Start starts the DogStatsD agent @@ -109,9 +104,7 @@ func (c *ServerlessMetricAgent) Flush() { // Stop stops the DogStatsD server func (c *ServerlessMetricAgent) Stop() { if c.IsReady() { - if serverlessServer, ok := c.dogStatsDServer.(dogstatsdServer.ServerlessDogstatsd); ok { - serverlessServer.Stop() - } + c.Stop() } } diff --git a/pkg/serverless/metrics/metric_test.go b/pkg/serverless/metrics/metric_test.go index 3db8bd4b1a3f3f..5f6cb380047b10 100644 --- a/pkg/serverless/metrics/metric_test.go +++ b/pkg/serverless/metrics/metric_test.go @@ -70,7 +70,7 @@ func TestStartInvalidConfig(t *testing.T) { type MetricDogStatsDMocked struct{} //nolint:revive // TODO(SERV) Fix revive linter -func (m *MetricDogStatsDMocked) NewServer(demux aggregator.Demultiplexer) (dogstatsdServer.Component, error) { +func (m *MetricDogStatsDMocked) NewServer(demux aggregator.Demultiplexer) (dogstatsdServer.ServerlessDogstatsd, error) { return nil, fmt.Errorf("error") } From 15f5d531caac0d02fe43e144520190f284ae5ac4 Mon Sep 17 00:00:00 2001 From: yoon nguyen Date: Tue, 13 Feb 2024 16:01:53 -0500 Subject: [PATCH 10/16] lint --- comp/dogstatsd/server/server_bench_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/comp/dogstatsd/server/server_bench_test.go b/comp/dogstatsd/server/server_bench_test.go index ffc8678524b173..6394f74c64f238 100644 --- a/comp/dogstatsd/server/server_bench_test.go +++ b/comp/dogstatsd/server/server_bench_test.go @@ -84,8 +84,6 @@ func BenchmarkPbarseMetricMessage(b *testing.B) { pkgconfig.SetupLogger("", "off", "", "", false, true, false) demux := deps.Demultiplexer - // _ = s.Start(demux) - // defer s.Stop() done := make(chan struct{}) go func() { From 9b9dec7f5eabed4a57135b972bfb428b9559eb96 Mon Sep 17 00:00:00 2001 From: yoon nguyen Date: Wed, 14 Feb 2024 12:48:59 -0500 Subject: [PATCH 11/16] lint --- cmd/agent/subcommands/run/command.go | 2 +- cmd/agent/subcommands/run/command_windows.go | 2 +- pkg/serverless/metrics/metric.go | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/agent/subcommands/run/command.go b/cmd/agent/subcommands/run/command.go index 7b46f91e742056..d2c066a4102050 100644 --- a/cmd/agent/subcommands/run/command.go +++ b/cmd/agent/subcommands/run/command.go @@ -633,7 +633,7 @@ func startAgent( } // StopAgentWithDefaults is a temporary way for other packages to use stopAgent. -func StopAgentWithDefaults(server dogstatsdServer.Component, agentAPI internalAPI.Component) { +func StopAgentWithDefaults(agentAPI internalAPI.Component) { stopAgent(&cliParams{GlobalParams: &command.GlobalParams{}}, agentAPI) } diff --git a/cmd/agent/subcommands/run/command_windows.go b/cmd/agent/subcommands/run/command_windows.go index d728b89c3fd9b5..d5ed02e525c7c1 100644 --- a/cmd/agent/subcommands/run/command_windows.go +++ b/cmd/agent/subcommands/run/command_windows.go @@ -112,7 +112,7 @@ func StartAgentWithDefaults(ctxChan <-chan context.Context) (<-chan error, error collector collector.Component, ) error { - defer StopAgentWithDefaults(server, agentAPI) + defer StopAgentWithDefaults(agentAPI) err := startAgent( &cliParams{GlobalParams: &command.GlobalParams{}}, diff --git a/pkg/serverless/metrics/metric.go b/pkg/serverless/metrics/metric.go index d1ec3c46b7dfa4..9abaaaf6078e4c 100644 --- a/pkg/serverless/metrics/metric.go +++ b/pkg/serverless/metrics/metric.go @@ -20,7 +20,7 @@ import ( // ServerlessMetricAgent represents the DogStatsD server and the aggregator type ServerlessMetricAgent struct { - dogStatsDServer dogstatsdServer.Component + dogStatsDServer dogstatsdServer.ServerlessDogstatsd tags []string Demux aggregator.Demultiplexer @@ -104,7 +104,7 @@ func (c *ServerlessMetricAgent) Flush() { // Stop stops the DogStatsD server func (c *ServerlessMetricAgent) Stop() { if c.IsReady() { - c.Stop() + c.dogStatsDServer.Stop() } } From 2538b87f23d5ca5bb0c236b77e31004e80409858 Mon Sep 17 00:00:00 2001 From: yoon nguyen Date: Wed, 14 Feb 2024 14:37:08 -0500 Subject: [PATCH 12/16] fix test --- .../demultiplexer/demultiplexerimpl/demultiplexer_mock.go | 6 +++++- comp/dogstatsd/server/server.go | 2 -- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer_mock.go b/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer_mock.go index 903af3e8762471..aadcb9264fed08 100644 --- a/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer_mock.go +++ b/comp/aggregator/demultiplexer/demultiplexerimpl/demultiplexer_mock.go @@ -21,7 +21,11 @@ import ( // MockModule defines the fx options for this component. func MockModule() fxutil.Module { return fxutil.Component( - fx.Provide(newMock)) + fx.Provide(newMock), + fx.Provide(func(demux demultiplexerComp.Component) aggregator.Demultiplexer { + return demux + }), + ) } type mock struct { diff --git a/comp/dogstatsd/server/server.go b/comp/dogstatsd/server/server.go index 1b7f021800e47e..e1c7cc56373cbc 100644 --- a/comp/dogstatsd/server/server.go +++ b/comp/dogstatsd/server/server.go @@ -310,8 +310,6 @@ func newServerCompat(cfg config.Reader, log logComponent.Component, capture repl func (s *server) start(context.Context) error { - // TODO: (components) - DI this into Server when Demultiplexer is made into a component - packetsChannel := make(chan packets.Packets, s.config.GetInt("dogstatsd_queue_size")) tmpListeners := make([]listeners.StatsdListener, 0, 2) err := s.tCapture.Configure() From fae914f20c16f3d8188ec1503962af008b60a4b2 Mon Sep 17 00:00:00 2001 From: yoon nguyen Date: Wed, 21 Feb 2024 14:49:01 -0500 Subject: [PATCH 13/16] update loggin for dogstatsd server when started --- cmd/agent/subcommands/run/command.go | 7 ++++--- comp/dogstatsd/server/serverless.go | 2 -- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/cmd/agent/subcommands/run/command.go b/cmd/agent/subcommands/run/command.go index d2c066a4102050..2b43ac123b57bb 100644 --- a/cmd/agent/subcommands/run/command.go +++ b/cmd/agent/subcommands/run/command.go @@ -598,10 +598,11 @@ func startAgent( // start dogstatsd if pkgconfig.Datadog.GetBool("use_dogstatsd") { - if err != nil { - log.Errorf("Could not start dogstatsd: %s", err) - } else { + serverStarted := server.IsRunning() + if serverStarted { log.Debugf("dogstatsd started") + } else { + log.Errorf("Could not start dogstatsd: %s", err) } } diff --git a/comp/dogstatsd/server/serverless.go b/comp/dogstatsd/server/serverless.go index 63cf3f3addcf5c..fd8705508e8378 100644 --- a/comp/dogstatsd/server/serverless.go +++ b/comp/dogstatsd/server/serverless.go @@ -24,8 +24,6 @@ type ServerlessDogstatsd interface { Stop() } -// TODO: (components) - remove once serverless is an FX app -// //nolint:revive // TODO(AML) Fix revive linter func NewServerlessServer(demux aggregator.Demultiplexer) (ServerlessDogstatsd, error) { s := newServerCompat(config.Datadog, logComponentImpl.NewTemporaryLoggerWithoutInit(), replay.NewServerlessTrafficCapture(), serverdebugimpl.NewServerlessServerDebug(), true, demux) From fb6f6e34933817f658e01fa26aa35acdf132667a Mon Sep 17 00:00:00 2001 From: yoon nguyen Date: Wed, 21 Feb 2024 16:16:18 -0500 Subject: [PATCH 14/16] move dogstatsd start logging --- cmd/agent/subcommands/run/command.go | 10 -- comp/dogstatsd/server/server.go | 214 ++++++++++++++------------- 2 files changed, 108 insertions(+), 116 deletions(-) diff --git a/cmd/agent/subcommands/run/command.go b/cmd/agent/subcommands/run/command.go index 77cd45f9b6c2bf..bf5b208abb84b0 100644 --- a/cmd/agent/subcommands/run/command.go +++ b/cmd/agent/subcommands/run/command.go @@ -592,16 +592,6 @@ func startAgent( demultiplexer.AddAgentStartupTelemetry(version.AgentVersion) - // start dogstatsd - if pkgconfig.Datadog.GetBool("use_dogstatsd") { - serverStarted := server.IsRunning() - if serverStarted { - log.Debugf("dogstatsd started") - } else { - log.Errorf("Could not start dogstatsd: %s", err) - } - } - // load and run all configs in AD common.AC.LoadAndRun(ctx) diff --git a/comp/dogstatsd/server/server.go b/comp/dogstatsd/server/server.go index e1c7cc56373cbc..651c7cd2c4efce 100644 --- a/comp/dogstatsd/server/server.go +++ b/comp/dogstatsd/server/server.go @@ -309,137 +309,139 @@ func newServerCompat(cfg config.Reader, log logComponent.Component, capture repl } func (s *server) start(context.Context) error { + if config.Datadog.GetBool("use_dogstatsd") { + packetsChannel := make(chan packets.Packets, s.config.GetInt("dogstatsd_queue_size")) + tmpListeners := make([]listeners.StatsdListener, 0, 2) + err := s.tCapture.Configure() - packetsChannel := make(chan packets.Packets, s.config.GetInt("dogstatsd_queue_size")) - tmpListeners := make([]listeners.StatsdListener, 0, 2) - err := s.tCapture.Configure() - - if err != nil { - return err - } + if err != nil { + s.log.Debugf("Could not start dogstatsd: %s", err) + } - // sharedPacketPool is used by the packet assembler to retrieve already allocated - // buffer in order to avoid allocation. The packets are pushed back by the server. - sharedPacketPool := packets.NewPool(s.config.GetInt("dogstatsd_buffer_size")) - sharedPacketPoolManager := packets.NewPoolManager(sharedPacketPool) + // sharedPacketPool is used by the packet assembler to retrieve already allocated + // buffer in order to avoid allocation. The packets are pushed back by the server. + sharedPacketPool := packets.NewPool(s.config.GetInt("dogstatsd_buffer_size")) + sharedPacketPoolManager := packets.NewPoolManager(sharedPacketPool) - udsListenerRunning := false + udsListenerRunning := false - socketPath := s.config.GetString("dogstatsd_socket") - socketStreamPath := s.config.GetString("dogstatsd_stream_socket") - originDetection := s.config.GetBool("dogstatsd_origin_detection") - var sharedUDSOobPoolManager *packets.PoolManager - if originDetection { - sharedUDSOobPoolManager = listeners.NewUDSOobPoolManager() - } - - if s.tCapture != nil { - err := s.tCapture.RegisterSharedPoolManager(sharedPacketPoolManager) - if err != nil { - s.log.Errorf("Can't register shared pool manager: %s", err.Error()) - } - err = s.tCapture.RegisterOOBPoolManager(sharedUDSOobPoolManager) - if err != nil { - s.log.Errorf("Can't register OOB pool manager: %s", err.Error()) + socketPath := s.config.GetString("dogstatsd_socket") + socketStreamPath := s.config.GetString("dogstatsd_stream_socket") + originDetection := s.config.GetBool("dogstatsd_origin_detection") + var sharedUDSOobPoolManager *packets.PoolManager + if originDetection { + sharedUDSOobPoolManager = listeners.NewUDSOobPoolManager() } - } - if len(socketPath) > 0 { - unixListener, err := listeners.NewUDSDatagramListener(packetsChannel, sharedPacketPoolManager, sharedUDSOobPoolManager, s.config, s.tCapture) - if err != nil { - s.log.Errorf("Can't init listener: %s", err.Error()) - } else { - tmpListeners = append(tmpListeners, unixListener) - udsListenerRunning = true + if s.tCapture != nil { + err := s.tCapture.RegisterSharedPoolManager(sharedPacketPoolManager) + if err != nil { + s.log.Errorf("Can't register shared pool manager: %s", err.Error()) + } + err = s.tCapture.RegisterOOBPoolManager(sharedUDSOobPoolManager) + if err != nil { + s.log.Errorf("Can't register OOB pool manager: %s", err.Error()) + } } - } - if len(socketStreamPath) > 0 { - s.log.Warnf("dogstatsd_stream_socket is not yet supported, run it at your own risk") - unixListener, err := listeners.NewUDSStreamListener(packetsChannel, sharedPacketPoolManager, sharedUDSOobPoolManager, s.config, s.tCapture) - if err != nil { - s.log.Errorf("Can't init listener: %s", err.Error()) - } else { - tmpListeners = append(tmpListeners, unixListener) + if len(socketPath) > 0 { + unixListener, err := listeners.NewUDSDatagramListener(packetsChannel, sharedPacketPoolManager, sharedUDSOobPoolManager, s.config, s.tCapture) + if err != nil { + s.log.Errorf("Can't init listener: %s", err.Error()) + } else { + tmpListeners = append(tmpListeners, unixListener) + udsListenerRunning = true + } } - } - if s.config.GetString("dogstatsd_port") == listeners.RandomPortName || s.config.GetInt("dogstatsd_port") > 0 { - udpListener, err := listeners.NewUDPListener(packetsChannel, sharedPacketPoolManager, s.config, s.tCapture) - if err != nil { - s.log.Errorf(err.Error()) - } else { - tmpListeners = append(tmpListeners, udpListener) - s.udpLocalAddr = udpListener.LocalAddr() + if len(socketStreamPath) > 0 { + s.log.Warnf("dogstatsd_stream_socket is not yet supported, run it at your own risk") + unixListener, err := listeners.NewUDSStreamListener(packetsChannel, sharedPacketPoolManager, sharedUDSOobPoolManager, s.config, s.tCapture) + if err != nil { + s.log.Errorf("Can't init listener: %s", err.Error()) + } else { + tmpListeners = append(tmpListeners, unixListener) + } } - } - pipeName := s.config.GetString("dogstatsd_pipe_name") - if len(pipeName) > 0 { - namedPipeListener, err := listeners.NewNamedPipeListener(pipeName, packetsChannel, sharedPacketPoolManager, s.config, s.tCapture) - if err != nil { - s.log.Errorf("named pipe error: %v", err.Error()) - } else { - tmpListeners = append(tmpListeners, namedPipeListener) + if s.config.GetString("dogstatsd_port") == listeners.RandomPortName || s.config.GetInt("dogstatsd_port") > 0 { + udpListener, err := listeners.NewUDPListener(packetsChannel, sharedPacketPoolManager, s.config, s.tCapture) + if err != nil { + s.log.Errorf(err.Error()) + } else { + tmpListeners = append(tmpListeners, udpListener) + s.udpLocalAddr = udpListener.LocalAddr() + } } - } - - if len(tmpListeners) == 0 { - return fmt.Errorf("listening on neither udp nor socket, please check your configuration") - } - s.udsListenerRunning = udsListenerRunning - s.packetsIn = packetsChannel - s.captureChan = packetsChannel - s.sharedPacketPool = sharedPacketPool - s.sharedPacketPoolManager = sharedPacketPoolManager - s.listeners = tmpListeners + pipeName := s.config.GetString("dogstatsd_pipe_name") + if len(pipeName) > 0 { + namedPipeListener, err := listeners.NewNamedPipeListener(pipeName, packetsChannel, sharedPacketPoolManager, s.config, s.tCapture) + if err != nil { + s.log.Errorf("named pipe error: %v", err.Error()) + } else { + tmpListeners = append(tmpListeners, namedPipeListener) + } + } - // packets forwarding - // ---------------------- + if len(tmpListeners) == 0 { + return fmt.Errorf("listening on neither udp nor socket, please check your configuration") + } - forwardHost := s.config.GetString("statsd_forward_host") - forwardPort := s.config.GetInt("statsd_forward_port") - if forwardHost != "" && forwardPort != 0 { - forwardAddress := fmt.Sprintf("%s:%d", forwardHost, forwardPort) - con, err := net.Dial("udp", forwardAddress) - if err != nil { - s.log.Warnf("Could not connect to statsd forward host : %s", err) - } else { - s.packetsIn = make(chan packets.Packets, s.config.GetInt("dogstatsd_queue_size")) - go s.forwarder(con) + s.udsListenerRunning = udsListenerRunning + s.packetsIn = packetsChannel + s.captureChan = packetsChannel + s.sharedPacketPool = sharedPacketPool + s.sharedPacketPoolManager = sharedPacketPoolManager + s.listeners = tmpListeners + + // packets forwarding + // ---------------------- + + forwardHost := s.config.GetString("statsd_forward_host") + forwardPort := s.config.GetInt("statsd_forward_port") + if forwardHost != "" && forwardPort != 0 { + forwardAddress := fmt.Sprintf("%s:%d", forwardHost, forwardPort) + con, err := net.Dial("udp", forwardAddress) + if err != nil { + s.log.Warnf("Could not connect to statsd forward host : %s", err) + } else { + s.packetsIn = make(chan packets.Packets, s.config.GetInt("dogstatsd_queue_size")) + go s.forwarder(con) + } } - } - // start the workers processing the packets read on the socket - // ---------------------- + // start the workers processing the packets read on the socket + // ---------------------- - s.health = health.RegisterLiveness("dogstatsd-main") - s.handleMessages() - s.Started = true + s.health = health.RegisterLiveness("dogstatsd-main") + s.handleMessages() + s.Started = true + s.log.Debug("Dogstatsd started") - // start the debug loop - // ---------------------- + // start the debug loop + // ---------------------- - if s.config.GetBool("dogstatsd_metrics_stats_enable") { - s.log.Info("Dogstatsd: metrics statistics will be stored.") - s.Debug.SetMetricStatsEnabled(true) - } + if s.config.GetBool("dogstatsd_metrics_stats_enable") { + s.log.Info("Dogstatsd: metrics statistics will be stored.") + s.Debug.SetMetricStatsEnabled(true) + } - // map some metric name - // ---------------------- + // map some metric name + // ---------------------- - cacheSize := s.config.GetInt("dogstatsd_mapper_cache_size") + cacheSize := s.config.GetInt("dogstatsd_mapper_cache_size") - mappings, err := config.GetDogstatsdMappingProfiles() - if err != nil { - s.log.Warnf("Could not parse mapping profiles: %v", err) - } else if len(mappings) != 0 { - mapperInstance, err := mapper.NewMetricMapper(mappings, cacheSize) + mappings, err := config.GetDogstatsdMappingProfiles() if err != nil { - s.log.Warnf("Could not create metric mapper: %v", err) - } else { - s.mapper = mapperInstance + s.log.Warnf("Could not parse mapping profiles: %v", err) + } else if len(mappings) != 0 { + mapperInstance, err := mapper.NewMetricMapper(mappings, cacheSize) + if err != nil { + s.log.Warnf("Could not create metric mapper: %v", err) + } else { + s.mapper = mapperInstance + } } } return nil From c304636291e9981914f873736ae6b481ae4d9cc3 Mon Sep 17 00:00:00 2001 From: yoon nguyen Date: Wed, 21 Feb 2024 17:03:26 -0500 Subject: [PATCH 15/16] add a starthook --- cmd/dogstatsd/subcommands/start/command.go | 4 - comp/dogstatsd/server/server.go | 231 +++++++++++---------- 2 files changed, 122 insertions(+), 113 deletions(-) diff --git a/cmd/dogstatsd/subcommands/start/command.go b/cmd/dogstatsd/subcommands/start/command.go index 08a4729822e06f..5a1f417301fc8f 100644 --- a/cmd/dogstatsd/subcommands/start/command.go +++ b/cmd/dogstatsd/subcommands/start/command.go @@ -274,10 +274,6 @@ func RunDogstatsd(ctx context.Context, cliParams *CLIParams, config config.Compo demultiplexer.AddAgentStartupTelemetry(version.AgentVersion) - if err != nil { - log.Criticalf("Unable to start dogstatsd: %s", err) - return - } return } diff --git a/comp/dogstatsd/server/server.go b/comp/dogstatsd/server/server.go index 651c7cd2c4efce..c49810cef6f285 100644 --- a/comp/dogstatsd/server/server.go +++ b/comp/dogstatsd/server/server.go @@ -194,7 +194,7 @@ func newServer(deps dependencies) Component { s := newServerCompat(deps.Config, deps.Log, deps.Replay, deps.Debug, deps.Params.Serverless, deps.Demultiplexer) deps.Lc.Append(fx.Hook{ - OnStart: s.start, + OnStart: s.startHook, OnStop: s.stop, }) @@ -308,140 +308,153 @@ func newServerCompat(cfg config.Reader, log logComponent.Component, capture repl return s } +func (s *server) startHook(context context.Context) error { + if !config.Datadog.GetBool("use_dogstatsd") { + s.log.Info("Dogstatsd is not enabled") + return nil + } + + err := s.start(context) + if err != nil { + s.log.Errorf("Could not start dogstatsd: %s", err) + } else { + s.log.Debug("dogstatsd started") + } + return nil +} + func (s *server) start(context.Context) error { - if config.Datadog.GetBool("use_dogstatsd") { - packetsChannel := make(chan packets.Packets, s.config.GetInt("dogstatsd_queue_size")) - tmpListeners := make([]listeners.StatsdListener, 0, 2) - err := s.tCapture.Configure() - if err != nil { - s.log.Debugf("Could not start dogstatsd: %s", err) - } + packetsChannel := make(chan packets.Packets, s.config.GetInt("dogstatsd_queue_size")) + tmpListeners := make([]listeners.StatsdListener, 0, 2) + err := s.tCapture.Configure() - // sharedPacketPool is used by the packet assembler to retrieve already allocated - // buffer in order to avoid allocation. The packets are pushed back by the server. - sharedPacketPool := packets.NewPool(s.config.GetInt("dogstatsd_buffer_size")) - sharedPacketPoolManager := packets.NewPoolManager(sharedPacketPool) + if err != nil { + return err + } - udsListenerRunning := false + // sharedPacketPool is used by the packet assembler to retrieve already allocated + // buffer in order to avoid allocation. The packets are pushed back by the server. + sharedPacketPool := packets.NewPool(s.config.GetInt("dogstatsd_buffer_size")) + sharedPacketPoolManager := packets.NewPoolManager(sharedPacketPool) - socketPath := s.config.GetString("dogstatsd_socket") - socketStreamPath := s.config.GetString("dogstatsd_stream_socket") - originDetection := s.config.GetBool("dogstatsd_origin_detection") - var sharedUDSOobPoolManager *packets.PoolManager - if originDetection { - sharedUDSOobPoolManager = listeners.NewUDSOobPoolManager() - } + udsListenerRunning := false - if s.tCapture != nil { - err := s.tCapture.RegisterSharedPoolManager(sharedPacketPoolManager) - if err != nil { - s.log.Errorf("Can't register shared pool manager: %s", err.Error()) - } - err = s.tCapture.RegisterOOBPoolManager(sharedUDSOobPoolManager) - if err != nil { - s.log.Errorf("Can't register OOB pool manager: %s", err.Error()) - } - } + socketPath := s.config.GetString("dogstatsd_socket") + socketStreamPath := s.config.GetString("dogstatsd_stream_socket") + originDetection := s.config.GetBool("dogstatsd_origin_detection") + var sharedUDSOobPoolManager *packets.PoolManager + if originDetection { + sharedUDSOobPoolManager = listeners.NewUDSOobPoolManager() + } - if len(socketPath) > 0 { - unixListener, err := listeners.NewUDSDatagramListener(packetsChannel, sharedPacketPoolManager, sharedUDSOobPoolManager, s.config, s.tCapture) - if err != nil { - s.log.Errorf("Can't init listener: %s", err.Error()) - } else { - tmpListeners = append(tmpListeners, unixListener) - udsListenerRunning = true - } + if s.tCapture != nil { + err := s.tCapture.RegisterSharedPoolManager(sharedPacketPoolManager) + if err != nil { + s.log.Errorf("Can't register shared pool manager: %s", err.Error()) } - - if len(socketStreamPath) > 0 { - s.log.Warnf("dogstatsd_stream_socket is not yet supported, run it at your own risk") - unixListener, err := listeners.NewUDSStreamListener(packetsChannel, sharedPacketPoolManager, sharedUDSOobPoolManager, s.config, s.tCapture) - if err != nil { - s.log.Errorf("Can't init listener: %s", err.Error()) - } else { - tmpListeners = append(tmpListeners, unixListener) - } + err = s.tCapture.RegisterOOBPoolManager(sharedUDSOobPoolManager) + if err != nil { + s.log.Errorf("Can't register OOB pool manager: %s", err.Error()) } + } - if s.config.GetString("dogstatsd_port") == listeners.RandomPortName || s.config.GetInt("dogstatsd_port") > 0 { - udpListener, err := listeners.NewUDPListener(packetsChannel, sharedPacketPoolManager, s.config, s.tCapture) - if err != nil { - s.log.Errorf(err.Error()) - } else { - tmpListeners = append(tmpListeners, udpListener) - s.udpLocalAddr = udpListener.LocalAddr() - } + if len(socketPath) > 0 { + unixListener, err := listeners.NewUDSDatagramListener(packetsChannel, sharedPacketPoolManager, sharedUDSOobPoolManager, s.config, s.tCapture) + if err != nil { + s.log.Errorf("Can't init listener: %s", err.Error()) + } else { + tmpListeners = append(tmpListeners, unixListener) + udsListenerRunning = true } + } - pipeName := s.config.GetString("dogstatsd_pipe_name") - if len(pipeName) > 0 { - namedPipeListener, err := listeners.NewNamedPipeListener(pipeName, packetsChannel, sharedPacketPoolManager, s.config, s.tCapture) - if err != nil { - s.log.Errorf("named pipe error: %v", err.Error()) - } else { - tmpListeners = append(tmpListeners, namedPipeListener) - } + if len(socketStreamPath) > 0 { + s.log.Warnf("dogstatsd_stream_socket is not yet supported, run it at your own risk") + unixListener, err := listeners.NewUDSStreamListener(packetsChannel, sharedPacketPoolManager, sharedUDSOobPoolManager, s.config, s.tCapture) + if err != nil { + s.log.Errorf("Can't init listener: %s", err.Error()) + } else { + tmpListeners = append(tmpListeners, unixListener) } + } - if len(tmpListeners) == 0 { - return fmt.Errorf("listening on neither udp nor socket, please check your configuration") + if s.config.GetString("dogstatsd_port") == listeners.RandomPortName || s.config.GetInt("dogstatsd_port") > 0 { + udpListener, err := listeners.NewUDPListener(packetsChannel, sharedPacketPoolManager, s.config, s.tCapture) + if err != nil { + s.log.Errorf(err.Error()) + } else { + tmpListeners = append(tmpListeners, udpListener) + s.udpLocalAddr = udpListener.LocalAddr() } + } - s.udsListenerRunning = udsListenerRunning - s.packetsIn = packetsChannel - s.captureChan = packetsChannel - s.sharedPacketPool = sharedPacketPool - s.sharedPacketPoolManager = sharedPacketPoolManager - s.listeners = tmpListeners - - // packets forwarding - // ---------------------- - - forwardHost := s.config.GetString("statsd_forward_host") - forwardPort := s.config.GetInt("statsd_forward_port") - if forwardHost != "" && forwardPort != 0 { - forwardAddress := fmt.Sprintf("%s:%d", forwardHost, forwardPort) - con, err := net.Dial("udp", forwardAddress) - if err != nil { - s.log.Warnf("Could not connect to statsd forward host : %s", err) - } else { - s.packetsIn = make(chan packets.Packets, s.config.GetInt("dogstatsd_queue_size")) - go s.forwarder(con) - } + pipeName := s.config.GetString("dogstatsd_pipe_name") + if len(pipeName) > 0 { + namedPipeListener, err := listeners.NewNamedPipeListener(pipeName, packetsChannel, sharedPacketPoolManager, s.config, s.tCapture) + if err != nil { + s.log.Errorf("named pipe error: %v", err.Error()) + } else { + tmpListeners = append(tmpListeners, namedPipeListener) } + } - // start the workers processing the packets read on the socket - // ---------------------- + if len(tmpListeners) == 0 { + return fmt.Errorf("listening on neither udp nor socket, please check your configuration") + } - s.health = health.RegisterLiveness("dogstatsd-main") - s.handleMessages() - s.Started = true - s.log.Debug("Dogstatsd started") + s.udsListenerRunning = udsListenerRunning + s.packetsIn = packetsChannel + s.captureChan = packetsChannel + s.sharedPacketPool = sharedPacketPool + s.sharedPacketPoolManager = sharedPacketPoolManager + s.listeners = tmpListeners - // start the debug loop - // ---------------------- + // packets forwarding + // ---------------------- - if s.config.GetBool("dogstatsd_metrics_stats_enable") { - s.log.Info("Dogstatsd: metrics statistics will be stored.") - s.Debug.SetMetricStatsEnabled(true) + forwardHost := s.config.GetString("statsd_forward_host") + forwardPort := s.config.GetInt("statsd_forward_port") + if forwardHost != "" && forwardPort != 0 { + forwardAddress := fmt.Sprintf("%s:%d", forwardHost, forwardPort) + con, err := net.Dial("udp", forwardAddress) + if err != nil { + s.log.Warnf("Could not connect to statsd forward host : %s", err) + } else { + s.packetsIn = make(chan packets.Packets, s.config.GetInt("dogstatsd_queue_size")) + go s.forwarder(con) } + } + + // start the workers processing the packets read on the socket + // ---------------------- - // map some metric name - // ---------------------- + s.health = health.RegisterLiveness("dogstatsd-main") + s.handleMessages() + s.Started = true - cacheSize := s.config.GetInt("dogstatsd_mapper_cache_size") + // start the debug loop + // ---------------------- - mappings, err := config.GetDogstatsdMappingProfiles() + if s.config.GetBool("dogstatsd_metrics_stats_enable") { + s.log.Info("Dogstatsd: metrics statistics will be stored.") + s.Debug.SetMetricStatsEnabled(true) + } + + // map some metric name + // ---------------------- + + cacheSize := s.config.GetInt("dogstatsd_mapper_cache_size") + + mappings, err := config.GetDogstatsdMappingProfiles() + if err != nil { + s.log.Warnf("Could not parse mapping profiles: %v", err) + } else if len(mappings) != 0 { + mapperInstance, err := mapper.NewMetricMapper(mappings, cacheSize) if err != nil { - s.log.Warnf("Could not parse mapping profiles: %v", err) - } else if len(mappings) != 0 { - mapperInstance, err := mapper.NewMetricMapper(mappings, cacheSize) - if err != nil { - s.log.Warnf("Could not create metric mapper: %v", err) - } else { - s.mapper = mapperInstance - } + s.log.Warnf("Could not create metric mapper: %v", err) + } else { + s.mapper = mapperInstance } } return nil From 305a864c85c3e3202bb8258874d6f4b23efcca5c Mon Sep 17 00:00:00 2001 From: yoon nguyen Date: Wed, 28 Feb 2024 11:17:46 -0500 Subject: [PATCH 16/16] condition on fx hook --- comp/dogstatsd/server/server.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/comp/dogstatsd/server/server.go b/comp/dogstatsd/server/server.go index c49810cef6f285..3a9744f62e0b2b 100644 --- a/comp/dogstatsd/server/server.go +++ b/comp/dogstatsd/server/server.go @@ -193,10 +193,12 @@ func initTelemetry(cfg config.Reader, logger logComponent.Component) { func newServer(deps dependencies) Component { s := newServerCompat(deps.Config, deps.Log, deps.Replay, deps.Debug, deps.Params.Serverless, deps.Demultiplexer) - deps.Lc.Append(fx.Hook{ - OnStart: s.startHook, - OnStop: s.stop, - }) + if config.Datadog.GetBool("use_dogstatsd") { + deps.Lc.Append(fx.Hook{ + OnStart: s.startHook, + OnStop: s.stop, + }) + } return s @@ -309,10 +311,6 @@ func newServerCompat(cfg config.Reader, log logComponent.Component, capture repl } func (s *server) startHook(context context.Context) error { - if !config.Datadog.GetBool("use_dogstatsd") { - s.log.Info("Dogstatsd is not enabled") - return nil - } err := s.start(context) if err != nil {