Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Factorize all the aggregator, forwarder and sender code used to send container_lifecycle, container_image and sbom payloads. #16084

Merged
merged 4 commits into from
Mar 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions cmd/agent/subcommands/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,9 +397,6 @@ func startAgent(cliParams *cliParams, flare flare.Component, sysprobeconfig sysp
forwarderOpts.EnabledFeatures = forwarder.SetFeature(forwarderOpts.EnabledFeatures, forwarder.CoreFeatures)
opts := aggregator.DefaultAgentDemultiplexerOptions(forwarderOpts)
opts.EnableNoAggregationPipeline = pkgconfig.Datadog.GetBool("dogstatsd_no_aggregation_pipeline")
opts.UseContainerLifecycleForwarder = pkgconfig.Datadog.GetBool("container_lifecycle.enabled")
opts.UseContainerImageForwarder = pkgconfig.Datadog.GetBool("container_image.enabled")
opts.UseSBOMForwarder = pkgconfig.Datadog.GetBool("sbom.enabled")
demux = aggregator.InitAndStartAgentDemultiplexer(opts, hostnameDetected)

// Setup stats telemetry handler
Expand Down
5 changes: 3 additions & 2 deletions cmd/cluster-agent-cloudfoundry/subcommands/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package run
import (
"context"
"fmt"

"github.com/DataDog/datadog-agent/cmd/agent/common"
"github.com/DataDog/datadog-agent/cmd/cluster-agent-cloudfoundry/command"
"github.com/DataDog/datadog-agent/cmd/cluster-agent/api"
Expand All @@ -35,12 +36,13 @@ import (
"github.com/gorilla/mux"
"github.com/spf13/cobra"

"go.uber.org/fx"
"os"
"os/signal"
"regexp"
"syscall"
"time"

"go.uber.org/fx"
L3n41c marked this conversation as resolved.
Show resolved Hide resolved
)

// Commands returns a slice of subcommands for the 'cluster-agent-cloudfoundry' command.
Expand Down Expand Up @@ -99,7 +101,6 @@ func run(log log.Component, config config.Component, cliParams *command.GlobalPa
opts := aggregator.DefaultAgentDemultiplexerOptions(forwarderOpts)
opts.UseEventPlatformForwarder = false
opts.UseOrchestratorForwarder = false
opts.UseContainerLifecycleForwarder = false
demux := aggregator.InitAndStartAgentDemultiplexer(opts, hname)
demux.AddAgentStartupTelemetry(fmt.Sprintf("%s - Datadog Cluster Agent", version.AgentVersion))

Expand Down
1 change: 0 additions & 1 deletion cmd/cluster-agent/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ func start(log log.Component, config config.Component, cliParams *command.Global
forwarderOpts.DisableAPIKeyChecking = true
opts := aggregator.DefaultAgentDemultiplexerOptions(forwarderOpts)
opts.UseEventPlatformForwarder = false
opts.UseContainerLifecycleForwarder = false
demux := aggregator.InitAndStartAgentDemultiplexer(opts, hname)
demux.AddAgentStartupTelemetry(fmt.Sprintf("%s - Datadog Cluster Agent", version.AgentVersion))

Expand Down
1 change: 0 additions & 1 deletion cmd/dogstatsd/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ func RunAgent(ctx context.Context, cliParams *CLIParams, config config.Component
opts := aggregator.DefaultAgentDemultiplexerOptions(forwarderOpts)
opts.UseOrchestratorForwarder = false
opts.UseEventPlatformForwarder = false
opts.UseContainerLifecycleForwarder = false
opts.EnableNoAggregationPipeline = config.GetBool("dogstatsd_no_aggregation_pipeline")
hname, err := hostname.Get(context.TODO())
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion cmd/security-agent/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ func RunAgent(ctx context.Context, log log.Component, config config.Component, p
opts := aggregator.DefaultAgentDemultiplexerOptions(forwarderOpts)
opts.UseEventPlatformForwarder = false
opts.UseOrchestratorForwarder = false
opts.UseContainerLifecycleForwarder = false
demux := aggregator.InitAndStartAgentDemultiplexer(opts, hostnameDetected)
demux.AddAgentStartupTelemetry(fmt.Sprintf("%s - Datadog Security Agent", version.AgentVersion))

Expand Down
132 changes: 1 addition & 131 deletions pkg/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,6 @@ var (
aggregatorDogstatsdContextsByMtype = []expvar.Int{}
aggregatorEventPlatformEvents = expvar.Map{}
aggregatorEventPlatformEventsErrors = expvar.Map{}
aggregatorContainerLifecycleEvents = expvar.Int{}
aggregatorContainerLifecycleEventsErrors = expvar.Int{}
aggregatorContainerImages = expvar.Int{}
aggregatorContainerImagesErrors = expvar.Int{}
aggregatorSBOM = expvar.Int{}
aggregatorSBOMErrors = expvar.Int{}

tlmFlush = telemetry.NewCounter("aggregator", "flush",
[]string{"data_type", "state"}, "Number of metrics/service checks/events flushed")
Expand Down Expand Up @@ -181,12 +175,6 @@ func init() {
aggregatorExpvars.Set("DogstatsdContexts", &aggregatorDogstatsdContexts)
aggregatorExpvars.Set("EventPlatformEvents", &aggregatorEventPlatformEvents)
aggregatorExpvars.Set("EventPlatformEventsErrors", &aggregatorEventPlatformEventsErrors)
aggregatorExpvars.Set("ContainerLifecycleEvents", &aggregatorContainerLifecycleEvents)
aggregatorExpvars.Set("ContainerLifecycleEventsErrors", &aggregatorContainerLifecycleEventsErrors)
aggregatorExpvars.Set("ContainerImages", &aggregatorContainerImages)
aggregatorExpvars.Set("ContainerImagesErrors", &aggregatorContainerImagesErrors)
aggregatorExpvars.Set("SBOM", &aggregatorSBOM)
aggregatorExpvars.Set("SBOMErrors", &aggregatorSBOMErrors)

contextsByMtypeMap := expvar.Map{}
aggregatorDogstatsdContextsByMtype = make([]expvar.Int, int(metrics.NumMetricTypes))
Expand Down Expand Up @@ -215,21 +203,6 @@ type BufferedAggregator struct {
orchestratorManifestIn chan senderOrchestratorManifest
eventPlatformIn chan senderEventPlatformEvent

contLcycleIn chan senderContainerLifecycleEvent
contLcycleBuffer chan senderContainerLifecycleEvent
contLcycleStopper chan struct{}
contLcycleDequeueOnce sync.Once

contImageIn chan senderContainerImage
contImageBuffer chan senderContainerImage
contImageStopper chan struct{}
contImageDequeueOnce sync.Once

sbomIn chan senderSBOM
sbomBuffer chan senderSBOM
sbomStopper chan struct{}
sbomDequeueOnce sync.Once

// metricSamplePool is a pool of slices of metric sample to avoid allocations.
// Used by the Dogstatsd Batcher.
MetricSamplePool *metrics.MetricSamplePool
Expand Down Expand Up @@ -305,18 +278,6 @@ func NewBufferedAggregator(s serializer.MetricSerializer, eventPlatformForwarder
orchestratorManifestIn: make(chan senderOrchestratorManifest, bufferSize),
eventPlatformIn: make(chan senderEventPlatformEvent, bufferSize),

contLcycleIn: make(chan senderContainerLifecycleEvent, bufferSize),
contLcycleBuffer: make(chan senderContainerLifecycleEvent, bufferSize),
contLcycleStopper: make(chan struct{}),

contImageIn: make(chan senderContainerImage, bufferSize),
contImageBuffer: make(chan senderContainerImage, bufferSize),
contImageStopper: make(chan struct{}),

sbomIn: make(chan senderSBOM, bufferSize),
sbomBuffer: make(chan senderSBOM, bufferSize),
sbomStopper: make(chan struct{}),

tagsStore: tagsStore,
checkSamplers: make(map[check.ID]*CheckSampler),
flushInterval: flushInterval,
Expand Down Expand Up @@ -453,7 +414,7 @@ func (agg *BufferedAggregator) handleEventPlatformEvent(event senderEventPlatfor
if agg.eventPlatformForwarder == nil {
return errors.New("event platform forwarder not initialized")
}
m := &message.Message{Content: []byte(event.rawEvent)}
m := &message.Message{Content: event.rawEvent}
// eventPlatformForwarder is threadsafe so no locking needed here
return agg.eventPlatformForwarder.SendEventPlatformEvent(m, event.eventType)
}
Expand Down Expand Up @@ -732,7 +693,6 @@ func (agg *BufferedAggregator) Flush(trigger flushTrigger) {
// Stop stops the aggregator.
// It sends an empty struct on stopChan and then closes contLcycleStopper, the
// channel dequeueContainerLifecycleEvents selects on to return.
// NOTE(review): contImageStopper and sbomStopper (waited on by
// dequeueContainerImages and dequeueSBOM) are not closed here — confirm they are
// closed elsewhere, otherwise those loops never return.
func (agg *BufferedAggregator) Stop() {
agg.stopChan <- struct{}{}
close(agg.contLcycleStopper)
}

func (agg *BufferedAggregator) run() {
Expand Down Expand Up @@ -811,100 +771,10 @@ func (agg *BufferedAggregator) run() {
}
}
tlmFlush.Add(1, event.eventType, state)
case event := <-agg.contLcycleIn:
aggregatorContainerLifecycleEvents.Add(1)
agg.handleContainerLifecycleEvent(event)
case event := <-agg.contImageIn:
aggregatorContainerImages.Add(1)
agg.handleContainerImage(event)
case event := <-agg.sbomIn:
aggregatorSBOM.Add(1)
agg.handleSBOM(event)
}
}
}

// dequeueContainerLifecycleEvents drains contLcycleBuffer, handing the messages
// of each buffered event to the serializer along with the aggregator hostname.
// A failed submission increments the error expvar and is logged as a warning;
// the loop then moves on to the next event.
// The call blocks until a receive on contLcycleStopper succeeds, so it must run
// in its own goroutine and be started only once.
func (agg *BufferedAggregator) dequeueContainerLifecycleEvents() {
	for {
		select {
		case <-agg.contLcycleStopper:
			return
		case queued := <-agg.contLcycleBuffer:
			err := agg.serializer.SendContainerLifecycleEvent(queued.msgs, agg.hostname)
			if err == nil {
				continue
			}
			aggregatorContainerLifecycleEventsErrors.Add(1)
			log.Warnf("Error submitting container lifecycle data: %v", err)
		}
	}
}

// dequeueContainerImages drains contImageBuffer, handing the messages of each
// buffered container image event to the serializer along with the aggregator
// hostname. A failed submission increments the error expvar and is logged as a
// warning; the loop then moves on to the next event.
// The call blocks until a receive on contImageStopper succeeds, so it must run
// in its own goroutine and be started only once.
func (agg *BufferedAggregator) dequeueContainerImages() {
	for {
		select {
		case <-agg.contImageStopper:
			return
		case queued := <-agg.contImageBuffer:
			err := agg.serializer.SendContainerImage(queued.msgs, agg.hostname)
			if err == nil {
				continue
			}
			aggregatorContainerImagesErrors.Add(1)
			log.Warnf("Error submitting container image data: %v", err)
		}
	}
}

// dequeueSBOM drains sbomBuffer, handing the messages of each buffered SBOM
// event to the serializer along with the aggregator hostname. A failed
// submission increments the error expvar and is logged as a warning; the loop
// then moves on to the next event.
// The call blocks until a receive on sbomStopper succeeds, so it must run in its
// own goroutine and be started only once.
func (agg *BufferedAggregator) dequeueSBOM() {
	for {
		select {
		case <-agg.sbomStopper:
			return
		case queued := <-agg.sbomBuffer:
			err := agg.serializer.SendSBOM(queued.msgs, agg.hostname)
			if err == nil {
				continue
			}
			aggregatorSBOMErrors.Add(1)
			log.Warnf("Error submitting SBOM data: %v", err)
		}
	}
}

// handleContainerLifecycleEvent attempts a non-blocking send of event on
// contLcycleBuffer. If the channel cannot accept it immediately, the event is
// not queued: the error expvar is incremented and a warning is logged.
func (agg *BufferedAggregator) handleContainerLifecycleEvent(event senderContainerLifecycleEvent) {
	select {
	case agg.contLcycleBuffer <- event:
		// Enqueued; nothing more to do.
	default:
		aggregatorContainerLifecycleEventsErrors.Add(1)
		log.Warn("Container lifecycle events channel is full")
	}
}

// handleContainerImage attempts a non-blocking send of event on contImageBuffer.
// If the channel cannot accept it immediately, the event is not queued: the
// error expvar is incremented and a warning is logged.
func (agg *BufferedAggregator) handleContainerImage(event senderContainerImage) {
	select {
	case agg.contImageBuffer <- event:
		// Enqueued; nothing more to do.
	default:
		aggregatorContainerImagesErrors.Add(1)
		log.Warn("Container image channel is full")
	}
}

// handleSBOM attempts a non-blocking send of event on sbomBuffer. If the channel
// cannot accept it immediately, the event is not queued: the error expvar is
// incremented and a warning is logged.
func (agg *BufferedAggregator) handleSBOM(event senderSBOM) {
	select {
	case agg.sbomBuffer <- event:
		// Enqueued; nothing more to do.
	default:
		aggregatorSBOMErrors.Add(1)
		log.Warn("SBOM channel is full")
	}
}

// tags returns the list of tags that should be added to the agent telemetry metrics
// Container agent tags may be missing in the first seconds after agent startup
func (agg *BufferedAggregator) tags(withVersion bool) []string {
Expand Down
Loading