Skip to content

Commit

Permalink
[ASCII-931][component] Create orchestrator forwarder component (#21058)
Browse files Browse the repository at this point in the history
* Move NewOrchestratorForwarder into a new component

* Reduce the number of calls to `InitAndStartAgentDemultiplexerForTest`
Goal: Make next commit easier
Note: DemultiplexerMock cannot be used in pkg/aggregator because comp/aggregator depends on  pkg/aggregator

* Use orchestrator forwarder component.

* Make orchestrator component optional

* Fix TestDemuxForwardersCreated

* Update CODEOWNERS (aggregator is owned by agent-metrics-logs)

* Fix linters

* Move the component to comp/forwarder/orchestrator

* Fix lint-components

* Fix Windows compilation issue

* Disable Orchestrator Forwarder for open telemetry

* Update .github/CODEOWNERS

Co-authored-by: Pierre Margueritte <[email protected]>

* Remove empty lines

* Improve comment for Component.Reset

* Use constructor for Params

* Fix windows linter

* Improve comments

* Fix code owner

---------

Co-authored-by: Pierre Margueritte <[email protected]>
  • Loading branch information
ogaca-dd and mfpierre authored Dec 1, 2023
1 parent 307265e commit 04eeda4
Show file tree
Hide file tree
Showing 34 changed files with 398 additions and 203 deletions.
2 changes: 2 additions & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@
/cmd/cluster-agent/api/v1/cloudfoundry_metadata.go @DataDog/platform-integrations
/cmd/cws-instrumentation/ @DataDog/agent-security
/cmd/dogstatsd/ @DataDog/agent-metrics-logs
/cmd/otel-agent/ @DataDog/opentelemetry
/cmd/process-agent/ @DataDog/processes
/cmd/serverless/ @DataDog/serverless
/cmd/serverless-init/ @DataDog/serverless
Expand Down Expand Up @@ -227,6 +228,7 @@
/comp/core/sysprobeconfig @DataDog/ebpf-platform
/comp/core/workloadmeta @DataDog/container-integrations
/comp/dogstatsd/statsd @DataDog/agent-shared-components
/comp/forwarder/orchestrator @DataDog/agent-metrics-logs
# END COMPONENTS

# pkg
Expand Down
3 changes: 3 additions & 0 deletions cmd/agent/subcommands/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
"github.com/DataDog/datadog-agent/comp/forwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
pkgforwarder "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
orchestratorForwarderImpl "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl"
langDetectionCl "github.com/DataDog/datadog-agent/comp/languagedetection/client"
"github.com/DataDog/datadog-agent/comp/logs"
logsAgent "github.com/DataDog/datadog-agent/comp/logs/agent"
Expand Down Expand Up @@ -351,6 +352,8 @@ func getSharedFxOption() fx.Option {
return demultiplexer.Params{Options: opts}
}),
demultiplexer.Module,
orchestratorForwarderImpl.Module,
fx.Supply(orchestratorForwarderImpl.NewDefaultParams()),
// injecting the shared Serializer to FX until we migrate it to a prpoper component. This allows other
// already migrated components to request it.
fx.Provide(func(demuxInstance demultiplexer.Component) serializer.MetricSerializer {
Expand Down
4 changes: 3 additions & 1 deletion cmd/cluster-agent-cloudfoundry/subcommands/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors"
"github.com/DataDog/datadog-agent/comp/forwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
orchestratorForwarderImpl "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl"
"github.com/DataDog/datadog-agent/pkg/aggregator"
"github.com/DataDog/datadog-agent/pkg/api/healthprobe"
"github.com/DataDog/datadog-agent/pkg/clusteragent"
Expand Down Expand Up @@ -67,10 +68,11 @@ func Commands(globalParams *command.GlobalParams) []*cobra.Command {
forwarder.Bundle,
fx.Provide(defaultforwarder.NewParamsWithResolvers),
demultiplexer.Module,
orchestratorForwarderImpl.Module,
fx.Supply(orchestratorForwarderImpl.NewDisabledParams()),
fx.Provide(func() demultiplexer.Params {
opts := aggregator.DefaultAgentDemultiplexerOptions()
opts.UseEventPlatformForwarder = false
opts.UseOrchestratorForwarder = false
return demultiplexer.Params{Options: opts}
}),
// setup workloadmeta
Expand Down
3 changes: 3 additions & 0 deletions cmd/cluster-agent/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors"
"github.com/DataDog/datadog-agent/comp/forwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
orchestratorForwarderImpl "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl"
"github.com/DataDog/datadog-agent/pkg/aggregator"
"github.com/DataDog/datadog-agent/pkg/api/healthprobe"
"github.com/DataDog/datadog-agent/pkg/clusteragent"
Expand Down Expand Up @@ -93,6 +94,8 @@ func Commands(globalParams *command.GlobalParams) []*cobra.Command {
return params
}),
demultiplexer.Module,
orchestratorForwarderImpl.Module,
fx.Supply(orchestratorForwarderImpl.NewDefaultParams()),
fx.Provide(func() demultiplexer.Params {
opts := aggregator.DefaultAgentDemultiplexerOptions()
opts.UseEventPlatformForwarder = false
Expand Down
4 changes: 3 additions & 1 deletion cmd/dogstatsd/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
dogstatsdServer "github.com/DataDog/datadog-agent/comp/dogstatsd/server"
"github.com/DataDog/datadog-agent/comp/forwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
orchestratorForwarderImpl "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl"
"github.com/DataDog/datadog-agent/comp/metadata"
"github.com/DataDog/datadog-agent/comp/metadata/host"
"github.com/DataDog/datadog-agent/comp/metadata/resources/resourcesimpl"
Expand Down Expand Up @@ -128,14 +129,15 @@ func RunDogstatsdFct(cliParams *CLIParams, defaultConfPath string, defaultLogFil
workloadmeta.OptionalModule,
demultiplexer.Module,
secretsimpl.Module,
orchestratorForwarderImpl.Module,
fx.Supply(orchestratorForwarderImpl.NewDisabledParams()),
// injecting the shared Serializer to FX until we migrate it to a prpoper component. This allows other
// already migrated components to request it.
fx.Provide(func(demuxInstance demultiplexer.Component) serializer.MetricSerializer {
return demuxInstance.Serializer()
}),
fx.Provide(func(config config.Component) demultiplexer.Params {
opts := aggregator.DefaultAgentDemultiplexerOptions()
opts.UseOrchestratorForwarder = false
opts.UseEventPlatformForwarder = false
opts.EnableNoAggregationPipeline = config.GetBool("dogstatsd_no_aggregation_pipeline")
return demultiplexer.Params{Options: opts, ContinueOnMissingHostname: true}
Expand Down
3 changes: 3 additions & 0 deletions cmd/otel-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/DataDog/datadog-agent/comp/core/secrets"
"github.com/DataDog/datadog-agent/comp/forwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
orchestratorForwarderImpl "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl"
"github.com/DataDog/datadog-agent/comp/logs"
logsAgent "github.com/DataDog/datadog-agent/comp/logs/agent"
"github.com/DataDog/datadog-agent/comp/otelcol"
Expand Down Expand Up @@ -72,6 +73,8 @@ func main() {
),
fx.Provide(newForwarderParams),
demultiplexer.Module,
orchestratorForwarderImpl.Module,
fx.Supply(orchestratorForwarderImpl.NewDisabledParams()),
fx.Provide(newSerializer),
fx.Provide(func(cfg config.Component) demultiplexer.Params {
opts := aggregator.DefaultAgentDemultiplexerOptions()
Expand Down
4 changes: 3 additions & 1 deletion cmd/security-agent/main_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/DataDog/datadog-agent/comp/dogstatsd/statsd"
"github.com/DataDog/datadog-agent/comp/forwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
orchestratorForwarderImpl "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl"

"github.com/DataDog/datadog-agent/pkg/aggregator"
pkgconfig "github.com/DataDog/datadog-agent/pkg/config"
Expand Down Expand Up @@ -102,10 +103,11 @@ func (s *service) Run(svcctx context.Context) error {
forwarder.Bundle,
fx.Provide(defaultforwarder.NewParamsWithResolvers),
demultiplexer.Module,
orchestratorForwarderImpl.Module,
fx.Supply(orchestratorForwarderImpl.NewDisabledParams()),
fx.Provide(func() demultiplexer.Params {
opts := aggregator.DefaultAgentDemultiplexerOptions()
opts.UseEventPlatformForwarder = false
opts.UseOrchestratorForwarder = false
return demultiplexer.Params{Options: opts}
}),

Expand Down
5 changes: 4 additions & 1 deletion cmd/security-agent/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/DataDog/datadog-agent/comp/dogstatsd/statsd"
"github.com/DataDog/datadog-agent/comp/forwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
orchestratorForwarderImpl "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl"
"github.com/DataDog/datadog-agent/pkg/aggregator"
pkgconfig "github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/config/model"
Expand Down Expand Up @@ -90,10 +91,12 @@ func Commands(globalParams *command.GlobalParams) []*cobra.Command {
forwarder.Bundle,
fx.Provide(defaultforwarder.NewParamsWithResolvers),
demultiplexer.Module,
orchestratorForwarderImpl.Module,
fx.Supply(orchestratorForwarderImpl.NewDisabledParams()),
fx.Provide(func() demultiplexer.Params {
opts := aggregator.DefaultAgentDemultiplexerOptions()
opts.UseEventPlatformForwarder = false
opts.UseOrchestratorForwarder = false

return demultiplexer.Params{Options: opts}
}),
// workloadmeta setup
Expand Down
6 changes: 6 additions & 0 deletions comp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ Package forwarder implements the "forwarder" bundle

Package defaultForwarder implements a component to send payloads to the backend

### [comp/forwarder/orchestrator](https://pkg.go.dev/github.com/DataDog/datadog-agent/comp/forwarder/orchestrator)

*Datadog Team*: agent-metrics-logs

Package orchestrator implements the orchestrator forwarder component.

## [comp/languagedetection](https://pkg.go.dev/github.com/DataDog/datadog-agent/comp/languagedetection) (Component Bundle)

*Datadog Team*: container-integrations
Expand Down
2 changes: 2 additions & 0 deletions comp/aggregator/bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer"
"github.com/DataDog/datadog-agent/comp/core"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
orchestratorForwarderImpl "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
"go.uber.org/fx"
)
Expand All @@ -19,6 +20,7 @@ func TestBundleDependencies(t *testing.T) {
fxutil.TestBundle(t, Bundle,
core.MockBundle,
defaultforwarder.MockModule,
orchestratorForwarderImpl.MockModule,
fx.Supply(demultiplexer.Params{}),
)
}
7 changes: 5 additions & 2 deletions comp/aggregator/demultiplexer/demultiplexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/DataDog/datadog-agent/comp/aggregator/diagnosesendermanager"
"github.com/DataDog/datadog-agent/comp/core/log"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
orchestratorforwarder "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator"
"github.com/DataDog/datadog-agent/pkg/aggregator"
"github.com/DataDog/datadog-agent/pkg/aggregator/sender"
"github.com/DataDog/datadog-agent/pkg/util/hostname"
Expand All @@ -19,8 +20,9 @@ import (

type dependencies struct {
fx.In
Log log.Component
SharedForwarder defaultforwarder.Component
Log log.Component
SharedForwarder defaultforwarder.Component
OrchestratorForwarder orchestratorforwarder.Component

Params Params
}
Expand Down Expand Up @@ -57,6 +59,7 @@ func newDemultiplexer(deps dependencies) (provides, error) {
agentDemultiplexer := aggregator.InitAndStartAgentDemultiplexer(
deps.Log,
deps.SharedForwarder,
deps.OrchestratorForwarder,
deps.Params.Options,
hostnameDetected)
demultiplexer := demultiplexer{
Expand Down
10 changes: 8 additions & 2 deletions comp/aggregator/demultiplexer/demultiplexer_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
package demultiplexer

import (
"github.com/DataDog/datadog-agent/comp/core/log"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
"github.com/DataDog/datadog-agent/pkg/aggregator"
"github.com/DataDog/datadog-agent/pkg/aggregator/sender"
"go.uber.org/fx"
Expand Down Expand Up @@ -35,15 +37,19 @@ func (m *mock) LazyGetSenderManager() (sender.SenderManager, error) {

type mockDependencies struct {
fx.In
AggregatorDeps aggregator.TestDeps
Log log.Component
}

func newMock(deps mockDependencies) (Component, Mock) {
opts := aggregator.DefaultAgentDemultiplexerOptions()
opts.DontStartForwarders = true

aggDeps := aggregator.TestDeps{
Log: deps.Log,
SharedForwarder: defaultforwarder.NoopForwarder{},
}
demultiplexer := demultiplexer{
AgentDemultiplexer: aggregator.InitAndStartAgentDemultiplexerForTest(deps.AggregatorDeps, opts, ""),
AgentDemultiplexer: aggregator.InitAndStartAgentDemultiplexerForTest(aggDeps, opts, ""),
}

instance := &mock{Component: demultiplexer}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,15 @@ func (sender *diagnoseSenderManager) LazyGetSenderManager() (sender.SenderManage
opts.FlushInterval = 0
opts.DontStartForwarders = true
opts.UseNoopEventPlatformForwarder = true
opts.UseNoopOrchestratorForwarder = true

log := sender.deps.Log
config := sender.deps.Config
forwarder := defaultforwarder.NewDefaultForwarder(config, log, defaultforwarder.NewOptions(config, log, nil))
orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{})
senderManager = aggregator.InitAndStartAgentDemultiplexer(
log,
forwarder,
&orchestratorForwarder,
opts,
hostnameDetected)

Expand Down
14 changes: 6 additions & 8 deletions comp/dogstatsd/server/server_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,23 @@ import (

"github.com/DataDog/datadog-agent/comp/core/config"
"github.com/DataDog/datadog-agent/comp/core/log"
forwarder "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
"github.com/DataDog/datadog-agent/pkg/aggregator"
pkgconfig "github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/metrics"
"github.com/DataDog/datadog-agent/pkg/util/optional"

"github.com/DataDog/datadog-agent/comp/dogstatsd/packets"
)

func mockDemultiplexer(config config.Component, log log.Component) aggregator.Demultiplexer {
return mockDemultiplexerWithFlushInterval(config, log, time.Millisecond*10)
}

func mockDemultiplexerWithFlushInterval(config config.Component, log log.Component, d time.Duration) aggregator.Demultiplexer {
d := time.Millisecond * 10
opts := aggregator.DefaultAgentDemultiplexerOptions()
opts.FlushInterval = d
opts.DontStartForwarders = true
forwarder := forwarder.NewDefaultForwarder(config, log, forwarder.NewOptions(config, log, nil))

demux := aggregator.InitAndStartAgentDemultiplexer(log, forwarder, opts, "hostname")
forwarder := defaultforwarder.NewDefaultForwarder(config, log, defaultforwarder.NewOptions(config, log, nil))
orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{})
demux := aggregator.InitAndStartAgentDemultiplexer(log, forwarder, &orchestratorForwarder, opts, "hostname")
return demux
}

Expand Down
23 changes: 23 additions & 0 deletions comp/forwarder/orchestrator/component.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2023-present Datadog, Inc.

// Package orchestrator implements the orchestrator forwarder component.
package orchestrator

import "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"

// team: agent-metrics-logs

// Component is the component type.
// The main method of this component is `Get` which returns the forwarder instance only if it enabled.
type Component interface {
// Get the forwarder instance if it exists.
Get() (defaultforwarder.Forwarder, bool)

// TODO: (components): This function is used to know if Stop was already called in AgentDemultiplexer.Stop.
// Reset results `Get` methods to return false.
// Remove it when Stop is not part of this interface.
Reset()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2023-present Datadog, Inc.

//go:build !orchestrator

// Package orchestratorimpl implements the orchestrator forwarder component.
package orchestratorimpl

import (
"go.uber.org/fx"

"github.com/DataDog/datadog-agent/comp/core/config"
"github.com/DataDog/datadog-agent/comp/core/log"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/orchestrator"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
"github.com/DataDog/datadog-agent/pkg/util/optional"
)

// Module defines the fx options for this component.
var Module = fxutil.Component(
fx.Provide(newOrchestratorForwarder),
)

// newOrchestratorForwarder builds the orchestrator forwarder.
// This func has been extracted in this file to not include all the orchestrator
// dependencies (k8s, several MBs) while building binaries not needing these.
func newOrchestratorForwarder(_ log.Component, _ config.Component, params Params) orchestrator.Component {
if params.useNoopOrchestratorForwarder {
forwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{})
return &forwarder
}
forwarder := optional.NewNoneOption[defaultforwarder.Forwarder]()
return &forwarder
}
Loading

0 comments on commit 04eeda4

Please sign in to comment.