From 787bd8c4f11712cd31c1340d49c0de5aeb094c60 Mon Sep 17 00:00:00 2001
From: rosstimothy <39066650+rosstimothy@users.noreply.github.com>
Date: Thu, 16 Dec 2021 11:41:08 -0500
Subject: [PATCH] Add jitter and backoff to prevent thundering herd on auth (#9133)

Move cache and resourceWatcher watchers from a 10s retry to a jittered
backoff retry of up to ~1min. Replace the reconnectToAuthService interval
with a retry so that jitter and backoff are also applied when a node
restarts due to the changes introduced in #8102.

Fixes #6889.
---
 integration/helpers.go            |   1 +
 lib/auth/helpers.go               |   8 +-
 lib/cache/cache.go                |  32 +-
 lib/cache/cache_test.go           | 935 ++++++++++++++++--------------
 lib/defaults/defaults.go          |   8 +-
 lib/restrictedsession/watcher.go  |  14 +-
 lib/reversetunnel/rc_manager.go   |  11 +-
 lib/service/cfg.go                |   4 +
 lib/service/connect.go            |  48 +-
 lib/service/service.go            |  16 +-
 lib/service/service_test.go       |  47 ++
 lib/services/watcher.go           |  41 +-
 lib/services/watcher_test.go      |  78 ++-
 lib/srv/heartbeat.go              |   2 +-
 lib/srv/monitor_test.go           |   6 +-
 lib/srv/regular/sshserver_test.go |   1 +
 16 files changed, 727 insertions(+), 525 deletions(-)

diff --git a/integration/helpers.go b/integration/helpers.go
index 21bf1c38286f6..dd2c77b4d356a 100644
--- a/integration/helpers.go
+++ b/integration/helpers.go
@@ -614,6 +614,7 @@ func (i *TeleInstance) GenerateConfig(t *testing.T, trustedSecrets []*InstanceSe
 	tconf.Kube.CheckImpersonationPermissions = nullImpersonationCheck
 	tconf.Keygen = testauthority.New()
+	tconf.MaxRetryPeriod = defaults.HighResPollingPeriod
 	i.Config = tconf
 	return tconf, nil
 }
diff --git a/lib/auth/helpers.go b/lib/auth/helpers.go
index cda8b422e6598..1b60455f2281c 100644
--- a/lib/auth/helpers.go
+++ b/lib/auth/helpers.go
@@ -35,6 +35,7 @@ import (
 	authority "github.com/gravitational/teleport/lib/auth/testauthority"
 	"github.com/gravitational/teleport/lib/backend"
 	"github.com/gravitational/teleport/lib/backend/memory"
+	"github.com/gravitational/teleport/lib/defaults"
 	"github.com/gravitational/teleport/lib/events"
 	"github.com/gravitational/teleport/lib/limiter"
 	"github.com/gravitational/teleport/lib/services"
@@ -326,9 +327,10 @@ func NewTestAuthServer(cfg TestAuthServerConfig) (*TestAuthServer, error) {
 	srv.LockWatcher, err = services.NewLockWatcher(ctx, services.LockWatcherConfig{
 		ResourceWatcherConfig: services.ResourceWatcherConfig{
-			Component: teleport.ComponentAuth,
-			Client:    srv.AuthServer,
-			Clock:     cfg.Clock,
+			Component:      teleport.ComponentAuth,
+			Client:         srv.AuthServer,
+			Clock:          cfg.Clock,
+			MaxRetryPeriod: defaults.HighResPollingPeriod,
 		},
 	})
 	if err != nil {
diff --git a/lib/cache/cache.go b/lib/cache/cache.go
index ac6f4c1ef4550..239375b03e7fe 100644
--- a/lib/cache/cache.go
+++ b/lib/cache/cache.go
@@ -512,8 +512,8 @@ type Config struct {
 	WindowsDesktops services.WindowsDesktops
 	// Backend is a backend for local cache
 	Backend backend.Backend
-	// RetryPeriod is a period between cache retries on failures
-	RetryPeriod time.Duration
+	// MaxRetryPeriod is the maximum period between cache retries on failures
+	MaxRetryPeriod time.Duration
 	// WatcherInitTimeout is the maximum acceptable delay for an
 	// OpInit after a watcher has been started (default=1m).
WatcherInitTimeout time.Duration @@ -552,8 +552,8 @@ func (c *Config) CheckAndSetDefaults() error { if c.Clock == nil { c.Clock = clockwork.NewRealClock() } - if c.RetryPeriod == 0 { - c.RetryPeriod = defaults.HighResPollingPeriod + if c.MaxRetryPeriod == 0 { + c.MaxRetryPeriod = defaults.MaxWatcherBackoff } if c.WatcherInitTimeout == 0 { c.WatcherInitTimeout = time.Minute @@ -586,6 +586,9 @@ const ( // TombstoneWritten is emitted if cache is closed in a healthy // state and successfully writes its tombstone. TombstoneWritten = "tombstone_written" + // Reloading is emitted when an error occurred watching events + // and the cache is waiting to create a new watcher + Reloading = "reloading_cache" ) // New creates a new instance of Cache @@ -660,8 +663,11 @@ func New(config Config) (*Cache, error) { } retry, err := utils.NewLinear(utils.LinearConfig{ - Step: cs.Config.RetryPeriod / 10, - Max: cs.Config.RetryPeriod, + First: utils.HalfJitter(cs.MaxRetryPeriod / 10), + Step: cs.MaxRetryPeriod / 5, + Max: cs.MaxRetryPeriod, + Jitter: utils.NewHalfJitter(), + Clock: cs.Clock, }) if err != nil { cs.Close() @@ -724,10 +730,20 @@ func (c *Cache) update(ctx context.Context, retry utils.Retry) { if err != nil { c.Warningf("Re-init the cache on error: %v.", err) } + // events cache should be closed as well - c.Debugf("Reloading %v.", retry) + c.Debugf("Reloading cache.") + + c.notify(ctx, Event{Type: Reloading, Event: types.Event{ + Resource: &types.ResourceHeader{ + Kind: retry.Duration().String(), + }, + }}) + + startedWaiting := c.Clock.Now() select { - case <-retry.After(): + case t := <-retry.After(): + c.Debugf("Initiating new watch after waiting %v.", t.Sub(startedWaiting)) retry.Inc() case <-c.ctx.Done(): return diff --git a/lib/cache/cache_test.go b/lib/cache/cache_test.go index c00bf4cd6ba7b..b9ab7c01ee783 100644 --- a/lib/cache/cache_test.go +++ b/lib/cache/cache_test.go @@ -32,20 +32,18 @@ import ( "github.com/gravitational/teleport/lib/backend/lite" "github.com/gravitational/teleport/lib/backend/memory" "github.com/gravitational/teleport/lib/defaults" - "github.com/gravitational/teleport/lib/fixtures" "github.com/gravitational/teleport/lib/services" "github.com/gravitational/teleport/lib/services/local" "github.com/gravitational/teleport/lib/services/suite" "github.com/gravitational/teleport/lib/utils" + "github.com/gravitational/trace" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "github.com/jonboulle/clockwork" "github.com/pborman/uuid" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" - "gopkg.in/check.v1" - - "github.com/gravitational/trace" ) func TestMain(m *testing.M) { @@ -53,13 +51,6 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } -type CacheSuite struct{} - -var _ = check.Suite(&CacheSuite{}) - -// bootstrap check -func TestState(t *testing.T) { check.TestingT(t) } - // testPack contains pack of // services used for test run type testPack struct { @@ -100,32 +91,32 @@ func (t *testPack) Close() { } } -func (s *CacheSuite) newPackForAuth(c *check.C) *testPack { - return s.newPack(c, ForAuth) +func newPackForAuth(t *testing.T) *testPack { + return newTestPack(t, ForAuth) } -func (s *CacheSuite) newPackForProxy(c *check.C) *testPack { - return s.newPack(c, ForProxy) +func newPackForProxy(t *testing.T) *testPack { + return newTestPack(t, ForProxy) } -func (s *CacheSuite) newPackForNode(c *check.C) *testPack { - return s.newPack(c, ForNode) +func newPackForNode(t *testing.T) *testPack { + return newTestPack(t, ForNode) } -func (s 
*CacheSuite) newPack(c *check.C, setupConfig SetupConfigFn) *testPack { - pack, err := newPack(c.MkDir(), setupConfig) - c.Assert(err, check.IsNil) +func newTestPack(t *testing.T, setupConfig SetupConfigFn) *testPack { + pack, err := newPack(t.TempDir(), setupConfig) + require.NoError(t, err) return pack } -func (s *CacheSuite) newPackWithoutCache(c *check.C, setupConfig SetupConfigFn) *testPack { - pack, err := newPackWithoutCache(c.MkDir(), setupConfig) - c.Assert(err, check.IsNil) +func newTestPackWithoutCache(t *testing.T) *testPack { + pack, err := newPackWithoutCache(t.TempDir()) + require.NoError(t, err) return pack } // newPackWithoutCache returns a new test pack without creating cache -func newPackWithoutCache(dir string, ssetupConfig SetupConfigFn) (*testPack, error) { +func newPackWithoutCache(dir string, opts ...packOption) (*testPack, error) { ctx := context.Background() p := &testPack{ dataDir: dir, @@ -177,7 +168,7 @@ func newPackWithoutCache(dir string, ssetupConfig SetupConfigFn) (*testPack, err // newPack returns a new test pack or fails the test on error func newPack(dir string, setupConfig func(c Config) Config) (*testPack, error) { ctx := context.Background() - p, err := newPackWithoutCache(dir, setupConfig) + p, err := newPackWithoutCache(dir, opts...) if err != nil { return nil, trace.Wrap(err) } @@ -200,7 +191,7 @@ func newPack(dir string, setupConfig func(c Config) Config) (*testPack, error) { Apps: p.apps, Databases: p.databases, WindowsDesktops: p.windowsDesktops, - RetryPeriod: 200 * time.Millisecond, + MaxRetryPeriod: 200 * time.Millisecond, EventsC: p.eventsC, })) if err != nil { @@ -219,44 +210,44 @@ func newPack(dir string, setupConfig func(c Config) Config) (*testPack, error) { } // TestCA tests certificate authorities -func (s *CacheSuite) TestCA(c *check.C) { - p := s.newPackForAuth(c) - defer p.Close() +func TestCA(t *testing.T) { + p := newPackForAuth(t) + t.Cleanup(p.Close) ca := suite.NewTestCA(types.UserCA, "example.com") - c.Assert(p.trustS.UpsertCertAuthority(ca), check.IsNil) + require.NoError(t, p.trustS.UpsertCertAuthority(ca)) select { case <-p.eventsC: case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err := p.cache.GetCertAuthority(ca.GetID(), true) - c.Assert(err, check.IsNil) + require.NoError(t, err) ca.SetResourceID(out.GetResourceID()) - fixtures.DeepCompare(c, ca, out) + require.Empty(t, cmp.Diff(ca, out)) err = p.trustS.DeleteCertAuthority(ca.GetID()) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case <-p.eventsC: case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } _, err = p.cache.GetCertAuthority(ca.GetID(), false) - fixtures.ExpectNotFound(c, err) + require.True(t, trace.IsNotFound(err)) } // TestWatchers tests watchers connected to the cache, // verifies that all watchers of the cache will be closed // if the underlying watcher to the target backend is closed -func (s *CacheSuite) TestWatchers(c *check.C) { +func TestWatchers(t *testing.T) { ctx := context.Background() - p := s.newPackForAuth(c) - defer p.Close() + p := newPackForAuth(t) + t.Cleanup(p.Close) w, err := p.cache.NewWatcher(ctx, types.Watch{Kinds: []types.WatchKind{ { @@ -269,58 +260,60 @@ func (s *CacheSuite) TestWatchers(c *check.C) { }, }, }}) - c.Assert(err, check.IsNil) - defer w.Close() + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, w.Close()) + }) select { case e := <-w.Events(): - 
c.Assert(e.Type, check.Equals, types.OpInit) + require.Equal(t, types.OpInit, e.Type) case <-time.After(100 * time.Millisecond): - c.Fatalf("Timeout waiting for event.") + t.Fatalf("Timeout waiting for event.") } ca := suite.NewTestCA(types.UserCA, "example.com") - c.Assert(p.trustS.UpsertCertAuthority(ca), check.IsNil) + require.NoError(t, p.trustS.UpsertCertAuthority(ca)) select { case e := <-w.Events(): - c.Assert(e.Type, check.Equals, types.OpPut) - c.Assert(e.Resource.GetKind(), check.Equals, types.KindCertAuthority) + require.Equal(t, types.OpPut, e.Type) + require.Equal(t, types.KindCertAuthority, e.Resource.GetKind()) case <-time.After(time.Second): - c.Fatalf("Timeout waiting for event.") + t.Fatalf("Timeout waiting for event.") } // create an access request that matches the supplied filter req, err := services.NewAccessRequest("alice", "dictator") - c.Assert(err, check.IsNil) + require.NoError(t, err) - c.Assert(p.dynamicAccessS.CreateAccessRequest(ctx, req), check.IsNil) + require.NoError(t, p.dynamicAccessS.CreateAccessRequest(ctx, req)) select { case e := <-w.Events(): - c.Assert(e.Type, check.Equals, types.OpPut) - c.Assert(e.Resource.GetKind(), check.Equals, types.KindAccessRequest) + require.Equal(t, types.OpPut, e.Type) + require.Equal(t, types.KindAccessRequest, e.Resource.GetKind()) case <-time.After(time.Second): - c.Fatalf("Timeout waiting for event.") + t.Fatalf("Timeout waiting for event.") } - c.Assert(p.dynamicAccessS.DeleteAccessRequest(ctx, req.GetName()), check.IsNil) + require.NoError(t, p.dynamicAccessS.DeleteAccessRequest(ctx, req.GetName())) select { case e := <-w.Events(): - c.Assert(e.Type, check.Equals, types.OpDelete) - c.Assert(e.Resource.GetKind(), check.Equals, types.KindAccessRequest) + require.Equal(t, types.OpDelete, e.Type) + require.Equal(t, types.KindAccessRequest, e.Resource.GetKind()) case <-time.After(time.Second): - c.Fatalf("Timeout waiting for event.") + t.Fatalf("Timeout waiting for event.") } // create an access request that does not match the supplied filter req2, err := services.NewAccessRequest("bob", "dictator") - c.Assert(err, check.IsNil) + require.NoError(t, err) // create and then delete the non-matching request. - c.Assert(p.dynamicAccessS.CreateAccessRequest(ctx, req2), check.IsNil) - c.Assert(p.dynamicAccessS.DeleteAccessRequest(ctx, req2.GetName()), check.IsNil) + require.NoError(t, p.dynamicAccessS.CreateAccessRequest(ctx, req2)) + require.NoError(t, p.dynamicAccessS.DeleteAccessRequest(ctx, req2.GetName())) // because our filter did not match the request, the create event should never // have been created, meaning that the next event on the pipe is the delete @@ -328,10 +321,10 @@ func (s *CacheSuite) TestWatchers(c *check.C) { // a delete event). 
select { case e := <-w.Events(): - c.Assert(e.Type, check.Equals, types.OpDelete) - c.Assert(e.Resource.GetKind(), check.Equals, types.KindAccessRequest) + require.Equal(t, types.OpDelete, e.Type) + require.Equal(t, types.KindAccessRequest, e.Resource.GetKind()) case <-time.After(time.Second): - c.Fatalf("Timeout waiting for event.") + t.Fatalf("Timeout waiting for event.") } // event has arrived, now close the watchers @@ -341,15 +334,15 @@ func (s *CacheSuite) TestWatchers(c *check.C) { select { case <-w.Done(): case <-time.After(time.Second): - c.Fatalf("Timeout waiting for close event.") + t.Fatalf("Timeout waiting for close event.") } } -func waitForRestart(c *check.C, eventsC <-chan Event) { - waitForEvent(c, eventsC, WatcherStarted, WatcherFailed) +func waitForRestart(t *testing.T, eventsC <-chan Event) { + waitForEvent(t, eventsC, WatcherStarted, Reloading, WatcherFailed) } -func waitForEvent(c *check.C, eventsC <-chan Event, expectedEvent string, skipEvents ...string) { +func waitForEvent(t *testing.T, eventsC <-chan Event, expectedEvent string, skipEvents ...string) { timeC := time.After(5 * time.Second) for { // wait for watcher to restart @@ -358,27 +351,27 @@ func waitForEvent(c *check.C, eventsC <-chan Event, expectedEvent string, skipEv if apiutils.SliceContainsStr(skipEvents, event.Type) { continue } - c.Assert(event.Type, check.Equals, expectedEvent) + require.Equal(t, expectedEvent, event.Type) return case <-timeC: - c.Fatalf("Timeout waiting for expected event: %s", expectedEvent) + t.Fatalf("Timeout waiting for expected event: %s", expectedEvent) } } } // TestCompletenessInit verifies that flaky backends don't cause // the cache to return partial results during init. -func (s *CacheSuite) TestCompletenessInit(c *check.C) { +func TestCompletenessInit(t *testing.T) { ctx := context.Background() const caCount = 100 const inits = 20 - p := s.newPackWithoutCache(c, ForAuth) - defer p.Close() + p := newTestPackWithoutCache(t) + t.Cleanup(p.Close) // put lots of CAs in the backend for i := 0; i < caCount; i++ { ca := suite.NewTestCA(types.UserCA, fmt.Sprintf("%d.example.com", i)) - c.Assert(p.trustS.UpsertCertAuthority(ca), check.IsNil) + require.NoError(t, p.trustS.UpsertCertAuthority(ca)) } for i := 0; i < inits; i++ { @@ -389,7 +382,7 @@ func (s *CacheSuite) TestCompletenessInit(c *check.C) { Context: ctx, Mirror: true, }) - c.Assert(err, check.IsNil) + require.NoError(t, err) // simulate bad connection to auth server p.backend.SetReadError(trace.ConnectionProblem(nil, "backend is unavailable")) @@ -413,10 +406,10 @@ func (s *CacheSuite) TestCompletenessInit(c *check.C) { Apps: p.apps, Databases: p.databases, WindowsDesktops: p.windowsDesktops, - RetryPeriod: 200 * time.Millisecond, + MaxRetryPeriod: 200 * time.Millisecond, EventsC: p.eventsC, })) - c.Assert(err, check.IsNil) + require.NoError(t, err) p.backend.SetReadError(nil) @@ -425,31 +418,31 @@ func (s *CacheSuite) TestCompletenessInit(c *check.C) { // the CA list. for the purposes of this test, we just care that it // doesn't return the CA list *unless* it was successfully constructed. 
if err == nil { - c.Assert(len(cas), check.Equals, caCount) + require.Len(t, cas, caCount) } else { - fixtures.ExpectConnectionProblem(c, err) + require.True(t, trace.IsConnectionProblem(err)) } - c.Assert(p.cache.Close(), check.IsNil) + require.NoError(t, p.cache.Close()) p.cache = nil - c.Assert(p.cacheBackend.Close(), check.IsNil) + require.NoError(t, p.cacheBackend.Close()) p.cacheBackend = nil } } // TestCompletenessReset verifies that flaky backends don't cause // the cache to return partial results during reset. -func (s *CacheSuite) TestCompletenessReset(c *check.C) { +func TestCompletenessReset(t *testing.T) { ctx := context.Background() const caCount = 100 const resets = 20 - p := s.newPackWithoutCache(c, ForAuth) - defer p.Close() + p := newTestPackWithoutCache(t) + t.Cleanup(p.Close) // put lots of CAs in the backend for i := 0; i < caCount; i++ { ca := suite.NewTestCA(types.UserCA, fmt.Sprintf("%d.example.com", i)) - c.Assert(p.trustS.UpsertCertAuthority(ca), check.IsNil) + require.NoError(t, p.trustS.UpsertCertAuthority(ca)) } var err error @@ -471,15 +464,15 @@ func (s *CacheSuite) TestCompletenessReset(c *check.C) { Apps: p.apps, Databases: p.databases, WindowsDesktops: p.windowsDesktops, - RetryPeriod: 200 * time.Millisecond, + MaxRetryPeriod: 200 * time.Millisecond, EventsC: p.eventsC, })) - c.Assert(err, check.IsNil) + require.NoError(t, err) // verify that CAs are immediately available cas, err := p.cache.GetCertAuthorities(types.UserCA, false) - c.Assert(err, check.IsNil) - c.Assert(len(cas), check.Equals, caCount) + require.NoError(t, err) + require.Len(t, cas, caCount) for i := 0; i < resets; i++ { // simulate bad connection to auth server @@ -493,9 +486,9 @@ func (s *CacheSuite) TestCompletenessReset(c *check.C) { // the CA list. for the purposes of this test, we just care that it // doesn't return the CA list *unless* it was successfully constructed. if err == nil { - c.Assert(len(cas), check.Equals, caCount) + require.Len(t, cas, caCount) } else { - fixtures.ExpectConnectionProblem(c, err) + require.True(t, trace.IsConnectionProblem(err)) } } } @@ -503,16 +496,16 @@ func (s *CacheSuite) TestCompletenessReset(c *check.C) { // TestTombstones verifies that healthy caches leave tombstones // on closure, giving new caches the ability to start from a known // good state if the origin state is unavailable. 
-func (s *CacheSuite) TestTombstones(c *check.C) { +func TestTombstones(t *testing.T) { ctx := context.Background() const caCount = 10 - p := s.newPackWithoutCache(c, ForAuth) - defer p.Close() + p := newTestPackWithoutCache(t) + t.Cleanup(p.Close) // put lots of CAs in the backend for i := 0; i < caCount; i++ { ca := suite.NewTestCA(types.UserCA, fmt.Sprintf("%d.example.com", i)) - c.Assert(p.trustS.UpsertCertAuthority(ca), check.IsNil) + require.NoError(t, p.trustS.UpsertCertAuthority(ca)) } var err error @@ -534,19 +527,19 @@ func (s *CacheSuite) TestTombstones(c *check.C) { Apps: p.apps, Databases: p.databases, WindowsDesktops: p.windowsDesktops, - RetryPeriod: 200 * time.Millisecond, + MaxRetryPeriod: 200 * time.Millisecond, EventsC: p.eventsC, })) - c.Assert(err, check.IsNil) + require.NoError(t, err) // verify that CAs are immediately available cas, err := p.cache.GetCertAuthorities(types.UserCA, false) - c.Assert(err, check.IsNil) - c.Assert(len(cas), check.Equals, caCount) + require.NoError(t, err) + require.Len(t, cas, caCount) - c.Assert(p.cache.Close(), check.IsNil) + require.NoError(t, p.cache.Close()) // wait for TombstoneWritten, ignoring all other event types - waitForEvent(c, p.eventsC, TombstoneWritten, WatcherStarted, EventProcessed, WatcherFailed) + waitForEvent(t, p.eventsC, TombstoneWritten, WatcherStarted, EventProcessed, WatcherFailed) // simulate bad connection to auth server p.backend.SetReadError(trace.ConnectionProblem(nil, "backend is unavailable")) p.eventsS.closeWatchers() @@ -569,23 +562,23 @@ func (s *CacheSuite) TestTombstones(c *check.C) { Apps: p.apps, Databases: p.databases, WindowsDesktops: p.windowsDesktops, - RetryPeriod: 200 * time.Millisecond, + MaxRetryPeriod: 200 * time.Millisecond, EventsC: p.eventsC, })) - c.Assert(err, check.IsNil) + require.NoError(t, err) // verify that CAs are immediately available despite the fact // that the origin state was never available. cas, err = p.cache.GetCertAuthorities(types.UserCA, false) - c.Assert(err, check.IsNil) - c.Assert(len(cas), check.Equals, caCount) + require.NoError(t, err) + require.Len(t, cas, caCount) } // TestInitStrategy verifies that cache uses expected init strategy // of serving backend state when init is taking too long. 
-func (s *CacheSuite) TestInitStrategy(c *check.C) { +func TestInitStrategy(t *testing.T) { for i := 0; i < utils.GetIterations(); i++ { - s.initStrategy(c) + initStrategy(t) } } @@ -598,9 +591,9 @@ func TestListNodesTTLVariant(t *testing.T) { ctx := context.Background() - p, err := newPackWithoutCache(t.TempDir(), ForAuth) + p, err := newPackWithoutCache(t.TempDir()) require.NoError(t, err) - defer p.Close() + t.Cleanup(p.Close) p.cache, err = New(ForAuth(Config{ Context: ctx, @@ -620,7 +613,7 @@ func TestListNodesTTLVariant(t *testing.T) { Apps: p.apps, Databases: p.databases, WindowsDesktops: p.windowsDesktops, - RetryPeriod: 200 * time.Millisecond, + MaxRetryPeriod: 200 * time.Millisecond, EventsC: p.eventsC, neverOK: true, // ensure reads are never healthy })) @@ -664,10 +657,10 @@ func TestListNodesTTLVariant(t *testing.T) { require.Len(t, nodes, nodeCount) } -func (s *CacheSuite) initStrategy(c *check.C) { +func initStrategy(t *testing.T) { ctx := context.Background() - p := s.newPackWithoutCache(c, ForAuth) - defer p.Close() + p := newTestPackWithoutCache(t) + t.Cleanup(p.Close) p.backend.SetReadError(trace.ConnectionProblem(nil, "backend is out")) var err error @@ -689,23 +682,23 @@ func (s *CacheSuite) initStrategy(c *check.C) { Apps: p.apps, Databases: p.databases, WindowsDesktops: p.windowsDesktops, - RetryPeriod: 200 * time.Millisecond, + MaxRetryPeriod: 200 * time.Millisecond, EventsC: p.eventsC, })) - c.Assert(err, check.IsNil) + require.NoError(t, err) _, err = p.cache.GetCertAuthorities(types.UserCA, false) - fixtures.ExpectConnectionProblem(c, err) + require.True(t, trace.IsConnectionProblem(err)) ca := suite.NewTestCA(types.UserCA, "example.com") // NOTE 1: this could produce event processed // below, based on whether watcher restarts to get the event // or not, which is normal, but has to be accounted below - c.Assert(p.trustS.UpsertCertAuthority(ca), check.IsNil) + require.NoError(t, p.trustS.UpsertCertAuthority(ca)) p.backend.SetReadError(nil) // wait for watcher to restart - waitForRestart(c, p.eventsC) + waitForRestart(t, p.eventsC) normalizeCA := func(ca types.CertAuthority) types.CertAuthority { ca = ca.Clone() @@ -714,10 +707,11 @@ func (s *CacheSuite) initStrategy(c *check.C) { types.RemoveCASecrets(ca) return ca } + _ = normalizeCA out, err := p.cache.GetCertAuthority(ca.GetID(), false) - c.Assert(err, check.IsNil) - fixtures.DeepCompare(c, normalizeCA(ca), normalizeCA(out)) + require.NoError(t, err) + require.Empty(t, cmp.Diff(normalizeCA(ca), normalizeCA(out))) // fail again, make sure last recent data is still served // on errors @@ -726,17 +720,17 @@ func (s *CacheSuite) initStrategy(c *check.C) { // wait for the watcher to fail // there could be optional event processed event, // see NOTE 1 above - waitForEvent(c, p.eventsC, WatcherFailed, EventProcessed) + waitForEvent(t, p.eventsC, WatcherFailed, EventProcessed, Reloading) // backend is out, but old value is available out2, err := p.cache.GetCertAuthority(ca.GetID(), false) - c.Assert(err, check.IsNil) - c.Assert(out.GetResourceID(), check.Equals, out2.GetResourceID()) - fixtures.DeepCompare(c, normalizeCA(ca), normalizeCA(out)) + require.NoError(t, err) + require.Equal(t, out.GetResourceID(), out2.GetResourceID()) + require.Empty(t, cmp.Diff(normalizeCA(ca), normalizeCA(out))) // add modification and expect the resource to recover ca.SetRoleMap(types.RoleMap{types.RoleMapping{Remote: "test", Local: []string{"local-test"}}}) - c.Assert(p.trustS.UpsertCertAuthority(ca), check.IsNil) + require.NoError(t, 
p.trustS.UpsertCertAuthority(ca)) // now, recover the backend and make sure the // service is back and the new value has propagated @@ -744,66 +738,61 @@ func (s *CacheSuite) initStrategy(c *check.C) { // wait for watcher to restart successfully; ignoring any failed // attempts which occurred before backend became healthy again. - waitForEvent(c, p.eventsC, WatcherStarted, WatcherFailed) + waitForEvent(t, p.eventsC, WatcherStarted, WatcherFailed, Reloading) // new value is available now out, err = p.cache.GetCertAuthority(ca.GetID(), false) - c.Assert(err, check.IsNil) - fixtures.DeepCompare(c, normalizeCA(ca), normalizeCA(out)) + require.NoError(t, err) + require.Empty(t, cmp.Diff(normalizeCA(ca), normalizeCA(out))) } // TestRecovery tests error recovery scenario -func (s *CacheSuite) TestRecovery(c *check.C) { - p := s.newPackForAuth(c) - defer p.Close() +func TestRecovery(t *testing.T) { + p := newPackForAuth(t) + t.Cleanup(p.Close) ca := suite.NewTestCA(types.UserCA, "example.com") - c.Assert(p.trustS.UpsertCertAuthority(ca), check.IsNil) + require.NoError(t, p.trustS.UpsertCertAuthority(ca)) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } // event has arrived, now close the watchers watchers := p.eventsS.getWatchers() - c.Assert(watchers, check.HasLen, 1) + require.Len(t, watchers, 1) p.eventsS.closeWatchers() // wait for watcher to restart - select { - case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, WatcherStarted) - case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") - } + waitForRestart(t, p.eventsC) // add modification and expect the resource to recover ca2 := suite.NewTestCA(types.UserCA, "example2.com") - c.Assert(p.trustS.UpsertCertAuthority(ca2), check.IsNil) + require.NoError(t, p.trustS.UpsertCertAuthority(ca2)) // wait for watcher to receive an event select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err := p.cache.GetCertAuthority(ca2.GetID(), false) - c.Assert(err, check.IsNil) + require.NoError(t, err) ca2.SetResourceID(out.GetResourceID()) types.RemoveCASecrets(ca2) - fixtures.DeepCompare(c, ca2, out) + require.Empty(t, cmp.Diff(ca2, out)) } // TestTokens tests static and dynamic tokens -func (s *CacheSuite) TestTokens(c *check.C) { +func TestTokens(t *testing.T) { ctx := context.Background() - p := s.newPackForAuth(c) - defer p.Close() + p := newPackForAuth(t) + t.Cleanup(p.Close) staticTokens, err := types.NewStaticTokens(types.StaticTokensSpecV2{ StaticTokens: []types.ProvisionTokenV1{ @@ -814,318 +803,318 @@ func (s *CacheSuite) TestTokens(c *check.C) { }, }, }) - c.Assert(err, check.IsNil) + require.NoError(t, err) err = p.clusterConfigS.SetStaticTokens(staticTokens) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err := p.cache.GetStaticTokens() - c.Assert(err, check.IsNil) + require.NoError(t, err) staticTokens.SetResourceID(out.GetResourceID()) - fixtures.DeepCompare(c, 
staticTokens, out) + require.Empty(t, cmp.Diff(staticTokens, out)) expires := time.Now().Add(10 * time.Hour).Truncate(time.Second).UTC() token, err := types.NewProvisionToken("token", types.SystemRoles{types.RoleAuth, types.RoleNode}, expires) - c.Assert(err, check.IsNil) + require.NoError(t, err) err = p.provisionerS.UpsertToken(ctx, token) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } tout, err := p.cache.GetToken(ctx, token.GetName()) - c.Assert(err, check.IsNil) + require.NoError(t, err) token.SetResourceID(tout.GetResourceID()) - fixtures.DeepCompare(c, token, tout) + require.Empty(t, cmp.Diff(token, tout)) err = p.provisionerS.DeleteToken(ctx, token.GetName()) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } _, err = p.cache.GetToken(ctx, token.GetName()) - fixtures.ExpectNotFound(c, err) + require.True(t, trace.IsNotFound(err)) } -func (s *CacheSuite) TestAuthPreference(c *check.C) { +func TestAuthPreference(t *testing.T) { ctx := context.Background() - p := s.newPackForAuth(c) - defer p.Close() + p := newPackForAuth(t) + t.Cleanup(p.Close) authPref, err := types.NewAuthPreferenceFromConfigFile(types.AuthPreferenceSpecV2{ AllowLocalAuth: types.NewBoolOption(true), MessageOfTheDay: "test MOTD", }) - c.Assert(err, check.IsNil) + require.NoError(t, err) err = p.clusterConfigS.SetAuthPreference(ctx, authPref) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) - c.Assert(event.Event.Resource.GetKind(), check.Equals, types.KindClusterAuthPreference) + require.Equal(t, EventProcessed, event.Type) + require.Equal(t, types.KindClusterAuthPreference, event.Event.Resource.GetKind()) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } outAuthPref, err := p.cache.GetAuthPreference(ctx) - c.Assert(err, check.IsNil) + require.NoError(t, err) authPref.SetResourceID(outAuthPref.GetResourceID()) - fixtures.DeepCompare(c, outAuthPref, authPref) + require.Empty(t, cmp.Diff(outAuthPref, authPref)) } -func (s *CacheSuite) TestClusterNetworkingConfig(c *check.C) { +func TestClusterNetworkingConfig(t *testing.T) { ctx := context.Background() - p := s.newPackForAuth(c) - defer p.Close() + p := newPackForAuth(t) + t.Cleanup(p.Close) netConfig, err := types.NewClusterNetworkingConfigFromConfigFile(types.ClusterNetworkingConfigSpecV2{ ClientIdleTimeout: types.Duration(time.Minute), ClientIdleTimeoutMessage: "test idle timeout message", }) - c.Assert(err, check.IsNil) + require.NoError(t, err) err = p.clusterConfigS.SetClusterNetworkingConfig(ctx, netConfig) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) - c.Assert(event.Event.Resource.GetKind(), check.Equals, types.KindClusterNetworkingConfig) + require.Equal(t, EventProcessed, event.Type) + require.Equal(t, types.KindClusterNetworkingConfig, event.Event.Resource.GetKind()) case <-time.After(time.Second): - 
c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } outNetConfig, err := p.cache.GetClusterNetworkingConfig(ctx) - c.Assert(err, check.IsNil) + require.NoError(t, err) netConfig.SetResourceID(outNetConfig.GetResourceID()) - fixtures.DeepCompare(c, outNetConfig, netConfig) + require.Empty(t, cmp.Diff(outNetConfig, netConfig)) } -func (s *CacheSuite) TestSessionRecordingConfig(c *check.C) { +func TestSessionRecordingConfig(t *testing.T) { ctx := context.Background() - p := s.newPackForAuth(c) - defer p.Close() + p := newPackForAuth(t) + t.Cleanup(p.Close) recConfig, err := types.NewSessionRecordingConfigFromConfigFile(types.SessionRecordingConfigSpecV2{ Mode: types.RecordAtProxySync, ProxyChecksHostKeys: types.NewBoolOption(true), }) - c.Assert(err, check.IsNil) + require.NoError(t, err) err = p.clusterConfigS.SetSessionRecordingConfig(ctx, recConfig) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) - c.Assert(event.Event.Resource.GetKind(), check.Equals, types.KindSessionRecordingConfig) + require.Equal(t, EventProcessed, event.Type) + require.Equal(t, types.KindSessionRecordingConfig, event.Event.Resource.GetKind()) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } outRecConfig, err := p.cache.GetSessionRecordingConfig(ctx) - c.Assert(err, check.IsNil) + require.NoError(t, err) recConfig.SetResourceID(outRecConfig.GetResourceID()) - fixtures.DeepCompare(c, outRecConfig, recConfig) + require.Empty(t, cmp.Diff(outRecConfig, recConfig)) } -func (s *CacheSuite) TestClusterAuditConfig(c *check.C) { +func TestClusterAuditConfig(t *testing.T) { ctx := context.Background() - p := s.newPackForAuth(c) - defer p.Close() + p := newPackForAuth(t) + t.Cleanup(p.Close) auditConfig, err := types.NewClusterAuditConfig(types.ClusterAuditConfigSpecV2{ AuditEventsURI: []string{"dynamodb://audit_table_name", "file:///home/log"}, }) - c.Assert(err, check.IsNil) + require.NoError(t, err) err = p.clusterConfigS.SetClusterAuditConfig(ctx, auditConfig) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) - c.Assert(event.Event.Resource.GetKind(), check.Equals, types.KindClusterAuditConfig) + require.Equal(t, EventProcessed, event.Type) + require.Equal(t, types.KindClusterAuditConfig, event.Event.Resource.GetKind()) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } outAuditConfig, err := p.cache.GetClusterAuditConfig(ctx) - c.Assert(err, check.IsNil) + require.NoError(t, err) auditConfig.SetResourceID(outAuditConfig.GetResourceID()) - fixtures.DeepCompare(c, outAuditConfig, auditConfig) + require.Empty(t, cmp.Diff(outAuditConfig, auditConfig)) } -func (s *CacheSuite) TestClusterName(c *check.C) { - p := s.newPackForAuth(c) - defer p.Close() +func TestClusterName(t *testing.T) { + p := newPackForAuth(t) + t.Cleanup(p.Close) clusterName, err := services.NewClusterNameWithRandomID(types.ClusterNameSpecV2{ ClusterName: "example.com", }) - c.Assert(err, check.IsNil) + require.NoError(t, err) err = p.clusterConfigS.SetClusterName(clusterName) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) - c.Assert(event.Event.Resource.GetKind(), check.Equals, types.KindClusterName) + require.Equal(t, 
EventProcessed, event.Type) + require.Equal(t, types.KindClusterName, event.Event.Resource.GetKind()) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } outName, err := p.cache.GetClusterName() - c.Assert(err, check.IsNil) + require.NoError(t, err) clusterName.SetResourceID(outName.GetResourceID()) - fixtures.DeepCompare(c, outName, clusterName) + require.Empty(t, cmp.Diff(outName, clusterName)) } // TestNamespaces tests caching of namespaces -func (s *CacheSuite) TestNamespaces(c *check.C) { - p := s.newPackForProxy(c) - defer p.Close() +func TestNamespaces(t *testing.T) { + p := newPackForProxy(t) + t.Cleanup(p.Close) v, err := types.NewNamespace("universe") - c.Assert(err, check.IsNil) + require.NoError(t, err) ns := &v err = p.presenceS.UpsertNamespace(*ns) - c.Assert(err, check.IsNil) + require.NoError(t, err) ns, err = p.presenceS.GetNamespace(ns.GetName()) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err := p.cache.GetNamespace(ns.GetName()) - c.Assert(err, check.IsNil) + require.NoError(t, err) ns.SetResourceID(out.GetResourceID()) - fixtures.DeepCompare(c, ns, out) + require.Empty(t, cmp.Diff(ns, out)) // update namespace metadata ns.Metadata.Labels = map[string]string{"a": "b"} - c.Assert(err, check.IsNil) + require.NoError(t, err) err = p.presenceS.UpsertNamespace(*ns) - c.Assert(err, check.IsNil) + require.NoError(t, err) ns, err = p.presenceS.GetNamespace(ns.GetName()) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetNamespace(ns.GetName()) - c.Assert(err, check.IsNil) + require.NoError(t, err) ns.SetResourceID(out.GetResourceID()) - fixtures.DeepCompare(c, ns, out) + require.Empty(t, cmp.Diff(ns, out)) err = p.presenceS.DeleteNamespace(ns.GetName()) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case <-p.eventsC: case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } _, err = p.cache.GetNamespace(ns.GetName()) - fixtures.ExpectNotFound(c, err) + require.True(t, trace.IsNotFound(err)) } // TestUsers tests caching of users -func (s *CacheSuite) TestUsers(c *check.C) { +func TestUsers(t *testing.T) { ctx := context.Background() - p := s.newPackForProxy(c) - defer p.Close() + p := newPackForProxy(t) + t.Cleanup(p.Close) user, err := types.NewUser("bob") - c.Assert(err, check.IsNil) + require.NoError(t, err) err = p.usersS.UpsertUser(user) - c.Assert(err, check.IsNil) + require.NoError(t, err) user, err = p.usersS.GetUser(user.GetName(), false) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err := p.cache.GetUser(user.GetName(), false) - c.Assert(err, check.IsNil) + require.NoError(t, err) user.SetResourceID(out.GetResourceID()) - fixtures.DeepCompare(c, 
user, out) + require.Empty(t, cmp.Diff(user, out)) // update user's roles user.SetRoles([]string{"access"}) - c.Assert(err, check.IsNil) + require.NoError(t, err) err = p.usersS.UpsertUser(user) - c.Assert(err, check.IsNil) + require.NoError(t, err) user, err = p.usersS.GetUser(user.GetName(), false) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetUser(user.GetName(), false) - c.Assert(err, check.IsNil) + require.NoError(t, err) user.SetResourceID(out.GetResourceID()) - fixtures.DeepCompare(c, user, out) + require.Empty(t, cmp.Diff(user, out)) err = p.usersS.DeleteUser(ctx, user.GetName()) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case <-p.eventsC: case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } _, err = p.cache.GetUser(user.GetName(), false) - fixtures.ExpectNotFound(c, err) + require.True(t, trace.IsNotFound(err)) } // TestRoles tests caching of roles -func (s *CacheSuite) TestRoles(c *check.C) { +func TestRoles(t *testing.T) { ctx := context.Background() - p := s.newPackForNode(c) - defer p.Close() + p := newPackForNode(t) + t.Cleanup(p.Close) role, err := types.NewRole("role1", types.RoleSpecV4{ Options: types.RoleOptions{ @@ -1137,128 +1126,128 @@ func (s *CacheSuite) TestRoles(c *check.C) { }, Deny: types.RoleConditions{}, }) - c.Assert(err, check.IsNil) + require.NoError(t, err) err = p.accessS.UpsertRole(ctx, role) - c.Assert(err, check.IsNil) + require.NoError(t, err) role, err = p.accessS.GetRole(ctx, role.GetName()) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err := p.cache.GetRole(ctx, role.GetName()) - c.Assert(err, check.IsNil) + require.NoError(t, err) role.SetResourceID(out.GetResourceID()) - fixtures.DeepCompare(c, role, out) + require.Empty(t, cmp.Diff(role, out)) // update role role.SetLogins(types.Allow, []string{"admin"}) - c.Assert(err, check.IsNil) + require.NoError(t, err) err = p.accessS.UpsertRole(ctx, role) - c.Assert(err, check.IsNil) + require.NoError(t, err) role, err = p.accessS.GetRole(ctx, role.GetName()) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetRole(ctx, role.GetName()) - c.Assert(err, check.IsNil) + require.NoError(t, err) role.SetResourceID(out.GetResourceID()) - fixtures.DeepCompare(c, role, out) + require.Empty(t, cmp.Diff(role, out)) err = p.accessS.DeleteRole(ctx, role.GetName()) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case <-p.eventsC: case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } _, err = p.cache.GetRole(ctx, role.GetName()) - fixtures.ExpectNotFound(c, err) + require.True(t, trace.IsNotFound(err)) } // TestReverseTunnels tests reverse tunnels caching -func 
(s *CacheSuite) TestReverseTunnels(c *check.C) { - p := s.newPackForProxy(c) - defer p.Close() +func TestReverseTunnels(t *testing.T) { + p := newPackForProxy(t) + t.Cleanup(p.Close) tunnel, err := types.NewReverseTunnel("example.com", []string{"example.com:2023"}) - c.Assert(err, check.IsNil) - c.Assert(p.presenceS.UpsertReverseTunnel(tunnel), check.IsNil) + require.NoError(t, err) + require.NoError(t, p.presenceS.UpsertReverseTunnel(tunnel)) tunnel, err = p.presenceS.GetReverseTunnel(tunnel.GetName()) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err := p.cache.GetReverseTunnels() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) tunnel.SetResourceID(out[0].GetResourceID()) - fixtures.DeepCompare(c, tunnel, out[0]) + require.Empty(t, cmp.Diff(tunnel, out[0])) // update tunnel's parameters tunnel.SetClusterName("new.example.com") - c.Assert(err, check.IsNil) + require.NoError(t, err) err = p.presenceS.UpsertReverseTunnel(tunnel) - c.Assert(err, check.IsNil) + require.NoError(t, err) out, err = p.presenceS.GetReverseTunnels() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) tunnel = out[0] select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetReverseTunnels() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) tunnel.SetResourceID(out[0].GetResourceID()) - fixtures.DeepCompare(c, tunnel, out[0]) + require.Empty(t, cmp.Diff(tunnel, out[0])) err = p.presenceS.DeleteAllReverseTunnels() - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case <-p.eventsC: case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetReverseTunnels() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 0) + require.NoError(t, err) + require.Empty(t, out) } // TestTunnelConnections tests tunnel connections caching -func (s *CacheSuite) TestTunnelConnections(c *check.C) { - p := s.newPackForProxy(c) - defer p.Close() +func TestTunnelConnections(t *testing.T) { + p := newPackForProxy(t) + t.Cleanup(p.Close) clusterName := "example.com" hb := time.Now().UTC() @@ -1267,322 +1256,322 @@ func (s *CacheSuite) TestTunnelConnections(c *check.C) { ProxyName: "p1", LastHeartbeat: hb, }) - c.Assert(err, check.IsNil) - c.Assert(p.presenceS.UpsertTunnelConnection(conn), check.IsNil) + require.NoError(t, err) + require.NoError(t, p.presenceS.UpsertTunnelConnection(conn)) out, err := p.presenceS.GetTunnelConnections(clusterName) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) conn = out[0] select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetTunnelConnections(clusterName) - c.Assert(err, check.IsNil) - 
c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) conn.SetResourceID(out[0].GetResourceID()) - fixtures.DeepCompare(c, conn, out[0]) + require.Empty(t, cmp.Diff(conn, out[0])) // update conn's parameters hb = hb.Add(time.Second) conn.SetLastHeartbeat(hb) err = p.presenceS.UpsertTunnelConnection(conn) - c.Assert(err, check.IsNil) + require.NoError(t, err) out, err = p.presenceS.GetTunnelConnections(clusterName) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) conn = out[0] select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetTunnelConnections(clusterName) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) conn.SetResourceID(out[0].GetResourceID()) - fixtures.DeepCompare(c, conn, out[0]) + require.Empty(t, cmp.Diff(conn, out[0])) err = p.presenceS.DeleteTunnelConnections(clusterName) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case <-p.eventsC: case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetTunnelConnections(clusterName) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 0) + require.NoError(t, err) + require.Empty(t, out) } // TestNodes tests nodes cache -func (s *CacheSuite) TestNodes(c *check.C) { +func TestNodes(t *testing.T) { ctx := context.Background() - p := s.newPackForProxy(c) - defer p.Close() + p := newPackForProxy(t) + t.Cleanup(p.Close) server := suite.NewServer(types.KindNode, "srv1", "127.0.0.1:2022", apidefaults.Namespace) _, err := p.presenceS.UpsertNode(ctx, server) - c.Assert(err, check.IsNil) + require.NoError(t, err) out, err := p.presenceS.GetNodes(ctx, apidefaults.Namespace) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) srv := out[0] select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetNodes(ctx, apidefaults.Namespace) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) srv.SetResourceID(out[0].GetResourceID()) - fixtures.DeepCompare(c, srv, out[0]) + require.Empty(t, cmp.Diff(srv, out[0])) // update srv parameters srv.SetExpiry(time.Now().Add(30 * time.Minute).UTC()) srv.SetAddr("127.0.0.2:2033") lease, err := p.presenceS.UpsertNode(ctx, srv) - c.Assert(err, check.IsNil) + require.NoError(t, err) out, err = p.presenceS.GetNodes(ctx, apidefaults.Namespace) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) srv = out[0] select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetNodes(ctx, apidefaults.Namespace) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) srv.SetResourceID(out[0].GetResourceID()) - 
fixtures.DeepCompare(c, srv, out[0]) + require.Empty(t, cmp.Diff(srv, out[0])) // update keep alive on the node and make sure // it propagates lease.Expires = time.Now().UTC().Add(time.Hour) err = p.presenceS.KeepAliveNode(ctx, *lease) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetNodes(ctx, apidefaults.Namespace) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) srv.SetResourceID(out[0].GetResourceID()) srv.SetExpiry(lease.Expires) - fixtures.DeepCompare(c, srv, out[0]) + require.Empty(t, cmp.Diff(srv, out[0])) err = p.presenceS.DeleteAllNodes(ctx, apidefaults.Namespace) - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case <-p.eventsC: case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetNodes(ctx, apidefaults.Namespace) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 0) + require.NoError(t, err) + require.Empty(t, out) } // TestProxies tests proxies cache -func (s *CacheSuite) TestProxies(c *check.C) { - p := s.newPackForProxy(c) - defer p.Close() +func TestProxies(t *testing.T) { + p := newPackForProxy(t) + t.Cleanup(p.Close) server := suite.NewServer(types.KindProxy, "srv1", "127.0.0.1:2022", apidefaults.Namespace) err := p.presenceS.UpsertProxy(server) - c.Assert(err, check.IsNil) + require.NoError(t, err) out, err := p.presenceS.GetProxies() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) srv := out[0] select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetProxies() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) srv.SetResourceID(out[0].GetResourceID()) - fixtures.DeepCompare(c, srv, out[0]) + require.Empty(t, cmp.Diff(srv, out[0])) // update srv parameters srv.SetAddr("127.0.0.2:2033") err = p.presenceS.UpsertProxy(srv) - c.Assert(err, check.IsNil) + require.NoError(t, err) out, err = p.presenceS.GetProxies() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) srv = out[0] select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetProxies() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) srv.SetResourceID(out[0].GetResourceID()) - fixtures.DeepCompare(c, srv, out[0]) + require.Empty(t, cmp.Diff(srv, out[0])) err = p.presenceS.DeleteAllProxies() - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case <-p.eventsC: case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetProxies() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 0) + require.NoError(t, err) + 
require.Empty(t, out) } // TestAuthServers tests auth servers cache -func (s *CacheSuite) TestAuthServers(c *check.C) { - p := s.newPackForProxy(c) - defer p.Close() +func TestAuthServers(t *testing.T) { + p := newPackForProxy(t) + t.Cleanup(p.Close) server := suite.NewServer(types.KindAuthServer, "srv1", "127.0.0.1:2022", apidefaults.Namespace) err := p.presenceS.UpsertAuthServer(server) - c.Assert(err, check.IsNil) + require.NoError(t, err) out, err := p.presenceS.GetAuthServers() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) srv := out[0] select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetAuthServers() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) srv.SetResourceID(out[0].GetResourceID()) - fixtures.DeepCompare(c, srv, out[0]) + require.Empty(t, cmp.Diff(srv, out[0])) // update srv parameters srv.SetAddr("127.0.0.2:2033") err = p.presenceS.UpsertAuthServer(srv) - c.Assert(err, check.IsNil) + require.NoError(t, err) out, err = p.presenceS.GetAuthServers() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) srv = out[0] select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetAuthServers() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) srv.SetResourceID(out[0].GetResourceID()) - fixtures.DeepCompare(c, srv, out[0]) + require.Empty(t, cmp.Diff(srv, out[0])) err = p.presenceS.DeleteAllAuthServers() - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case <-p.eventsC: case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetAuthServers() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 0) + require.NoError(t, err) + require.Empty(t, out) } // TestRemoteClusters tests remote clusters caching -func (s *CacheSuite) TestRemoteClusters(c *check.C) { +func TestRemoteClusters(t *testing.T) { ctx := context.Background() - p := s.newPackForProxy(c) - defer p.Close() + p := newPackForProxy(t) + t.Cleanup(p.Close) clusterName := "example.com" rc, err := types.NewRemoteCluster(clusterName) - c.Assert(err, check.IsNil) - c.Assert(p.presenceS.CreateRemoteCluster(rc), check.IsNil) + require.NoError(t, err) + require.NoError(t, p.presenceS.CreateRemoteCluster(rc)) out, err := p.presenceS.GetRemoteClusters() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) rc = out[0] select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetRemoteClusters() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) rc.SetResourceID(out[0].GetResourceID()) - fixtures.DeepCompare(c, rc, out[0]) + require.Empty(t, 
cmp.Diff(rc, out[0])) // update conn's parameters meta := rc.GetMetadata() @@ -1590,125 +1579,125 @@ func (s *CacheSuite) TestRemoteClusters(c *check.C) { rc.SetMetadata(meta) err = p.presenceS.UpdateRemoteCluster(ctx, rc) - c.Assert(err, check.IsNil) + require.NoError(t, err) out, err = p.presenceS.GetRemoteClusters() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) - fixtures.DeepCompare(c, meta.Labels, out[0].GetMetadata().Labels) + require.NoError(t, err) + require.Len(t, out, 1) + require.Empty(t, cmp.Diff(meta.Labels, out[0].GetMetadata().Labels)) rc = out[0] select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetRemoteClusters() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) rc.SetResourceID(out[0].GetResourceID()) - fixtures.DeepCompare(c, rc, out[0]) + require.Empty(t, cmp.Diff(rc, out[0])) err = p.presenceS.DeleteAllRemoteClusters() - c.Assert(err, check.IsNil) + require.NoError(t, err) select { case <-p.eventsC: case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } out, err = p.cache.GetRemoteClusters() - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 0) + require.NoError(t, err) + require.Empty(t, out) } // TestAppServers tests that CRUD operations are replicated from the backend to // the cache. -func (s *CacheSuite) TestAppServers(c *check.C) { - p := s.newPackForProxy(c) - defer p.Close() +func TestAppServers(t *testing.T) { + p := newPackForProxy(t) + t.Cleanup(p.Close) // Upsert application into backend. server := suite.NewAppServer("foo", "http://127.0.0.1:8080", "foo.example.com") _, err := p.presenceS.UpsertAppServer(context.Background(), server) - c.Assert(err, check.IsNil) + require.NoError(t, err) // Check that the application is now in the backend. out, err := p.presenceS.GetAppServers(context.Background(), apidefaults.Namespace) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) srv := out[0] // Wait until the information has been replicated to the cache. select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } // Make sure the cache has a single application in it. out, err = p.cache.GetAppServers(context.Background(), apidefaults.Namespace) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) // Check that the value in the cache, value in the backend, and original // services.App all exactly match. srv.SetResourceID(out[0].GetResourceID()) server.SetResourceID(out[0].GetResourceID()) - fixtures.DeepCompare(c, srv, out[0]) - fixtures.DeepCompare(c, server, out[0]) + require.Empty(t, cmp.Diff(srv, out[0])) + require.Empty(t, cmp.Diff(server, out[0])) // Update the application and upsert it into the backend again. srv.SetExpiry(time.Now().Add(30 * time.Minute).UTC()) _, err = p.presenceS.UpsertAppServer(context.Background(), srv) - c.Assert(err, check.IsNil) + require.NoError(t, err) // Check that the application is in the backend and only one exists (so an // update occurred). 
out, err = p.presenceS.GetAppServers(context.Background(), apidefaults.Namespace) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) srv = out[0] // Check that information has been replicated to the cache. select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } // Make sure the cache has a single application in it. out, err = p.cache.GetAppServers(context.Background(), apidefaults.Namespace) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 1) + require.NoError(t, err) + require.Len(t, out, 1) // Check that the value in the cache, value in the backend, and original // services.App all exactly match. srv.SetResourceID(out[0].GetResourceID()) - fixtures.DeepCompare(c, srv, out[0]) + require.Empty(t, cmp.Diff(srv, out[0])) // Remove all applications from the backend. err = p.presenceS.DeleteAllAppServers(context.Background(), apidefaults.Namespace) - c.Assert(err, check.IsNil) + require.NoError(t, err) // Check that information has been replicated to the cache. select { case event := <-p.eventsC: - c.Assert(event.Type, check.Equals, EventProcessed) + require.Equal(t, EventProcessed, event.Type) case <-time.After(time.Second): - c.Fatalf("timeout waiting for event") + t.Fatalf("timeout waiting for event") } // Check that the cache is now empty. out, err = p.cache.GetAppServers(context.Background(), apidefaults.Namespace) - c.Assert(err, check.IsNil) - c.Assert(out, check.HasLen, 0) + require.NoError(t, err) + require.Empty(t, out) } // TestApplicationServers tests that CRUD operations on app servers are @@ -1716,7 +1705,7 @@ func (s *CacheSuite) TestAppServers(c *check.C) { func TestApplicationServers(t *testing.T) { p, err := newPack(t.TempDir(), ForProxy) require.NoError(t, err) - defer p.Close() + t.Cleanup(p.Close) ctx := context.Background() @@ -1798,7 +1787,7 @@ func TestApplicationServers(t *testing.T) { func TestApps(t *testing.T) { p, err := newPack(t.TempDir(), ForProxy) require.NoError(t, err) - defer p.Close() + t.Cleanup(p.Close) ctx := context.Background() @@ -1882,7 +1871,7 @@ func TestApps(t *testing.T) { func TestDatabaseServers(t *testing.T) { p, err := newPack(t.TempDir(), ForProxy) require.NoError(t, err) - defer p.Close() + t.Cleanup(p.Close) ctx := context.Background() @@ -1969,7 +1958,7 @@ func TestDatabaseServers(t *testing.T) { func TestDatabases(t *testing.T) { p, err := newPack(t.TempDir(), ForProxy) require.NoError(t, err) - defer p.Close() + t.Cleanup(p.Close) ctx := context.Background() @@ -2049,6 +2038,52 @@ func TestDatabases(t *testing.T) { require.Equal(t, 0, len(out)) } +func TestCache_Backoff(t *testing.T) { + clock := clockwork.NewFakeClock() + p := newTestPack(t, func(c Config) Config { + c.MaxRetryPeriod = defaults.MaxWatcherBackoff + c.Clock = clock + return ForNode(c) + }) + t.Cleanup(p.Close) + + // close watchers to trigger a reload event + watchers := p.eventsS.getWatchers() + require.Len(t, watchers, 1) + p.eventsS.closeWatchers() + p.backend.SetReadError(trace.ConnectionProblem(nil, "backend is unavailable")) + + step := p.cache.Config.MaxRetryPeriod / 5.0 + for i := 0; i < 5; i++ { + // wait for cache to reload + select { + case event := <-p.eventsC: + require.Equal(t, Reloading, event.Type) + duration, err := time.ParseDuration(event.Event.Resource.GetKind()) + require.NoError(t, err) + 
+ stepMin := step * time.Duration(i) / 2 + stepMax := step * time.Duration(i+1) + + require.GreaterOrEqual(t, duration, stepMin) + require.LessOrEqual(t, duration, stepMax) + + // add some extra to the duration to ensure the retry occurs + clock.Advance(duration * 3) + case <-time.After(time.Minute): + t.Fatalf("timeout waiting for event") + } + + // wait for cache to fail again - backend will still produce a ConnectionProblem error + select { + case event := <-p.eventsC: + require.Equal(t, WatcherFailed, event.Type) + case <-time.After(30 * time.Second): + t.Fatalf("timeout waiting for event") + } + } +} + type proxyEvents struct { sync.Mutex watchers []types.Watcher diff --git a/lib/defaults/defaults.go b/lib/defaults/defaults.go index 1877f5ce45dbc..6943c14977d16 100644 --- a/lib/defaults/defaults.go +++ b/lib/defaults/defaults.go @@ -428,15 +428,19 @@ var ( // restart if there has been more than `MaxConnectionErrorsBeforeRestart` // errors in the preceding `ConnectionErrorMeasurementPeriod` MaxConnectionErrorsBeforeRestart = 5 + + // MaxWatcherBackoff is the maximum retry time a watcher should use in + // the event of connection issues + MaxWatcherBackoff = time.Minute ) // Default connection limits, they can be applied separately on any of the Teleport // services (SSH, auth, proxy) const ( - // Number of max. simultaneous connections to a service + // LimiterMaxConnections Number of max. simultaneous connections to a service LimiterMaxConnections = 15000 - // Number of max. simultaneous connected users/logins + // LimiterMaxConcurrentUsers Number of max. simultaneous connected users/logins LimiterMaxConcurrentUsers = 250 // LimiterMaxConcurrentSignatures limits maximum number of concurrently diff --git a/lib/restrictedsession/watcher.go b/lib/restrictedsession/watcher.go index 2904acd55861c..caa41e7934b00 100644 --- a/lib/restrictedsession/watcher.go +++ b/lib/restrictedsession/watcher.go @@ -39,8 +39,10 @@ func NewRestrictionsWatcher(cfg RestrictionsWatcherConfig) (*RestrictionsWatcher return nil, trace.Wrap(err) } retry, err := utils.NewLinear(utils.LinearConfig{ - Step: cfg.RetryPeriod / 10, - Max: cfg.RetryPeriod, + First: utils.HalfJitter(cfg.MaxRetryPeriod / 10), + Step: cfg.MaxRetryPeriod / 5, + Max: cfg.MaxRetryPeriod, + Jitter: utils.NewHalfJitter(), }) if err != nil { return nil, trace.Wrap(err) @@ -75,8 +77,8 @@ type RestrictionsWatcher struct { // RestrictionsWatcherConfig configures restrictions watcher type RestrictionsWatcherConfig struct { - // RetryPeriod is a retry period on failed watchers - RetryPeriod time.Duration + // MaxRetryPeriod is the maximum retry period on failed watchers + MaxRetryPeriod time.Duration // ReloadPeriod is a failed period on failed watches ReloadPeriod time.Duration // Client is used by changeset to monitor restrictions updates @@ -97,8 +99,8 @@ func (cfg *RestrictionsWatcherConfig) CheckAndSetDefaults() error { if cfg.RestrictionsC == nil { return trace.BadParameter("missing parameter RestrictionsC") } - if cfg.RetryPeriod == 0 { - cfg.RetryPeriod = defaults.HighResPollingPeriod + if cfg.MaxRetryPeriod == 0 { + cfg.MaxRetryPeriod = defaults.MaxWatcherBackoff } if cfg.ReloadPeriod == 0 { cfg.ReloadPeriod = defaults.LowResPollingPeriod diff --git a/lib/reversetunnel/rc_manager.go b/lib/reversetunnel/rc_manager.go index be3c4f8d41b57..ce3ceae8d1cec 100644 --- a/lib/reversetunnel/rc_manager.go +++ b/lib/reversetunnel/rc_manager.go @@ -76,6 +76,8 @@ type RemoteClusterTunnelManagerConfig struct { KubeDialAddr utils.NetAddr // FIPS indicates if 
Teleport was started in FIPS mode. FIPS bool + // Log is the logger + Log logrus.FieldLogger } func (c *RemoteClusterTunnelManagerConfig) CheckAndSetDefaults() error { @@ -97,6 +99,9 @@ func (c *RemoteClusterTunnelManagerConfig) CheckAndSetDefaults() error { if c.Clock == nil { c.Clock = clockwork.NewRealClock() } + if c.Log == nil { + c.Log = logrus.New() + } return nil } @@ -138,7 +143,7 @@ func (w *RemoteClusterTunnelManager) Run(ctx context.Context) { w.mu.Unlock() if err := w.Sync(ctx); err != nil { - logrus.Warningf("Failed to sync reverse tunnels: %v.", err) + w.cfg.Log.Warningf("Failed to sync reverse tunnels: %v.", err) } ticker := time.NewTicker(defaults.ResyncInterval) @@ -147,11 +152,11 @@ func (w *RemoteClusterTunnelManager) Run(ctx context.Context) { for { select { case <-ctx.Done(): - logrus.Debugf("Closing.") + w.cfg.Log.Debugf("Closing.") return case <-ticker.C: if err := w.Sync(ctx); err != nil { - logrus.Warningf("Failed to sync reverse tunnels: %v.", err) + w.cfg.Log.Warningf("Failed to sync reverse tunnels: %v.", err) continue } } diff --git a/lib/service/cfg.go b/lib/service/cfg.go index 93134729302ce..b1334f1c266bb 100644 --- a/lib/service/cfg.go +++ b/lib/service/cfg.go @@ -247,6 +247,9 @@ type Config struct { // unit time that the node can sustain before restarting itself, as // measured by the rotation state service. RestartThreshold Rate + + // MaxRetryPeriod is the maximum period between reconnection attempts to auth + MaxRetryPeriod time.Duration } // ApplyToken assigns a given token to all internal services but only if token @@ -1110,6 +1113,7 @@ func ApplyDefaults(cfg *Config) { Amount: defaults.MaxConnectionErrorsBeforeRestart, Time: defaults.ConnectionErrorMeasurementPeriod, } + cfg.MaxRetryPeriod = defaults.MaxWatcherBackoff } // ApplyFIPSDefaults updates default configuration to be FedRAMP/FIPS 140-2 diff --git a/lib/service/connect.go b/lib/service/connect.go index 35e3ac9cddab8..4ecfce4c602f0 100644 --- a/lib/service/connect.go +++ b/lib/service/connect.go @@ -19,7 +19,6 @@ package service import ( "crypto/tls" "path/filepath" - "time" "golang.org/x/crypto/ssh" @@ -46,7 +45,17 @@ import ( // reconnectToAuthService continuously attempts to reconnect to the auth // service until succeeds or process gets shut down func (process *TeleportProcess) reconnectToAuthService(role types.SystemRole) (*Connector, error) { - retryTime := defaults.HighResPollingPeriod + retry, err := utils.NewLinear(utils.LinearConfig{ + First: utils.HalfJitter(process.Config.MaxRetryPeriod / 10), + Step: process.Config.MaxRetryPeriod / 5, + Max: process.Config.MaxRetryPeriod, + Clock: process.Clock, + Jitter: utils.NewHalfJitter(), + }) + if err != nil { + return nil, trace.Wrap(err) + } + for { connector, err := process.connectToAuthService(role) if err == nil { @@ -67,9 +76,18 @@ func (process *TeleportProcess) reconnectToAuthService(role types.SystemRole) (* } process.log.Errorf("%v failed to establish connection to cluster: %v.", role, err) + // Used for testing that auth service will attempt to reconnect in the provided duration. 
+ select { + case process.connectFailureC <- retry.Duration(): + default: + } + + startedWait := process.Clock.Now() // Wait in between attempts, but return if teleport is shutting down select { - case <-time.After(retryTime): + case t := <-retry.After(): + process.log.Debugf("Retrying connection to auth server after waiting %v.", t.Sub(startedWait)) + retry.Inc() case <-process.ExitContext().Done(): process.log.Infof("%v stopping connection attempts, teleport is shutting down.", role) return nil, ErrTeleportExited @@ -130,12 +148,12 @@ func (process *TeleportProcess) connect(role types.SystemRole) (conn *Connector, }, nil } process.log.Infof("Connecting to the cluster %v with TLS client certificate.", identity.ClusterName) - client, err := process.newClient(process.Config.AuthServers, identity) + clt, err := process.newClient(process.Config.AuthServers, identity) if err != nil { return nil, trace.Wrap(err) } return &Connector{ - Client: client, + Client: clt, ClientIdentity: identity, ServerIdentity: identity, }, nil @@ -150,12 +168,12 @@ func (process *TeleportProcess) connect(role types.SystemRole) (conn *Connector, ServerIdentity: identity, }, nil } - client, err := process.newClient(process.Config.AuthServers, identity) + clt, err := process.newClient(process.Config.AuthServers, identity) if err != nil { return nil, trace.Wrap(err) } return &Connector{ - Client: client, + Client: clt, ClientIdentity: identity, ServerIdentity: identity, }, nil @@ -172,12 +190,12 @@ func (process *TeleportProcess) connect(role types.SystemRole) (conn *Connector, ServerIdentity: identity, }, nil } - client, err := process.newClient(process.Config.AuthServers, newIdentity) + clt, err := process.newClient(process.Config.AuthServers, newIdentity) if err != nil { return nil, trace.Wrap(err) } return &Connector{ - Client: client, + Client: clt, ClientIdentity: newIdentity, ServerIdentity: identity, }, nil @@ -194,12 +212,12 @@ func (process *TeleportProcess) connect(role types.SystemRole) (conn *Connector, ServerIdentity: newIdentity, }, nil } - client, err := process.newClient(process.Config.AuthServers, newIdentity) + clt, err := process.newClient(process.Config.AuthServers, newIdentity) if err != nil { return nil, trace.Wrap(err) } return &Connector{ - Client: client, + Client: clt, ClientIdentity: newIdentity, ServerIdentity: newIdentity, }, nil @@ -214,12 +232,12 @@ func (process *TeleportProcess) connect(role types.SystemRole) (conn *Connector, ServerIdentity: identity, }, nil } - client, err := process.newClient(process.Config.AuthServers, identity) + clt, err := process.newClient(process.Config.AuthServers, identity) if err != nil { return nil, trace.Wrap(err) } return &Connector{ - Client: client, + Client: clt, ClientIdentity: identity, ServerIdentity: identity, }, nil @@ -396,14 +414,14 @@ func (process *TeleportProcess) firstTimeConnect(role types.SystemRole) (*Connec ServerIdentity: identity, } } else { - client, err := process.newClient(process.Config.AuthServers, identity) + clt, err := process.newClient(process.Config.AuthServers, identity) if err != nil { return nil, trace.Wrap(err) } connector = &Connector{ ClientIdentity: identity, ServerIdentity: identity, - Client: client, + Client: clt, } } diff --git a/lib/service/service.go b/lib/service/service.go index 68c6b6b987729..a99b74b8e041f 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -319,6 +319,9 @@ type TeleportProcess struct { // clusterFeatures contain flags for supported and unsupported features. 
clusterFeatures proto.Features + + // connectFailureC is a channel to notify of failures to connect to auth (used in tests). + connectFailureC chan time.Duration } type keyPairKey struct { @@ -574,7 +577,7 @@ func waitAndReload(ctx context.Context, cfg Config, srv Process, newTeleport New defer cancel() srv.Shutdown(timeoutCtx) if timeoutCtx.Err() == context.DeadlineExceeded { - // The new serivce can start initiating connections to the old service + // The new service can start initiating connections to the old service // keeping it from shutting down gracefully, or some external // connections can keep hanging the old auth service and prevent // the services from shutting down, so abort the graceful way @@ -710,6 +713,7 @@ func NewTeleport(cfg *Config) (*TeleportProcess, error) { id: processID, keyPairs: make(map[keyPairKey]KeyPair), appDependCh: make(chan Event, 1024), + connectFailureC: make(chan time.Duration), } process.registerAppDepend() @@ -3024,6 +3028,10 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { return trace.Wrap(err) } + rcWatchLog := logrus.WithFields(logrus.Fields{ + trace.Component: teleport.Component(teleport.ComponentReverseTunnelAgent, process.id), + }) + // Create and register reverse tunnel AgentPool. rcWatcher, err := reversetunnel.NewRemoteClusterTunnelManager(reversetunnel.RemoteClusterTunnelManagerConfig{ HostUUID: conn.ServerIdentity.ID.HostUUID, @@ -3034,16 +3042,14 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { KubeDialAddr: utils.DialAddrFromListenAddr(kubeDialAddr(cfg.Proxy, clusterNetworkConfig.GetProxyListenerMode())), ReverseTunnelServer: tsrv, FIPS: process.Config.FIPS, + Log: rcWatchLog, }) if err != nil { return trace.Wrap(err) } process.RegisterCriticalFunc("proxy.reversetunnel.watcher", func() error { - log := logrus.WithFields(logrus.Fields{ - trace.Component: teleport.Component(teleport.ComponentReverseTunnelAgent, process.id), - }) - log.Infof("Starting reverse tunnel agent pool.") + rcWatchLog.Infof("Starting reverse tunnel agent pool.") done := make(chan struct{}) go func() { defer close(done) diff --git a/lib/service/service_test.go b/lib/service/service_test.go index de4fa8ef9a5db..a732b9a7b6218 100644 --- a/lib/service/service_test.go +++ b/lib/service/service_test.go @@ -22,6 +22,7 @@ import ( "net/http" "os" "strings" + "sync" "testing" "time" @@ -532,3 +533,49 @@ func TestSetupProxyTLSConfig(t *testing.T) { }) } } + +func TestTeleportProcess_reconnectToAuth(t *testing.T) { + t.Parallel() + clock := clockwork.NewFakeClock() + // Create and configure a default Teleport configuration. 
+ cfg := MakeDefaultConfig() + cfg.AuthServers = []utils.NetAddr{{AddrNetwork: "tcp", Addr: "127.0.0.1:0"}} + cfg.Clock = clock + cfg.DataDir = t.TempDir() + cfg.Auth.Enabled = false + cfg.Proxy.Enabled = false + cfg.SSH.Enabled = true + process, err := NewTeleport(cfg) + require.NoError(t, err) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + c, err := process.reconnectToAuthService(types.RoleAdmin) + require.Equal(t, ErrTeleportExited, err) + require.Nil(t, c) + }() + + step := cfg.MaxRetryPeriod / 5.0 + for i := 0; i < 5; i++ { + // wait for connection to fail + select { + case duration := <-process.connectFailureC: + stepMin := step * time.Duration(i) / 2 + stepMax := step * time.Duration(i+1) + + require.GreaterOrEqual(t, duration, stepMin) + require.LessOrEqual(t, duration, stepMax) + // add some extra to the duration to ensure the retry occurs + clock.Advance(duration * 3) + case <-time.After(time.Minute): + t.Fatalf("timeout waiting for failure") + } + } + + supervisor, ok := process.Supervisor.(*LocalSupervisor) + require.True(t, ok) + supervisor.signalExit() + wg.Wait() +} diff --git a/lib/services/watcher.go b/lib/services/watcher.go index b838dfa9c165a..082c0793d13ab 100644 --- a/lib/services/watcher.go +++ b/lib/services/watcher.go @@ -52,8 +52,8 @@ type ResourceWatcherConfig struct { Component string // Log is a logger. Log logrus.FieldLogger - // RetryPeriod is a retry period on failed watchers. - RetryPeriod time.Duration + // MaxRetryPeriod is the maximum retry period on failed watchers. + MaxRetryPeriod time.Duration // RefetchPeriod is a period after which to explicitly refetch the resources. // It is to protect against unexpected cache syncing issues. RefetchPeriod time.Duration @@ -74,8 +74,8 @@ func (cfg *ResourceWatcherConfig) CheckAndSetDefaults() error { if cfg.Log == nil { cfg.Log = logrus.StandardLogger() } - if cfg.RetryPeriod == 0 { - cfg.RetryPeriod = defaults.HighResPollingPeriod + if cfg.MaxRetryPeriod == 0 { + cfg.MaxRetryPeriod = defaults.MaxWatcherBackoff } if cfg.RefetchPeriod == 0 { cfg.RefetchPeriod = defaults.LowResPollingPeriod @@ -94,9 +94,11 @@ func (cfg *ResourceWatcherConfig) CheckAndSetDefaults() error { // incl. cfg.CheckAndSetDefaults. func newResourceWatcher(ctx context.Context, collector resourceCollector, cfg ResourceWatcherConfig) (*resourceWatcher, error) { retry, err := utils.NewLinear(utils.LinearConfig{ - Step: cfg.RetryPeriod / 10, - Max: cfg.RetryPeriod, - Clock: cfg.Clock, + First: utils.HalfJitter(cfg.MaxRetryPeriod / 10), + Step: cfg.MaxRetryPeriod / 5, + Max: cfg.MaxRetryPeriod, + Jitter: utils.NewHalfJitter(), + Clock: cfg.Clock, }) if err != nil { return nil, trace.Wrap(err) @@ -110,7 +112,7 @@ func newResourceWatcher(ctx context.Context, collector resourceCollector, cfg Re cancel: cancel, retry: retry, LoopC: make(chan struct{}), - ResetC: make(chan struct{}), + ResetC: make(chan time.Duration, 1), StaleC: make(chan struct{}, 1), } go p.runWatchLoop() @@ -139,7 +141,7 @@ type resourceWatcher struct { // (used in tests). LoopC chan struct{} // ResetC is a channel to notify of internal watcher reset (used in tests). - ResetC chan struct{} + ResetC chan time.Duration // StaleC is a channel that can trigger the condition of resource staleness // (used in tests). StaleC chan struct{} @@ -174,7 +176,7 @@ func (p *resourceWatcher) hasStaleView() bool { // runWatchLoop runs a watch loop. 
func (p *resourceWatcher) runWatchLoop() { for { - p.Log.WithField("retry", p.retry).Debug("Starting watch.") + p.Log.Debug("Starting watch.") err := p.watch() select { @@ -192,8 +194,18 @@ func (p *resourceWatcher) runWatchLoop() { p.Log.Warningf("Maximum staleness of %v exceeded, failure started at %v.", p.MaxStaleness, p.failureStartedAt) p.collector.notifyStale() } + + // Used for testing that the watch routine has exited and is about + // to be restarted. + select { + case p.ResetC <- p.retry.Duration(): + default: + } + + startedWaiting := p.Clock.Now() select { - case <-p.retry.After(): + case t := <-p.retry.After(): + p.Log.Debugf("Attempting to restart watch after waiting %v.", t.Sub(startedWaiting)) p.retry.Inc() case <-p.ctx.Done(): p.Log.Debug("Closed, returning from watch loop.") @@ -204,12 +216,7 @@ func (p *resourceWatcher) runWatchLoop() { } else { p.Log.Debug("Triggering scheduled refetch.") } - // Used for testing that the watch routine has exited and is about - // to be restarted. - select { - case p.ResetC <- struct{}{}: - default: - } + } } diff --git a/lib/services/watcher_test.go b/lib/services/watcher_test.go index dd6528963e46a..c84459aedb80b 100644 --- a/lib/services/watcher_test.go +++ b/lib/services/watcher_test.go @@ -18,6 +18,7 @@ package services_test import ( "context" + "errors" "sync" "testing" "time" @@ -36,6 +37,59 @@ import ( "github.com/gravitational/teleport/lib/services/local" ) +var _ types.Events = (*errorWatcher)(nil) + +type errorWatcher struct { +} + +func (e errorWatcher) NewWatcher(context.Context, types.Watch) (types.Watcher, error) { + return nil, errors.New("watcher error") +} + +var _ services.ProxyGetter = (*nopProxyGetter)(nil) + +type nopProxyGetter struct { +} + +func (n nopProxyGetter) GetProxies() ([]types.Server, error) { + return nil, nil +} + +func TestResourceWatcher_Backoff(t *testing.T) { + t.Parallel() + ctx := context.Background() + clock := clockwork.NewFakeClock() + + w, err := services.NewProxyWatcher(ctx, services.ProxyWatcherConfig{ + ResourceWatcherConfig: services.ResourceWatcherConfig{ + Component: "test", + Clock: clock, + MaxRetryPeriod: defaults.MaxWatcherBackoff, + Client: &errorWatcher{}, + }, + ProxyGetter: &nopProxyGetter{}, + }) + require.NoError(t, err) + t.Cleanup(w.Close) + + step := w.MaxRetryPeriod / 5.0 + for i := 0; i < 5; i++ { + // wait for watcher to reload + select { + case duration := <-w.ResetC: + stepMin := step * time.Duration(i) / 2 + stepMax := step * time.Duration(i+1) + + require.GreaterOrEqual(t, duration, stepMin) + require.LessOrEqual(t, duration, stepMax) + // add some extra to the duration to ensure the retry occurs + clock.Advance(duration * 3) + case <-time.After(time.Minute): + t.Fatalf("timeout waiting for reset") + } + } +} + func TestProxyWatcher(t *testing.T) { t.Parallel() ctx := context.Background() @@ -54,8 +108,8 @@ func TestProxyWatcher(t *testing.T) { presence := local.NewPresenceService(bk) w, err := services.NewProxyWatcher(ctx, services.ProxyWatcherConfig{ ResourceWatcherConfig: services.ResourceWatcherConfig{ - Component: "test", - RetryPeriod: 200 * time.Millisecond, + Component: "test", + MaxRetryPeriod: 200 * time.Millisecond, Client: &client{ Presence: presence, Events: local.NewEventsService(bk), @@ -147,8 +201,8 @@ func TestLockWatcher(t *testing.T) { access := local.NewAccessService(bk) w, err := services.NewLockWatcher(ctx, services.LockWatcherConfig{ ResourceWatcherConfig: services.ResourceWatcherConfig{ - Component: "test", - RetryPeriod: 200 * 
time.Millisecond, + Component: "test", + MaxRetryPeriod: 200 * time.Millisecond, Client: &client{ Access: access, Events: local.NewEventsService(bk), @@ -252,8 +306,8 @@ func TestLockWatcherSubscribeWithEmptyTarget(t *testing.T) { access := local.NewAccessService(bk) w, err := services.NewLockWatcher(ctx, services.LockWatcherConfig{ ResourceWatcherConfig: services.ResourceWatcherConfig{ - Component: "test", - RetryPeriod: 200 * time.Millisecond, + Component: "test", + MaxRetryPeriod: 200 * time.Millisecond, Client: &client{ Access: access, Events: local.NewEventsService(bk), @@ -330,8 +384,8 @@ func TestLockWatcherStale(t *testing.T) { events := &withUnreliability{Events: local.NewEventsService(bk)} w, err := services.NewLockWatcher(ctx, services.LockWatcherConfig{ ResourceWatcherConfig: services.ResourceWatcherConfig{ - Component: "test", - RetryPeriod: 200 * time.Millisecond, + Component: "test", + MaxRetryPeriod: 200 * time.Millisecond, Client: &client{ Access: access, Events: events, @@ -473,8 +527,8 @@ func TestDatabaseWatcher(t *testing.T) { databasesService := local.NewDatabasesService(bk) w, err := services.NewDatabaseWatcher(ctx, services.DatabaseWatcherConfig{ ResourceWatcherConfig: services.ResourceWatcherConfig{ - Component: "test", - RetryPeriod: 200 * time.Millisecond, + Component: "test", + MaxRetryPeriod: 200 * time.Millisecond, Client: &client{ Databases: databasesService, Events: local.NewEventsService(bk), @@ -570,8 +624,8 @@ func TestAppWatcher(t *testing.T) { appService := local.NewAppService(bk) w, err := services.NewAppWatcher(ctx, services.AppWatcherConfig{ ResourceWatcherConfig: services.ResourceWatcherConfig{ - Component: "test", - RetryPeriod: 200 * time.Millisecond, + Component: "test", + MaxRetryPeriod: 200 * time.Millisecond, Client: &client{ Apps: appService, Events: local.NewEventsService(bk), diff --git a/lib/srv/heartbeat.go b/lib/srv/heartbeat.go index e32218cde777b..a63c444ea42c9 100644 --- a/lib/srv/heartbeat.go +++ b/lib/srv/heartbeat.go @@ -440,7 +440,7 @@ func (h *Heartbeat) announce() error { if !ok { return trace.BadParameter("expected services.Server, got %#v", h.current) } - err := h.Announcer.UpsertKubeService(context.TODO(), kube) + err := h.Announcer.UpsertKubeService(h.cancelCtx, kube) if err != nil { h.nextAnnounce = h.Clock.Now().UTC().Add(h.KeepAlivePeriod) h.setState(HeartbeatStateAnnounceWait) diff --git a/lib/srv/monitor_test.go b/lib/srv/monitor_test.go index cd168e04cfe18..fd1946c7b2eee 100644 --- a/lib/srv/monitor_test.go +++ b/lib/srv/monitor_test.go @@ -112,7 +112,7 @@ func TestMonitorStaleLocks(t *testing.T) { select { case <-asrv.LockWatcher.LoopC: - case <-time.After(2 * time.Second): + case <-time.After(15 * time.Second): t.Fatal("Timeout waiting for LockWatcher loop check.") } select { @@ -123,12 +123,12 @@ func TestMonitorStaleLocks(t *testing.T) { go asrv.Backend.CloseWatchers() select { case <-asrv.LockWatcher.ResetC: - case <-time.After(2 * time.Second): + case <-time.After(15 * time.Second): t.Fatal("Timeout waiting for LockWatcher reset.") } select { case <-conn.closedC: - case <-time.After(2 * time.Second): + case <-time.After(15 * time.Second): t.Fatal("Timeout waiting for connection close.") } require.Equal(t, services.StrictLockingModeAccessDenied.Error(), emitter.LastEvent().(*apievents.ClientDisconnect).Reason) diff --git a/lib/srv/regular/sshserver_test.go b/lib/srv/regular/sshserver_test.go index e433a99913f34..7d45fab6671c9 100644 --- a/lib/srv/regular/sshserver_test.go +++ b/lib/srv/regular/sshserver_test.go @@ 
-1029,6 +1029,7 @@ func TestProxyReverseTunnel(t *testing.T) { AccessPoint: proxyClient, ReverseTunnelServer: reverseTunnelServer, LocalCluster: f.testSrv.ClusterName(), + Log: logger, }) require.NoError(t, err)
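
The hunks above apply the same retry shape in lib/cache, lib/services/watcher.go, lib/restrictedsession/watcher.go, and lib/service/connect.go: a linear backoff seeded with a half-jittered first step, incremented by MaxRetryPeriod/5, capped at MaxRetryPeriod, and jittered on every wait. A minimal sketch of that loop follows, assuming only the utils.LinearConfig fields and Retry methods already used in the hunks; maxBackoff and connect are illustrative placeholders, not names from this patch.

package example

import (
	"context"
	"time"

	"github.com/gravitational/teleport/lib/utils"
	"github.com/gravitational/trace"
)

// retryLoop keeps calling connect until it succeeds or ctx is canceled,
// waiting a jittered, linearly growing interval between attempts.
func retryLoop(ctx context.Context, maxBackoff time.Duration, connect func() error) error {
	retry, err := utils.NewLinear(utils.LinearConfig{
		First:  utils.HalfJitter(maxBackoff / 10), // randomized initial wait
		Step:   maxBackoff / 5,                    // linear growth per failed attempt
		Max:    maxBackoff,                        // waits are capped at maxBackoff
		Jitter: utils.NewHalfJitter(),             // spread retries to avoid a thundering herd
	})
	if err != nil {
		return trace.Wrap(err)
	}
	for {
		if err := connect(); err == nil {
			return nil
		}
		select {
		case <-retry.After(): // wait out the current backoff period
			retry.Inc() // grow the next wait toward Max
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

The production call sites additionally report retry.Duration() before waiting, on a non-blocking channel (connectFailureC, ResetC) or via the Reloading cache event, which is what lets the new tests observe each delay without racing the retry itself.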
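
The new backoff tests (TestCache_Backoff, TestTeleportProcess_reconnectToAuth, TestResourceWatcher_Backoff) all bound the i-th reported delay the same way: no less than step*i/2 and no more than step*(i+1), with step = MaxRetryPeriod/5. Under the default MaxWatcherBackoff of one minute, step is 12s, so the accepted windows are 0 to 12s, 6 to 24s, 12 to 36s, 18 to 48s, and 24 to 60s; the halved lower bound leaves room for the half jitter applied to each wait. A small helper expressing that window, purely illustrative (expectedBackoffWindow is not a name from this patch):

package example

import "time"

// expectedBackoffWindow mirrors the stepMin/stepMax arithmetic used by the
// backoff tests for the i-th observed retry delay.
func expectedBackoffWindow(i int, maxRetryPeriod time.Duration) (lo, hi time.Duration) {
	step := maxRetryPeriod / 5
	return step * time.Duration(i) / 2, step * time.Duration(i+1)
}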