diff --git a/lib/cache/cache.go b/lib/cache/cache.go index 9c2d04694c287..862cabeee7944 100644 --- a/lib/cache/cache.go +++ b/lib/cache/cache.go @@ -114,6 +114,7 @@ func makeAllKnownCAsFilter() types.CertAuthorityFilter { // ForAuth sets up watch configuration for the auth server func ForAuth(cfg Config) Config { cfg.target = "auth" + cfg.EnableRelativeExpiry = true cfg.Watches = []types.WatchKind{ {Kind: types.KindCertAuthority, LoadSecrets: true}, {Kind: types.KindClusterName}, @@ -741,6 +742,9 @@ type Config struct { // healthy even if some of the requested resource kinds aren't // supported by the event source. DisablePartialHealth bool + // EnableRelativeExpiry turns on purging expired items from the cache even + // if delete events have not been received from the backend. + EnableRelativeExpiry bool } // CheckAndSetDefaults checks parameters and sets default values @@ -1282,18 +1286,14 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry retryutils.Retry, timer retry.Reset() - // only enable relative node expiry if the cache is configured - // to watch for types.KindNode + // Only enable relative node expiry for the auth cache. relativeExpiryInterval := interval.NewNoop() - for _, watch := range c.Config.Watches { - if watch.Kind == types.KindNode { - relativeExpiryInterval = interval.New(interval.Config{ - Duration: c.Config.RelativeExpiryCheckInterval, - FirstDuration: utils.HalfJitter(c.Config.RelativeExpiryCheckInterval), - Jitter: retryutils.NewSeventhJitter(), - }) - break - } + if c.EnableRelativeExpiry { + relativeExpiryInterval = interval.New(interval.Config{ + Duration: c.Config.RelativeExpiryCheckInterval, + FirstDuration: utils.HalfJitter(c.Config.RelativeExpiryCheckInterval), + Jitter: retryutils.NewSeventhJitter(), + }) } defer relativeExpiryInterval.Stop() @@ -1346,8 +1346,7 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry retryutils.Retry, timer } } - err = c.processEvent(ctx, event, true) - if err != nil { + if err := c.processEvent(ctx, event); err != nil { return trace.Wrap(err) } c.notify(c.ctx, Event{Event: event, Type: EventProcessed}) @@ -1440,7 +1439,7 @@ func (c *Cache) performRelativeNodeExpiry(ctx context.Context) error { Kind: types.KindNode, Metadata: node.GetMetadata(), }, - }, false); err != nil { + }); err != nil { return trace.Wrap(err) } @@ -1607,9 +1606,9 @@ func (c *Cache) fetch(ctx context.Context, confirmedKinds map[resourceKind]types } // processEvent hands the event off to the appropriate collection for processing. Any -// resources which were not registered are ignored. If processing completed successfully -// and emit is true the event will be emitted via the fanout. -func (c *Cache) processEvent(ctx context.Context, event types.Event, emit bool) error { +// resources which were not registered are ignored. If processing completed successfully, +// the event will be emitted via the fanout. +func (c *Cache) processEvent(ctx context.Context, event types.Event) error { resourceKind := resourceKindFromResource(event.Resource) collection, ok := c.collections.byKind[resourceKind] if !ok { @@ -1619,14 +1618,14 @@ func (c *Cache) processEvent(ctx context.Context, event types.Event, emit bool) if err := collection.processEvent(ctx, event); err != nil { return trace.Wrap(err) } - if emit { - c.eventsFanout.Emit(event) - if !isHighVolumeResource(resourceKind.kind) { - c.lowVolumeEventsFanout.ForEach(func(f *services.FanoutV2) { - f.Emit(event) - }) - } + + c.eventsFanout.Emit(event) + if !isHighVolumeResource(resourceKind.kind) { + c.lowVolumeEventsFanout.ForEach(func(f *services.FanoutV2) { + f.Emit(event) + }) } + return nil } diff --git a/lib/cache/cache_test.go b/lib/cache/cache_test.go index 52491acf907b5..d52208cb14238 100644 --- a/lib/cache/cache_test.go +++ b/lib/cache/cache_test.go @@ -2552,6 +2552,7 @@ func TestRelativeExpiry(t *testing.T) { } func TestRelativeExpiryLimit(t *testing.T) { + t.Parallel() const ( checkInterval = time.Second nodeCount = 100 @@ -2569,7 +2570,7 @@ func TestRelativeExpiryLimit(t *testing.T) { c.RelativeExpiryCheckInterval = checkInterval c.RelativeExpiryLimit = expiryLimit c.Clock = clock - return ForProxy(c) + return ForAuth(c) }) t.Cleanup(p.Close) @@ -2609,7 +2610,8 @@ func TestRelativeExpiryLimit(t *testing.T) { } } -func TestRelativeExpiryOnlyForNodeWatches(t *testing.T) { +func TestRelativeExpiryOnlyForAuth(t *testing.T) { + t.Parallel() clock := clockwork.NewFakeClockAt(time.Now().Add(time.Hour)) p := newTestPack(t, func(c Config) Config { c.RelativeExpiryCheckInterval = time.Second @@ -2622,9 +2624,10 @@ func TestRelativeExpiryOnlyForNodeWatches(t *testing.T) { p2 := newTestPack(t, func(c Config) Config { c.RelativeExpiryCheckInterval = time.Second c.Clock = clock + c.target = "llama" c.Watches = []types.WatchKind{ {Kind: types.KindNamespace}, - {Kind: types.KindNamespace}, + {Kind: types.KindNode}, {Kind: types.KindCertAuthority}, } return c @@ -2634,8 +2637,9 @@ func TestRelativeExpiryOnlyForNodeWatches(t *testing.T) { for i := 0; i < 2; i++ { clock.Advance(time.Hour * 24) drainEvents(p.eventsC) - expectEvent(t, p.eventsC, RelativeExpiry) + unexpectedEvent(t, p.eventsC, RelativeExpiry) + clock.Advance(time.Hour * 24) drainEvents(p2.eventsC) unexpectedEvent(t, p2.eventsC, RelativeExpiry) }