Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ASCII-931][component] Create orchestrator forwarder component #21058

Merged
merged 21 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@
/cmd/cluster-agent/api/v1/cloudfoundry_metadata.go @DataDog/platform-integrations
/cmd/cws-instrumentation/ @DataDog/agent-security
/cmd/dogstatsd/ @DataDog/agent-metrics-logs
/cmd/otel-agent/ @DataDog/opentelemetry
/cmd/process-agent/ @DataDog/processes
/cmd/serverless/ @DataDog/serverless
/cmd/serverless-init/ @DataDog/serverless
Expand Down Expand Up @@ -224,6 +225,7 @@
/comp/core/sysprobeconfig @DataDog/ebpf-platform
/comp/core/workloadmeta @DataDog/container-integrations
/comp/dogstatsd/statsd @DataDog/agent-shared-components
/comp/forwarder/orchestrator @DataDog/agent-metrics-logs @DataDog/container-app
# END COMPONENTS

# pkg
Expand Down
3 changes: 3 additions & 0 deletions cmd/agent/subcommands/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"github.com/DataDog/datadog-agent/comp/forwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
pkgforwarder "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
orchestratorForwarderImpl "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl"
langDetectionCl "github.com/DataDog/datadog-agent/comp/languagedetection/client"
"github.com/DataDog/datadog-agent/comp/logs"
logsAgent "github.com/DataDog/datadog-agent/comp/logs/agent"
Expand Down Expand Up @@ -347,6 +348,8 @@ func getSharedFxOption() fx.Option {
return demultiplexer.Params{Options: opts}
}),
demultiplexer.Module,
orchestratorForwarderImpl.Module,
fx.Supply(orchestratorForwarderImpl.NewDefaultParams()),
// injecting the shared Serializer to FX until we migrate it to a prpoper component. This allows other
// already migrated components to request it.
fx.Provide(func(demuxInstance demultiplexer.Component) serializer.MetricSerializer {
Expand Down
4 changes: 3 additions & 1 deletion cmd/cluster-agent-cloudfoundry/subcommands/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors"
"github.com/DataDog/datadog-agent/comp/forwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
orchestratorForwarderImpl "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl"
"github.com/DataDog/datadog-agent/pkg/aggregator"
"github.com/DataDog/datadog-agent/pkg/api/healthprobe"
"github.com/DataDog/datadog-agent/pkg/clusteragent"
Expand Down Expand Up @@ -67,10 +68,11 @@ func Commands(globalParams *command.GlobalParams) []*cobra.Command {
forwarder.Bundle,
fx.Provide(defaultforwarder.NewParamsWithResolvers),
demultiplexer.Module,
orchestratorForwarderImpl.Module,
fx.Supply(orchestratorForwarderImpl.Params{UseOrchestratorForwarder: false}),
ogaca-dd marked this conversation as resolved.
Show resolved Hide resolved
fx.Provide(func() demultiplexer.Params {
opts := aggregator.DefaultAgentDemultiplexerOptions()
opts.UseEventPlatformForwarder = false
opts.UseOrchestratorForwarder = false
return demultiplexer.Params{Options: opts}
}),
// setup workloadmeta
Expand Down
3 changes: 3 additions & 0 deletions cmd/cluster-agent/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors"
"github.com/DataDog/datadog-agent/comp/forwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
orchestratorForwarderImpl "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl"
"github.com/DataDog/datadog-agent/pkg/aggregator"
"github.com/DataDog/datadog-agent/pkg/api/healthprobe"
"github.com/DataDog/datadog-agent/pkg/clusteragent"
Expand Down Expand Up @@ -93,6 +94,8 @@ func Commands(globalParams *command.GlobalParams) []*cobra.Command {
return params
}),
demultiplexer.Module,
orchestratorForwarderImpl.Module,
fx.Supply(orchestratorForwarderImpl.NewDefaultParams()),
fx.Provide(func() demultiplexer.Params {
opts := aggregator.DefaultAgentDemultiplexerOptions()
opts.UseEventPlatformForwarder = false
Expand Down
4 changes: 3 additions & 1 deletion cmd/dogstatsd/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
dogstatsdServer "github.com/DataDog/datadog-agent/comp/dogstatsd/server"
"github.com/DataDog/datadog-agent/comp/forwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
orchestratorForwarderImpl "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl"
"github.com/DataDog/datadog-agent/comp/metadata"
"github.com/DataDog/datadog-agent/comp/metadata/host"
"github.com/DataDog/datadog-agent/comp/metadata/resources/resourcesimpl"
Expand Down Expand Up @@ -128,14 +129,15 @@ func RunDogstatsdFct(cliParams *CLIParams, defaultConfPath string, defaultLogFil
workloadmeta.OptionalModule,
demultiplexer.Module,
secretsimpl.Module,
orchestratorForwarderImpl.Module,
fx.Supply(orchestratorForwarderImpl.Params{UseOrchestratorForwarder: false}),
// injecting the shared Serializer to FX until we migrate it to a prpoper component. This allows other
// already migrated components to request it.
fx.Provide(func(demuxInstance demultiplexer.Component) serializer.MetricSerializer {
return demuxInstance.Serializer()
}),
fx.Provide(func(config config.Component) demultiplexer.Params {
opts := aggregator.DefaultAgentDemultiplexerOptions()
opts.UseOrchestratorForwarder = false
opts.UseEventPlatformForwarder = false
opts.EnableNoAggregationPipeline = config.GetBool("dogstatsd_no_aggregation_pipeline")
return demultiplexer.Params{Options: opts, ContinueOnMissingHostname: true}
Expand Down
3 changes: 3 additions & 0 deletions cmd/otel-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/DataDog/datadog-agent/comp/core/secrets"
"github.com/DataDog/datadog-agent/comp/forwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
orchestratorForwarderImpl "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl"
"github.com/DataDog/datadog-agent/comp/logs"
logsAgent "github.com/DataDog/datadog-agent/comp/logs/agent"
"github.com/DataDog/datadog-agent/comp/otelcol"
Expand Down Expand Up @@ -72,6 +73,8 @@ func main() {
),
fx.Provide(newForwarderParams),
demultiplexer.Module,
orchestratorForwarderImpl.Module,
fx.Supply(orchestratorForwarderImpl.Params{UseOrchestratorForwarder: false}),
fx.Provide(newSerializer),
fx.Provide(func(cfg config.Component) demultiplexer.Params {
opts := aggregator.DefaultAgentDemultiplexerOptions()
Expand Down
4 changes: 3 additions & 1 deletion cmd/security-agent/main_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/DataDog/datadog-agent/comp/dogstatsd/statsd"
"github.com/DataDog/datadog-agent/comp/forwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
orchestratorForwarderImpl "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl"

"github.com/DataDog/datadog-agent/pkg/aggregator"
pkgconfig "github.com/DataDog/datadog-agent/pkg/config"
Expand Down Expand Up @@ -102,10 +103,11 @@ func (s *service) Run(svcctx context.Context) error {
forwarder.Bundle,
fx.Provide(defaultforwarder.NewParamsWithResolvers),
demultiplexer.Module,
orchestratorForwarderImpl.Module,
fx.Supply(orchestratorForwarderImpl.Params{UseOrchestratorForwarder: false}),
fx.Provide(func() demultiplexer.Params {
opts := aggregator.DefaultAgentDemultiplexerOptions()
opts.UseEventPlatformForwarder = false
opts.UseOrchestratorForwarder = false
return demultiplexer.Params{Options: opts}
}),

Expand Down
5 changes: 4 additions & 1 deletion cmd/security-agent/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/DataDog/datadog-agent/comp/dogstatsd/statsd"
"github.com/DataDog/datadog-agent/comp/forwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
orchestratorForwarderImpl "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl"
"github.com/DataDog/datadog-agent/pkg/aggregator"
pkgconfig "github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/config/model"
Expand Down Expand Up @@ -91,10 +92,12 @@ func Commands(globalParams *command.GlobalParams) []*cobra.Command {
forwarder.Bundle,
fx.Provide(defaultforwarder.NewParamsWithResolvers),
demultiplexer.Module,
orchestratorForwarderImpl.Module,
fx.Supply(orchestratorForwarderImpl.Params{UseOrchestratorForwarder: false}),
fx.Provide(func() demultiplexer.Params {
opts := aggregator.DefaultAgentDemultiplexerOptions()
opts.UseEventPlatformForwarder = false
opts.UseOrchestratorForwarder = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As can be seen here, I believe the security agent does not need the orchestrator forwarder

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, it doesn't and this is the reason why it is not enabled. See fx.Supply(orchestratorForwarderImpl.Params{UseOrchestratorForwarder: false}) few lines upper.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ok indeed, what would happen if we did not provide the module and the params supply ? Wouldn't it default to disabled as well ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the demultiplexer uses the orchestrator forwarder, orchestrator forwarder is always mandatory when using the demultiplexer (Fx refuses to start the application if the module and the params are missing).
Note: There are units tests that check subcommands provides all the require modules and params. There is a linter to make sure all subcommand are tested (I also have a card to check there is no missing tests for other cases).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok understood


return demultiplexer.Params{Options: opts}
}),
// workloadmeta setup
Expand Down
6 changes: 6 additions & 0 deletions comp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ Package forwarder implements the "forwarder" bundle

Package defaultForwarder implements a component to send payloads to the backend

### [comp/forwarder/orchestrator](https://pkg.go.dev/github.com/DataDog/dd-agent-comp-experiments/comp/forwarder/orchestrator)

*Datadog Team*: agent-metrics-logs

Package orchestrator implements the orchestrator forwarder component.

## [comp/languagedetection](https://pkg.go.dev/github.com/DataDog/dd-agent-comp-experiments/comp/languagedetection) (Component Bundle)

*Datadog Team*: container-integrations
Expand Down
2 changes: 2 additions & 0 deletions comp/aggregator/bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer"
"github.com/DataDog/datadog-agent/comp/core"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
orchestratorForwarderImpl "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
"go.uber.org/fx"
)
Expand All @@ -19,6 +20,7 @@ func TestBundleDependencies(t *testing.T) {
fxutil.TestBundle(t, Bundle,
core.MockBundle,
defaultforwarder.MockModule,
orchestratorForwarderImpl.MockModule,
fx.Supply(demultiplexer.Params{}),
)
}
7 changes: 5 additions & 2 deletions comp/aggregator/demultiplexer/demultiplexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/DataDog/datadog-agent/comp/aggregator/diagnosesendermanager"
"github.com/DataDog/datadog-agent/comp/core/log"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
orchestratorforwarder "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator"
"github.com/DataDog/datadog-agent/pkg/aggregator"
"github.com/DataDog/datadog-agent/pkg/aggregator/sender"
"github.com/DataDog/datadog-agent/pkg/util/hostname"
Expand All @@ -19,8 +20,9 @@ import (

type dependencies struct {
fx.In
Log log.Component
SharedForwarder defaultforwarder.Component
Log log.Component
SharedForwarder defaultforwarder.Component
OrchestratorForwarder orchestratorforwarder.Component

Params Params
}
Expand Down Expand Up @@ -57,6 +59,7 @@ func newDemultiplexer(deps dependencies) (provides, error) {
agentDemultiplexer := aggregator.InitAndStartAgentDemultiplexer(
deps.Log,
deps.SharedForwarder,
deps.OrchestratorForwarder,
deps.Params.Options,
hostnameDetected)
demultiplexer := demultiplexer{
Expand Down
10 changes: 8 additions & 2 deletions comp/aggregator/demultiplexer/demultiplexer_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
package demultiplexer

import (
"github.com/DataDog/datadog-agent/comp/core/log"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
"github.com/DataDog/datadog-agent/pkg/aggregator"
"github.com/DataDog/datadog-agent/pkg/aggregator/sender"
"go.uber.org/fx"
Expand Down Expand Up @@ -35,15 +37,19 @@ func (m *mock) LazyGetSenderManager() (sender.SenderManager, error) {

type mockDependencies struct {
fx.In
AggregatorDeps aggregator.TestDeps
Log log.Component
}

func newMock(deps mockDependencies) (Component, Mock) {
opts := aggregator.DefaultAgentDemultiplexerOptions()
opts.DontStartForwarders = true

aggDeps := aggregator.TestDeps{
Log: deps.Log,
SharedForwarder: defaultforwarder.NoopForwarder{},
}
demultiplexer := demultiplexer{
AgentDemultiplexer: aggregator.InitAndStartAgentDemultiplexerForTest(deps.AggregatorDeps, opts, ""),
AgentDemultiplexer: aggregator.InitAndStartAgentDemultiplexerForTest(aggDeps, opts, ""),
}

instance := &mock{Component: demultiplexer}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,15 @@ func (sender *diagnoseSenderManager) LazyGetSenderManager() (sender.SenderManage
opts.FlushInterval = 0
opts.DontStartForwarders = true
opts.UseNoopEventPlatformForwarder = true
opts.UseNoopOrchestratorForwarder = true

log := sender.deps.Log
config := sender.deps.Config
forwarder := defaultforwarder.NewDefaultForwarder(config, log, defaultforwarder.NewOptions(config, log, nil))
orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{})
senderManager = aggregator.InitAndStartAgentDemultiplexer(
log,
forwarder,
&orchestratorForwarder,
opts,
hostnameDetected)

Expand Down
14 changes: 6 additions & 8 deletions comp/dogstatsd/server/server_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,23 @@ import (

"github.com/DataDog/datadog-agent/comp/core/config"
"github.com/DataDog/datadog-agent/comp/core/log"
forwarder "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
"github.com/DataDog/datadog-agent/pkg/aggregator"
pkgconfig "github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/metrics"
"github.com/DataDog/datadog-agent/pkg/util/optional"

"github.com/DataDog/datadog-agent/comp/dogstatsd/packets"
)

func mockDemultiplexer(config config.Component, log log.Component) aggregator.Demultiplexer {
return mockDemultiplexerWithFlushInterval(config, log, time.Millisecond*10)
}

func mockDemultiplexerWithFlushInterval(config config.Component, log log.Component, d time.Duration) aggregator.Demultiplexer {
d := time.Millisecond * 10
opts := aggregator.DefaultAgentDemultiplexerOptions()
opts.FlushInterval = d
opts.DontStartForwarders = true
forwarder := forwarder.NewDefaultForwarder(config, log, forwarder.NewOptions(config, log, nil))

demux := aggregator.InitAndStartAgentDemultiplexer(log, forwarder, opts, "hostname")
forwarder := defaultforwarder.NewDefaultForwarder(config, log, defaultforwarder.NewOptions(config, log, nil))
orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{})
demux := aggregator.InitAndStartAgentDemultiplexer(log, forwarder, &orchestratorForwarder, opts, "hostname")
return demux
}

Expand Down
20 changes: 20 additions & 0 deletions comp/forwarder/orchestrator/component.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2023-present Datadog, Inc.

// Package orchestrator implements the orchestrator forwarder component.
package orchestrator

import "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"

// team: agent-metrics-logs

// Component is the component type.
type Component interface {
hush-hush marked this conversation as resolved.
Show resolved Hide resolved
Get() (defaultforwarder.Forwarder, bool)
ogaca-dd marked this conversation as resolved.
Show resolved Hide resolved

// TODO: (components): This function is used to know if Stop was already called.
// Remove it when Stop is not part of this interface.
Reset()
hush-hush marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2023-present Datadog, Inc.

//go:build !orchestrator

// Package orchestratorimpl implements the orchestrator forwarder component.
package orchestratorimpl

import (
"go.uber.org/fx"

"github.com/DataDog/datadog-agent/comp/core/config"
"github.com/DataDog/datadog-agent/comp/core/log"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/orchestrator"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
"github.com/DataDog/datadog-agent/pkg/util/optional"
)

// Module defines the fx options for this component.
var Module = fxutil.Component(
fx.Provide(newOrchestratorForwarder),
)

// newOrchestratorForwarder builds the orchestrator forwarder.
// This func has been extracted in this file to not include all the orchestrator
// dependencies (k8s, several MBs) while building binaries not needing these.
func newOrchestratorForwarder(_ log.Component, _ config.Component, params Params) orchestrator.Component {
if params.UseNoopOrchestratorForwarder {
forwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{})
return &forwarder
}
forwarder := optional.NewNoneOption[defaultforwarder.Forwarder]()
return &forwarder
ogaca-dd marked this conversation as resolved.
Show resolved Hide resolved
}
Loading