From 8359140bc5cd9e44eacc4ba95f0a3f14050924ca Mon Sep 17 00:00:00 2001 From: rosstimothy <39066650+rosstimothy@users.noreply.github.com> Date: Thu, 16 Dec 2021 11:41:08 -0500 Subject: [PATCH 1/5] Add jitter and backoff to prevent thundering herd on auth (#9133) Move cache and resourceWatcher watchers from a 10s retry to a jittered backoff retry up to ~1min. Replace the reconnectToAuthService interval with a retry to add jitter and backoff there as well for when a node restarts due to changes introduced in #8102. Fixes #6889. --- integration/helpers.go | 1 + lib/auth/helpers.go | 8 +- lib/cache/cache.go | 32 +- lib/cache/cache_test.go | 1145 +++++++++++++++-------------- lib/defaults/defaults.go | 8 +- lib/restrictedsession/watcher.go | 14 +- lib/reversetunnel/rc_manager.go | 11 +- lib/service/cfg.go | 4 + lib/service/connect.go | 48 +- lib/service/service.go | 16 +- lib/service/service_test.go | 48 ++ lib/services/watcher.go | 40 +- lib/services/watcher_test.go | 70 +- lib/srv/heartbeat.go | 2 +- lib/srv/monitor_test.go | 6 +- lib/srv/regular/sshserver_test.go | 1 + 16 files changed, 826 insertions(+), 628 deletions(-) diff --git a/integration/helpers.go b/integration/helpers.go index c283fcd6a9601..83f7a31bca19e 100644 --- a/integration/helpers.go +++ b/integration/helpers.go @@ -626,6 +626,7 @@ func (i *TeleInstance) GenerateConfig(t *testing.T, trustedSecrets []*InstanceSe tconf.Kube.CheckImpersonationPermissions = nullImpersonationCheck tconf.Keygen = testauthority.New() + tconf.MaxRetryPeriod = defaults.HighResPollingPeriod i.Config = tconf return tconf, nil } diff --git a/lib/auth/helpers.go b/lib/auth/helpers.go index 03115f4b6c30f..c8805aa810596 100644 --- a/lib/auth/helpers.go +++ b/lib/auth/helpers.go @@ -33,6 +33,7 @@ import ( authority "github.com/gravitational/teleport/lib/auth/testauthority" "github.com/gravitational/teleport/lib/backend" "github.com/gravitational/teleport/lib/backend/memory" + "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/events" "github.com/gravitational/teleport/lib/limiter" "github.com/gravitational/teleport/lib/services" @@ -328,9 +329,10 @@ func NewTestAuthServer(cfg TestAuthServerConfig) (*TestAuthServer, error) { srv.LockWatcher, err = services.NewLockWatcher(ctx, services.LockWatcherConfig{ ResourceWatcherConfig: services.ResourceWatcherConfig{ - Component: teleport.ComponentAuth, - Client: srv.AuthServer, - Clock: cfg.Clock, + Component: teleport.ComponentAuth, + Client: srv.AuthServer, + Clock: cfg.Clock, + MaxRetryPeriod: defaults.HighResPollingPeriod, }, }) if err != nil { diff --git a/lib/cache/cache.go b/lib/cache/cache.go index 75a6bf82c8ab1..d433ad230edf9 100644 --- a/lib/cache/cache.go +++ b/lib/cache/cache.go @@ -460,8 +460,8 @@ type Config struct { WebToken types.WebTokenInterface // Backend is a backend for local cache Backend backend.Backend - // RetryPeriod is a period between cache retries on failures - RetryPeriod time.Duration + // MaxRetryPeriod is the maximum period between cache retries on failures + MaxRetryPeriod time.Duration // WatcherInitTimeout is the maximum acceptable delay for an // OpInit after a watcher has been started (default=1m). 
WatcherInitTimeout time.Duration @@ -500,8 +500,8 @@ func (c *Config) CheckAndSetDefaults() error { if c.Clock == nil { c.Clock = clockwork.NewRealClock() } - if c.RetryPeriod == 0 { - c.RetryPeriod = defaults.HighResPollingPeriod + if c.MaxRetryPeriod == 0 { + c.MaxRetryPeriod = defaults.MaxWatcherBackoff } if c.WatcherInitTimeout == 0 { c.WatcherInitTimeout = time.Minute @@ -534,6 +534,9 @@ const ( // TombstoneWritten is emitted if cache is closed in a healthy // state and successfully writes its tombstone. TombstoneWritten = "tombstone_written" + // Reloading is emitted when an error occurred watching events + // and the cache is waiting to create a new watcher + Reloading = "reloading_cache" ) // New creates a new instance of Cache @@ -605,8 +608,11 @@ func New(config Config) (*Cache, error) { } retry, err := utils.NewLinear(utils.LinearConfig{ - Step: cs.Config.RetryPeriod / 10, - Max: cs.Config.RetryPeriod, + First: utils.HalfJitter(cs.MaxRetryPeriod / 10), + Step: cs.MaxRetryPeriod / 5, + Max: cs.MaxRetryPeriod, + Jitter: utils.NewHalfJitter(), + Clock: cs.Clock, }) if err != nil { cs.Close() @@ -669,10 +675,20 @@ func (c *Cache) update(ctx context.Context, retry utils.Retry) { if err != nil { c.Warningf("Re-init the cache on error: %v.", err) } + // events cache should be closed as well - c.Debugf("Reloading %v.", retry) + c.Debugf("Reloading cache.") + + c.notify(ctx, Event{Type: Reloading, Event: types.Event{ + Resource: &types.ResourceHeader{ + Kind: retry.Duration().String(), + }, + }}) + + startedWaiting := c.Clock.Now() select { - case <-retry.After(): + case t := <-retry.After(): + c.Debugf("Initiating new watch after waiting %v.", t.Sub(startedWaiting)) retry.Inc() case <-c.ctx.Done(): return diff --git a/lib/cache/cache_test.go b/lib/cache/cache_test.go index 51eebf3ae3bdf..f5edc082728b2 100644 --- a/lib/cache/cache_test.go +++ b/lib/cache/cache_test.go @@ -32,20 +32,18 @@ import ( "github.com/gravitational/teleport/lib/backend/lite" "github.com/gravitational/teleport/lib/backend/memory" "github.com/gravitational/teleport/lib/defaults" - "github.com/gravitational/teleport/lib/fixtures" "github.com/gravitational/teleport/lib/services" "github.com/gravitational/teleport/lib/services/local" "github.com/gravitational/teleport/lib/services/suite" "github.com/gravitational/teleport/lib/utils" + "github.com/gravitational/trace" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "github.com/jonboulle/clockwork" "github.com/pborman/uuid" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" - "gopkg.in/check.v1" - - "github.com/gravitational/trace" ) func TestMain(m *testing.M) { @@ -53,13 +51,6 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } -type CacheSuite struct{} - -var _ = check.Suite(&CacheSuite{}) - -// bootstrap check -func TestState(t *testing.T) { check.TestingT(t) } - // testPack contains pack of // services used for test run type testPack struct { @@ -97,36 +88,32 @@ func (t *testPack) Close() { } } -func (s *CacheSuite) newPackForAuth(c *check.C) *testPack { - return s.newPack(c, ForAuth) -} - -func (s *CacheSuite) newPackForProxy(c *check.C) *testPack { - return s.newPack(c, ForProxy) +func newPackForAuth(t *testing.T) *testPack { + return newTestPack(t, ForAuth) } -func (s *CacheSuite) newPackForOldRemoteProxy(c *check.C) *testPack { - return s.newPack(c, ForOldRemoteProxy) +func newPackForProxy(t *testing.T) *testPack { + return newTestPack(t, ForProxy) } -func (s *CacheSuite) newPackForNode(c *check.C) *testPack { - return 
s.newPack(c, ForNode) +func newPackForNode(t *testing.T) *testPack { + return newTestPack(t, ForNode) } -func (s *CacheSuite) newPack(c *check.C, setupConfig SetupConfigFn) *testPack { - pack, err := newPack(c.MkDir(), setupConfig) - c.Assert(err, check.IsNil) +func newTestPack(t *testing.T, setupConfig SetupConfigFn) *testPack { + pack, err := newPack(t.TempDir(), setupConfig) + require.NoError(t, err) return pack } -func (s *CacheSuite) newPackWithoutCache(c *check.C, setupConfig SetupConfigFn) *testPack { - pack, err := newPackWithoutCache(c.MkDir(), setupConfig) - c.Assert(err, check.IsNil) +func newTestPackWithoutCache(t *testing.T) *testPack { + pack, err := newPackWithoutCache(t.TempDir()) + require.NoError(t, err) return pack } // newPackWithoutCache returns a new test pack without creating cache -func newPackWithoutCache(dir string, ssetupConfig SetupConfigFn) (*testPack, error) { +func newPackWithoutCache(dir string, opts ...packOption) (*testPack, error) { ctx := context.Background() p := &testPack{ dataDir: dir, @@ -175,28 +162,28 @@ func newPackWithoutCache(dir string, ssetupConfig SetupConfigFn) (*testPack, err // newPack returns a new test pack or fails the test on error func newPack(dir string, setupConfig func(c Config) Config) (*testPack, error) { ctx := context.Background() - p, err := newPackWithoutCache(dir, setupConfig) + p, err := newPackWithoutCache(dir, opts...) if err != nil { return nil, trace.Wrap(err) } p.cache, err = New(setupConfig(Config{ - Context: ctx, - Backend: p.cacheBackend, - Events: p.eventsS, - ClusterConfig: p.clusterConfigS, - Provisioner: p.provisionerS, - Trust: p.trustS, - Users: p.usersS, - Access: p.accessS, - DynamicAccess: p.dynamicAccessS, - Presence: p.presenceS, - AppSession: p.appSessionS, - WebSession: p.webSessionS, - WebToken: p.webTokenS, - Restrictions: p.restrictions, - RetryPeriod: 200 * time.Millisecond, - EventsC: p.eventsC, + Context: ctx, + Backend: p.cacheBackend, + Events: p.eventsS, + ClusterConfig: p.clusterConfigS, + Provisioner: p.provisionerS, + Trust: p.trustS, + Users: p.usersS, + Access: p.accessS, + DynamicAccess: p.dynamicAccessS, + Presence: p.presenceS, + AppSession: p.appSessionS, + WebSession: p.webSessionS, + WebToken: p.webTokenS, + Restrictions: p.restrictions, + MaxRetryPeriod: 200 * time.Millisecond, + EventsC: p.eventsC, })) if err != nil { return nil, trace.Wrap(err) @@ -214,44 +201,44 @@ func newPack(dir string, setupConfig func(c Config) Config) (*testPack, error) { } // TestCA tests certificate authorities -func (s *CacheSuite) TestCA(c *check.C) { - p := s.newPackForAuth(c) - defer p.Close() +func TestCA(t *testing.T) { + p := newPackForAuth(t) + t.Cleanup(p.Close) ca := suite.NewTestCA(types.UserCA, "example.com") - c.Assert(p.trustS.UpsertCertAuthority(ca), check.IsNil) + require.NoError(t, p.trustS.UpsertCertAuthority(ca)) select { case <-p.eventsC: case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err := p.cache.GetCertAuthority(ca.GetID(), true) - c.Assert(err, check.IsNil) + require.NoError(t, err) ca.SetResourceID(out.GetResourceID()) - fixtures.DeepCompare(c, ca, out) + require.Empty(t, cmp.Diff(ca, out)) err = p.trustS.DeleteCertAuthority(ca.GetID()) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case <-p.eventsC: case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } _, err = p.cache.GetCertAuthority(ca.GetID(), false) - fixtures.ExpectNotFound(c, 
err) + require.True(t, trace.IsNotFound(err)) } // TestWatchers tests watchers connected to the cache, // verifies that all watchers of the cache will be closed // if the underlying watcher to the target backend is closed -func (s *CacheSuite) TestWatchers(c *check.C) { +func TestWatchers(t *testing.T) { ctx := context.Background() - p := s.newPackForAuth(c) - defer p.Close() + p := newPackForAuth(t) + t.Cleanup(p.Close) w, err := p.cache.NewWatcher(ctx, types.Watch{Kinds: []types.WatchKind{ { @@ -264,58 +251,60 @@ func (s *CacheSuite) TestWatchers(c *check.C) { }, }, }}) - c.Assert(err, check.IsNil) - defer w.Close() + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, w.Close()) + }) select { case e := <-w.Events(): - c.Assert(e.Type, check.Equals, types.OpInit) + require.Equal(t, types.OpInit, e.Type) case <-time.After(100 * time.Millisecond): - c.Fatalf("Timeout waiting for event.") + t.Fatalf("Timeout waiting for event.") } ca := suite.NewTestCA(types.UserCA, "example.com") - c.Assert(p.trustS.UpsertCertAuthority(ca), check.IsNil) + require.NoError(t, p.trustS.UpsertCertAuthority(ca)) select { case e := <-w.Events(): - c.Assert(e.Type, check.Equals, types.OpPut) - c.Assert(e.Resource.GetKind(), check.Equals, types.KindCertAuthority) + require.Equal(t, types.OpPut, e.Type) + require.Equal(t, types.KindCertAuthority, e.Resource.GetKind()) case <-time.After(time.Second): - c.Fatalf("Timeout waiting for event.") + t.Fatalf("Timeout waiting for event.") } // create an access request that matches the supplied filter req, err := services.NewAccessRequest("alice", "dictator") - c.Assert(err, check.IsNil) + require.NoError(t, err) - c.Assert(p.dynamicAccessS.CreateAccessRequest(ctx, req), check.IsNil) + require.NoError(t, p.dynamicAccessS.CreateAccessRequest(ctx, req)) select { case e := <-w.Events(): - c.Assert(e.Type, check.Equals, types.OpPut) - c.Assert(e.Resource.GetKind(), check.Equals, types.KindAccessRequest) + require.Equal(t, types.OpPut, e.Type) + require.Equal(t, types.KindAccessRequest, e.Resource.GetKind()) case <-time.After(time.Second): - c.Fatalf("Timeout waiting for event.") + t.Fatalf("Timeout waiting for event.") } - c.Assert(p.dynamicAccessS.DeleteAccessRequest(ctx, req.GetName()), check.IsNil) + require.NoError(t, p.dynamicAccessS.DeleteAccessRequest(ctx, req.GetName())) select { case e := <-w.Events(): - c.Assert(e.Type, check.Equals, types.OpDelete) - c.Assert(e.Resource.GetKind(), check.Equals, types.KindAccessRequest) + require.Equal(t, types.OpDelete, e.Type) + require.Equal(t, types.KindAccessRequest, e.Resource.GetKind()) case <-time.After(time.Second): - c.Fatalf("Timeout waiting for event.") + t.Fatalf("Timeout waiting for event.") } // create an access request that does not match the supplied filter req2, err := services.NewAccessRequest("bob", "dictator") - c.Assert(err, check.IsNil) + require.NoError(t, err) // create and then delete the non-matching request. - c.Assert(p.dynamicAccessS.CreateAccessRequest(ctx, req2), check.IsNil) - c.Assert(p.dynamicAccessS.DeleteAccessRequest(ctx, req2.GetName()), check.IsNil) + require.NoError(t, p.dynamicAccessS.CreateAccessRequest(ctx, req2)) + require.NoError(t, p.dynamicAccessS.DeleteAccessRequest(ctx, req2.GetName())) // because our filter did not match the request, the create event should never // have been created, meaning that the next event on the pipe is the delete @@ -323,10 +312,10 @@ func (s *CacheSuite) TestWatchers(c *check.C) { // a delete event). 
select { case e := <-w.Events(): - c.Assert(e.Type, check.Equals, types.OpDelete) - c.Assert(e.Resource.GetKind(), check.Equals, types.KindAccessRequest) + require.Equal(t, types.OpDelete, e.Type) + require.Equal(t, types.KindAccessRequest, e.Resource.GetKind()) case <-time.After(time.Second): - c.Fatalf("Timeout waiting for event.") + t.Fatalf("Timeout waiting for event.") } // event has arrived, now close the watchers @@ -336,15 +325,15 @@ func (s *CacheSuite) TestWatchers(c *check.C) { select { case <-w.Done(): case <-time.After(time.Second): - c.Fatalf("Timeout waiting for close event.") + t.Fatalf("Timeout waiting for close event.") } } -func waitForRestart(c *check.C, eventsC <-chan Event) { - waitForEvent(c, eventsC, WatcherStarted, WatcherFailed) +func waitForRestart(t *testing.T, eventsC <-chan Event) { + waitForEvent(t, eventsC, WatcherStarted, Reloading, WatcherFailed) } -func waitForEvent(c *check.C, eventsC <-chan Event, expectedEvent string, skipEvents ...string) { +func waitForEvent(t *testing.T, eventsC <-chan Event, expectedEvent string, skipEvents ...string) { timeC := time.After(5 * time.Second) for { // wait for watcher to restart @@ -353,27 +342,27 @@ func waitForEvent(c *check.C, eventsC <-chan Event, expectedEvent string, skipEv if apiutils.SliceContainsStr(skipEvents, event.Type) { continue } - c.Assert(event.Type, check.Equals, expectedEvent) + require.Equal(t, expectedEvent, event.Type) return case <-timeC: - c.Fatalf("Timeout waiting for expected event: %s", expectedEvent) + t.Fatalf("Timeout waiting for expected event: %s", expectedEvent) } } } // TestCompletenessInit verifies that flaky backends don't cause // the cache to return partial results during init. -func (s *CacheSuite) TestCompletenessInit(c *check.C) { +func TestCompletenessInit(t *testing.T) { ctx := context.Background() const caCount = 100 const inits = 20 - p := s.newPackWithoutCache(c, ForAuth) - defer p.Close() + p := newTestPackWithoutCache(t) + t.Cleanup(p.Close) // put lots of CAs in the backend for i := 0; i < caCount; i++ { ca := suite.NewTestCA(types.UserCA, fmt.Sprintf("%d.example.com", i)) - c.Assert(p.trustS.UpsertCertAuthority(ca), check.IsNil) + require.NoError(t, p.trustS.UpsertCertAuthority(ca)) } for i := 0; i < inits; i++ { @@ -384,31 +373,31 @@ func (s *CacheSuite) TestCompletenessInit(c *check.C) { Context: ctx, Mirror: true, }) - c.Assert(err, check.IsNil) + require.NoError(t, err) // simulate bad connection to auth server p.backend.SetReadError(trace.ConnectionProblem(nil, "backend is unavailable")) p.eventsS.closeWatchers() p.cache, err = New(ForAuth(Config{ - Context: ctx, - Backend: p.cacheBackend, - Events: p.eventsS, - ClusterConfig: p.clusterConfigS, - Provisioner: p.provisionerS, - Trust: p.trustS, - Users: p.usersS, - Access: p.accessS, - DynamicAccess: p.dynamicAccessS, - Presence: p.presenceS, - AppSession: p.appSessionS, - WebSession: p.webSessionS, - WebToken: p.webTokenS, - Restrictions: p.restrictions, - RetryPeriod: 200 * time.Millisecond, - EventsC: p.eventsC, + Context: ctx, + Backend: p.cacheBackend, + Events: p.eventsS, + ClusterConfig: p.clusterConfigS, + Provisioner: p.provisionerS, + Trust: p.trustS, + Users: p.usersS, + Access: p.accessS, + DynamicAccess: p.dynamicAccessS, + Presence: p.presenceS, + AppSession: p.appSessionS, + WebSession: p.webSessionS, + WebToken: p.webTokenS, + Restrictions: p.restrictions, + MaxRetryPeriod: 200 * time.Millisecond, + EventsC: p.eventsC, })) - c.Assert(err, check.IsNil) + require.NoError(t, err) 
p.backend.SetReadError(nil) @@ -417,58 +406,58 @@ func (s *CacheSuite) TestCompletenessInit(c *check.C) { // the CA list. for the purposes of this test, we just care that it // doesn't return the CA list *unless* it was successfully constructed. if err == nil { - c.Assert(len(cas), check.Equals, caCount) + require.Len(t, cas, caCount) } else { - fixtures.ExpectConnectionProblem(c, err) + require.True(t, trace.IsConnectionProblem(err)) } - c.Assert(p.cache.Close(), check.IsNil) + require.NoError(t, p.cache.Close()) p.cache = nil - c.Assert(p.cacheBackend.Close(), check.IsNil) + require.NoError(t, p.cacheBackend.Close()) p.cacheBackend = nil } } // TestCompletenessReset verifies that flaky backends don't cause // the cache to return partial results during reset. -func (s *CacheSuite) TestCompletenessReset(c *check.C) { +func TestCompletenessReset(t *testing.T) { ctx := context.Background() const caCount = 100 const resets = 20 - p := s.newPackWithoutCache(c, ForAuth) - defer p.Close() + p := newTestPackWithoutCache(t) + t.Cleanup(p.Close) // put lots of CAs in the backend for i := 0; i < caCount; i++ { ca := suite.NewTestCA(types.UserCA, fmt.Sprintf("%d.example.com", i)) - c.Assert(p.trustS.UpsertCertAuthority(ca), check.IsNil) + require.NoError(t, p.trustS.UpsertCertAuthority(ca)) } var err error p.cache, err = New(ForAuth(Config{ - Context: ctx, - Backend: p.cacheBackend, - Events: p.eventsS, - ClusterConfig: p.clusterConfigS, - Provisioner: p.provisionerS, - Trust: p.trustS, - Users: p.usersS, - Access: p.accessS, - DynamicAccess: p.dynamicAccessS, - Presence: p.presenceS, - AppSession: p.appSessionS, - WebSession: p.webSessionS, - WebToken: p.webTokenS, - Restrictions: p.restrictions, - RetryPeriod: 200 * time.Millisecond, - EventsC: p.eventsC, + Context: ctx, + Backend: p.cacheBackend, + Events: p.eventsS, + ClusterConfig: p.clusterConfigS, + Provisioner: p.provisionerS, + Trust: p.trustS, + Users: p.usersS, + Access: p.accessS, + DynamicAccess: p.dynamicAccessS, + Presence: p.presenceS, + AppSession: p.appSessionS, + WebSession: p.webSessionS, + WebToken: p.webTokenS, + Restrictions: p.restrictions, + MaxRetryPeriod: 200 * time.Millisecond, + EventsC: p.eventsC, })) - c.Assert(err, check.IsNil) + require.NoError(t, err) // verify that CAs are immediately available cas, err := p.cache.GetCertAuthorities(types.UserCA, false) - c.Assert(err, check.IsNil) - c.Assert(len(cas), check.Equals, caCount) + require.NoError(t, err) + require.Len(t, cas, caCount) for i := 0; i < resets; i++ { // simulate bad connection to auth server @@ -482,9 +471,9 @@ func (s *CacheSuite) TestCompletenessReset(c *check.C) { // the CA list. for the purposes of this test, we just care that it // doesn't return the CA list *unless* it was successfully constructed. if err == nil { - c.Assert(len(cas), check.Equals, caCount) + require.Len(t, cas, caCount) } else { - fixtures.ExpectConnectionProblem(c, err) + require.True(t, trace.IsConnectionProblem(err)) } } } @@ -492,83 +481,83 @@ func (s *CacheSuite) TestCompletenessReset(c *check.C) { // TestTombstones verifies that healthy caches leave tombstones // on closure, giving new caches the ability to start from a known // good state if the origin state is unavailable. 
-func (s *CacheSuite) TestTombstones(c *check.C) { +func TestTombstones(t *testing.T) { ctx := context.Background() const caCount = 10 - p := s.newPackWithoutCache(c, ForAuth) - defer p.Close() + p := newTestPackWithoutCache(t) + t.Cleanup(p.Close) // put lots of CAs in the backend for i := 0; i < caCount; i++ { ca := suite.NewTestCA(types.UserCA, fmt.Sprintf("%d.example.com", i)) - c.Assert(p.trustS.UpsertCertAuthority(ca), check.IsNil) + require.NoError(t, p.trustS.UpsertCertAuthority(ca)) } var err error p.cache, err = New(ForAuth(Config{ - Context: ctx, - Backend: p.cacheBackend, - Events: p.eventsS, - ClusterConfig: p.clusterConfigS, - Provisioner: p.provisionerS, - Trust: p.trustS, - Users: p.usersS, - Access: p.accessS, - DynamicAccess: p.dynamicAccessS, - Presence: p.presenceS, - AppSession: p.appSessionS, - WebSession: p.webSessionS, - WebToken: p.webTokenS, - Restrictions: p.restrictions, - RetryPeriod: 200 * time.Millisecond, - EventsC: p.eventsC, + Context: ctx, + Backend: p.cacheBackend, + Events: p.eventsS, + ClusterConfig: p.clusterConfigS, + Provisioner: p.provisionerS, + Trust: p.trustS, + Users: p.usersS, + Access: p.accessS, + DynamicAccess: p.dynamicAccessS, + Presence: p.presenceS, + AppSession: p.appSessionS, + WebSession: p.webSessionS, + WebToken: p.webTokenS, + Restrictions: p.restrictions, + MaxRetryPeriod: 200 * time.Millisecond, + EventsC: p.eventsC, })) - c.Assert(err, check.IsNil) + require.NoError(t, err) // verify that CAs are immediately available cas, err := p.cache.GetCertAuthorities(types.UserCA, false) - c.Assert(err, check.IsNil) - c.Assert(len(cas), check.Equals, caCount) + require.NoError(t, err) + require.Len(t, cas, caCount) - c.Assert(p.cache.Close(), check.IsNil) + require.NoError(t, p.cache.Close()) // wait for TombstoneWritten, ignoring all other event types - waitForEvent(c, p.eventsC, TombstoneWritten, WatcherStarted, EventProcessed, WatcherFailed) + waitForEvent(t, p.eventsC, TombstoneWritten, WatcherStarted, EventProcessed, WatcherFailed) // simulate bad connection to auth server p.backend.SetReadError(trace.ConnectionProblem(nil, "backend is unavailable")) p.eventsS.closeWatchers() p.cache, err = New(ForAuth(Config{ - Context: ctx, - Backend: p.cacheBackend, - Events: p.eventsS, - ClusterConfig: p.clusterConfigS, - Provisioner: p.provisionerS, - Trust: p.trustS, - Users: p.usersS, - Access: p.accessS, - DynamicAccess: p.dynamicAccessS, - Presence: p.presenceS, - AppSession: p.appSessionS, - WebSession: p.webSessionS, - WebToken: p.webTokenS, - Restrictions: p.restrictions, - RetryPeriod: 200 * time.Millisecond, - EventsC: p.eventsC, + Context: ctx, + Backend: p.cacheBackend, + Events: p.eventsS, + ClusterConfig: p.clusterConfigS, + Provisioner: p.provisionerS, + Trust: p.trustS, + Users: p.usersS, + Access: p.accessS, + DynamicAccess: p.dynamicAccessS, + Presence: p.presenceS, + AppSession: p.appSessionS, + WebSession: p.webSessionS, + WebToken: p.webTokenS, + Restrictions: p.restrictions, + MaxRetryPeriod: 200 * time.Millisecond, + EventsC: p.eventsC, })) - c.Assert(err, check.IsNil) + require.NoError(t, err) // verify that CAs are immediately available despite the fact // that the origin state was never available. cas, err = p.cache.GetCertAuthorities(types.UserCA, false) - c.Assert(err, check.IsNil) - c.Assert(len(cas), check.Equals, caCount) + require.NoError(t, err) + require.Len(t, cas, caCount) } // TestInitStrategy verifies that cache uses expected init strategy // of serving backend state when init is taking too long. 
-func (s *CacheSuite) TestInitStrategy(c *check.C) { +func TestInitStrategy(t *testing.T) { for i := 0; i < utils.GetIterations(); i++ { - s.initStrategy(c) + initStrategy(t) } } @@ -581,28 +570,28 @@ func TestListNodesTTLVariant(t *testing.T) { ctx := context.Background() - p, err := newPackWithoutCache(t.TempDir(), ForAuth) + p, err := newPackWithoutCache(t.TempDir()) require.NoError(t, err) - defer p.Close() + t.Cleanup(p.Close) p.cache, err = New(ForAuth(Config{ - Context: ctx, - Backend: p.cacheBackend, - Events: p.eventsS, - ClusterConfig: p.clusterConfigS, - Provisioner: p.provisionerS, - Trust: p.trustS, - Users: p.usersS, - Access: p.accessS, - DynamicAccess: p.dynamicAccessS, - Presence: p.presenceS, - AppSession: p.appSessionS, - WebSession: p.webSessionS, - WebToken: p.webTokenS, - Restrictions: p.restrictions, - RetryPeriod: 200 * time.Millisecond, - EventsC: p.eventsC, - neverOK: true, // ensure reads are never healthy + Context: ctx, + Backend: p.cacheBackend, + Events: p.eventsS, + ClusterConfig: p.clusterConfigS, + Provisioner: p.provisionerS, + Trust: p.trustS, + Users: p.usersS, + Access: p.accessS, + DynamicAccess: p.dynamicAccessS, + Presence: p.presenceS, + AppSession: p.appSessionS, + WebSession: p.webSessionS, + WebToken: p.webTokenS, + Restrictions: p.restrictions, + MaxRetryPeriod: 200 * time.Millisecond, + EventsC: p.eventsC, + neverOK: true, // ensure reads are never healthy })) require.NoError(t, err) @@ -644,45 +633,45 @@ func TestListNodesTTLVariant(t *testing.T) { require.Len(t, nodes, nodeCount) } -func (s *CacheSuite) initStrategy(c *check.C) { +func initStrategy(t *testing.T) { ctx := context.Background() - p := s.newPackWithoutCache(c, ForAuth) - defer p.Close() + p := newTestPackWithoutCache(t) + t.Cleanup(p.Close) p.backend.SetReadError(trace.ConnectionProblem(nil, "backend is out")) var err error p.cache, err = New(ForAuth(Config{ - Context: ctx, - Backend: p.cacheBackend, - Events: p.eventsS, - ClusterConfig: p.clusterConfigS, - Provisioner: p.provisionerS, - Trust: p.trustS, - Users: p.usersS, - Access: p.accessS, - DynamicAccess: p.dynamicAccessS, - Presence: p.presenceS, - AppSession: p.appSessionS, - WebSession: p.webSessionS, - WebToken: p.webTokenS, - Restrictions: p.restrictions, - RetryPeriod: 200 * time.Millisecond, - EventsC: p.eventsC, + Context: ctx, + Backend: p.cacheBackend, + Events: p.eventsS, + ClusterConfig: p.clusterConfigS, + Provisioner: p.provisionerS, + Trust: p.trustS, + Users: p.usersS, + Access: p.accessS, + DynamicAccess: p.dynamicAccessS, + Presence: p.presenceS, + AppSession: p.appSessionS, + WebSession: p.webSessionS, + WebToken: p.webTokenS, + Restrictions: p.restrictions, + MaxRetryPeriod: 200 * time.Millisecond, + EventsC: p.eventsC, })) - c.Assert(err, check.IsNil) + require.NoError(t, err) _, err = p.cache.GetCertAuthorities(types.UserCA, false) - fixtures.ExpectConnectionProblem(c, err) + require.True(t, trace.IsConnectionProblem(err)) ca := suite.NewTestCA(types.UserCA, "example.com") // NOTE 1: this could produce event processed // below, based on whether watcher restarts to get the event // or not, which is normal, but has to be accounted below - c.Assert(p.trustS.UpsertCertAuthority(ca), check.IsNil) + require.NoError(t, p.trustS.UpsertCertAuthority(ca)) p.backend.SetReadError(nil) // wait for watcher to restart - waitForRestart(c, p.eventsC) + waitForRestart(t, p.eventsC) normalizeCA := func(ca types.CertAuthority) types.CertAuthority { ca = ca.Clone() @@ -691,10 +680,11 @@ func (s *CacheSuite) initStrategy(c 
*check.C) { types.RemoveCASecrets(ca) return ca } + _ = normalizeCA out, err := p.cache.GetCertAuthority(ca.GetID(), false) - c.Assert(err, check.IsNil) - fixtures.DeepCompare(c, normalizeCA(ca), normalizeCA(out)) + require.NoError(t, err) + require.Empty(t, cmp.Diff(normalizeCA(ca), normalizeCA(out))) // fail again, make sure last recent data is still served // on errors @@ -703,17 +693,17 @@ func (s *CacheSuite) initStrategy(c *check.C) { // wait for the watcher to fail // there could be optional event processed event, // see NOTE 1 above - waitForEvent(c, p.eventsC, WatcherFailed, EventProcessed) + waitForEvent(t, p.eventsC, WatcherFailed, EventProcessed, Reloading) // backend is out, but old value is available out2, err := p.cache.GetCertAuthority(ca.GetID(), false) - c.Assert(err, check.IsNil) - c.Assert(out.GetResourceID(), check.Equals, out2.GetResourceID()) - fixtures.DeepCompare(c, normalizeCA(ca), normalizeCA(out)) + require.NoError(t, err) + require.Equal(t, out.GetResourceID(), out2.GetResourceID()) + require.Empty(t, cmp.Diff(normalizeCA(ca), normalizeCA(out))) // add modification and expect the resource to recover ca.SetRoleMap(types.RoleMap{types.RoleMapping{Remote: "test", Local: []string{"local-test"}}}) - c.Assert(p.trustS.UpsertCertAuthority(ca), check.IsNil) + require.NoError(t, p.trustS.UpsertCertAuthority(ca)) // now, recover the backend and make sure the // service is back and the new value has propagated @@ -721,66 +711,61 @@ func (s *CacheSuite) initStrategy(c *check.C) { // wait for watcher to restart successfully; ignoring any failed // attempts which occurred before backend became healthy again. - waitForEvent(c, p.eventsC, WatcherStarted, WatcherFailed) + waitForEvent(t, p.eventsC, WatcherStarted, WatcherFailed, Reloading) // new value is available now out, err = p.cache.GetCertAuthority(ca.GetID(), false) - c.Assert(err, check.IsNil) - fixtures.DeepCompare(c, normalizeCA(ca), normalizeCA(out)) + require.NoError(t, err) + require.Empty(t, cmp.Diff(normalizeCA(ca), normalizeCA(out))) } // TestRecovery tests error recovery scenario -func (s *CacheSuite) TestRecovery(c *check.C) { - p := s.newPackForAuth(c) - defer p.Close() +func TestRecovery(t *testing.T) { + p := newPackForAuth(t) + t.Cleanup(p.Close) ca := suite.NewTestCA(types.UserCA, "example.com") - c.Assert(p.trustS.UpsertCertAuthority(ca), check.IsNil) + require.NoError(t, p.trustS.UpsertCertAuthority(ca)) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } // event has arrived, now close the watchers watchers := p.eventsS.getWatchers() - c.Assert(watchers, check.HasLen, 1) + require.Len(t, watchers, 1) p.eventsS.closeWatchers() // wait for watcher to restart - select { - case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, WatcherStarted) - case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") - } + waitForRestart(t, p.eventsC) // add modification and expect the resource to recover ca2 := suite.NewTestCA(types.UserCA, "example2.com") - c.Assert(p.trustS.UpsertCertAuthority(ca2), check.IsNil) + require.NoError(t, p.trustS.UpsertCertAuthority(ca2)) // wait for watcher to receive an event select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for 
event") + t.Fatalf("timeout waiting for event") } out, err := p.cache.GetCertAuthority(ca2.GetID(), false) - c.Assert(err, check.IsNil) + require.NoError(t, err) ca2.SetResourceID(out.GetResourceID()) types.RemoveCASecrets(ca2) - fixtures.DeepCompare(c, ca2, out) + require.Empty(t, cmp.Diff(ca2, out)) } // TestTokens tests static and dynamic tokens -func (s *CacheSuite) TestTokens(c *check.C) { +func TestTokens(t *testing.T) { ctx := context.Background() - p := s.newPackForAuth(c) - defer p.Close() + p := newPackForAuth(t) + t.Cleanup(p.Close) staticTokens, err := types.NewStaticTokens(types.StaticTokensSpecV2{ StaticTokens: []types.ProvisionTokenV1{ @@ -791,191 +776,191 @@ func (s *CacheSuite) TestTokens(c *check.C) { }, }, }) - c.Assert(err, check.IsNil) + require.NoError(t, err) err = p.clusterConfigS.SetStaticTokens(staticTokens) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err := p.cache.GetStaticTokens() - c.Assert(err, check.IsNil) + require.NoError(t, err) staticTokens.SetResourceID(out.GetResourceID()) - fixtures.DeepCompare(c, staticTokens, out) + require.Empty(t, cmp.Diff(staticTokens, out)) expires := time.Now().Add(10 * time.Hour).Truncate(time.Second).UTC() token, err := types.NewProvisionToken("token", types.SystemRoles{types.RoleAuth, types.RoleNode}, expires) - c.Assert(err, check.IsNil) + require.NoError(t, err) err = p.provisionerS.UpsertToken(ctx, token) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } tout, err := p.cache.GetToken(ctx, token.GetName()) - c.Assert(err, check.IsNil) + require.NoError(t, err) token.SetResourceID(tout.GetResourceID()) - fixtures.DeepCompare(c, token, tout) + require.Empty(t, cmp.Diff(token, tout)) err = p.provisionerS.DeleteToken(ctx, token.GetName()) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } _, err = p.cache.GetToken(ctx, token.GetName()) - fixtures.ExpectNotFound(c, err) + require.True(t, trace.IsNotFound(err)) } -func (s *CacheSuite) TestAuthPreference(c *check.C) { +func TestAuthPreference(t *testing.T) { ctx := context.Background() - p := s.newPackForAuth(c) - defer p.Close() + p := newPackForAuth(t) + t.Cleanup(p.Close) authPref, err := types.NewAuthPreferenceFromConfigFile(types.AuthPreferenceSpecV2{ AllowLocalAuth: types.NewBoolOption(true), MessageOfTheDay: "test MOTD", }) - c.Assert(err, check.IsNil) + require.NoError(t, err) err = p.clusterConfigS.SetAuthPreference(ctx, authPref) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) - c.Assert(event.Event.Resource.GetKind(), check.Equals, types.KindClusterAuthPreference) + require.Equal(t, EventProcessed, event.Type) + require.Equal(t, types.KindClusterAuthPreference, event.Event.Resource.GetKind()) case 
<-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } outAuthPref, err := p.cache.GetAuthPreference(ctx) - c.Assert(err, check.IsNil) + require.NoError(t, err) authPref.SetResourceID(outAuthPref.GetResourceID()) - fixtures.DeepCompare(c, outAuthPref, authPref) + require.Empty(t, cmp.Diff(outAuthPref, authPref)) } -func (s *CacheSuite) TestClusterNetworkingConfig(c *check.C) { +func TestClusterNetworkingConfig(t *testing.T) { ctx := context.Background() - p := s.newPackForAuth(c) - defer p.Close() + p := newPackForAuth(t) + t.Cleanup(p.Close) netConfig, err := types.NewClusterNetworkingConfigFromConfigFile(types.ClusterNetworkingConfigSpecV2{ ClientIdleTimeout: types.Duration(time.Minute), ClientIdleTimeoutMessage: "test idle timeout message", }) - c.Assert(err, check.IsNil) + require.NoError(t, err) err = p.clusterConfigS.SetClusterNetworkingConfig(ctx, netConfig) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) - c.Assert(event.Event.Resource.GetKind(), check.Equals, types.KindClusterNetworkingConfig) + require.Equal(t, EventProcessed, event.Type) + require.Equal(t, types.KindClusterNetworkingConfig, event.Event.Resource.GetKind()) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } outNetConfig, err := p.cache.GetClusterNetworkingConfig(ctx) - c.Assert(err, check.IsNil) + require.NoError(t, err) netConfig.SetResourceID(outNetConfig.GetResourceID()) - fixtures.DeepCompare(c, outNetConfig, netConfig) + require.Empty(t, cmp.Diff(outNetConfig, netConfig)) } -func (s *CacheSuite) TestSessionRecordingConfig(c *check.C) { +func TestSessionRecordingConfig(t *testing.T) { ctx := context.Background() - p := s.newPackForAuth(c) - defer p.Close() + p := newPackForAuth(t) + t.Cleanup(p.Close) recConfig, err := types.NewSessionRecordingConfigFromConfigFile(types.SessionRecordingConfigSpecV2{ Mode: types.RecordAtProxySync, ProxyChecksHostKeys: types.NewBoolOption(true), }) - c.Assert(err, check.IsNil) + require.NoError(t, err) err = p.clusterConfigS.SetSessionRecordingConfig(ctx, recConfig) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) - c.Assert(event.Event.Resource.GetKind(), check.Equals, types.KindSessionRecordingConfig) + require.Equal(t, EventProcessed, event.Type) + require.Equal(t, types.KindSessionRecordingConfig, event.Event.Resource.GetKind()) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } outRecConfig, err := p.cache.GetSessionRecordingConfig(ctx) - c.Assert(err, check.IsNil) + require.NoError(t, err) recConfig.SetResourceID(outRecConfig.GetResourceID()) - fixtures.DeepCompare(c, outRecConfig, recConfig) + require.Empty(t, cmp.Diff(outRecConfig, recConfig)) } -func (s *CacheSuite) TestClusterAuditConfig(c *check.C) { +func TestClusterAuditConfig(t *testing.T) { ctx := context.Background() - p := s.newPackForAuth(c) - defer p.Close() + p := newPackForAuth(t) + t.Cleanup(p.Close) auditConfig, err := types.NewClusterAuditConfig(types.ClusterAuditConfigSpecV2{ AuditEventsURI: []string{"dynamodb://audit_table_name", "file:///home/log"}, }) - c.Assert(err, check.IsNil) + require.NoError(t, err) err = p.clusterConfigS.SetClusterAuditConfig(ctx, auditConfig) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case 
event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) - c.Assert(event.Event.Resource.GetKind(), check.Equals, types.KindClusterAuditConfig) + require.Equal(t, EventProcessed, event.Type) + require.Equal(t, types.KindClusterAuditConfig, event.Event.Resource.GetKind()) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } outAuditConfig, err := p.cache.GetClusterAuditConfig(ctx) - c.Assert(err, check.IsNil) + require.NoError(t, err) auditConfig.SetResourceID(outAuditConfig.GetResourceID()) - fixtures.DeepCompare(c, outAuditConfig, auditConfig) + require.Empty(t, cmp.Diff(outAuditConfig, auditConfig)) } -func (s *CacheSuite) TestClusterName(c *check.C) { - p := s.newPackForAuth(c) - defer p.Close() +func TestClusterName(t *testing.T) { + p := newPackForAuth(t) + t.Cleanup(p.Close) clusterName, err := services.NewClusterNameWithRandomID(types.ClusterNameSpecV2{ ClusterName: "example.com", }) - c.Assert(err, check.IsNil) + require.NoError(t, err) err = p.clusterConfigS.SetClusterName(clusterName) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) - c.Assert(event.Event.Resource.GetKind(), check.Equals, types.KindClusterName) + require.Equal(t, EventProcessed, event.Type) + require.Equal(t, types.KindClusterName, event.Event.Resource.GetKind()) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } outName, err := p.cache.GetClusterName() - c.Assert(err, check.IsNil) + require.NoError(t, err) clusterName.SetResourceID(outName.GetResourceID()) - fixtures.DeepCompare(c, outName, clusterName) + require.Empty(t, cmp.Diff(outName, clusterName)) } // TestClusterConfig tests cluster configuration @@ -1051,130 +1036,130 @@ func (s *CacheSuite) TestClusterConfig(c *check.C) { } // TestNamespaces tests caching of namespaces -func (s *CacheSuite) TestNamespaces(c *check.C) { - p := s.newPackForProxy(c) - defer p.Close() +func TestNamespaces(t *testing.T) { + p := newPackForProxy(t) + t.Cleanup(p.Close) v, err := types.NewNamespace("universe") - c.Assert(err, check.IsNil) + require.NoError(t, err) ns := &v err = p.presenceS.UpsertNamespace(*ns) - c.Assert(err, check.IsNil) + require.NoError(t, err) ns, err = p.presenceS.GetNamespace(ns.GetName()) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err := p.cache.GetNamespace(ns.GetName()) - c.Assert(err, check.IsNil) + require.NoError(t, err) ns.SetResourceID(out.GetResourceID()) - fixtures.DeepCompare(c, ns, out) + require.Empty(t, cmp.Diff(ns, out)) // update namespace metadata ns.Metadata.Labels = map[string]string{"a": "b"} - c.Assert(err, check.IsNil) + require.NoError(t, err) err = p.presenceS.UpsertNamespace(*ns) - c.Assert(err, check.IsNil) + require.NoError(t, err) ns, err = p.presenceS.GetNamespace(ns.GetName()) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetNamespace(ns.GetName()) - 
c.Assert(err, check.IsNil) + require.NoError(t, err) ns.SetResourceID(out.GetResourceID()) - fixtures.DeepCompare(c, ns, out) + require.Empty(t, cmp.Diff(ns, out)) err = p.presenceS.DeleteNamespace(ns.GetName()) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case <-p.eventsC: case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } _, err = p.cache.GetNamespace(ns.GetName()) - fixtures.ExpectNotFound(c, err) + require.True(t, trace.IsNotFound(err)) } // TestUsers tests caching of users -func (s *CacheSuite) TestUsers(c *check.C) { +func TestUsers(t *testing.T) { ctx := context.Background() - p := s.newPackForProxy(c) - defer p.Close() + p := newPackForProxy(t) + t.Cleanup(p.Close) user, err := types.NewUser("bob") - c.Assert(err, check.IsNil) + require.NoError(t, err) err = p.usersS.UpsertUser(user) - c.Assert(err, check.IsNil) + require.NoError(t, err) user, err = p.usersS.GetUser(user.GetName(), false) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err := p.cache.GetUser(user.GetName(), false) - c.Assert(err, check.IsNil) + require.NoError(t, err) user.SetResourceID(out.GetResourceID()) - fixtures.DeepCompare(c, user, out) + require.Empty(t, cmp.Diff(user, out)) // update user's roles user.SetRoles([]string{"admin"}) - c.Assert(err, check.IsNil) + require.NoError(t, err) err = p.usersS.UpsertUser(user) - c.Assert(err, check.IsNil) + require.NoError(t, err) user, err = p.usersS.GetUser(user.GetName(), false) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetUser(user.GetName(), false) - c.Assert(err, check.IsNil) + require.NoError(t, err) user.SetResourceID(out.GetResourceID()) - fixtures.DeepCompare(c, user, out) + require.Empty(t, cmp.Diff(user, out)) err = p.usersS.DeleteUser(ctx, user.GetName()) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case <-p.eventsC: case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } _, err = p.cache.GetUser(user.GetName(), false) - fixtures.ExpectNotFound(c, err) + require.True(t, trace.IsNotFound(err)) } // TestRoles tests caching of roles -func (s *CacheSuite) TestRoles(c *check.C) { +func TestRoles(t *testing.T) { ctx := context.Background() - p := s.newPackForNode(c) - defer p.Close() + p := newPackForNode(t) + t.Cleanup(p.Close) role, err := types.NewRole("role1", types.RoleSpecV4{ Options: types.RoleOptions{ @@ -1186,128 +1171,128 @@ func (s *CacheSuite) TestRoles(c *check.C) { }, Deny: types.RoleConditions{}, }) - c.Assert(err, check.IsNil) + require.NoError(t, err) err = p.accessS.UpsertRole(ctx, role) - c.Assert(err, check.IsNil) + require.NoError(t, err) role, err = p.accessS.GetRole(ctx, role.GetName()) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + 
t.Fatalf("timeout waiting for event") } out, err := p.cache.GetRole(ctx, role.GetName()) - c.Assert(err, check.IsNil) + require.NoError(t, err) role.SetResourceID(out.GetResourceID()) - fixtures.DeepCompare(c, role, out) + require.Empty(t, cmp.Diff(role, out)) // update role role.SetLogins(services.Allow, []string{"admin"}) - c.Assert(err, check.IsNil) + require.NoError(t, err) err = p.accessS.UpsertRole(ctx, role) - c.Assert(err, check.IsNil) + require.NoError(t, err) role, err = p.accessS.GetRole(ctx, role.GetName()) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetRole(ctx, role.GetName()) - c.Assert(err, check.IsNil) + require.NoError(t, err) role.SetResourceID(out.GetResourceID()) - fixtures.DeepCompare(c, role, out) + require.Empty(t, cmp.Diff(role, out)) err = p.accessS.DeleteRole(ctx, role.GetName()) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case <-p.eventsC: case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } _, err = p.cache.GetRole(ctx, role.GetName()) - fixtures.ExpectNotFound(c, err) + require.True(t, trace.IsNotFound(err)) } // TestReverseTunnels tests reverse tunnels caching -func (s *CacheSuite) TestReverseTunnels(c *check.C) { - p := s.newPackForProxy(c) - defer p.Close() +func TestReverseTunnels(t *testing.T) { + p := newPackForProxy(t) + t.Cleanup(p.Close) tunnel, err := types.NewReverseTunnel("example.com", []string{"example.com:2023"}) - c.Assert(err, check.IsNil) - c.Assert(p.presenceS.UpsertReverseTunnel(tunnel), check.IsNil) + require.NoError(t, err) + require.NoError(t, p.presenceS.UpsertReverseTunnel(tunnel)) tunnel, err = p.presenceS.GetReverseTunnel(tunnel.GetName()) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err := p.cache.GetReverseTunnels() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) tunnel.SetResourceID(out[0].GetResourceID()) - fixtures.DeepCompare(c, tunnel, out[0]) + require.Empty(t, cmp.Diff(tunnel, out[0])) // update tunnel's parameters tunnel.SetClusterName("new.example.com") - c.Assert(err, check.IsNil) + require.NoError(t, err) err = p.presenceS.UpsertReverseTunnel(tunnel) - c.Assert(err, check.IsNil) + require.NoError(t, err) out, err = p.presenceS.GetReverseTunnels() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) tunnel = out[0] select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetReverseTunnels() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) tunnel.SetResourceID(out[0].GetResourceID()) - fixtures.DeepCompare(c, tunnel, out[0]) + require.Empty(t, cmp.Diff(tunnel, out[0])) err = p.presenceS.DeleteAllReverseTunnels() - 
c.Assert(err, check.IsNil) + require.NoError(t, err) select { case <-p.eventsC: case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetReverseTunnels() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 0) + require.NoError(t, err) + require.Empty(t, out) } // TestTunnelConnections tests tunnel connections caching -func (s *CacheSuite) TestTunnelConnections(c *check.C) { - p := s.newPackForProxy(c) - defer p.Close() +func TestTunnelConnections(t *testing.T) { + p := newPackForProxy(t) + t.Cleanup(p.Close) clusterName := "example.com" hb := time.Now().UTC() @@ -1316,322 +1301,322 @@ func (s *CacheSuite) TestTunnelConnections(c *check.C) { ProxyName: "p1", LastHeartbeat: hb, }) - c.Assert(err, check.IsNil) - c.Assert(p.presenceS.UpsertTunnelConnection(conn), check.IsNil) + require.NoError(t, err) + require.NoError(t, p.presenceS.UpsertTunnelConnection(conn)) out, err := p.presenceS.GetTunnelConnections(clusterName) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) conn = out[0] select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetTunnelConnections(clusterName) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) conn.SetResourceID(out[0].GetResourceID()) - fixtures.DeepCompare(c, conn, out[0]) + require.Empty(t, cmp.Diff(conn, out[0])) // update conn's parameters hb = hb.Add(time.Second) conn.SetLastHeartbeat(hb) err = p.presenceS.UpsertTunnelConnection(conn) - c.Assert(err, check.IsNil) + require.NoError(t, err) out, err = p.presenceS.GetTunnelConnections(clusterName) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) conn = out[0] select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetTunnelConnections(clusterName) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) conn.SetResourceID(out[0].GetResourceID()) - fixtures.DeepCompare(c, conn, out[0]) + require.Empty(t, cmp.Diff(conn, out[0])) err = p.presenceS.DeleteTunnelConnections(clusterName) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case <-p.eventsC: case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetTunnelConnections(clusterName) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 0) + require.NoError(t, err) + require.Empty(t, out) } // TestNodes tests nodes cache -func (s *CacheSuite) TestNodes(c *check.C) { +func TestNodes(t *testing.T) { ctx := context.Background() - p := s.newPackForProxy(c) - defer p.Close() + p := newPackForProxy(t) + t.Cleanup(p.Close) server := suite.NewServer(types.KindNode, "srv1", "127.0.0.1:2022", apidefaults.Namespace) _, err := p.presenceS.UpsertNode(ctx, server) - c.Assert(err, check.IsNil) + require.NoError(t, err) out, err := p.presenceS.GetNodes(ctx, apidefaults.Namespace) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) 
+ require.NoError(t, err) + require.Len(t, out, 1) srv := out[0] select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetNodes(ctx, apidefaults.Namespace) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) srv.SetResourceID(out[0].GetResourceID()) - fixtures.DeepCompare(c, srv, out[0]) + require.Empty(t, cmp.Diff(srv, out[0])) // update srv parameters srv.SetExpiry(time.Now().Add(30 * time.Minute).UTC()) srv.SetAddr("127.0.0.2:2033") lease, err := p.presenceS.UpsertNode(ctx, srv) - c.Assert(err, check.IsNil) + require.NoError(t, err) out, err = p.presenceS.GetNodes(ctx, apidefaults.Namespace) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) srv = out[0] select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetNodes(ctx, apidefaults.Namespace) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) srv.SetResourceID(out[0].GetResourceID()) - fixtures.DeepCompare(c, srv, out[0]) + require.Empty(t, cmp.Diff(srv, out[0])) // update keep alive on the node and make sure // it propagates lease.Expires = time.Now().UTC().Add(time.Hour) err = p.presenceS.KeepAliveNode(ctx, *lease) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetNodes(ctx, apidefaults.Namespace) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) srv.SetResourceID(out[0].GetResourceID()) srv.SetExpiry(lease.Expires) - fixtures.DeepCompare(c, srv, out[0]) + require.Empty(t, cmp.Diff(srv, out[0])) err = p.presenceS.DeleteAllNodes(ctx, apidefaults.Namespace) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case <-p.eventsC: case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetNodes(ctx, apidefaults.Namespace) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 0) + require.NoError(t, err) + require.Empty(t, out) } // TestProxies tests proxies cache -func (s *CacheSuite) TestProxies(c *check.C) { - p := s.newPackForProxy(c) - defer p.Close() +func TestProxies(t *testing.T) { + p := newPackForProxy(t) + t.Cleanup(p.Close) server := suite.NewServer(types.KindProxy, "srv1", "127.0.0.1:2022", apidefaults.Namespace) err := p.presenceS.UpsertProxy(server) - c.Assert(err, check.IsNil) + require.NoError(t, err) out, err := p.presenceS.GetProxies() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) srv := out[0] select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting 
for event") } out, err = p.cache.GetProxies() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) srv.SetResourceID(out[0].GetResourceID()) - fixtures.DeepCompare(c, srv, out[0]) + require.Empty(t, cmp.Diff(srv, out[0])) // update srv parameters srv.SetAddr("127.0.0.2:2033") err = p.presenceS.UpsertProxy(srv) - c.Assert(err, check.IsNil) + require.NoError(t, err) out, err = p.presenceS.GetProxies() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) srv = out[0] select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetProxies() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) srv.SetResourceID(out[0].GetResourceID()) - fixtures.DeepCompare(c, srv, out[0]) + require.Empty(t, cmp.Diff(srv, out[0])) err = p.presenceS.DeleteAllProxies() - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case <-p.eventsC: case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetProxies() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 0) + require.NoError(t, err) + require.Empty(t, out) } // TestAuthServers tests auth servers cache -func (s *CacheSuite) TestAuthServers(c *check.C) { - p := s.newPackForProxy(c) - defer p.Close() +func TestAuthServers(t *testing.T) { + p := newPackForProxy(t) + t.Cleanup(p.Close) server := suite.NewServer(types.KindAuthServer, "srv1", "127.0.0.1:2022", apidefaults.Namespace) err := p.presenceS.UpsertAuthServer(server) - c.Assert(err, check.IsNil) + require.NoError(t, err) out, err := p.presenceS.GetAuthServers() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) srv := out[0] select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetAuthServers() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) srv.SetResourceID(out[0].GetResourceID()) - fixtures.DeepCompare(c, srv, out[0]) + require.Empty(t, cmp.Diff(srv, out[0])) // update srv parameters srv.SetAddr("127.0.0.2:2033") err = p.presenceS.UpsertAuthServer(srv) - c.Assert(err, check.IsNil) + require.NoError(t, err) out, err = p.presenceS.GetAuthServers() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) srv = out[0] select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetAuthServers() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) srv.SetResourceID(out[0].GetResourceID()) - fixtures.DeepCompare(c, srv, out[0]) + require.Empty(t, cmp.Diff(srv, out[0])) err = p.presenceS.DeleteAllAuthServers() - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case 
<-p.eventsC: case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetAuthServers() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 0) + require.NoError(t, err) + require.Empty(t, out) } // TestRemoteClusters tests remote clusters caching -func (s *CacheSuite) TestRemoteClusters(c *check.C) { +func TestRemoteClusters(t *testing.T) { ctx := context.Background() - p := s.newPackForProxy(c) - defer p.Close() + p := newPackForProxy(t) + t.Cleanup(p.Close) clusterName := "example.com" rc, err := types.NewRemoteCluster(clusterName) - c.Assert(err, check.IsNil) - c.Assert(p.presenceS.CreateRemoteCluster(rc), check.IsNil) + require.NoError(t, err) + require.NoError(t, p.presenceS.CreateRemoteCluster(rc)) out, err := p.presenceS.GetRemoteClusters() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) rc = out[0] select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetRemoteClusters() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) rc.SetResourceID(out[0].GetResourceID()) - fixtures.DeepCompare(c, rc, out[0]) + require.Empty(t, cmp.Diff(rc, out[0])) // update conn's parameters meta := rc.GetMetadata() @@ -1639,125 +1624,125 @@ func (s *CacheSuite) TestRemoteClusters(c *check.C) { rc.SetMetadata(meta) err = p.presenceS.UpdateRemoteCluster(ctx, rc) - c.Assert(err, check.IsNil) + require.NoError(t, err) out, err = p.presenceS.GetRemoteClusters() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) - fixtures.DeepCompare(c, meta.Labels, out[0].GetMetadata().Labels) + require.NoError(t, err) + require.Len(t, out, 1) + require.Empty(t, cmp.Diff(meta.Labels, out[0].GetMetadata().Labels)) rc = out[0] select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetRemoteClusters() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) rc.SetResourceID(out[0].GetResourceID()) - fixtures.DeepCompare(c, rc, out[0]) + require.Empty(t, cmp.Diff(rc, out[0])) err = p.presenceS.DeleteAllRemoteClusters() - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case <-p.eventsC: case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetRemoteClusters() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 0) + require.NoError(t, err) + require.Empty(t, out) } // TestAppServers tests that CRUD operations are replicated from the backend to // the cache. -func (s *CacheSuite) TestAppServers(c *check.C) { - p := s.newPackForProxy(c) - defer p.Close() +func TestAppServers(t *testing.T) { + p := newPackForProxy(t) + t.Cleanup(p.Close) // Upsert application into backend. server := suite.NewAppServer("foo", "http://127.0.0.1:8080", "foo.example.com") _, err := p.presenceS.UpsertAppServer(context.Background(), server) - c.Assert(err, check.IsNil) + require.NoError(t, err) // Check that the application is now in the backend. 
out, err := p.presenceS.GetAppServers(context.Background(), apidefaults.Namespace) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) srv := out[0] // Wait until the information has been replicated to the cache. select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } // Make sure the cache has a single application in it. out, err = p.cache.GetAppServers(context.Background(), apidefaults.Namespace) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) // Check that the value in the cache, value in the backend, and original // services.App all exactly match. srv.SetResourceID(out[0].GetResourceID()) server.SetResourceID(out[0].GetResourceID()) - fixtures.DeepCompare(c, srv, out[0]) - fixtures.DeepCompare(c, server, out[0]) + require.Empty(t, cmp.Diff(srv, out[0])) + require.Empty(t, cmp.Diff(server, out[0])) // Update the application and upsert it into the backend again. srv.SetExpiry(time.Now().Add(30 * time.Minute).UTC()) _, err = p.presenceS.UpsertAppServer(context.Background(), srv) - c.Assert(err, check.IsNil) + require.NoError(t, err) // Check that the application is in the backend and only one exists (so an // update occurred). out, err = p.presenceS.GetAppServers(context.Background(), apidefaults.Namespace) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) srv = out[0] // Check that information has been replicated to the cache. select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } // Make sure the cache has a single application in it. out, err = p.cache.GetAppServers(context.Background(), apidefaults.Namespace) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) // Check that the value in the cache, value in the backend, and original // services.App all exactly match. srv.SetResourceID(out[0].GetResourceID()) - fixtures.DeepCompare(c, srv, out[0]) + require.Empty(t, cmp.Diff(srv, out[0])) // Remove all applications from the backend. err = p.presenceS.DeleteAllAppServers(context.Background(), apidefaults.Namespace) - c.Assert(err, check.IsNil) + require.NoError(t, err) // Check that information has been replicated to the cache. select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } // Check that the cache is now empty. 
out, err = p.cache.GetAppServers(context.Background(), apidefaults.Namespace) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 0) + require.NoError(t, err) + require.Empty(t, out) } // TestDatabaseServers tests that CRUD operations on database servers are @@ -1765,7 +1750,7 @@ func (s *CacheSuite) TestAppServers(c *check.C) { func TestDatabaseServers(t *testing.T) { p, err := newPack(t.TempDir(), ForProxy) require.NoError(t, err) - defer p.Close() + t.Cleanup(p.Close) ctx := context.Background() @@ -1846,6 +1831,52 @@ func TestDatabaseServers(t *testing.T) { require.Equal(t, 0, len(out)) } +func TestCache_Backoff(t *testing.T) { + clock := clockwork.NewFakeClock() + p := newTestPack(t, func(c Config) Config { + c.MaxRetryPeriod = defaults.MaxWatcherBackoff + c.Clock = clock + return ForNode(c) + }) + t.Cleanup(p.Close) + + // close watchers to trigger a reload event + watchers := p.eventsS.getWatchers() + require.Len(t, watchers, 1) + p.eventsS.closeWatchers() + p.backend.SetReadError(trace.ConnectionProblem(nil, "backend is unavailable")) + + step := p.cache.Config.MaxRetryPeriod / 5.0 + for i := 0; i < 5; i++ { + // wait for cache to reload + select { + case event := <-p.eventsC: + require.Equal(t, Reloading, event.Type) + duration, err := time.ParseDuration(event.Event.Resource.GetKind()) + require.NoError(t, err) + + stepMin := step * time.Duration(i) / 2 + stepMax := step * time.Duration(i+1) + + require.GreaterOrEqual(t, duration, stepMin) + require.LessOrEqual(t, duration, stepMax) + + // add some extra to the duration to ensure the retry occurs + clock.Advance(duration * 3) + case <-time.After(time.Minute): + t.Fatalf("timeout waiting for event") + } + + // wait for cache to fail again - backend will still produce a ConnectionProblem error + select { + case event := <-p.eventsC: + require.Equal(t, WatcherFailed, event.Type) + case <-time.After(30 * time.Second): + t.Fatalf("timeout waiting for event") + } + } +} + type proxyEvents struct { sync.Mutex watchers []types.Watcher diff --git a/lib/defaults/defaults.go b/lib/defaults/defaults.go index 32e05b73763d0..dda00f58a8762 100644 --- a/lib/defaults/defaults.go +++ b/lib/defaults/defaults.go @@ -393,15 +393,19 @@ var ( // restart if there has been more than `MaxConnectionErrorsBeforeRestart` // errors in the preceding `ConnectionErrorMeasurementPeriod` MaxConnectionErrorsBeforeRestart = 5 + + // MaxWatcherBackoff is the maximum retry time a watcher should use in + // the event of connection issues + MaxWatcherBackoff = time.Minute ) // Default connection limits, they can be applied separately on any of the Teleport // services (SSH, auth, proxy) const ( - // Number of max. simultaneous connections to a service + // LimiterMaxConnections Number of max. simultaneous connections to a service LimiterMaxConnections = 15000 - // Number of max. simultaneous connected users/logins + // LimiterMaxConcurrentUsers Number of max. 
simultaneous connected users/logins LimiterMaxConcurrentUsers = 250 // LimiterMaxConcurrentSignatures limits maximum number of concurrently diff --git a/lib/restrictedsession/watcher.go b/lib/restrictedsession/watcher.go index 2259939ad7489..5073f0c718442 100644 --- a/lib/restrictedsession/watcher.go +++ b/lib/restrictedsession/watcher.go @@ -38,8 +38,10 @@ func NewRestrictionsWatcher(cfg RestrictionsWatcherConfig) (*RestrictionsWatcher return nil, trace.Wrap(err) } retry, err := utils.NewLinear(utils.LinearConfig{ - Step: cfg.RetryPeriod / 10, - Max: cfg.RetryPeriod, + First: utils.HalfJitter(cfg.MaxRetryPeriod / 10), + Step: cfg.MaxRetryPeriod / 5, + Max: cfg.MaxRetryPeriod, + Jitter: utils.NewHalfJitter(), }) if err != nil { return nil, trace.Wrap(err) @@ -74,8 +76,8 @@ type RestrictionsWatcher struct { // RestrictionsWatcherConfig configures restrictions watcher type RestrictionsWatcherConfig struct { - // RetryPeriod is a retry period on failed watchers - RetryPeriod time.Duration + // MaxRetryPeriod is the maximum retry period on failed watchers + MaxRetryPeriod time.Duration // ReloadPeriod is a failed period on failed watches ReloadPeriod time.Duration // Client is used by changeset to monitor restrictions updates @@ -96,8 +98,8 @@ func (cfg *RestrictionsWatcherConfig) CheckAndSetDefaults() error { if cfg.RestrictionsC == nil { return trace.BadParameter("missing parameter RestrictionsC") } - if cfg.RetryPeriod == 0 { - cfg.RetryPeriod = defaults.HighResPollingPeriod + if cfg.MaxRetryPeriod == 0 { + cfg.MaxRetryPeriod = defaults.MaxWatcherBackoff } if cfg.ReloadPeriod == 0 { cfg.ReloadPeriod = defaults.LowResPollingPeriod diff --git a/lib/reversetunnel/rc_manager.go b/lib/reversetunnel/rc_manager.go index dab996a592468..1559a0f6efa06 100644 --- a/lib/reversetunnel/rc_manager.go +++ b/lib/reversetunnel/rc_manager.go @@ -75,6 +75,8 @@ type RemoteClusterTunnelManagerConfig struct { KubeDialAddr utils.NetAddr // FIPS indicates if Teleport was started in FIPS mode. FIPS bool + // Log is the logger + Log logrus.FieldLogger } func (c *RemoteClusterTunnelManagerConfig) CheckAndSetDefaults() error { @@ -96,6 +98,9 @@ func (c *RemoteClusterTunnelManagerConfig) CheckAndSetDefaults() error { if c.Clock == nil { c.Clock = clockwork.NewRealClock() } + if c.Log == nil { + c.Log = logrus.New() + } return nil } @@ -137,7 +142,7 @@ func (w *RemoteClusterTunnelManager) Run(ctx context.Context) { w.mu.Unlock() if err := w.Sync(ctx); err != nil { - logrus.Warningf("Failed to sync reverse tunnels: %v.", err) + w.cfg.Log.Warningf("Failed to sync reverse tunnels: %v.", err) } ticker := time.NewTicker(defaults.ResyncInterval) @@ -146,11 +151,11 @@ func (w *RemoteClusterTunnelManager) Run(ctx context.Context) { for { select { case <-ctx.Done(): - logrus.Debugf("Closing.") + w.cfg.Log.Debugf("Closing.") return case <-ticker.C: if err := w.Sync(ctx); err != nil { - logrus.Warningf("Failed to sync reverse tunnels: %v.", err) + w.cfg.Log.Warningf("Failed to sync reverse tunnels: %v.", err) continue } } diff --git a/lib/service/cfg.go b/lib/service/cfg.go index 0b5818fec29c1..d743ee0ec5bb7 100644 --- a/lib/service/cfg.go +++ b/lib/service/cfg.go @@ -237,6 +237,9 @@ type Config struct { // unit time that the node can sustain before restarting itself, as // measured by the rotation state service. 
RestartThreshold Rate + + // MaxRetryPeriod is the maximum period between reconnection attempts to auth + MaxRetryPeriod time.Duration } // ApplyToken assigns a given token to all internal services but only if token @@ -905,6 +908,7 @@ func ApplyDefaults(cfg *Config) { Amount: defaults.MaxConnectionErrorsBeforeRestart, Time: defaults.ConnectionErrorMeasurementPeriod, } + cfg.MaxRetryPeriod = defaults.MaxWatcherBackoff } // ApplyFIPSDefaults updates default configuration to be FedRAMP/FIPS 140-2 diff --git a/lib/service/connect.go b/lib/service/connect.go index df704b2c51ffa..b999083cb4b30 100644 --- a/lib/service/connect.go +++ b/lib/service/connect.go @@ -19,7 +19,6 @@ package service import ( "crypto/tls" "path/filepath" - "time" "golang.org/x/crypto/ssh" @@ -45,7 +44,17 @@ import ( // reconnectToAuthService continuously attempts to reconnect to the auth // service until succeeds or process gets shut down func (process *TeleportProcess) reconnectToAuthService(role types.SystemRole) (*Connector, error) { - retryTime := defaults.HighResPollingPeriod + retry, err := utils.NewLinear(utils.LinearConfig{ + First: utils.HalfJitter(process.Config.MaxRetryPeriod / 10), + Step: process.Config.MaxRetryPeriod / 5, + Max: process.Config.MaxRetryPeriod, + Clock: process.Clock, + Jitter: utils.NewHalfJitter(), + }) + if err != nil { + return nil, trace.Wrap(err) + } + for { connector, err := process.connectToAuthService(role) if err == nil { @@ -66,9 +75,18 @@ func (process *TeleportProcess) reconnectToAuthService(role types.SystemRole) (* } process.log.Errorf("%v failed to establish connection to cluster: %v.", role, err) + // Used for testing that auth service will attempt to reconnect in the provided duration. + select { + case process.connectFailureC <- retry.Duration(): + default: + } + + startedWait := process.Clock.Now() // Wait in between attempts, but return if teleport is shutting down select { - case <-time.After(retryTime): + case t := <-retry.After(): + process.log.Debugf("Retrying connection to auth server after waiting %v.", t.Sub(startedWait)) + retry.Inc() case <-process.ExitContext().Done(): process.log.Infof("%v stopping connection attempts, teleport is shutting down.", role) return nil, ErrTeleportExited @@ -130,12 +148,12 @@ func (process *TeleportProcess) connect(role types.SystemRole) (conn *Connector, }, nil } process.log.Infof("Connecting to the cluster %v with TLS client certificate.", identity.ClusterName) - client, err := process.newClient(process.Config.AuthServers, identity) + clt, err := process.newClient(process.Config.AuthServers, identity) if err != nil { return nil, trace.Wrap(err) } return &Connector{ - Client: client, + Client: clt, ClientIdentity: identity, ServerIdentity: identity, }, nil @@ -150,12 +168,12 @@ func (process *TeleportProcess) connect(role types.SystemRole) (conn *Connector, ServerIdentity: identity, }, nil } - client, err := process.newClient(process.Config.AuthServers, identity) + clt, err := process.newClient(process.Config.AuthServers, identity) if err != nil { return nil, trace.Wrap(err) } return &Connector{ - Client: client, + Client: clt, ClientIdentity: identity, ServerIdentity: identity, }, nil @@ -172,12 +190,12 @@ func (process *TeleportProcess) connect(role types.SystemRole) (conn *Connector, ServerIdentity: identity, }, nil } - client, err := process.newClient(process.Config.AuthServers, newIdentity) + clt, err := process.newClient(process.Config.AuthServers, newIdentity) if err != nil { return nil, trace.Wrap(err) } return &Connector{ 
- Client: client, + Client: clt, ClientIdentity: newIdentity, ServerIdentity: identity, }, nil @@ -194,12 +212,12 @@ func (process *TeleportProcess) connect(role types.SystemRole) (conn *Connector, ServerIdentity: newIdentity, }, nil } - client, err := process.newClient(process.Config.AuthServers, newIdentity) + clt, err := process.newClient(process.Config.AuthServers, newIdentity) if err != nil { return nil, trace.Wrap(err) } return &Connector{ - Client: client, + Client: clt, ClientIdentity: newIdentity, ServerIdentity: newIdentity, }, nil @@ -214,12 +232,12 @@ func (process *TeleportProcess) connect(role types.SystemRole) (conn *Connector, ServerIdentity: identity, }, nil } - client, err := process.newClient(process.Config.AuthServers, identity) + clt, err := process.newClient(process.Config.AuthServers, identity) if err != nil { return nil, trace.Wrap(err) } return &Connector{ - Client: client, + Client: clt, ClientIdentity: identity, ServerIdentity: identity, }, nil @@ -397,14 +415,14 @@ func (process *TeleportProcess) firstTimeConnect(role types.SystemRole) (*Connec ServerIdentity: identity, } } else { - client, err := process.newClient(process.Config.AuthServers, identity) + clt, err := process.newClient(process.Config.AuthServers, identity) if err != nil { return nil, trace.Wrap(err) } connector = &Connector{ ClientIdentity: identity, ServerIdentity: identity, - Client: client, + Client: clt, } } diff --git a/lib/service/service.go b/lib/service/service.go index 2b5dea17c7e35..11cd1d3145c21 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -302,6 +302,9 @@ type TeleportProcess struct { // clusterFeatures contain flags for supported and unsupported features. clusterFeatures proto.Features + + // connectFailureC is a channel to notify of failures to connect to auth (used in tests). + connectFailureC chan time.Duration } type keyPairKey struct { @@ -546,7 +549,7 @@ func waitAndReload(ctx context.Context, cfg Config, srv Process, newTeleport New defer cancel() srv.Shutdown(timeoutCtx) if timeoutCtx.Err() == context.DeadlineExceeded { - // The new serivce can start initiating connections to the old service + // The new service can start initiating connections to the old service // keeping it from shutting down gracefully, or some external // connections can keep hanging the old auth service and prevent // the services from shutting down, so abort the graceful way @@ -682,6 +685,7 @@ func NewTeleport(cfg *Config) (*TeleportProcess, error) { id: processID, keyPairs: make(map[keyPairKey]KeyPair), appDependCh: make(chan Event, 1024), + connectFailureC: make(chan time.Duration), } process.registerAppDepend() @@ -2813,6 +2817,10 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { return nil }) + rcWatchLog := logrus.WithFields(logrus.Fields{ + trace.Component: teleport.Component(teleport.ComponentReverseTunnelAgent, process.id), + }) + // Create and register reverse tunnel AgentPool. 
rcWatcher, err := reversetunnel.NewRemoteClusterTunnelManager(reversetunnel.RemoteClusterTunnelManagerConfig{ HostUUID: conn.ServerIdentity.ID.HostUUID, @@ -2823,16 +2831,14 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { KubeDialAddr: utils.DialAddrFromListenAddr(cfg.Proxy.Kube.ListenAddr), ReverseTunnelServer: tsrv, FIPS: process.Config.FIPS, + Log: rcWatchLog, }) if err != nil { return trace.Wrap(err) } process.RegisterCriticalFunc("proxy.reversetunnel.watcher", func() error { - log := logrus.WithFields(logrus.Fields{ - trace.Component: teleport.Component(teleport.ComponentReverseTunnelAgent, process.id), - }) - log.Infof("Starting reverse tunnel agent pool.") + rcWatchLog.Infof("Starting reverse tunnel agent pool.") done := make(chan struct{}) go func() { defer close(done) diff --git a/lib/service/service_test.go b/lib/service/service_test.go index b9715f78bc8f7..c68698e54cb1f 100644 --- a/lib/service/service_test.go +++ b/lib/service/service_test.go @@ -21,6 +21,8 @@ import ( "io/ioutil" "net/http" "os" + "strings" + "sync" "testing" "time" @@ -450,3 +452,49 @@ func waitForStatus(diagAddr string, statusCodes ...int) error { } } } + +func TestTeleportProcess_reconnectToAuth(t *testing.T) { + t.Parallel() + clock := clockwork.NewFakeClock() + // Create and configure a default Teleport configuration. + cfg := MakeDefaultConfig() + cfg.AuthServers = []utils.NetAddr{{AddrNetwork: "tcp", Addr: "127.0.0.1:0"}} + cfg.Clock = clock + cfg.DataDir = t.TempDir() + cfg.Auth.Enabled = false + cfg.Proxy.Enabled = false + cfg.SSH.Enabled = true + process, err := NewTeleport(cfg) + require.NoError(t, err) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + c, err := process.reconnectToAuthService(types.RoleAdmin) + require.Equal(t, ErrTeleportExited, err) + require.Nil(t, c) + }() + + step := cfg.MaxRetryPeriod / 5.0 + for i := 0; i < 5; i++ { + // wait for connection to fail + select { + case duration := <-process.connectFailureC: + stepMin := step * time.Duration(i) / 2 + stepMax := step * time.Duration(i+1) + + require.GreaterOrEqual(t, duration, stepMin) + require.LessOrEqual(t, duration, stepMax) + // add some extra to the duration to ensure the retry occurs + clock.Advance(duration * 3) + case <-time.After(time.Minute): + t.Fatalf("timeout waiting for failure") + } + } + + supervisor, ok := process.Supervisor.(*LocalSupervisor) + require.True(t, ok) + supervisor.signalExit() + wg.Wait() +} diff --git a/lib/services/watcher.go b/lib/services/watcher.go index bd247ef0baf51..735db9741a5a7 100644 --- a/lib/services/watcher.go +++ b/lib/services/watcher.go @@ -52,8 +52,8 @@ type ResourceWatcherConfig struct { Component string // Log is a logger. Log logrus.FieldLogger - // RetryPeriod is a retry period on failed watchers. - RetryPeriod time.Duration + // MaxRetryPeriod is the maximum retry period on failed watchers. + MaxRetryPeriod time.Duration // RefetchPeriod is a period after which to explicitly refetch the resources. // It is to protect against unexpected cache syncing issues. 
RefetchPeriod time.Duration @@ -74,8 +74,8 @@ func (cfg *ResourceWatcherConfig) CheckAndSetDefaults() error { if cfg.Log == nil { cfg.Log = logrus.StandardLogger() } - if cfg.RetryPeriod == 0 { - cfg.RetryPeriod = defaults.HighResPollingPeriod + if cfg.MaxRetryPeriod == 0 { + cfg.MaxRetryPeriod = defaults.MaxWatcherBackoff } if cfg.RefetchPeriod == 0 { cfg.RefetchPeriod = defaults.LowResPollingPeriod @@ -94,9 +94,11 @@ func (cfg *ResourceWatcherConfig) CheckAndSetDefaults() error { // incl. cfg.CheckAndSetDefaults. func newResourceWatcher(ctx context.Context, collector resourceCollector, cfg ResourceWatcherConfig) (*resourceWatcher, error) { retry, err := utils.NewLinear(utils.LinearConfig{ - Step: cfg.RetryPeriod / 10, - Max: cfg.RetryPeriod, - Clock: cfg.Clock, + First: utils.HalfJitter(cfg.MaxRetryPeriod / 10), + Step: cfg.MaxRetryPeriod / 5, + Max: cfg.MaxRetryPeriod, + Jitter: utils.NewHalfJitter(), + Clock: cfg.Clock, }) if err != nil { return nil, trace.Wrap(err) @@ -110,7 +112,7 @@ func newResourceWatcher(ctx context.Context, collector resourceCollector, cfg Re cancel: cancel, retry: retry, LoopC: make(chan struct{}), - ResetC: make(chan struct{}), + ResetC: make(chan time.Duration, 1), StaleC: make(chan struct{}, 1), } go p.runWatchLoop() @@ -139,7 +141,7 @@ type resourceWatcher struct { // (used in tests). LoopC chan struct{} // ResetC is a channel to notify of internal watcher reset (used in tests). - ResetC chan struct{} + ResetC chan time.Duration // StaleC is a channel that can trigger the condition of resource staleness // (used in tests). StaleC chan struct{} @@ -174,7 +176,7 @@ func (p *resourceWatcher) hasStaleView() bool { // runWatchLoop runs a watch loop. func (p *resourceWatcher) runWatchLoop() { for { - p.Log.WithField("retry", p.retry).Debug("Starting watch.") + p.Log.Debug("Starting watch.") err := p.watch() select { @@ -183,8 +185,17 @@ func (p *resourceWatcher) runWatchLoop() { default: } + // Used for testing that the watch routine has exited and is about + // to be restarted. + select { + case p.ResetC <- p.retry.Duration(): + default: + } + + startedWaiting := p.Clock.Now() select { - case <-p.retry.After(): + case t := <-p.retry.After(): + p.Log.Debugf("Attempting to restart watch after waiting %v.", t.Sub(startedWaiting)) p.retry.Inc() case <-p.ctx.Done(): p.Log.Debug("Closed, returning from watch loop.") @@ -202,12 +213,7 @@ func (p *resourceWatcher) runWatchLoop() { p.Log.Warningf("Maximum staleness of %v exceeded, failure started at %v.", p.MaxStaleness, p.failureStartedAt) p.collector.notifyStale() } - // Used for testing that the watch routine has exited and is about - // to be restarted. 
- select { - case p.ResetC <- struct{}{}: - default: - } + } } diff --git a/lib/services/watcher_test.go b/lib/services/watcher_test.go index 682c7bef049c5..b0cc4813ab4ac 100644 --- a/lib/services/watcher_test.go +++ b/lib/services/watcher_test.go @@ -18,6 +18,7 @@ package services_test import ( "context" + "errors" "sync" "testing" "time" @@ -36,6 +37,59 @@ import ( "github.com/gravitational/teleport/lib/services/local" ) +var _ types.Events = (*errorWatcher)(nil) + +type errorWatcher struct { +} + +func (e errorWatcher) NewWatcher(context.Context, types.Watch) (types.Watcher, error) { + return nil, errors.New("watcher error") +} + +var _ services.ProxyGetter = (*nopProxyGetter)(nil) + +type nopProxyGetter struct { +} + +func (n nopProxyGetter) GetProxies() ([]types.Server, error) { + return nil, nil +} + +func TestResourceWatcher_Backoff(t *testing.T) { + t.Parallel() + ctx := context.Background() + clock := clockwork.NewFakeClock() + + w, err := services.NewProxyWatcher(ctx, services.ProxyWatcherConfig{ + ResourceWatcherConfig: services.ResourceWatcherConfig{ + Component: "test", + Clock: clock, + MaxRetryPeriod: defaults.MaxWatcherBackoff, + Client: &errorWatcher{}, + }, + ProxyGetter: &nopProxyGetter{}, + }) + require.NoError(t, err) + t.Cleanup(w.Close) + + step := w.MaxRetryPeriod / 5.0 + for i := 0; i < 5; i++ { + // wait for watcher to reload + select { + case duration := <-w.ResetC: + stepMin := step * time.Duration(i) / 2 + stepMax := step * time.Duration(i+1) + + require.GreaterOrEqual(t, duration, stepMin) + require.LessOrEqual(t, duration, stepMax) + // add some extra to the duration to ensure the retry occurs + clock.Advance(duration * 3) + case <-time.After(time.Minute): + t.Fatalf("timeout waiting for reset") + } + } +} + func TestProxyWatcher(t *testing.T) { t.Parallel() ctx := context.Background() @@ -54,8 +108,8 @@ func TestProxyWatcher(t *testing.T) { presence := local.NewPresenceService(bk) w, err := services.NewProxyWatcher(ctx, services.ProxyWatcherConfig{ ResourceWatcherConfig: services.ResourceWatcherConfig{ - Component: "test", - RetryPeriod: 200 * time.Millisecond, + Component: "test", + MaxRetryPeriod: 200 * time.Millisecond, Client: &client{ Presence: presence, Events: local.NewEventsService(bk, nil), @@ -147,8 +201,8 @@ func TestLockWatcher(t *testing.T) { access := local.NewAccessService(bk) w, err := services.NewLockWatcher(ctx, services.LockWatcherConfig{ ResourceWatcherConfig: services.ResourceWatcherConfig{ - Component: "test", - RetryPeriod: 200 * time.Millisecond, + Component: "test", + MaxRetryPeriod: 200 * time.Millisecond, Client: &client{ Access: access, Events: local.NewEventsService(bk, nil), @@ -252,8 +306,8 @@ func TestLockWatcherSubscribeWithEmptyTarget(t *testing.T) { access := local.NewAccessService(bk) w, err := services.NewLockWatcher(ctx, services.LockWatcherConfig{ ResourceWatcherConfig: services.ResourceWatcherConfig{ - Component: "test", - RetryPeriod: 200 * time.Millisecond, + Component: "test", + MaxRetryPeriod: 200 * time.Millisecond, Client: &client{ Access: access, Events: local.NewEventsService(bk, nil), @@ -330,8 +384,8 @@ func TestLockWatcherStale(t *testing.T) { events := &withUnreliability{Events: local.NewEventsService(bk, nil)} w, err := services.NewLockWatcher(ctx, services.LockWatcherConfig{ ResourceWatcherConfig: services.ResourceWatcherConfig{ - Component: "test", - RetryPeriod: 200 * time.Millisecond, + Component: "test", + MaxRetryPeriod: 200 * time.Millisecond, Client: &client{ Access: access, Events: events, diff 
--git a/lib/srv/heartbeat.go b/lib/srv/heartbeat.go index dbba9134fac94..3b1e9fb040332 100644 --- a/lib/srv/heartbeat.go +++ b/lib/srv/heartbeat.go @@ -431,7 +431,7 @@ func (h *Heartbeat) announce() error { if !ok { return trace.BadParameter("expected services.Server, got %#v", h.current) } - err := h.Announcer.UpsertKubeService(context.TODO(), kube) + err := h.Announcer.UpsertKubeService(h.cancelCtx, kube) if err != nil { h.nextAnnounce = h.Clock.Now().UTC().Add(h.KeepAlivePeriod) h.setState(HeartbeatStateAnnounceWait) diff --git a/lib/srv/monitor_test.go b/lib/srv/monitor_test.go index cd168e04cfe18..fd1946c7b2eee 100644 --- a/lib/srv/monitor_test.go +++ b/lib/srv/monitor_test.go @@ -112,7 +112,7 @@ func TestMonitorStaleLocks(t *testing.T) { select { case <-asrv.LockWatcher.LoopC: - case <-time.After(2 * time.Second): + case <-time.After(15 * time.Second): t.Fatal("Timeout waiting for LockWatcher loop check.") } select { @@ -123,12 +123,12 @@ func TestMonitorStaleLocks(t *testing.T) { go asrv.Backend.CloseWatchers() select { case <-asrv.LockWatcher.ResetC: - case <-time.After(2 * time.Second): + case <-time.After(15 * time.Second): t.Fatal("Timeout waiting for LockWatcher reset.") } select { case <-conn.closedC: - case <-time.After(2 * time.Second): + case <-time.After(15 * time.Second): t.Fatal("Timeout waiting for connection close.") } require.Equal(t, services.StrictLockingModeAccessDenied.Error(), emitter.LastEvent().(*apievents.ClientDisconnect).Reason) diff --git a/lib/srv/regular/sshserver_test.go b/lib/srv/regular/sshserver_test.go index 6376bdb64f0f0..e0bc6d3b5a6a6 100644 --- a/lib/srv/regular/sshserver_test.go +++ b/lib/srv/regular/sshserver_test.go @@ -1004,6 +1004,7 @@ func TestProxyReverseTunnel(t *testing.T) { AccessPoint: proxyClient, ReverseTunnelServer: reverseTunnelServer, LocalCluster: f.testSrv.ClusterName(), + Log: logger, }) require.NoError(t, err) From 2f1acc159f07a788f2366460ed96775de10f3bb3 Mon Sep 17 00:00:00 2001 From: Tim Ross Date: Thu, 20 Jan 2022 16:03:38 -0500 Subject: [PATCH 2/5] cache_test --- lib/cache/cache_test.go | 226 ++++++++++++++++++++-------------------- 1 file changed, 113 insertions(+), 113 deletions(-) diff --git a/lib/cache/cache_test.go b/lib/cache/cache_test.go index f5edc082728b2..b9e783c9df81d 100644 --- a/lib/cache/cache_test.go +++ b/lib/cache/cache_test.go @@ -168,22 +168,22 @@ func newPack(dir string, setupConfig func(c Config) Config) (*testPack, error) { } p.cache, err = New(setupConfig(Config{ - Context: ctx, - Backend: p.cacheBackend, - Events: p.eventsS, - ClusterConfig: p.clusterConfigS, - Provisioner: p.provisionerS, - Trust: p.trustS, - Users: p.usersS, - Access: p.accessS, - DynamicAccess: p.dynamicAccessS, - Presence: p.presenceS, - AppSession: p.appSessionS, - WebSession: p.webSessionS, - WebToken: p.webTokenS, - Restrictions: p.restrictions, - MaxRetryPeriod: 200 * time.Millisecond, - EventsC: p.eventsC, + Context: ctx, + Backend: p.cacheBackend, + Events: p.eventsS, + ClusterConfig: p.clusterConfigS, + Provisioner: p.provisionerS, + Trust: p.trustS, + Users: p.usersS, + Access: p.accessS, + DynamicAccess: p.dynamicAccessS, + Presence: p.presenceS, + AppSession: p.appSessionS, + WebSession: p.webSessionS, + WebToken: p.webTokenS, + Restrictions: p.restrictions, + MaxRetryPeriod: 200 * time.Millisecond, + EventsC: p.eventsC, })) if err != nil { return nil, trace.Wrap(err) @@ -380,22 +380,22 @@ func TestCompletenessInit(t *testing.T) { p.eventsS.closeWatchers() p.cache, err = New(ForAuth(Config{ - Context: ctx, - Backend: 
p.cacheBackend, - Events: p.eventsS, - ClusterConfig: p.clusterConfigS, - Provisioner: p.provisionerS, - Trust: p.trustS, - Users: p.usersS, - Access: p.accessS, - DynamicAccess: p.dynamicAccessS, - Presence: p.presenceS, - AppSession: p.appSessionS, - WebSession: p.webSessionS, - WebToken: p.webTokenS, - Restrictions: p.restrictions, - MaxRetryPeriod: 200 * time.Millisecond, - EventsC: p.eventsC, + Context: ctx, + Backend: p.cacheBackend, + Events: p.eventsS, + ClusterConfig: p.clusterConfigS, + Provisioner: p.provisionerS, + Trust: p.trustS, + Users: p.usersS, + Access: p.accessS, + DynamicAccess: p.dynamicAccessS, + Presence: p.presenceS, + AppSession: p.appSessionS, + WebSession: p.webSessionS, + WebToken: p.webTokenS, + Restrictions: p.restrictions, + MaxRetryPeriod: 200 * time.Millisecond, + EventsC: p.eventsC, })) require.NoError(t, err) @@ -435,22 +435,22 @@ func TestCompletenessReset(t *testing.T) { var err error p.cache, err = New(ForAuth(Config{ - Context: ctx, - Backend: p.cacheBackend, - Events: p.eventsS, - ClusterConfig: p.clusterConfigS, - Provisioner: p.provisionerS, - Trust: p.trustS, - Users: p.usersS, - Access: p.accessS, - DynamicAccess: p.dynamicAccessS, - Presence: p.presenceS, - AppSession: p.appSessionS, - WebSession: p.webSessionS, - WebToken: p.webTokenS, - Restrictions: p.restrictions, - MaxRetryPeriod: 200 * time.Millisecond, - EventsC: p.eventsC, + Context: ctx, + Backend: p.cacheBackend, + Events: p.eventsS, + ClusterConfig: p.clusterConfigS, + Provisioner: p.provisionerS, + Trust: p.trustS, + Users: p.usersS, + Access: p.accessS, + DynamicAccess: p.dynamicAccessS, + Presence: p.presenceS, + AppSession: p.appSessionS, + WebSession: p.webSessionS, + WebToken: p.webTokenS, + Restrictions: p.restrictions, + MaxRetryPeriod: 200 * time.Millisecond, + EventsC: p.eventsC, })) require.NoError(t, err) @@ -495,22 +495,22 @@ func TestTombstones(t *testing.T) { var err error p.cache, err = New(ForAuth(Config{ - Context: ctx, - Backend: p.cacheBackend, - Events: p.eventsS, - ClusterConfig: p.clusterConfigS, - Provisioner: p.provisionerS, - Trust: p.trustS, - Users: p.usersS, - Access: p.accessS, - DynamicAccess: p.dynamicAccessS, - Presence: p.presenceS, - AppSession: p.appSessionS, - WebSession: p.webSessionS, - WebToken: p.webTokenS, - Restrictions: p.restrictions, - MaxRetryPeriod: 200 * time.Millisecond, - EventsC: p.eventsC, + Context: ctx, + Backend: p.cacheBackend, + Events: p.eventsS, + ClusterConfig: p.clusterConfigS, + Provisioner: p.provisionerS, + Trust: p.trustS, + Users: p.usersS, + Access: p.accessS, + DynamicAccess: p.dynamicAccessS, + Presence: p.presenceS, + AppSession: p.appSessionS, + WebSession: p.webSessionS, + WebToken: p.webTokenS, + Restrictions: p.restrictions, + MaxRetryPeriod: 200 * time.Millisecond, + EventsC: p.eventsC, })) require.NoError(t, err) @@ -527,22 +527,22 @@ func TestTombstones(t *testing.T) { p.eventsS.closeWatchers() p.cache, err = New(ForAuth(Config{ - Context: ctx, - Backend: p.cacheBackend, - Events: p.eventsS, - ClusterConfig: p.clusterConfigS, - Provisioner: p.provisionerS, - Trust: p.trustS, - Users: p.usersS, - Access: p.accessS, - DynamicAccess: p.dynamicAccessS, - Presence: p.presenceS, - AppSession: p.appSessionS, - WebSession: p.webSessionS, - WebToken: p.webTokenS, - Restrictions: p.restrictions, - MaxRetryPeriod: 200 * time.Millisecond, - EventsC: p.eventsC, + Context: ctx, + Backend: p.cacheBackend, + Events: p.eventsS, + ClusterConfig: p.clusterConfigS, + Provisioner: p.provisionerS, + Trust: p.trustS, + Users: 
p.usersS, + Access: p.accessS, + DynamicAccess: p.dynamicAccessS, + Presence: p.presenceS, + AppSession: p.appSessionS, + WebSession: p.webSessionS, + WebToken: p.webTokenS, + Restrictions: p.restrictions, + MaxRetryPeriod: 200 * time.Millisecond, + EventsC: p.eventsC, })) require.NoError(t, err) @@ -575,23 +575,23 @@ func TestListNodesTTLVariant(t *testing.T) { t.Cleanup(p.Close) p.cache, err = New(ForAuth(Config{ - Context: ctx, - Backend: p.cacheBackend, - Events: p.eventsS, - ClusterConfig: p.clusterConfigS, - Provisioner: p.provisionerS, - Trust: p.trustS, - Users: p.usersS, - Access: p.accessS, - DynamicAccess: p.dynamicAccessS, - Presence: p.presenceS, - AppSession: p.appSessionS, - WebSession: p.webSessionS, - WebToken: p.webTokenS, - Restrictions: p.restrictions, - MaxRetryPeriod: 200 * time.Millisecond, - EventsC: p.eventsC, - neverOK: true, // ensure reads are never healthy + Context: ctx, + Backend: p.cacheBackend, + Events: p.eventsS, + ClusterConfig: p.clusterConfigS, + Provisioner: p.provisionerS, + Trust: p.trustS, + Users: p.usersS, + Access: p.accessS, + DynamicAccess: p.dynamicAccessS, + Presence: p.presenceS, + AppSession: p.appSessionS, + WebSession: p.webSessionS, + WebToken: p.webTokenS, + Restrictions: p.restrictions, + MaxRetryPeriod: 200 * time.Millisecond, + EventsC: p.eventsC, + neverOK: true, // ensure reads are never healthy })) require.NoError(t, err) @@ -641,22 +641,22 @@ func initStrategy(t *testing.T) { p.backend.SetReadError(trace.ConnectionProblem(nil, "backend is out")) var err error p.cache, err = New(ForAuth(Config{ - Context: ctx, - Backend: p.cacheBackend, - Events: p.eventsS, - ClusterConfig: p.clusterConfigS, - Provisioner: p.provisionerS, - Trust: p.trustS, - Users: p.usersS, - Access: p.accessS, - DynamicAccess: p.dynamicAccessS, - Presence: p.presenceS, - AppSession: p.appSessionS, - WebSession: p.webSessionS, - WebToken: p.webTokenS, - Restrictions: p.restrictions, - MaxRetryPeriod: 200 * time.Millisecond, - EventsC: p.eventsC, + Context: ctx, + Backend: p.cacheBackend, + Events: p.eventsS, + ClusterConfig: p.clusterConfigS, + Provisioner: p.provisionerS, + Trust: p.trustS, + Users: p.usersS, + Access: p.accessS, + DynamicAccess: p.dynamicAccessS, + Presence: p.presenceS, + AppSession: p.appSessionS, + WebSession: p.webSessionS, + WebToken: p.webTokenS, + Restrictions: p.restrictions, + MaxRetryPeriod: 200 * time.Millisecond, + EventsC: p.eventsC, })) require.NoError(t, err) From 62330ace96187a0df9b309a74f59a161374b13ef Mon Sep 17 00:00:00 2001 From: rosstimothy <39066650+rosstimothy@users.noreply.github.com> Date: Wed, 5 Jan 2022 12:33:58 -0500 Subject: [PATCH 3/5] Fix Flaky Retry Tests (#9516) Fix flaky unit tests Addresses issues causing failures in TestCache_Backoff, TestTeleportProcess_reconnectToAuth and TestResourceWatcher_Backoff. By utilizing FakeClock.BlockUntil tests ensure that the clock will not be advanced until retry.After has been called. Move retry duration channels to config in order to allow them to be buffered by tests. 
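
The coordination pattern the updated tests rely on, as a minimal sketch (assuming
the jonboulle/clockwork fake clock plus the standard testing and time imports;
the plain duration and clock.After below stand in for retry.Duration() and
retry.After(), and the names are illustrative only):

    func TestBackoffCoordination(t *testing.T) {
        clock := clockwork.NewFakeClock()
        waitC := make(chan time.Duration, 1) // buffered, like ConnectFailureC/ResetC

        // Component under test: report the chosen backoff, then wait on the clock.
        go func() {
            d := 10 * time.Second // stand-in for retry.Duration()
            waitC <- d
            <-clock.After(d) // stand-in for <-retry.After()
        }()

        d := <-waitC        // the test asserts on the reported backoff here
        clock.BlockUntil(1) // wait until the goroutine is parked on the fake clock
        clock.Advance(d)    // only now is advancing time guaranteed to fire the timer
    }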
--- lib/cache/cache_test.go | 5 ++++- lib/service/cfg.go | 4 ++++ lib/service/connect.go | 2 +- lib/service/service.go | 4 ---- lib/service/service_test.go | 10 ++++++++-- lib/services/watcher.go | 9 ++++++--- lib/services/watcher_test.go | 7 ++++++- 7 files changed, 29 insertions(+), 12 deletions(-) diff --git a/lib/cache/cache_test.go b/lib/cache/cache_test.go index b9e783c9df81d..71c3a267fa3ab 100644 --- a/lib/cache/cache_test.go +++ b/lib/cache/cache_test.go @@ -1861,8 +1861,11 @@ func TestCache_Backoff(t *testing.T) { require.GreaterOrEqual(t, duration, stepMin) require.LessOrEqual(t, duration, stepMax) + // wait for cache to get to retry.After + clock.BlockUntil(1) + // add some extra to the duration to ensure the retry occurs - clock.Advance(duration * 3) + clock.Advance(p.cache.MaxRetryPeriod) case <-time.After(time.Minute): t.Fatalf("timeout waiting for event") } diff --git a/lib/service/cfg.go b/lib/service/cfg.go index d743ee0ec5bb7..4813816c129a6 100644 --- a/lib/service/cfg.go +++ b/lib/service/cfg.go @@ -240,6 +240,9 @@ type Config struct { // MaxRetryPeriod is the maximum period between reconnection attempts to auth MaxRetryPeriod time.Duration + + // ConnectFailureC is a channel to notify of failures to connect to auth (used in tests). + ConnectFailureC chan time.Duration } // ApplyToken assigns a given token to all internal services but only if token @@ -909,6 +912,7 @@ func ApplyDefaults(cfg *Config) { Time: defaults.ConnectionErrorMeasurementPeriod, } cfg.MaxRetryPeriod = defaults.MaxWatcherBackoff + cfg.ConnectFailureC = make(chan time.Duration, 1) } // ApplyFIPSDefaults updates default configuration to be FedRAMP/FIPS 140-2 diff --git a/lib/service/connect.go b/lib/service/connect.go index b999083cb4b30..d2fa9483272e7 100644 --- a/lib/service/connect.go +++ b/lib/service/connect.go @@ -77,7 +77,7 @@ func (process *TeleportProcess) reconnectToAuthService(role types.SystemRole) (* // Used for testing that auth service will attempt to reconnect in the provided duration. select { - case process.connectFailureC <- retry.Duration(): + case process.Config.ConnectFailureC <- retry.Duration(): default: } diff --git a/lib/service/service.go b/lib/service/service.go index 11cd1d3145c21..d6bfb37355af3 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -302,9 +302,6 @@ type TeleportProcess struct { // clusterFeatures contain flags for supported and unsupported features. clusterFeatures proto.Features - - // connectFailureC is a channel to notify of failures to connect to auth (used in tests). 
- connectFailureC chan time.Duration } type keyPairKey struct { @@ -685,7 +682,6 @@ func NewTeleport(cfg *Config) (*TeleportProcess, error) { id: processID, keyPairs: make(map[keyPairKey]KeyPair), appDependCh: make(chan Event, 1024), - connectFailureC: make(chan time.Duration), } process.registerAppDepend() diff --git a/lib/service/service_test.go b/lib/service/service_test.go index c68698e54cb1f..9c26a8439f870 100644 --- a/lib/service/service_test.go +++ b/lib/service/service_test.go @@ -464,6 +464,8 @@ func TestTeleportProcess_reconnectToAuth(t *testing.T) { cfg.Auth.Enabled = false cfg.Proxy.Enabled = false cfg.SSH.Enabled = true + cfg.MaxRetryPeriod = defaults.MaxWatcherBackoff + cfg.ConnectFailureC = make(chan time.Duration, 5) process, err := NewTeleport(cfg) require.NoError(t, err) @@ -480,14 +482,18 @@ func TestTeleportProcess_reconnectToAuth(t *testing.T) { for i := 0; i < 5; i++ { // wait for connection to fail select { - case duration := <-process.connectFailureC: + case duration := <-process.Config.ConnectFailureC: stepMin := step * time.Duration(i) / 2 stepMax := step * time.Duration(i+1) require.GreaterOrEqual(t, duration, stepMin) require.LessOrEqual(t, duration, stepMax) + + // wait for connection to get to retry.After + clock.BlockUntil(1) + // add some extra to the duration to ensure the retry occurs - clock.Advance(duration * 3) + clock.Advance(cfg.MaxRetryPeriod) case <-time.After(time.Minute): t.Fatalf("timeout waiting for failure") } diff --git a/lib/services/watcher.go b/lib/services/watcher.go index 735db9741a5a7..de24bb81b8bac 100644 --- a/lib/services/watcher.go +++ b/lib/services/watcher.go @@ -64,6 +64,8 @@ type ResourceWatcherConfig struct { // MaxStaleness is a maximum acceptable staleness for the locally maintained // resources, zero implies no staleness detection. MaxStaleness time.Duration + // ResetC is a channel to notify of internal watcher reset (used in tests). + ResetC chan time.Duration } // CheckAndSetDefaults checks parameters and sets default values. @@ -86,6 +88,9 @@ func (cfg *ResourceWatcherConfig) CheckAndSetDefaults() error { if cfg.Client == nil { return trace.BadParameter("missing parameter Client") } + if cfg.ResetC == nil { + cfg.ResetC = make(chan time.Duration, 1) + } return nil } @@ -112,7 +117,6 @@ func newResourceWatcher(ctx context.Context, collector resourceCollector, cfg Re cancel: cancel, retry: retry, LoopC: make(chan struct{}), - ResetC: make(chan time.Duration, 1), StaleC: make(chan struct{}, 1), } go p.runWatchLoop() @@ -140,8 +144,7 @@ type resourceWatcher struct { // LoopC is a channel to check whether the watch loop is running // (used in tests). LoopC chan struct{} - // ResetC is a channel to notify of internal watcher reset (used in tests). - ResetC chan time.Duration + // StaleC is a channel that can trigger the condition of resource staleness // (used in tests). 
StaleC chan struct{} diff --git a/lib/services/watcher_test.go b/lib/services/watcher_test.go index b0cc4813ab4ac..314d5f2a73bf5 100644 --- a/lib/services/watcher_test.go +++ b/lib/services/watcher_test.go @@ -66,6 +66,7 @@ func TestResourceWatcher_Backoff(t *testing.T) { Clock: clock, MaxRetryPeriod: defaults.MaxWatcherBackoff, Client: &errorWatcher{}, + ResetC: make(chan time.Duration, 5), }, ProxyGetter: &nopProxyGetter{}, }) @@ -82,8 +83,12 @@ func TestResourceWatcher_Backoff(t *testing.T) { require.GreaterOrEqual(t, duration, stepMin) require.LessOrEqual(t, duration, stepMax) + + // wait for watcher to get to retry.After + clock.BlockUntil(1) + // add some extra to the duration to ensure the retry occurs - clock.Advance(duration * 3) + clock.Advance(w.MaxRetryPeriod) case <-time.After(time.Minute): t.Fatalf("timeout waiting for reset") } From dc4f1233e59da0c69a8722afa393dd1f72e52651 Mon Sep 17 00:00:00 2001 From: Tim Ross Date: Thu, 20 Jan 2022 16:39:57 -0500 Subject: [PATCH 4/5] fix build --- lib/cache/cache_test.go | 47 ++++++++++++++++++++----------------- lib/service/service_test.go | 1 - 2 files changed, 26 insertions(+), 22 deletions(-) diff --git a/lib/cache/cache_test.go b/lib/cache/cache_test.go index 71c3a267fa3ab..0284fa69f821a 100644 --- a/lib/cache/cache_test.go +++ b/lib/cache/cache_test.go @@ -96,6 +96,10 @@ func newPackForProxy(t *testing.T) *testPack { return newTestPack(t, ForProxy) } +func newPackForOldRemoteProxy(t *testing.T) *testPack { + return newTestPack(t, ForOldRemoteProxy) +} + func newPackForNode(t *testing.T) *testPack { return newTestPack(t, ForNode) } @@ -113,7 +117,7 @@ func newTestPackWithoutCache(t *testing.T) *testPack { } // newPackWithoutCache returns a new test pack without creating cache -func newPackWithoutCache(dir string, opts ...packOption) (*testPack, error) { +func newPackWithoutCache(dir string) (*testPack, error) { ctx := context.Background() p := &testPack{ dataDir: dir, @@ -162,7 +166,7 @@ func newPackWithoutCache(dir string, opts ...packOption) (*testPack, error) { // newPack returns a new test pack or fails the test on error func newPack(dir string, setupConfig func(c Config) Config) (*testPack, error) { ctx := context.Background() - p, err := newPackWithoutCache(dir, opts...) 
+ p, err := newPackWithoutCache(dir) if err != nil { return nil, trace.Wrap(err) } @@ -965,9 +969,9 @@ func TestClusterName(t *testing.T) { // TestClusterConfig tests cluster configuration // DELETE IN 8.0.0 -func (s *CacheSuite) TestClusterConfig(c *check.C) { +func TestClusterConfig(t *testing.T) { ctx := context.Background() - p := s.newPackForOldRemoteProxy(c) + p := newPackForOldRemoteProxy(t) defer p.Close() // Since changes to configuration-related resources trigger ClusterConfig events @@ -978,61 +982,62 @@ func (s *CacheSuite) TestClusterConfig(c *check.C) { for { select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) if event.Event.Resource.GetKind() == types.KindClusterConfig { continue } - c.Assert(event.Event.Resource.GetKind(), check.Equals, resourceKind) + require.Equal(t, resourceKind, event.Event.Resource.GetKind()) return case <-timeC: - c.Fatalf("Timeout waiting for update to resource %v", resourceKind) + t.Fatalf("Timeout waiting for update to resource %v", resourceKind) } } } err := p.clusterConfigS.SetClusterNetworkingConfig(ctx, types.DefaultClusterNetworkingConfig()) - c.Assert(err, check.IsNil) + require.NoError(t, err) err = p.clusterConfigS.SetAuthPreference(ctx, types.DefaultAuthPreference()) - c.Assert(err, check.IsNil) + require.NoError(t, err) waitForEventIgnoreClusterConfig(types.KindClusterAuthPreference) err = p.clusterConfigS.SetSessionRecordingConfig(ctx, types.DefaultSessionRecordingConfig()) - c.Assert(err, check.IsNil) + require.NoError(t, err) auditConfig, err := types.NewClusterAuditConfig(types.ClusterAuditConfigSpecV2{ AuditEventsURI: []string{"dynamodb://audit_table_name", "file:///home/log"}, }) - c.Assert(err, check.IsNil) + require.NoError(t, err) err = p.clusterConfigS.SetClusterAuditConfig(ctx, auditConfig) - c.Assert(err, check.IsNil) + require.NoError(t, err) clusterName, err := services.NewClusterNameWithRandomID(types.ClusterNameSpecV2{ ClusterName: "example.com", }) - c.Assert(err, check.IsNil) + require.NoError(t, err) err = p.clusterConfigS.SetClusterName(clusterName) - c.Assert(err, check.IsNil) + require.NoError(t, err) waitForEventIgnoreClusterConfig(types.KindClusterName) err = p.clusterConfigS.SetClusterConfig(types.DefaultClusterConfig()) - c.Assert(err, check.IsNil) + require.NoError(t, err) clusterConfig, err := p.clusterConfigS.GetClusterConfig() - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) - c.Assert(event.Event.Resource.GetKind(), check.Equals, types.KindClusterConfig) + require.Equal(t, EventProcessed, event.Type) + require.Equal(t, types.KindClusterConfig, event.Event.Resource.GetKind()) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err := p.cache.GetClusterConfig() - c.Assert(err, check.IsNil) + require.NoError(t, err) clusterConfig.SetResourceID(out.GetResourceID()) - fixtures.DeepCompare(c, clusterConfig, out) + + require.Empty(t, cmp.Diff(clusterConfig, out)) } // TestNamespaces tests caching of namespaces diff --git a/lib/service/service_test.go b/lib/service/service_test.go index 9c26a8439f870..895d512852af0 100644 --- a/lib/service/service_test.go +++ b/lib/service/service_test.go @@ -21,7 +21,6 @@ import ( "io/ioutil" "net/http" "os" - "strings" "sync" "testing" "time" From a7f612f91e7421a035edd9b59675a03419a8e2ca Mon Sep 17 00:00:00 2001 From: Tim Ross Date: Thu, 
20 Jan 2022 17:13:02 -0500 Subject: [PATCH 5/5] fix stale watcher test --- lib/services/watcher.go | 21 ++++++++++++--------- lib/services/watcher_test.go | 11 +++++++++-- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/lib/services/watcher.go b/lib/services/watcher.go index de24bb81b8bac..82283a94772cd 100644 --- a/lib/services/watcher.go +++ b/lib/services/watcher.go @@ -188,6 +188,16 @@ func (p *resourceWatcher) runWatchLoop() { default: } + if err != nil && p.failureStartedAt.IsZero() { + // Note that failureStartedAt is zeroed in the watch routine immediately + // after the local resource set has been successfully updated. + p.failureStartedAt = p.Clock.Now() + } + if p.hasStaleView() { + p.Log.Warningf("Maximum staleness of %v exceeded, failure started at %v.", p.MaxStaleness, p.failureStartedAt) + p.collector.notifyStale() + } + // Used for testing that the watch routine has exited and is about // to be restarted. select { @@ -206,15 +216,8 @@ func (p *resourceWatcher) runWatchLoop() { } if err != nil { p.Log.Warningf("Restart watch on error: %v.", err) - if p.failureStartedAt.IsZero() { - p.failureStartedAt = p.Clock.Now() - } - // failureStartedAt is zeroed in the watch routine immediately after - // the local resource set has been successfully updated. - } - if p.hasStaleView() { - p.Log.Warningf("Maximum staleness of %v exceeded, failure started at %v.", p.MaxStaleness, p.failureStartedAt) - p.collector.notifyStale() + } else { + p.Log.Debug("Triggering scheduled refetch.") } } diff --git a/lib/services/watcher_test.go b/lib/services/watcher_test.go index 314d5f2a73bf5..cade7548f828e 100644 --- a/lib/services/watcher_test.go +++ b/lib/services/watcher_test.go @@ -400,10 +400,16 @@ func TestLockWatcherStale(t *testing.T) { }) require.NoError(t, err) t.Cleanup(w.Close) + select { + case <-w.LoopC: + case <-time.After(15 * time.Second): + t.Fatal("Timeout waiting for LockWatcher loop.") + } // Subscribe to lock watcher updates. target := types.LockTarget{Node: "node"} require.NoError(t, w.CheckLockInForce(constants.LockingModeBestEffort, target)) + require.NoError(t, w.CheckLockInForce(constants.LockingModeStrict, target)) sub, err := w.Subscribe(ctx, target) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, sub.Close()) }) @@ -420,6 +426,7 @@ func TestLockWatcherStale(t *testing.T) { case <-time.After(2 * time.Second): } require.NoError(t, w.CheckLockInForce(constants.LockingModeBestEffort, target)) + require.NoError(t, w.CheckLockInForce(constants.LockingModeStrict, target)) // Advance the clock to exceed LockMaxStaleness. clock.Advance(defaults.LockMaxStaleness + time.Second) @@ -428,7 +435,7 @@ func TestLockWatcherStale(t *testing.T) { require.Equal(t, types.OpUnreliable, event.Type) case <-sub.Done(): t.Fatal("Lock watcher subscription has unexpectedly exited.") - case <-time.After(2 * time.Second): + case <-time.After(15 * time.Second): t.Fatal("Timeout waiting for OpUnreliable.") } require.NoError(t, w.CheckLockInForce(constants.LockingModeBestEffort, target)) @@ -460,7 +467,7 @@ ExpectPut: break ExpectPut case <-sub.Done(): t.Fatal("Lock watcher subscription has unexpectedly exited.") - case <-time.After(2 * time.Second): + case <-time.After(15 * time.Second): t.Fatal("Timeout waiting for OpPut.") } }
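
Note (a sketch, not part of the diffs above): the retry construction repeated
across lib/cache, lib/services and lib/service boils down to the shape below.
runWithBackoff and watchOnce are illustrative names; utils.NewLinear,
utils.HalfJitter, utils.NewHalfJitter, retry.After and retry.Inc mirror their
use in the diffs.

    package example

    import (
        "context"
        "time"

        "github.com/gravitational/teleport/lib/utils"
        "github.com/gravitational/trace"
    )

    // runWithBackoff retries watchOnce with a jittered, linearly growing wait,
    // capped at maxRetryPeriod (defaults.MaxWatcherBackoff, one minute).
    func runWithBackoff(ctx context.Context, maxRetryPeriod time.Duration, watchOnce func(context.Context) error) error {
        retry, err := utils.NewLinear(utils.LinearConfig{
            First:  utils.HalfJitter(maxRetryPeriod / 10), // short, randomized first wait
            Step:   maxRetryPeriod / 5,                    // linear growth per failure
            Max:    maxRetryPeriod,                        // hard cap on the backoff
            Jitter: utils.NewHalfJitter(),                 // re-jitter every computed wait
        })
        if err != nil {
            return trace.Wrap(err)
        }
        for {
            if err := watchOnce(ctx); err == nil {
                return nil
            }
            select {
            case <-retry.After(): // wait the current (jittered) duration
                retry.Inc() // the next failure waits longer, up to Max
            case <-ctx.Done():
                return ctx.Err()
            }
        }
    }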