diff --git a/notifier/processor.go b/notifier/processor.go index 8c30633a1a..5672b40560 100644 --- a/notifier/processor.go +++ b/notifier/processor.go @@ -10,6 +10,7 @@ import ( "github.com/quay/claircore/libvuln/driver" "github.com/quay/claircore/pkg/distlock" "github.com/rs/zerolog" + "golang.org/x/sync/errgroup" clairerror "github.com/quay/clair/v4/clair-error" "github.com/quay/clair/v4/indexer" @@ -71,7 +72,7 @@ func (p *Processor) process(ctx context.Context, c <-chan Event) { for { select { case <-ctx.Done(): - log.Info().Msg("ctx canceld. ending event processing") + log.Info().Msg("context canceled: ending event processing") case e := <-c: uoid := e.uo.Ref.String() log := zerolog.Ctx(ctx).With(). @@ -124,14 +125,22 @@ func (p *Processor) create(ctx context.Context, e Event, prev uuid.UUID) error { return fmt.Errorf("failed to get update diff: %v", err) } log.Debug().Int("removed", len(diff.Removed)).Int("added", len(diff.Added)).Msg("diff results") - added, err := p.indexer.AffectedManifests(ctx, diff.Added) - if err != nil { - return fmt.Errorf("failed to get added affected manifests: %v", err) + + added := &claircore.AffectedManifests{ + Vulnerabilities: make(map[string]*claircore.Vulnerability), + VulnerableManifests: make(map[string][]string), } - removed, err := p.indexer.AffectedManifests(ctx, diff.Removed) - if err != nil { - return fmt.Errorf("failed to get removed affected manifests: %v", err) + removed := &claircore.AffectedManifests{ + Vulnerabilities: make(map[string]*claircore.Vulnerability), + VulnerableManifests: make(map[string][]string), } + eg, wctx := errgroup.WithContext(ctx) + eg.Go(getAffected(wctx, p.indexer, diff.Added, added)) + eg.Go(getAffected(wctx, p.indexer, diff.Removed, removed)) + if err := eg.Wait(); err != nil { + return fmt.Errorf("failed to get affected manifests: %v", err) + } + log.Debug().Int("added", len(added.VulnerableManifests)).Int("removed", len(removed.VulnerableManifests)).Msg("affected manifest counts") if len(added.VulnerableManifests) == 0 && len(removed.VulnerableManifests) == 0 { @@ -197,6 +206,44 @@ func (p *Processor) create(ctx context.Context, e Event, prev uuid.UUID) error { return nil } +func min(a, b int) int { + if a < b { + return a + } + return b +} + +// GetAffected issues AffectedManifest calls in chunks and merges the result. +// +// Its signature is weird to make use in an errgroup a little bit nicer. +func getAffected(ctx context.Context, ic indexer.Service, vs []claircore.Vulnerability, out *claircore.AffectedManifests) func() error { + const chunk = 1000 + return func() error { + var s []claircore.Vulnerability + for len(vs) > 0 { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + s = vs[:min(chunk, len(vs))] + vs = vs[len(s):] + a, err := ic.AffectedManifests(ctx, s) + if err != nil { + return err + } + for k, v := range a.Vulnerabilities { + out.Vulnerabilities[k] = v + } + for k, v := range a.VulnerableManifests { + out.VulnerableManifests[k] = append(out.VulnerableManifests[k], v...) + } + } + out.Sort() + return nil + } +} + // safe guards against situations where creating notifications is // incorrect. // diff --git a/notifier/processor_create_test.go b/notifier/processor_create_test.go index c8b1c8ecc8..9f92227f73 100644 --- a/notifier/processor_create_test.go +++ b/notifier/processor_create_test.go @@ -8,10 +8,11 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/google/uuid" - "github.com/quay/clair/v4/indexer" - "github.com/quay/clair/v4/matcher" "github.com/quay/claircore" "github.com/quay/claircore/libvuln/driver" + + "github.com/quay/clair/v4/indexer" + "github.com/quay/clair/v4/matcher" ) var ( @@ -84,8 +85,8 @@ func testProcessorStoreErr(t *testing.T) { mm := &matcher.Mock{ UpdateDiff_: func(context.Context, uuid.UUID, uuid.UUID) (*driver.UpdateDiff, error) { return &driver.UpdateDiff{ - Added: []claircore.Vulnerability{}, - Removed: []claircore.Vulnerability{}, + Added: []claircore.Vulnerability{*vulnAdd}, + Removed: []claircore.Vulnerability{*vulnRemoved}, }, nil }, } @@ -129,8 +130,8 @@ func testProcessorIndexerErr(t *testing.T) { mm := &matcher.Mock{ UpdateDiff_: func(context.Context, uuid.UUID, uuid.UUID) (*driver.UpdateDiff, error) { return &driver.UpdateDiff{ - Added: []claircore.Vulnerability{}, - Removed: []claircore.Vulnerability{}, + Added: []claircore.Vulnerability{*vulnAdd}, + Removed: []claircore.Vulnerability{*vulnRemoved}, }, nil }, } @@ -209,23 +210,25 @@ func testProcessorCreate(t *testing.T) { mm := &matcher.Mock{ UpdateDiff_: func(context.Context, uuid.UUID, uuid.UUID) (*driver.UpdateDiff, error) { return &driver.UpdateDiff{ - Added: []claircore.Vulnerability{}, - Removed: []claircore.Vulnerability{}, + Added: []claircore.Vulnerability{*vulnAdd}, + Removed: []claircore.Vulnerability{*vulnRemoved}, }, nil }, } count := 0 im := &indexer.Mock{ AffectedManifests_: func(ctx context.Context, vulns []claircore.Vulnerability) (*claircore.AffectedManifests, error) { - switch count { - case 0: - count++ + if count > 1 { + return nil, fmt.Errorf("unexpected number of calls") + } + count++ + switch vulns[0].ID { + case "0": return affectedManifestsAdd, nil - case 1: + case "1": return affectedManifestsRemoved, nil - default: - return nil, fmt.Errorf("unexpected number of calls") } + return nil, fmt.Errorf("unexpected call") }, } // perform bulk of checks in this mock method.