
Commit 8271714

Prevent relative expiry from emitting more events than can be processed (#12002)

* Prevent relative expiry from emitting more events than can be processed

This alters relative expiry in four ways:

1) Relative expiry is no longer exclusively run by the auth cache
2) Relative expiry no longer emits delete events
3) There is now a limit to the number of nodes removed per interval
4) Relative expiry now runs more frequently

We can remove the need to emit any fake delete events during relative
expiry by not running it exclusively in the auth cache. All caches
will now run relative expiry themselves, even the components that
don't watch for nodes - this is effectively a noop for them. To
prevent the individual caches from getting too far out of sync, the
expiry interval is set to a much smaller value and we limit the
number of nodes being deleted per interval.

(cherry picked from commit b8394b3)

# Conflicts:
#	lib/cache/cache.go
#	lib/cache/cache_test.go
rosstimothy committed Apr 26, 2022
1 parent 5a25a1c commit 8271714
Showing 4 changed files with 414 additions and 21 deletions.
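Before the diff itself, here is a minimal, self-contained sketch of the defaulting behavior this commit introduces for the two new knobs, RelativeExpiryCheckInterval and RelativeExpiryLimit. The type, function, and constant names below are illustrative stand-ins rather than Teleport APIs, and the keep-alive TTL value is an assumption made only for the example; the real defaults come from apidefaults and Config.CheckAndSetDefaults in the diff.

package main

import (
	"fmt"
	"time"
)

// serverKeepAliveTTL is a placeholder for apidefaults.ServerKeepAliveTTL();
// the concrete value here is assumed for illustration only.
const serverKeepAliveTTL = 5 * time.Minute

// relativeExpiryConfig is a simplified stand-in for the two fields added to
// cache.Config in this commit.
type relativeExpiryConfig struct {
	CheckInterval time.Duration // how often relative expiry runs
	Limit         int           // maximum nodes removed per interval
}

// setDefaults mirrors the defaulting logic added to CheckAndSetDefaults below:
// an unset interval falls back to the keep-alive TTL plus a small buffer, and
// an unset limit falls back to 2000 nodes per pass.
func (c *relativeExpiryConfig) setDefaults() {
	if c.CheckInterval == 0 {
		c.CheckInterval = serverKeepAliveTTL + 5*time.Second
	}
	if c.Limit == 0 {
		c.Limit = 2000
	}
}

func main() {
	cfg := relativeExpiryConfig{} // leave both knobs unset
	cfg.setDefaults()
	fmt.Printf("interval=%s limit=%d\n", cfg.CheckInterval, cfg.Limit)
}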
155 changes: 151 additions & 4 deletions lib/cache/cache.go
@@ -31,6 +31,7 @@ import (
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/services/local"
"github.com/gravitational/teleport/lib/utils"
"github.com/gravitational/teleport/lib/utils/interval"

"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
@@ -539,6 +540,13 @@ type Config struct {
// CacheInitTimeout is the maximum amount of time that cache.New
// will block, waiting for initialization (default=20s).
CacheInitTimeout time.Duration
// RelativeExpiryCheckInterval determines how often the cache performs special
// "relative expiration" checks which are used to compensate for real backends
// that suffer from overly lazy ttl'ing of resources.
RelativeExpiryCheckInterval time.Duration
// RelativeExpiryLimit determines the maximum number of nodes that may be
// removed during relative expiration.
RelativeExpiryLimit int
// EventsC is a channel for event notifications,
// used in tests
EventsC chan Event
@@ -580,6 +588,13 @@ func (c *Config) CheckAndSetDefaults() error {
if c.CacheInitTimeout == 0 {
c.CacheInitTimeout = time.Second * 20
}
if c.RelativeExpiryCheckInterval == 0 {
c.RelativeExpiryCheckInterval = apidefaults.ServerKeepAliveTTL() + 5*time.Second
}
if c.RelativeExpiryLimit == 0 {
c.RelativeExpiryLimit = 2000
}

if c.Component == "" {
c.Component = teleport.ComponentCache
}
@@ -605,6 +620,9 @@ const (
// Reloading is emitted when an error occurred watching events
// and the cache is waiting to create a new watcher
Reloading = "reloading_cache"
// RelativeExpiry notifies that relative expiry operations have
// been run.
RelativeExpiry = "relative_expiry"
)

// New creates a new instance of Cache
@@ -772,7 +790,7 @@ func (c *Cache) notify(ctx context.Context, event Event) {
// 1. Every client is connected to the database fan-out
// system. This system creates a buffered channel for every
// client and tracks the channel overflow. Thanks to channels every client gets its
// own unique iterator over the event stream. If client looses connection
// own unique iterator over the event stream. If client loses connection
// or fails to keep up with the stream, the server will terminate
// the channel and client will have to re-initialize.
//
@@ -876,6 +894,21 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry utils.Retry, timer *tim

retry.Reset()

// only enable relative node expiry if the cache is configured
// to watch for types.KindNode
relativeExpiryInterval := interval.NewNoop()
for _, watch := range c.Config.Watches {
if watch.Kind == types.KindNode {
relativeExpiryInterval = interval.New(interval.Config{
Duration: c.Config.RelativeExpiryCheckInterval,
FirstDuration: utils.HalfJitter(c.Config.RelativeExpiryCheckInterval),
Jitter: utils.NewSeventhJitter(),
})
break
}
}
defer relativeExpiryInterval.Stop()

c.notify(c.ctx, Event{Type: WatcherStarted})

var lastStalenessWarning time.Time
@@ -886,6 +919,11 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry utils.Retry, timer *tim
return trace.ConnectionProblem(watcher.Error(), "watcher is closed: %v", watcher.Error())
case <-c.ctx.Done():
return trace.ConnectionProblem(c.ctx.Err(), "context is closing")
case <-relativeExpiryInterval.Next():
if err := c.performRelativeNodeExpiry(ctx); err != nil {
return trace.Wrap(err)
}
c.notify(ctx, Event{Type: RelativeExpiry})
case event := <-watcher.Events():
// check for expired resources in OpPut events and log them periodically. stale OpPut events
// may be an indicator of poor performance, and can lead to confusing and inconsistent state
@@ -920,7 +958,7 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry utils.Retry, timer *tim
}
}

err = c.processEvent(ctx, event)
err = c.processEvent(ctx, event, true)
if err != nil {
return trace.Wrap(err)
}
@@ -929,6 +967,110 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry utils.Retry, timer *tim
}
}

// performRelativeNodeExpiry performs a special kind of active expiration where we remove nodes
// which are clearly stale relative to their more recently heartbeated counterparts as well as
// the current time. This strategy lets us side-step issues of clock drift or general cache
// staleness by only removing items which are stale from within the cache's own "frame of
// reference".
//
// to better understand why we use this expiry strategy, it's important to understand the two
// distinct scenarios that we're trying to accommodate:
//
// 1. Expiry events are being emitted very lazily by the real backend (*hours* after the time
// at which the resource was supposed to expire).
//
// 2. The event stream itself is stale (i.e. all events show up late, not just expiry events).
//
// In the first scenario, removing items from the cache after they have passed some designated
// threshold of staleness seems reasonable. In the second scenario, your best option is to
// faithfully serve the delayed, but internally consistent, view created by the event stream and
// not expire any items.
//
// Relative expiry is the compromise between the two above scenarios. We calculate a staleness
// threshold after which items would be removed, but we calculate it relative to the most recent
// expiry *or* the current time, depending on which is earlier. The result is that nodes are
// removed only if they are both stale from the perspective of the current clock, *and* stale
// relative to our current view of the world.
//
// *note*: this function is only sane to call when the cache and event stream are healthy, and
// cannot run concurrently with event processing.
func (c *Cache) performRelativeNodeExpiry(ctx context.Context) error {
// TODO(fspmarshall): Start using dynamic value once it is implemented.
gracePeriod := apidefaults.ServerAnnounceTTL

// latestExp will be the value that we choose to consider the most recent "expired"
// timestamp. This will either end up being the most recently seen node expiry, or
// the current time (whichever is earlier).
var latestExp time.Time

nodes, err := c.GetNodes(ctx, apidefaults.Namespace)
if err != nil {
return trace.Wrap(err)
}

// iterate nodes and determine the most recent expiration value.
for _, node := range nodes {
if node.Expiry().IsZero() {
continue
}

if node.Expiry().After(latestExp) || latestExp.IsZero() {
// this node's expiry is more recent than the previously
// recorded value.
latestExp = node.Expiry()
}
}

if latestExp.IsZero() {
return nil
}

// if the most recent expiration value is still in the future, we use the current time
// as the most recent expiration value instead. Unless the event stream is unhealthy, or
// all nodes were recently removed, this should always be true.
if now := c.Clock.Now(); latestExp.After(now) {
latestExp = now
}

// we subtract gracePeriod from our most recent expiry value to get the retention
// threshold. nodes which expired earlier than the retention threshold will be
// removed, as we expect well-behaved backends to have emitted an expiry event
// within the grace period.
retentionThreshold := latestExp.Add(-gracePeriod)

var removed int
for _, node := range nodes {
if node.Expiry().IsZero() || node.Expiry().After(retentionThreshold) {
continue
}

// remove the node locally without emitting an event, other caches will
// eventually remove the same node when they run their expiry logic.
if err := c.processEvent(ctx, types.Event{
Type: types.OpDelete,
Resource: &types.ResourceHeader{
Kind: types.KindNode,
Metadata: node.GetMetadata(),
},
}, false); err != nil {
return trace.Wrap(err)
}

// high churn rates can cause purging a very large number of nodes
// per interval, limit to a sane number such that we don't overwhelm
// things or get too far out of sync with other caches.
if removed++; removed >= c.Config.RelativeExpiryLimit {
break
}
}

if removed > 0 {
c.Debugf("Removed %d nodes via relative expiry (retentionThreshold=%s).", removed, retentionThreshold)
}

return nil
}

func (c *Cache) watchKinds() []types.WatchKind {
out := make([]types.WatchKind, 0, len(c.collections))
for _, collection := range c.collections {
@@ -981,7 +1123,10 @@ func (c *Cache) fetch(ctx context.Context) (apply func(ctx context.Context) erro
}, nil
}

func (c *Cache) processEvent(ctx context.Context, event types.Event) error {
// processEvent hands the event off to the appropriate collection for processing. Any
// resources which were not registered are ignored. If processing completed successfully
// and emit is true the event will be emitted via the fanout.
func (c *Cache) processEvent(ctx context.Context, event types.Event, emit bool) error {
resourceKind := resourceKindFromResource(event.Resource)
collection, ok := c.collections[resourceKind]
if !ok {
@@ -992,7 +1137,9 @@ func (c *Cache) processEvent(ctx context.Context, event types.Event) error {
if err := collection.processEvent(ctx, event); err != nil {
return trace.Wrap(err)
}
c.eventsFanout.Emit(event)
if emit {
c.eventsFanout.Emit(event)
}
return nil
}

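To make the retention-threshold arithmetic in performRelativeNodeExpiry concrete, the following self-contained sketch mirrors the selection logic from the diff above: take the most recent node expiry (capped at the current time), subtract the grace period, and remove only nodes whose expiry falls at or before that threshold, up to the per-pass limit. The node type, helper name, and the 10-minute grace period are illustrative assumptions, not the actual Teleport types or defaults.

package main

import (
	"fmt"
	"time"
)

// node is a minimal stand-in for the cached server resources iterated by
// performRelativeNodeExpiry; only the expiry timestamp matters here.
type node struct {
	name   string
	expiry time.Time
}

// staleNodes reproduces the selection logic sketched in the diff above.
func staleNodes(nodes []node, now time.Time, gracePeriod time.Duration, limit int) []string {
	// find the most recent expiry among all nodes.
	var latestExp time.Time
	for _, n := range nodes {
		if n.expiry.IsZero() {
			continue
		}
		if n.expiry.After(latestExp) || latestExp.IsZero() {
			latestExp = n.expiry
		}
	}
	if latestExp.IsZero() {
		return nil
	}
	// cap the reference point at the current time.
	if latestExp.After(now) {
		latestExp = now
	}
	retentionThreshold := latestExp.Add(-gracePeriod)

	// collect nodes whose expiry is at or before the threshold, up to the limit.
	var removed []string
	for _, n := range nodes {
		if n.expiry.IsZero() || n.expiry.After(retentionThreshold) {
			continue
		}
		removed = append(removed, n.name)
		if len(removed) >= limit {
			break
		}
	}
	return removed
}

func main() {
	now := time.Now()
	gracePeriod := 10 * time.Minute // stand-in for apidefaults.ServerAnnounceTTL
	nodes := []node{
		{name: "healthy", expiry: now.Add(5 * time.Minute)},     // still live, kept
		{name: "barely-late", expiry: now.Add(-time.Minute)},    // expired but within grace, kept
		{name: "long-gone", expiry: now.Add(-30 * time.Minute)}, // well past the threshold, removed
	}
	fmt.Println(staleNodes(nodes, now, gracePeriod, 2000))
}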
