Skip to content

Commit

Permalink
Add container_image core check (#14567)
Browse files Browse the repository at this point in the history
  • Loading branch information
L3n41c authored Jan 13, 2023
1 parent 9707350 commit 2dd3c42
Show file tree
Hide file tree
Showing 17 changed files with 614 additions and 24 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@
/pkg/collector/corechecks/cluster/ @DataDog/container-integrations
/pkg/collector/corechecks/cluster/orchestrator @DataDog/container-app
/pkg/collector/corechecks/containers/ @DataDog/container-integrations
/pkg/collector/corechecks/containerimage/ @DataDog/container-integrations
/pkg/collector/corechecks/containerlifecycle/ @DataDog/container-integrations
/pkg/collector/corechecks/ebpf/ @DataDog/container-integrations
/pkg/collector/corechecks/embed/ @Datadog/agent-platform
Expand Down
2 changes: 2 additions & 0 deletions cmd/agent/subcommands/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ import (
_ "github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/ksm"
_ "github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/kubernetesapiserver"
_ "github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator"
_ "github.com/DataDog/datadog-agent/pkg/collector/corechecks/containerimage"
_ "github.com/DataDog/datadog-agent/pkg/collector/corechecks/containerlifecycle"
_ "github.com/DataDog/datadog-agent/pkg/collector/corechecks/containers/containerd"
_ "github.com/DataDog/datadog-agent/pkg/collector/corechecks/containers/cri"
Expand Down Expand Up @@ -374,6 +375,7 @@ func startAgent(cliParams *cliParams, flare flare.Component) error {
opts := aggregator.DefaultAgentDemultiplexerOptions(forwarderOpts)
opts.EnableNoAggregationPipeline = pkgconfig.Datadog.GetBool("dogstatsd_no_aggregation_pipeline")
opts.UseContainerLifecycleForwarder = pkgconfig.Datadog.GetBool("container_lifecycle.enabled")
opts.UseContainerImageForwarder = pkgconfig.Datadog.GetBool("container_image.enabled")
demux = aggregator.InitAndStartAgentDemultiplexer(opts, hostnameDetected)

// Setup stats telemetry handler
Expand Down
2 changes: 1 addition & 1 deletion pkg/autodiscovery/listeners/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func computeContainerServiceIDs(entity string, image string, labels map[string]s
ids := []string{entity}

// Add Image names (long then short if different)
long, short, _, err := containers.SplitImageName(image)
long, _, short, _, err := containers.SplitImageName(image)
if err != nil {
log.Warnf("error while spliting image name: %s", err)
}
Expand Down
166 changes: 166 additions & 0 deletions pkg/collector/corechecks/containerimage/check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2022-present Datadog, Inc.

package containerimage

import (
"errors"
"time"

yaml "gopkg.in/yaml.v2"

"github.com/DataDog/datadog-agent/pkg/autodiscovery/integration"
"github.com/DataDog/datadog-agent/pkg/collector/check"
core "github.com/DataDog/datadog-agent/pkg/collector/corechecks"
ddConfig "github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/util/log"
"github.com/DataDog/datadog-agent/pkg/workloadmeta"
)

// checkName is the name under which this check is registered with the
// core check registry (see init) and used for the workloadmeta
// subscription in Run.
const (
	checkName = "container_image"
)

// init registers the container_image check factory with the core check
// registry at package load time.
func init() {
	core.RegisterCheck(checkName, CheckFactory)
}

// Config holds the container_image check configuration.
//
// BUG FIX: the fields previously carried `yaml:"…"` tags directly, but they
// are unexported and yaml.Unmarshal cannot set unexported fields, so any
// user-provided values were silently ignored and the defaults always
// applied. Parse now decodes into an exported auxiliary struct and copies
// the values over.
type Config struct {
	chunkSize                  int // max number of images per emitted payload
	newImagesMaxLatencySeconds int // max time a new image may wait before being sent
	periodicRefreshSeconds     int // interval between full image refreshes
}

// configValueRange describes the allowed range of a configuration value and
// the default substituted when the value is unset.
type configValueRange struct {
	min      int
	max      int
	default_ int
}

var /* const */ (
	chunkSizeValueRange = &configValueRange{
		min:      1,
		max:      100,
		default_: 10,
	}

	newImagesMaxLatencySecondsValueRange = &configValueRange{
		min:      1,   // 1 s
		max:      300, // 5 min
		default_: 30,  // 30 s
	}

	periodicRefreshSecondsValueRange = &configValueRange{
		min:      60,    // 1 min
		max:      86400, // 1 day
		default_: 300,   // 5 min
	}
)

// validateValue applies the range's default when *val is unset (zero) and
// clamps it into [min, max] otherwise.
func validateValue(val *int, range_ *configValueRange) {
	if *val == 0 {
		*val = range_.default_
	} else if *val < range_.min {
		*val = range_.min
	} else if *val > range_.max {
		*val = range_.max
	}
}

// Parse decodes the YAML instance configuration and normalizes every value
// into its allowed range.
func (c *Config) Parse(data []byte) error {
	// Decode into exported fields so that yaml.Unmarshal can actually set
	// them, then copy the results into the unexported Config fields.
	var raw struct {
		ChunkSize                  int `yaml:"chunk_size"`
		NewImagesMaxLatencySeconds int `yaml:"new_images_max_latency_seconds"`
		PeriodicRefreshSeconds     int `yaml:"periodic_refresh_seconds"`
	}

	if err := yaml.Unmarshal(data, &raw); err != nil {
		return err
	}

	c.chunkSize = raw.ChunkSize
	c.newImagesMaxLatencySeconds = raw.NewImagesMaxLatencySeconds
	c.periodicRefreshSeconds = raw.PeriodicRefreshSeconds

	validateValue(&c.chunkSize, chunkSizeValueRange)
	validateValue(&c.newImagesMaxLatencySeconds, newImagesMaxLatencySecondsValueRange)
	validateValue(&c.periodicRefreshSeconds, periodicRefreshSecondsValueRange)

	return nil
}

// Check reports container images
type Check struct {
	core.CheckBase
	// workloadmetaStore is the source of container image metadata events
	// (subscribed to in Run).
	workloadmetaStore workloadmeta.Store
	// instance is the parsed per-instance configuration (see Config.Parse).
	instance *Config
	// processor batches images and hands them to the sender.
	processor *processor
	// stopCh is closed by Stop to make Run's select loop exit.
	stopCh chan struct{}
}

// CheckFactory builds a fresh container_image check instance wired to the
// global workloadmeta store. It is registered with the core check registry
// under checkName.
func CheckFactory() check.Check {
	instance := &Check{
		CheckBase:         core.NewCheckBase(checkName),
		workloadmetaStore: workloadmeta.GetGlobalStore(),
		instance:          &Config{},
		stopCh:            make(chan struct{}),
	}

	return instance
}

// Configure parses the check configuration and initializes the
// container_image check. It fails fast when collection is disabled via the
// container_image.enabled agent setting.
func (c *Check) Configure(integrationConfigDigest uint64, config, initConfig integration.Data, source string) error {
	// The whole check is gated behind the container_image.enabled setting.
	if enabled := ddConfig.Datadog.GetBool("container_image.enabled"); !enabled {
		return errors.New("collection of container images is disabled")
	}

	err := c.CommonConfigure(integrationConfigDigest, initConfig, config, source)
	if err != nil {
		return err
	}

	if err = c.instance.Parse(config); err != nil {
		return err
	}

	sender, err := c.GetSender()
	if err != nil {
		return err
	}

	maxLatency := time.Duration(c.instance.newImagesMaxLatencySeconds) * time.Second
	c.processor = newProcessor(sender, c.instance.chunkSize, maxLatency)

	return nil
}

// Run starts the container_image check. It is a long-running check: it
// subscribes to workloadmeta container-image events and forwards them to
// the processor until Stop is called, refreshing the full image list
// periodically so the backend keeps the images alive.
func (c *Check) Run() error {
	log.Infof("Starting long-running check %q", c.ID())
	defer log.Infof("Shutting down long-running check %q", c.ID())

	// NOTE(review): consider also unsubscribing imgEventsCh from the store on
	// shutdown — TODO confirm whether workloadmeta.Store exposes Unsubscribe.
	imgEventsCh := c.workloadmetaStore.Subscribe(
		checkName,
		workloadmeta.NormalPriority,
		workloadmeta.NewFilter(
			[]workloadmeta.Kind{workloadmeta.KindContainerImageMetadata},
			workloadmeta.SourceAll,
			workloadmeta.EventTypeSet, // We don’t care about images removal because we just have to wait for them to expire on BE side once we stopped refreshing them periodically.
		),
	)

	imgRefreshTicker := time.NewTicker(time.Duration(c.instance.periodicRefreshSeconds) * time.Second)
	// BUG FIX: the ticker was never stopped, leaking its runtime timer after
	// the check shut down.
	defer imgRefreshTicker.Stop()

	for {
		select {
		case eventBundle := <-imgEventsCh:
			c.processor.processEvents(eventBundle)
		case <-imgRefreshTicker.C:
			c.processor.processRefresh(c.workloadmetaStore.ListImages())
		case <-c.stopCh:
			c.processor.stop()
			return nil
		}
	}
}

// Stop stops the container_image check by closing the channel that Run's
// select loop waits on.
//
// NOTE(review): calling Stop twice would panic on the double close — confirm
// the collector never stops a check more than once.
func (c *Check) Stop() {
	close(c.stopCh)
}

// Interval returns 0. It makes container_image a long-running check:
// instead of being scheduled periodically, Run loops until Stop is called.
func (c *Check) Interval() time.Duration {
	return 0
}
84 changes: 84 additions & 0 deletions pkg/collector/corechecks/containerimage/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2022-present Datadog, Inc.

package containerimage

import (
"time"

"github.com/DataDog/datadog-agent/pkg/aggregator"
"github.com/DataDog/datadog-agent/pkg/util/log"
"github.com/DataDog/datadog-agent/pkg/workloadmeta"

"github.com/DataDog/agent-payload/v5/contimage"
model "github.com/DataDog/agent-payload/v5/contimage"
)

// processor turns workloadmeta image metadata into agent-payload container
// images and enqueues them for batched sending.
type processor struct {
	// queue is fed by processImage and drained by the machinery created in
	// newQueue (defined elsewhere in this package) — presumably a consumer
	// goroutine that flushes batches through the sender callback; TODO
	// confirm against queue.go.
	queue chan *model.ContainerImage
}

// newProcessor builds a processor that groups images into payloads of at
// most maxNbItem entries, flushing a partial batch once maxRetentionTime
// has elapsed.
func newProcessor(sender aggregator.Sender, maxNbItem int, maxRetentionTime time.Duration) *processor {
	flush := func(images []*model.ContainerImage) {
		payload := contimage.ContainerImagePayload{
			Version: "v1",
			Images:  images,
		}
		sender.ContainerImage([]contimage.ContainerImagePayload{payload})
	}

	return &processor{
		queue: newQueue(maxNbItem, maxRetentionTime, flush),
	}
}

// processEvents acknowledges a workloadmeta event bundle and enqueues every
// container image it contains.
func (p *processor) processEvents(evBundle workloadmeta.EventBundle) {
	// Acknowledge the bundle immediately — presumably the workloadmeta store
	// waits on this channel before dispatching further bundles; TODO confirm.
	close(evBundle.Ch)

	log.Tracef("Processing %d events", len(evBundle.Events))

	for _, event := range evBundle.Events {
		// BUG FIX: the type assertion was unchecked and would panic on any
		// unexpected entity kind. The subscription filter only requests
		// KindContainerImageMetadata, but guard anyway and log instead of
		// crashing the whole check.
		img, ok := event.Entity.(*workloadmeta.ContainerImageMetadata)
		if !ok {
			log.Warnf("unexpected entity type %T in container image event", event.Entity)
			continue
		}
		p.processImage(img)
	}
}

// processRefresh re-enqueues every known image so the backend keeps seeing
// them between change events.
func (p *processor) processRefresh(allImages []*workloadmeta.ContainerImageMetadata) {
	// So far, the check is refreshing all the images every 5 minutes all together.
	for i := range allImages {
		p.processImage(allImages[i])
	}
}

// processImage converts a workloadmeta container image into the
// agent-payload model and pushes it onto the queue.
func (p *processor) processImage(img *workloadmeta.ContainerImageMetadata) {
	layers := make([]*model.ContainerImage_ContainerImageLayer, 0, len(img.Layers))
	for _, layer := range img.Layers {
		modelLayer := &model.ContainerImage_ContainerImageLayer{
			Urls:      layer.URLs,
			MediaType: layer.MediaType,
			Digest:    layer.Digest,
			Size:      layer.SizeBytes,
		}
		layers = append(layers, modelLayer)
	}

	operatingSystem := &model.ContainerImage_OperatingSystem{
		Name:         img.OS,
		Version:      img.OSVersion,
		Architecture: img.Architecture,
	}

	// NOTE(review): the workloadmeta ID is reused as both Id and Digest —
	// presumably the store keys images by digest; TODO confirm.
	p.queue <- &model.ContainerImage{
		Id:          img.ID,
		Name:        img.Name,
		Registry:    img.Registry,
		ShortName:   img.ShortName,
		Tags:        img.RepoTags,
		Digest:      img.ID,
		Size:        img.SizeBytes,
		RepoDigests: img.RepoDigests,
		Os:          operatingSystem,
		Layers:      layers,
	}
}

// stop closes the queue, signaling the queue's consumer (created by
// newQueue) to terminate.
func (p *processor) stop() {
	close(p.queue)
}
Loading

0 comments on commit 2dd3c42

Please sign in to comment.