From 6f4e092e9f012a65afc50d2871fa44a328d4bcfa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A9na=C3=AFc=20Huard?= Date: Thu, 5 Jan 2023 14:38:07 +0100 Subject: [PATCH 1/2] Add the plumbing in the agent forwarder to submit container images and SBOM --- LICENSE-3rdparty.csv | 3 + go.mod | 2 +- go.sum | 4 +- pkg/aggregator/aggregator.go | 86 +++++++++++++ pkg/aggregator/demultiplexer_agent.go | 60 ++++++++- pkg/aggregator/demultiplexer_senders.go | 2 + pkg/aggregator/demultiplexer_serverless.go | 2 +- pkg/aggregator/mocksender/asserts.go | 12 ++ pkg/aggregator/mocksender/mocked_methods.go | 10 ++ pkg/aggregator/sender.go | 27 ++++ pkg/aggregator/sender_test.go | 10 +- pkg/config/config.go | 10 ++ pkg/containerimage/forwarder.go | 59 +++++++++ pkg/forwarder/endpoints/endpoints.go | 4 + pkg/forwarder/forwarder.go | 14 +++ pkg/forwarder/forwarder_test.go | 12 ++ pkg/forwarder/noop_forwarder.go | 10 ++ pkg/forwarder/sync_forwarder.go | 10 ++ pkg/forwarder/test_common.go | 10 ++ .../serializerexporter/consumer_test.go | 8 ++ pkg/sbom/forwarder.go | 59 +++++++++ pkg/serializer/serializer.go | 102 +++++++++++++-- pkg/serializer/serializer_test.go | 118 ++++++++++++++++-- pkg/serializer/test_common.go | 16 ++- pkg/serializer/types_contimage.go | 13 ++ pkg/serializer/types_sbom.go | 13 ++ 26 files changed, 649 insertions(+), 27 deletions(-) create mode 100644 pkg/containerimage/forwarder.go create mode 100644 pkg/sbom/forwarder.go create mode 100644 pkg/serializer/types_contimage.go create mode 100644 pkg/serializer/types_sbom.go diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index c85d873adda92..a795fb2781478 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -26,9 +26,12 @@ core,code.cloudfoundry.org/lager,Apache-2.0,"Copyright (c) 2016-Present CloudFou core,code.cloudfoundry.org/tlsconfig,Apache-2.0,"Copyright (c) 2016-Present CloudFoundry.org Foundation, Inc. All Rights Reserved." core,contrib.go.opencensus.io/exporter/prometheus,Apache-2.0,"Copyright 2020, OpenCensus Authors" core,github.com/AlekSi/pointer,MIT,Copyright (c) 2015 Alexey Palazhchenko +core,github.com/DataDog/agent-payload/v5/contimage,BSD-3-Clause,"Copyright (c) 2017, Datadog, Inc" core,github.com/DataDog/agent-payload/v5/contlcycle,BSD-3-Clause,"Copyright (c) 2017, Datadog, Inc" +core,github.com/DataDog/agent-payload/v5/cyclonedx_v1_4,BSD-3-Clause,"Copyright (c) 2017, Datadog, Inc" core,github.com/DataDog/agent-payload/v5/gogen,BSD-3-Clause,"Copyright (c) 2017, Datadog, Inc" core,github.com/DataDog/agent-payload/v5/process,BSD-3-Clause,"Copyright (c) 2017, Datadog, Inc" +core,github.com/DataDog/agent-payload/v5/sbom,BSD-3-Clause,"Copyright (c) 2017, Datadog, Inc" core,github.com/DataDog/aptly/aptly,MIT,"* Alexander Guy (https://github.com/alexanderguy) | * Andre Roth (https://github.com/neolynx) | * Andrea Bernardo Ciddio (https://github.com/bcandrea) | * Andrey Smirnov (https://github.com/smira) | * Artem Smirnov (https://github.com/urpylka) | * Benj Fassbind (https://github.com/randombenj) | * Benoit Foucher (https://github.com/bentoi) | * Charles Hsu (https://github.com/charz) | * Chris Read (https://github.com/cread) | * Chuan Liu (https://github.com/chuan) | * Clemens Rabe (https://github.com/seeraven) | * Dmitrii Kashin (https://github.com/freehck) | * Don Kuntz (https://github.com/dkuntz2) | * Geoffrey Thomas (https://github.com/geofft) | * Harald Sitter (https://github.com/apachelogger) | * Ivan Kurnosov (https://github.com/zerkms) | * Johannes Layher (https://github.com/jola5) | * Joshua Colson (https://github.com/freakinhippie) | * Lorenzo Bolla (https://github.com/lbolla) | * Ludovico Cavedon (https://github.com/cavedon) | * Markus Muellner (https://github.com/mmianl) | * Matt Martyn (https://github.com/MMartyn) | * Maximilian Stein (https://github.com/steinymity) | * Michael Koval (https://github.com/mkoval) | * Nabil Bendafi (https://github.com/nabilbendafi) | * Oliver Sauder (https://github.com/sliverc) | * Paul Krohn (https://github.com/paul-krohn) | * Petr Jediny (https://github.com/pjediny) | * Phil Frost (https://github.com/bitglue) | * Raphael Medaer (https://github.com/rmedaer) | * Raul Benencia (https://github.com/rul) | * Rohan Garg (https://github.com/shadeslayer) | * Russ Allbery (https://github.com/rra) | * Russell Greene (https://github.com/russelltg) | * Ryan Uber (https://github.com/ryanuber) | * Samuel Mutel (https://github.com/smutel) | * Sebastien Badia (https://github.com/sbadia) | * Sebastien Binet (https://github.com/sbinet) | * Shengjing Zhu (https://github.com/zhsj) | * Simon Aquino (https://github.com/queeno) | * Strajan Sebastian (https://github.com/strajansebastian) | * Sylvain Baubeau (https://github.com/lebauce) | * Szymon Sobik (https://github.com/sobczyk) | * TJ Merritt (https://github.com/tjmerritt) | * Vincent Batoufflet (https://github.com/vbatoufflet) | * Vincent Bernat (https://github.com/vincentbernat) | * William Manley (https://github.com/wmanley) | * x539 (https://github.com/x539) | Copyright 2013-2015 aptly authors. All rights reserved | List of contributors, in chronological order:" core,github.com/DataDog/aptly/database,MIT,"* Alexander Guy (https://github.com/alexanderguy) | * Andre Roth (https://github.com/neolynx) | * Andrea Bernardo Ciddio (https://github.com/bcandrea) | * Andrey Smirnov (https://github.com/smira) | * Artem Smirnov (https://github.com/urpylka) | * Benj Fassbind (https://github.com/randombenj) | * Benoit Foucher (https://github.com/bentoi) | * Charles Hsu (https://github.com/charz) | * Chris Read (https://github.com/cread) | * Chuan Liu (https://github.com/chuan) | * Clemens Rabe (https://github.com/seeraven) | * Dmitrii Kashin (https://github.com/freehck) | * Don Kuntz (https://github.com/dkuntz2) | * Geoffrey Thomas (https://github.com/geofft) | * Harald Sitter (https://github.com/apachelogger) | * Ivan Kurnosov (https://github.com/zerkms) | * Johannes Layher (https://github.com/jola5) | * Joshua Colson (https://github.com/freakinhippie) | * Lorenzo Bolla (https://github.com/lbolla) | * Ludovico Cavedon (https://github.com/cavedon) | * Markus Muellner (https://github.com/mmianl) | * Matt Martyn (https://github.com/MMartyn) | * Maximilian Stein (https://github.com/steinymity) | * Michael Koval (https://github.com/mkoval) | * Nabil Bendafi (https://github.com/nabilbendafi) | * Oliver Sauder (https://github.com/sliverc) | * Paul Krohn (https://github.com/paul-krohn) | * Petr Jediny (https://github.com/pjediny) | * Phil Frost (https://github.com/bitglue) | * Raphael Medaer (https://github.com/rmedaer) | * Raul Benencia (https://github.com/rul) | * Rohan Garg (https://github.com/shadeslayer) | * Russ Allbery (https://github.com/rra) | * Russell Greene (https://github.com/russelltg) | * Ryan Uber (https://github.com/ryanuber) | * Samuel Mutel (https://github.com/smutel) | * Sebastien Badia (https://github.com/sbadia) | * Sebastien Binet (https://github.com/sbinet) | * Shengjing Zhu (https://github.com/zhsj) | * Simon Aquino (https://github.com/queeno) | * Strajan Sebastian (https://github.com/strajansebastian) | * Sylvain Baubeau (https://github.com/lebauce) | * Szymon Sobik (https://github.com/sobczyk) | * TJ Merritt (https://github.com/tjmerritt) | * Vincent Batoufflet (https://github.com/vbatoufflet) | * Vincent Bernat (https://github.com/vincentbernat) | * William Manley (https://github.com/wmanley) | * x539 (https://github.com/x539) | Copyright 2013-2015 aptly authors. All rights reserved | List of contributors, in chronological order:" core,github.com/DataDog/aptly/database/goleveldb,MIT,"* Alexander Guy (https://github.com/alexanderguy) | * Andre Roth (https://github.com/neolynx) | * Andrea Bernardo Ciddio (https://github.com/bcandrea) | * Andrey Smirnov (https://github.com/smira) | * Artem Smirnov (https://github.com/urpylka) | * Benj Fassbind (https://github.com/randombenj) | * Benoit Foucher (https://github.com/bentoi) | * Charles Hsu (https://github.com/charz) | * Chris Read (https://github.com/cread) | * Chuan Liu (https://github.com/chuan) | * Clemens Rabe (https://github.com/seeraven) | * Dmitrii Kashin (https://github.com/freehck) | * Don Kuntz (https://github.com/dkuntz2) | * Geoffrey Thomas (https://github.com/geofft) | * Harald Sitter (https://github.com/apachelogger) | * Ivan Kurnosov (https://github.com/zerkms) | * Johannes Layher (https://github.com/jola5) | * Joshua Colson (https://github.com/freakinhippie) | * Lorenzo Bolla (https://github.com/lbolla) | * Ludovico Cavedon (https://github.com/cavedon) | * Markus Muellner (https://github.com/mmianl) | * Matt Martyn (https://github.com/MMartyn) | * Maximilian Stein (https://github.com/steinymity) | * Michael Koval (https://github.com/mkoval) | * Nabil Bendafi (https://github.com/nabilbendafi) | * Oliver Sauder (https://github.com/sliverc) | * Paul Krohn (https://github.com/paul-krohn) | * Petr Jediny (https://github.com/pjediny) | * Phil Frost (https://github.com/bitglue) | * Raphael Medaer (https://github.com/rmedaer) | * Raul Benencia (https://github.com/rul) | * Rohan Garg (https://github.com/shadeslayer) | * Russ Allbery (https://github.com/rra) | * Russell Greene (https://github.com/russelltg) | * Ryan Uber (https://github.com/ryanuber) | * Samuel Mutel (https://github.com/smutel) | * Sebastien Badia (https://github.com/sbadia) | * Sebastien Binet (https://github.com/sbinet) | * Shengjing Zhu (https://github.com/zhsj) | * Simon Aquino (https://github.com/queeno) | * Strajan Sebastian (https://github.com/strajansebastian) | * Sylvain Baubeau (https://github.com/lebauce) | * Szymon Sobik (https://github.com/sobczyk) | * TJ Merritt (https://github.com/tjmerritt) | * Vincent Batoufflet (https://github.com/vbatoufflet) | * Vincent Bernat (https://github.com/vincentbernat) | * William Manley (https://github.com/wmanley) | * x539 (https://github.com/x539) | Copyright 2013-2015 aptly authors. All rights reserved | List of contributors, in chronological order:" diff --git a/go.mod b/go.mod index 68a1496483af6..361bc78405eb7 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ require ( code.cloudfoundry.org/bbs v0.0.0-20200403215808-d7bc971db0db code.cloudfoundry.org/garden v0.0.0-20210208153517-580cadd489d2 code.cloudfoundry.org/lager v2.0.0+incompatible - github.com/DataDog/agent-payload/v5 v5.0.52 + github.com/DataDog/agent-payload/v5 v5.0.61 github.com/DataDog/datadog-agent/pkg/obfuscate v0.42.0-rc.3 github.com/DataDog/datadog-agent/pkg/otlp/model v0.42.0-rc.3 github.com/DataDog/datadog-agent/pkg/quantile v0.42.0-rc.3 diff --git a/go.sum b/go.sum index d0b87e9a4fcf3..eda175ad98f42 100644 --- a/go.sum +++ b/go.sum @@ -119,8 +119,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I= github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= -github.com/DataDog/agent-payload/v5 v5.0.52 h1:7v/FikpNGLjp/aSly39Mw6aKQDoxBnA7JyV4rCvrBns= -github.com/DataDog/agent-payload/v5 v5.0.52/go.mod h1:vhXPNG7eNDOLeW94Z7dLNUBv61kRBHK7vXnQ+RQfckk= +github.com/DataDog/agent-payload/v5 v5.0.61 h1:3HC4B1NpHgAedZHmM9/oCJvFo6pu/ugDAjrISK5AJsk= +github.com/DataDog/agent-payload/v5 v5.0.61/go.mod h1:oQZi1VZp1e3QvlSUX4iphZCpJaFepUxWq0hNXxihKBM= github.com/DataDog/aptly v1.5.0 h1:Oy6JVRC9iDgnmpeVYa4diXwP/exU7wJ/U1kuI4Zacxg= github.com/DataDog/aptly v1.5.0/go.mod h1:KVyvkYXGcFugxadGFZ+u5Fc3M6q/EfzFZyeTD9HVbkY= github.com/DataDog/cast v1.3.1-0.20190301154711-1ee8c8bd14a3 h1:SobA9WYm4K/MUtWlbKaomWTmnuYp1KhIm8Wlx3vmpsg= diff --git a/pkg/aggregator/aggregator.go b/pkg/aggregator/aggregator.go index 5283c3ef0dc61..ccbd54f1d2c7f 100644 --- a/pkg/aggregator/aggregator.go +++ b/pkg/aggregator/aggregator.go @@ -124,6 +124,10 @@ var ( aggregatorEventPlatformEventsErrors = expvar.Map{} aggregatorContainerLifecycleEvents = expvar.Int{} aggregatorContainerLifecycleEventsErrors = expvar.Int{} + aggregatorContainerImages = expvar.Int{} + aggregatorContainerImagesErrors = expvar.Int{} + aggregatorSBOM = expvar.Int{} + aggregatorSBOMErrors = expvar.Int{} tlmFlush = telemetry.NewCounter("aggregator", "flush", []string{"data_type", "state"}, "Number of metrics/service checks/events flushed") @@ -179,6 +183,10 @@ func init() { aggregatorExpvars.Set("EventPlatformEventsErrors", &aggregatorEventPlatformEventsErrors) aggregatorExpvars.Set("ContainerLifecycleEvents", &aggregatorContainerLifecycleEvents) aggregatorExpvars.Set("ContainerLifecycleEventsErrors", &aggregatorContainerLifecycleEventsErrors) + aggregatorExpvars.Set("ContainerImages", &aggregatorContainerImages) + aggregatorExpvars.Set("ContainerImagesErrors", &aggregatorContainerImagesErrors) + aggregatorExpvars.Set("SBOM", &aggregatorSBOM) + aggregatorExpvars.Set("SBOMErrors", &aggregatorSBOMErrors) contextsByMtypeMap := expvar.Map{} aggregatorDogstatsdContextsByMtype = make([]expvar.Int, int(metrics.NumMetricTypes)) @@ -212,6 +220,16 @@ type BufferedAggregator struct { contLcycleStopper chan struct{} contLcycleDequeueOnce sync.Once + contImageIn chan senderContainerImage + contImageBuffer chan senderContainerImage + contImageStopper chan struct{} + contImageDequeueOnce sync.Once + + sbomIn chan senderSBOM + sbomBuffer chan senderSBOM + sbomStopper chan struct{} + sbomDequeueOnce sync.Once + // metricSamplePool is a pool of slices of metric sample to avoid allocations. // Used by the Dogstatsd Batcher. MetricSamplePool *metrics.MetricSamplePool @@ -290,6 +308,14 @@ func NewBufferedAggregator(s serializer.MetricSerializer, eventPlatformForwarder contLcycleBuffer: make(chan senderContainerLifecycleEvent, bufferSize), contLcycleStopper: make(chan struct{}), + contImageIn: make(chan senderContainerImage, bufferSize), + contImageBuffer: make(chan senderContainerImage, bufferSize), + contImageStopper: make(chan struct{}), + + sbomIn: make(chan senderSBOM, bufferSize), + sbomBuffer: make(chan senderSBOM, bufferSize), + sbomStopper: make(chan struct{}), + tagsStore: tagsStore, checkSamplers: make(map[check.ID]*CheckSampler), flushInterval: flushInterval, @@ -785,6 +811,12 @@ func (agg *BufferedAggregator) run() { case event := <-agg.contLcycleIn: aggregatorContainerLifecycleEvents.Add(1) agg.handleContainerLifecycleEvent(event) + case event := <-agg.contImageIn: + aggregatorContainerImages.Add(1) + agg.handleContainerImage(event) + case event := <-agg.sbomIn: + aggregatorSBOM.Add(1) + agg.handleSBOM(event) } } } @@ -805,6 +837,38 @@ func (agg *BufferedAggregator) dequeueContainerLifecycleEvents() { } } +// dequeueContainerImages consumes buffered container image. +// It is blocking so it should be started in its own routine and only one instance should be started. +func (agg *BufferedAggregator) dequeueContainerImages() { + for { + select { + case event := <-agg.contImageBuffer: + if err := agg.serializer.SendContainerImage(event.msgs, agg.hostname); err != nil { + aggregatorContainerImagesErrors.Add(1) + log.Warnf("Error submitting container image data: %v", err) + } + case <-agg.contImageStopper: + return + } + } +} + +// dequeueSBOM consumes buffered SBOM. +// It is blocking so it should be started in its own routine and only one instance should be started. +func (agg *BufferedAggregator) dequeueSBOM() { + for { + select { + case event := <-agg.sbomBuffer: + if err := agg.serializer.SendSBOM(event.msgs, agg.hostname); err != nil { + aggregatorSBOMErrors.Add(1) + log.Warnf("Error submitting SBOM data: %v", err) + } + case <-agg.sbomStopper: + return + } + } +} + // handleContainerLifecycleEvent forwards container lifecycle events to the buffering channel. func (agg *BufferedAggregator) handleContainerLifecycleEvent(event senderContainerLifecycleEvent) { select { @@ -816,6 +880,28 @@ func (agg *BufferedAggregator) handleContainerLifecycleEvent(event senderContain } } +// handleContainerImage forwards container image to the buffering channel. +func (agg *BufferedAggregator) handleContainerImage(event senderContainerImage) { + select { + case agg.contImageBuffer <- event: + return + default: + aggregatorContainerImagesErrors.Add(1) + log.Warn("Container image channel is full") + } +} + +// handleSBOM forwards SBOM to the buffering channel. +func (agg *BufferedAggregator) handleSBOM(event senderSBOM) { + select { + case agg.sbomBuffer <- event: + return + default: + aggregatorSBOMErrors.Add(1) + log.Warn("SBOM channel is full") + } +} + // tags returns the list of tags that should be added to the agent telemetry metrics // Container agent tags may be missing in the first seconds after agent startup func (agg *BufferedAggregator) tags(withVersion bool) []string { diff --git a/pkg/aggregator/demultiplexer_agent.go b/pkg/aggregator/demultiplexer_agent.go index f0e43925318c8..0d035f84edc5d 100644 --- a/pkg/aggregator/demultiplexer_agent.go +++ b/pkg/aggregator/demultiplexer_agent.go @@ -12,10 +12,12 @@ import ( "github.com/DataDog/datadog-agent/pkg/aggregator/internal/tags" "github.com/DataDog/datadog-agent/pkg/config" + "github.com/DataDog/datadog-agent/pkg/containerimage" "github.com/DataDog/datadog-agent/pkg/containerlifecycle" "github.com/DataDog/datadog-agent/pkg/epforwarder" "github.com/DataDog/datadog-agent/pkg/forwarder" "github.com/DataDog/datadog-agent/pkg/metrics" + "github.com/DataDog/datadog-agent/pkg/sbom" "github.com/DataDog/datadog-agent/pkg/serializer" "github.com/DataDog/datadog-agent/pkg/util/log" ) @@ -61,6 +63,8 @@ type AgentDemultiplexerOptions struct { UseEventPlatformForwarder bool UseOrchestratorForwarder bool UseContainerLifecycleForwarder bool + UseContainerImageForwarder bool + UseSBOMForwarder bool FlushInterval time.Duration EnableNoAggregationPipeline bool @@ -83,6 +87,8 @@ func DefaultAgentDemultiplexerOptions(options *forwarder.Options) AgentDemultipl UseNoopEventPlatformForwarder: false, UseNoopOrchestratorForwarder: false, UseContainerLifecycleForwarder: false, + UseContainerImageForwarder: false, + UseSBOMForwarder: false, // the different agents/binaries enable it on a per-need basis EnableNoAggregationPipeline: false, } @@ -108,6 +114,8 @@ type forwarders struct { orchestrator forwarder.Forwarder eventPlatform epforwarder.EventPlatformForwarder containerLifecycle *forwarder.DefaultForwarder + containerImage *forwarder.DefaultForwarder + sbom *forwarder.DefaultForwarder } type dataOutputs struct { @@ -162,6 +170,18 @@ func initAgentDemultiplexer(options AgentDemultiplexerOptions, hostname string) containerLifecycleForwarder = containerlifecycle.NewForwarder() } + // setup the container image forwarder + var containerImageForwarder *forwarder.DefaultForwarder + if options.UseContainerImageForwarder { + containerImageForwarder = containerimage.NewForwarder() + } + + // setup the SBOM forwarder + var sbomForwarder *forwarder.DefaultForwarder + if options.UseSBOMForwarder { + sbomForwarder = sbom.NewForwarder() + } + var sharedForwarder forwarder.Forwarder if options.UseNoopForwarder { sharedForwarder = forwarder.NoopForwarder{} @@ -172,7 +192,7 @@ func initAgentDemultiplexer(options AgentDemultiplexerOptions, hostname string) // prepare the serializer // ---------------------- - sharedSerializer := serializer.NewSerializer(sharedForwarder, orchestratorForwarder, containerLifecycleForwarder) + sharedSerializer := serializer.NewSerializer(sharedForwarder, orchestratorForwarder, containerLifecycleForwarder, containerImageForwarder, sbomForwarder) // prepare the embedded aggregator // -- @@ -204,7 +224,7 @@ func initAgentDemultiplexer(options AgentDemultiplexerOptions, hostname string) var noAggWorker *noAggregationStreamWorker var noAggSerializer serializer.MetricSerializer if options.EnableNoAggregationPipeline { - noAggSerializer = serializer.NewSerializer(sharedForwarder, orchestratorForwarder, containerLifecycleForwarder) + noAggSerializer = serializer.NewSerializer(sharedForwarder, orchestratorForwarder, containerLifecycleForwarder, containerImageForwarder, sbomForwarder) noAggWorker = newNoAggregationStreamWorker( config.Datadog.GetInt("dogstatsd_no_aggregation_pipeline_batch_size"), noAggSerializer, @@ -230,6 +250,8 @@ func initAgentDemultiplexer(options AgentDemultiplexerOptions, hostname string) orchestrator: orchestratorForwarder, eventPlatform: eventPlatformForwarder, containerLifecycle: containerLifecycleForwarder, + containerImage: containerImageForwarder, + sbom: sbomForwarder, }, sharedSerializer: sharedSerializer, @@ -309,6 +331,24 @@ func (d *AgentDemultiplexer) Run() { log.Debug("not starting the container lifecycle forwarder") } + // container image forwarder + if d.forwarders.containerImage != nil { + if err := d.forwarders.containerImage.Start(); err != nil { + log.Errorf("error starting container image forwarder: %v", err) + } + } else { + log.Debug("no starting the container image forwarder") + } + + // sbom forwarder + if d.forwarders.sbom != nil { + if err := d.forwarders.sbom.Start(); err != nil { + log.Errorf("error starting SBOM forwarder: %v", err) + } + } else { + log.Debug("no starting the SBOM forwarder") + } + // shared forwarder if d.forwarders.shared != nil { d.forwarders.shared.Start() //nolint:errcheck @@ -322,6 +362,14 @@ func (d *AgentDemultiplexer) Run() { d.aggregator.contLcycleDequeueOnce.Do(func() { go d.aggregator.dequeueContainerLifecycleEvents() }) } + if d.options.UseContainerImageForwarder { + d.aggregator.contImageDequeueOnce.Do(func() { go d.aggregator.dequeueContainerImages() }) + } + + if d.options.UseSBOMForwarder { + d.aggregator.sbomDequeueOnce.Do(func() { go d.aggregator.dequeueSBOM() }) + } + for _, w := range d.statsd.workers { go w.run() } @@ -417,6 +465,14 @@ func (d *AgentDemultiplexer) Stop(flush bool) { d.dataOutputs.forwarders.containerLifecycle.Stop() d.dataOutputs.forwarders.containerLifecycle = nil } + if d.dataOutputs.forwarders.containerImage != nil { + d.dataOutputs.forwarders.containerImage.Stop() + d.dataOutputs.forwarders.containerImage = nil + } + if d.dataOutputs.forwarders.sbom != nil { + d.dataOutputs.forwarders.sbom.Stop() + d.dataOutputs.forwarders.sbom = nil + } if d.dataOutputs.forwarders.shared != nil { d.dataOutputs.forwarders.shared.Stop() d.dataOutputs.forwarders.shared = nil diff --git a/pkg/aggregator/demultiplexer_senders.go b/pkg/aggregator/demultiplexer_senders.go index 34ddcf68435a3..4fcbb3e3753dc 100644 --- a/pkg/aggregator/demultiplexer_senders.go +++ b/pkg/aggregator/demultiplexer_senders.go @@ -74,6 +74,8 @@ func (s *senders) GetDefaultSender() (Sender, error) { s.agg.orchestratorManifestIn, s.agg.eventPlatformIn, s.agg.contLcycleIn, + s.agg.contImageIn, + s.agg.sbomIn, ) }) return s.defaultSender, nil diff --git a/pkg/aggregator/demultiplexer_serverless.go b/pkg/aggregator/demultiplexer_serverless.go index 80ae33cfec516..4742b3e068cb8 100644 --- a/pkg/aggregator/demultiplexer_serverless.go +++ b/pkg/aggregator/demultiplexer_serverless.go @@ -39,7 +39,7 @@ type ServerlessDemultiplexer struct { func InitAndStartServerlessDemultiplexer(domainResolvers map[string]resolver.DomainResolver, forwarderTimeout time.Duration) *ServerlessDemultiplexer { bufferSize := config.Datadog.GetInt("aggregator_buffer_size") forwarder := forwarder.NewSyncForwarder(domainResolvers, forwarderTimeout) - serializer := serializer.NewSerializer(forwarder, nil, nil) + serializer := serializer.NewSerializer(forwarder, nil, nil, nil, nil) metricSamplePool := metrics.NewMetricSamplePool(MetricSamplePoolBatchSize) tagsStore := tags.NewStore(config.Datadog.GetBool("aggregator_use_tags_store"), "timesampler") diff --git a/pkg/aggregator/mocksender/asserts.go b/pkg/aggregator/mocksender/asserts.go index d450d0f697ff9..79273e836df88 100644 --- a/pkg/aggregator/mocksender/asserts.go +++ b/pkg/aggregator/mocksender/asserts.go @@ -12,6 +12,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/DataDog/agent-payload/v5/contimage" + "github.com/DataDog/agent-payload/v5/sbom" "github.com/DataDog/datadog-agent/pkg/metrics" ) @@ -70,6 +72,16 @@ func (m *MockSender) AssertEventPlatformEvent(t *testing.T, expectedRawEvent str return m.Mock.AssertCalled(t, "EventPlatformEvent", expectedRawEvent, expectedEventType) } +// AssertContainerImage assert the expected event was emitted with the following values +func (m *MockSender) AssertContainerImage(t *testing.T, expectedContainerImages []contimage.ContainerImagePayload) bool { + return m.Mock.AssertCalled(t, "ContainerImage", expectedContainerImages) +} + +// AssertContainerImage assert the expected event was emitted with the following values +func (m *MockSender) AssertSBOM(t *testing.T, expectedSBOM []sbom.SBOMPayload) bool { + return m.Mock.AssertCalled(t, "SBOM", expectedSBOM) +} + // AssertEventMissing assert the expectedEvent was never emitted with the following values: // AggregationKey, Priority, SourceTypeName, EventType, Host and a Ts range weighted with the parameter allowedDelta func (m *MockSender) AssertEventMissing(t *testing.T, expectedEvent metrics.Event, allowedDelta time.Duration) bool { diff --git a/pkg/aggregator/mocksender/mocked_methods.go b/pkg/aggregator/mocksender/mocked_methods.go index 85a463bf649b7..065616e7d60a7 100644 --- a/pkg/aggregator/mocksender/mocked_methods.go +++ b/pkg/aggregator/mocksender/mocked_methods.go @@ -117,6 +117,16 @@ func (m *MockSender) ContainerLifecycleEvent(msgs []serializer.ContainerLifecycl m.Called(msgs) } +// ContainerImage submit container image messages +func (m *MockSender) ContainerImage(msgs []serializer.ContainerImageMessage) { + m.Called(msgs) +} + +// SBOM submit container image messages +func (m *MockSender) SBOM(msgs []serializer.SBOMMessage) { + m.Called(msgs) +} + // OrchestratorManifest submit orchestrator manifest messages func (m *MockSender) OrchestratorManifest(msgs []serializer.ProcessMessageBody, clusterID string) { m.Called(msgs, clusterID) diff --git a/pkg/aggregator/sender.go b/pkg/aggregator/sender.go index 8a4acd1e31d91..97b33c50f20eb 100644 --- a/pkg/aggregator/sender.go +++ b/pkg/aggregator/sender.go @@ -41,6 +41,8 @@ type Sender interface { OrchestratorMetadata(msgs []serializer.ProcessMessageBody, clusterID string, nodeType int) OrchestratorManifest(msgs []serializer.ProcessMessageBody, clusterID string) ContainerLifecycleEvent(msgs []serializer.ContainerLifecycleMessage) + ContainerImage(msgs []serializer.ContainerImageMessage) + SBOM(msgs []serializer.SBOMMessage) } // RawSender interface to submit samples to aggregator directly @@ -64,6 +66,8 @@ type checkSender struct { orchestratorMetadataOut chan<- senderOrchestratorMetadata orchestratorManifestOut chan<- senderOrchestratorManifest contlcycleOut chan<- senderContainerLifecycleEvent + contimageOut chan<- senderContainerImage + sbomOut chan<- senderSBOM eventPlatformOut chan<- senderEventPlatformEvent checkTags []string service string @@ -109,6 +113,14 @@ type senderContainerLifecycleEvent struct { msgs []serializer.ContainerLifecycleMessage } +type senderContainerImage struct { + msgs []serializer.ContainerImageMessage +} + +type senderSBOM struct { + msgs []serializer.SBOMMessage +} + type senderOrchestratorManifest struct { msgs []serializer.ProcessMessageBody clusterID string @@ -130,6 +142,8 @@ func newCheckSender( orchestratorManifestOut chan<- senderOrchestratorManifest, eventPlatformOut chan<- senderEventPlatformEvent, contlcycleOut chan<- senderContainerLifecycleEvent, + contimageOut chan<- senderContainerImage, + sbomOut chan<- senderSBOM, ) *checkSender { return &checkSender{ id: id, @@ -143,6 +157,8 @@ func newCheckSender( orchestratorManifestOut: orchestratorManifestOut, eventPlatformOut: eventPlatformOut, contlcycleOut: contlcycleOut, + contimageOut: contimageOut, + sbomOut: sbomOut, } } @@ -434,10 +450,19 @@ func (s *checkSender) OrchestratorManifest(msgs []serializer.ProcessMessageBody, } s.orchestratorManifestOut <- om } + func (s *checkSender) ContainerLifecycleEvent(msgs []serializer.ContainerLifecycleMessage) { s.contlcycleOut <- senderContainerLifecycleEvent{msgs: msgs} } +func (s *checkSender) ContainerImage(msgs []serializer.ContainerImageMessage) { + s.contimageOut <- senderContainerImage{msgs: msgs} +} + +func (s *checkSender) SBOM(msgs []serializer.SBOMMessage) { + s.sbomOut <- senderSBOM{msgs: msgs} +} + func (sp *checkSenderPool) getSender(id check.ID) (Sender, error) { sp.m.Lock() defer sp.m.Unlock() @@ -463,6 +488,8 @@ func (sp *checkSenderPool) mkSender(id check.ID) (Sender, error) { sp.agg.orchestratorManifestIn, sp.agg.eventPlatformIn, sp.agg.contLcycleIn, + sp.agg.contImageIn, + sp.agg.sbomIn, ) sp.senders[id] = sender return sender, err diff --git a/pkg/aggregator/sender_test.go b/pkg/aggregator/sender_test.go index 462368ab56f5e..1fa8e15fcc070 100644 --- a/pkg/aggregator/sender_test.go +++ b/pkg/aggregator/sender_test.go @@ -28,6 +28,8 @@ type senderWithChans struct { orchestratorManifestChan chan senderOrchestratorManifest eventPlatformEventChan chan senderEventPlatformEvent contlcycleOut chan senderContainerLifecycleEvent + contimageOut chan senderContainerImage + sbomOut chan senderSBOM sender *checkSender } @@ -39,7 +41,9 @@ func initSender(id check.ID, defaultHostname string) (s senderWithChans) { s.orchestratorManifestChan = make(chan senderOrchestratorManifest, 10) s.eventPlatformEventChan = make(chan senderEventPlatformEvent, 10) s.contlcycleOut = make(chan senderContainerLifecycleEvent, 10) - s.sender = newCheckSender(id, defaultHostname, s.itemChan, s.serviceCheckChan, s.eventChan, s.orchestratorChan, s.orchestratorManifestChan, s.eventPlatformEventChan, s.contlcycleOut) + s.contimageOut = make(chan senderContainerImage, 10) + s.sbomOut = make(chan senderSBOM, 10) + s.sender = newCheckSender(id, defaultHostname, s.itemChan, s.serviceCheckChan, s.eventChan, s.orchestratorChan, s.orchestratorManifestChan, s.eventPlatformEventChan, s.contlcycleOut, s.contimageOut, s.sbomOut) return s } @@ -175,7 +179,9 @@ func TestGetAndSetSender(t *testing.T) { orchestratorManifestChan := make(chan senderOrchestratorManifest, 10) eventPlatformChan := make(chan senderEventPlatformEvent, 10) contlcycleChan := make(chan senderContainerLifecycleEvent, 10) - testCheckSender := newCheckSender(checkID1, "", itemChan, serviceCheckChan, eventChan, orchestratorChan, orchestratorManifestChan, eventPlatformChan, contlcycleChan) + contimageChan := make(chan senderContainerImage, 10) + sbomChan := make(chan senderSBOM, 10) + testCheckSender := newCheckSender(checkID1, "", itemChan, serviceCheckChan, eventChan, orchestratorChan, orchestratorManifestChan, eventPlatformChan, contlcycleChan, contimageChan, sbomChan) err := demux.SetSender(testCheckSender, checkID1) assert.Nil(t, err) diff --git a/pkg/config/config.go b/pkg/config/config.go index 391e0f74400dc..7434046bcae5c 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -1066,6 +1066,16 @@ func InitConfig(config Config) { config.BindEnv("container_lifecycle.dd_url") config.BindEnv("container_lifecycle.additional_endpoints") + // Container image configuration + config.BindEnvAndSetDefault("container_image.enabled", false) + config.BindEnv("container_image.dd_url") + config.BindEnv("container_image.additional_endpoints") + + // SBOM configuration + config.BindEnvAndSetDefault("sbom.enabled", false) + config.BindEnv("sbom.dd_url") + config.BindEnv("sbom.additional_endpoints") + // Orchestrator Explorer - process agent // DEPRECATED in favor of `orchestrator_explorer.orchestrator_dd_url` setting. If both are set `orchestrator_explorer.orchestrator_dd_url` will take precedence. config.BindEnv("process_config.orchestrator_dd_url", "DD_PROCESS_CONFIG_ORCHESTRATOR_DD_URL", "DD_PROCESS_AGENT_ORCHESTRATOR_DD_URL") diff --git a/pkg/containerimage/forwarder.go b/pkg/containerimage/forwarder.go new file mode 100644 index 0000000000000..0aabc6058f51b --- /dev/null +++ b/pkg/containerimage/forwarder.go @@ -0,0 +1,59 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2022-present Datadog, Inc. + +package containerimage + +import ( + "fmt" + "net/url" + + "github.com/DataDog/datadog-agent/pkg/config" + "github.com/DataDog/datadog-agent/pkg/config/resolver" + "github.com/DataDog/datadog-agent/pkg/forwarder" + "github.com/DataDog/datadog-agent/pkg/util/flavor" + "github.com/DataDog/datadog-agent/pkg/util/log" +) + +func buildKeysPerDomains(conf config.Config) (map[string][]string, error) { + mainURL := config.GetMainEndpointWithConfig(conf, "https://contimage-intake.", "container_image.dd_url") + if _, err := url.Parse(mainURL); err != nil { + return nil, fmt.Errorf("could not parse contimage main endpoint: %w", err) + } + + keysPerDomain := map[string][]string{ + mainURL: { + conf.GetString("api_key"), + }, + } + + if !conf.IsSet("container_image.additional_endpoints") { + return keysPerDomain, nil + } + + additionalEndpoints := conf.GetStringMapStringSlice("container_image.additional_endpoints") + + return config.MergeAdditionalEndpoints(keysPerDomain, additionalEndpoints) +} + +// NewForwarder returns a forwarder for container lifecycle events +func NewForwarder() *forwarder.DefaultForwarder { + if !config.Datadog.GetBool("container_image.enabled") { + return nil + } + + if flavor.GetFlavor() != flavor.DefaultAgent { + return nil + } + + keysPerDomain, err := buildKeysPerDomains(config.Datadog) + if err != nil { + log.Errorf("Cannot build keys per domains: %v", err) + return nil + } + + options := forwarder.NewOptionsWithResolvers(resolver.NewSingleDomainResolvers(keysPerDomain)) + + return forwarder.NewDefaultForwarder(options) +} diff --git a/pkg/forwarder/endpoints/endpoints.go b/pkg/forwarder/endpoints/endpoints.go index bef40361f22b1..34b0ae55a4e4c 100644 --- a/pkg/forwarder/endpoints/endpoints.go +++ b/pkg/forwarder/endpoints/endpoints.go @@ -56,4 +56,8 @@ var ( OrchestratorManifestEndpoint = transaction.Endpoint{Route: "/api/v2/orchmanif", Name: "orchmanifest"} // ContainerLifecycleEndpoint is an event platform endpoint used to send container lifecycle events ContainerLifecycleEndpoint = transaction.Endpoint{Route: "/api/v2/contlcycle", Name: "contlcycle"} + // ContainerImageEndpoint is an event platform endpoint used to send container images + ContainerImageEndpoint = transaction.Endpoint{Route: "/api/v2/contimage", Name: "contimage"} + // SBOMEndpoint is an event platform endpoint used to send SBOM + SBOMEndpoint = transaction.Endpoint{Route: "/api/v2/sbom", Name: "sbom"} ) diff --git a/pkg/forwarder/forwarder.go b/pkg/forwarder/forwarder.go index bbb1fff1f71dd..a2affdb15b3a6 100644 --- a/pkg/forwarder/forwarder.go +++ b/pkg/forwarder/forwarder.go @@ -75,6 +75,8 @@ type Forwarder interface { SubmitOrchestratorChecks(payload transaction.BytesPayloads, extra http.Header, payloadType int) (chan Response, error) SubmitOrchestratorManifests(payload transaction.BytesPayloads, extra http.Header) (chan Response, error) SubmitContainerLifecycleEvents(payload transaction.BytesPayloads, extra http.Header) error + SubmitContainerImages(payload transaction.BytesPayloads, extra http.Header) error + SubmitSBOM(payload transaction.BytesPayloads, extra http.Header) error } // Compile-time check to ensure that DefaultForwarder implements the Forwarder interface @@ -637,6 +639,18 @@ func (f *DefaultForwarder) SubmitContainerLifecycleEvents(payload transaction.By return f.sendHTTPTransactions(transactions) } +// SubmitContainerImages sends container image +func (f *DefaultForwarder) SubmitContainerImages(payload transaction.BytesPayloads, extra http.Header) error { + transactions := f.createHTTPTransactions(endpoints.ContainerImageEndpoint, payload, extra) + return f.sendHTTPTransactions(transactions) +} + +// SubmitSBOM sends SBOM +func (f *DefaultForwarder) SubmitSBOM(payload transaction.BytesPayloads, extra http.Header) error { + transactions := f.createHTTPTransactions(endpoints.SBOMEndpoint, payload, extra) + return f.sendHTTPTransactions(transactions) +} + func (f *DefaultForwarder) submitProcessLikePayload(ep transaction.Endpoint, payload transaction.BytesPayloads, extra http.Header, retryable bool) (chan Response, error) { transactions := f.createHTTPTransactions(ep, payload, extra) results := make(chan Response, len(transactions)) diff --git a/pkg/forwarder/forwarder_test.go b/pkg/forwarder/forwarder_test.go index 05d9fb06d9b76..b385dda342863 100644 --- a/pkg/forwarder/forwarder_test.go +++ b/pkg/forwarder/forwarder_test.go @@ -129,6 +129,9 @@ func TestSubmitIfStopped(t *testing.T) { assert.NotNil(t, forwarder.SubmitSeries(nil, make(http.Header))) assert.NotNil(t, forwarder.SubmitV1Intake(nil, make(http.Header))) assert.NotNil(t, forwarder.SubmitV1CheckRuns(nil, make(http.Header))) + assert.NotNil(t, forwarder.SubmitContainerLifecycleEvents(nil, make(http.Header))) + assert.NotNil(t, forwarder.SubmitContainerImages(nil, make(http.Header))) + assert.NotNil(t, forwarder.SubmitSBOM(nil, make(http.Header))) } func TestCreateHTTPTransactions(t *testing.T) { @@ -384,6 +387,15 @@ func TestForwarderEndtoEnd(t *testing.T) { assert.Nil(t, f.SubmitMetadata(payload, headers)) numReqs += 4 + assert.Nil(t, f.SubmitContainerLifecycleEvents(payload, headers)) + numReqs += 4 + + assert.Nil(t, f.SubmitContainerImages(payload, headers)) + numReqs += 4 + + assert.Nil(t, f.SubmitSBOM(payload, headers)) + numReqs += 4 + // let's wait a second for every channel communication to trigger <-time.After(1 * time.Second) diff --git a/pkg/forwarder/noop_forwarder.go b/pkg/forwarder/noop_forwarder.go index 49433d3369d4e..1f1ca7a25a295 100644 --- a/pkg/forwarder/noop_forwarder.go +++ b/pkg/forwarder/noop_forwarder.go @@ -105,6 +105,16 @@ func (f NoopForwarder) SubmitContainerLifecycleEvents(payload transaction.BytesP return nil } +// SubmitContainerImages does nothing. +func (f NoopForwarder) SubmitContainerImages(payload transaction.BytesPayloads, extra http.Header) error { + return nil +} + +// SubmitSBOM does nothing. +func (f NoopForwarder) SubmitSBOM(payload transaction.BytesPayloads, extra http.Header) error { + return nil +} + // SubmitOrchestratorManifests does nothing. func (f NoopForwarder) SubmitOrchestratorManifests(payload transaction.BytesPayloads, extra http.Header) (chan Response, error) { return nil, nil diff --git a/pkg/forwarder/sync_forwarder.go b/pkg/forwarder/sync_forwarder.go index d3881b8ba5c68..f0d2f1b4c6214 100644 --- a/pkg/forwarder/sync_forwarder.go +++ b/pkg/forwarder/sync_forwarder.go @@ -161,3 +161,13 @@ func (f *SyncForwarder) SubmitOrchestratorManifests(payload transaction.BytesPay func (f *SyncForwarder) SubmitContainerLifecycleEvents(payload transaction.BytesPayloads, extra http.Header) error { return f.defaultForwarder.SubmitContainerLifecycleEvents(payload, extra) } + +// SubmitContainerImages sends container image +func (f *SyncForwarder) SubmitContainerImages(payload transaction.BytesPayloads, extra http.Header) error { + return f.defaultForwarder.SubmitContainerImages(payload, extra) +} + +// SubmitSBOM sends SBOM +func (f *SyncForwarder) SubmitSBOM(payload transaction.BytesPayloads, extra http.Header) error { + return f.defaultForwarder.SubmitSBOM(payload, extra) +} diff --git a/pkg/forwarder/test_common.go b/pkg/forwarder/test_common.go index 2525e78050d3b..30ef6be6f779f 100644 --- a/pkg/forwarder/test_common.go +++ b/pkg/forwarder/test_common.go @@ -183,3 +183,13 @@ func (tf *MockedForwarder) SubmitOrchestratorManifests(payload transaction.Bytes func (tf *MockedForwarder) SubmitContainerLifecycleEvents(payload transaction.BytesPayloads, extra http.Header) error { return tf.Called(payload, extra).Error(0) } + +// SubmitContainerImages mock +func (tf *MockedForwarder) SubmitContainerImages(payload transaction.BytesPayloads, extra http.Header) error { + return tf.Called(payload, extra).Error(0) +} + +// SubmitSBOM +func (tf *MockedForwarder) SubmitSBOM(payload transaction.BytesPayloads, extra http.Header) error { + return tf.Called(payload, extra).Error(0) +} diff --git a/pkg/otlp/internal/serializerexporter/consumer_test.go b/pkg/otlp/internal/serializerexporter/consumer_test.go index 0495d97fefdc0..92455ed1023a6 100644 --- a/pkg/otlp/internal/serializerexporter/consumer_test.go +++ b/pkg/otlp/internal/serializerexporter/consumer_test.go @@ -207,3 +207,11 @@ func (m *MockSerializer) SendOrchestratorManifests(_ []serializer.ProcessMessage func (m *MockSerializer) SendContainerLifecycleEvent(_ []serializer.ContainerLifecycleMessage, _ string) error { return nil } + +func (m *MockSerializer) SendContainerImage(_ []serializer.ContainerImageMessage, _ string) error { + return nil +} + +func (m *MockSerializer) SendSBOM(_ []serializer.SBOMMessage, _ string) error { + return nil +} diff --git a/pkg/sbom/forwarder.go b/pkg/sbom/forwarder.go new file mode 100644 index 0000000000000..fd42e48946856 --- /dev/null +++ b/pkg/sbom/forwarder.go @@ -0,0 +1,59 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2022-present Datadog, Inc. + +package sbom + +import ( + "fmt" + "net/url" + + "github.com/DataDog/datadog-agent/pkg/config" + "github.com/DataDog/datadog-agent/pkg/config/resolver" + "github.com/DataDog/datadog-agent/pkg/forwarder" + "github.com/DataDog/datadog-agent/pkg/util/flavor" + "github.com/DataDog/datadog-agent/pkg/util/log" +) + +func buildKeysPerDomains(conf config.Config) (map[string][]string, error) { + mainURL := config.GetMainEndpointWithConfig(conf, "https://sbom-intake.", "sbom.dd_url") + if _, err := url.Parse(mainURL); err != nil { + return nil, fmt.Errorf("could not parse sbom main endpoint: %w", err) + } + + keysPerDomain := map[string][]string{ + mainURL: { + conf.GetString("api_key"), + }, + } + + if !conf.IsSet("sbom.additional_endpoints") { + return keysPerDomain, nil + } + + additionalEndpoints := conf.GetStringMapStringSlice("sbom.additional_endpoints") + + return config.MergeAdditionalEndpoints(keysPerDomain, additionalEndpoints) +} + +// NewForwarder returns a forwarder for SBOM events +func NewForwarder() *forwarder.DefaultForwarder { + if !config.Datadog.GetBool("sbom.enabled") { + return nil + } + + if flavor.GetFlavor() != flavor.DefaultAgent { + return nil + } + + keysPerDomain, err := buildKeysPerDomains(config.Datadog) + if err != nil { + log.Errorf("Cannot build keys per domains: %v", err) + return nil + } + + options := forwarder.NewOptionsWithResolvers(resolver.NewSingleDomainResolvers(keysPerDomain)) + + return forwarder.NewDefaultForwarder(options) +} diff --git a/pkg/serializer/serializer.go b/pkg/serializer/serializer.go index 0919ee345d499..9e39fcf0a16d9 100644 --- a/pkg/serializer/serializer.go +++ b/pkg/serializer/serializer.go @@ -100,6 +100,8 @@ type MetricSerializer interface { SendOrchestratorMetadata(msgs []ProcessMessageBody, hostName, clusterID string, payloadType int) error SendOrchestratorManifests(msgs []ProcessMessageBody, hostName, clusterID string) error SendContainerLifecycleEvent(msgs []ContainerLifecycleMessage, hostName string) error + SendContainerImage(msgs []ContainerImageMessage, hostname string) error + SendSBOM(msgs []SBOMMessage, hostname string) error } // Serializer serializes metrics to the correct format and routes the payloads to the correct endpoint in the Forwarder @@ -107,6 +109,8 @@ type Serializer struct { Forwarder forwarder.Forwarder orchestratorForwarder forwarder.Forwarder contlcycleForwarder forwarder.Forwarder + contimageForwarder forwarder.Forwarder + sbomForwarder forwarder.Forwarder seriesJSONPayloadBuilder *stream.JSONPayloadBuilder @@ -128,11 +132,13 @@ type Serializer struct { } // NewSerializer returns a new Serializer initialized -func NewSerializer(forwarder forwarder.Forwarder, orchestratorForwarder, contlcycleForwarder forwarder.Forwarder) *Serializer { +func NewSerializer(forwarder forwarder.Forwarder, orchestratorForwarder, contlcycleForwarder, contimageForwarder, sbomForwarder forwarder.Forwarder) *Serializer { s := &Serializer{ Forwarder: forwarder, orchestratorForwarder: orchestratorForwarder, contlcycleForwarder: contlcycleForwarder, + contimageForwarder: contimageForwarder, + sbomForwarder: sbomForwarder, seriesJSONPayloadBuilder: stream.NewJSONPayloadBuilder(config.Datadog.GetBool("enable_json_stream_shared_compressor_buffers")), enableEvents: config.Datadog.GetBool("enable_payloads.events"), enableSeries: config.Datadog.GetBool("enable_payloads.series"), @@ -460,23 +466,105 @@ func (s *Serializer) SendContainerLifecycleEvent(msgs []ContainerLifecycleMessag return errors.New("container lifecycle forwarder is not setup") } + payloads := make([]*[]byte, 0, len(msgs)) + for _, msg := range msgs { - extraHeaders := make(http.Header) - extraHeaders.Set("Content-Type", protobufContentType) msg.Host = hostname encoded, err := proto.Marshal(&msg) if err != nil { return log.Errorf("Unable to encode message: %v", err) } - payloads := transaction.NewBytesPayloadsWithoutMetaData([]*[]byte{&encoded}) - if err := s.contlcycleForwarder.SubmitContainerLifecycleEvents(payloads, extraHeaders); err != nil { - return log.Errorf("Unable to submit container lifecycle payload: %v", err) + payloads = append(payloads, &encoded) + } + + bytePayloads := transaction.NewBytesPayloadsWithoutMetaData(payloads) + + extraHeaders := make(http.Header) + extraHeaders.Set(headers.TimestampHeader, strconv.Itoa(int(time.Now().Unix()))) + extraHeaders.Set(headers.EVPOriginHeader, "agent") + extraHeaders.Set(headers.EVPOriginVersionHeader, version.AgentVersion) + extraHeaders.Set(headers.ContentTypeHeader, headers.ProtobufContentType) + extraHeaders.Set(payloadVersionHTTPHeader, AgentPayloadVersion) + + if err := s.contlcycleForwarder.SubmitContainerLifecycleEvents(bytePayloads, extraHeaders); err != nil { + return log.Errorf("Unable to submit container lifecycle payloads: %v", err) + } + + log.Tracef("Sent container lifecycle events %+v", msgs) + + return nil +} + +// SendContainerImage serializes & sends container image payloads +func (s *Serializer) SendContainerImage(msgs []ContainerImageMessage, hostname string) error { + if s.contimageForwarder == nil { + return errors.New("container image forwarder is not setup") + } + + payloads := make([]*[]byte, 0, len(msgs)) + + for i := range msgs { + msgs[i].Host = hostname + encoded, err := proto.Marshal(&msgs[i]) + if err != nil { + return log.Errorf("Unable to encode message: %+v", err) } - log.Tracef("Sent container lifecycle event %+v", msg) + payloads = append(payloads, &encoded) } + bytesPayloads := transaction.NewBytesPayloadsWithoutMetaData(payloads) + + extraHeaders := make(http.Header) + extraHeaders.Set(headers.TimestampHeader, strconv.Itoa(int(time.Now().Unix()))) + extraHeaders.Set(headers.EVPOriginHeader, "agent") + extraHeaders.Set(headers.EVPOriginVersionHeader, version.AgentVersion) + extraHeaders.Set(headers.ContentTypeHeader, headers.ProtobufContentType) + extraHeaders.Set(payloadVersionHTTPHeader, AgentPayloadVersion) + + if err := s.contimageForwarder.SubmitContainerImages(bytesPayloads, extraHeaders); err != nil { + return log.Errorf("Unable to submit container image payload: %v", err) + } + + log.Tracef("Send container images %+v", msgs) + + return nil +} + +// SendSBOM serializes & sends sbom payloads +func (s *Serializer) SendSBOM(msgs []SBOMMessage, hostname string) error { + if s.sbomForwarder == nil { + return errors.New("SBOM forwarder is not setup") + } + + payloads := make([]*[]byte, 0, len(msgs)) + + for i := range msgs { + msgs[i].Host = hostname + encoded, err := proto.Marshal(&msgs[i]) + if err != nil { + return log.Errorf("Unable to encode message: %+v", err) + } + + payloads = append(payloads, &encoded) + } + + bytesPayloads := transaction.NewBytesPayloadsWithoutMetaData(payloads) + + extraHeaders := make(http.Header) + extraHeaders.Set(headers.TimestampHeader, strconv.Itoa(int(time.Now().Unix()))) + extraHeaders.Set(headers.EVPOriginHeader, "agent") + extraHeaders.Set(headers.EVPOriginVersionHeader, version.AgentVersion) + extraHeaders.Set(headers.ContentTypeHeader, headers.ProtobufContentType) + extraHeaders.Set(payloadVersionHTTPHeader, AgentPayloadVersion) + + if err := s.sbomForwarder.SubmitSBOM(bytesPayloads, extraHeaders); err != nil { + return log.Errorf("Unable to submit SBOM payload: %v", err) + } + + log.Tracef("Send SBOM %+v", msgs) + return nil } diff --git a/pkg/serializer/serializer_test.go b/pkg/serializer/serializer_test.go index 3c97eb8afcf81..9571575deb066 100644 --- a/pkg/serializer/serializer_test.go +++ b/pkg/serializer/serializer_test.go @@ -12,21 +12,28 @@ import ( "fmt" "net/http" "reflect" + "strconv" "strings" "testing" + "time" jsoniter "github.com/json-iterator/go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/DataDog/agent-payload/v5/contimage" + "github.com/DataDog/agent-payload/v5/contlcycle" + "github.com/DataDog/agent-payload/v5/sbom" "github.com/DataDog/datadog-agent/pkg/config" "github.com/DataDog/datadog-agent/pkg/forwarder" "github.com/DataDog/datadog-agent/pkg/forwarder/transaction" "github.com/DataDog/datadog-agent/pkg/metrics" + "github.com/DataDog/datadog-agent/pkg/process/util/api/headers" metricsserializer "github.com/DataDog/datadog-agent/pkg/serializer/internal/metrics" "github.com/DataDog/datadog-agent/pkg/serializer/marshaler" "github.com/DataDog/datadog-agent/pkg/util/compression" + "github.com/DataDog/datadog-agent/pkg/version" ) var initialContentEncoding = compression.ContentEncoding @@ -224,7 +231,7 @@ func TestSendV1Events(t *testing.T) { matcher := createJSONPayloadMatcher(`{"apiKey":"","events":{},"internalHostname"`) f.On("SubmitV1Intake", matcher, jsonExtraHeadersWithCompression).Return(nil).Times(1) - s := NewSerializer(f, nil, nil) + s := NewSerializer(f, nil, nil, nil, nil) err := s.SendEvents([]*metrics.Event{}) require.Nil(t, err) f.AssertExpectations(t) @@ -242,7 +249,7 @@ func TestSendV1EventsCreateMarshalersBySourceType(t *testing.T) { defer config.Datadog.Set("enable_events_stream_payload_serialization", nil) f := &forwarder.MockedForwarder{} - s := NewSerializer(f, nil, nil) + s := NewSerializer(f, nil, nil, nil, nil) events := metrics.Events{&metrics.Event{SourceTypeName: "source1"}, &metrics.Event{SourceTypeName: "source2"}, &metrics.Event{SourceTypeName: "source3"}} payloadsCountMatcher := func(payloadCount int) interface{} { @@ -272,7 +279,7 @@ func TestSendV1ServiceChecks(t *testing.T) { config.Datadog.Set("enable_service_checks_stream_payload_serialization", false) defer config.Datadog.Set("enable_service_checks_stream_payload_serialization", nil) - s := NewSerializer(f, nil, nil) + s := NewSerializer(f, nil, nil, nil, nil) err := s.SendServiceChecks(metrics.ServiceChecks{&metrics.ServiceCheck{}}) require.Nil(t, err) f.AssertExpectations(t) @@ -288,7 +295,7 @@ func TestSendV1Series(t *testing.T) { config.Datadog.Set("use_v2_api.series", false) defer config.Datadog.Set("use_v2_api.series", true) - s := NewSerializer(f, nil, nil) + s := NewSerializer(f, nil, nil, nil, nil) err := s.SendIterableSeries(metricsserializer.CreateSerieSource(metrics.Series{})) require.Nil(t, err) @@ -301,7 +308,7 @@ func TestSendSeries(t *testing.T) { f.On("SubmitSeries", matcher, protobufExtraHeadersWithCompression).Return(nil).Times(1) config.Datadog.Set("use_v2_api.series", true) // default value, but just to be sure - s := NewSerializer(f, nil, nil) + s := NewSerializer(f, nil, nil, nil, nil) err := s.SendIterableSeries(metricsserializer.CreateSerieSource(metrics.Series{&metrics.Serie{}})) require.Nil(t, err) @@ -314,7 +321,7 @@ func TestSendSketch(t *testing.T) { matcher := createProtoPayloadMatcher([]byte{18, 0}) f.On("SubmitSketchSeries", matcher, protobufExtraHeadersWithCompression).Return(nil).Times(1) - s := NewSerializer(f, nil, nil) + s := NewSerializer(f, nil, nil, nil, nil) err := s.SendSketch(metrics.NewSketchesSourceTest()) require.Nil(t, err) f.AssertExpectations(t) @@ -324,7 +331,7 @@ func TestSendMetadata(t *testing.T) { f := &forwarder.MockedForwarder{} f.On("SubmitMetadata", jsonPayloads, jsonExtraHeadersWithCompression).Return(nil).Times(1) - s := NewSerializer(f, nil, nil) + s := NewSerializer(f, nil, nil, nil, nil) payload := &testPayload{} err := s.SendMetadata(payload) @@ -347,7 +354,7 @@ func TestSendProcessesMetadata(t *testing.T) { payloads, _ := mkPayloads(payload, true) f.On("SubmitV1Intake", payloads, jsonExtraHeadersWithCompression).Return(nil).Times(1) - s := NewSerializer(f, nil, nil) + s := NewSerializer(f, nil, nil, nil, nil) err := s.SendProcessesMetadata("test") require.Nil(t, err) @@ -363,6 +370,99 @@ func TestSendProcessesMetadata(t *testing.T) { require.NotNil(t, err) } +func TestSendContainerLifecycleEvents(t *testing.T) { + f := &forwarder.MockedForwarder{} + payload := []byte("\x0a\x02v1\x12\x08hostname\x18\x01") + payloads, _ := mkPayloads(payload, false) + extraHeaders := protobufExtraHeaders.Clone() + extraHeaders.Set(headers.EVPOriginHeader, "agent") + extraHeaders.Set(headers.EVPOriginVersionHeader, version.AgentVersion) + extraHeaders.Set(headers.TimestampHeader, strconv.Itoa(int(time.Now().Unix()))) + f.On("SubmitContainerLifecycleEvents", payloads, extraHeaders).Return(nil).Times(1) + + s := NewSerializer(nil, nil, f, nil, nil) + + msg := []ContainerLifecycleMessage{ + { + Version: "v1", + Host: "hostname", + ObjectKind: contlcycle.EventsPayload_Pod, + Events: []*contlcycle.Event{}, + }, + } + err := s.SendContainerLifecycleEvent(msg, "hostname") + require.Nil(t, err) + f.AssertExpectations(t) + + f.On("SubmitContainerLifecycleEvents", payloads, extraHeaders).Return(fmt.Errorf("some error")).Times(1) + err = s.SendContainerLifecycleEvent(msg, "hostname") + require.NotNil(t, err) + f.AssertExpectations(t) +} + +func TestSendContainerImage(t *testing.T) { + f := &forwarder.MockedForwarder{} + payload := []byte("\x0a\x02v1\x12\x08hostname") + payloads, _ := mkPayloads(payload, false) + extraHeaders := protobufExtraHeaders.Clone() + extraHeaders.Set(headers.EVPOriginHeader, "agent") + extraHeaders.Set(headers.EVPOriginVersionHeader, version.AgentVersion) + extraHeaders.Set(headers.TimestampHeader, strconv.Itoa(int(time.Now().Unix()))) + f.On("SubmitContainerImages", payloads, extraHeaders).Return(nil).Times(1) + + s := NewSerializer(nil, nil, nil, f, nil) + + msg := []ContainerImageMessage{ + { + Version: "v1", + Host: "hostname", + Images: []*contimage.ContainerImage{}, + }, + } + err := s.SendContainerImage(msg, "hostname") + require.Nil(t, err) + f.AssertExpectations(t) + + f.On("SubmitContainerImages", payloads, extraHeaders).Return(fmt.Errorf("some error")).Times(1) + err = s.SendContainerImage(msg, "hostname") + require.NotNil(t, err) + f.AssertExpectations(t) +} + +func makePtr(val string) *string { + return &val +} + +func TestSendSBOM(t *testing.T) { + f := &forwarder.MockedForwarder{} + payload := []byte("\x08\x01\x12\x08hostname\x1a\x05agent") + payloads, _ := mkPayloads(payload, false) + extraHeaders := protobufExtraHeaders.Clone() + extraHeaders.Set(headers.EVPOriginHeader, "agent") + extraHeaders.Set(headers.EVPOriginVersionHeader, version.AgentVersion) + extraHeaders.Set(headers.TimestampHeader, strconv.Itoa(int(time.Now().Unix()))) + f.On("SubmitSBOM", payloads, extraHeaders).Return(nil).Times(1) + + s := NewSerializer(nil, nil, nil, nil, f) + + msg := []SBOMMessage{ + { + Version: 1, + Host: "hostname", + Source: makePtr("agent"), + Entities: []*sbom.SBOMEntity{}, + }, + } + err := s.SendSBOM(msg, "hostname") + require.Nil(t, err) + f.AssertExpectations(t) + + f.On("SubmitSBOM", payloads, extraHeaders).Return(fmt.Errorf("some error")).Times(1) + err = s.SendSBOM(msg, "hostname") + require.NotNil(t, err) + f.AssertExpectations(t) +} + func TestSendWithDisabledKind(t *testing.T) { mockConfig := config.Mock(t) @@ -382,7 +482,7 @@ func TestSendWithDisabledKind(t *testing.T) { }() f := &forwarder.MockedForwarder{} - s := NewSerializer(f, nil, nil) + s := NewSerializer(f, nil, nil, nil, nil) payload := &testPayload{} diff --git a/pkg/serializer/test_common.go b/pkg/serializer/test_common.go index 587594f847ddc..3e4dd058d5e94 100644 --- a/pkg/serializer/test_common.go +++ b/pkg/serializer/test_common.go @@ -66,17 +66,27 @@ func (s *MockSerializer) SendProcessesMetadata(data interface{}) error { return s.Called(data).Error(0) } -// SendOrchestratorMetadata serializes & send orchestrator metadata payloads +// SendOrchestratorMetadata serializes & sends orchestrator metadata payloads func (s *MockSerializer) SendOrchestratorMetadata(msgs []ProcessMessageBody, hostName, clusterID string, payloadType int) error { return s.Called(msgs, hostName, clusterID, payloadType).Error(0) } -// SendContainerLifecycleEvent serializes & send container lifecycle event payloads +// SendContainerLifecycleEvent serializes & sends container lifecycle event payloads func (s *MockSerializer) SendContainerLifecycleEvent(msgs []ContainerLifecycleMessage, hostname string) error { return s.Called(msgs, hostname).Error(0) } -// SendOrchestratorManifests serializes & send orchestrator manifest payloads +// SendContainerImage serializes & sends container image payloads +func (s *MockSerializer) SendContainerImage(msgs []ContainerImageMessage, hostname string) error { + return s.Called(msgs, hostname).Error(0) +} + +// SendSBOM serializes & sends SBOM payloads +func (s *MockSerializer) SendSBOM(msgs []SBOMMessage, hostname string) error { + return s.Called(msgs, hostname).Error(0) +} + +// SendOrchestratorManifests serializes & sends orchestrator manifest payloads func (s *MockSerializer) SendOrchestratorManifests(msgs []ProcessMessageBody, hostName, clusterID string) error { return s.Called(msgs, hostName, clusterID).Error(0) } diff --git a/pkg/serializer/types_contimage.go b/pkg/serializer/types_contimage.go new file mode 100644 index 0000000000000..bd5ba52bfd844 --- /dev/null +++ b/pkg/serializer/types_contimage.go @@ -0,0 +1,13 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2022-present Datadog, Inc. + +package serializer + +import ( + "github.com/DataDog/agent-payload/v5/contimage" +) + +// ContainerImageMessage is a type alias for contimage proto payload +type ContainerImageMessage = contimage.ContainerImagePayload diff --git a/pkg/serializer/types_sbom.go b/pkg/serializer/types_sbom.go new file mode 100644 index 0000000000000..abda2692aed2b --- /dev/null +++ b/pkg/serializer/types_sbom.go @@ -0,0 +1,13 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2022-present Datadog, Inc. + +package serializer + +import ( + "github.com/DataDog/agent-payload/v5/sbom" +) + +// SBOMMessage is a type alias for SBOM proto payload +type SBOMMessage = sbom.SBOMPayload From 49503ebc9a54acd7313f95a75094448769a0d1f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A9na=C3=AFc=20Huard?= Date: Thu, 12 Jan 2023 14:14:20 +0100 Subject: [PATCH 2/2] Apply suggestions from code review Co-authored-by: Nicolas Guerguadj <35628945+Kaderinho@users.noreply.github.com> --- pkg/aggregator/demultiplexer_agent.go | 4 ++-- pkg/aggregator/mocksender/asserts.go | 2 +- pkg/aggregator/mocksender/mocked_methods.go | 2 +- pkg/containerimage/forwarder.go | 2 +- pkg/forwarder/test_common.go | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/aggregator/demultiplexer_agent.go b/pkg/aggregator/demultiplexer_agent.go index 0d035f84edc5d..7e477878a4e7a 100644 --- a/pkg/aggregator/demultiplexer_agent.go +++ b/pkg/aggregator/demultiplexer_agent.go @@ -337,7 +337,7 @@ func (d *AgentDemultiplexer) Run() { log.Errorf("error starting container image forwarder: %v", err) } } else { - log.Debug("no starting the container image forwarder") + log.Debug("not starting the container image forwarder") } // sbom forwarder @@ -346,7 +346,7 @@ func (d *AgentDemultiplexer) Run() { log.Errorf("error starting SBOM forwarder: %v", err) } } else { - log.Debug("no starting the SBOM forwarder") + log.Debug("not starting the SBOM forwarder") } // shared forwarder diff --git a/pkg/aggregator/mocksender/asserts.go b/pkg/aggregator/mocksender/asserts.go index 79273e836df88..0c3918127edf9 100644 --- a/pkg/aggregator/mocksender/asserts.go +++ b/pkg/aggregator/mocksender/asserts.go @@ -77,7 +77,7 @@ func (m *MockSender) AssertContainerImage(t *testing.T, expectedContainerImages return m.Mock.AssertCalled(t, "ContainerImage", expectedContainerImages) } -// AssertContainerImage assert the expected event was emitted with the following values +// AssertSBOM assert the expected event was emitted with the following values func (m *MockSender) AssertSBOM(t *testing.T, expectedSBOM []sbom.SBOMPayload) bool { return m.Mock.AssertCalled(t, "SBOM", expectedSBOM) } diff --git a/pkg/aggregator/mocksender/mocked_methods.go b/pkg/aggregator/mocksender/mocked_methods.go index 065616e7d60a7..3982818349776 100644 --- a/pkg/aggregator/mocksender/mocked_methods.go +++ b/pkg/aggregator/mocksender/mocked_methods.go @@ -122,7 +122,7 @@ func (m *MockSender) ContainerImage(msgs []serializer.ContainerImageMessage) { m.Called(msgs) } -// SBOM submit container image messages +// SBOM submit sbom data func (m *MockSender) SBOM(msgs []serializer.SBOMMessage) { m.Called(msgs) } diff --git a/pkg/containerimage/forwarder.go b/pkg/containerimage/forwarder.go index 0aabc6058f51b..8d358f76416bb 100644 --- a/pkg/containerimage/forwarder.go +++ b/pkg/containerimage/forwarder.go @@ -37,7 +37,7 @@ func buildKeysPerDomains(conf config.Config) (map[string][]string, error) { return config.MergeAdditionalEndpoints(keysPerDomain, additionalEndpoints) } -// NewForwarder returns a forwarder for container lifecycle events +// NewForwarder returns a forwarder for container images events func NewForwarder() *forwarder.DefaultForwarder { if !config.Datadog.GetBool("container_image.enabled") { return nil diff --git a/pkg/forwarder/test_common.go b/pkg/forwarder/test_common.go index 30ef6be6f779f..765bf901cd155 100644 --- a/pkg/forwarder/test_common.go +++ b/pkg/forwarder/test_common.go @@ -189,7 +189,7 @@ func (tf *MockedForwarder) SubmitContainerImages(payload transaction.BytesPayloa return tf.Called(payload, extra).Error(0) } -// SubmitSBOM +// SubmitSBOM mock func (tf *MockedForwarder) SubmitSBOM(payload transaction.BytesPayloads, extra http.Header) error { return tf.Called(payload, extra).Error(0) }