diff --git a/pkg/security/resolvers/cgroup/model/model.go b/pkg/security/resolvers/cgroup/model/model.go index 1990c8511e033..83cbbe2217f40 100644 --- a/pkg/security/resolvers/cgroup/model/model.go +++ b/pkg/security/resolvers/cgroup/model/model.go @@ -102,8 +102,3 @@ func (cgce *CacheEntry) GetWorkloadSelectorCopy() *WorkloadSelector { Tag: cgce.WorkloadSelector.Tag, } } - -// NeedsTagsResolution returns true if this workload is missing its tags -func (cgce *CacheEntry) NeedsTagsResolution() bool { - return len(cgce.ContainerID) != 0 && !cgce.WorkloadSelector.IsReady() -} diff --git a/pkg/security/resolvers/tags/resolver_linux.go b/pkg/security/resolvers/tags/resolver_linux.go index 40f368062eae4..e11b2001dba4a 100644 --- a/pkg/security/resolvers/tags/resolver_linux.go +++ b/pkg/security/resolvers/tags/resolver_linux.go @@ -13,14 +13,20 @@ import ( "github.com/DataDog/datadog-agent/pkg/security/resolvers/cgroup" cgroupModel "github.com/DataDog/datadog-agent/pkg/security/resolvers/cgroup/model" + "github.com/DataDog/datadog-agent/pkg/security/seclog" "github.com/DataDog/datadog-agent/pkg/security/utils" ) +type pendingWorkload struct { + *cgroupModel.CacheEntry + retries int +} + // LinuxResolver represents a default resolver based directly on the underlying tagger type LinuxResolver struct { *DefaultResolver *utils.Notifier[Event, *cgroupModel.CacheEntry] - workloadsWithoutTags chan *cgroupModel.CacheEntry + workloadsWithoutTags chan *pendingWorkload cgroupResolver *cgroup.Resolver } @@ -30,7 +36,10 @@ func (t *LinuxResolver) Start(ctx context.Context) error { return err } - if err := t.cgroupResolver.RegisterListener(cgroup.CGroupCreated, t.checkTags); err != nil { + if err := t.cgroupResolver.RegisterListener(cgroup.CGroupCreated, func(cgce *cgroupModel.CacheEntry) { + workload := &pendingWorkload{CacheEntry: cgce, retries: 3} + t.checkTags(workload) + }); err != nil { return err } @@ -46,12 +55,17 @@ func (t *LinuxResolver) Start(ctx context.Context) error { case <-ctx.Done(): return case <-delayerTick.C: - select { - case workload := <-t.workloadsWithoutTags: - t.checkTags(workload) - default: - } + WORKLOAD: + // we want to process approximately the number of workloads in the queue + for workloadCount := len(t.workloadsWithoutTags); workloadCount > 0; workloadCount-- { + select { + case workload := <-t.workloadsWithoutTags: + t.checkTags(workload) + default: + break WORKLOAD + } + } } } }() @@ -59,22 +73,32 @@ func (t *LinuxResolver) Start(ctx context.Context) error { return nil } +func needsTagsResolution(cgce *cgroupModel.CacheEntry) bool { + return len(cgce.ContainerID) != 0 && !cgce.WorkloadSelector.IsReady() +} + // checkTags checks if the tags of a workload were properly set -func (t *LinuxResolver) checkTags(workload *cgroupModel.CacheEntry) { - // check if the workload tags were found - if workload.NeedsTagsResolution() { - // this is a container, try to resolve its tags now - if err := t.fetchTags(workload); err != nil || workload.NeedsTagsResolution() { - // push to the workloadsWithoutTags chan so that its tags can be resolved later - select { - case t.workloadsWithoutTags <- workload: - default: +func (t *LinuxResolver) checkTags(pendingWorkload *pendingWorkload) { + workload := pendingWorkload.CacheEntry + // check if the workload tags were found or if it was deleted + if !workload.Deleted.Load() && needsTagsResolution(workload) { + // this is an alive cgroup, try to resolve its tags now + if err := t.fetchTags(workload); err != nil || needsTagsResolution(workload) { + if pendingWorkload.retries--; pendingWorkload.retries >= 0 { + // push to the workloadsWithoutTags chan so that its tags can be resolved later + select { + case t.workloadsWithoutTags <- pendingWorkload: + default: + seclog.Warnf("Failed to requeue workload %s for tags retrieval", workload.ContainerID) + } + } else { + seclog.Debugf("Failed to resolve tags for workload %s", workload.ContainerID) } return } - } - t.NotifyListeners(WorkloadSelectorResolved, workload) + t.NotifyListeners(WorkloadSelectorResolved, workload) + } } // fetchTags fetches tags for the provided workload @@ -92,7 +116,7 @@ func NewResolver(tagger Tagger, cgroupsResolver *cgroup.Resolver) *LinuxResolver resolver := &LinuxResolver{ Notifier: utils.NewNotifier[Event, *cgroupModel.CacheEntry](), DefaultResolver: NewDefaultResolver(tagger), - workloadsWithoutTags: make(chan *cgroupModel.CacheEntry, 100), + workloadsWithoutTags: make(chan *pendingWorkload, 100), cgroupResolver: cgroupsResolver, } return resolver