Prevent relative expiry from emitting more events than can be processed (#12002)

* Prevent relative expiry from emitting more events than can be processed

This alters relative expiry in four ways:

1) Relative expiry is no longer exclusively run by the auth cache
2) Relative expiry no longer emits delete events
3) There is now a limit to the number of nodes removed per interval
4) Relative expiry now runs more frequently

We can remove the need to emit fake delete events during relative
expiry by no longer running it exclusively in the auth cache. All
caches now run relative expiry themselves, even the components that
don't watch for nodes, for which it is effectively a no-op. To keep
the individual caches from getting too far out of sync, the expiry
interval is set to a much smaller value and the number of nodes
deleted per interval is capped.
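
Boiled down, the strategy is: every cache sweeps its own view of the nodes,
removes at most a fixed number of stale entries per pass, and never broadcasts
the removals; replicas converge because they all run the same sweep on a
similar schedule. The standalone sketch below illustrates just the capped
local sweep (not Teleport code: the node struct, the sweep helper, and the
limit of 10 are invented for the example; the real implementation is in the
lib/cache/cache.go diff that follows).

package main

import (
    "fmt"
    "time"
)

type node struct {
    expiry time.Time
}

// sweep removes at most limit nodes whose expiry falls before cutoff and
// reports how many were removed. Removal is purely local: no delete event
// is emitted, so nothing is pushed to downstream consumers.
func sweep(nodes map[string]node, cutoff time.Time, limit int) int {
    removed := 0
    for name, n := range nodes {
        if !n.expiry.Before(cutoff) {
            continue // still fresh from this replica's point of view
        }
        delete(nodes, name)
        if removed++; removed >= limit {
            break // cap per-pass work so a single sweep stays cheap
        }
    }
    return removed
}

func main() {
    now := time.Now()
    nodes := make(map[string]node)
    for i := 0; i < 25; i++ {
        nodes[fmt.Sprintf("node-%d", i)] = node{expiry: now.Add(-time.Hour)}
    }
    // Each pass removes at most 10 stale nodes; three passes drain all 25.
    for len(nodes) > 0 {
        fmt.Printf("removed %d, %d remaining\n", sweep(nodes, now, 10), len(nodes))
    }
}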

(cherry picked from commit b8394b3)
rosstimothy committed Apr 26, 2022
1 parent d89371c commit fe06101
Showing 4 changed files with 200 additions and 39 deletions.
74 changes: 43 additions & 31 deletions lib/cache/cache.go
@@ -61,11 +61,9 @@ var (
cacheCollectors = []prometheus.Collector{cacheEventsReceived, cacheStaleEventsReceived}
)

const cacheTargetAuth string = "auth"

// ForAuth sets up watch configuration for the auth server
func ForAuth(cfg Config) Config {
cfg.target = cacheTargetAuth
cfg.target = "auth"
cfg.Watches = []types.WatchKind{
{Kind: types.KindCertAuthority, LoadSecrets: true},
{Kind: types.KindClusterName},
@@ -556,6 +554,9 @@ type Config struct {
// "relative expiration" checks which are used to compensate for real backends
// that suffer from overly lazy ttl'ing of resources.
RelativeExpiryCheckInterval time.Duration
// RelativeExpiryLimit determines the maximum number of nodes that may be
// removed during relative expiration.
RelativeExpiryLimit int
// EventsC is a channel for event notifications,
// used in tests
EventsC chan Event
@@ -598,11 +599,12 @@ func (c *Config) CheckAndSetDefaults() error {
c.CacheInitTimeout = time.Second * 20
}
if c.RelativeExpiryCheckInterval == 0 {
// TODO(fspmarshall): change this to 1/2 offline threshold once that becomes
// a configurable value. This will likely be a dynamic configuration, and
// therefore require lazy initialization after the cache has become healthy.
c.RelativeExpiryCheckInterval = apidefaults.ServerAnnounceTTL / 2
c.RelativeExpiryCheckInterval = apidefaults.ServerKeepAliveTTL() + 5*time.Second
}
if c.RelativeExpiryLimit == 0 {
c.RelativeExpiryLimit = 2000
}

if c.Component == "" {
c.Component = teleport.ComponentCache
}
@@ -902,11 +904,19 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry utils.Retry, timer *tim

retry.Reset()

relativeExpiryInterval := interval.New(interval.Config{
Duration: c.Config.RelativeExpiryCheckInterval,
FirstDuration: utils.HalfJitter(c.Config.RelativeExpiryCheckInterval),
Jitter: utils.NewSeventhJitter(),
})
// only enable relative node expiry if the cache is configured
// to watch for types.KindNode
relativeExpiryInterval := interval.NewNoop()
for _, watch := range c.Config.Watches {
if watch.Kind == types.KindNode {
relativeExpiryInterval = interval.New(interval.Config{
Duration: c.Config.RelativeExpiryCheckInterval,
FirstDuration: utils.HalfJitter(c.Config.RelativeExpiryCheckInterval),
Jitter: utils.NewSeventhJitter(),
})
break
}
}
defer relativeExpiryInterval.Stop()

c.notify(c.ctx, Event{Type: WatcherStarted})
@@ -958,7 +968,7 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry utils.Retry, timer *tim
}
}

err = c.processEvent(ctx, event)
err = c.processEvent(ctx, event, true)
if err != nil {
return trace.Wrap(err)
}
@@ -973,7 +983,7 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry utils.Retry, timer *tim
// staleness by only removing items which are stale from within the cache's own "frame of
// reference".
//
// to better understand why we use this expiry strategy, its important to understand the two
// to better understand why we use this expiry strategy, it's important to understand the two
// distinct scenarios that we're trying to accommodate:
//
// 1. Expiry events are being emitted very lazily by the real backend (*hours* after the time
@@ -993,15 +1003,8 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry utils.Retry, timer *tim
// relative to our current view of the world.
//
// *note*: this function is only sane to call when the cache and event stream are healthy, and
// cannot run concurrently with event processing. this function injects additional events into
// the outbound event stream.
// cannot run concurrently with event processing.
func (c *Cache) performRelativeNodeExpiry(ctx context.Context) error {
if c.target != cacheTargetAuth {
// if we are not the auth cache, we are a downstream cache and can rely upon the
// upstream auth cache to perform relative expiry and propagate the changes.
return nil
}

// TODO(fspmarshall): Start using dynamic value once it is implemented.
gracePeriod := apidefaults.ServerAnnounceTTL

@@ -1051,20 +1054,24 @@ func (c *Cache) performRelativeNodeExpiry(ctx context.Context) error {
continue
}

// event stream processing is paused while this function runs. we perform the
// actual expiry by constructing a fake delete event for the resource which both
// updates this cache, and all downstream caches.
err = c.processEvent(ctx, types.Event{
// remove the node locally without emitting an event, other caches will
// eventually remove the same node when they run their expiry logic.
if err := c.processEvent(ctx, types.Event{
Type: types.OpDelete,
Resource: &types.ResourceHeader{
Kind: types.KindNode,
Metadata: node.GetMetadata(),
},
})
if err != nil {
}, false); err != nil {
return trace.Wrap(err)
}
removed++

// high churn rates can cause a very large number of nodes to be purged
// per interval; limit removals to a sane number so that we don't overwhelm
// things or get too far out of sync with other caches.
if removed++; removed >= c.Config.RelativeExpiryLimit {
break
}
}

if removed > 0 {
@@ -1126,7 +1133,10 @@ func (c *Cache) fetch(ctx context.Context) (apply func(ctx context.Context) erro
}, nil
}

func (c *Cache) processEvent(ctx context.Context, event types.Event) error {
// processEvent hands the event off to the appropriate collection for processing. Any
// resources which were not registered are ignored. If processing completes successfully
// and emit is true, the event is emitted via the fanout.
func (c *Cache) processEvent(ctx context.Context, event types.Event, emit bool) error {
resourceKind := resourceKindFromResource(event.Resource)
collection, ok := c.collections[resourceKind]
if !ok {
@@ -1137,7 +1147,9 @@ func (c *Cache) processEvent(ctx context.Context, event types.Event) error {
if err := collection.processEvent(ctx, event); err != nil {
return trace.Wrap(err)
}
c.eventsFanout.Emit(event)
if emit {
c.eventsFanout.Emit(event)
}
return nil
}

122 changes: 114 additions & 8 deletions lib/cache/cache_test.go
@@ -24,6 +24,14 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"

"github.com/gravitational/teleport/api/client"
"github.com/gravitational/teleport/api/client/proto"
apidefaults "github.com/gravitational/teleport/api/defaults"
@@ -37,16 +45,10 @@ import (
"github.com/gravitational/teleport/lib/services/local"
"github.com/gravitational/teleport/lib/services/suite"
"github.com/gravitational/teleport/lib/utils"
"github.com/gravitational/trace"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
"github.com/jonboulle/clockwork"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
)

const eventBufferSize = 1024

func TestMain(m *testing.M) {
utils.InitLoggerForTests()
os.Exit(m.Run())
@@ -507,6 +509,20 @@ func expectEvent(t *testing.T, eventsC <-chan Event, expectedEvent string) {
}
}

func unexpectedEvent(t *testing.T, eventsC <-chan Event, unexpectedEvent string) {
timeC := time.After(time.Second)
for {
select {
case event := <-eventsC:
if event.Type == unexpectedEvent {
t.Fatalf("Received unexpected event: %s", unexpectedEvent)
}
case <-timeC:
return
}
}
}

func expectNextEvent(t *testing.T, eventsC <-chan Event, expectedEvent string, skipEvents ...string) {
timeC := time.After(5 * time.Second)
for {
@@ -2314,6 +2330,96 @@ func TestRelativeExpiry(t *testing.T) {
require.True(t, len(nodes) > 0, "node_count=%d", len(nodes))
}

func TestRelativeExpiryLimit(t *testing.T) {
const (
checkInterval = time.Second
nodeCount = 100
expiryLimit = 10
)

// make sure the event buffer is much larger than node count
// so that we can batch create nodes without waiting on each event
require.True(t, int(nodeCount*3) < eventBufferSize)

ctx := context.Background()

clock := clockwork.NewFakeClockAt(time.Now().Add(time.Hour))
p := newTestPack(t, func(c Config) Config {
c.RelativeExpiryCheckInterval = checkInterval
c.RelativeExpiryLimit = expiryLimit
c.Clock = clock
return ForProxy(c)
})
t.Cleanup(p.Close)

// add servers that expire at a range of times
now := clock.Now()
for i := 0; i < nodeCount; i++ {
exp := now.Add(time.Minute * time.Duration(i))
server := suite.NewServer(types.KindNode, uuid.New().String(), "127.0.0.1:2022", apidefaults.Namespace)
server.SetExpiry(exp)
_, err := p.presenceS.UpsertNode(ctx, server)
require.NoError(t, err)
}

// wait for nodes to reach cache (we batch insert first for performance reasons)
for i := 0; i < nodeCount; i++ {
expectEvent(t, p.eventsC, EventProcessed)
}

nodes, err := p.cache.GetNodes(ctx, apidefaults.Namespace)
require.NoError(t, err)
require.Len(t, nodes, nodeCount)

clock.Advance(time.Hour * 24)
for expired := nodeCount - expiryLimit; expired > 0; expired -= expiryLimit {
// get rid of events that were emitted before clock advanced
drainEvents(p.eventsC)
// wait for next relative expiry check to run
expectEvent(t, p.eventsC, RelativeExpiry)

// verify that the limit is respected.
nodes, err = p.cache.GetNodes(ctx, apidefaults.Namespace)
require.NoError(t, err)
require.Len(t, nodes, expired)

// advance clock to trigger next relative expiry check
clock.Advance(time.Hour * 24)
}
}

func TestRelativeExpiryOnlyForNodeWatches(t *testing.T) {
clock := clockwork.NewFakeClockAt(time.Now().Add(time.Hour))
p := newTestPack(t, func(c Config) Config {
c.RelativeExpiryCheckInterval = time.Second
c.Clock = clock
c.Watches = []types.WatchKind{{Kind: types.KindNode}}
return c
})
t.Cleanup(p.Close)

p2 := newTestPack(t, func(c Config) Config {
c.RelativeExpiryCheckInterval = time.Second
c.Clock = clock
c.Watches = []types.WatchKind{
{Kind: types.KindNamespace},
{Kind: types.KindNamespace},
{Kind: types.KindCertAuthority},
}
return c
})
t.Cleanup(p2.Close)

for i := 0; i < 2; i++ {
clock.Advance(time.Hour * 24)
drainEvents(p.eventsC)
expectEvent(t, p.eventsC, RelativeExpiry)

drainEvents(p2.eventsC)
unexpectedEvent(t, p2.eventsC, RelativeExpiry)
}
}

func TestCache_Backoff(t *testing.T) {
clock := clockwork.NewFakeClock()
p := newTestPack(t, func(c Config) Config {
8 changes: 8 additions & 0 deletions lib/utils/interval/interval.go
@@ -56,6 +56,14 @@ type Config struct {
Jitter utils.Jitter
}

// NewNoop creates a new interval that will never fire.
func NewNoop() *Interval {
return &Interval{
ch: make(chan time.Time, 1),
done: make(chan struct{}),
}
}

// New creates a new interval instance. This function panics on non-positive
// interval durations (equivalent to time.NewTicker).
func New(cfg Config) *Interval {
35 changes: 35 additions & 0 deletions lib/utils/interval/interval_test.go
@@ -0,0 +1,35 @@
// Copyright 2022 Gravitational, Inc
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package interval

import (
"testing"
)

func TestNewNoop(t *testing.T) {
i := NewNoop()
ch := i.Next()
select {
case <-ch:
t.Fatalf("noop should not emit anything")
default:
}
i.Stop()
select {
case <-ch:
t.Fatalf("noop should not emit anything")
default:
}
}
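
A note on how a never-firing interval is meant to be consumed: the caller can
hold an interval unconditionally and let the disabled case simply never fire
in its select loop, which is why the cache diff above can swap in NewNoop
instead of branching on whether expiry is enabled. A minimal sketch of that
pattern, assuming only the Next and Stop methods exercised in the test above
(runSweeps and doSweep are invented for the illustration; the real consuming
loop in fetchAndWatch is not shown in this diff):

package main

import (
    "context"
    "time"

    "github.com/gravitational/teleport/lib/utils/interval"
)

// runSweeps calls doSweep every time the interval fires and returns once ctx
// is done. With interval.NewNoop() the first case never fires, so sweeping is
// disabled without any extra branching at the call site.
func runSweeps(ctx context.Context, ivl *interval.Interval, doSweep func()) error {
    defer ivl.Stop()
    for {
        select {
        case <-ivl.Next():
            doSweep()
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    // Blocks for one second and never sweeps, because the interval is a no-op.
    _ = runSweeps(ctx, interval.NewNoop(), func() {})
}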
