Skip to content

Commit

Permalink
backport of #9133 to branch/v6.2
Browse files Browse the repository at this point in the history
  • Loading branch information
rosstimothy committed Jan 11, 2022
1 parent 9d68409 commit 0ccd8f6
Show file tree
Hide file tree
Showing 16 changed files with 416 additions and 174 deletions.
1 change: 1 addition & 0 deletions integration/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ func (i *TeleInstance) GenerateConfig(trustedSecrets []*InstanceSecrets, tconf *
}

tconf.Keygen = testauthority.New()
tconf.MaxRetryPeriod = defaults.HighResPollingPeriod
i.Config = tconf
return tconf, nil
}
Expand Down
32 changes: 24 additions & 8 deletions lib/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,8 @@ type Config struct {
WebToken types.WebTokenInterface
// Backend is a backend for local cache
Backend backend.Backend
// RetryPeriod is a period between cache retries on failures
RetryPeriod time.Duration
// MaxRetryPeriod is the maximum period between cache retries on failures
MaxRetryPeriod time.Duration
// WatcherInitTimeout is the maximum acceptable delay for an
// OpInit after a watcher has been started (default=1m).
WatcherInitTimeout time.Duration
Expand Down Expand Up @@ -474,8 +474,8 @@ func (c *Config) CheckAndSetDefaults() error {
if c.Clock == nil {
c.Clock = clockwork.NewRealClock()
}
if c.RetryPeriod == 0 {
c.RetryPeriod = defaults.HighResPollingPeriod
if c.MaxRetryPeriod == 0 {
c.MaxRetryPeriod = defaults.MaxWatcherBackoff
}
if c.WatcherInitTimeout == 0 {
c.WatcherInitTimeout = time.Minute
Expand Down Expand Up @@ -508,6 +508,9 @@ const (
// TombstoneWritten is emitted if cache is closed in a healthy
// state and successfully writes its tombstone.
TombstoneWritten = "tombstone_written"
// Reloading is emitted when an error occurred watching events
// and the cache is waiting to create a new watcher
Reloading = "reloading_cache"
)

// New creates a new instance of Cache
Expand Down Expand Up @@ -571,8 +574,11 @@ func New(config Config) (*Cache, error) {
}

retry, err := utils.NewLinear(utils.LinearConfig{
Step: cs.Config.RetryPeriod / 10,
Max: cs.Config.RetryPeriod,
First: utils.HalfJitter(cs.MaxRetryPeriod / 10),
Step: cs.MaxRetryPeriod / 5,
Max: cs.MaxRetryPeriod,
Jitter: utils.NewHalfJitter(),
Clock: cs.Clock,
})
if err != nil {
cs.Close()
Expand Down Expand Up @@ -635,10 +641,20 @@ func (c *Cache) update(ctx context.Context, retry utils.Retry) {
if err != nil {
c.Warningf("Re-init the cache on error: %v.", err)
}

// events cache should be closed as well
c.Debugf("Reloading %v.", retry)
c.Debugf("Reloading cache.")

c.notify(ctx, Event{Type: Reloading, Event: types.Event{
Resource: &types.ResourceHeader{
Kind: retry.Duration().String(),
},
}})

startedWaiting := c.Clock.Now()
select {
case <-retry.After():
case t := <-retry.After():
c.Debugf("Initiating new watch after waiting %v.", t.Sub(startedWaiting))
retry.Inc()
case <-c.ctx.Done():
return
Expand Down
Loading

0 comments on commit 0ccd8f6

Please sign in to comment.