Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removing start method in favor of on start hook in dogstatsd server component #22584

Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
ef48776
dogstatsd on agent
DDuongNguyen Jan 31, 2024
3a9d7e3
fixing test
DDuongNguyen Feb 5, 2024
66d19f4
serverless
DDuongNguyen Feb 7, 2024
6cce5ef
unfinished test + metric agent
DDuongNguyen Feb 7, 2024
317e15b
fixed test
DDuongNguyen Feb 8, 2024
7af9b59
lint
DDuongNguyen Feb 8, 2024
23e7aec
merged main
DDuongNguyen Feb 9, 2024
7291ab7
fixed test + agent start
DDuongNguyen Feb 12, 2024
438db33
lint
DDuongNguyen Feb 13, 2024
5535958
Merge branch 'main' into AMLII-1408-remove-start-method-in-favor-of-o…
DDuongNguyen Feb 13, 2024
fa7c83e
Merge branch 'main' into AMLII-1408-remove-start-method-in-favor-of-o…
DDuongNguyen Feb 13, 2024
854071b
Merge branch 'main' into AMLII-1408-remove-start-method-in-favor-of-o…
DDuongNguyen Feb 13, 2024
4f2c355
adjusted to comments
DDuongNguyen Feb 13, 2024
0aae35f
Merge branch 'AMLII-1408-remove-start-method-in-favor-of-on-start-hoo…
DDuongNguyen Feb 13, 2024
15f5d53
lint
DDuongNguyen Feb 13, 2024
9b9dec7
lint
DDuongNguyen Feb 14, 2024
2538b87
fix test
DDuongNguyen Feb 14, 2024
ba45285
Merge branch 'main' into AMLII-1408-remove-start-method-in-favor-of-o…
DDuongNguyen Feb 14, 2024
be9fd32
Merge branch 'main' into AMLII-1408-remove-start-method-in-favor-of-o…
gh123man Feb 15, 2024
fae914f
update loggin for dogstatsd server when started
DDuongNguyen Feb 21, 2024
dc8cf0c
Merge branch 'AMLII-1408-remove-start-method-in-favor-of-on-start-hoo…
DDuongNguyen Feb 21, 2024
fb6f6e3
move dogstatsd start logging
DDuongNguyen Feb 21, 2024
c304636
add a starthook
DDuongNguyen Feb 21, 2024
305a864
condition on fx hook
DDuongNguyen Feb 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions cmd/agent/subcommands/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func getSharedFxOption() fx.Option {
dogstatsdStatusimpl.Module(),
statusimpl.Module(),
apiimpl.Module(),

demultiplexerimpl.Module(),
dogstatsd.Bundle(),
otelcol.Bundle(),
rcclient.Module(),
Expand Down Expand Up @@ -362,7 +362,6 @@ func getSharedFxOption() fx.Option {
params.EnableNoAggregationPipeline = config.GetBool("dogstatsd_no_aggregation_pipeline")
return params
}),
demultiplexerimpl.Module(),
orchestratorForwarderImpl.Module(),
fx.Supply(orchestratorForwarderImpl.NewDefaultParams()),
eventplatformimpl.Module(),
Expand Down Expand Up @@ -428,7 +427,7 @@ func startAgent(
)

if jmxLoggerSetupErr != nil {
return fmt.Errorf("Error while setting up logging, exiting: %v", jmxLoggerSetupErr)
return fmt.Errorf(" Error while setting up logging, exiting: %v", jmxLoggerSetupErr)
DDuongNguyen marked this conversation as resolved.
Show resolved Hide resolved
}

