Skip to content

Commit

Permalink
[CWS] Change tags retrieval retry logic (DataDog#31606)
Browse files Browse the repository at this point in the history
  • Loading branch information
lebauce authored Dec 3, 2024
1 parent 809bb8e commit 7618f3a
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 24 deletions.
5 changes: 0 additions & 5 deletions pkg/security/resolvers/cgroup/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,3 @@ func (cgce *CacheEntry) GetWorkloadSelectorCopy() *WorkloadSelector {
Tag: cgce.WorkloadSelector.Tag,
}
}

// NeedsTagsResolution reports whether the workload has a container ID but
// its workload selector is not ready yet.
func (cgce *CacheEntry) NeedsTagsResolution() bool {
	hasContainerID := len(cgce.ContainerID) != 0
	// && short-circuits: the selector is only consulted when a container ID is set
	return hasContainerID && !cgce.WorkloadSelector.IsReady()
}
62 changes: 43 additions & 19 deletions pkg/security/resolvers/tags/resolver_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,20 @@ import (

"github.com/DataDog/datadog-agent/pkg/security/resolvers/cgroup"
cgroupModel "github.com/DataDog/datadog-agent/pkg/security/resolvers/cgroup/model"
"github.com/DataDog/datadog-agent/pkg/security/seclog"
"github.com/DataDog/datadog-agent/pkg/security/utils"
)

// pendingWorkload pairs a cgroup cache entry with the number of tags
// resolution retries it has left.
type pendingWorkload struct {
// embedded cache entry of the workload whose tags are being resolved
*cgroupModel.CacheEntry
// retries is the remaining retry budget: checkTags decrements it after a
// failed resolution and only requeues the workload while it stays >= 0
retries int
}

// LinuxResolver represents a default resolver based directly on the underlying tagger
type LinuxResolver struct {
*DefaultResolver
*utils.Notifier[Event, *cgroupModel.CacheEntry]
workloadsWithoutTags chan *cgroupModel.CacheEntry
workloadsWithoutTags chan *pendingWorkload
cgroupResolver *cgroup.Resolver
}

Expand All @@ -30,7 +36,10 @@ func (t *LinuxResolver) Start(ctx context.Context) error {
return err
}

if err := t.cgroupResolver.RegisterListener(cgroup.CGroupCreated, t.checkTags); err != nil {
if err := t.cgroupResolver.RegisterListener(cgroup.CGroupCreated, func(cgce *cgroupModel.CacheEntry) {
workload := &pendingWorkload{CacheEntry: cgce, retries: 3}
t.checkTags(workload)
}); err != nil {
return err
}

Expand All @@ -46,35 +55,50 @@ func (t *LinuxResolver) Start(ctx context.Context) error {
case <-ctx.Done():
return
case <-delayerTick.C:
select {
case workload := <-t.workloadsWithoutTags:
t.checkTags(workload)
default:
}

WORKLOAD:
// we want to process approximately the number of workloads in the queue
for workloadCount := len(t.workloadsWithoutTags); workloadCount > 0; workloadCount-- {
select {
case workload := <-t.workloadsWithoutTags:
t.checkTags(workload)
default:
break WORKLOAD
}
}
}
}
}()

return nil
}

// needsTagsResolution reports whether cgce has a container ID but a workload
// selector that is not ready yet.
func needsTagsResolution(cgce *cgroupModel.CacheEntry) bool {
	if len(cgce.ContainerID) == 0 {
		// no container ID: the selector is not consulted at all
		return false
	}
	return !cgce.WorkloadSelector.IsReady()
}

// checkTags checks if the tags of a workload were properly set
func (t *LinuxResolver) checkTags(workload *cgroupModel.CacheEntry) {
// check if the workload tags were found
if workload.NeedsTagsResolution() {
// this is a container, try to resolve its tags now
if err := t.fetchTags(workload); err != nil || workload.NeedsTagsResolution() {
// push to the workloadsWithoutTags chan so that its tags can be resolved later
select {
case t.workloadsWithoutTags <- workload:
default:
func (t *LinuxResolver) checkTags(pendingWorkload *pendingWorkload) {
workload := pendingWorkload.CacheEntry
// check if the workload tags were found or if it was deleted
if !workload.Deleted.Load() && needsTagsResolution(workload) {
// this is an alive cgroup, try to resolve its tags now
if err := t.fetchTags(workload); err != nil || needsTagsResolution(workload) {
if pendingWorkload.retries--; pendingWorkload.retries >= 0 {
// push to the workloadsWithoutTags chan so that its tags can be resolved later
select {
case t.workloadsWithoutTags <- pendingWorkload:
default:
seclog.Warnf("Failed to requeue workload %s for tags retrieval", workload.ContainerID)
}
} else {
seclog.Debugf("Failed to resolve tags for workload %s", workload.ContainerID)
}
return
}
}

t.NotifyListeners(WorkloadSelectorResolved, workload)
t.NotifyListeners(WorkloadSelectorResolved, workload)
}
}

// fetchTags fetches tags for the provided workload
Expand All @@ -92,7 +116,7 @@ func NewResolver(tagger Tagger, cgroupsResolver *cgroup.Resolver) *LinuxResolver
resolver := &LinuxResolver{
Notifier: utils.NewNotifier[Event, *cgroupModel.CacheEntry](),
DefaultResolver: NewDefaultResolver(tagger),
workloadsWithoutTags: make(chan *cgroupModel.CacheEntry, 100),
workloadsWithoutTags: make(chan *pendingWorkload, 100),
cgroupResolver: cgroupsResolver,
}
return resolver
Expand Down

0 comments on commit 7618f3a

Please sign in to comment.