From 2a427580f9ccd31e649bab377c5913932d55184f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A9na=C3=AFc=20Huard?= Date: Fri, 17 Mar 2023 15:01:29 +0100 Subject: [PATCH] Factorize all the aggregator, forwarder and sender code used to send `container_lifecycle`, `container_image` and `sbom` payloads. (#16084) --- cmd/agent/subcommands/run/command.go | 3 - .../subcommands/run/command.go | 5 +- .../subcommands/start/command.go | 1 - cmd/dogstatsd/subcommands/start/command.go | 1 - .../subcommands/start/command.go | 1 - pkg/aggregator/aggregator.go | 132 +----------------- pkg/aggregator/demultiplexer_agent.go | 108 +++----------- pkg/aggregator/demultiplexer_senders.go | 3 - pkg/aggregator/demultiplexer_serverless.go | 2 +- pkg/aggregator/mocksender/asserts.go | 14 +- pkg/aggregator/mocksender/mocked_methods.go | 17 +-- pkg/aggregator/mocksender/mocksender.go | 4 +- pkg/aggregator/sender.go | 45 +----- pkg/aggregator/sender_test.go | 17 +-- .../corechecks/containerimage/check.go | 6 - .../corechecks/containerimage/processor.go | 18 ++- .../containerimage/processor_test.go | 16 ++- .../containerlifecycle/processor.go | 20 ++- .../containerlifecycle/processor_test.go | 6 +- pkg/collector/corechecks/sbom/check.go | 6 - pkg/collector/corechecks/sbom/processor.go | 24 ++-- .../corechecks/sbom/processor_test.go | 16 ++- .../internal/report/report_device_metadata.go | 2 +- .../report/report_device_metadata_test.go | 8 +- .../corechecks/snmp/profile_metadata_test.go | 2 +- pkg/collector/corechecks/snmp/snmp_test.go | 8 +- pkg/collector/python/aggregator.go | 6 +- pkg/collector/python/init.go | 2 +- pkg/collector/python/test_aggregator.go | 3 +- pkg/config/config.go | 12 +- pkg/containerimage/forwarder.go | 59 -------- pkg/containerlifecycle/forwarder.go | 60 -------- pkg/epforwarder/epforwarder.go | 50 ++++++- pkg/forwarder/endpoints/endpoints.go | 6 - pkg/forwarder/forwarder.go | 21 --- pkg/forwarder/forwarder_test.go | 12 -- pkg/forwarder/noop_forwarder.go | 15 -- pkg/forwarder/sync_forwarder.go | 15 -- pkg/forwarder/test_common.go | 15 -- pkg/logs/client/http/destination.go | 5 +- pkg/netflow/flowaggregator/aggregator.go | 5 +- pkg/netflow/flowaggregator/aggregator_test.go | 4 +- .../serializerexporter/consumer_test.go | 12 -- pkg/sbom/forwarder.go | 59 -------- pkg/serializer/serializer.go | 121 +--------------- pkg/serializer/serializer_test.go | 121 ++-------------- pkg/serializer/test_common.go | 17 +-- pkg/serializer/types_contimage.go | 13 -- pkg/serializer/types_contlcycle.go | 13 -- pkg/serializer/types_sbom.go | 16 --- pkg/snmp/traps/forwarder.go | 2 +- pkg/snmp/traps/forwarder_test.go | 6 +- rtloader/common/builtins/aggregator.c | 13 +- rtloader/common/builtins/aggregator.h | 1 + rtloader/include/rtloader_types.h | 2 +- rtloader/test/aggregator/aggregator.go | 8 +- 56 files changed, 207 insertions(+), 972 deletions(-) delete mode 100644 pkg/containerimage/forwarder.go delete mode 100644 pkg/containerlifecycle/forwarder.go delete mode 100644 pkg/sbom/forwarder.go delete mode 100644 pkg/serializer/types_contimage.go delete mode 100644 pkg/serializer/types_contlcycle.go delete mode 100644 pkg/serializer/types_sbom.go diff --git a/cmd/agent/subcommands/run/command.go b/cmd/agent/subcommands/run/command.go index ed2776ff9a4fe..7114d36b7a864 100644 --- a/cmd/agent/subcommands/run/command.go +++ b/cmd/agent/subcommands/run/command.go @@ -397,9 +397,6 @@ func startAgent(cliParams *cliParams, flare flare.Component, sysprobeconfig sysp forwarderOpts.EnabledFeatures = forwarder.SetFeature(forwarderOpts.EnabledFeatures, forwarder.CoreFeatures) opts := aggregator.DefaultAgentDemultiplexerOptions(forwarderOpts) opts.EnableNoAggregationPipeline = pkgconfig.Datadog.GetBool("dogstatsd_no_aggregation_pipeline") - opts.UseContainerLifecycleForwarder = pkgconfig.Datadog.GetBool("container_lifecycle.enabled") - opts.UseContainerImageForwarder = pkgconfig.Datadog.GetBool("container_image.enabled") - opts.UseSBOMForwarder = pkgconfig.Datadog.GetBool("sbom.enabled") demux = aggregator.InitAndStartAgentDemultiplexer(opts, hostnameDetected) // Setup stats telemetry handler diff --git a/cmd/cluster-agent-cloudfoundry/subcommands/run/command.go b/cmd/cluster-agent-cloudfoundry/subcommands/run/command.go index ee80a8aa2c8be..0ac43d54468c3 100644 --- a/cmd/cluster-agent-cloudfoundry/subcommands/run/command.go +++ b/cmd/cluster-agent-cloudfoundry/subcommands/run/command.go @@ -11,6 +11,7 @@ package run import ( "context" "fmt" + "github.com/DataDog/datadog-agent/cmd/agent/common" "github.com/DataDog/datadog-agent/cmd/cluster-agent-cloudfoundry/command" "github.com/DataDog/datadog-agent/cmd/cluster-agent/api" @@ -35,12 +36,13 @@ import ( "github.com/gorilla/mux" "github.com/spf13/cobra" - "go.uber.org/fx" "os" "os/signal" "regexp" "syscall" "time" + + "go.uber.org/fx" ) // Commands returns a slice of subcommands for the 'cluster-agent-cloudfoundry' command. @@ -99,7 +101,6 @@ func run(log log.Component, config config.Component, cliParams *command.GlobalPa opts := aggregator.DefaultAgentDemultiplexerOptions(forwarderOpts) opts.UseEventPlatformForwarder = false opts.UseOrchestratorForwarder = false - opts.UseContainerLifecycleForwarder = false demux := aggregator.InitAndStartAgentDemultiplexer(opts, hname) demux.AddAgentStartupTelemetry(fmt.Sprintf("%s - Datadog Cluster Agent", version.AgentVersion)) diff --git a/cmd/cluster-agent/subcommands/start/command.go b/cmd/cluster-agent/subcommands/start/command.go index 4fb71ab898d31..dd3663da29a21 100644 --- a/cmd/cluster-agent/subcommands/start/command.go +++ b/cmd/cluster-agent/subcommands/start/command.go @@ -191,7 +191,6 @@ func start(log log.Component, config config.Component, cliParams *command.Global forwarderOpts.DisableAPIKeyChecking = true opts := aggregator.DefaultAgentDemultiplexerOptions(forwarderOpts) opts.UseEventPlatformForwarder = false - opts.UseContainerLifecycleForwarder = false demux := aggregator.InitAndStartAgentDemultiplexer(opts, hname) demux.AddAgentStartupTelemetry(fmt.Sprintf("%s - Datadog Cluster Agent", version.AgentVersion)) diff --git a/cmd/dogstatsd/subcommands/start/command.go b/cmd/dogstatsd/subcommands/start/command.go index 880fa027b9a17..31e88a7283c56 100644 --- a/cmd/dogstatsd/subcommands/start/command.go +++ b/cmd/dogstatsd/subcommands/start/command.go @@ -197,7 +197,6 @@ func RunAgent(ctx context.Context, cliParams *CLIParams, config config.Component opts := aggregator.DefaultAgentDemultiplexerOptions(forwarderOpts) opts.UseOrchestratorForwarder = false opts.UseEventPlatformForwarder = false - opts.UseContainerLifecycleForwarder = false opts.EnableNoAggregationPipeline = config.GetBool("dogstatsd_no_aggregation_pipeline") hname, err := hostname.Get(context.TODO()) if err != nil { diff --git a/cmd/security-agent/subcommands/start/command.go b/cmd/security-agent/subcommands/start/command.go index 093d2b2628fea..eb1d764e13cf2 100644 --- a/cmd/security-agent/subcommands/start/command.go +++ b/cmd/security-agent/subcommands/start/command.go @@ -218,7 +218,6 @@ func RunAgent(ctx context.Context, log log.Component, config config.Component, p opts := aggregator.DefaultAgentDemultiplexerOptions(forwarderOpts) opts.UseEventPlatformForwarder = false opts.UseOrchestratorForwarder = false - opts.UseContainerLifecycleForwarder = false demux := aggregator.InitAndStartAgentDemultiplexer(opts, hostnameDetected) demux.AddAgentStartupTelemetry(fmt.Sprintf("%s - Datadog Security Agent", version.AgentVersion)) diff --git a/pkg/aggregator/aggregator.go b/pkg/aggregator/aggregator.go index e511b32c5a22d..4d127f1656742 100644 --- a/pkg/aggregator/aggregator.go +++ b/pkg/aggregator/aggregator.go @@ -122,12 +122,6 @@ var ( aggregatorDogstatsdContextsByMtype = []expvar.Int{} aggregatorEventPlatformEvents = expvar.Map{} aggregatorEventPlatformEventsErrors = expvar.Map{} - aggregatorContainerLifecycleEvents = expvar.Int{} - aggregatorContainerLifecycleEventsErrors = expvar.Int{} - aggregatorContainerImages = expvar.Int{} - aggregatorContainerImagesErrors = expvar.Int{} - aggregatorSBOM = expvar.Int{} - aggregatorSBOMErrors = expvar.Int{} tlmFlush = telemetry.NewCounter("aggregator", "flush", []string{"data_type", "state"}, "Number of metrics/service checks/events flushed") @@ -181,12 +175,6 @@ func init() { aggregatorExpvars.Set("DogstatsdContexts", &aggregatorDogstatsdContexts) aggregatorExpvars.Set("EventPlatformEvents", &aggregatorEventPlatformEvents) aggregatorExpvars.Set("EventPlatformEventsErrors", &aggregatorEventPlatformEventsErrors) - aggregatorExpvars.Set("ContainerLifecycleEvents", &aggregatorContainerLifecycleEvents) - aggregatorExpvars.Set("ContainerLifecycleEventsErrors", &aggregatorContainerLifecycleEventsErrors) - aggregatorExpvars.Set("ContainerImages", &aggregatorContainerImages) - aggregatorExpvars.Set("ContainerImagesErrors", &aggregatorContainerImagesErrors) - aggregatorExpvars.Set("SBOM", &aggregatorSBOM) - aggregatorExpvars.Set("SBOMErrors", &aggregatorSBOMErrors) contextsByMtypeMap := expvar.Map{} aggregatorDogstatsdContextsByMtype = make([]expvar.Int, int(metrics.NumMetricTypes)) @@ -215,21 +203,6 @@ type BufferedAggregator struct { orchestratorManifestIn chan senderOrchestratorManifest eventPlatformIn chan senderEventPlatformEvent - contLcycleIn chan senderContainerLifecycleEvent - contLcycleBuffer chan senderContainerLifecycleEvent - contLcycleStopper chan struct{} - contLcycleDequeueOnce sync.Once - - contImageIn chan senderContainerImage - contImageBuffer chan senderContainerImage - contImageStopper chan struct{} - contImageDequeueOnce sync.Once - - sbomIn chan senderSBOM - sbomBuffer chan senderSBOM - sbomStopper chan struct{} - sbomDequeueOnce sync.Once - // metricSamplePool is a pool of slices of metric sample to avoid allocations. // Used by the Dogstatsd Batcher. MetricSamplePool *metrics.MetricSamplePool @@ -305,18 +278,6 @@ func NewBufferedAggregator(s serializer.MetricSerializer, eventPlatformForwarder orchestratorManifestIn: make(chan senderOrchestratorManifest, bufferSize), eventPlatformIn: make(chan senderEventPlatformEvent, bufferSize), - contLcycleIn: make(chan senderContainerLifecycleEvent, bufferSize), - contLcycleBuffer: make(chan senderContainerLifecycleEvent, bufferSize), - contLcycleStopper: make(chan struct{}), - - contImageIn: make(chan senderContainerImage, bufferSize), - contImageBuffer: make(chan senderContainerImage, bufferSize), - contImageStopper: make(chan struct{}), - - sbomIn: make(chan senderSBOM, bufferSize), - sbomBuffer: make(chan senderSBOM, bufferSize), - sbomStopper: make(chan struct{}), - tagsStore: tagsStore, checkSamplers: make(map[check.ID]*CheckSampler), flushInterval: flushInterval, @@ -453,7 +414,7 @@ func (agg *BufferedAggregator) handleEventPlatformEvent(event senderEventPlatfor if agg.eventPlatformForwarder == nil { return errors.New("event platform forwarder not initialized") } - m := &message.Message{Content: []byte(event.rawEvent)} + m := &message.Message{Content: event.rawEvent} // eventPlatformForwarder is threadsafe so no locking needed here return agg.eventPlatformForwarder.SendEventPlatformEvent(m, event.eventType) } @@ -732,7 +693,6 @@ func (agg *BufferedAggregator) Flush(trigger flushTrigger) { // Stop stops the aggregator. func (agg *BufferedAggregator) Stop() { agg.stopChan <- struct{}{} - close(agg.contLcycleStopper) } func (agg *BufferedAggregator) run() { @@ -811,100 +771,10 @@ func (agg *BufferedAggregator) run() { } } tlmFlush.Add(1, event.eventType, state) - case event := <-agg.contLcycleIn: - aggregatorContainerLifecycleEvents.Add(1) - agg.handleContainerLifecycleEvent(event) - case event := <-agg.contImageIn: - aggregatorContainerImages.Add(1) - agg.handleContainerImage(event) - case event := <-agg.sbomIn: - aggregatorSBOM.Add(1) - agg.handleSBOM(event) - } - } -} - -// dequeueContainerLifecycleEvents consumes buffered container lifecycle events. -// It is blocking so it should be started in its own routine and only one instance should be started. -func (agg *BufferedAggregator) dequeueContainerLifecycleEvents() { - for { - select { - case event := <-agg.contLcycleBuffer: - if err := agg.serializer.SendContainerLifecycleEvent(event.msgs, agg.hostname); err != nil { - aggregatorContainerLifecycleEventsErrors.Add(1) - log.Warnf("Error submitting container lifecycle data: %v", err) - } - case <-agg.contLcycleStopper: - return - } - } -} - -// dequeueContainerImages consumes buffered container image. -// It is blocking so it should be started in its own routine and only one instance should be started. -func (agg *BufferedAggregator) dequeueContainerImages() { - for { - select { - case event := <-agg.contImageBuffer: - if err := agg.serializer.SendContainerImage(event.msgs, agg.hostname); err != nil { - aggregatorContainerImagesErrors.Add(1) - log.Warnf("Error submitting container image data: %v", err) - } - case <-agg.contImageStopper: - return } } } -// dequeueSBOM consumes buffered SBOM. -// It is blocking so it should be started in its own routine and only one instance should be started. -func (agg *BufferedAggregator) dequeueSBOM() { - for { - select { - case event := <-agg.sbomBuffer: - if err := agg.serializer.SendSBOM(event.msgs, agg.hostname); err != nil { - aggregatorSBOMErrors.Add(1) - log.Warnf("Error submitting SBOM data: %v", err) - } - case <-agg.sbomStopper: - return - } - } -} - -// handleContainerLifecycleEvent forwards container lifecycle events to the buffering channel. -func (agg *BufferedAggregator) handleContainerLifecycleEvent(event senderContainerLifecycleEvent) { - select { - case agg.contLcycleBuffer <- event: - return - default: - aggregatorContainerLifecycleEventsErrors.Add(1) - log.Warn("Container lifecycle events channel is full") - } -} - -// handleContainerImage forwards container image to the buffering channel. -func (agg *BufferedAggregator) handleContainerImage(event senderContainerImage) { - select { - case agg.contImageBuffer <- event: - return - default: - aggregatorContainerImagesErrors.Add(1) - log.Warn("Container image channel is full") - } -} - -// handleSBOM forwards SBOM to the buffering channel. -func (agg *BufferedAggregator) handleSBOM(event senderSBOM) { - select { - case agg.sbomBuffer <- event: - return - default: - aggregatorSBOMErrors.Add(1) - log.Warn("SBOM channel is full") - } -} - // tags returns the list of tags that should be added to the agent telemetry metrics // Container agent tags may be missing in the first seconds after agent startup func (agg *BufferedAggregator) tags(withVersion bool) []string { diff --git a/pkg/aggregator/demultiplexer_agent.go b/pkg/aggregator/demultiplexer_agent.go index dfae7bbccb56a..7a556e87a6942 100644 --- a/pkg/aggregator/demultiplexer_agent.go +++ b/pkg/aggregator/demultiplexer_agent.go @@ -12,12 +12,9 @@ import ( "github.com/DataDog/datadog-agent/pkg/aggregator/internal/tags" "github.com/DataDog/datadog-agent/pkg/config" - "github.com/DataDog/datadog-agent/pkg/containerimage" - "github.com/DataDog/datadog-agent/pkg/containerlifecycle" "github.com/DataDog/datadog-agent/pkg/epforwarder" "github.com/DataDog/datadog-agent/pkg/forwarder" "github.com/DataDog/datadog-agent/pkg/metrics" - "github.com/DataDog/datadog-agent/pkg/sbom" "github.com/DataDog/datadog-agent/pkg/serializer" "github.com/DataDog/datadog-agent/pkg/util/log" ) @@ -57,16 +54,13 @@ type AgentDemultiplexer struct { // AgentDemultiplexerOptions are the options used to initialize a Demultiplexer. type AgentDemultiplexerOptions struct { - SharedForwarderOptions *forwarder.Options - UseNoopForwarder bool - UseNoopEventPlatformForwarder bool - UseNoopOrchestratorForwarder bool - UseEventPlatformForwarder bool - UseOrchestratorForwarder bool - UseContainerLifecycleForwarder bool - UseContainerImageForwarder bool - UseSBOMForwarder bool - FlushInterval time.Duration + SharedForwarderOptions *forwarder.Options + UseNoopForwarder bool + UseNoopEventPlatformForwarder bool + UseNoopOrchestratorForwarder bool + UseEventPlatformForwarder bool + UseOrchestratorForwarder bool + FlushInterval time.Duration EnableNoAggregationPipeline bool @@ -80,16 +74,13 @@ func DefaultAgentDemultiplexerOptions(options *forwarder.Options) AgentDemultipl } return AgentDemultiplexerOptions{ - SharedForwarderOptions: options, - FlushInterval: DefaultFlushInterval, - UseEventPlatformForwarder: true, - UseOrchestratorForwarder: true, - UseNoopForwarder: false, - UseNoopEventPlatformForwarder: false, - UseNoopOrchestratorForwarder: false, - UseContainerLifecycleForwarder: false, - UseContainerImageForwarder: false, - UseSBOMForwarder: false, + SharedForwarderOptions: options, + FlushInterval: DefaultFlushInterval, + UseEventPlatformForwarder: true, + UseOrchestratorForwarder: true, + UseNoopForwarder: false, + UseNoopEventPlatformForwarder: false, + UseNoopOrchestratorForwarder: false, // the different agents/binaries enable it on a per-need basis EnableNoAggregationPipeline: false, } @@ -115,8 +106,6 @@ type forwarders struct { orchestrator forwarder.Forwarder eventPlatform epforwarder.EventPlatformForwarder containerLifecycle *forwarder.DefaultForwarder - containerImage *forwarder.DefaultForwarder - sbom *forwarder.DefaultForwarder } type dataOutputs struct { @@ -165,24 +154,6 @@ func initAgentDemultiplexer(options AgentDemultiplexerOptions, hostname string) eventPlatformForwarder = epforwarder.NewEventPlatformForwarder() } - // setup the container lifecycle events forwarder - var containerLifecycleForwarder *forwarder.DefaultForwarder - if options.UseContainerLifecycleForwarder { - containerLifecycleForwarder = containerlifecycle.NewForwarder() - } - - // setup the container image forwarder - var containerImageForwarder *forwarder.DefaultForwarder - if options.UseContainerImageForwarder { - containerImageForwarder = containerimage.NewForwarder() - } - - // setup the SBOM forwarder - var sbomForwarder *forwarder.DefaultForwarder - if options.UseSBOMForwarder { - sbomForwarder = sbom.NewForwarder() - } - var sharedForwarder forwarder.Forwarder if options.UseNoopForwarder { sharedForwarder = forwarder.NoopForwarder{} @@ -198,7 +169,7 @@ func initAgentDemultiplexer(options AgentDemultiplexerOptions, hostname string) // prepare the serializer // ---------------------- - sharedSerializer := serializer.NewSerializer(sharedForwarder, orchestratorForwarder, containerLifecycleForwarder, containerImageForwarder, sbomForwarder) + sharedSerializer := serializer.NewSerializer(sharedForwarder, orchestratorForwarder) // prepare the embedded aggregator // -- @@ -230,7 +201,7 @@ func initAgentDemultiplexer(options AgentDemultiplexerOptions, hostname string) var noAggWorker *noAggregationStreamWorker var noAggSerializer serializer.MetricSerializer if options.EnableNoAggregationPipeline { - noAggSerializer = serializer.NewSerializer(sharedForwarder, orchestratorForwarder, containerLifecycleForwarder, containerImageForwarder, sbomForwarder) + noAggSerializer = serializer.NewSerializer(sharedForwarder, orchestratorForwarder) noAggWorker = newNoAggregationStreamWorker( config.Datadog.GetInt("dogstatsd_no_aggregation_pipeline_batch_size"), noAggSerializer, @@ -252,12 +223,9 @@ func initAgentDemultiplexer(options AgentDemultiplexerOptions, hostname string) dataOutputs: dataOutputs{ forwarders: forwarders{ - shared: sharedForwarder, - orchestrator: orchestratorForwarder, - eventPlatform: eventPlatformForwarder, - containerLifecycle: containerLifecycleForwarder, - containerImage: containerImageForwarder, - sbom: sbomForwarder, + shared: sharedForwarder, + orchestrator: orchestratorForwarder, + eventPlatform: eventPlatformForwarder, }, sharedSerializer: sharedSerializer, @@ -337,24 +305,6 @@ func (d *AgentDemultiplexer) Run() { log.Debug("not starting the container lifecycle forwarder") } - // container image forwarder - if d.forwarders.containerImage != nil { - if err := d.forwarders.containerImage.Start(); err != nil { - log.Errorf("error starting container image forwarder: %v", err) - } - } else { - log.Debug("not starting the container image forwarder") - } - - // sbom forwarder - if d.forwarders.sbom != nil { - if err := d.forwarders.sbom.Start(); err != nil { - log.Errorf("error starting SBOM forwarder: %v", err) - } - } else { - log.Debug("not starting the SBOM forwarder") - } - // shared forwarder if d.forwarders.shared != nil { d.forwarders.shared.Start() //nolint:errcheck @@ -364,18 +314,6 @@ func (d *AgentDemultiplexer) Run() { log.Debug("Forwarders started") } - if d.options.UseContainerLifecycleForwarder { - d.aggregator.contLcycleDequeueOnce.Do(func() { go d.aggregator.dequeueContainerLifecycleEvents() }) - } - - if d.options.UseContainerImageForwarder { - d.aggregator.contImageDequeueOnce.Do(func() { go d.aggregator.dequeueContainerImages() }) - } - - if d.options.UseSBOMForwarder { - d.aggregator.sbomDequeueOnce.Do(func() { go d.aggregator.dequeueSBOM() }) - } - for _, w := range d.statsd.workers { go w.run() } @@ -471,14 +409,6 @@ func (d *AgentDemultiplexer) Stop(flush bool) { d.dataOutputs.forwarders.containerLifecycle.Stop() d.dataOutputs.forwarders.containerLifecycle = nil } - if d.dataOutputs.forwarders.containerImage != nil { - d.dataOutputs.forwarders.containerImage.Stop() - d.dataOutputs.forwarders.containerImage = nil - } - if d.dataOutputs.forwarders.sbom != nil { - d.dataOutputs.forwarders.sbom.Stop() - d.dataOutputs.forwarders.sbom = nil - } if d.dataOutputs.forwarders.shared != nil { d.dataOutputs.forwarders.shared.Stop() d.dataOutputs.forwarders.shared = nil diff --git a/pkg/aggregator/demultiplexer_senders.go b/pkg/aggregator/demultiplexer_senders.go index 4fcbb3e3753dc..3aea93a82b644 100644 --- a/pkg/aggregator/demultiplexer_senders.go +++ b/pkg/aggregator/demultiplexer_senders.go @@ -73,9 +73,6 @@ func (s *senders) GetDefaultSender() (Sender, error) { s.agg.orchestratorMetadataIn, s.agg.orchestratorManifestIn, s.agg.eventPlatformIn, - s.agg.contLcycleIn, - s.agg.contImageIn, - s.agg.sbomIn, ) }) return s.defaultSender, nil diff --git a/pkg/aggregator/demultiplexer_serverless.go b/pkg/aggregator/demultiplexer_serverless.go index d1b7ed8227a36..4061ad1cf823a 100644 --- a/pkg/aggregator/demultiplexer_serverless.go +++ b/pkg/aggregator/demultiplexer_serverless.go @@ -39,7 +39,7 @@ type ServerlessDemultiplexer struct { func InitAndStartServerlessDemultiplexer(domainResolvers map[string]resolver.DomainResolver, forwarderTimeout time.Duration) *ServerlessDemultiplexer { bufferSize := config.Datadog.GetInt("aggregator_buffer_size") forwarder := forwarder.NewSyncForwarder(domainResolvers, forwarderTimeout) - serializer := serializer.NewSerializer(forwarder, nil, nil, nil, nil) + serializer := serializer.NewSerializer(forwarder, nil) metricSamplePool := metrics.NewMetricSamplePool(MetricSamplePoolBatchSize) tagsStore := tags.NewStore(config.Datadog.GetBool("aggregator_use_tags_store"), "timesampler") diff --git a/pkg/aggregator/mocksender/asserts.go b/pkg/aggregator/mocksender/asserts.go index 0c3918127edf9..7ecd94c11c3d2 100644 --- a/pkg/aggregator/mocksender/asserts.go +++ b/pkg/aggregator/mocksender/asserts.go @@ -12,8 +12,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - "github.com/DataDog/agent-payload/v5/contimage" - "github.com/DataDog/agent-payload/v5/sbom" "github.com/DataDog/datadog-agent/pkg/metrics" ) @@ -68,20 +66,10 @@ func (m *MockSender) AssertEvent(t *testing.T, expectedEvent metrics.Event, allo } // AssertEventPlatformEvent assert the expected event was emitted with the following values -func (m *MockSender) AssertEventPlatformEvent(t *testing.T, expectedRawEvent string, expectedEventType string) bool { +func (m *MockSender) AssertEventPlatformEvent(t *testing.T, expectedRawEvent []byte, expectedEventType string) bool { return m.Mock.AssertCalled(t, "EventPlatformEvent", expectedRawEvent, expectedEventType) } -// AssertContainerImage assert the expected event was emitted with the following values -func (m *MockSender) AssertContainerImage(t *testing.T, expectedContainerImages []contimage.ContainerImagePayload) bool { - return m.Mock.AssertCalled(t, "ContainerImage", expectedContainerImages) -} - -// AssertSBOM assert the expected event was emitted with the following values -func (m *MockSender) AssertSBOM(t *testing.T, expectedSBOM []sbom.SBOMPayload) bool { - return m.Mock.AssertCalled(t, "SBOM", expectedSBOM) -} - // AssertEventMissing assert the expectedEvent was never emitted with the following values: // AggregationKey, Priority, SourceTypeName, EventType, Host and a Ts range weighted with the parameter allowedDelta func (m *MockSender) AssertEventMissing(t *testing.T, expectedEvent metrics.Event, allowedDelta time.Duration) bool { diff --git a/pkg/aggregator/mocksender/mocked_methods.go b/pkg/aggregator/mocksender/mocked_methods.go index 28ef14c46d86a..ef6a74fce1f9f 100644 --- a/pkg/aggregator/mocksender/mocked_methods.go +++ b/pkg/aggregator/mocksender/mocked_methods.go @@ -72,7 +72,7 @@ func (m *MockSender) Event(e metrics.Event) { } // EventPlatformEvent enables the event platform event mock call. -func (m *MockSender) EventPlatformEvent(rawEvent string, eventType string) { +func (m *MockSender) EventPlatformEvent(rawEvent []byte, eventType string) { m.Called(rawEvent, eventType) } @@ -112,21 +112,6 @@ func (m *MockSender) OrchestratorMetadata(msgs []serializer.ProcessMessageBody, m.Called(msgs, clusterID, nodeType) } -// ContainerLifecycleEvent submit container life cycle messages -func (m *MockSender) ContainerLifecycleEvent(msgs []serializer.ContainerLifecycleMessage) { - m.Called(msgs) -} - -// ContainerImage submit container image messages -func (m *MockSender) ContainerImage(msgs []serializer.ContainerImageMessage) { - m.Called(msgs) -} - -// SBOM submit sbom data -func (m *MockSender) SBOM(msgs []serializer.SBOMMessage) { - m.Called(msgs) -} - // OrchestratorManifest submit orchestrator manifest messages func (m *MockSender) OrchestratorManifest(msgs []serializer.ProcessMessageBody, clusterID string) { m.Called(msgs, clusterID) diff --git a/pkg/aggregator/mocksender/mocksender.go b/pkg/aggregator/mocksender/mocksender.go index e713125cdb4f7..ab903319e0eba 100644 --- a/pkg/aggregator/mocksender/mocksender.go +++ b/pkg/aggregator/mocksender/mocksender.go @@ -65,7 +65,9 @@ func (m *MockSender) SetupAcceptAll() { mock.AnythingOfType("string"), // message ).Return() m.On("Event", mock.AnythingOfType("metrics.Event")).Return() - m.On("EventPlatformEvent", mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return() + // The second argument should have been `mock.AnythingOfType("[]byte")` instead of `mock.AnythingOfType("[]uint8")` + // See https://github.com/stretchr/testify/issues/387 + m.On("EventPlatformEvent", mock.AnythingOfType("[]uint8"), mock.AnythingOfType("string")).Return() m.On("HistogramBucket", mock.AnythingOfType("string"), // metric name mock.AnythingOfType("int64"), // value diff --git a/pkg/aggregator/sender.go b/pkg/aggregator/sender.go index 97b33c50f20eb..6c424864abfaa 100644 --- a/pkg/aggregator/sender.go +++ b/pkg/aggregator/sender.go @@ -32,7 +32,7 @@ type Sender interface { ServiceCheck(checkName string, status metrics.ServiceCheckStatus, hostname string, tags []string, message string) HistogramBucket(metric string, value int64, lowerBound, upperBound float64, monotonic bool, hostname string, tags []string, flushFirstValue bool) Event(e metrics.Event) - EventPlatformEvent(rawEvent string, eventType string) + EventPlatformEvent(rawEvent []byte, eventType string) GetSenderStats() check.SenderStats DisableDefaultHostname(disable bool) SetCheckCustomTags(tags []string) @@ -40,9 +40,6 @@ type Sender interface { FinalizeCheckServiceTag() OrchestratorMetadata(msgs []serializer.ProcessMessageBody, clusterID string, nodeType int) OrchestratorManifest(msgs []serializer.ProcessMessageBody, clusterID string) - ContainerLifecycleEvent(msgs []serializer.ContainerLifecycleMessage) - ContainerImage(msgs []serializer.ContainerImageMessage) - SBOM(msgs []serializer.SBOMMessage) } // RawSender interface to submit samples to aggregator directly @@ -65,9 +62,6 @@ type checkSender struct { eventOut chan<- metrics.Event orchestratorMetadataOut chan<- senderOrchestratorMetadata orchestratorManifestOut chan<- senderOrchestratorManifest - contlcycleOut chan<- senderContainerLifecycleEvent - contimageOut chan<- senderContainerImage - sbomOut chan<- senderSBOM eventPlatformOut chan<- senderEventPlatformEvent checkTags []string service string @@ -99,7 +93,7 @@ func (s *senderHistogramBucket) handle(agg *BufferedAggregator) { type senderEventPlatformEvent struct { id check.ID - rawEvent string + rawEvent []byte eventType string } @@ -109,18 +103,6 @@ type senderOrchestratorMetadata struct { payloadType int } -type senderContainerLifecycleEvent struct { - msgs []serializer.ContainerLifecycleMessage -} - -type senderContainerImage struct { - msgs []serializer.ContainerImageMessage -} - -type senderSBOM struct { - msgs []serializer.SBOMMessage -} - type senderOrchestratorManifest struct { msgs []serializer.ProcessMessageBody clusterID string @@ -141,9 +123,6 @@ func newCheckSender( orchestratorMetadataOut chan<- senderOrchestratorMetadata, orchestratorManifestOut chan<- senderOrchestratorManifest, eventPlatformOut chan<- senderEventPlatformEvent, - contlcycleOut chan<- senderContainerLifecycleEvent, - contimageOut chan<- senderContainerImage, - sbomOut chan<- senderSBOM, ) *checkSender { return &checkSender{ id: id, @@ -156,9 +135,6 @@ func newCheckSender( orchestratorMetadataOut: orchestratorMetadataOut, orchestratorManifestOut: orchestratorManifestOut, eventPlatformOut: eventPlatformOut, - contlcycleOut: contlcycleOut, - contimageOut: contimageOut, - sbomOut: sbomOut, } } @@ -422,7 +398,7 @@ func (s *checkSender) Event(e metrics.Event) { } // Event submits an event -func (s *checkSender) EventPlatformEvent(rawEvent string, eventType string) { +func (s *checkSender) EventPlatformEvent(rawEvent []byte, eventType string) { s.eventPlatformOut <- senderEventPlatformEvent{ id: s.id, rawEvent: rawEvent, @@ -451,18 +427,6 @@ func (s *checkSender) OrchestratorManifest(msgs []serializer.ProcessMessageBody, s.orchestratorManifestOut <- om } -func (s *checkSender) ContainerLifecycleEvent(msgs []serializer.ContainerLifecycleMessage) { - s.contlcycleOut <- senderContainerLifecycleEvent{msgs: msgs} -} - -func (s *checkSender) ContainerImage(msgs []serializer.ContainerImageMessage) { - s.contimageOut <- senderContainerImage{msgs: msgs} -} - -func (s *checkSender) SBOM(msgs []serializer.SBOMMessage) { - s.sbomOut <- senderSBOM{msgs: msgs} -} - func (sp *checkSenderPool) getSender(id check.ID) (Sender, error) { sp.m.Lock() defer sp.m.Unlock() @@ -487,9 +451,6 @@ func (sp *checkSenderPool) mkSender(id check.ID) (Sender, error) { sp.agg.orchestratorMetadataIn, sp.agg.orchestratorManifestIn, sp.agg.eventPlatformIn, - sp.agg.contLcycleIn, - sp.agg.contImageIn, - sp.agg.sbomIn, ) sp.senders[id] = sender return sender, err diff --git a/pkg/aggregator/sender_test.go b/pkg/aggregator/sender_test.go index 1fa8e15fcc070..c13eb2579ffc9 100644 --- a/pkg/aggregator/sender_test.go +++ b/pkg/aggregator/sender_test.go @@ -27,9 +27,6 @@ type senderWithChans struct { orchestratorChan chan senderOrchestratorMetadata orchestratorManifestChan chan senderOrchestratorManifest eventPlatformEventChan chan senderEventPlatformEvent - contlcycleOut chan senderContainerLifecycleEvent - contimageOut chan senderContainerImage - sbomOut chan senderSBOM sender *checkSender } @@ -40,10 +37,7 @@ func initSender(id check.ID, defaultHostname string) (s senderWithChans) { s.orchestratorChan = make(chan senderOrchestratorMetadata, 10) s.orchestratorManifestChan = make(chan senderOrchestratorManifest, 10) s.eventPlatformEventChan = make(chan senderEventPlatformEvent, 10) - s.contlcycleOut = make(chan senderContainerLifecycleEvent, 10) - s.contimageOut = make(chan senderContainerImage, 10) - s.sbomOut = make(chan senderSBOM, 10) - s.sender = newCheckSender(id, defaultHostname, s.itemChan, s.serviceCheckChan, s.eventChan, s.orchestratorChan, s.orchestratorManifestChan, s.eventPlatformEventChan, s.contlcycleOut, s.contimageOut, s.sbomOut) + s.sender = newCheckSender(id, defaultHostname, s.itemChan, s.serviceCheckChan, s.eventChan, s.orchestratorChan, s.orchestratorManifestChan, s.eventPlatformEventChan) return s } @@ -178,10 +172,7 @@ func TestGetAndSetSender(t *testing.T) { orchestratorChan := make(chan senderOrchestratorMetadata, 10) orchestratorManifestChan := make(chan senderOrchestratorManifest, 10) eventPlatformChan := make(chan senderEventPlatformEvent, 10) - contlcycleChan := make(chan senderContainerLifecycleEvent, 10) - contimageChan := make(chan senderContainerImage, 10) - sbomChan := make(chan senderSBOM, 10) - testCheckSender := newCheckSender(checkID1, "", itemChan, serviceCheckChan, eventChan, orchestratorChan, orchestratorManifestChan, eventPlatformChan, contlcycleChan, contimageChan, sbomChan) + testCheckSender := newCheckSender(checkID1, "", itemChan, serviceCheckChan, eventChan, orchestratorChan, orchestratorManifestChan, eventPlatformChan) err := demux.SetSender(testCheckSender, checkID1) assert.Nil(t, err) @@ -447,7 +438,7 @@ func TestCheckSenderInterface(t *testing.T) { s.sender.HistogramBucket("my.histogram_bucket", 42, 1.0, 2.0, true, "my-hostname", []string{"foo", "bar"}, true) s.sender.Commit() s.sender.ServiceCheck("my_service.can_connect", metrics.ServiceCheckOK, "my-hostname", []string{"foo", "bar"}, "message") - s.sender.EventPlatformEvent("raw-event", "dbm-sample") + s.sender.EventPlatformEvent([]byte("raw-event"), "dbm-sample") submittedEvent := metrics.Event{ Title: "Something happened", Text: "Description of the event", @@ -524,7 +515,7 @@ func TestCheckSenderInterface(t *testing.T) { eventPlatformEvent := <-s.eventPlatformEventChan assert.Equal(t, checkID1, eventPlatformEvent.id) - assert.Equal(t, "raw-event", eventPlatformEvent.rawEvent) + assert.Equal(t, []byte("raw-event"), eventPlatformEvent.rawEvent) assert.Equal(t, "dbm-sample", eventPlatformEvent.eventType) } diff --git a/pkg/collector/corechecks/containerimage/check.go b/pkg/collector/corechecks/containerimage/check.go index 71b762f67345a..fe181a0a41c45 100644 --- a/pkg/collector/corechecks/containerimage/check.go +++ b/pkg/collector/corechecks/containerimage/check.go @@ -6,7 +6,6 @@ package containerimage import ( - "errors" "time" yaml "gopkg.in/yaml.v2" @@ -14,7 +13,6 @@ import ( "github.com/DataDog/datadog-agent/pkg/autodiscovery/integration" "github.com/DataDog/datadog-agent/pkg/collector/check" core "github.com/DataDog/datadog-agent/pkg/collector/corechecks" - ddConfig "github.com/DataDog/datadog-agent/pkg/config" "github.com/DataDog/datadog-agent/pkg/util/log" "github.com/DataDog/datadog-agent/pkg/workloadmeta" ) @@ -103,10 +101,6 @@ func CheckFactory() check.Check { // Configure parses the check configuration and initializes the container_image check func (c *Check) Configure(integrationConfigDigest uint64, config, initConfig integration.Data, source string) error { - if !ddConfig.Datadog.GetBool("container_image.enabled") { - return errors.New("collection of container images is disabled") - } - if err := c.CommonConfigure(integrationConfigDigest, initConfig, config, source); err != nil { return err } diff --git a/pkg/collector/corechecks/containerimage/processor.go b/pkg/collector/corechecks/containerimage/processor.go index 8f9247baa2682..21adad772af21 100644 --- a/pkg/collector/corechecks/containerimage/processor.go +++ b/pkg/collector/corechecks/containerimage/processor.go @@ -10,6 +10,7 @@ import ( "time" "github.com/DataDog/datadog-agent/pkg/aggregator" + "github.com/DataDog/datadog-agent/pkg/epforwarder" "github.com/DataDog/datadog-agent/pkg/tagger" "github.com/DataDog/datadog-agent/pkg/tagger/collectors" queue "github.com/DataDog/datadog-agent/pkg/util/aggregatingqueue" @@ -18,6 +19,7 @@ import ( model "github.com/DataDog/agent-payload/v5/contimage" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -32,13 +34,17 @@ type processor struct { func newProcessor(sender aggregator.Sender, maxNbItem int, maxRetentionTime time.Duration) *processor { return &processor{ queue: queue.NewQueue(maxNbItem, maxRetentionTime, func(images []*model.ContainerImage) { - sender.ContainerImage([]model.ContainerImagePayload{ - { - Version: "v1", - Source: &sourceAgent, - Images: images, - }, + encoded, err := proto.Marshal(&model.ContainerImagePayload{ + Version: "v1", + Source: &sourceAgent, + Images: images, }) + if err != nil { + log.Errorf("Unable to encode message: %+v", err) + return + } + + sender.EventPlatformEvent(encoded, epforwarder.EventTypeContainerImages) }), } } diff --git a/pkg/collector/corechecks/containerimage/processor_test.go b/pkg/collector/corechecks/containerimage/processor_test.go index 4af983873e9aa..21567e5d4294c 100644 --- a/pkg/collector/corechecks/containerimage/processor_test.go +++ b/pkg/collector/corechecks/containerimage/processor_test.go @@ -15,8 +15,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "go.uber.org/atomic" + "google.golang.org/protobuf/proto" "github.com/DataDog/datadog-agent/pkg/aggregator/mocksender" + "github.com/DataDog/datadog-agent/pkg/epforwarder" "github.com/DataDog/datadog-agent/pkg/util/pointer" "github.com/DataDog/datadog-agent/pkg/workloadmeta" ) @@ -412,7 +414,7 @@ func TestProcessEvents(t *testing.T) { var imagesSent = atomic.NewInt32(0) sender := mocksender.NewMockSender("") - sender.On("ContainerImage", mock.Anything, mock.Anything).Return().Run(func(_ mock.Arguments) { + sender.On("EventPlatformEvent", mock.Anything, mock.Anything).Return().Run(func(_ mock.Arguments) { imagesSent.Inc() }) @@ -434,13 +436,13 @@ func TestProcessEvents(t *testing.T) { }, 1*time.Second, 5*time.Millisecond) for _, expectedImage := range test.expectedImages { - sender.AssertContainerImage(t, []model.ContainerImagePayload{ - { - Version: "v1", - Source: &sourceAgent, - Images: []*model.ContainerImage{expectedImage}, - }, + encoded, err := proto.Marshal(&model.ContainerImagePayload{ + Version: "v1", + Source: &sourceAgent, + Images: []*model.ContainerImage{expectedImage}, }) + assert.Nil(t, err) + sender.AssertEventPlatformEvent(t, encoded, epforwarder.EventTypeContainerImages) } }) } diff --git a/pkg/collector/corechecks/containerlifecycle/processor.go b/pkg/collector/corechecks/containerlifecycle/processor.go index 5b5421b6d5bba..43c935fe7ae6e 100644 --- a/pkg/collector/corechecks/containerlifecycle/processor.go +++ b/pkg/collector/corechecks/containerlifecycle/processor.go @@ -11,10 +11,14 @@ import ( "time" "github.com/DataDog/agent-payload/v5/contlcycle" + "github.com/DataDog/datadog-agent/pkg/aggregator" types "github.com/DataDog/datadog-agent/pkg/containerlifecycle" + "github.com/DataDog/datadog-agent/pkg/epforwarder" "github.com/DataDog/datadog-agent/pkg/util/log" "github.com/DataDog/datadog-agent/pkg/workloadmeta" + + "github.com/gogo/protobuf/proto" ) type processor struct { @@ -130,7 +134,7 @@ func (p *processor) flush() { func (p *processor) flushContainers() { msgs := p.containersQueue.flush() if len(msgs) > 0 { - p.sender.ContainerLifecycleEvent(msgs) + p.containerLifecycleEvent(msgs) for eventType, eventCount := range eventCountByType(msgs) { emittedEvents.Add(float64(eventCount), eventType, types.ObjectKindContainer) @@ -142,7 +146,7 @@ func (p *processor) flushContainers() { func (p *processor) flushPods() { msgs := p.podsQueue.flush() if len(msgs) > 0 { - p.sender.ContainerLifecycleEvent(msgs) + p.containerLifecycleEvent(msgs) for eventType, eventCount := range eventCountByType(msgs) { emittedEvents.Add(float64(eventCount), eventType, types.ObjectKindPod) @@ -150,6 +154,18 @@ func (p *processor) flushPods() { } } +func (p *processor) containerLifecycleEvent(msgs []contlcycle.EventsPayload) { + for _, msg := range msgs { + encoded, err := proto.Marshal(&msg) + if err != nil { + log.Errorf("Unable to encode message: %+v", err) + continue + } + + p.sender.EventPlatformEvent(encoded, epforwarder.EventTypeContainerLifecycle) + } +} + func eventCountByType(eventPayloads []contlcycle.EventsPayload) map[string]int { res := make(map[string]int) diff --git a/pkg/collector/corechecks/containerlifecycle/processor_test.go b/pkg/collector/corechecks/containerlifecycle/processor_test.go index d0cab5572d83c..e18af7c3ad26f 100644 --- a/pkg/collector/corechecks/containerlifecycle/processor_test.go +++ b/pkg/collector/corechecks/containerlifecycle/processor_test.go @@ -38,7 +38,7 @@ func TestProcessQueues(t *testing.T) { }}, podsQueue: &queue{}, wantFunc: func(t *testing.T, s *mocksender.MockSender) { - s.AssertNumberOfCalls(t, "ContainerLifecycleEvent", 1) + s.AssertNumberOfCalls(t, "EventPlatformEvent", 1) }, }, { @@ -52,7 +52,7 @@ func TestProcessQueues(t *testing.T) { {Version: "v1", Events: modelEvents("pod3")}, }}, wantFunc: func(t *testing.T, s *mocksender.MockSender) { - s.AssertNumberOfCalls(t, "ContainerLifecycleEvent", 2) + s.AssertNumberOfCalls(t, "EventPlatformEvent", 4) }, }, } @@ -64,7 +64,7 @@ func TestProcessQueues(t *testing.T) { } sender := mocksender.NewMockSender(check.ID(tt.name)) - sender.On("ContainerLifecycleEvent", mock.Anything, mock.Anything).Return() + sender.On("EventPlatformEvent", mock.Anything, mock.Anything).Return() p.sender = sender ctx, cancel := context.WithCancel(context.Background()) diff --git a/pkg/collector/corechecks/sbom/check.go b/pkg/collector/corechecks/sbom/check.go index 488e14bc594c9..d6bfb468668ee 100644 --- a/pkg/collector/corechecks/sbom/check.go +++ b/pkg/collector/corechecks/sbom/check.go @@ -6,7 +6,6 @@ package sbom import ( - "errors" "time" yaml "gopkg.in/yaml.v2" @@ -14,7 +13,6 @@ import ( "github.com/DataDog/datadog-agent/pkg/autodiscovery/integration" "github.com/DataDog/datadog-agent/pkg/collector/check" core "github.com/DataDog/datadog-agent/pkg/collector/corechecks" - ddConfig "github.com/DataDog/datadog-agent/pkg/config" "github.com/DataDog/datadog-agent/pkg/util/log" "github.com/DataDog/datadog-agent/pkg/workloadmeta" ) @@ -103,10 +101,6 @@ func CheckFactory() check.Check { // Configure parses the check configuration and initializes the sbom check func (c *Check) Configure(integrationConfigDigest uint64, config, initConfig integration.Data, source string) error { - if !ddConfig.Datadog.GetBool("sbom.enabled") { - return errors.New("collection of SBOM is disabled") - } - if err := c.CommonConfigure(integrationConfigDigest, initConfig, config, source); err != nil { return err } diff --git a/pkg/collector/corechecks/sbom/processor.go b/pkg/collector/corechecks/sbom/processor.go index 619170f25ee5b..18603bf5bc313 100644 --- a/pkg/collector/corechecks/sbom/processor.go +++ b/pkg/collector/corechecks/sbom/processor.go @@ -10,15 +10,17 @@ import ( "time" "github.com/DataDog/datadog-agent/pkg/aggregator" + "github.com/DataDog/datadog-agent/pkg/epforwarder" "github.com/DataDog/datadog-agent/pkg/tagger" "github.com/DataDog/datadog-agent/pkg/tagger/collectors" queue "github.com/DataDog/datadog-agent/pkg/util/aggregatingqueue" "github.com/DataDog/datadog-agent/pkg/util/log" "github.com/DataDog/datadog-agent/pkg/workloadmeta" - "google.golang.org/protobuf/types/known/timestamppb" - "github.com/DataDog/agent-payload/v5/sbom" model "github.com/DataDog/agent-payload/v5/sbom" + + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" ) var /* const */ ( @@ -35,13 +37,17 @@ type processor struct { func newProcessor(workloadmetaStore workloadmeta.Store, sender aggregator.Sender, maxNbItem int, maxRetentionTime time.Duration) *processor { return &processor{ queue: queue.NewQueue(maxNbItem, maxRetentionTime, func(entities []*model.SBOMEntity) { - sender.SBOM([]sbom.SBOMPayload{ - { - Version: 1, - Source: &sourceAgent, - Entities: entities, - }, + encoded, err := proto.Marshal(&model.SBOMPayload{ + Version: 1, + Source: &sourceAgent, + Entities: entities, }) + if err != nil { + log.Errorf("Unable to encode message: %+v", err) + return + } + + sender.EventPlatformEvent(encoded, epforwarder.EventTypeContainerSBOM) }), workloadmetaStore: workloadmetaStore, imageRepoDigests: make(map[string]string), @@ -207,7 +213,7 @@ func (p *processor) processSBOM(img *workloadmeta.ContainerImageMetadata) { RepoTags: repoTags, InUse: inUse, GenerationDuration: convertDuration(img.SBOM.GenerationDuration), - Sbom: &sbom.SBOMEntity_Cyclonedx{ + Sbom: &model.SBOMEntity_Cyclonedx{ Cyclonedx: convertBOM(img.SBOM.CycloneDXBOM), }, } diff --git a/pkg/collector/corechecks/sbom/processor_test.go b/pkg/collector/corechecks/sbom/processor_test.go index 8be13912e4a08..37eee607f117f 100644 --- a/pkg/collector/corechecks/sbom/processor_test.go +++ b/pkg/collector/corechecks/sbom/processor_test.go @@ -15,10 +15,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "go.uber.org/atomic" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" "github.com/DataDog/datadog-agent/pkg/aggregator/mocksender" + "github.com/DataDog/datadog-agent/pkg/epforwarder" "github.com/DataDog/datadog-agent/pkg/util/pointer" "github.com/DataDog/datadog-agent/pkg/workloadmeta" fakeworkloadmeta "github.com/DataDog/datadog-agent/pkg/workloadmeta/testing" @@ -394,7 +396,7 @@ func TestProcessEvents(t *testing.T) { fakeworkloadmeta := fakeworkloadmeta.NewStore() sender := mocksender.NewMockSender("") - sender.On("SBOM", mock.Anything, mock.Anything).Return().Run(func(_ mock.Arguments) { + sender.On("EventPlatformEvent", mock.Anything, mock.Anything).Return().Run(func(_ mock.Arguments) { SBOMsSent.Inc() }) @@ -425,13 +427,13 @@ func TestProcessEvents(t *testing.T) { }, 1*time.Second, 5*time.Millisecond) for _, expectedSBOM := range test.expectedSBOMs { - sender.AssertSBOM(t, []model.SBOMPayload{ - { - Version: 1, - Source: &sourceAgent, - Entities: []*model.SBOMEntity{expectedSBOM}, - }, + encoded, err := proto.Marshal(&model.SBOMPayload{ + Version: 1, + Source: &sourceAgent, + Entities: []*model.SBOMEntity{expectedSBOM}, }) + assert.Nil(t, err) + sender.AssertEventPlatformEvent(t, encoded, epforwarder.EventTypeContainerSBOM) } }) } diff --git a/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go b/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go index d8c394248027f..059e4a30956e2 100644 --- a/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go +++ b/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go @@ -48,7 +48,7 @@ func (ms *MetricSender) ReportNetworkDeviceMetadata(config *checkconfig.CheckCon log.Errorf("Error marshalling device metadata: %s", err) return } - ms.sender.EventPlatformEvent(string(payloadBytes), epforwarder.EventTypeNetworkDevicesMetadata) + ms.sender.EventPlatformEvent(payloadBytes, epforwarder.EventTypeNetworkDevicesMetadata) } // Telemetry diff --git a/pkg/collector/corechecks/snmp/internal/report/report_device_metadata_test.go b/pkg/collector/corechecks/snmp/internal/report/report_device_metadata_test.go index 32dbe561ee997..b807887e73fa5 100644 --- a/pkg/collector/corechecks/snmp/internal/report/report_device_metadata_test.go +++ b/pkg/collector/corechecks/snmp/internal/report/report_device_metadata_test.go @@ -150,7 +150,7 @@ func Test_metricSender_reportNetworkDeviceMetadata_withoutInterfaces(t *testing. err = json.Compact(compactEvent, event) assert.NoError(t, err) - sender.AssertEventPlatformEvent(t, compactEvent.String(), "network-devices-metadata") + sender.AssertEventPlatformEvent(t, compactEvent.Bytes(), "network-devices-metadata") w.Flush() logs := b.String() @@ -227,7 +227,7 @@ profiles: err = json.Compact(compactEvent, event) assert.NoError(t, err) - sender.AssertEventPlatformEvent(t, compactEvent.String(), "network-devices-metadata") + sender.AssertEventPlatformEvent(t, compactEvent.Bytes(), "network-devices-metadata") } func Test_metricSender_reportNetworkDeviceMetadata_withInterfaces(t *testing.T) { @@ -366,7 +366,7 @@ func Test_metricSender_reportNetworkDeviceMetadata_withInterfaces(t *testing.T) err = json.Compact(compactEvent, event) assert.NoError(t, err) - sender.AssertEventPlatformEvent(t, compactEvent.String(), "network-devices-metadata") + sender.AssertEventPlatformEvent(t, compactEvent.Bytes(), "network-devices-metadata") } func Test_metricSender_reportNetworkDeviceMetadata_fallbackOnFieldValue(t *testing.T) { @@ -435,7 +435,7 @@ func Test_metricSender_reportNetworkDeviceMetadata_fallbackOnFieldValue(t *testi err = json.Compact(compactEvent, event) assert.NoError(t, err) - sender.AssertEventPlatformEvent(t, compactEvent.String(), "network-devices-metadata") + sender.AssertEventPlatformEvent(t, compactEvent.Bytes(), "network-devices-metadata") } func Test_batchPayloads(t *testing.T) { diff --git a/pkg/collector/corechecks/snmp/profile_metadata_test.go b/pkg/collector/corechecks/snmp/profile_metadata_test.go index d8092aadb270f..0928af6467e80 100644 --- a/pkg/collector/corechecks/snmp/profile_metadata_test.go +++ b/pkg/collector/corechecks/snmp/profile_metadata_test.go @@ -627,5 +627,5 @@ profiles: err = json.Compact(compactEvent, event) assert.NoError(t, err) - sender.AssertEventPlatformEvent(t, compactEvent.String(), "network-devices-metadata") + sender.AssertEventPlatformEvent(t, compactEvent.Bytes(), "network-devices-metadata") } diff --git a/pkg/collector/corechecks/snmp/snmp_test.go b/pkg/collector/corechecks/snmp/snmp_test.go index 1075efdccaea2..94060fb9a1fa2 100644 --- a/pkg/collector/corechecks/snmp/snmp_test.go +++ b/pkg/collector/corechecks/snmp/snmp_test.go @@ -905,7 +905,7 @@ profiles: err = json.Compact(compactEvent, event) assert.NoError(t, err) - sender.AssertEventPlatformEvent(t, compactEvent.String(), "network-devices-metadata") + sender.AssertEventPlatformEvent(t, compactEvent.Bytes(), "network-devices-metadata") sender.AssertServiceCheck(t, "snmp.can_check", metrics.ServiceCheckOK, "", snmpTags, "") } @@ -1543,7 +1543,7 @@ tags: err = json.Compact(compactEvent, event) assert.NoError(t, err) - sender.AssertEventPlatformEvent(t, compactEvent.String(), "network-devices-metadata") + sender.AssertEventPlatformEvent(t, compactEvent.Bytes(), "network-devices-metadata") sender.AssertServiceCheck(t, "snmp.can_check", metrics.ServiceCheckCritical, "", snmpTags, "failed to autodetect profile: failed to fetch sysobjectid: cannot get sysobjectid: no value") } @@ -1632,7 +1632,7 @@ tags: err = json.Compact(compactEvent, event) assert.NoError(t, err) - sender.AssertEventPlatformEvent(t, compactEvent.String(), "network-devices-metadata") + sender.AssertEventPlatformEvent(t, compactEvent.Bytes(), "network-devices-metadata") sender.AssertServiceCheck(t, "snmp.can_check", metrics.ServiceCheckCritical, "", snmpTags, expectedErrMsg) } @@ -1959,7 +1959,7 @@ metric_tags: err = json.Compact(compactEvent, event) assert.NoError(t, err) - sender.AssertEventPlatformEvent(t, compactEvent.String(), "network-devices-metadata") + sender.AssertEventPlatformEvent(t, compactEvent.Bytes(), "network-devices-metadata") } networkTags := []string{"network:10.10.0.0/30", "autodiscovery_subnet:10.10.0.0/30"} sender.AssertMetric(t, "Gauge", "snmp.discovered_devices_count", 4, "", networkTags) diff --git a/pkg/collector/python/aggregator.go b/pkg/collector/python/aggregator.go index 7faeaa2efbd3b..b20ddc5fc64b5 100644 --- a/pkg/collector/python/aggregator.go +++ b/pkg/collector/python/aggregator.go @@ -9,6 +9,8 @@ package python import ( + "unsafe" + "github.com/DataDog/datadog-agent/pkg/aggregator" chk "github.com/DataDog/datadog-agent/pkg/collector/check" "github.com/DataDog/datadog-agent/pkg/metrics" @@ -142,12 +144,12 @@ func SubmitHistogramBucket(checkID *C.char, metricName *C.char, value C.longlong // SubmitEventPlatformEvent is the method exposed to Python scripts to submit event platform events // //export SubmitEventPlatformEvent -func SubmitEventPlatformEvent(checkID *C.char, rawEvent *C.char, eventType *C.char) { +func SubmitEventPlatformEvent(checkID *C.char, rawEventPtr *C.char, rawEventSize C.int, eventType *C.char) { _checkID := C.GoString(checkID) sender, err := aggregator.GetSender(chk.ID(_checkID)) if err != nil || sender == nil { log.Errorf("Error submitting event platform event to the Sender: %v", err) return } - sender.EventPlatformEvent(C.GoString(rawEvent), C.GoString(eventType)) + sender.EventPlatformEvent(C.GoBytes(unsafe.Pointer(rawEventPtr), rawEventSize), C.GoString(eventType)) } diff --git a/pkg/collector/python/init.go b/pkg/collector/python/init.go index 9adf25beec766..db0390206f7e3 100644 --- a/pkg/collector/python/init.go +++ b/pkg/collector/python/init.go @@ -118,7 +118,7 @@ void SubmitMetric(char *, metric_type_t, char *, double, char **, char *, bool); void SubmitServiceCheck(char *, char *, int, char **, char *, char *); void SubmitEvent(char *, event_t *); void SubmitHistogramBucket(char *, char *, long long, float, float, int, char *, char **, bool); -void SubmitEventPlatformEvent(char *, char *, char *); +void SubmitEventPlatformEvent(char *, char *, int, char *); void initAggregatorModule(rtloader_t *rtloader) { set_submit_metric_cb(rtloader, SubmitMetric); diff --git a/pkg/collector/python/test_aggregator.go b/pkg/collector/python/test_aggregator.go index de62475b2d312..b7c463ea00e31 100644 --- a/pkg/collector/python/test_aggregator.go +++ b/pkg/collector/python/test_aggregator.go @@ -227,8 +227,9 @@ func testSubmitEventPlatformEvent(t *testing.T) { SubmitEventPlatformEvent( C.CString("testID"), C.CString("raw-event"), + C.int(len("raw-event")), C.CString("dbm-sample"), ) - sender.AssertEventPlatformEvent(t, "raw-event", "dbm-sample") + sender.AssertEventPlatformEvent(t, []byte("raw-event"), "dbm-sample") } diff --git a/pkg/config/config.go b/pkg/config/config.go index 66789a3664ea2..09c54abe94335 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -1075,19 +1075,13 @@ func InitConfig(config Config) { config.BindEnvAndSetDefault("orchestrator_explorer.manifest_collection.buffer_flush_interval", 20*time.Second) // Container lifecycle configuration - config.BindEnvAndSetDefault("container_lifecycle.enabled", false) - config.BindEnv("container_lifecycle.dd_url") - config.BindEnv("container_lifecycle.additional_endpoints") + bindEnvAndSetLogsConfigKeys(config, "container_lifecycle.") // Container image configuration - config.BindEnvAndSetDefault("container_image.enabled", false) - config.BindEnv("container_image.dd_url") - config.BindEnv("container_image.additional_endpoints") + bindEnvAndSetLogsConfigKeys(config, "container_image.") // SBOM configuration - config.BindEnvAndSetDefault("sbom.enabled", false) - config.BindEnv("sbom.dd_url") - config.BindEnv("sbom.additional_endpoints") + bindEnvAndSetLogsConfigKeys(config, "sbom.") // Orchestrator Explorer - process agent // DEPRECATED in favor of `orchestrator_explorer.orchestrator_dd_url` setting. If both are set `orchestrator_explorer.orchestrator_dd_url` will take precedence. diff --git a/pkg/containerimage/forwarder.go b/pkg/containerimage/forwarder.go deleted file mode 100644 index 8d358f76416bb..0000000000000 --- a/pkg/containerimage/forwarder.go +++ /dev/null @@ -1,59 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2022-present Datadog, Inc. - -package containerimage - -import ( - "fmt" - "net/url" - - "github.com/DataDog/datadog-agent/pkg/config" - "github.com/DataDog/datadog-agent/pkg/config/resolver" - "github.com/DataDog/datadog-agent/pkg/forwarder" - "github.com/DataDog/datadog-agent/pkg/util/flavor" - "github.com/DataDog/datadog-agent/pkg/util/log" -) - -func buildKeysPerDomains(conf config.Config) (map[string][]string, error) { - mainURL := config.GetMainEndpointWithConfig(conf, "https://contimage-intake.", "container_image.dd_url") - if _, err := url.Parse(mainURL); err != nil { - return nil, fmt.Errorf("could not parse contimage main endpoint: %w", err) - } - - keysPerDomain := map[string][]string{ - mainURL: { - conf.GetString("api_key"), - }, - } - - if !conf.IsSet("container_image.additional_endpoints") { - return keysPerDomain, nil - } - - additionalEndpoints := conf.GetStringMapStringSlice("container_image.additional_endpoints") - - return config.MergeAdditionalEndpoints(keysPerDomain, additionalEndpoints) -} - -// NewForwarder returns a forwarder for container images events -func NewForwarder() *forwarder.DefaultForwarder { - if !config.Datadog.GetBool("container_image.enabled") { - return nil - } - - if flavor.GetFlavor() != flavor.DefaultAgent { - return nil - } - - keysPerDomain, err := buildKeysPerDomains(config.Datadog) - if err != nil { - log.Errorf("Cannot build keys per domains: %v", err) - return nil - } - - options := forwarder.NewOptionsWithResolvers(resolver.NewSingleDomainResolvers(keysPerDomain)) - - return forwarder.NewDefaultForwarder(options) -} diff --git a/pkg/containerlifecycle/forwarder.go b/pkg/containerlifecycle/forwarder.go deleted file mode 100644 index c70edb472dfd0..0000000000000 --- a/pkg/containerlifecycle/forwarder.go +++ /dev/null @@ -1,60 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2016-present Datadog, Inc. - -package containerlifecycle - -import ( - "fmt" - "net/url" - - "github.com/DataDog/datadog-agent/pkg/config" - "github.com/DataDog/datadog-agent/pkg/config/resolver" - "github.com/DataDog/datadog-agent/pkg/forwarder" - "github.com/DataDog/datadog-agent/pkg/util/flavor" - "github.com/DataDog/datadog-agent/pkg/util/log" -) - -func buildKeysPerDomains(conf config.Config) (map[string][]string, error) { - mainURL := config.GetMainEndpointWithConfig(conf, "https://contlcycle-intake.", "container_lifecycle.dd_url") - _, err := url.Parse(mainURL) - if err != nil { - return nil, fmt.Errorf("could not parse contlcycle main endpoint: %w", err) - } - - keysPerDomain := map[string][]string{ - mainURL: { - conf.GetString("api_key"), - }, - } - - if !conf.IsSet("container_lifecycle.additional_endpoints") { - return keysPerDomain, nil - } - - additionalEndpoints := conf.GetStringMapStringSlice("container_lifecycle.additional_endpoints") - - return config.MergeAdditionalEndpoints(keysPerDomain, additionalEndpoints) -} - -// NewForwarder returns a forwarder for container lifecycle events -func NewForwarder() *forwarder.DefaultForwarder { - if !config.Datadog.GetBool("container_lifecycle.enabled") { - return nil - } - - if flavor.GetFlavor() != flavor.DefaultAgent { - return nil - } - - keysPerDomain, err := buildKeysPerDomains(config.Datadog) - if err != nil { - log.Errorf("Cannot build keys per domains: %v", err) - return nil - } - - options := forwarder.NewOptionsWithResolvers(resolver.NewSingleDomainResolvers(keysPerDomain)) - - return forwarder.NewDefaultForwarder(options) -} diff --git a/pkg/epforwarder/epforwarder.go b/pkg/epforwarder/epforwarder.go index bbe947832815e..655aab8bdc799 100644 --- a/pkg/epforwarder/epforwarder.go +++ b/pkg/epforwarder/epforwarder.go @@ -36,11 +36,16 @@ const ( // EventTypeNetworkDevicesNetFlow is the event type for network devices NetFlow data EventTypeNetworkDevicesNetFlow = "network-devices-netflow" + + EventTypeContainerLifecycle = "container-lifecycle" + EventTypeContainerImages = "container-images" + EventTypeContainerSBOM = "container-sbom" ) var passthroughPipelineDescs = []passthroughPipelineDesc{ { eventType: eventTypeDBMSamples, + contentType: http.JSONContentType, endpointsConfigPrefix: "database_monitoring.samples.", hostnameEndpointPrefix: "dbm-metrics-intake.", intakeTrackType: "databasequery", @@ -52,6 +57,7 @@ var passthroughPipelineDescs = []passthroughPipelineDesc{ }, { eventType: eventTypeDBMMetrics, + contentType: http.JSONContentType, endpointsConfigPrefix: "database_monitoring.metrics.", hostnameEndpointPrefix: "dbm-metrics-intake.", intakeTrackType: "dbmmetrics", @@ -63,6 +69,7 @@ var passthroughPipelineDescs = []passthroughPipelineDesc{ }, { eventType: eventTypeDBMActivity, + contentType: http.JSONContentType, endpointsConfigPrefix: "database_monitoring.activity.", hostnameEndpointPrefix: "dbm-metrics-intake.", intakeTrackType: "dbmactivity", @@ -74,6 +81,7 @@ var passthroughPipelineDescs = []passthroughPipelineDesc{ }, { eventType: EventTypeNetworkDevicesMetadata, + contentType: http.JSONContentType, endpointsConfigPrefix: "network_devices.metadata.", hostnameEndpointPrefix: "ndm-intake.", intakeTrackType: "ndm", @@ -84,6 +92,7 @@ var passthroughPipelineDescs = []passthroughPipelineDesc{ }, { eventType: EventTypeSnmpTraps, + contentType: http.JSONContentType, endpointsConfigPrefix: "network_devices.snmp_traps.forwarder.", hostnameEndpointPrefix: "snmp-traps-intake.", intakeTrackType: "ndmtraps", @@ -94,6 +103,7 @@ var passthroughPipelineDescs = []passthroughPipelineDesc{ }, { eventType: EventTypeNetworkDevicesNetFlow, + contentType: http.JSONContentType, endpointsConfigPrefix: "network_devices.netflow.forwarder.", hostnameEndpointPrefix: "ndmflow-intake.", intakeTrackType: "ndmflow", @@ -113,6 +123,39 @@ var passthroughPipelineDescs = []passthroughPipelineDesc{ // by aggregator. defaultInputChanSize: 10000, }, + { + eventType: EventTypeContainerLifecycle, + contentType: http.ProtobufContentType, + endpointsConfigPrefix: "container_lifecycle.", + hostnameEndpointPrefix: "contlcycle-intake.", + intakeTrackType: "contlcycle", + defaultBatchMaxConcurrentSend: 10, + defaultBatchMaxContentSize: pkgconfig.DefaultBatchMaxContentSize, + defaultBatchMaxSize: pkgconfig.DefaultBatchMaxSize, + defaultInputChanSize: pkgconfig.DefaultInputChanSize, + }, + { + eventType: EventTypeContainerImages, + contentType: http.ProtobufContentType, + endpointsConfigPrefix: "container_image.", + hostnameEndpointPrefix: "contimage-intake.", + intakeTrackType: "contimage", + defaultBatchMaxConcurrentSend: 10, + defaultBatchMaxContentSize: pkgconfig.DefaultBatchMaxContentSize, + defaultBatchMaxSize: pkgconfig.DefaultBatchMaxSize, + defaultInputChanSize: pkgconfig.DefaultInputChanSize, + }, + { + eventType: EventTypeContainerSBOM, + contentType: http.ProtobufContentType, + endpointsConfigPrefix: "sbom.", + hostnameEndpointPrefix: "sbom-intake.", + intakeTrackType: "sbom", + defaultBatchMaxConcurrentSend: 10, + defaultBatchMaxContentSize: pkgconfig.DefaultBatchMaxContentSize, + defaultBatchMaxSize: pkgconfig.DefaultBatchMaxSize, + defaultInputChanSize: pkgconfig.DefaultInputChanSize, + }, } // An EventPlatformForwarder forwards Messages to a destination based on their event type @@ -198,7 +241,8 @@ type passthroughPipeline struct { } type passthroughPipelineDesc struct { - eventType string + eventType string + contentType string // intakeTrackType is the track type to use for the v2 intake api. When blank, v1 is used instead. intakeTrackType config.IntakeTrackType endpointsConfigPrefix string @@ -236,12 +280,12 @@ func newHTTPPassthroughPipeline(desc passthroughPipelineDesc, destinationsContex reliable := []client.Destination{} for i, endpoint := range endpoints.GetReliableEndpoints() { telemetryName := fmt.Sprintf("%s_%d_reliable_%d", desc.eventType, pipelineID, i) - reliable = append(reliable, http.NewDestination(endpoint, http.JSONContentType, destinationsContext, endpoints.BatchMaxConcurrentSend, true, telemetryName)) + reliable = append(reliable, http.NewDestination(endpoint, desc.contentType, destinationsContext, endpoints.BatchMaxConcurrentSend, true, telemetryName)) } additionals := []client.Destination{} for i, endpoint := range endpoints.GetUnReliableEndpoints() { telemetryName := fmt.Sprintf("%s_%d_unreliable_%d", desc.eventType, pipelineID, i) - additionals = append(additionals, http.NewDestination(endpoint, http.JSONContentType, destinationsContext, endpoints.BatchMaxConcurrentSend, false, telemetryName)) + additionals = append(additionals, http.NewDestination(endpoint, desc.contentType, destinationsContext, endpoints.BatchMaxConcurrentSend, false, telemetryName)) } destinations := client.NewDestinations(reliable, additionals) inputChan := make(chan *message.Message, endpoints.InputChanSize) diff --git a/pkg/forwarder/endpoints/endpoints.go b/pkg/forwarder/endpoints/endpoints.go index 34b0ae55a4e4c..d1f3ffe5e826f 100644 --- a/pkg/forwarder/endpoints/endpoints.go +++ b/pkg/forwarder/endpoints/endpoints.go @@ -54,10 +54,4 @@ var ( OrchestratorEndpoint = transaction.Endpoint{Route: "/api/v2/orch", Name: "orchestrator"} // OrchestratorManifestEndpoint is a v2 endpoint used to send orchestrator manifests OrchestratorManifestEndpoint = transaction.Endpoint{Route: "/api/v2/orchmanif", Name: "orchmanifest"} - // ContainerLifecycleEndpoint is an event platform endpoint used to send container lifecycle events - ContainerLifecycleEndpoint = transaction.Endpoint{Route: "/api/v2/contlcycle", Name: "contlcycle"} - // ContainerImageEndpoint is an event platform endpoint used to send container images - ContainerImageEndpoint = transaction.Endpoint{Route: "/api/v2/contimage", Name: "contimage"} - // SBOMEndpoint is an event platform endpoint used to send SBOM - SBOMEndpoint = transaction.Endpoint{Route: "/api/v2/sbom", Name: "sbom"} ) diff --git a/pkg/forwarder/forwarder.go b/pkg/forwarder/forwarder.go index 7f857006b6b31..65f356ef23d3b 100644 --- a/pkg/forwarder/forwarder.go +++ b/pkg/forwarder/forwarder.go @@ -74,9 +74,6 @@ type Forwarder interface { SubmitConnectionChecks(payload transaction.BytesPayloads, extra http.Header) (chan Response, error) SubmitOrchestratorChecks(payload transaction.BytesPayloads, extra http.Header, payloadType int) (chan Response, error) SubmitOrchestratorManifests(payload transaction.BytesPayloads, extra http.Header) (chan Response, error) - SubmitContainerLifecycleEvents(payload transaction.BytesPayloads, extra http.Header) error - SubmitContainerImages(payload transaction.BytesPayloads, extra http.Header) error - SubmitSBOM(payload transaction.BytesPayloads, extra http.Header) error } // Compile-time check to ensure that DefaultForwarder implements the Forwarder interface @@ -634,24 +631,6 @@ func (f *DefaultForwarder) SubmitOrchestratorManifests(payload transaction.Bytes return f.submitProcessLikePayload(endpoints.OrchestratorManifestEndpoint, payload, extra, true) } -// SubmitContainerLifecycleEvents sends container lifecycle events -func (f *DefaultForwarder) SubmitContainerLifecycleEvents(payload transaction.BytesPayloads, extra http.Header) error { - transactions := f.createHTTPTransactions(endpoints.ContainerLifecycleEndpoint, payload, extra) - return f.sendHTTPTransactions(transactions) -} - -// SubmitContainerImages sends container image -func (f *DefaultForwarder) SubmitContainerImages(payload transaction.BytesPayloads, extra http.Header) error { - transactions := f.createHTTPTransactions(endpoints.ContainerImageEndpoint, payload, extra) - return f.sendHTTPTransactions(transactions) -} - -// SubmitSBOM sends SBOM -func (f *DefaultForwarder) SubmitSBOM(payload transaction.BytesPayloads, extra http.Header) error { - transactions := f.createHTTPTransactions(endpoints.SBOMEndpoint, payload, extra) - return f.sendHTTPTransactions(transactions) -} - func (f *DefaultForwarder) submitProcessLikePayload(ep transaction.Endpoint, payload transaction.BytesPayloads, extra http.Header, retryable bool) (chan Response, error) { transactions := f.createHTTPTransactions(ep, payload, extra) results := make(chan Response, len(transactions)) diff --git a/pkg/forwarder/forwarder_test.go b/pkg/forwarder/forwarder_test.go index b385dda342863..05d9fb06d9b76 100644 --- a/pkg/forwarder/forwarder_test.go +++ b/pkg/forwarder/forwarder_test.go @@ -129,9 +129,6 @@ func TestSubmitIfStopped(t *testing.T) { assert.NotNil(t, forwarder.SubmitSeries(nil, make(http.Header))) assert.NotNil(t, forwarder.SubmitV1Intake(nil, make(http.Header))) assert.NotNil(t, forwarder.SubmitV1CheckRuns(nil, make(http.Header))) - assert.NotNil(t, forwarder.SubmitContainerLifecycleEvents(nil, make(http.Header))) - assert.NotNil(t, forwarder.SubmitContainerImages(nil, make(http.Header))) - assert.NotNil(t, forwarder.SubmitSBOM(nil, make(http.Header))) } func TestCreateHTTPTransactions(t *testing.T) { @@ -387,15 +384,6 @@ func TestForwarderEndtoEnd(t *testing.T) { assert.Nil(t, f.SubmitMetadata(payload, headers)) numReqs += 4 - assert.Nil(t, f.SubmitContainerLifecycleEvents(payload, headers)) - numReqs += 4 - - assert.Nil(t, f.SubmitContainerImages(payload, headers)) - numReqs += 4 - - assert.Nil(t, f.SubmitSBOM(payload, headers)) - numReqs += 4 - // let's wait a second for every channel communication to trigger <-time.After(1 * time.Second) diff --git a/pkg/forwarder/noop_forwarder.go b/pkg/forwarder/noop_forwarder.go index 1f1ca7a25a295..93afd725ac8a0 100644 --- a/pkg/forwarder/noop_forwarder.go +++ b/pkg/forwarder/noop_forwarder.go @@ -100,21 +100,6 @@ func (f NoopForwarder) SubmitOrchestratorChecks(payload transaction.BytesPayload return nil, nil } -// SubmitContainerLifecycleEvents does nothing. -func (f NoopForwarder) SubmitContainerLifecycleEvents(payload transaction.BytesPayloads, extra http.Header) error { - return nil -} - -// SubmitContainerImages does nothing. -func (f NoopForwarder) SubmitContainerImages(payload transaction.BytesPayloads, extra http.Header) error { - return nil -} - -// SubmitSBOM does nothing. -func (f NoopForwarder) SubmitSBOM(payload transaction.BytesPayloads, extra http.Header) error { - return nil -} - // SubmitOrchestratorManifests does nothing. func (f NoopForwarder) SubmitOrchestratorManifests(payload transaction.BytesPayloads, extra http.Header) (chan Response, error) { return nil, nil diff --git a/pkg/forwarder/sync_forwarder.go b/pkg/forwarder/sync_forwarder.go index f0d2f1b4c6214..265bfbe088f23 100644 --- a/pkg/forwarder/sync_forwarder.go +++ b/pkg/forwarder/sync_forwarder.go @@ -156,18 +156,3 @@ func (f *SyncForwarder) SubmitOrchestratorChecks(payload transaction.BytesPayloa func (f *SyncForwarder) SubmitOrchestratorManifests(payload transaction.BytesPayloads, extra http.Header) (chan Response, error) { return f.defaultForwarder.SubmitOrchestratorManifests(payload, extra) } - -// SubmitContainerLifecycleEvents sends container lifecycle events -func (f *SyncForwarder) SubmitContainerLifecycleEvents(payload transaction.BytesPayloads, extra http.Header) error { - return f.defaultForwarder.SubmitContainerLifecycleEvents(payload, extra) -} - -// SubmitContainerImages sends container image -func (f *SyncForwarder) SubmitContainerImages(payload transaction.BytesPayloads, extra http.Header) error { - return f.defaultForwarder.SubmitContainerImages(payload, extra) -} - -// SubmitSBOM sends SBOM -func (f *SyncForwarder) SubmitSBOM(payload transaction.BytesPayloads, extra http.Header) error { - return f.defaultForwarder.SubmitSBOM(payload, extra) -} diff --git a/pkg/forwarder/test_common.go b/pkg/forwarder/test_common.go index 765bf901cd155..b0be9024b7826 100644 --- a/pkg/forwarder/test_common.go +++ b/pkg/forwarder/test_common.go @@ -178,18 +178,3 @@ func (tf *MockedForwarder) SubmitOrchestratorChecks(payload transaction.BytesPay func (tf *MockedForwarder) SubmitOrchestratorManifests(payload transaction.BytesPayloads, extra http.Header) (chan Response, error) { return nil, tf.Called(payload, extra).Error(0) } - -// SubmitContainerLifecycleEvents mock -func (tf *MockedForwarder) SubmitContainerLifecycleEvents(payload transaction.BytesPayloads, extra http.Header) error { - return tf.Called(payload, extra).Error(0) -} - -// SubmitContainerImages mock -func (tf *MockedForwarder) SubmitContainerImages(payload transaction.BytesPayloads, extra http.Header) error { - return tf.Called(payload, extra).Error(0) -} - -// SubmitSBOM mock -func (tf *MockedForwarder) SubmitSBOM(payload transaction.BytesPayloads, extra http.Header) error { - return tf.Called(payload, extra).Error(0) -} diff --git a/pkg/logs/client/http/destination.go b/pkg/logs/client/http/destination.go index e2891d96b31c1..7d209b636f7b2 100644 --- a/pkg/logs/client/http/destination.go +++ b/pkg/logs/client/http/destination.go @@ -31,8 +31,9 @@ import ( // ContentType options, const ( - TextContentType = "text/plain" - JSONContentType = "application/json" + TextContentType = "text/plain" + JSONContentType = "application/json" + ProtobufContentType = "application/x-protobuf" ) // HTTP errors. diff --git a/pkg/netflow/flowaggregator/aggregator.go b/pkg/netflow/flowaggregator/aggregator.go index 8ed56c4f339da..33e145926616c 100644 --- a/pkg/netflow/flowaggregator/aggregator.go +++ b/pkg/netflow/flowaggregator/aggregator.go @@ -97,9 +97,8 @@ func (agg *FlowAggregator) sendFlows(flows []*common.Flow) { continue } - payloadStr := string(payloadBytes) - log.Tracef("flushed flow: %s", payloadStr) - agg.sender.EventPlatformEvent(payloadStr, epforwarder.EventTypeNetworkDevicesNetFlow) + log.Tracef("flushed flow: %s", string(payloadBytes)) + agg.sender.EventPlatformEvent(payloadBytes, epforwarder.EventTypeNetworkDevicesNetFlow) } } diff --git a/pkg/netflow/flowaggregator/aggregator_test.go b/pkg/netflow/flowaggregator/aggregator_test.go index e155459d3894c..1375aee902e7a 100644 --- a/pkg/netflow/flowaggregator/aggregator_test.go +++ b/pkg/netflow/flowaggregator/aggregator_test.go @@ -160,7 +160,7 @@ func TestAggregator(t *testing.T) { err = waitForFlowsToBeFlushed(aggregator, 10*time.Second, 1) assert.NoError(t, err) - sender.AssertEventPlatformEvent(t, compactEvent.String(), "network-devices-netflow") + sender.AssertEventPlatformEvent(t, compactEvent.Bytes(), "network-devices-netflow") sender.AssertMetric(t, "Count", "datadog.netflow.aggregator.flows_flushed", 1, "", nil) sender.AssertMetric(t, "MonotonicCount", "datadog.netflow.aggregator.flows_received", 1, "", nil) sender.AssertMetric(t, "Gauge", "datadog.netflow.aggregator.flows_contexts", 1, "", nil) @@ -294,7 +294,7 @@ func TestAggregator_withMockPayload(t *testing.T) { err = waitForFlowsToBeFlushed(aggregator, 3*time.Second, 1) assert.NoError(t, err) - sender.AssertEventPlatformEvent(t, compactEvent.String(), "network-devices-netflow") + sender.AssertEventPlatformEvent(t, compactEvent.Bytes(), "network-devices-netflow") sender.AssertMetric(t, "Count", "datadog.netflow.aggregator.flows_flushed", 6, "", nil) sender.AssertMetric(t, "MonotonicCount", "datadog.netflow.aggregator.flows_received", 6, "", nil) sender.AssertMetric(t, "Gauge", "datadog.netflow.aggregator.flows_contexts", 6, "", nil) diff --git a/pkg/otlp/internal/serializerexporter/consumer_test.go b/pkg/otlp/internal/serializerexporter/consumer_test.go index 92455ed1023a6..6f9fbd8679736 100644 --- a/pkg/otlp/internal/serializerexporter/consumer_test.go +++ b/pkg/otlp/internal/serializerexporter/consumer_test.go @@ -203,15 +203,3 @@ func (m *MockSerializer) SendOrchestratorMetadata(_ []serializer.ProcessMessageB func (m *MockSerializer) SendOrchestratorManifests(_ []serializer.ProcessMessageBody, _, _ string) error { return nil } - -func (m *MockSerializer) SendContainerLifecycleEvent(_ []serializer.ContainerLifecycleMessage, _ string) error { - return nil -} - -func (m *MockSerializer) SendContainerImage(_ []serializer.ContainerImageMessage, _ string) error { - return nil -} - -func (m *MockSerializer) SendSBOM(_ []serializer.SBOMMessage, _ string) error { - return nil -} diff --git a/pkg/sbom/forwarder.go b/pkg/sbom/forwarder.go deleted file mode 100644 index fd42e48946856..0000000000000 --- a/pkg/sbom/forwarder.go +++ /dev/null @@ -1,59 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2022-present Datadog, Inc. - -package sbom - -import ( - "fmt" - "net/url" - - "github.com/DataDog/datadog-agent/pkg/config" - "github.com/DataDog/datadog-agent/pkg/config/resolver" - "github.com/DataDog/datadog-agent/pkg/forwarder" - "github.com/DataDog/datadog-agent/pkg/util/flavor" - "github.com/DataDog/datadog-agent/pkg/util/log" -) - -func buildKeysPerDomains(conf config.Config) (map[string][]string, error) { - mainURL := config.GetMainEndpointWithConfig(conf, "https://sbom-intake.", "sbom.dd_url") - if _, err := url.Parse(mainURL); err != nil { - return nil, fmt.Errorf("could not parse sbom main endpoint: %w", err) - } - - keysPerDomain := map[string][]string{ - mainURL: { - conf.GetString("api_key"), - }, - } - - if !conf.IsSet("sbom.additional_endpoints") { - return keysPerDomain, nil - } - - additionalEndpoints := conf.GetStringMapStringSlice("sbom.additional_endpoints") - - return config.MergeAdditionalEndpoints(keysPerDomain, additionalEndpoints) -} - -// NewForwarder returns a forwarder for SBOM events -func NewForwarder() *forwarder.DefaultForwarder { - if !config.Datadog.GetBool("sbom.enabled") { - return nil - } - - if flavor.GetFlavor() != flavor.DefaultAgent { - return nil - } - - keysPerDomain, err := buildKeysPerDomains(config.Datadog) - if err != nil { - log.Errorf("Cannot build keys per domains: %v", err) - return nil - } - - options := forwarder.NewOptionsWithResolvers(resolver.NewSingleDomainResolvers(keysPerDomain)) - - return forwarder.NewDefaultForwarder(options) -} diff --git a/pkg/serializer/serializer.go b/pkg/serializer/serializer.go index 485c2da58023b..31ef29c5e08e4 100644 --- a/pkg/serializer/serializer.go +++ b/pkg/serializer/serializer.go @@ -28,8 +28,6 @@ import ( "github.com/DataDog/datadog-agent/pkg/version" "github.com/benbjohnson/clock" - "github.com/gogo/protobuf/proto" - googleproto "google.golang.org/protobuf/proto" ) const ( @@ -101,9 +99,6 @@ type MetricSerializer interface { SendAgentchecksMetadata(m marshaler.JSONMarshaler) error SendOrchestratorMetadata(msgs []ProcessMessageBody, hostName, clusterID string, payloadType int) error SendOrchestratorManifests(msgs []ProcessMessageBody, hostName, clusterID string) error - SendContainerLifecycleEvent(msgs []ContainerLifecycleMessage, hostName string) error - SendContainerImage(msgs []ContainerImageMessage, hostname string) error - SendSBOM(msgs []SBOMMessage, hostname string) error } // Serializer serializes metrics to the correct format and routes the payloads to the correct endpoint in the Forwarder @@ -111,9 +106,6 @@ type Serializer struct { clock clock.Clock Forwarder forwarder.Forwarder orchestratorForwarder forwarder.Forwarder - contlcycleForwarder forwarder.Forwarder - contimageForwarder forwarder.Forwarder - sbomForwarder forwarder.Forwarder seriesJSONPayloadBuilder *stream.JSONPayloadBuilder @@ -135,14 +127,11 @@ type Serializer struct { } // NewSerializer returns a new Serializer initialized -func NewSerializer(forwarder forwarder.Forwarder, orchestratorForwarder, contlcycleForwarder, contimageForwarder, sbomForwarder forwarder.Forwarder) *Serializer { +func NewSerializer(forwarder, orchestratorForwarder forwarder.Forwarder) *Serializer { s := &Serializer{ clock: clock.New(), Forwarder: forwarder, orchestratorForwarder: orchestratorForwarder, - contlcycleForwarder: contlcycleForwarder, - contimageForwarder: contimageForwarder, - sbomForwarder: sbomForwarder, seriesJSONPayloadBuilder: stream.NewJSONPayloadBuilder(config.Datadog.GetBool("enable_json_stream_shared_compressor_buffers")), enableEvents: config.Datadog.GetBool("enable_payloads.events"), enableSeries: config.Datadog.GetBool("enable_payloads.series"), @@ -469,114 +458,6 @@ func (s *Serializer) SendOrchestratorMetadata(msgs []ProcessMessageBody, hostNam return nil } -// SendContainerLifecycleEvent serializes & sends container lifecycle event payloads -func (s *Serializer) SendContainerLifecycleEvent(msgs []ContainerLifecycleMessage, hostname string) error { - if s.contlcycleForwarder == nil { - return errors.New("container lifecycle forwarder is not setup") - } - - payloads := make([]*[]byte, 0, len(msgs)) - - for _, msg := range msgs { - msg.Host = hostname - encoded, err := proto.Marshal(&msg) - if err != nil { - return log.Errorf("Unable to encode message: %v", err) - } - - payloads = append(payloads, &encoded) - } - - bytePayloads := transaction.NewBytesPayloadsWithoutMetaData(payloads) - - extraHeaders := make(http.Header) - extraHeaders.Set(headers.TimestampHeader, strconv.Itoa(int(s.clock.Now().Unix()))) - extraHeaders.Set(headers.EVPOriginHeader, "agent") - extraHeaders.Set(headers.EVPOriginVersionHeader, version.AgentVersion) - extraHeaders.Set(headers.ContentTypeHeader, headers.ProtobufContentType) - extraHeaders.Set(payloadVersionHTTPHeader, AgentPayloadVersion) - - if err := s.contlcycleForwarder.SubmitContainerLifecycleEvents(bytePayloads, extraHeaders); err != nil { - return log.Errorf("Unable to submit container lifecycle payloads: %v", err) - } - - log.Tracef("Sent container lifecycle events %+v", msgs) - - return nil -} - -// SendContainerImage serializes & sends container image payloads -func (s *Serializer) SendContainerImage(msgs []ContainerImageMessage, hostname string) error { - if s.contimageForwarder == nil { - return errors.New("container image forwarder is not setup") - } - - payloads := make([]*[]byte, 0, len(msgs)) - - for i := range msgs { - msgs[i].Host = hostname - encoded, err := googleproto.Marshal(&msgs[i]) - if err != nil { - return log.Errorf("Unable to encode message: %+v", err) - } - - payloads = append(payloads, &encoded) - } - - bytesPayloads := transaction.NewBytesPayloadsWithoutMetaData(payloads) - - extraHeaders := make(http.Header) - extraHeaders.Set(headers.TimestampHeader, strconv.Itoa(int(s.clock.Now().Unix()))) - extraHeaders.Set(headers.EVPOriginHeader, "agent") - extraHeaders.Set(headers.EVPOriginVersionHeader, version.AgentVersion) - extraHeaders.Set(headers.ContentTypeHeader, headers.ProtobufContentType) - extraHeaders.Set(payloadVersionHTTPHeader, AgentPayloadVersion) - - if err := s.contimageForwarder.SubmitContainerImages(bytesPayloads, extraHeaders); err != nil { - return log.Errorf("Unable to submit container image payload: %v", err) - } - - log.Tracef("Send container images %+v", msgs) - - return nil -} - -// SendSBOM serializes & sends sbom payloads -func (s *Serializer) SendSBOM(msgs []SBOMMessage, hostname string) error { - if s.sbomForwarder == nil { - return errors.New("SBOM forwarder is not setup") - } - - payloads := make([]*[]byte, 0, len(msgs)) - - for i := range msgs { - msgs[i].Host = hostname - encoded, err := googleproto.Marshal(&msgs[i]) - if err != nil { - return log.Errorf("Unable to encode message: %+v", err) - } - - payloads = append(payloads, &encoded) - } - - bytesPayloads := transaction.NewBytesPayloadsWithoutMetaData(payloads) - - extraHeaders := make(http.Header) - extraHeaders.Set(headers.TimestampHeader, strconv.Itoa(int(s.clock.Now().Unix()))) - extraHeaders.Set(headers.EVPOriginHeader, "agent") - extraHeaders.Set(headers.EVPOriginVersionHeader, version.AgentVersion) - extraHeaders.Set(headers.ContentTypeHeader, headers.ProtobufContentType) - extraHeaders.Set(payloadVersionHTTPHeader, AgentPayloadVersion) - - if err := s.sbomForwarder.SubmitSBOM(bytesPayloads, extraHeaders); err != nil { - return log.Errorf("Unable to submit SBOM payload: %v", err) - } - - log.Tracef("Send SBOM %+v", msgs) - - return nil -} - // SendOrchestratorManifests serializes & send orchestrator manifest payloads func (s *Serializer) SendOrchestratorManifests(msgs []ProcessMessageBody, hostName, clusterID string) error { if s.orchestratorForwarder == nil { diff --git a/pkg/serializer/serializer_test.go b/pkg/serializer/serializer_test.go index bc9e909bfe52f..c1d43e1940c43 100644 --- a/pkg/serializer/serializer_test.go +++ b/pkg/serializer/serializer_test.go @@ -12,29 +12,21 @@ import ( "fmt" "net/http" "reflect" - "strconv" "strings" "testing" - "github.com/benbjohnson/clock" jsoniter "github.com/json-iterator/go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "github.com/DataDog/agent-payload/v5/contimage" - "github.com/DataDog/agent-payload/v5/contlcycle" - "github.com/DataDog/agent-payload/v5/sbom" "github.com/DataDog/datadog-agent/pkg/config" "github.com/DataDog/datadog-agent/pkg/forwarder" "github.com/DataDog/datadog-agent/pkg/forwarder/transaction" "github.com/DataDog/datadog-agent/pkg/metrics" - "github.com/DataDog/datadog-agent/pkg/process/util/api/headers" metricsserializer "github.com/DataDog/datadog-agent/pkg/serializer/internal/metrics" "github.com/DataDog/datadog-agent/pkg/serializer/marshaler" "github.com/DataDog/datadog-agent/pkg/util/compression" - "github.com/DataDog/datadog-agent/pkg/util/pointer" - "github.com/DataDog/datadog-agent/pkg/version" ) var initialContentEncoding = compression.ContentEncoding @@ -232,7 +224,7 @@ func TestSendV1Events(t *testing.T) { matcher := createJSONPayloadMatcher(`{"apiKey":"","events":{},"internalHostname"`) f.On("SubmitV1Intake", matcher, jsonExtraHeadersWithCompression).Return(nil).Times(1) - s := NewSerializer(f, nil, nil, nil, nil) + s := NewSerializer(f, nil) err := s.SendEvents([]*metrics.Event{}) require.Nil(t, err) f.AssertExpectations(t) @@ -250,7 +242,7 @@ func TestSendV1EventsCreateMarshalersBySourceType(t *testing.T) { defer config.Datadog.Set("enable_events_stream_payload_serialization", nil) f := &forwarder.MockedForwarder{} - s := NewSerializer(f, nil, nil, nil, nil) + s := NewSerializer(f, nil) events := metrics.Events{&metrics.Event{SourceTypeName: "source1"}, &metrics.Event{SourceTypeName: "source2"}, &metrics.Event{SourceTypeName: "source3"}} payloadsCountMatcher := func(payloadCount int) interface{} { @@ -280,7 +272,7 @@ func TestSendV1ServiceChecks(t *testing.T) { config.Datadog.Set("enable_service_checks_stream_payload_serialization", false) defer config.Datadog.Set("enable_service_checks_stream_payload_serialization", nil) - s := NewSerializer(f, nil, nil, nil, nil) + s := NewSerializer(f, nil) err := s.SendServiceChecks(metrics.ServiceChecks{&metrics.ServiceCheck{}}) require.Nil(t, err) f.AssertExpectations(t) @@ -296,7 +288,7 @@ func TestSendV1Series(t *testing.T) { config.Datadog.Set("use_v2_api.series", false) defer config.Datadog.Set("use_v2_api.series", true) - s := NewSerializer(f, nil, nil, nil, nil) + s := NewSerializer(f, nil) err := s.SendIterableSeries(metricsserializer.CreateSerieSource(metrics.Series{})) require.Nil(t, err) @@ -309,7 +301,7 @@ func TestSendSeries(t *testing.T) { f.On("SubmitSeries", matcher, protobufExtraHeadersWithCompression).Return(nil).Times(1) config.Datadog.Set("use_v2_api.series", true) // default value, but just to be sure - s := NewSerializer(f, nil, nil, nil, nil) + s := NewSerializer(f, nil) err := s.SendIterableSeries(metricsserializer.CreateSerieSource(metrics.Series{&metrics.Serie{}})) require.Nil(t, err) @@ -322,7 +314,7 @@ func TestSendSketch(t *testing.T) { matcher := createProtoPayloadMatcher([]byte{18, 0}) f.On("SubmitSketchSeries", matcher, protobufExtraHeadersWithCompression).Return(nil).Times(1) - s := NewSerializer(f, nil, nil, nil, nil) + s := NewSerializer(f, nil) err := s.SendSketch(metrics.NewSketchesSourceTest()) require.Nil(t, err) f.AssertExpectations(t) @@ -332,7 +324,7 @@ func TestSendMetadata(t *testing.T) { f := &forwarder.MockedForwarder{} f.On("SubmitMetadata", jsonPayloads, jsonExtraHeadersWithCompression).Return(nil).Times(1) - s := NewSerializer(f, nil, nil, nil, nil) + s := NewSerializer(f, nil) payload := &testPayload{} err := s.SendMetadata(payload) @@ -355,7 +347,7 @@ func TestSendProcessesMetadata(t *testing.T) { payloads, _ := mkPayloads(payload, true) f.On("SubmitV1Intake", payloads, jsonExtraHeadersWithCompression).Return(nil).Times(1) - s := NewSerializer(f, nil, nil, nil, nil) + s := NewSerializer(f, nil) err := s.SendProcessesMetadata("test") require.Nil(t, err) @@ -371,101 +363,6 @@ func TestSendProcessesMetadata(t *testing.T) { require.NotNil(t, err) } -func TestSendContainerLifecycleEvents(t *testing.T) { - clock := clock.NewMock() - f := &forwarder.MockedForwarder{} - payload := []byte("\x0a\x02v1\x12\x08hostname\x18\x01") - payloads, _ := mkPayloads(payload, false) - extraHeaders := protobufExtraHeaders.Clone() - extraHeaders.Set(headers.EVPOriginHeader, "agent") - extraHeaders.Set(headers.EVPOriginVersionHeader, version.AgentVersion) - extraHeaders.Set(headers.TimestampHeader, strconv.Itoa(int(clock.Now().Unix()))) - f.On("SubmitContainerLifecycleEvents", payloads, extraHeaders).Return(nil).Times(1) - - s := NewSerializer(nil, nil, f, nil, nil) - s.clock = clock - - msg := []ContainerLifecycleMessage{ - { - Version: "v1", - Host: "hostname", - ObjectKind: contlcycle.EventsPayload_Pod, - Events: []*contlcycle.Event{}, - }, - } - err := s.SendContainerLifecycleEvent(msg, "hostname") - require.Nil(t, err) - f.AssertExpectations(t) - - f.On("SubmitContainerLifecycleEvents", payloads, extraHeaders).Return(fmt.Errorf("some error")).Times(1) - err = s.SendContainerLifecycleEvent(msg, "hostname") - require.NotNil(t, err) - f.AssertExpectations(t) -} - -func TestSendContainerImage(t *testing.T) { - clock := clock.NewMock() - f := &forwarder.MockedForwarder{} - payload := []byte("\x0a\x02v1\x12\x08hostname") - payloads, _ := mkPayloads(payload, false) - extraHeaders := protobufExtraHeaders.Clone() - extraHeaders.Set(headers.EVPOriginHeader, "agent") - extraHeaders.Set(headers.EVPOriginVersionHeader, version.AgentVersion) - extraHeaders.Set(headers.TimestampHeader, strconv.Itoa(int(clock.Now().Unix()))) - f.On("SubmitContainerImages", payloads, extraHeaders).Return(nil).Times(1) - - s := NewSerializer(nil, nil, nil, f, nil) - s.clock = clock - - msg := []ContainerImageMessage{ - { - Version: "v1", - Host: "hostname", - Images: []*contimage.ContainerImage{}, - }, - } - err := s.SendContainerImage(msg, "hostname") - require.Nil(t, err) - f.AssertExpectations(t) - - f.On("SubmitContainerImages", payloads, extraHeaders).Return(fmt.Errorf("some error")).Times(1) - err = s.SendContainerImage(msg, "hostname") - require.NotNil(t, err) - f.AssertExpectations(t) -} - -func TestSendSBOM(t *testing.T) { - clock := clock.NewMock() - f := &forwarder.MockedForwarder{} - payload := []byte("\x08\x01\x12\x08hostname\x1a\x05agent") - payloads, _ := mkPayloads(payload, false) - extraHeaders := protobufExtraHeaders.Clone() - extraHeaders.Set(headers.EVPOriginHeader, "agent") - extraHeaders.Set(headers.EVPOriginVersionHeader, version.AgentVersion) - extraHeaders.Set(headers.TimestampHeader, strconv.Itoa(int(clock.Now().Unix()))) - f.On("SubmitSBOM", payloads, extraHeaders).Return(nil).Times(1) - - s := NewSerializer(nil, nil, nil, nil, f) - s.clock = clock - - msg := []SBOMMessage{ - { - Version: 1, - Host: "hostname", - Source: pointer.Ptr("agent"), - Entities: []*sbom.SBOMEntity{}, - }, - } - err := s.SendSBOM(msg, "hostname") - require.Nil(t, err) - f.AssertExpectations(t) - - f.On("SubmitSBOM", payloads, extraHeaders).Return(fmt.Errorf("some error")).Times(1) - err = s.SendSBOM(msg, "hostname") - require.NotNil(t, err) - f.AssertExpectations(t) -} - func TestSendWithDisabledKind(t *testing.T) { mockConfig := config.Mock(t) @@ -485,7 +382,7 @@ func TestSendWithDisabledKind(t *testing.T) { }() f := &forwarder.MockedForwarder{} - s := NewSerializer(f, nil, nil, nil, nil) + s := NewSerializer(f, nil) payload := &testPayload{} diff --git a/pkg/serializer/test_common.go b/pkg/serializer/test_common.go index 3e4dd058d5e94..22f92811054f9 100644 --- a/pkg/serializer/test_common.go +++ b/pkg/serializer/test_common.go @@ -71,22 +71,7 @@ func (s *MockSerializer) SendOrchestratorMetadata(msgs []ProcessMessageBody, hos return s.Called(msgs, hostName, clusterID, payloadType).Error(0) } -// SendContainerLifecycleEvent serializes & sends container lifecycle event payloads -func (s *MockSerializer) SendContainerLifecycleEvent(msgs []ContainerLifecycleMessage, hostname string) error { - return s.Called(msgs, hostname).Error(0) -} - -// SendContainerImage serializes & sends container image payloads -func (s *MockSerializer) SendContainerImage(msgs []ContainerImageMessage, hostname string) error { - return s.Called(msgs, hostname).Error(0) -} - -// SendSBOM serializes & sends SBOM payloads -func (s *MockSerializer) SendSBOM(msgs []SBOMMessage, hostname string) error { - return s.Called(msgs, hostname).Error(0) -} - -// SendOrchestratorManifests serializes & sends orchestrator manifest payloads +// SendOrchestratorManifests serializes & send orchestrator manifest payloads func (s *MockSerializer) SendOrchestratorManifests(msgs []ProcessMessageBody, hostName, clusterID string) error { return s.Called(msgs, hostName, clusterID).Error(0) } diff --git a/pkg/serializer/types_contimage.go b/pkg/serializer/types_contimage.go deleted file mode 100644 index bd5ba52bfd844..0000000000000 --- a/pkg/serializer/types_contimage.go +++ /dev/null @@ -1,13 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2022-present Datadog, Inc. - -package serializer - -import ( - "github.com/DataDog/agent-payload/v5/contimage" -) - -// ContainerImageMessage is a type alias for contimage proto payload -type ContainerImageMessage = contimage.ContainerImagePayload diff --git a/pkg/serializer/types_contlcycle.go b/pkg/serializer/types_contlcycle.go deleted file mode 100644 index 89edde841427b..0000000000000 --- a/pkg/serializer/types_contlcycle.go +++ /dev/null @@ -1,13 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2016-present Datadog, Inc. - -package serializer - -import ( - "github.com/DataDog/agent-payload/v5/contlcycle" -) - -// ContainerLifecycleMessage is a type alias for contlcycle proto payload -type ContainerLifecycleMessage = contlcycle.EventsPayload diff --git a/pkg/serializer/types_sbom.go b/pkg/serializer/types_sbom.go deleted file mode 100644 index b3169717e4da9..0000000000000 --- a/pkg/serializer/types_sbom.go +++ /dev/null @@ -1,16 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2022-present Datadog, Inc. - -//go:build !serverless -// +build !serverless - -package serializer - -import ( - "github.com/DataDog/agent-payload/v5/sbom" -) - -// SBOMMessage is a type alias for SBOM proto payload -type SBOMMessage = sbom.SBOMPayload diff --git a/pkg/snmp/traps/forwarder.go b/pkg/snmp/traps/forwarder.go index e67141143db04..98988f4f60c37 100644 --- a/pkg/snmp/traps/forwarder.go +++ b/pkg/snmp/traps/forwarder.go @@ -62,5 +62,5 @@ func (tf *TrapForwarder) sendTrap(packet *SnmpPacket) { return } log.Tracef("send trap payload: %s", string(data)) - tf.sender.EventPlatformEvent(string(data), epforwarder.EventTypeSnmpTraps) + tf.sender.EventPlatformEvent(data, epforwarder.EventTypeSnmpTraps) } diff --git a/pkg/snmp/traps/forwarder_test.go b/pkg/snmp/traps/forwarder_test.go index 9c24ebd86c7e3..5452d2144599a 100644 --- a/pkg/snmp/traps/forwarder_test.go +++ b/pkg/snmp/traps/forwarder_test.go @@ -72,7 +72,7 @@ func TestV1GenericTrapAreForwarder(t *testing.T) { require.NoError(t, err) forwarder.trapsIn <- packet forwarder.Stop() - sender.AssertEventPlatformEvent(t, string(rawEvent), epforwarder.EventTypeSnmpTraps) + sender.AssertEventPlatformEvent(t, rawEvent, epforwarder.EventTypeSnmpTraps) } func TestV1SpecificTrapAreForwarder(t *testing.T) { @@ -85,7 +85,7 @@ func TestV1SpecificTrapAreForwarder(t *testing.T) { require.NoError(t, err) forwarder.trapsIn <- packet forwarder.Stop() - sender.AssertEventPlatformEvent(t, string(rawEvent), epforwarder.EventTypeSnmpTraps) + sender.AssertEventPlatformEvent(t, rawEvent, epforwarder.EventTypeSnmpTraps) } func TestV2TrapAreForwarder(t *testing.T) { forwarder, err := createForwarder(t) @@ -94,5 +94,5 @@ func TestV2TrapAreForwarder(t *testing.T) { require.True(t, ok) forwarder.trapsIn <- makeSnmpPacket(NetSNMPExampleHeartbeatNotification) forwarder.Stop() - sender.AssertEventPlatformEvent(t, "0dee7422f503d972db97b711e39a5003d1995c0d2f718542813acc4c46053ef0", epforwarder.EventTypeSnmpTraps) + sender.AssertEventPlatformEvent(t, []byte("0dee7422f503d972db97b711e39a5003d1995c0d2f718542813acc4c46053ef0"), epforwarder.EventTypeSnmpTraps) } diff --git a/rtloader/common/builtins/aggregator.c b/rtloader/common/builtins/aggregator.c index 2ecd7a690daf2..3ee1b6b56678e 100644 --- a/rtloader/common/builtins/aggregator.c +++ b/rtloader/common/builtins/aggregator.c @@ -425,15 +425,22 @@ static PyObject *submit_event_platform_event(PyObject *self, PyObject *args) PyObject *check = NULL; char *check_id = NULL; - char *raw_event = NULL; + char *raw_event_ptr = NULL; + Py_ssize_t raw_event_sz = 0; char *event_type = NULL; - if (!PyArg_ParseTuple(args, "Osss", &check, &check_id, &raw_event, &event_type)) { + if (!PyArg_ParseTuple(args, "Oss#s", &check, &check_id, &raw_event_ptr, &raw_event_sz, &event_type)) { PyGILState_Release(gstate); return NULL; } - cb_submit_event_platform_event(check_id, raw_event, event_type); + if (raw_event_sz > INT_MAX) { + PyErr_SetString(PyExc_ValueError, "event is too large"); + PyGILState_Release(gstate); + return NULL; + } + + cb_submit_event_platform_event(check_id, raw_event_ptr, raw_event_sz, event_type); PyGILState_Release(gstate); Py_RETURN_NONE; } diff --git a/rtloader/common/builtins/aggregator.h b/rtloader/common/builtins/aggregator.h index 29451bcc3b2ba..6097239ee676d 100644 --- a/rtloader/common/builtins/aggregator.h +++ b/rtloader/common/builtins/aggregator.h @@ -64,6 +64,7 @@ The callback is expected to be provided by the rtloader caller - in go-context: CGO. */ +#define PY_SSIZE_T_CLEAN #include #include diff --git a/rtloader/include/rtloader_types.h b/rtloader/include/rtloader_types.h index 681afbba54840..3f51959b596ba 100644 --- a/rtloader/include/rtloader_types.h +++ b/rtloader/include/rtloader_types.h @@ -105,7 +105,7 @@ typedef void (*cb_submit_event_t)(char *, event_t *); // (id, metric_name, value, lower_bound, upper_bound, monotonic, hostname, tags, flush_first_value) typedef void (*cb_submit_histogram_bucket_t)(char *, char *, long long, float, float, int, char *, char **, bool); // (id, event, event_type) -typedef void (*cb_submit_event_platform_event_t)(char *, char *, char *); +typedef void (*cb_submit_event_platform_event_t)(char *, char *, int, char *); // datadog_agent // diff --git a/rtloader/test/aggregator/aggregator.go b/rtloader/test/aggregator/aggregator.go index 58d98c3561533..9bd0c9dbf8ce1 100644 --- a/rtloader/test/aggregator/aggregator.go +++ b/rtloader/test/aggregator/aggregator.go @@ -25,7 +25,7 @@ extern void submitMetric(char *, metric_type_t, char *, double, char **, char *, extern void submitServiceCheck(char *, char *, int, char **, char *, char *); extern void submitEvent(char*, event_t*); extern void submitHistogramBucket(char *, char *, long long, float, float, int, char *, char **, bool); -extern void submitEventPlatformEvent(char *, char *, char *); +extern void submitEventPlatformEvent(char *, char *, int, char *); static void initAggregatorTests(rtloader_t *rtloader) { set_submit_metric_cb(rtloader, submitMetric); @@ -49,7 +49,7 @@ var ( scLevel int scName string scMessage string - rawEvent string + rawEvent []byte eventType string _event *event intValue int @@ -236,8 +236,8 @@ func submitHistogramBucket(id *C.char, cMetricName *C.char, cVal C.longlong, cLo } //export submitEventPlatformEvent -func submitEventPlatformEvent(id *C.char, _rawEvent *C.char, _eventType *C.char) { +func submitEventPlatformEvent(id *C.char, _rawEventPtr *C.char, _rawEventSize C.int, _eventType *C.char) { checkID = C.GoString(id) - rawEvent = C.GoString(_rawEvent) + rawEvent = C.GoBytes(unsafe.Pointer(_rawEventPtr), _rawEventSize) eventType = C.GoString(_eventType) }