Skip to content

Commit

Permalink
[workloadmeta/collectors/containerd] Collect image metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
davidor committed Dec 9, 2022
1 parent a95c45d commit 771808c
Show file tree
Hide file tree
Showing 3 changed files with 488 additions and 8 deletions.
62 changes: 54 additions & 8 deletions pkg/workloadmeta/collectors/internal/containerd/containerd.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ const (
containerUpdateTopic = "/containers/update"
containerDeletionTopic = "/containers/delete"

imageCreationTopic = "/images/create"
imageUpdateTopic = "/images/update"
imageDeletionTopic = "/images/delete"

// These are not all the task-related topics, but enough to detect changes
// in the state of the container (only need to know if it's running or not).

Expand All @@ -56,6 +60,9 @@ var containerdTopics = []string{
containerCreationTopic,
containerUpdateTopic,
containerDeletionTopic,
imageCreationTopic,
imageUpdateTopic,
imageDeletionTopic,
TaskStartTopic,
TaskOOMTopic,
TaskExitTopic,
Expand All @@ -79,12 +86,19 @@ type collector struct {
// Container exit info (mainly exit code and exit timestamp) are attached to the corresponding task events.
// contToExitInfo caches the exit info of a task to enrich the container deletion event when it's received later.
contToExitInfo map[string]*exitInfo

// Stores a map of image name => image ID.
// Events from containerd contain an image name, but not IDs. When a delete
// event arrives it'll contain a name, but we won't be able to access the ID
// because the image is already gone, that's why we need this map.
knownImages map[string]string
}

func init() {
workloadmeta.RegisterCollector(collectorID, func() workloadmeta.Collector {
return &collector{
contToExitInfo: make(map[string]*exitInfo),
knownImages: make(map[string]string),
}
})
}
Expand All @@ -110,7 +124,7 @@ func (c *collector) Start(ctx context.Context, store workloadmeta.Store) error {
eventsCtx, cancelEvents := context.WithCancel(ctx)
c.eventsChan, c.errorsChan = c.containerdClient.GetEvents().Subscribe(eventsCtx, subscribeFilters()...)

err = c.generateEventsFromContainerList(ctx)
err = c.notifyInitialEvents(ctx)
if err != nil {
cancelEvents()
return err
Expand Down Expand Up @@ -165,31 +179,36 @@ func (c *collector) stream(ctx context.Context) {
}
}

func (c *collector) generateEventsFromContainerList(ctx context.Context) error {
var events []workloadmeta.CollectorEvent
func (c *collector) notifyInitialEvents(ctx context.Context) error {
var containerEvents []workloadmeta.CollectorEvent

namespaces, err := cutil.NamespacesToWatch(ctx, c.containerdClient)
if err != nil {
return err
}

for _, namespace := range namespaces {
nsEvents, err := c.generateInitialEvents(namespace)
nsContainerEvents, err := c.generateInitialContainerEvents(namespace)
if err != nil {
return err
}
containerEvents = append(containerEvents, nsContainerEvents...)

events = append(events, nsEvents...)
if config.Datadog.GetBool("workloadmeta.image_metadata_collection.enabled") {
if err := c.notifyInitialImageEvents(ctx, namespace); err != nil {
return err
}
}
}

if len(events) > 0 {
c.store.Notify(events)
if len(containerEvents) > 0 {
c.store.Notify(containerEvents)
}

return nil
}

func (c *collector) generateInitialEvents(namespace string) ([]workloadmeta.CollectorEvent, error) {
func (c *collector) generateInitialContainerEvents(namespace string) ([]workloadmeta.CollectorEvent, error) {
var events []workloadmeta.CollectorEvent

existingContainers, err := c.containerdClient.Containers(namespace)
Expand Down Expand Up @@ -221,7 +240,30 @@ func (c *collector) generateInitialEvents(namespace string) ([]workloadmeta.Coll
return events, nil
}

func (c *collector) notifyInitialImageEvents(ctx context.Context, namespace string) error {
existingImages, err := c.containerdClient.ListImages(namespace)
if err != nil {
return err
}

for _, image := range existingImages {
if err := c.notifyEventForImage(ctx, namespace, image); err != nil {
return err
}
}

return nil
}

func (c *collector) handleEvent(ctx context.Context, containerdEvent *containerdevents.Envelope) error {
if isImageTopic(containerdEvent.Topic) {
return c.handleImageEvent(ctx, containerdEvent)
}

return c.handleContainerEvent(ctx, containerdEvent)
}

func (c *collector) handleContainerEvent(ctx context.Context, containerdEvent *containerdevents.Envelope) error {
containerID, container, err := c.extractContainerFromEvent(ctx, containerdEvent)
if err != nil {
return fmt.Errorf("cannot extract container from event: %w", err)
Expand Down Expand Up @@ -313,6 +355,10 @@ func subscribeFilters() []string {
var filters []string

for _, topic := range containerdTopics {
if isImageTopic(topic) && !config.Datadog.GetBool("workloadmeta.image_metadata_collection.enabled") {
continue
}

filters = append(filters, fmt.Sprintf(`topic==%q`, topic))
}

Expand Down
Loading

0 comments on commit 771808c

Please sign in to comment.