diff --git a/comp/logs/agent/agentimpl/agent_core_init.go b/comp/logs/agent/agentimpl/agent_core_init.go index 37d5c029cf0f6..fae9804bae9b8 100644 --- a/comp/logs/agent/agentimpl/agent_core_init.go +++ b/comp/logs/agent/agentimpl/agent_core_init.go @@ -46,7 +46,7 @@ func (a *logAgent) SetupPipeline(processingRules []*config.ProcessingRule, wmeta diagnosticMessageReceiver := diagnostic.NewBufferedMessageReceiver(nil, a.hostname) // setup the pipeline provider that provides pairs of processor and sender - pipelineProvider := pipeline.NewProvider(config.NumberOfPipelines, auditor, diagnosticMessageReceiver, processingRules, a.endpoints, destinationsCtx, NewStatusProvider(), a.hostname, a.config) + pipelineProvider := pipeline.NewProvider(a.config.GetInt("logs_config.pipelines"), auditor, diagnosticMessageReceiver, processingRules, a.endpoints, destinationsCtx, NewStatusProvider(), a.hostname, a.config) // setup the launchers lnchrs := launchers.NewLaunchers(a.sources, pipelineProvider, auditor, a.tracker) diff --git a/comp/logs/agent/agentimpl/agent_serverless_init.go b/comp/logs/agent/agentimpl/agent_serverless_init.go index 31dbf3e41d2dc..67711def14029 100644 --- a/comp/logs/agent/agentimpl/agent_serverless_init.go +++ b/comp/logs/agent/agentimpl/agent_serverless_init.go @@ -49,7 +49,7 @@ func (a *logAgent) SetupPipeline( destinationsCtx := client.NewDestinationsContext() // setup the pipeline provider that provides pairs of processor and sender - pipelineProvider := pipeline.NewServerlessProvider(config.NumberOfPipelines, a.auditor, diagnosticMessageReceiver, processingRules, a.endpoints, destinationsCtx, NewStatusProvider(), a.hostname, a.config) + pipelineProvider := pipeline.NewServerlessProvider(a.config.GetInt("logs_config.pipelines"), a.auditor, diagnosticMessageReceiver, processingRules, a.endpoints, destinationsCtx, NewStatusProvider(), a.hostname, a.config) lnchrs := launchers.NewLaunchers(a.sources, pipelineProvider, a.auditor, a.tracker) lnchrs.AddLauncher(channel.NewLauncher()) diff --git a/comp/logs/agent/config/constants.go b/comp/logs/agent/config/constants.go index ae9a0d74680f0..a66b989366370 100644 --- a/comp/logs/agent/config/constants.go +++ b/comp/logs/agent/config/constants.go @@ -5,11 +5,6 @@ package config -// Pipeline constraints -const ( - NumberOfPipelines = 4 -) - const ( // DateFormat is the default date format. DateFormat = "2006-01-02T15:04:05.000000000Z" diff --git a/comp/otelcol/logsagentpipeline/logsagentpipelineimpl/agent.go b/comp/otelcol/logsagentpipeline/logsagentpipelineimpl/agent.go index d1910d28db034..b014d5b31689e 100644 --- a/comp/otelcol/logsagentpipeline/logsagentpipelineimpl/agent.go +++ b/comp/otelcol/logsagentpipeline/logsagentpipelineimpl/agent.go @@ -210,7 +210,7 @@ func (a *Agent) SetupPipeline( destinationsCtx := client.NewDestinationsContext() // setup the pipeline provider that provides pairs of processor and sender - pipelineProvider := pipeline.NewProvider(config.NumberOfPipelines, auditor, &diagnostic.NoopMessageReceiver{}, processingRules, a.endpoints, destinationsCtx, NewStatusProvider(), a.hostname, a.config) + pipelineProvider := pipeline.NewProvider(a.config.GetInt("logs_config.pipelines"), auditor, &diagnostic.NoopMessageReceiver{}, processingRules, a.endpoints, destinationsCtx, NewStatusProvider(), a.hostname, a.config) a.auditor = auditor a.destinationsCtx = destinationsCtx diff --git a/pkg/compliance/reporter.go b/pkg/compliance/reporter.go index 501ca6f10bc43..cf59ee9043489 100644 --- a/pkg/compliance/reporter.go +++ b/pkg/compliance/reporter.go @@ -44,7 +44,7 @@ func NewLogReporter(hostname string, sourceName, sourceType string, endpoints *c auditor.Start() // setup the pipeline provider that provides pairs of processor and sender - pipelineProvider := pipeline.NewProvider(config.NumberOfPipelines, auditor, &diagnostic.NoopMessageReceiver{}, nil, endpoints, dstcontext, agentimpl.NewStatusProvider(), hostnameimpl.NewHostnameService(), pkgconfigsetup.Datadog()) + pipelineProvider := pipeline.NewProvider(4, auditor, &diagnostic.NoopMessageReceiver{}, nil, endpoints, dstcontext, agentimpl.NewStatusProvider(), hostnameimpl.NewHostnameService(), pkgconfigsetup.Datadog()) pipelineProvider.Start() logSource := sources.NewLogSource( diff --git a/pkg/config/setup/config.go b/pkg/config/setup/config.go index 27215e0975243..03e5b6ac7c273 100644 --- a/pkg/config/setup/config.go +++ b/pkg/config/setup/config.go @@ -1555,6 +1555,10 @@ func logsagent(config pkgconfigmodel.Setup) { // Add a tag to logs that are truncated by the agent config.BindEnvAndSetDefault("logs_config.tag_truncated_logs", false) + // Number of logs pipeline instances. Defaults to number of logical CPU cores as defined by GOMAXPROCS or 4, whichever is lower. + logsPipelines := min(4, runtime.GOMAXPROCS(0)) + config.BindEnvAndSetDefault("logs_config.pipelines", logsPipelines) + // If true, the agent looks for container logs in the location used by podman, rather // than docker. This is a temporary configuration parameter to support podman logs until // a more substantial refactor of autodiscovery is made to determine this automatically. diff --git a/pkg/security/reporter/reporter.go b/pkg/security/reporter/reporter.go index 1c8a4f71bf5a0..227f94ac456ed 100644 --- a/pkg/security/reporter/reporter.go +++ b/pkg/security/reporter/reporter.go @@ -52,7 +52,7 @@ func newReporter(hostname string, stopper startstop.Stopper, sourceName, sourceT stopper.Add(auditor) // setup the pipeline provider that provides pairs of processor and sender - pipelineProvider := pipeline.NewProvider(logsconfig.NumberOfPipelines, auditor, &diagnostic.NoopMessageReceiver{}, nil, endpoints, context, agentimpl.NewStatusProvider(), hostnameimpl.NewHostnameService(), pkgconfigsetup.Datadog()) + pipelineProvider := pipeline.NewProvider(4, auditor, &diagnostic.NoopMessageReceiver{}, nil, endpoints, context, agentimpl.NewStatusProvider(), hostnameimpl.NewHostnameService(), pkgconfigsetup.Datadog()) pipelineProvider.Start() stopper.Add(pipelineProvider)