if flavor.GetFlavor() == flavor.IotAgent {
Expand Down Expand Up @@ -599,7 +598,7 @@ func startAgent(

// start dogstatsd
if pkgconfig.Datadog.GetBool("use_dogstatsd") {
err := server.Start(demultiplexer)
// err := server.Start(demultiplexer)
DDuongNguyen marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
log.Errorf("Could not start dogstatsd: %s", err)
} else {
Expand Down Expand Up @@ -640,7 +639,7 @@ func StopAgentWithDefaults(server dogstatsdServer.Component, agentAPI internalAP
}

// stopAgent Tears down the agent process
func stopAgent(cliParams *cliParams, server dogstatsdServer.Component, agentAPI internalAPI.Component) {
func stopAgent(cliParams *cliParams, _ dogstatsdServer.Component, agentAPI internalAPI.Component) {
DDuongNguyen marked this conversation as resolved.
Show resolved Hide resolved
// retrieve the agent health before stopping the components
// GetReadyNonBlocking has a 100ms timeout to avoid blocking
health, err := health.GetReadyNonBlocking()
Expand All @@ -655,7 +654,6 @@ func stopAgent(cliParams *cliParams, server dogstatsdServer.Component, agentAPI
pkglog.Errorf("Error shutting down expvar server: %v", err)
}
}
server.Stop()
if common.AC != nil {
common.AC.Stop()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/fx"

"github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer"
Expand Down Expand Up @@ -41,16 +40,11 @@ func TestDogstatsdMetricsStats(t *testing.T) {
fx.Supply(server.Params{
Serverless: false,
}),
demultiplexerimpl.MockModule(),
dogstatsd.Bundle(),
defaultforwarder.MockModule(),
demultiplexerimpl.MockModule(),
))

demux := deps.Demultiplexer
deps.Server.Start(demux)

require.Nil(t, err)

s := DsdStatsRuntimeSetting{
ServerDebug: deps.Debug,
}
Expand Down
3 changes: 0 additions & 3 deletions cmd/dogstatsd/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,6 @@ func RunDogstatsd(ctx context.Context, cliParams *CLIParams, config config.Compo

demultiplexer.AddAgentStartupTelemetry(version.AgentVersion)

err = components.DogstatsdServer.Start(demultiplexer)
if err != nil {
log.Criticalf("Unable to start dogstatsd: %s", err)
return
Expand Down Expand Up @@ -323,8 +322,6 @@ func StopAgent(cancel context.CancelFunc, components *DogstatsdComponents) {
}
}

components.DogstatsdServer.Stop()

pkglog.Info("See ya!")
pkglog.Flush()
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ import (
// Module defines the fx options for this component.
func Module() fxutil.Module {
return fxutil.Component(
fx.Provide(newDemultiplexer))
fx.Provide(newDemultiplexer),
fx.Provide(func(demux demultiplexerComp.Component) aggregator.Demultiplexer {
DDuongNguyen marked this conversation as resolved.
Show resolved Hide resolved
return demux
}))
}

type dependencies struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ import (
// FakeSamplerMockModule defines the fx options for FakeSamplerMock.
func FakeSamplerMockModule() fxutil.Module {
return fxutil.Component(
fx.Provide(newFakeSamplerMock))
fx.Provide(newFakeSamplerMock),
fx.Provide(func(demux demultiplexerComp.FakeSamplerMock) aggregator.Demultiplexer {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain why you need this as it is not common to provide non component type (except for Params type).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since I replaced how a server is started, a new dogstatsd server no longer require a demultiplexer when started anymore. This causes some changes in the test files such as this function. Or any function that used to require the dogstatsdserver to include a demultiplexer.

Doing this provide a mock implementation instead of an actual component for those tests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using aggregator.Demultiplexer can you use demultiplexer.Component?

return demux
}),
)
}

type fakeSamplerMockDependencies struct {
Expand Down
6 changes: 6 additions & 0 deletions comp/dogstatsd/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,24 @@
package dogstatsd //nolint:revive // TODO(AML) Fix revive linter

import (
demultiplexerComp "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer"
"github.com/DataDog/datadog-agent/comp/dogstatsd/replay"
"github.com/DataDog/datadog-agent/comp/dogstatsd/server"
"github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug/serverdebugimpl"
"github.com/DataDog/datadog-agent/comp/dogstatsd/statsd"
"github.com/DataDog/datadog-agent/pkg/aggregator"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
"go.uber.org/fx"
)

// team: agent-metrics-logs

// Bundle defines the fx options for this bundle.
func Bundle() fxutil.BundleOptions {
return fxutil.Bundle(
fx.Provide(func(demux demultiplexerComp.Component) aggregator.Demultiplexer {
DDuongNguyen marked this conversation as resolved.
Show resolved Hide resolved
return demux
}),
serverdebugimpl.Module(),
replay.Module(),
server.Module())
Expand Down
19 changes: 11 additions & 8 deletions comp/dogstatsd/server/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
package server

import (
"context"
"time"

"github.com/DataDog/datadog-agent/pkg/aggregator"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
"go.uber.org/fx"
)
Expand All @@ -18,13 +18,6 @@ import (

// Component is the component type.
type Component interface {

// Start starts the dogstatsd server
Start(demultiplexer aggregator.Demultiplexer) error

// Stop stops the dogstatsd server
Stop()

// IsRunning returns true if the server is running
IsRunning() bool

Expand All @@ -41,6 +34,12 @@ type Component interface {
UDPLocalAddr() string
}

// ServerlessDogstatsd is the interface for the serverless dogstatsd server.
type ServerlessDogstatsd interface {
Component
Stop()
}

// Mock implements mock-specific methods.
type Mock interface {
Component
Expand All @@ -57,3 +56,7 @@ func MockModule() fxutil.Module {
return fxutil.Component(
fx.Provide(newMock))
}

func (s *server) Stop() {
DDuongNguyen marked this conversation as resolved.
Show resolved Hide resolved
_ = s.stop(context.TODO())
}
41 changes: 32 additions & 9 deletions comp/dogstatsd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ var (
type dependencies struct {
fx.In

Lc fx.Lifecycle

Demultiplexer aggregator.Demultiplexer

Log logComponent.Component
Config configComponent.Component
Debug serverdebug.Component
Expand Down Expand Up @@ -190,16 +194,32 @@ func initTelemetry(cfg config.Reader, logger logComponent.Component) {
// TODO: (components) - remove once serverless is an FX app
//
//nolint:revive // TODO(AML) Fix revive linter
func NewServerlessServer() Component {
return newServerCompat(config.Datadog, logComponentImpl.NewTemporaryLoggerWithoutInit(), replay.NewServerlessTrafficCapture(), serverdebugimpl.NewServerlessServerDebug(), true)
func NewServerlessServer(demux aggregator.Demultiplexer) (ServerlessDogstatsd, error) {
DDuongNguyen marked this conversation as resolved.
Show resolved Hide resolved

s := newServerCompat(config.Datadog, logComponentImpl.NewTemporaryLoggerWithoutInit(), replay.NewServerlessTrafficCapture(), serverdebugimpl.NewServerlessServerDebug(), true, demux)

err := s.start(context.TODO())
if err != nil {
return nil, err
}

return s, nil
}

// TODO: (components) - merge with newServerCompat once NewServerlessServer is removed
func newServer(deps dependencies) Component {
return newServerCompat(deps.Config, deps.Log, deps.Replay, deps.Debug, deps.Params.Serverless)
s := newServerCompat(deps.Config, deps.Log, deps.Replay, deps.Debug, deps.Params.Serverless, deps.Demultiplexer)

deps.Lc.Append(fx.Hook{
OnStart: s.start,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here when start returns an error the application logs the error whereas with your changes the application stops. Is this behavior expected?

Copy link
Contributor Author

@DDuongNguyen DDuongNguyen Feb 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the logic a bit here now so that when the server is running/stopped, it will log similarly instead. Good Catch! 🙇🏼‍♂️

OnStop: s.stop,
})

return s

}

func newServerCompat(cfg config.Reader, log logComponent.Component, capture replay.Component, debug serverdebug.Component, serverless bool) Component {
func newServerCompat(cfg config.Reader, log logComponent.Component, capture replay.Component, debug serverdebug.Component, serverless bool, demux aggregator.Demultiplexer) *server {
// This needs to be done after the configuration is loaded
once.Do(func() { initTelemetry(cfg, log) })

Expand Down Expand Up @@ -272,7 +292,7 @@ func newServerCompat(cfg config.Reader, log logComponent.Component, capture repl
sharedPacketPool: nil,
sharedPacketPoolManager: nil,
sharedFloat64List: newFloat64ListPool(),
demultiplexer: nil,
demultiplexer: demux,
listeners: nil,
stopChan: make(chan bool),
serverlessFlushChan: make(chan bool),
Expand Down Expand Up @@ -301,13 +321,14 @@ func newServerCompat(cfg config.Reader, log logComponent.Component, capture repl
originOptOutEnabled: cfg.GetBool("dogstatsd_origin_optout_enabled"),
},
}

return s
}

func (s *server) Start(demultiplexer aggregator.Demultiplexer) error {
func (s *server) start(context.Context) error {

// TODO: (components) - DI this into Server when Demultiplexer is made into a component
DDuongNguyen marked this conversation as resolved.
Show resolved Hide resolved
s.demultiplexer = demultiplexer
// s.demultiplexer = demultiplexer

packetsChannel := make(chan packets.Packets, s.config.GetInt("dogstatsd_queue_size"))
tmpListeners := make([]listeners.StatsdListener, 0, 2)
Expand Down Expand Up @@ -444,9 +465,9 @@ func (s *server) Start(demultiplexer aggregator.Demultiplexer) error {
return nil
}

func (s *server) Stop() {
func (s *server) stop(context.Context) error {
if !s.IsRunning() {
return
return nil
}
close(s.stopChan)
for _, l := range s.listeners {
Expand All @@ -460,6 +481,8 @@ func (s *server) Stop() {
}
s.health.Deregister() //nolint:errcheck
s.Started = false

return nil
}

func (s *server) IsRunning() bool {
Expand Down
13 changes: 7 additions & 6 deletions comp/dogstatsd/server/server_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ func benchParsePackets(b *testing.B, rawPacket []byte) {
pkgconfig.SetupLogger("", "off", "", "", false, true, false)

demux := deps.Demultiplexer
_ = s.Start(demux)
defer s.Stop()
defer demux.Stop(false)
// _ = s.Start(demux)
DDuongNguyen marked this conversation as resolved.
Show resolved Hide resolved
// defer s.Stop()

done := make(chan struct{})
go func() {
Expand Down Expand Up @@ -85,8 +86,8 @@ func BenchmarkPbarseMetricMessage(b *testing.B) {
pkgconfig.SetupLogger("", "off", "", "", false, true, false)

demux := deps.Demultiplexer
_ = s.Start(demux)
defer s.Stop()
// _ = s.Start(demux)
// defer s.Stop()

done := make(chan struct{})
go func() {
Expand Down Expand Up @@ -138,8 +139,8 @@ func benchmarkMapperControl(b *testing.B, yaml string) {
pkgconfig.SetupLogger("", "off", "", "", false, true, false)

demux := deps.Demultiplexer
_ = s.Start(demux)
defer s.Stop()
// _ = s.Start(demux)
// defer s.Stop()

done := make(chan struct{})
go func() {
Expand Down
Loading
Loading