From 7a5ab997329663bc425a3ebd52988c6fa53d5458 Mon Sep 17 00:00:00 2001 From: Sylvain Baubeau Date: Tue, 7 Jan 2025 11:49:00 +0100 Subject: [PATCH] [Backport 7.62.x] [CWS] rework sbom resolver (#32699) Co-authored-by: Sylvain Afchain --- pkg/security/resolvers/sbom/resolver.go | 342 ++++++++++++++---------- 1 file changed, 200 insertions(+), 142 deletions(-) diff --git a/pkg/security/resolvers/sbom/resolver.go b/pkg/security/resolvers/sbom/resolver.go index 6fe1b6f344163..96713cd702649 100644 --- a/pkg/security/resolvers/sbom/resolver.go +++ b/pkg/security/resolvers/sbom/resolver.go @@ -13,6 +13,7 @@ import ( "errors" "fmt" "os" + "slices" "strings" "sync" "syscall" @@ -42,87 +43,105 @@ import ( "github.com/DataDog/datadog-agent/pkg/util/trivy" ) -// SBOMSource defines is the default log source for the SBOM events -const SBOMSource = "runtime-security-agent" +const ( + // SBOMSource defines is the default log source for the SBOM events + SBOMSource = "runtime-security-agent" -const maxSBOMGenerationRetries = 3 + // state of the sboms + pendingState int64 = iota + 1 + computedState + stoppedState + + maxSBOMGenerationRetries = 3 + maxSBOMEntries = 1024 + scanQueueSize = 100 +) var errNoProcessForContainerID = errors.New("found no running process matching the given container ID") +// Data use the keep the result of a scan of a same workload across multiple +// container +type Data struct { + files fileQuerier +} + // SBOM defines an SBOM type SBOM struct { sync.RWMutex - report *trivy.Report - files fileQuerier - Host string Source string Service string ContainerID containerutils.ContainerID - workloadKey string - deleted *atomic.Bool - scanSuccessful *atomic.Bool - cgroup *cgroupModel.CacheEntry + data *Data - refresh *debouncer.Debouncer + workloadKey workloadKey + + cgroup *cgroupModel.CacheEntry + state *atomic.Int64 + + refresher *debouncer.Debouncer } -func getWorkloadKey(selector *cgroupModel.WorkloadSelector) string { - return selector.Image + ":" + selector.Tag +type workloadKey string + +func getWorkloadKey(selector *cgroupModel.WorkloadSelector) workloadKey { + return workloadKey(selector.Image + ":" + selector.Tag) } // IsComputed returns true if SBOM was successfully generated func (s *SBOM) IsComputed() bool { - return s.scanSuccessful.Load() + return s.state.Load() == computedState } // SetReport sets the SBOM report -func (s *SBOM) SetReport(report *trivy.Report) { +func (s *SBOM) setReport(report *trivy.Report) { // build file cache - s.files = newFileQuerier(report) + s.data.files = newFileQuerier(report) } -// reset (thread unsafe) cleans up internal fields before a SBOM is inserted in cache, the goal is to save space and delete references -// to structs that will be GCed -func (s *SBOM) reset() { - s.Host = "" - s.Source = "" - s.Service = "" - s.ContainerID = "" - s.cgroup = nil - s.deleted.Store(true) - if s.refresh != nil { - s.refresh.Stop() - s.refresh = nil +func (s *SBOM) stop() { + if s.refresher != nil { + s.refresher.Stop() + + // don't forget to set the refresher to nil otherwise it generates a memleak + s.refresher = nil } + + // change the state so that already queued sbom won't be handled + s.state.Store(stoppedState) } // NewSBOM returns a new empty instance of SBOM -func NewSBOM(host string, source string, id containerutils.ContainerID, cgroup *cgroupModel.CacheEntry, workloadKey string) (*SBOM, error) { - sbom := &SBOM{ - files: fileQuerier{}, - Host: host, - Source: source, - ContainerID: id, - workloadKey: workloadKey, - deleted: atomic.NewBool(false), - scanSuccessful: atomic.NewBool(false), - cgroup: cgroup, - } - - return sbom, nil +func NewSBOM(host string, source string, id containerutils.ContainerID, cgroup *cgroupModel.CacheEntry, workloadKey workloadKey) *SBOM { + return &SBOM{ + Host: host, + Source: source, + ContainerID: id, + workloadKey: workloadKey, + state: atomic.NewInt64(pendingState), + cgroup: cgroup, + data: &Data{}, + } } // Resolver is the Software Bill-Of-material resolver type Resolver struct { - cfg *config.RuntimeSecurityConfig - sbomsLock sync.RWMutex - sboms map[containerutils.ContainerID]*SBOM - sbomsCacheLock sync.RWMutex - sbomsCache *simplelru.LRU[string, *SBOM] - scannerChan chan *SBOM + cfg *config.RuntimeSecurityConfig + + sbomsLock sync.RWMutex + sboms *simplelru.LRU[containerutils.ContainerID, *SBOM] + + // cache + dataCacheLock sync.RWMutex + dataCache *simplelru.LRU[workloadKey, *Data] // cache per workload key + + // queue + scanChan chan *SBOM + pendingScanLock sync.Mutex + pendingScan []containerutils.ContainerID + statsdClient statsd.ClientInterface sbomScanner *sbomscanner.Scanner hostRootDevice uint64 @@ -149,7 +168,7 @@ func NewSBOMResolver(c *config.RuntimeSecurityConfig, statsdClient statsd.Client return nil, errors.New("sbom is disabled") } - sbomsCache, err := simplelru.NewLRU[string, *SBOM](c.SBOMResolverWorkloadsCacheSize, nil) + dataCache, err := simplelru.NewLRU[workloadKey, *Data](c.SBOMResolverWorkloadsCacheSize, nil) if err != nil { return nil, fmt.Errorf("couldn't create new SBOMResolver: %w", err) } @@ -167,9 +186,8 @@ func NewSBOMResolver(c *config.RuntimeSecurityConfig, statsdClient statsd.Client resolver := &Resolver{ cfg: c, statsdClient: statsdClient, - sboms: make(map[containerutils.ContainerID]*SBOM), - sbomsCache: sbomsCache, - scannerChan: make(chan *SBOM, 100), + dataCache: dataCache, + scanChan: make(chan *SBOM, 100), sbomScanner: sbomScanner, hostRootDevice: stat.Dev, sbomGenerations: atomic.NewUint64(0), @@ -178,11 +196,22 @@ func NewSBOMResolver(c *config.RuntimeSecurityConfig, statsdClient statsd.Client failedSBOMGenerations: atomic.NewUint64(0), } + sboms, err := simplelru.NewLRU[containerutils.ContainerID, *SBOM](maxSBOMEntries, func(_ containerutils.ContainerID, sbom *SBOM) { + // should be trigger from a function already locking the sbom, see Add, Delete + sbom.stop() + resolver.removePendingScan(sbom.ContainerID) + }) + if err != nil { + return nil, fmt.Errorf("couldn't create new SBOM resolver: %w", err) + } + resolver.sboms = sboms + if !c.SBOMResolverEnabled { return resolver, nil } resolver.prepareContextTags() + return resolver, nil } @@ -221,17 +250,14 @@ func (r *Resolver) Start(ctx context.Context) error { hostRoot = "/" } - hostSBOM, err := NewSBOM(r.hostname, r.source, "", nil, "") - if err != nil { - return err - } - r.hostSBOM = hostSBOM + r.hostSBOM = NewSBOM(r.hostname, r.source, "", nil, "") report, err := r.generateSBOM(hostRoot) if err != nil { return err } - r.hostSBOM.SetReport(report) + r.hostSBOM.setReport(report) + r.hostSBOM.state.Store(computedState) } go func() { @@ -242,7 +268,7 @@ func (r *Resolver) Start(ctx context.Context) error { select { case <-ctx.Done(): return - case sbom := <-r.scannerChan: + case sbom := <-r.scanChan: if err := retry.Do(func() error { return r.analyzeWorkload(sbom) }, retry.Attempts(maxSBOMGenerationRetries), retry.Delay(200*time.Millisecond)); err != nil { @@ -263,7 +289,30 @@ func (r *Resolver) Start(ctx context.Context) error { func (r *Resolver) RefreshSBOM(containerID containerutils.ContainerID) error { if sbom := r.getSBOM(containerID); sbom != nil { seclog.Debugf("Refreshing SBOM for container %s", containerID) - sbom.refresh.Call() + + var refresher *debouncer.Debouncer + + // create a refresher debouncer on demand + sbom.Lock() + refresher = sbom.refresher + if refresher == nil { + refresher = debouncer.New( + 3*time.Second, func() { + // invalid cache data + r.removeSBOMData(sbom.workloadKey) + + sbom.Lock() + r.triggerScan(sbom) + sbom.Unlock() + }, + ) + refresher.Start() + sbom.refresher = refresher + } + sbom.Unlock() + + refresher.Call() + return nil } return fmt.Errorf("container %s not found", containerID) @@ -317,11 +366,9 @@ func (r *Resolver) doScan(sbom *SBOM) (*trivy.Report, error) { // the container ID reduces drastically the likelihood of this race) computedID, err := utils.GetProcContainerID(rootCandidatePID, rootCandidatePID) if err != nil { - sbom.cgroup.RemovePID(rootCandidatePID) continue } if computedID != sbom.ContainerID { - sbom.cgroup.RemovePID(rootCandidatePID) continue } @@ -341,7 +388,7 @@ func (r *Resolver) doScan(sbom *SBOM) (*trivy.Report, error) { } if report, lastErr = r.generateSBOM(containerProcRootPath); lastErr == nil { - sbom.SetReport(report) + sbom.setReport(report) scanned = true break } @@ -357,51 +404,85 @@ func (r *Resolver) doScan(sbom *SBOM) (*trivy.Report, error) { return report, nil } -func (r *Resolver) invalidateWorkflow(sbom *SBOM) { - r.sbomsCacheLock.Lock() - r.sbomsCache.Remove(sbom.workloadKey) - r.sbomsCacheLock.Unlock() +func (r *Resolver) removeSBOMData(key workloadKey) { + r.dataCacheLock.Lock() + r.dataCache.Remove(key) + r.dataCacheLock.Unlock() +} + +func (r *Resolver) addPendingScan(containerID containerutils.ContainerID) bool { + r.pendingScanLock.Lock() + defer r.pendingScanLock.Unlock() + + if len(r.pendingScan) >= scanQueueSize { + return false + } + + if slices.Contains(r.pendingScan, containerID) { + return false + } + r.pendingScan = append(r.pendingScan, containerID) + + return true +} + +func (r *Resolver) removePendingScan(containerID containerutils.ContainerID) { + r.pendingScanLock.Lock() + defer r.pendingScanLock.Unlock() + + r.pendingScan = slices.DeleteFunc(r.pendingScan, func(v containerutils.ContainerID) bool { + return v == containerID + }) } // analyzeWorkload generates the SBOM of the provided sbom and send it to the security agent func (r *Resolver) analyzeWorkload(sbom *SBOM) error { - seclog.Infof("analyzing sbom '%s'", sbom.ContainerID) sbom.Lock() defer sbom.Unlock() - if sbom.deleted.Load() { - // this sbom has been deleted, ignore + seclog.Infof("analyzing sbom '%s'", sbom.ContainerID) + + if sbom.state.Load() != pendingState { + r.removePendingScan(sbom.ContainerID) + + // should not append, ignore + seclog.Warnf("trying to analyze a sbom not in pending state for '%s': %d", sbom.ContainerID, sbom.state.Load()) return nil } // bail out if the workload has been analyzed while queued up - r.sbomsCacheLock.RLock() - if r.sbomsCache.Contains(sbom.workloadKey) { - r.sbomsCacheLock.RUnlock() + r.dataCacheLock.RLock() + if data, exists := r.dataCache.Get(sbom.workloadKey); exists { + r.dataCacheLock.RUnlock() + sbom.data = data + + r.removePendingScan(sbom.ContainerID) + return nil } - r.sbomsCacheLock.RUnlock() + r.dataCacheLock.RUnlock() report, err := r.doScan(sbom) if err != nil { return err } - // build file cache - sbom.files = newFileQuerier(report) - - // we can get rid of the report now that we've generate the file mapping - sbom.report = nil + data := &Data{ + files: newFileQuerier(report), + } + sbom.data = data - // mark the SBOM ass successful - sbom.scanSuccessful.Store(true) + // mark the SBOM as successful + sbom.state.Store(computedState) // add to cache - r.sbomsCacheLock.Lock() - r.sbomsCache.Add(sbom.workloadKey, sbom) - r.sbomsCacheLock.Unlock() + r.dataCacheLock.Lock() + r.dataCache.Add(sbom.workloadKey, data) + r.dataCacheLock.Unlock() - seclog.Infof("new sbom generated for '%s': %d files added", sbom.ContainerID, sbom.files.len()) + r.removePendingScan(sbom.ContainerID) + + seclog.Infof("new sbom generated for '%s': %d files added", sbom.ContainerID, data.files.len()) return nil } @@ -411,7 +492,7 @@ func (r *Resolver) getSBOM(containerID containerutils.ContainerID) *SBOM { sbom := r.hostSBOM if containerID != "" { - sbom = r.sboms[containerID] + sbom, _ = r.sboms.Get(containerID) } return sbom } @@ -427,49 +508,37 @@ func (r *Resolver) ResolvePackage(containerID containerutils.ContainerID, file * sbom.Lock() defer sbom.Unlock() - return sbom.files.queryFile(file.PathnameStr) + return sbom.data.files.queryFile(file.PathnameStr) } -// newWorkloadEntry (thread unsafe) creates a new SBOM entry for the sbom designated by the provided process cache +// newSBOM (thread unsafe) creates a new SBOM entry for the sbom designated by the provided process cache // entry -func (r *Resolver) newWorkloadEntry(id containerutils.ContainerID, cgroup *cgroupModel.CacheEntry, workloadKey string) (*SBOM, error) { - sbom, err := NewSBOM(r.hostname, r.source, id, cgroup, workloadKey) - if err != nil { - return nil, err - } - - sbom.refresh = debouncer.New( - 3*time.Second, func() { - r.invalidateWorkflow(sbom) - r.triggerScan(sbom) - }, - ) - r.sboms[id] = sbom - sbom.refresh.Start() - - return sbom, nil +func (r *Resolver) newSBOM(id containerutils.ContainerID, cgroup *cgroupModel.CacheEntry, workloadKey workloadKey) *SBOM { + sbom := NewSBOM(r.hostname, r.source, id, cgroup, workloadKey) + r.sboms.Add(id, sbom) + return sbom } -// queueWorkload inserts the provided sbom in a SBOM resolver chan, it will be inserted in the scannerChan or the +// queueWorkload inserts the provided sbom in a SBOM resolver chan, it will be inserted in the scanChan or the // delayerChan depending on the tags that have been resolved func (r *Resolver) queueWorkload(sbom *SBOM) { sbom.Lock() defer sbom.Unlock() - if sbom.deleted.Load() { + if sbom.state.Load() != pendingState { // this sbom was deleted before we could scan it, ignore it return } // check if this sbom has been scanned before - r.sbomsCacheLock.Lock() - defer r.sbomsCacheLock.Unlock() - - cachedSBOM, ok := r.sbomsCache.Get(sbom.workloadKey) - if ok { - // copy report and file cache (keeping a reference is fine, we won't be modifying the content) - sbom.files = cachedSBOM.files - sbom.report = cachedSBOM.report + r.dataCacheLock.Lock() + defer r.dataCacheLock.Unlock() + + if data, ok := r.dataCache.Get(sbom.workloadKey); ok { + sbom.data = data + + sbom.state.Store(computedState) + r.sbomsCacheHit.Inc() return } @@ -479,10 +548,17 @@ func (r *Resolver) queueWorkload(sbom *SBOM) { } func (r *Resolver) triggerScan(sbom *SBOM) { + if !r.addPendingScan(sbom.ContainerID) { + r.deleteSBOM(sbom) + return + } + // push sbom to the scanner chan select { - case r.scannerChan <- sbom: + case r.scanChan <- sbom: default: + r.removePendingScan(sbom.ContainerID) + r.deleteSBOM(sbom) } } @@ -501,13 +577,10 @@ func (r *Resolver) OnWorkloadSelectorResolvedEvent(workload *tags.Workload) { return } - _, ok := r.sboms[id] + _, ok := r.sboms.Get(id) if !ok { workloadKey := getWorkloadKey(workload.Selector.Copy()) - sbom, err := r.newWorkloadEntry(id, workload.CacheEntry, workloadKey) - if err != nil { - seclog.Errorf("couldn't create new SBOM entry for sbom '%s': %v", id, err) - } + sbom := r.newSBOM(id, workload.CacheEntry, workloadKey) r.queueWorkload(sbom) } } @@ -521,7 +594,8 @@ func (r *Resolver) GetWorkload(id containerutils.ContainerID) *SBOM { return r.hostSBOM } - return r.sboms[id] + sbom, _ := r.sboms.Get(id) + return sbom } // OnCGroupDeletedEvent is used to handle a CGroupDeleted event @@ -548,32 +622,16 @@ func (r *Resolver) deleteSBOM(sbom *SBOM) { defer r.sbomsLock.Unlock() seclog.Infof("deleting SBOM entry for '%s'", sbom.ContainerID) - // remove SBOM entry - delete(r.sboms, sbom.ContainerID) - - // check if the scan was successful - if !sbom.scanSuccessful.Load() { - // exit now, we don't want to cache a failed scan - return - } - - // save the sbom key before reset - sbomKey := sbom.workloadKey - - // cleanup and insert SBOM in cache - sbom.reset() - // push the sbom to the cache - r.sbomsCacheLock.Lock() - defer r.sbomsCacheLock.Unlock() - r.sbomsCache.Add(sbomKey, sbom) + // should be called under sbom.Lock + r.sboms.Remove(sbom.ContainerID) } // SendStats sends stats func (r *Resolver) SendStats() error { r.sbomsLock.RLock() defer r.sbomsLock.RUnlock() - if val := float64(len(r.sboms)); val > 0 { + if val := float64(r.sboms.Len()); val > 0 { if err := r.statsdClient.Gauge(metrics.MetricSBOMResolverActiveSBOMs, val, []string{}, 1.0); err != nil { return fmt.Errorf("couldn't send MetricSBOMResolverActiveSBOMs: %w", err) } @@ -585,9 +643,9 @@ func (r *Resolver) SendStats() error { } } - r.sbomsCacheLock.Lock() - defer r.sbomsCacheLock.Unlock() - if val := float64(r.sbomsCache.Len()); val > 0 { + r.dataCacheLock.Lock() + defer r.dataCacheLock.Unlock() + if val := float64(r.dataCache.Len()); val > 0 { if err := r.statsdClient.Gauge(metrics.MetricSBOMResolverSBOMCacheLen, val, []string{}, 1.0); err != nil { return fmt.Errorf("couldn't send MetricSBOMResolverSBOMCacheLen: %w", err) }