Skip to content

Commit

Permalink
Factorize all the aggregator, forwarder and sender code used to send …
Browse files Browse the repository at this point in the history
…`container_lifecycle`, `container_image` and `sbom` payloads. (#16084)
  • Loading branch information
L3n41c authored Mar 17, 2023
1 parent 66a5485 commit 2a42758
Show file tree
Hide file tree
Showing 56 changed files with 207 additions and 972 deletions.
3 changes: 0 additions & 3 deletions cmd/agent/subcommands/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,9 +397,6 @@ func startAgent(cliParams *cliParams, flare flare.Component, sysprobeconfig sysp
forwarderOpts.EnabledFeatures = forwarder.SetFeature(forwarderOpts.EnabledFeatures, forwarder.CoreFeatures)
opts := aggregator.DefaultAgentDemultiplexerOptions(forwarderOpts)
opts.EnableNoAggregationPipeline = pkgconfig.Datadog.GetBool("dogstatsd_no_aggregation_pipeline")
opts.UseContainerLifecycleForwarder = pkgconfig.Datadog.GetBool("container_lifecycle.enabled")
opts.UseContainerImageForwarder = pkgconfig.Datadog.GetBool("container_image.enabled")
opts.UseSBOMForwarder = pkgconfig.Datadog.GetBool("sbom.enabled")
demux = aggregator.InitAndStartAgentDemultiplexer(opts, hostnameDetected)

// Setup stats telemetry handler
Expand Down
5 changes: 3 additions & 2 deletions cmd/cluster-agent-cloudfoundry/subcommands/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package run
import (
"context"
"fmt"

"github.com/DataDog/datadog-agent/cmd/agent/common"
"github.com/DataDog/datadog-agent/cmd/cluster-agent-cloudfoundry/command"
"github.com/DataDog/datadog-agent/cmd/cluster-agent/api"
Expand All @@ -35,12 +36,13 @@ import (
"github.com/gorilla/mux"
"github.com/spf13/cobra"

"go.uber.org/fx"
"os"
"os/signal"
"regexp"
"syscall"
"time"

"go.uber.org/fx"
)

// Commands returns a slice of subcommands for the 'cluster-agent-cloudfoundry' command.
Expand Down Expand Up @@ -99,7 +101,6 @@ func run(log log.Component, config config.Component, cliParams *command.GlobalPa
opts := aggregator.DefaultAgentDemultiplexerOptions(forwarderOpts)
opts.UseEventPlatformForwarder = false
opts.UseOrchestratorForwarder = false
opts.UseContainerLifecycleForwarder = false
demux := aggregator.InitAndStartAgentDemultiplexer(opts, hname)
demux.AddAgentStartupTelemetry(fmt.Sprintf("%s - Datadog Cluster Agent", version.AgentVersion))

Expand Down
1 change: 0 additions & 1 deletion cmd/cluster-agent/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ func start(log log.Component, config config.Component, cliParams *command.Global
forwarderOpts.DisableAPIKeyChecking = true
opts := aggregator.DefaultAgentDemultiplexerOptions(forwarderOpts)
opts.UseEventPlatformForwarder = false
opts.UseContainerLifecycleForwarder = false
demux := aggregator.InitAndStartAgentDemultiplexer(opts, hname)
demux.AddAgentStartupTelemetry(fmt.Sprintf("%s - Datadog Cluster Agent", version.AgentVersion))

Expand Down
1 change: 0 additions & 1 deletion cmd/dogstatsd/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ func RunAgent(ctx context.Context, cliParams *CLIParams, config config.Component
opts := aggregator.DefaultAgentDemultiplexerOptions(forwarderOpts)
opts.UseOrchestratorForwarder = false
opts.UseEventPlatformForwarder = false
opts.UseContainerLifecycleForwarder = false
opts.EnableNoAggregationPipeline = config.GetBool("dogstatsd_no_aggregation_pipeline")
hname, err := hostname.Get(context.TODO())
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion cmd/security-agent/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ func RunAgent(ctx context.Context, log log.Component, config config.Component, p
opts := aggregator.DefaultAgentDemultiplexerOptions(forwarderOpts)
opts.UseEventPlatformForwarder = false
opts.UseOrchestratorForwarder = false
opts.UseContainerLifecycleForwarder = false
demux := aggregator.InitAndStartAgentDemultiplexer(opts, hostnameDetected)
demux.AddAgentStartupTelemetry(fmt.Sprintf("%s - Datadog Security Agent", version.AgentVersion))

Expand Down
132 changes: 1 addition & 131 deletions pkg/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,6 @@ var (
aggregatorDogstatsdContextsByMtype = []expvar.Int{}
aggregatorEventPlatformEvents = expvar.Map{}
aggregatorEventPlatformEventsErrors = expvar.Map{}
aggregatorContainerLifecycleEvents = expvar.Int{}
aggregatorContainerLifecycleEventsErrors = expvar.Int{}
aggregatorContainerImages = expvar.Int{}
aggregatorContainerImagesErrors = expvar.Int{}
aggregatorSBOM = expvar.Int{}
aggregatorSBOMErrors = expvar.Int{}

tlmFlush = telemetry.NewCounter("aggregator", "flush",
[]string{"data_type", "state"}, "Number of metrics/service checks/events flushed")
Expand Down Expand Up @@ -181,12 +175,6 @@ func init() {
aggregatorExpvars.Set("DogstatsdContexts", &aggregatorDogstatsdContexts)
aggregatorExpvars.Set("EventPlatformEvents", &aggregatorEventPlatformEvents)
aggregatorExpvars.Set("EventPlatformEventsErrors", &aggregatorEventPlatformEventsErrors)
aggregatorExpvars.Set("ContainerLifecycleEvents", &aggregatorContainerLifecycleEvents)
aggregatorExpvars.Set("ContainerLifecycleEventsErrors", &aggregatorContainerLifecycleEventsErrors)
aggregatorExpvars.Set("ContainerImages", &aggregatorContainerImages)
aggregatorExpvars.Set("ContainerImagesErrors", &aggregatorContainerImagesErrors)
aggregatorExpvars.Set("SBOM", &aggregatorSBOM)
aggregatorExpvars.Set("SBOMErrors", &aggregatorSBOMErrors)

contextsByMtypeMap := expvar.Map{}
aggregatorDogstatsdContextsByMtype = make([]expvar.Int, int(metrics.NumMetricTypes))
Expand Down Expand Up @@ -215,21 +203,6 @@ type BufferedAggregator struct {
orchestratorManifestIn chan senderOrchestratorManifest
eventPlatformIn chan senderEventPlatformEvent

contLcycleIn chan senderContainerLifecycleEvent
contLcycleBuffer chan senderContainerLifecycleEvent
contLcycleStopper chan struct{}
contLcycleDequeueOnce sync.Once

contImageIn chan senderContainerImage
contImageBuffer chan senderContainerImage
contImageStopper chan struct{}
contImageDequeueOnce sync.Once

sbomIn chan senderSBOM
sbomBuffer chan senderSBOM
sbomStopper chan struct{}
sbomDequeueOnce sync.Once

// metricSamplePool is a pool of slices of metric sample to avoid allocations.
// Used by the Dogstatsd Batcher.
MetricSamplePool *metrics.MetricSamplePool
Expand Down Expand Up @@ -305,18 +278,6 @@ func NewBufferedAggregator(s serializer.MetricSerializer, eventPlatformForwarder
orchestratorManifestIn: make(chan senderOrchestratorManifest, bufferSize),
eventPlatformIn: make(chan senderEventPlatformEvent, bufferSize),

contLcycleIn: make(chan senderContainerLifecycleEvent, bufferSize),
contLcycleBuffer: make(chan senderContainerLifecycleEvent, bufferSize),
contLcycleStopper: make(chan struct{}),

contImageIn: make(chan senderContainerImage, bufferSize),
contImageBuffer: make(chan senderContainerImage, bufferSize),
contImageStopper: make(chan struct{}),

sbomIn: make(chan senderSBOM, bufferSize),
sbomBuffer: make(chan senderSBOM, bufferSize),
sbomStopper: make(chan struct{}),

tagsStore: tagsStore,
checkSamplers: make(map[check.ID]*CheckSampler),
flushInterval: flushInterval,
Expand Down Expand Up @@ -453,7 +414,7 @@ func (agg *BufferedAggregator) handleEventPlatformEvent(event senderEventPlatfor
if agg.eventPlatformForwarder == nil {
return errors.New("event platform forwarder not initialized")
}
m := &message.Message{Content: []byte(event.rawEvent)}
m := &message.Message{Content: event.rawEvent}
// eventPlatformForwarder is threadsafe so no locking needed here
return agg.eventPlatformForwarder.SendEventPlatformEvent(m, event.eventType)
}
Expand Down Expand Up @@ -732,7 +693,6 @@ func (agg *BufferedAggregator) Flush(trigger flushTrigger) {
// Stop stops the aggregator.
func (agg *BufferedAggregator) Stop() {
agg.stopChan <- struct{}{}
close(agg.contLcycleStopper)
}

func (agg *BufferedAggregator) run() {
Expand Down Expand Up @@ -811,100 +771,10 @@ func (agg *BufferedAggregator) run() {
}
}
tlmFlush.Add(1, event.eventType, state)
case event := <-agg.contLcycleIn:
aggregatorContainerLifecycleEvents.Add(1)
agg.handleContainerLifecycleEvent(event)
case event := <-agg.contImageIn:
aggregatorContainerImages.Add(1)
agg.handleContainerImage(event)
case event := <-agg.sbomIn:
aggregatorSBOM.Add(1)
agg.handleSBOM(event)
}
}
}

// dequeueContainerLifecycleEvents consumes buffered container lifecycle events.
// It is blocking so it should be started in its own routine and only one instance should be started.
func (agg *BufferedAggregator) dequeueContainerLifecycleEvents() {
for {
select {
case event := <-agg.contLcycleBuffer:
if err := agg.serializer.SendContainerLifecycleEvent(event.msgs, agg.hostname); err != nil {
aggregatorContainerLifecycleEventsErrors.Add(1)
log.Warnf("Error submitting container lifecycle data: %v", err)
}
case <-agg.contLcycleStopper:
return
}
}
}

// dequeueContainerImages consumes buffered container image.
// It is blocking so it should be started in its own routine and only one instance should be started.
func (agg *BufferedAggregator) dequeueContainerImages() {
for {
select {
case event := <-agg.contImageBuffer:
if err := agg.serializer.SendContainerImage(event.msgs, agg.hostname); err != nil {
aggregatorContainerImagesErrors.Add(1)
log.Warnf("Error submitting container image data: %v", err)
}
case <-agg.contImageStopper:
return
}
}
}

// dequeueSBOM consumes buffered SBOM.
// It is blocking so it should be started in its own routine and only one instance should be started.
func (agg *BufferedAggregator) dequeueSBOM() {
for {
select {
case event := <-agg.sbomBuffer:
if err := agg.serializer.SendSBOM(event.msgs, agg.hostname); err != nil {
aggregatorSBOMErrors.Add(1)
log.Warnf("Error submitting SBOM data: %v", err)
}
case <-agg.sbomStopper:
return
}
}
}

// handleContainerLifecycleEvent forwards container lifecycle events to the buffering channel.
func (agg *BufferedAggregator) handleContainerLifecycleEvent(event senderContainerLifecycleEvent) {
select {
case agg.contLcycleBuffer <- event:
return
default:
aggregatorContainerLifecycleEventsErrors.Add(1)
log.Warn("Container lifecycle events channel is full")
}
}

// handleContainerImage forwards container image to the buffering channel.
func (agg *BufferedAggregator) handleContainerImage(event senderContainerImage) {
select {
case agg.contImageBuffer <- event:
return
default:
aggregatorContainerImagesErrors.Add(1)
log.Warn("Container image channel is full")
}
}

// handleSBOM forwards SBOM to the buffering channel.
func (agg *BufferedAggregator) handleSBOM(event senderSBOM) {
select {
case agg.sbomBuffer <- event:
return
default:
aggregatorSBOMErrors.Add(1)
log.Warn("SBOM channel is full")
}
}

// tags returns the list of tags that should be added to the agent telemetry metrics
// Container agent tags may be missing in the first seconds after agent startup
func (agg *BufferedAggregator) tags(withVersion bool) []string {
Expand Down
Loading

0 comments on commit 2a42758

Please sign in to comment.