From 50585463e5b5949767df222ba9feed060d24c70e Mon Sep 17 00:00:00 2001 From: Tim Ross Date: Wed, 24 Nov 2021 17:01:57 -0500 Subject: [PATCH] Add jitter and backoff to prevent thundering herd on auth Move cache and resourceWatcher watchers from a 10s retry to a jittered backoff retry up to ~2min. Replace the reconnectToAuthService interval with a retry to add jitter and backoff there as well for when a node restarts due to changes introduced in #8102. Fixes #6889. --- lib/cache/cache.go | 11 +++++---- lib/restrictedsession/watcher.go | 9 ++++--- lib/reversetunnel/rc_manager.go | 8 ++++--- lib/service/connect.go | 40 +++++++++++++++++++++----------- lib/service/service.go | 10 ++++---- lib/services/watcher.go | 12 ++++++---- lib/srv/heartbeat.go | 2 +- 7 files changed, 58 insertions(+), 34 deletions(-) diff --git a/lib/cache/cache.go b/lib/cache/cache.go index e957547b49a6d..4e5bc9e835be1 100644 --- a/lib/cache/cache.go +++ b/lib/cache/cache.go @@ -553,7 +553,7 @@ func (c *Config) CheckAndSetDefaults() error { c.Clock = clockwork.NewRealClock() } if c.RetryPeriod == 0 { - c.RetryPeriod = defaults.HighResPollingPeriod + c.RetryPeriod = time.Minute } if c.WatcherInitTimeout == 0 { c.WatcherInitTimeout = time.Minute @@ -660,8 +660,11 @@ func New(config Config) (*Cache, error) { } retry, err := utils.NewLinear(utils.LinearConfig{ - Step: cs.Config.RetryPeriod / 10, - Max: cs.Config.RetryPeriod, + First: utils.HalfJitter(defaults.HighResPollingPeriod), + Step: cs.Config.RetryPeriod / 2, + Max: cs.Config.RetryPeriod * 2, + Jitter: utils.NewSeventhJitter(), + Clock: cs.Clock, }) if err != nil { cs.Close() @@ -725,7 +728,7 @@ func (c *Cache) update(ctx context.Context, retry utils.Retry) { c.Warningf("Re-init the cache on error: %v.", err) } // events cache should be closed as well - c.Debugf("Reloading %v.", retry) + c.Debugf("Reloading cache %v.", retry) select { case <-retry.After(): retry.Inc() diff --git a/lib/restrictedsession/watcher.go 
b/lib/restrictedsession/watcher.go index 2904acd55861c..a86f4f2e55c57 100644 --- a/lib/restrictedsession/watcher.go +++ b/lib/restrictedsession/watcher.go @@ -39,8 +39,11 @@ func NewRestrictionsWatcher(cfg RestrictionsWatcherConfig) (*RestrictionsWatcher return nil, trace.Wrap(err) } retry, err := utils.NewLinear(utils.LinearConfig{ - Step: cfg.RetryPeriod / 10, - Max: cfg.RetryPeriod, + First: utils.HalfJitter(defaults.HighResPollingPeriod), + Step: cfg.RetryPeriod / 2, + Max: cfg.RetryPeriod, + Jitter: utils.NewSeventhJitter(), + Clock: cfg.Clock, }) if err != nil { return nil, trace.Wrap(err) } @@ -98,7 +101,7 @@ func (cfg *RestrictionsWatcherConfig) CheckAndSetDefaults() error { return trace.BadParameter("missing parameter RestrictionsC") } if cfg.RetryPeriod == 0 { - cfg.RetryPeriod = defaults.HighResPollingPeriod + cfg.RetryPeriod = time.Minute } if cfg.ReloadPeriod == 0 { cfg.ReloadPeriod = defaults.LowResPollingPeriod diff --git a/lib/reversetunnel/rc_manager.go b/lib/reversetunnel/rc_manager.go index be3c4f8d41b57..291526fac9d50 100644 --- a/lib/reversetunnel/rc_manager.go +++ b/lib/reversetunnel/rc_manager.go @@ -76,6 +76,8 @@ type RemoteClusterTunnelManagerConfig struct { KubeDialAddr utils.NetAddr // FIPS indicates if Teleport was started in FIPS mode.
FIPS bool + // Log is the logger + Log logrus.FieldLogger } func (c *RemoteClusterTunnelManagerConfig) CheckAndSetDefaults() error { @@ -138,7 +140,7 @@ func (w *RemoteClusterTunnelManager) Run(ctx context.Context) { w.mu.Unlock() if err := w.Sync(ctx); err != nil { - logrus.Warningf("Failed to sync reverse tunnels: %v.", err) + w.cfg.Log.Warningf("Failed to sync reverse tunnels: %v.", err) } ticker := time.NewTicker(defaults.ResyncInterval) @@ -147,11 +149,11 @@ func (w *RemoteClusterTunnelManager) Run(ctx context.Context) { for { select { case <-ctx.Done(): - logrus.Debugf("Closing.") + w.cfg.Log.Debugf("Closing.") return case <-ticker.C: if err := w.Sync(ctx); err != nil { - logrus.Warningf("Failed to sync reverse tunnels: %v.", err) + w.cfg.Log.Warningf("Failed to sync reverse tunnels: %v.", err) continue } } diff --git a/lib/service/connect.go b/lib/service/connect.go index 541c1e4b69a4e..3af3bac2ba8db 100644 --- a/lib/service/connect.go +++ b/lib/service/connect.go @@ -45,7 +45,17 @@ import ( // reconnectToAuthService continuously attempts to reconnect to the auth // service until succeeds or process gets shut down func (process *TeleportProcess) reconnectToAuthService(role types.SystemRole) (*Connector, error) { - retryTime := defaults.HighResPollingPeriod + retry, err := utils.NewLinear(utils.LinearConfig{ + First: utils.HalfJitter(defaults.HighResPollingPeriod), + Step: 20 * time.Second, + Max: 3 * time.Minute, + Clock: process.Clock, + Jitter: utils.NewSeventhJitter(), + }) + if err != nil { + return nil, trace.Wrap(err) + } + for { connector, err := process.connectToAuthService(role) if err == nil { @@ -67,8 +77,10 @@ func (process *TeleportProcess) reconnectToAuthService(role types.SystemRole) (* process.log.Errorf("%v failed to establish connection to cluster: %v.", role, err) // Wait in between attempts, but return if teleport is shutting down + process.log.Debugf("Retrying connection to auth server in %v", retry) select { - case 
<-time.After(retryTime): + case <-retry.After(): + retry.Inc() case <-process.ExitContext().Done(): process.log.Infof("%v stopping connection attempts, teleport is shutting down.", role) return nil, ErrTeleportExited @@ -120,12 +132,12 @@ func (process *TeleportProcess) connect(role types.SystemRole) (conn *Connector, }, nil } process.log.Infof("Connecting to the cluster %v with TLS client certificate.", identity.ClusterName) - client, err := process.newClient(process.Config.AuthServers, identity) + clt, err := process.newClient(process.Config.AuthServers, identity) if err != nil { return nil, trace.Wrap(err) } return &Connector{ - Client: client, + Client: clt, ClientIdentity: identity, ServerIdentity: identity, }, nil @@ -140,12 +152,12 @@ func (process *TeleportProcess) connect(role types.SystemRole) (conn *Connector, ServerIdentity: identity, }, nil } - client, err := process.newClient(process.Config.AuthServers, identity) + clt, err := process.newClient(process.Config.AuthServers, identity) if err != nil { return nil, trace.Wrap(err) } return &Connector{ - Client: client, + Client: clt, ClientIdentity: identity, ServerIdentity: identity, }, nil @@ -162,12 +174,12 @@ func (process *TeleportProcess) connect(role types.SystemRole) (conn *Connector, ServerIdentity: identity, }, nil } - client, err := process.newClient(process.Config.AuthServers, newIdentity) + clt, err := process.newClient(process.Config.AuthServers, newIdentity) if err != nil { return nil, trace.Wrap(err) } return &Connector{ - Client: client, + Client: clt, ClientIdentity: newIdentity, ServerIdentity: identity, }, nil @@ -184,12 +196,12 @@ func (process *TeleportProcess) connect(role types.SystemRole) (conn *Connector, ServerIdentity: newIdentity, }, nil } - client, err := process.newClient(process.Config.AuthServers, newIdentity) + clt, err := process.newClient(process.Config.AuthServers, newIdentity) if err != nil { return nil, trace.Wrap(err) } return &Connector{ - Client: client, + Client: 
clt, ClientIdentity: newIdentity, ServerIdentity: newIdentity, }, nil @@ -204,12 +216,12 @@ func (process *TeleportProcess) connect(role types.SystemRole) (conn *Connector, ServerIdentity: identity, }, nil } - client, err := process.newClient(process.Config.AuthServers, identity) + clt, err := process.newClient(process.Config.AuthServers, identity) if err != nil { return nil, trace.Wrap(err) } return &Connector{ - Client: client, + Client: clt, ClientIdentity: identity, ServerIdentity: identity, }, nil @@ -386,14 +398,14 @@ func (process *TeleportProcess) firstTimeConnect(role types.SystemRole) (*Connec ServerIdentity: identity, } } else { - client, err := process.newClient(process.Config.AuthServers, identity) + clt, err := process.newClient(process.Config.AuthServers, identity) if err != nil { return nil, trace.Wrap(err) } connector = &Connector{ ClientIdentity: identity, ServerIdentity: identity, - Client: client, + Client: clt, } } diff --git a/lib/service/service.go b/lib/service/service.go index f657d6a9aa53c..042672f3be48a 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -2986,6 +2986,10 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { return trace.Wrap(err) } + rcWatchLog := logrus.WithFields(logrus.Fields{ + trace.Component: teleport.Component(teleport.ComponentReverseTunnelAgent, process.id), + }) + // Create and register reverse tunnel AgentPool. 
rcWatcher, err := reversetunnel.NewRemoteClusterTunnelManager(reversetunnel.RemoteClusterTunnelManagerConfig{ HostUUID: conn.ServerIdentity.ID.HostUUID, @@ -2996,16 +3000,14 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { KubeDialAddr: utils.DialAddrFromListenAddr(kubeDialAddr(cfg.Proxy, clusterNetworkConfig.GetProxyListenerMode())), ReverseTunnelServer: tsrv, FIPS: process.Config.FIPS, + Log: rcWatchLog, }) if err != nil { return trace.Wrap(err) } process.RegisterCriticalFunc("proxy.reversetunnel.watcher", func() error { - log := logrus.WithFields(logrus.Fields{ - trace.Component: teleport.Component(teleport.ComponentReverseTunnelAgent, process.id), - }) - log.Infof("Starting reverse tunnel agent pool.") + rcWatchLog.Infof("Starting reverse tunnel agent pool.") done := make(chan struct{}) go func() { defer close(done) diff --git a/lib/services/watcher.go b/lib/services/watcher.go index b838dfa9c165a..6cd8871cb87a6 100644 --- a/lib/services/watcher.go +++ b/lib/services/watcher.go @@ -75,7 +75,7 @@ func (cfg *ResourceWatcherConfig) CheckAndSetDefaults() error { cfg.Log = logrus.StandardLogger() } if cfg.RetryPeriod == 0 { - cfg.RetryPeriod = defaults.HighResPollingPeriod + cfg.RetryPeriod = time.Minute } if cfg.RefetchPeriod == 0 { cfg.RefetchPeriod = defaults.LowResPollingPeriod @@ -94,9 +94,11 @@ func (cfg *ResourceWatcherConfig) CheckAndSetDefaults() error { // incl. cfg.CheckAndSetDefaults. 
func newResourceWatcher(ctx context.Context, collector resourceCollector, cfg ResourceWatcherConfig) (*resourceWatcher, error) { retry, err := utils.NewLinear(utils.LinearConfig{ - Step: cfg.RetryPeriod / 10, - Max: cfg.RetryPeriod, - Clock: cfg.Clock, + First: utils.HalfJitter(defaults.HighResPollingPeriod), + Step: cfg.RetryPeriod / 2, + Max: cfg.RetryPeriod * 2, + Jitter: utils.NewSeventhJitter(), + Clock: cfg.Clock, }) if err != nil { return nil, trace.Wrap(err) @@ -200,7 +202,7 @@ func (p *resourceWatcher) runWatchLoop() { return } if err != nil { - p.Log.Warningf("Restart watch on error: %v.", err) + p.Log.WithField("retry", p.retry).Warningf("Restart watch on error: %v.", err) } else { p.Log.Debug("Triggering scheduled refetch.") } diff --git a/lib/srv/heartbeat.go b/lib/srv/heartbeat.go index e32218cde777b..a63c444ea42c9 100644 --- a/lib/srv/heartbeat.go +++ b/lib/srv/heartbeat.go @@ -440,7 +440,7 @@ func (h *Heartbeat) announce() error { if !ok { return trace.BadParameter("expected services.Server, got %#v", h.current) } - err := h.Announcer.UpsertKubeService(context.TODO(), kube) + err := h.Announcer.UpsertKubeService(h.cancelCtx, kube) if err != nil { h.nextAnnounce = h.Clock.Now().UTC().Add(h.KeepAlivePeriod) h.setState(HeartbeatStateAnnounceWait)