Skip to content

Commit

Permalink
Restore relative node expiry to original behavior
Browse files Browse the repository at this point in the history
When relative node expiry was originally implemented it only ran
on Auth and propagated delete events downstream. However, that
was quickly changed in #12002 because the event system could not
keep up in clusters with high churn and would result in buffer
overflow errors in downstream caches. The event system has been
overhauled to use FanoutV2, which no longer suffers from the
burst of events causing problems. The change to not propagate
delete events also breaks any node watchers on the local
cache from ever expiring the server since they never receive
a delete event.

This reverts some of the changes from #12002 such that relative
expiry now only runs on the auth cache and emits delete events.
Each relative expiry interval is however still limited to only
remove up to a fixed number of nodes.

Closes #37527
  • Loading branch information
rosstimothy committed Feb 15, 2024
1 parent 2eef6d7 commit 78caa9a
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 28 deletions.
47 changes: 23 additions & 24 deletions lib/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func makeAllKnownCAsFilter() types.CertAuthorityFilter {
// ForAuth sets up watch configuration for the auth server
func ForAuth(cfg Config) Config {
cfg.target = "auth"
cfg.EnableRelativeExpiry = true
cfg.Watches = []types.WatchKind{
{Kind: types.KindCertAuthority, LoadSecrets: true},
{Kind: types.KindClusterName},
Expand Down Expand Up @@ -741,6 +742,9 @@ type Config struct {
// healthy even if some of the requested resource kinds aren't
// supported by the event source.
DisablePartialHealth bool
// EnableRelativeExpiry turns on purging expired items from the cache even
// if delete events have not been received from the backend.
EnableRelativeExpiry bool
}

// CheckAndSetDefaults checks parameters and sets default values
Expand Down Expand Up @@ -1282,18 +1286,14 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry retryutils.Retry, timer

retry.Reset()

// only enable relative node expiry if the cache is configured
// to watch for types.KindNode
// Only enable relative node expiry for the auth cache.
relativeExpiryInterval := interval.NewNoop()
for _, watch := range c.Config.Watches {
if watch.Kind == types.KindNode {
relativeExpiryInterval = interval.New(interval.Config{
Duration: c.Config.RelativeExpiryCheckInterval,
FirstDuration: utils.HalfJitter(c.Config.RelativeExpiryCheckInterval),
Jitter: retryutils.NewSeventhJitter(),
})
break
}
if c.EnableRelativeExpiry {
relativeExpiryInterval = interval.New(interval.Config{
Duration: c.Config.RelativeExpiryCheckInterval,
FirstDuration: utils.HalfJitter(c.Config.RelativeExpiryCheckInterval),
Jitter: retryutils.NewSeventhJitter(),
})
}
defer relativeExpiryInterval.Stop()

Expand Down Expand Up @@ -1346,8 +1346,7 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry retryutils.Retry, timer
}
}

err = c.processEvent(ctx, event, true)
if err != nil {
if err := c.processEvent(ctx, event); err != nil {
return trace.Wrap(err)
}
c.notify(c.ctx, Event{Event: event, Type: EventProcessed})
Expand Down Expand Up @@ -1440,7 +1439,7 @@ func (c *Cache) performRelativeNodeExpiry(ctx context.Context) error {
Kind: types.KindNode,
Metadata: node.GetMetadata(),
},
}, false); err != nil {
}); err != nil {
return trace.Wrap(err)
}

Expand Down Expand Up @@ -1607,9 +1606,9 @@ func (c *Cache) fetch(ctx context.Context, confirmedKinds map[resourceKind]types
}

// processEvent hands the event off to the appropriate collection for processing. Any
// resources which were not registered are ignored. If processing completed successfully
// and emit is true the event will be emitted via the fanout.
func (c *Cache) processEvent(ctx context.Context, event types.Event, emit bool) error {
// resources which were not registered are ignored. If processing completed successfully,
// the event will be emitted via the fanout.
func (c *Cache) processEvent(ctx context.Context, event types.Event) error {
resourceKind := resourceKindFromResource(event.Resource)
collection, ok := c.collections.byKind[resourceKind]
if !ok {
Expand All @@ -1619,14 +1618,14 @@ func (c *Cache) processEvent(ctx context.Context, event types.Event, emit bool)
if err := collection.processEvent(ctx, event); err != nil {
return trace.Wrap(err)
}
if emit {
c.eventsFanout.Emit(event)
if !isHighVolumeResource(resourceKind.kind) {
c.lowVolumeEventsFanout.ForEach(func(f *services.FanoutV2) {
f.Emit(event)
})
}

c.eventsFanout.Emit(event)
if !isHighVolumeResource(resourceKind.kind) {
c.lowVolumeEventsFanout.ForEach(func(f *services.FanoutV2) {
f.Emit(event)
})
}

return nil
}

Expand Down
12 changes: 8 additions & 4 deletions lib/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2552,6 +2552,7 @@ func TestRelativeExpiry(t *testing.T) {
}

func TestRelativeExpiryLimit(t *testing.T) {
t.Parallel()
const (
checkInterval = time.Second
nodeCount = 100
Expand All @@ -2569,7 +2570,7 @@ func TestRelativeExpiryLimit(t *testing.T) {
c.RelativeExpiryCheckInterval = checkInterval
c.RelativeExpiryLimit = expiryLimit
c.Clock = clock
return ForProxy(c)
return ForAuth(c)
})
t.Cleanup(p.Close)

Expand Down Expand Up @@ -2609,7 +2610,8 @@ func TestRelativeExpiryLimit(t *testing.T) {
}
}

func TestRelativeExpiryOnlyForNodeWatches(t *testing.T) {
func TestRelativeExpiryOnlyForAuth(t *testing.T) {
t.Parallel()
clock := clockwork.NewFakeClockAt(time.Now().Add(time.Hour))
p := newTestPack(t, func(c Config) Config {
c.RelativeExpiryCheckInterval = time.Second
Expand All @@ -2622,9 +2624,10 @@ func TestRelativeExpiryOnlyForNodeWatches(t *testing.T) {
p2 := newTestPack(t, func(c Config) Config {
c.RelativeExpiryCheckInterval = time.Second
c.Clock = clock
c.target = "llama"
c.Watches = []types.WatchKind{
{Kind: types.KindNamespace},
{Kind: types.KindNamespace},
{Kind: types.KindNode},
{Kind: types.KindCertAuthority},
}
return c
Expand All @@ -2634,8 +2637,9 @@ func TestRelativeExpiryOnlyForNodeWatches(t *testing.T) {
for i := 0; i < 2; i++ {
clock.Advance(time.Hour * 24)
drainEvents(p.eventsC)
expectEvent(t, p.eventsC, RelativeExpiry)
unexpectedEvent(t, p.eventsC, RelativeExpiry)

clock.Advance(time.Hour * 24)
drainEvents(p2.eventsC)
unexpectedEvent(t, p2.eventsC, RelativeExpiry)
}
Expand Down

0 comments on commit 78caa9a

Please sign in to comment.