From 252c7902b86f792a165b5dd2abd93b19391a4d9e Mon Sep 17 00:00:00 2001 From: andyzhangx Date: Mon, 24 May 2021 09:23:42 +0000 Subject: [PATCH] fix windows node register failure issue --- .../node_register.go | 21 +- go.mod | 1 + .../apimachinery/pkg/util/clock/clock.go | 445 ++++++++++++ .../apimachinery/pkg/util/runtime/runtime.go | 173 +++++ .../k8s.io/apimachinery/pkg/util/wait/doc.go | 19 + .../k8s.io/apimachinery/pkg/util/wait/wait.go | 635 ++++++++++++++++++ vendor/modules.txt | 4 + 7 files changed, 1297 insertions(+), 1 deletion(-) create mode 100644 vendor/k8s.io/apimachinery/pkg/util/clock/clock.go create mode 100644 vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go create mode 100644 vendor/k8s.io/apimachinery/pkg/util/wait/doc.go create mode 100644 vendor/k8s.io/apimachinery/pkg/util/wait/wait.go diff --git a/cmd/csi-node-driver-registrar/node_register.go b/cmd/csi-node-driver-registrar/node_register.go index 51f2e001d..898834d05 100644 --- a/cmd/csi-node-driver-registrar/node_register.go +++ b/cmd/csi-node-driver-registrar/node_register.go @@ -24,10 +24,12 @@ import ( "os/signal" "runtime" "syscall" + "time" "google.golang.org/grpc" "github.com/kubernetes-csi/node-driver-registrar/pkg/util" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1" ) @@ -60,8 +62,25 @@ func nodeRegister(csiDriverName, httpEndpoint string) { } klog.Infof("Registration Server started at: %s\n", socketPath) grpcServer := grpc.NewServer() + // Registers kubelet plugin watcher api. - registerapi.RegisterRegistrationServer(grpcServer, registrar) + if runtime.GOOS == "windows" { + for true { + // try register on Windows node in a loop, detailed issue: + // https://github.com/kubernetes-csi/node-driver-registrar/issues/143 + err := wait.PollImmediate(1*time.Second, 30*time.Second, func() (bool, error) { + registerapi.RegisterRegistrationServer(grpcServer, registrar) + return true, nil + }) + if err == nil { + break + } else if err == wait.ErrWaitTimeout { + klog.Errorf("RegisterRegistrationServer timeout, try again") + } + } + } else { + registerapi.RegisterRegistrationServer(grpcServer, registrar) + } go healthzServer(socketPath, httpEndpoint) go removeRegSocket(csiDriverName) diff --git a/go.mod b/go.mod index cbbca3944..5d8b706e1 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( golang.org/x/text v0.3.5 // indirect google.golang.org/genproto v0.0.0-20210317182105-75c7a8546eb9 // indirect google.golang.org/grpc v1.36.0 + k8s.io/apimachinery v0.21.0 k8s.io/client-go v0.21.0 k8s.io/klog/v2 v2.8.0 k8s.io/kubelet v0.21.0 diff --git a/vendor/k8s.io/apimachinery/pkg/util/clock/clock.go b/vendor/k8s.io/apimachinery/pkg/util/clock/clock.go new file mode 100644 index 000000000..1a544d3b2 --- /dev/null +++ b/vendor/k8s.io/apimachinery/pkg/util/clock/clock.go @@ -0,0 +1,445 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clock + +import ( + "sync" + "time" +) + +// PassiveClock allows for injecting fake or real clocks into code +// that needs to read the current time but does not support scheduling +// activity in the future. +type PassiveClock interface { + Now() time.Time + Since(time.Time) time.Duration +} + +// Clock allows for injecting fake or real clocks into code that +// needs to do arbitrary things based on time. +type Clock interface { + PassiveClock + After(time.Duration) <-chan time.Time + AfterFunc(time.Duration, func()) Timer + NewTimer(time.Duration) Timer + Sleep(time.Duration) + NewTicker(time.Duration) Ticker +} + +// RealClock really calls time.Now() +type RealClock struct{} + +// Now returns the current time. +func (RealClock) Now() time.Time { + return time.Now() +} + +// Since returns time since the specified timestamp. +func (RealClock) Since(ts time.Time) time.Duration { + return time.Since(ts) +} + +// After is the same as time.After(d). +func (RealClock) After(d time.Duration) <-chan time.Time { + return time.After(d) +} + +// AfterFunc is the same as time.AfterFunc(d, f). +func (RealClock) AfterFunc(d time.Duration, f func()) Timer { + return &realTimer{ + timer: time.AfterFunc(d, f), + } +} + +// NewTimer returns a new Timer. +func (RealClock) NewTimer(d time.Duration) Timer { + return &realTimer{ + timer: time.NewTimer(d), + } +} + +// NewTicker returns a new Ticker. +func (RealClock) NewTicker(d time.Duration) Ticker { + return &realTicker{ + ticker: time.NewTicker(d), + } +} + +// Sleep pauses the RealClock for duration d. +func (RealClock) Sleep(d time.Duration) { + time.Sleep(d) +} + +// FakePassiveClock implements PassiveClock, but returns an arbitrary time. +type FakePassiveClock struct { + lock sync.RWMutex + time time.Time +} + +// FakeClock implements Clock, but returns an arbitrary time. +type FakeClock struct { + FakePassiveClock + + // waiters are waiting for the fake time to pass their specified time + waiters []fakeClockWaiter +} + +type fakeClockWaiter struct { + targetTime time.Time + stepInterval time.Duration + skipIfBlocked bool + destChan chan time.Time + afterFunc func() +} + +// NewFakePassiveClock returns a new FakePassiveClock. +func NewFakePassiveClock(t time.Time) *FakePassiveClock { + return &FakePassiveClock{ + time: t, + } +} + +// NewFakeClock returns a new FakeClock +func NewFakeClock(t time.Time) *FakeClock { + return &FakeClock{ + FakePassiveClock: *NewFakePassiveClock(t), + } +} + +// Now returns f's time. +func (f *FakePassiveClock) Now() time.Time { + f.lock.RLock() + defer f.lock.RUnlock() + return f.time +} + +// Since returns time since the time in f. +func (f *FakePassiveClock) Since(ts time.Time) time.Duration { + f.lock.RLock() + defer f.lock.RUnlock() + return f.time.Sub(ts) +} + +// SetTime sets the time on the FakePassiveClock. +func (f *FakePassiveClock) SetTime(t time.Time) { + f.lock.Lock() + defer f.lock.Unlock() + f.time = t +} + +// After is the Fake version of time.After(d). +func (f *FakeClock) After(d time.Duration) <-chan time.Time { + f.lock.Lock() + defer f.lock.Unlock() + stopTime := f.time.Add(d) + ch := make(chan time.Time, 1) // Don't block! + f.waiters = append(f.waiters, fakeClockWaiter{ + targetTime: stopTime, + destChan: ch, + }) + return ch +} + +// AfterFunc is the Fake version of time.AfterFunc(d, callback). +func (f *FakeClock) AfterFunc(d time.Duration, cb func()) Timer { + f.lock.Lock() + defer f.lock.Unlock() + stopTime := f.time.Add(d) + ch := make(chan time.Time, 1) // Don't block! + + timer := &fakeTimer{ + fakeClock: f, + waiter: fakeClockWaiter{ + targetTime: stopTime, + destChan: ch, + afterFunc: cb, + }, + } + f.waiters = append(f.waiters, timer.waiter) + return timer +} + +// NewTimer is the Fake version of time.NewTimer(d). +func (f *FakeClock) NewTimer(d time.Duration) Timer { + f.lock.Lock() + defer f.lock.Unlock() + stopTime := f.time.Add(d) + ch := make(chan time.Time, 1) // Don't block! + timer := &fakeTimer{ + fakeClock: f, + waiter: fakeClockWaiter{ + targetTime: stopTime, + destChan: ch, + }, + } + f.waiters = append(f.waiters, timer.waiter) + return timer +} + +// NewTicker returns a new Ticker. +func (f *FakeClock) NewTicker(d time.Duration) Ticker { + f.lock.Lock() + defer f.lock.Unlock() + tickTime := f.time.Add(d) + ch := make(chan time.Time, 1) // hold one tick + f.waiters = append(f.waiters, fakeClockWaiter{ + targetTime: tickTime, + stepInterval: d, + skipIfBlocked: true, + destChan: ch, + }) + + return &fakeTicker{ + c: ch, + } +} + +// Step moves clock by Duration, notifies anyone that's called After, Tick, or NewTimer +func (f *FakeClock) Step(d time.Duration) { + f.lock.Lock() + defer f.lock.Unlock() + f.setTimeLocked(f.time.Add(d)) +} + +// SetTime sets the time on a FakeClock. +func (f *FakeClock) SetTime(t time.Time) { + f.lock.Lock() + defer f.lock.Unlock() + f.setTimeLocked(t) +} + +// Actually changes the time and checks any waiters. f must be write-locked. +func (f *FakeClock) setTimeLocked(t time.Time) { + f.time = t + newWaiters := make([]fakeClockWaiter, 0, len(f.waiters)) + for i := range f.waiters { + w := &f.waiters[i] + if !w.targetTime.After(t) { + + if w.skipIfBlocked { + select { + case w.destChan <- t: + default: + } + } else { + w.destChan <- t + } + + if w.afterFunc != nil { + w.afterFunc() + } + + if w.stepInterval > 0 { + for !w.targetTime.After(t) { + w.targetTime = w.targetTime.Add(w.stepInterval) + } + newWaiters = append(newWaiters, *w) + } + + } else { + newWaiters = append(newWaiters, f.waiters[i]) + } + } + f.waiters = newWaiters +} + +// HasWaiters returns true if After or AfterFunc has been called on f but not yet satisfied +// (so you can write race-free tests). +func (f *FakeClock) HasWaiters() bool { + f.lock.RLock() + defer f.lock.RUnlock() + return len(f.waiters) > 0 +} + +// Sleep pauses the FakeClock for duration d. +func (f *FakeClock) Sleep(d time.Duration) { + f.Step(d) +} + +// IntervalClock implements Clock, but each invocation of Now steps the clock forward the specified duration +type IntervalClock struct { + Time time.Time + Duration time.Duration +} + +// Now returns i's time. +func (i *IntervalClock) Now() time.Time { + i.Time = i.Time.Add(i.Duration) + return i.Time +} + +// Since returns time since the time in i. +func (i *IntervalClock) Since(ts time.Time) time.Duration { + return i.Time.Sub(ts) +} + +// After is currently unimplemented, will panic. +// TODO: make interval clock use FakeClock so this can be implemented. +func (*IntervalClock) After(d time.Duration) <-chan time.Time { + panic("IntervalClock doesn't implement After") +} + +// AfterFunc is currently unimplemented, will panic. +// TODO: make interval clock use FakeClock so this can be implemented. +func (*IntervalClock) AfterFunc(d time.Duration, cb func()) Timer { + panic("IntervalClock doesn't implement AfterFunc") +} + +// NewTimer is currently unimplemented, will panic. +// TODO: make interval clock use FakeClock so this can be implemented. +func (*IntervalClock) NewTimer(d time.Duration) Timer { + panic("IntervalClock doesn't implement NewTimer") +} + +// NewTicker is currently unimplemented, will panic. +// TODO: make interval clock use FakeClock so this can be implemented. +func (*IntervalClock) NewTicker(d time.Duration) Ticker { + panic("IntervalClock doesn't implement NewTicker") +} + +// Sleep is currently unimplemented; will panic. +func (*IntervalClock) Sleep(d time.Duration) { + panic("IntervalClock doesn't implement Sleep") +} + +// Timer allows for injecting fake or real timers into code that +// needs to do arbitrary things based on time. +type Timer interface { + C() <-chan time.Time + Stop() bool + Reset(d time.Duration) bool +} + +// realTimer is backed by an actual time.Timer. +type realTimer struct { + timer *time.Timer +} + +// C returns the underlying timer's channel. +func (r *realTimer) C() <-chan time.Time { + return r.timer.C +} + +// Stop calls Stop() on the underlying timer. +func (r *realTimer) Stop() bool { + return r.timer.Stop() +} + +// Reset calls Reset() on the underlying timer. +func (r *realTimer) Reset(d time.Duration) bool { + return r.timer.Reset(d) +} + +// fakeTimer implements Timer based on a FakeClock. +type fakeTimer struct { + fakeClock *FakeClock + waiter fakeClockWaiter +} + +// C returns the channel that notifies when this timer has fired. +func (f *fakeTimer) C() <-chan time.Time { + return f.waiter.destChan +} + +// Stop conditionally stops the timer. If the timer has neither fired +// nor been stopped then this call stops the timer and returns true, +// otherwise this call returns false. This is like time.Timer::Stop. +func (f *fakeTimer) Stop() bool { + f.fakeClock.lock.Lock() + defer f.fakeClock.lock.Unlock() + // The timer has already fired or been stopped, unless it is found + // among the clock's waiters. + stopped := false + oldWaiters := f.fakeClock.waiters + newWaiters := make([]fakeClockWaiter, 0, len(oldWaiters)) + seekChan := f.waiter.destChan + for i := range oldWaiters { + // Identify the timer's fakeClockWaiter by the identity of the + // destination channel, nothing else is necessarily unique and + // constant since the timer's creation. + if oldWaiters[i].destChan == seekChan { + stopped = true + } else { + newWaiters = append(newWaiters, oldWaiters[i]) + } + } + + f.fakeClock.waiters = newWaiters + + return stopped +} + +// Reset conditionally updates the firing time of the timer. If the +// timer has neither fired nor been stopped then this call resets the +// timer to the fake clock's "now" + d and returns true, otherwise +// it creates a new waiter, adds it to the clock, and returns true. +// +// It is not possible to return false, because a fake timer can be reset +// from any state (waiting to fire, already fired, and stopped). +// +// See the GoDoc for time.Timer::Reset for more context on why +// the return value of Reset() is not useful. +func (f *fakeTimer) Reset(d time.Duration) bool { + f.fakeClock.lock.Lock() + defer f.fakeClock.lock.Unlock() + waiters := f.fakeClock.waiters + seekChan := f.waiter.destChan + for i := range waiters { + if waiters[i].destChan == seekChan { + waiters[i].targetTime = f.fakeClock.time.Add(d) + return true + } + } + // No existing waiter, timer has already fired or been reset. + // We should still enable Reset() to succeed by creating a + // new waiter and adding it to the clock's waiters. + newWaiter := fakeClockWaiter{ + targetTime: f.fakeClock.time.Add(d), + destChan: seekChan, + } + f.fakeClock.waiters = append(f.fakeClock.waiters, newWaiter) + return true +} + +// Ticker defines the Ticker interface +type Ticker interface { + C() <-chan time.Time + Stop() +} + +type realTicker struct { + ticker *time.Ticker +} + +func (t *realTicker) C() <-chan time.Time { + return t.ticker.C +} + +func (t *realTicker) Stop() { + t.ticker.Stop() +} + +type fakeTicker struct { + c <-chan time.Time +} + +func (t *fakeTicker) C() <-chan time.Time { + return t.c +} + +func (t *fakeTicker) Stop() { +} diff --git a/vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go b/vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go new file mode 100644 index 000000000..035c52811 --- /dev/null +++ b/vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go @@ -0,0 +1,173 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package runtime + +import ( + "fmt" + "net/http" + "runtime" + "sync" + "time" + + "k8s.io/klog/v2" +) + +var ( + // ReallyCrash controls the behavior of HandleCrash and now defaults + // true. It's still exposed so components can optionally set to false + // to restore prior behavior. + ReallyCrash = true +) + +// PanicHandlers is a list of functions which will be invoked when a panic happens. +var PanicHandlers = []func(interface{}){logPanic} + +// HandleCrash simply catches a crash and logs an error. Meant to be called via +// defer. Additional context-specific handlers can be provided, and will be +// called in case of panic. HandleCrash actually crashes, after calling the +// handlers and logging the panic message. +// +// E.g., you can provide one or more additional handlers for something like shutting down go routines gracefully. +func HandleCrash(additionalHandlers ...func(interface{})) { + if r := recover(); r != nil { + for _, fn := range PanicHandlers { + fn(r) + } + for _, fn := range additionalHandlers { + fn(r) + } + if ReallyCrash { + // Actually proceed to panic. + panic(r) + } + } +} + +// logPanic logs the caller tree when a panic occurs (except in the special case of http.ErrAbortHandler). +func logPanic(r interface{}) { + if r == http.ErrAbortHandler { + // honor the http.ErrAbortHandler sentinel panic value: + // ErrAbortHandler is a sentinel panic value to abort a handler. + // While any panic from ServeHTTP aborts the response to the client, + // panicking with ErrAbortHandler also suppresses logging of a stack trace to the server's error log. + return + } + + // Same as stdlib http server code. Manually allocate stack trace buffer size + // to prevent excessively large logs + const size = 64 << 10 + stacktrace := make([]byte, size) + stacktrace = stacktrace[:runtime.Stack(stacktrace, false)] + if _, ok := r.(string); ok { + klog.Errorf("Observed a panic: %s\n%s", r, stacktrace) + } else { + klog.Errorf("Observed a panic: %#v (%v)\n%s", r, r, stacktrace) + } +} + +// ErrorHandlers is a list of functions which will be invoked when a nonreturnable +// error occurs. +// TODO(lavalamp): for testability, this and the below HandleError function +// should be packaged up into a testable and reusable object. +var ErrorHandlers = []func(error){ + logError, + (&rudimentaryErrorBackoff{ + lastErrorTime: time.Now(), + // 1ms was the number folks were able to stomach as a global rate limit. + // If you need to log errors more than 1000 times a second you + // should probably consider fixing your code instead. :) + minPeriod: time.Millisecond, + }).OnError, +} + +// HandlerError is a method to invoke when a non-user facing piece of code cannot +// return an error and needs to indicate it has been ignored. Invoking this method +// is preferable to logging the error - the default behavior is to log but the +// errors may be sent to a remote server for analysis. +func HandleError(err error) { + // this is sometimes called with a nil error. We probably shouldn't fail and should do nothing instead + if err == nil { + return + } + + for _, fn := range ErrorHandlers { + fn(err) + } +} + +// logError prints an error with the call stack of the location it was reported +func logError(err error) { + klog.ErrorDepth(2, err) +} + +type rudimentaryErrorBackoff struct { + minPeriod time.Duration // immutable + // TODO(lavalamp): use the clock for testability. Need to move that + // package for that to be accessible here. + lastErrorTimeLock sync.Mutex + lastErrorTime time.Time +} + +// OnError will block if it is called more often than the embedded period time. +// This will prevent overly tight hot error loops. +func (r *rudimentaryErrorBackoff) OnError(error) { + r.lastErrorTimeLock.Lock() + defer r.lastErrorTimeLock.Unlock() + d := time.Since(r.lastErrorTime) + if d < r.minPeriod { + // If the time moves backwards for any reason, do nothing + time.Sleep(r.minPeriod - d) + } + r.lastErrorTime = time.Now() +} + +// GetCaller returns the caller of the function that calls it. +func GetCaller() string { + var pc [1]uintptr + runtime.Callers(3, pc[:]) + f := runtime.FuncForPC(pc[0]) + if f == nil { + return fmt.Sprintf("Unable to find caller") + } + return f.Name() +} + +// RecoverFromPanic replaces the specified error with an error containing the +// original error, and the call tree when a panic occurs. This enables error +// handlers to handle errors and panics the same way. +func RecoverFromPanic(err *error) { + if r := recover(); r != nil { + // Same as stdlib http server code. Manually allocate stack trace buffer size + // to prevent excessively large logs + const size = 64 << 10 + stacktrace := make([]byte, size) + stacktrace = stacktrace[:runtime.Stack(stacktrace, false)] + + *err = fmt.Errorf( + "recovered from panic %q. (err=%v) Call stack:\n%s", + r, + *err, + stacktrace) + } +} + +// Must panics on non-nil errors. Useful to handling programmer level errors. +func Must(err error) { + if err != nil { + panic(err) + } +} diff --git a/vendor/k8s.io/apimachinery/pkg/util/wait/doc.go b/vendor/k8s.io/apimachinery/pkg/util/wait/doc.go new file mode 100644 index 000000000..3f0c968ec --- /dev/null +++ b/vendor/k8s.io/apimachinery/pkg/util/wait/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package wait provides tools for polling or listening for changes +// to a condition. +package wait // import "k8s.io/apimachinery/pkg/util/wait" diff --git a/vendor/k8s.io/apimachinery/pkg/util/wait/wait.go b/vendor/k8s.io/apimachinery/pkg/util/wait/wait.go new file mode 100644 index 000000000..3dea7fe7f --- /dev/null +++ b/vendor/k8s.io/apimachinery/pkg/util/wait/wait.go @@ -0,0 +1,635 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package wait + +import ( + "context" + "errors" + "math" + "math/rand" + "sync" + "time" + + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apimachinery/pkg/util/runtime" +) + +// For any test of the style: +// ... +// <- time.After(timeout): +// t.Errorf("Timed out") +// The value for timeout should effectively be "forever." Obviously we don't want our tests to truly lock up forever, but 30s +// is long enough that it is effectively forever for the things that can slow down a run on a heavily contended machine +// (GC, seeks, etc), but not so long as to make a developer ctrl-c a test run if they do happen to break that test. +var ForeverTestTimeout = time.Second * 30 + +// NeverStop may be passed to Until to make it never stop. +var NeverStop <-chan struct{} = make(chan struct{}) + +// Group allows to start a group of goroutines and wait for their completion. +type Group struct { + wg sync.WaitGroup +} + +func (g *Group) Wait() { + g.wg.Wait() +} + +// StartWithChannel starts f in a new goroutine in the group. +// stopCh is passed to f as an argument. f should stop when stopCh is available. +func (g *Group) StartWithChannel(stopCh <-chan struct{}, f func(stopCh <-chan struct{})) { + g.Start(func() { + f(stopCh) + }) +} + +// StartWithContext starts f in a new goroutine in the group. +// ctx is passed to f as an argument. f should stop when ctx.Done() is available. +func (g *Group) StartWithContext(ctx context.Context, f func(context.Context)) { + g.Start(func() { + f(ctx) + }) +} + +// Start starts f in a new goroutine in the group. +func (g *Group) Start(f func()) { + g.wg.Add(1) + go func() { + defer g.wg.Done() + f() + }() +} + +// Forever calls f every period for ever. +// +// Forever is syntactic sugar on top of Until. +func Forever(f func(), period time.Duration) { + Until(f, period, NeverStop) +} + +// Until loops until stop channel is closed, running f every period. +// +// Until is syntactic sugar on top of JitterUntil with zero jitter factor and +// with sliding = true (which means the timer for period starts after the f +// completes). +func Until(f func(), period time.Duration, stopCh <-chan struct{}) { + JitterUntil(f, period, 0.0, true, stopCh) +} + +// UntilWithContext loops until context is done, running f every period. +// +// UntilWithContext is syntactic sugar on top of JitterUntilWithContext +// with zero jitter factor and with sliding = true (which means the timer +// for period starts after the f completes). +func UntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) { + JitterUntilWithContext(ctx, f, period, 0.0, true) +} + +// NonSlidingUntil loops until stop channel is closed, running f every +// period. +// +// NonSlidingUntil is syntactic sugar on top of JitterUntil with zero jitter +// factor, with sliding = false (meaning the timer for period starts at the same +// time as the function starts). +func NonSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) { + JitterUntil(f, period, 0.0, false, stopCh) +} + +// NonSlidingUntilWithContext loops until context is done, running f every +// period. +// +// NonSlidingUntilWithContext is syntactic sugar on top of JitterUntilWithContext +// with zero jitter factor, with sliding = false (meaning the timer for period +// starts at the same time as the function starts). +func NonSlidingUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) { + JitterUntilWithContext(ctx, f, period, 0.0, false) +} + +// JitterUntil loops until stop channel is closed, running f every period. +// +// If jitterFactor is positive, the period is jittered before every run of f. +// If jitterFactor is not positive, the period is unchanged and not jittered. +// +// If sliding is true, the period is computed after f runs. If it is false then +// period includes the runtime for f. +// +// Close stopCh to stop. f may not be invoked if stop channel is already +// closed. Pass NeverStop to if you don't want it stop. +func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) { + BackoffUntil(f, NewJitteredBackoffManager(period, jitterFactor, &clock.RealClock{}), sliding, stopCh) +} + +// BackoffUntil loops until stop channel is closed, run f every duration given by BackoffManager. +// +// If sliding is true, the period is computed after f runs. If it is false then +// period includes the runtime for f. +func BackoffUntil(f func(), backoff BackoffManager, sliding bool, stopCh <-chan struct{}) { + var t clock.Timer + for { + select { + case <-stopCh: + return + default: + } + + if !sliding { + t = backoff.Backoff() + } + + func() { + defer runtime.HandleCrash() + f() + }() + + if sliding { + t = backoff.Backoff() + } + + // NOTE: b/c there is no priority selection in golang + // it is possible for this to race, meaning we could + // trigger t.C and stopCh, and t.C select falls through. + // In order to mitigate we re-check stopCh at the beginning + // of every loop to prevent extra executions of f(). + select { + case <-stopCh: + return + case <-t.C(): + } + } +} + +// JitterUntilWithContext loops until context is done, running f every period. +// +// If jitterFactor is positive, the period is jittered before every run of f. +// If jitterFactor is not positive, the period is unchanged and not jittered. +// +// If sliding is true, the period is computed after f runs. If it is false then +// period includes the runtime for f. +// +// Cancel context to stop. f may not be invoked if context is already expired. +func JitterUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration, jitterFactor float64, sliding bool) { + JitterUntil(func() { f(ctx) }, period, jitterFactor, sliding, ctx.Done()) +} + +// Jitter returns a time.Duration between duration and duration + maxFactor * +// duration. +// +// This allows clients to avoid converging on periodic behavior. If maxFactor +// is 0.0, a suggested default value will be chosen. +func Jitter(duration time.Duration, maxFactor float64) time.Duration { + if maxFactor <= 0.0 { + maxFactor = 1.0 + } + wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration)) + return wait +} + +// ErrWaitTimeout is returned when the condition exited without success. +var ErrWaitTimeout = errors.New("timed out waiting for the condition") + +// ConditionFunc returns true if the condition is satisfied, or an error +// if the loop should be aborted. +type ConditionFunc func() (done bool, err error) + +// runConditionWithCrashProtection runs a ConditionFunc with crash protection +func runConditionWithCrashProtection(condition ConditionFunc) (bool, error) { + defer runtime.HandleCrash() + return condition() +} + +// Backoff holds parameters applied to a Backoff function. +type Backoff struct { + // The initial duration. + Duration time.Duration + // Duration is multiplied by factor each iteration, if factor is not zero + // and the limits imposed by Steps and Cap have not been reached. + // Should not be negative. + // The jitter does not contribute to the updates to the duration parameter. + Factor float64 + // The sleep at each iteration is the duration plus an additional + // amount chosen uniformly at random from the interval between + // zero and `jitter*duration`. + Jitter float64 + // The remaining number of iterations in which the duration + // parameter may change (but progress can be stopped earlier by + // hitting the cap). If not positive, the duration is not + // changed. Used for exponential backoff in combination with + // Factor and Cap. + Steps int + // A limit on revised values of the duration parameter. If a + // multiplication by the factor parameter would make the duration + // exceed the cap then the duration is set to the cap and the + // steps parameter is set to zero. + Cap time.Duration +} + +// Step (1) returns an amount of time to sleep determined by the +// original Duration and Jitter and (2) mutates the provided Backoff +// to update its Steps and Duration. +func (b *Backoff) Step() time.Duration { + if b.Steps < 1 { + if b.Jitter > 0 { + return Jitter(b.Duration, b.Jitter) + } + return b.Duration + } + b.Steps-- + + duration := b.Duration + + // calculate the next step + if b.Factor != 0 { + b.Duration = time.Duration(float64(b.Duration) * b.Factor) + if b.Cap > 0 && b.Duration > b.Cap { + b.Duration = b.Cap + b.Steps = 0 + } + } + + if b.Jitter > 0 { + duration = Jitter(duration, b.Jitter) + } + return duration +} + +// contextForChannel derives a child context from a parent channel. +// +// The derived context's Done channel is closed when the returned cancel function +// is called or when the parent channel is closed, whichever happens first. +// +// Note the caller must *always* call the CancelFunc, otherwise resources may be leaked. +func contextForChannel(parentCh <-chan struct{}) (context.Context, context.CancelFunc) { + ctx, cancel := context.WithCancel(context.Background()) + + go func() { + select { + case <-parentCh: + cancel() + case <-ctx.Done(): + } + }() + return ctx, cancel +} + +// BackoffManager manages backoff with a particular scheme based on its underlying implementation. It provides +// an interface to return a timer for backoff, and caller shall backoff until Timer.C() drains. If the second Backoff() +// is called before the timer from the first Backoff() call finishes, the first timer will NOT be drained and result in +// undetermined behavior. +// The BackoffManager is supposed to be called in a single-threaded environment. +type BackoffManager interface { + Backoff() clock.Timer +} + +type exponentialBackoffManagerImpl struct { + backoff *Backoff + backoffTimer clock.Timer + lastBackoffStart time.Time + initialBackoff time.Duration + backoffResetDuration time.Duration + clock clock.Clock +} + +// NewExponentialBackoffManager returns a manager for managing exponential backoff. Each backoff is jittered and +// backoff will not exceed the given max. If the backoff is not called within resetDuration, the backoff is reset. +// This backoff manager is used to reduce load during upstream unhealthiness. +func NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration time.Duration, backoffFactor, jitter float64, c clock.Clock) BackoffManager { + return &exponentialBackoffManagerImpl{ + backoff: &Backoff{ + Duration: initBackoff, + Factor: backoffFactor, + Jitter: jitter, + + // the current impl of wait.Backoff returns Backoff.Duration once steps are used up, which is not + // what we ideally need here, we set it to max int and assume we will never use up the steps + Steps: math.MaxInt32, + Cap: maxBackoff, + }, + backoffTimer: nil, + initialBackoff: initBackoff, + lastBackoffStart: c.Now(), + backoffResetDuration: resetDuration, + clock: c, + } +} + +func (b *exponentialBackoffManagerImpl) getNextBackoff() time.Duration { + if b.clock.Now().Sub(b.lastBackoffStart) > b.backoffResetDuration { + b.backoff.Steps = math.MaxInt32 + b.backoff.Duration = b.initialBackoff + } + b.lastBackoffStart = b.clock.Now() + return b.backoff.Step() +} + +// Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer for exponential backoff. +// The returned timer must be drained before calling Backoff() the second time +func (b *exponentialBackoffManagerImpl) Backoff() clock.Timer { + if b.backoffTimer == nil { + b.backoffTimer = b.clock.NewTimer(b.getNextBackoff()) + } else { + b.backoffTimer.Reset(b.getNextBackoff()) + } + return b.backoffTimer +} + +type jitteredBackoffManagerImpl struct { + clock clock.Clock + duration time.Duration + jitter float64 + backoffTimer clock.Timer +} + +// NewJitteredBackoffManager returns a BackoffManager that backoffs with given duration plus given jitter. If the jitter +// is negative, backoff will not be jittered. +func NewJitteredBackoffManager(duration time.Duration, jitter float64, c clock.Clock) BackoffManager { + return &jitteredBackoffManagerImpl{ + clock: c, + duration: duration, + jitter: jitter, + backoffTimer: nil, + } +} + +func (j *jitteredBackoffManagerImpl) getNextBackoff() time.Duration { + jitteredPeriod := j.duration + if j.jitter > 0.0 { + jitteredPeriod = Jitter(j.duration, j.jitter) + } + return jitteredPeriod +} + +// Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer for jittered backoff. +// The returned timer must be drained before calling Backoff() the second time +func (j *jitteredBackoffManagerImpl) Backoff() clock.Timer { + backoff := j.getNextBackoff() + if j.backoffTimer == nil { + j.backoffTimer = j.clock.NewTimer(backoff) + } else { + j.backoffTimer.Reset(backoff) + } + return j.backoffTimer +} + +// ExponentialBackoff repeats a condition check with exponential backoff. +// +// It repeatedly checks the condition and then sleeps, using `backoff.Step()` +// to determine the length of the sleep and adjust Duration and Steps. +// Stops and returns as soon as: +// 1. the condition check returns true or an error, +// 2. `backoff.Steps` checks of the condition have been done, or +// 3. a sleep truncated by the cap on duration has been completed. +// In case (1) the returned error is what the condition function returned. +// In all other cases, ErrWaitTimeout is returned. +func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error { + for backoff.Steps > 0 { + if ok, err := runConditionWithCrashProtection(condition); err != nil || ok { + return err + } + if backoff.Steps == 1 { + break + } + time.Sleep(backoff.Step()) + } + return ErrWaitTimeout +} + +// Poll tries a condition func until it returns true, an error, or the timeout +// is reached. +// +// Poll always waits the interval before the run of 'condition'. +// 'condition' will always be invoked at least once. +// +// Some intervals may be missed if the condition takes too long or the time +// window is too short. +// +// If you want to Poll something forever, see PollInfinite. +func Poll(interval, timeout time.Duration, condition ConditionFunc) error { + return pollInternal(poller(interval, timeout), condition) +} + +func pollInternal(wait WaitFunc, condition ConditionFunc) error { + done := make(chan struct{}) + defer close(done) + return WaitFor(wait, condition, done) +} + +// PollImmediate tries a condition func until it returns true, an error, or the timeout +// is reached. +// +// PollImmediate always checks 'condition' before waiting for the interval. 'condition' +// will always be invoked at least once. +// +// Some intervals may be missed if the condition takes too long or the time +// window is too short. +// +// If you want to immediately Poll something forever, see PollImmediateInfinite. +func PollImmediate(interval, timeout time.Duration, condition ConditionFunc) error { + return pollImmediateInternal(poller(interval, timeout), condition) +} + +func pollImmediateInternal(wait WaitFunc, condition ConditionFunc) error { + done, err := runConditionWithCrashProtection(condition) + if err != nil { + return err + } + if done { + return nil + } + return pollInternal(wait, condition) +} + +// PollInfinite tries a condition func until it returns true or an error +// +// PollInfinite always waits the interval before the run of 'condition'. +// +// Some intervals may be missed if the condition takes too long or the time +// window is too short. +func PollInfinite(interval time.Duration, condition ConditionFunc) error { + done := make(chan struct{}) + defer close(done) + return PollUntil(interval, condition, done) +} + +// PollImmediateInfinite tries a condition func until it returns true or an error +// +// PollImmediateInfinite runs the 'condition' before waiting for the interval. +// +// Some intervals may be missed if the condition takes too long or the time +// window is too short. +func PollImmediateInfinite(interval time.Duration, condition ConditionFunc) error { + done, err := runConditionWithCrashProtection(condition) + if err != nil { + return err + } + if done { + return nil + } + return PollInfinite(interval, condition) +} + +// PollUntil tries a condition func until it returns true, an error or stopCh is +// closed. +// +// PollUntil always waits interval before the first run of 'condition'. +// 'condition' will always be invoked at least once. +func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error { + ctx, cancel := contextForChannel(stopCh) + defer cancel() + return WaitFor(poller(interval, 0), condition, ctx.Done()) +} + +// PollImmediateUntil tries a condition func until it returns true, an error or stopCh is closed. +// +// PollImmediateUntil runs the 'condition' before waiting for the interval. +// 'condition' will always be invoked at least once. +func PollImmediateUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error { + done, err := condition() + if err != nil { + return err + } + if done { + return nil + } + select { + case <-stopCh: + return ErrWaitTimeout + default: + return PollUntil(interval, condition, stopCh) + } +} + +// WaitFunc creates a channel that receives an item every time a test +// should be executed and is closed when the last test should be invoked. +type WaitFunc func(done <-chan struct{}) <-chan struct{} + +// WaitFor continually checks 'fn' as driven by 'wait'. +// +// WaitFor gets a channel from 'wait()'', and then invokes 'fn' once for every value +// placed on the channel and once more when the channel is closed. If the channel is closed +// and 'fn' returns false without error, WaitFor returns ErrWaitTimeout. +// +// If 'fn' returns an error the loop ends and that error is returned. If +// 'fn' returns true the loop ends and nil is returned. +// +// ErrWaitTimeout will be returned if the 'done' channel is closed without fn ever +// returning true. +// +// When the done channel is closed, because the golang `select` statement is +// "uniform pseudo-random", the `fn` might still run one or multiple time, +// though eventually `WaitFor` will return. +func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error { + stopCh := make(chan struct{}) + defer close(stopCh) + c := wait(stopCh) + for { + select { + case _, open := <-c: + ok, err := runConditionWithCrashProtection(fn) + if err != nil { + return err + } + if ok { + return nil + } + if !open { + return ErrWaitTimeout + } + case <-done: + return ErrWaitTimeout + } + } +} + +// poller returns a WaitFunc that will send to the channel every interval until +// timeout has elapsed and then closes the channel. +// +// Over very short intervals you may receive no ticks before the channel is +// closed. A timeout of 0 is interpreted as an infinity, and in such a case +// it would be the caller's responsibility to close the done channel. +// Failure to do so would result in a leaked goroutine. +// +// Output ticks are not buffered. If the channel is not ready to receive an +// item, the tick is skipped. +func poller(interval, timeout time.Duration) WaitFunc { + return WaitFunc(func(done <-chan struct{}) <-chan struct{} { + ch := make(chan struct{}) + + go func() { + defer close(ch) + + tick := time.NewTicker(interval) + defer tick.Stop() + + var after <-chan time.Time + if timeout != 0 { + // time.After is more convenient, but it + // potentially leaves timers around much longer + // than necessary if we exit early. + timer := time.NewTimer(timeout) + after = timer.C + defer timer.Stop() + } + + for { + select { + case <-tick.C: + // If the consumer isn't ready for this signal drop it and + // check the other channels. + select { + case ch <- struct{}{}: + default: + } + case <-after: + return + case <-done: + return + } + } + }() + + return ch + }) +} + +// ExponentialBackoffWithContext works with a request context and a Backoff. It ensures that the retry wait never +// exceeds the deadline specified by the request context. +func ExponentialBackoffWithContext(ctx context.Context, backoff Backoff, condition ConditionFunc) error { + for backoff.Steps > 0 { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if ok, err := runConditionWithCrashProtection(condition); err != nil || ok { + return err + } + + if backoff.Steps == 1 { + break + } + + waitBeforeRetry := backoff.Step() + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(waitBeforeRetry): + } + } + + return ErrWaitTimeout +} diff --git a/vendor/modules.txt b/vendor/modules.txt index cd88900d4..7fa1d8123 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -152,7 +152,11 @@ google.golang.org/protobuf/types/known/durationpb google.golang.org/protobuf/types/known/timestamppb google.golang.org/protobuf/types/known/wrapperspb # k8s.io/apimachinery v0.21.0 => k8s.io/apimachinery v0.21.0 +## explicit +k8s.io/apimachinery/pkg/util/clock +k8s.io/apimachinery/pkg/util/runtime k8s.io/apimachinery/pkg/util/sets +k8s.io/apimachinery/pkg/util/wait k8s.io/apimachinery/pkg/version # k8s.io/client-go v0.21.0 => k8s.io/client-go v0.21.0 ## explicit