Skip to content

Commit

Permalink
[mq] working branch - merge be9fd32 on top of main at d3c07fc
Browse files Browse the repository at this point in the history
{"baseBranch":"main","baseCommit":"d3c07fce4a0dc7eea2a79b9dd168c73c718f4cba","createdAt":"2024-02-15T19:53:49.206988Z","headSha":"be9fd32f7c4590fd39e9810f86219eb1e22f6b52","id":"78b4842b-000b-41cc-ac80-8a207e24c70f","priority":"200","pullRequestNumber":"22584","queuedAt":"2024-02-15T19:53:49.206329Z","status":"STATUS_QUEUED"}
  • Loading branch information
dd-mergequeue[bot] authored Feb 15, 2024
2 parents 4a6f256 + be9fd32 commit 4ad76ca
Show file tree
Hide file tree
Showing 14 changed files with 125 additions and 123 deletions.
13 changes: 5 additions & 8 deletions cmd/agent/subcommands/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func run(log log.Component,
collector collector.Component,
) error {
defer func() {
stopAgent(cliParams, server, agentAPI)
stopAgent(cliParams, agentAPI)
}()

// prepare go runtime
Expand Down Expand Up @@ -330,7 +330,7 @@ func getSharedFxOption() fx.Option {
dogstatsdStatusimpl.Module(),
statusimpl.Module(),
apiimpl.Module(),

demultiplexerimpl.Module(),
dogstatsd.Bundle(),
otelcol.Bundle(),
rctelemetryreporterimpl.Module(),
Expand Down Expand Up @@ -365,7 +365,6 @@ func getSharedFxOption() fx.Option {
params.EnableNoAggregationPipeline = config.GetBool("dogstatsd_no_aggregation_pipeline")
return params
}),
demultiplexerimpl.Module(),
orchestratorForwarderImpl.Module(),
fx.Supply(orchestratorForwarderImpl.NewDefaultParams()),
eventplatformimpl.Module(),
Expand Down Expand Up @@ -595,7 +594,6 @@ func startAgent(

// start dogstatsd
if pkgconfig.Datadog.GetBool("use_dogstatsd") {
err := server.Start(demultiplexer)
if err != nil {
log.Errorf("Could not start dogstatsd: %s", err)
} else {
Expand Down Expand Up @@ -631,12 +629,12 @@ func startAgent(
}

// StopAgentWithDefaults is a temporary way for other packages to use stopAgent.
func StopAgentWithDefaults(server dogstatsdServer.Component, agentAPI internalAPI.Component) {
stopAgent(&cliParams{GlobalParams: &command.GlobalParams{}}, server, agentAPI)
func StopAgentWithDefaults(agentAPI internalAPI.Component) {
stopAgent(&cliParams{GlobalParams: &command.GlobalParams{}}, agentAPI)
}

// stopAgent Tears down the agent process
func stopAgent(cliParams *cliParams, server dogstatsdServer.Component, agentAPI internalAPI.Component) {
func stopAgent(cliParams *cliParams, agentAPI internalAPI.Component) {
// retrieve the agent health before stopping the components
// GetReadyNonBlocking has a 100ms timeout to avoid blocking
health, err := health.GetReadyNonBlocking()
Expand All @@ -651,7 +649,6 @@ func stopAgent(cliParams *cliParams, server dogstatsdServer.Component, agentAPI
pkglog.Errorf("Error shutting down expvar server: %v", err)
}
}
server.Stop()
if common.AC != nil {
common.AC.Stop()
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/subcommands/run/command_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func StartAgentWithDefaults(ctxChan <-chan context.Context) (<-chan error, error
collector collector.Component,
) error {

defer StopAgentWithDefaults(server, agentAPI)
defer StopAgentWithDefaults(agentAPI)

err := startAgent(
&cliParams{GlobalParams: &command.GlobalParams{}},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/fx"

"github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer"
Expand Down Expand Up @@ -41,16 +40,11 @@ func TestDogstatsdMetricsStats(t *testing.T) {
fx.Supply(server.Params{
Serverless: false,
}),
demultiplexerimpl.MockModule(),
dogstatsd.Bundle(),
defaultforwarder.MockModule(),
demultiplexerimpl.MockModule(),
))

demux := deps.Demultiplexer
deps.Server.Start(demux)

require.Nil(t, err)

s := DsdStatsRuntimeSetting{
ServerDebug: deps.Debug,
}
Expand Down
3 changes: 0 additions & 3 deletions cmd/dogstatsd/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,6 @@ func RunDogstatsd(ctx context.Context, cliParams *CLIParams, config config.Compo

demultiplexer.AddAgentStartupTelemetry(version.AgentVersion)

err = components.DogstatsdServer.Start(demultiplexer)
if err != nil {
log.Criticalf("Unable to start dogstatsd: %s", err)
return
Expand Down Expand Up @@ -323,8 +322,6 @@ func StopAgent(cancel context.CancelFunc, components *DogstatsdComponents) {
}
}

components.DogstatsdServer.Stop()

pkglog.Info("See ya!")
pkglog.Flush()
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type provides struct {
DiagnosticSenderManager diagnosesendermanager.Component
SenderManager sender.SenderManager
StatusProvider status.InformationProvider
AggregatorDemultiplexer aggregator.Demultiplexer
}

func newDemultiplexer(deps dependencies) (provides, error) {
Expand Down Expand Up @@ -94,6 +95,7 @@ func newDemultiplexer(deps dependencies) (provides, error) {
StatusProvider: status.NewInformationProvider(demultiplexerStatus{
Log: deps.Log,
}),
AggregatorDemultiplexer: demultiplexer,
}, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ import (
// FakeSamplerMockModule defines the fx options for FakeSamplerMock.
func FakeSamplerMockModule() fxutil.Module {
return fxutil.Component(
fx.Provide(newFakeSamplerMock))
fx.Provide(newFakeSamplerMock),
fx.Provide(func(demux demultiplexerComp.FakeSamplerMock) aggregator.Demultiplexer {
return demux
}),
)
}

type fakeSamplerMockDependencies struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ import (
// MockModule defines the fx options for this component.
func MockModule() fxutil.Module {
return fxutil.Component(
fx.Provide(newMock))
fx.Provide(newMock),
fx.Provide(func(demux demultiplexerComp.Component) aggregator.Demultiplexer {
return demux
}),
)
}

type mock struct {
Expand Down
8 changes: 0 additions & 8 deletions comp/dogstatsd/server/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package server
import (
"time"

"github.com/DataDog/datadog-agent/pkg/aggregator"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
"go.uber.org/fx"
)
Expand All @@ -18,13 +17,6 @@ import (

// Component is the component type.
type Component interface {

// Start starts the dogstatsd server
Start(demultiplexer aggregator.Demultiplexer) error

// Stop stops the dogstatsd server
Stop()

// IsRunning returns true if the server is running
IsRunning() bool

Expand Down
39 changes: 21 additions & 18 deletions comp/dogstatsd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@ import (

configComponent "github.com/DataDog/datadog-agent/comp/core/config"
logComponent "github.com/DataDog/datadog-agent/comp/core/log"
logComponentImpl "github.com/DataDog/datadog-agent/comp/core/log/logimpl"
"github.com/DataDog/datadog-agent/comp/dogstatsd/listeners"
"github.com/DataDog/datadog-agent/comp/dogstatsd/mapper"
"github.com/DataDog/datadog-agent/comp/dogstatsd/packets"
"github.com/DataDog/datadog-agent/comp/dogstatsd/replay"
serverdebug "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug"
"github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug/serverdebugimpl"
"github.com/DataDog/datadog-agent/pkg/aggregator"
"github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/metrics"
Expand Down Expand Up @@ -67,6 +65,10 @@ var (
type dependencies struct {
fx.In

Lc fx.Lifecycle

Demultiplexer aggregator.Demultiplexer

Log logComponent.Component
Config configComponent.Component
Debug serverdebug.Component
Expand Down Expand Up @@ -187,19 +189,20 @@ func initTelemetry(cfg config.Reader, logger logComponent.Component) {
packets.InitTelemetry(get("telemetry.dogstatsd.listeners_channel_latency_buckets"))
}

// TODO: (components) - remove once serverless is an FX app
//
//nolint:revive // TODO(AML) Fix revive linter
func NewServerlessServer() Component {
return newServerCompat(config.Datadog, logComponentImpl.NewTemporaryLoggerWithoutInit(), replay.NewServerlessTrafficCapture(), serverdebugimpl.NewServerlessServerDebug(), true)
}

// TODO: (components) - merge with newServerCompat once NewServerlessServer is removed
func newServer(deps dependencies) Component {
return newServerCompat(deps.Config, deps.Log, deps.Replay, deps.Debug, deps.Params.Serverless)
s := newServerCompat(deps.Config, deps.Log, deps.Replay, deps.Debug, deps.Params.Serverless, deps.Demultiplexer)

deps.Lc.Append(fx.Hook{
OnStart: s.start,
OnStop: s.stop,
})

return s

}

func newServerCompat(cfg config.Reader, log logComponent.Component, capture replay.Component, debug serverdebug.Component, serverless bool) Component {
func newServerCompat(cfg config.Reader, log logComponent.Component, capture replay.Component, debug serverdebug.Component, serverless bool, demux aggregator.Demultiplexer) *server {
// This needs to be done after the configuration is loaded
once.Do(func() { initTelemetry(cfg, log) })

Expand Down Expand Up @@ -272,7 +275,7 @@ func newServerCompat(cfg config.Reader, log logComponent.Component, capture repl
sharedPacketPool: nil,
sharedPacketPoolManager: nil,
sharedFloat64List: newFloat64ListPool(),
demultiplexer: nil,
demultiplexer: demux,
listeners: nil,
stopChan: make(chan bool),
serverlessFlushChan: make(chan bool),
Expand Down Expand Up @@ -301,13 +304,11 @@ func newServerCompat(cfg config.Reader, log logComponent.Component, capture repl
originOptOutEnabled: cfg.GetBool("dogstatsd_origin_optout_enabled"),
},
}

return s
}

func (s *server) Start(demultiplexer aggregator.Demultiplexer) error {

// TODO: (components) - DI this into Server when Demultiplexer is made into a component
s.demultiplexer = demultiplexer
func (s *server) start(context.Context) error {

packetsChannel := make(chan packets.Packets, s.config.GetInt("dogstatsd_queue_size"))
tmpListeners := make([]listeners.StatsdListener, 0, 2)
Expand Down Expand Up @@ -444,9 +445,9 @@ func (s *server) Start(demultiplexer aggregator.Demultiplexer) error {
return nil
}

func (s *server) Stop() {
func (s *server) stop(context.Context) error {
if !s.IsRunning() {
return
return nil
}
close(s.stopChan)
for _, l := range s.listeners {
Expand All @@ -460,6 +461,8 @@ func (s *server) Stop() {
}
s.health.Deregister() //nolint:errcheck
s.Started = false

return nil
}

func (s *server) IsRunning() bool {
Expand Down
7 changes: 1 addition & 6 deletions comp/dogstatsd/server/server_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ func benchParsePackets(b *testing.B, rawPacket []byte) {
pkgconfig.SetupLogger("", "off", "", "", false, true, false)

demux := deps.Demultiplexer
_ = s.Start(demux)
defer s.Stop()
defer demux.Stop(false)

done := make(chan struct{})
go func() {
Expand Down Expand Up @@ -85,8 +84,6 @@ func BenchmarkPbarseMetricMessage(b *testing.B) {
pkgconfig.SetupLogger("", "off", "", "", false, true, false)

demux := deps.Demultiplexer
_ = s.Start(demux)
defer s.Stop()

done := make(chan struct{})
go func() {
Expand Down Expand Up @@ -138,8 +135,6 @@ func benchmarkMapperControl(b *testing.B, yaml string) {
pkgconfig.SetupLogger("", "off", "", "", false, true, false)

demux := deps.Demultiplexer
_ = s.Start(demux)
defer s.Stop()

done := make(chan struct{})
go func() {
Expand Down
Loading

0 comments on commit 4ad76ca

Please sign in to comment.