Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[workloadmeta/collectors/containerd] Collect image metadata #14592

Merged
merged 6 commits into from
Jan 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1085,6 +1085,9 @@ func InitConfig(config Config) {
config.BindEnvAndSetDefault("inventories_max_interval", DefaultInventoriesMaxInterval) // integer seconds
config.BindEnvAndSetDefault("inventories_min_interval", DefaultInventoriesMinInterval) // integer seconds

// workloadmeta
config.BindEnvAndSetDefault("workloadmeta.image_metadata_collection.enabled", false)

// Datadog security agent (common)
config.BindEnvAndSetDefault("security_agent.cmd_port", 5010)
config.BindEnvAndSetDefault("security_agent.expvar_port", 5011)
Expand Down
14 changes: 12 additions & 2 deletions pkg/util/containerd/containerd_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ type ContainerdItf interface {
Labels(namespace string, ctn containerd.Container) (map[string]string, error)
LabelsWithContext(ctx context.Context, namespace string, ctn containerd.Container) (map[string]string, error)
ListImages(namespace string) ([]containerd.Image, error)
Image(namespace string, ctn containerd.Container) (containerd.Image, error)
Image(namespace string, name string) (containerd.Image, error)
ImageOfContainer(namespace string, ctn containerd.Container) (containerd.Image, error)
ImageSize(namespace string, ctn containerd.Container) (int64, error)
Spec(namespace string, ctn containerd.Container) (*oci.Spec, error)
SpecWithContext(ctx context.Context, namespace string, ctn containerd.Container) (*oci.Spec, error)
Expand Down Expand Up @@ -231,7 +232,16 @@ func (c *ContainerdUtil) ListImages(namespace string) ([]containerd.Image, error
}

// Image interfaces with the containerd api to get an image
func (c *ContainerdUtil) Image(namespace string, ctn containerd.Container) (containerd.Image, error) {
func (c *ContainerdUtil) Image(namespace string, name string) (containerd.Image, error) {
ctx, cancel := context.WithTimeout(context.Background(), c.queryTimeout)
defer cancel()
ctxNamespace := namespaces.WithNamespace(ctx, namespace)

return c.cl.GetImage(ctxNamespace, name)
}

// ImageOfContainer interfaces with the containerd api to get an image
func (c *ContainerdUtil) ImageOfContainer(namespace string, ctn containerd.Container) (containerd.Image, error) {
ctx, cancel := context.WithTimeout(context.Background(), c.queryTimeout)
defer cancel()
ctxNamespace := namespaces.WithNamespace(ctx, namespace)
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/containerd/containerd_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func TestInfo(t *testing.T) {
require.Equal(t, "foo", c.Image)
}

func TestImage(t *testing.T) {
func TestImageOfContainer(t *testing.T) {
mockUtil := ContainerdUtil{}

image := &mockImage{
Expand All @@ -160,7 +160,7 @@ func TestImage(t *testing.T) {
},
}

resultImage, err := mockUtil.Image(TestNamespace, container)
resultImage, err := mockUtil.ImageOfContainer(TestNamespace, container)
require.NoError(t, err)
require.Equal(t, resultImage, image)
}
Expand Down
12 changes: 9 additions & 3 deletions pkg/util/containerd/fake/containerd_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ type MockedContainerdClient struct {
MockEnvVars func(namespace string, ctn containerd.Container) (map[string]string, error)
MockMetadata func() (containerd.Version, error)
MockListImages func(namespace string) ([]containerd.Image, error)
MockImage func(namespace string, ctn containerd.Container) (containerd.Image, error)
MockImage func(namespace string, name string) (containerd.Image, error)
MockImageOfContainer func(namespace string, ctn containerd.Container) (containerd.Image, error)
MockImageSize func(namespace string, ctn containerd.Container) (int64, error)
MockTaskMetrics func(namespace string, ctn containerd.Container) (*types.Metric, error)
MockTaskPids func(namespace string, ctn containerd.Container) ([]containerd.ProcessInfo, error)
Expand Down Expand Up @@ -63,8 +64,13 @@ func (client *MockedContainerdClient) ListImages(namespace string) ([]containerd
}

// Image is a mock method
func (client *MockedContainerdClient) Image(namespace string, ctn containerd.Container) (containerd.Image, error) {
return client.MockImage(namespace, ctn)
func (client *MockedContainerdClient) Image(namespace string, name string) (containerd.Image, error) {
return client.MockImage(namespace, name)
}

// ImageOfContainer is a mock method
func (client *MockedContainerdClient) ImageOfContainer(namespace string, ctn containerd.Container) (containerd.Image, error) {
return client.MockImageOfContainer(namespace, ctn)
}

// ImageSize is a mock method
Expand Down
62 changes: 54 additions & 8 deletions pkg/workloadmeta/collectors/internal/containerd/containerd.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ const (
containerUpdateTopic = "/containers/update"
containerDeletionTopic = "/containers/delete"

imageCreationTopic = "/images/create"
imageUpdateTopic = "/images/update"
imageDeletionTopic = "/images/delete"

// These are not all the task-related topics, but enough to detect changes
// in the state of the container (only need to know if it's running or not).

Expand All @@ -56,6 +60,9 @@ var containerdTopics = []string{
containerCreationTopic,
containerUpdateTopic,
containerDeletionTopic,
imageCreationTopic,
imageUpdateTopic,
imageDeletionTopic,
TaskStartTopic,
TaskOOMTopic,
TaskExitTopic,
Expand All @@ -79,12 +86,19 @@ type collector struct {
// Container exit info (mainly exit code and exit timestamp) are attached to the corresponding task events.
// contToExitInfo caches the exit info of a task to enrich the container deletion event when it's received later.
contToExitInfo map[string]*exitInfo

knownImages *knownImages

// Map of image ID => array of repo tags
repoTags map[string][]string
}

func init() {
workloadmeta.RegisterCollector(collectorID, func() workloadmeta.Collector {
return &collector{
contToExitInfo: make(map[string]*exitInfo),
knownImages: newKnownImages(),
repoTags: make(map[string][]string),
}
})
}
Expand All @@ -110,7 +124,7 @@ func (c *collector) Start(ctx context.Context, store workloadmeta.Store) error {
eventsCtx, cancelEvents := context.WithCancel(ctx)
c.eventsChan, c.errorsChan = c.containerdClient.GetEvents().Subscribe(eventsCtx, subscribeFilters()...)

err = c.generateEventsFromContainerList(ctx)
err = c.notifyInitialEvents(ctx)
if err != nil {
cancelEvents()
return err
Expand Down Expand Up @@ -165,31 +179,36 @@ func (c *collector) stream(ctx context.Context) {
}
}

func (c *collector) generateEventsFromContainerList(ctx context.Context) error {
var events []workloadmeta.CollectorEvent
func (c *collector) notifyInitialEvents(ctx context.Context) error {
var containerEvents []workloadmeta.CollectorEvent

namespaces, err := cutil.NamespacesToWatch(ctx, c.containerdClient)
if err != nil {
return err
}

for _, namespace := range namespaces {
nsEvents, err := c.generateInitialEvents(namespace)
nsContainerEvents, err := c.generateInitialContainerEvents(namespace)
if err != nil {
return err
}
containerEvents = append(containerEvents, nsContainerEvents...)

events = append(events, nsEvents...)
if config.Datadog.GetBool("workloadmeta.image_metadata_collection.enabled") {
if err := c.notifyInitialImageEvents(ctx, namespace); err != nil {
return err
}
}
}

if len(events) > 0 {
c.store.Notify(events)
if len(containerEvents) > 0 {
c.store.Notify(containerEvents)
}

return nil
}

func (c *collector) generateInitialEvents(namespace string) ([]workloadmeta.CollectorEvent, error) {
func (c *collector) generateInitialContainerEvents(namespace string) ([]workloadmeta.CollectorEvent, error) {
var events []workloadmeta.CollectorEvent

existingContainers, err := c.containerdClient.Containers(namespace)
Expand Down Expand Up @@ -221,7 +240,30 @@ func (c *collector) generateInitialEvents(namespace string) ([]workloadmeta.Coll
return events, nil
}

func (c *collector) notifyInitialImageEvents(ctx context.Context, namespace string) error {
existingImages, err := c.containerdClient.ListImages(namespace)
if err != nil {
return err
}

for _, image := range existingImages {
if err := c.notifyEventForImage(ctx, namespace, image); err != nil {
return err
}
}

return nil
}

func (c *collector) handleEvent(ctx context.Context, containerdEvent *containerdevents.Envelope) error {
if isImageTopic(containerdEvent.Topic) {
return c.handleImageEvent(ctx, containerdEvent)
}

return c.handleContainerEvent(ctx, containerdEvent)
}

func (c *collector) handleContainerEvent(ctx context.Context, containerdEvent *containerdevents.Envelope) error {
containerID, container, err := c.extractContainerFromEvent(ctx, containerdEvent)
if err != nil {
return fmt.Errorf("cannot extract container from event: %w", err)
Expand Down Expand Up @@ -313,6 +355,10 @@ func subscribeFilters() []string {
var filters []string

for _, topic := range containerdTopics {
if isImageTopic(topic) && !config.Datadog.GetBool("workloadmeta.image_metadata_collection.enabled") {
continue
}

filters = append(filters, fmt.Sprintf(`topic==%q`, topic))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func containerdClient(container containerd.Container) fake.MockedContainerdClien
MockLabels: func(namespace string, ctn containerd.Container) (map[string]string, error) {
return labels, nil
},
MockImage: func(namespace string, ctn containerd.Container) (containerd.Image, error) {
MockImageOfContainer: func(namespace string, ctn containerd.Container) (containerd.Image, error) {
return &mockedImage{
mockName: func() string {
return imgName
Expand Down
Loading