Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make # of log pipelines configurable and default to GOMAXPROCS #31190

Merged
merged 7 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion comp/logs/agent/agentimpl/agent_core_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (a *logAgent) SetupPipeline(processingRules []*config.ProcessingRule, wmeta
diagnosticMessageReceiver := diagnostic.NewBufferedMessageReceiver(nil, a.hostname)

// setup the pipeline provider that provides pairs of processor and sender
pipelineProvider := pipeline.NewProvider(config.NumberOfPipelines, auditor, diagnosticMessageReceiver, processingRules, a.endpoints, destinationsCtx, NewStatusProvider(), a.hostname, a.config)
pipelineProvider := pipeline.NewProvider(a.config.GetInt("logs_config.pipelines"), auditor, diagnosticMessageReceiver, processingRules, a.endpoints, destinationsCtx, NewStatusProvider(), a.hostname, a.config)

// setup the launchers
lnchrs := launchers.NewLaunchers(a.sources, pipelineProvider, auditor, a.tracker)
Expand Down
2 changes: 1 addition & 1 deletion comp/logs/agent/agentimpl/agent_serverless_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (a *logAgent) SetupPipeline(
destinationsCtx := client.NewDestinationsContext()

// setup the pipeline provider that provides pairs of processor and sender
pipelineProvider := pipeline.NewServerlessProvider(config.NumberOfPipelines, a.auditor, diagnosticMessageReceiver, processingRules, a.endpoints, destinationsCtx, NewStatusProvider(), a.hostname, a.config)
pipelineProvider := pipeline.NewServerlessProvider(a.config.GetInt("logs_config.pipelines"), a.auditor, diagnosticMessageReceiver, processingRules, a.endpoints, destinationsCtx, NewStatusProvider(), a.hostname, a.config)

lnchrs := launchers.NewLaunchers(a.sources, pipelineProvider, a.auditor, a.tracker)
lnchrs.AddLauncher(channel.NewLauncher())
Expand Down
5 changes: 0 additions & 5 deletions comp/logs/agent/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@

package config

// Pipeline constraints
const (
NumberOfPipelines = 4
)

const (
// DateFormat is the default date format.
DateFormat = "2006-01-02T15:04:05.000000000Z"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (a *Agent) SetupPipeline(
destinationsCtx := client.NewDestinationsContext()

// setup the pipeline provider that provides pairs of processor and sender
pipelineProvider := pipeline.NewProvider(config.NumberOfPipelines, auditor, &diagnostic.NoopMessageReceiver{}, processingRules, a.endpoints, destinationsCtx, NewStatusProvider(), a.hostname, a.config)
pipelineProvider := pipeline.NewProvider(a.config.GetInt("logs_config.pipelines"), auditor, &diagnostic.NoopMessageReceiver{}, processingRules, a.endpoints, destinationsCtx, NewStatusProvider(), a.hostname, a.config)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to be clear, if this value is not set by the user, the default is still applied? I want to make sure because we use this dependency in the upstream collector and there wouldn't be a way for the user to specify it currently

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the collector uses the same default configurations as the agent uses - yes it will use the default value here. The default was just moved to the config instead of being hard coded so a user could configure it if they wanted.

To be safe (if you want) - I could return this line back to the original default (4) as I did for the security and compliance agent.


a.auditor = auditor
a.destinationsCtx = destinationsCtx
Expand Down
2 changes: 1 addition & 1 deletion pkg/compliance/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func NewLogReporter(hostname string, sourceName, sourceType string, endpoints *c
auditor.Start()

// setup the pipeline provider that provides pairs of processor and sender
pipelineProvider := pipeline.NewProvider(config.NumberOfPipelines, auditor, &diagnostic.NoopMessageReceiver{}, nil, endpoints, dstcontext, agentimpl.NewStatusProvider(), hostnameimpl.NewHostnameService(), pkgconfigsetup.Datadog())
pipelineProvider := pipeline.NewProvider(4, auditor, &diagnostic.NoopMessageReceiver{}, nil, endpoints, dstcontext, agentimpl.NewStatusProvider(), hostnameimpl.NewHostnameService(), pkgconfigsetup.Datadog())
pipelineProvider.Start()

logSource := sources.NewLogSource(
Expand Down
4 changes: 4 additions & 0 deletions pkg/config/setup/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1555,6 +1555,10 @@ func logsagent(config pkgconfigmodel.Setup) {
// Add a tag to logs that are truncated by the agent
config.BindEnvAndSetDefault("logs_config.tag_truncated_logs", false)

// Number of logs pipeline instances. Defaults to number of logical CPU cores as defined by GOMAXPROCS or 8, whichever is lower.
logsPipelines := min(8, runtime.GOMAXPROCS(0))
config.BindEnvAndSetDefault("logs_config.pipelines", logsPipelines)

// If true, the agent looks for container logs in the location used by podman, rather
// than docker. This is a temporary configuration parameter to support podman logs until
// a more substantial refactor of autodiscovery is made to determine this automatically.
Expand Down
2 changes: 1 addition & 1 deletion pkg/security/reporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func newReporter(hostname string, stopper startstop.Stopper, sourceName, sourceT
stopper.Add(auditor)

// setup the pipeline provider that provides pairs of processor and sender
pipelineProvider := pipeline.NewProvider(logsconfig.NumberOfPipelines, auditor, &diagnostic.NoopMessageReceiver{}, nil, endpoints, context, agentimpl.NewStatusProvider(), hostnameimpl.NewHostnameService(), pkgconfigsetup.Datadog())
pipelineProvider := pipeline.NewProvider(4, auditor, &diagnostic.NoopMessageReceiver{}, nil, endpoints, context, agentimpl.NewStatusProvider(), hostnameimpl.NewHostnameService(), pkgconfigsetup.Datadog())
pipelineProvider.Start()
stopper.Add(pipelineProvider)

Expand Down
Loading