Skip to content

Commit

Permalink
Add jitter and backoff to prevent thundering herd on auth
Browse files Browse the repository at this point in the history
Move the cache and resourceWatcher watchers from a 10s retry to a jittered backoff retry of up to ~2min. Replace the
fixed retry interval in reconnectToAuthService with a jittered backoff retry as well, to cover the case where a node
restarts due to the changes introduced in #8102.

Fixes #6889.
  • Loading branch information
rosstimothy committed Dec 8, 2021
1 parent 3fcbe17 commit 407fc90
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 34 deletions.
11 changes: 7 additions & 4 deletions lib/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ func (c *Config) CheckAndSetDefaults() error {
c.Clock = clockwork.NewRealClock()
}
if c.RetryPeriod == 0 {
c.RetryPeriod = defaults.HighResPollingPeriod
c.RetryPeriod = time.Minute
}
if c.WatcherInitTimeout == 0 {
c.WatcherInitTimeout = time.Minute
Expand Down Expand Up @@ -660,8 +660,11 @@ func New(config Config) (*Cache, error) {
}

retry, err := utils.NewLinear(utils.LinearConfig{
Step: cs.Config.RetryPeriod / 10,
Max: cs.Config.RetryPeriod,
First: utils.HalfJitter(defaults.HighResPollingPeriod),
Step: cs.Config.RetryPeriod / 2,
Max: cs.Config.RetryPeriod * 2,
Jitter: utils.NewSeventhJitter(),
Clock: cs.Clock,
})
if err != nil {
cs.Close()
Expand Down Expand Up @@ -725,7 +728,7 @@ func (c *Cache) update(ctx context.Context, retry utils.Retry) {
c.Warningf("Re-init the cache on error: %v.", err)
}
// events cache should be closed as well
c.Debugf("Reloading %v.", retry)
c.Debugf("Reloading cache %v.", retry)
select {
case <-retry.After():
retry.Inc()
Expand Down
9 changes: 6 additions & 3 deletions lib/restrictedsession/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ func NewRestrictionsWatcher(cfg RestrictionsWatcherConfig) (*RestrictionsWatcher
return nil, trace.Wrap(err)
}
retry, err := utils.NewLinear(utils.LinearConfig{
Step: cfg.RetryPeriod / 10,
Max: cfg.RetryPeriod,
First: utils.HalfJitter(defaults.HighResPollingPeriod),
Step: cs.Config.RetryPeriod / 2,
Max: cs.Config.RetryPeriod,
Jitter: utils.NewSeventhJitter(),
Clock: cs.Clock,
})
if err != nil {
return nil, trace.Wrap(err)
Expand Down Expand Up @@ -98,7 +101,7 @@ func (cfg *RestrictionsWatcherConfig) CheckAndSetDefaults() error {
return trace.BadParameter("missing parameter RestrictionsC")
}
if cfg.RetryPeriod == 0 {
cfg.RetryPeriod = defaults.HighResPollingPeriod
cfg.RetryPeriod = time.Minute
}
if cfg.ReloadPeriod == 0 {
cfg.ReloadPeriod = defaults.LowResPollingPeriod
Expand Down
8 changes: 5 additions & 3 deletions lib/reversetunnel/rc_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ type RemoteClusterTunnelManagerConfig struct {
KubeDialAddr utils.NetAddr
// FIPS indicates if Teleport was started in FIPS mode.
FIPS bool
// Log is the logger
Log logrus.FieldLogger
}

func (c *RemoteClusterTunnelManagerConfig) CheckAndSetDefaults() error {
Expand Down Expand Up @@ -138,7 +140,7 @@ func (w *RemoteClusterTunnelManager) Run(ctx context.Context) {
w.mu.Unlock()

if err := w.Sync(ctx); err != nil {
logrus.Warningf("Failed to sync reverse tunnels: %v.", err)
w.cfg.Log.Warningf("Failed to sync reverse tunnels: %v.", err)
}

ticker := time.NewTicker(defaults.ResyncInterval)
Expand All @@ -147,11 +149,11 @@ func (w *RemoteClusterTunnelManager) Run(ctx context.Context) {
for {
select {
case <-ctx.Done():
logrus.Debugf("Closing.")
w.cfg.Log.Debugf("Closing.")
return
case <-ticker.C:
if err := w.Sync(ctx); err != nil {
logrus.Warningf("Failed to sync reverse tunnels: %v.", err)
w.cfg.Log.Warningf("Failed to sync reverse tunnels: %v.", err)
continue
}
}
Expand Down
40 changes: 26 additions & 14 deletions lib/service/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,17 @@ import (
// reconnectToAuthService continuously attempts to reconnect to the auth
// service until it succeeds or the process is shut down.
func (process *TeleportProcess) reconnectToAuthService(role types.SystemRole) (*Connector, error) {
retryTime := defaults.HighResPollingPeriod
retry, err := utils.NewLinear(utils.LinearConfig{
First: utils.HalfJitter(defaults.HighResPollingPeriod),
Step: 20 * time.Second,
Max: 3 * time.Minute,
Clock: process.Clock,
Jitter: utils.NewSeventhJitter(),
})
if err != nil {
return nil, trace.Wrap(err)
}

for {
connector, err := process.connectToAuthService(role)
if err == nil {
Expand All @@ -67,8 +77,10 @@ func (process *TeleportProcess) reconnectToAuthService(role types.SystemRole) (*
process.log.Errorf("%v failed to establish connection to cluster: %v.", role, err)

// Wait in between attempts, but return if teleport is shutting down
process.log.Debugf("Retrying connection to auth server in %v", retry)
select {
case <-time.After(retryTime):
case <-retry.After():
retry.Inc()
case <-process.ExitContext().Done():
process.log.Infof("%v stopping connection attempts, teleport is shutting down.", role)
return nil, ErrTeleportExited
Expand Down Expand Up @@ -120,12 +132,12 @@ func (process *TeleportProcess) connect(role types.SystemRole) (conn *Connector,
}, nil
}
process.log.Infof("Connecting to the cluster %v with TLS client certificate.", identity.ClusterName)
client, err := process.newClient(process.Config.AuthServers, identity)
clt, err := process.newClient(process.Config.AuthServers, identity)
if err != nil {
return nil, trace.Wrap(err)
}
return &Connector{
Client: client,
Client: clt,
ClientIdentity: identity,
ServerIdentity: identity,
}, nil
Expand All @@ -140,12 +152,12 @@ func (process *TeleportProcess) connect(role types.SystemRole) (conn *Connector,
ServerIdentity: identity,
}, nil
}
client, err := process.newClient(process.Config.AuthServers, identity)
clt, err := process.newClient(process.Config.AuthServers, identity)
if err != nil {
return nil, trace.Wrap(err)
}
return &Connector{
Client: client,
Client: clt,
ClientIdentity: identity,
ServerIdentity: identity,
}, nil
Expand All @@ -162,12 +174,12 @@ func (process *TeleportProcess) connect(role types.SystemRole) (conn *Connector,
ServerIdentity: identity,
}, nil
}
client, err := process.newClient(process.Config.AuthServers, newIdentity)
clt, err := process.newClient(process.Config.AuthServers, newIdentity)
if err != nil {
return nil, trace.Wrap(err)
}
return &Connector{
Client: client,
Client: clt,
ClientIdentity: newIdentity,
ServerIdentity: identity,
}, nil
Expand All @@ -184,12 +196,12 @@ func (process *TeleportProcess) connect(role types.SystemRole) (conn *Connector,
ServerIdentity: newIdentity,
}, nil
}
client, err := process.newClient(process.Config.AuthServers, newIdentity)
clt, err := process.newClient(process.Config.AuthServers, newIdentity)
if err != nil {
return nil, trace.Wrap(err)
}
return &Connector{
Client: client,
Client: clt,
ClientIdentity: newIdentity,
ServerIdentity: newIdentity,
}, nil
Expand All @@ -204,12 +216,12 @@ func (process *TeleportProcess) connect(role types.SystemRole) (conn *Connector,
ServerIdentity: identity,
}, nil
}
client, err := process.newClient(process.Config.AuthServers, identity)
clt, err := process.newClient(process.Config.AuthServers, identity)
if err != nil {
return nil, trace.Wrap(err)
}
return &Connector{
Client: client,
Client: clt,
ClientIdentity: identity,
ServerIdentity: identity,
}, nil
Expand Down Expand Up @@ -386,14 +398,14 @@ func (process *TeleportProcess) firstTimeConnect(role types.SystemRole) (*Connec
ServerIdentity: identity,
}
} else {
client, err := process.newClient(process.Config.AuthServers, identity)
clt, err := process.newClient(process.Config.AuthServers, identity)
if err != nil {
return nil, trace.Wrap(err)
}
connector = &Connector{
ClientIdentity: identity,
ServerIdentity: identity,
Client: client,
Client: clt,
}
}

Expand Down
10 changes: 6 additions & 4 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2979,6 +2979,10 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error {
return trace.Wrap(err)
}

rcWatchLog := logrus.WithFields(logrus.Fields{
trace.Component: teleport.Component(teleport.ComponentReverseTunnelAgent, process.id),
})

// Create and register reverse tunnel AgentPool.
rcWatcher, err := reversetunnel.NewRemoteClusterTunnelManager(reversetunnel.RemoteClusterTunnelManagerConfig{
HostUUID: conn.ServerIdentity.ID.HostUUID,
Expand All @@ -2989,16 +2993,14 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error {
KubeDialAddr: utils.DialAddrFromListenAddr(kubeDialAddr(cfg.Proxy, clusterNetworkConfig.GetProxyListenerMode())),
ReverseTunnelServer: tsrv,
FIPS: process.Config.FIPS,
Log: rcWatchLog,
})
if err != nil {
return trace.Wrap(err)
}

process.RegisterCriticalFunc("proxy.reversetunnel.watcher", func() error {
log := logrus.WithFields(logrus.Fields{
trace.Component: teleport.Component(teleport.ComponentReverseTunnelAgent, process.id),
})
log.Infof("Starting reverse tunnel agent pool.")
rcWatchLog.Infof("Starting reverse tunnel agent pool.")
done := make(chan struct{})
go func() {
defer close(done)
Expand Down
12 changes: 7 additions & 5 deletions lib/services/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (cfg *ResourceWatcherConfig) CheckAndSetDefaults() error {
cfg.Log = logrus.StandardLogger()
}
if cfg.RetryPeriod == 0 {
cfg.RetryPeriod = defaults.HighResPollingPeriod
cfg.RetryPeriod = time.Minute
}
if cfg.RefetchPeriod == 0 {
cfg.RefetchPeriod = defaults.LowResPollingPeriod
Expand All @@ -94,9 +94,11 @@ func (cfg *ResourceWatcherConfig) CheckAndSetDefaults() error {
// incl. cfg.CheckAndSetDefaults.
func newResourceWatcher(ctx context.Context, collector resourceCollector, cfg ResourceWatcherConfig) (*resourceWatcher, error) {
retry, err := utils.NewLinear(utils.LinearConfig{
Step: cfg.RetryPeriod / 10,
Max: cfg.RetryPeriod,
Clock: cfg.Clock,
First: utils.HalfJitter(defaults.HighResPollingPeriod),
Step: cfg.RetryPeriod / 2,
Max: cfg.RetryPeriod * 2,
Jitter: utils.NewSeventhJitter(),
Clock: cfg.Clock,
})
if err != nil {
return nil, trace.Wrap(err)
Expand Down Expand Up @@ -200,7 +202,7 @@ func (p *resourceWatcher) runWatchLoop() {
return
}
if err != nil {
p.Log.Warningf("Restart watch on error: %v.", err)
p.Log.WithField("retry", p.retry).Warningf("Restart watch on error: %v.", err)
} else {
p.Log.Debug("Triggering scheduled refetch.")
}
Expand Down
2 changes: 1 addition & 1 deletion lib/srv/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ func (h *Heartbeat) announce() error {
if !ok {
return trace.BadParameter("expected services.Server, got %#v", h.current)
}
err := h.Announcer.UpsertKubeService(context.TODO(), kube)
err := h.Announcer.UpsertKubeService(h.cancelCtx, kube)
if err != nil {
h.nextAnnounce = h.Clock.Now().UTC().Add(h.KeepAlivePeriod)
h.setState(HeartbeatStateAnnounceWait)
Expand Down

0 comments on commit 407fc90

Please sign in to comment.