From f29a36dfab639b255b7b39a1ca1cc34ce2804639 Mon Sep 17 00:00:00 2001 From: Karl Isenberg Date: Wed, 12 Jun 2024 13:14:55 -0700 Subject: [PATCH] Refactor Reflector ListAndWatch - Extract watchWithResync to simplify ListAndWatch - Wrap watchHandler with two variants, one for WatchList and one for just Watch. - Replace a bool pointer arg with a bool arg and bool return, to improve readability. - Use errors.Is to satisfy the linter - Use %w to wrap the store.Replace error, to allow unwrapping. Kubernetes-commit: 65fc1bb463c85a4c85e619bf7acac9503e23a253 --- tools/cache/reflector.go | 131 +++++++++++++++++++++++----------- tools/cache/reflector_test.go | 15 ++-- 2 files changed, 98 insertions(+), 48 deletions(-) diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index a617147bda..5e7dd57409 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -366,12 +366,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { } klog.V(2).Infof("Caches populated for %v from %s", r.typeDescription, r.name) - - resyncerrc := make(chan error, 1) - cancelCh := make(chan struct{}) - defer close(cancelCh) - go r.startResync(stopCh, cancelCh, resyncerrc) - return r.watch(w, stopCh, resyncerrc) + return r.watchWithResync(w, stopCh) } // startResync periodically calls r.store.Resync() method. @@ -402,6 +397,15 @@ func (r *Reflector) startResync(stopCh <-chan struct{}, cancelCh <-chan struct{} } } +// watchWithResync runs watch with startResync in the background. +func (r *Reflector) watchWithResync(w watch.Interface, stopCh <-chan struct{}) error { + resyncerrc := make(chan error, 1) + cancelCh := make(chan struct{}) + defer close(cancelCh) + go r.startResync(stopCh, cancelCh, resyncerrc) + return r.watch(w, stopCh, resyncerrc) +} + // watch simply starts a watch request with the server. func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error { var err error @@ -451,13 +455,14 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc } } - err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, nil, r.clock, resyncerrc, stopCh) + err = handleWatch(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, + r.clock, resyncerrc, stopCh) // Ensure that watch will not be reused across iterations. w.Stop() w = nil retry.After(err) if err != nil { - if err != errorStopRequested { + if !errors.Is(err, errorStopRequested) { switch { case isExpiredError(err): // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already @@ -668,14 +673,12 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) { } return nil, err } - bookmarkReceived := pointer.Bool(false) - err = watchHandler(start, w, temporaryStore, r.expectedType, r.expectedGVK, r.name, r.typeDescription, + watchListBookmarkReceived, err := handleListWatch(start, w, temporaryStore, r.expectedType, r.expectedGVK, r.name, r.typeDescription, func(rv string) { resourceVersion = rv }, - bookmarkReceived, r.clock, make(chan error), stopCh) if err != nil { w.Stop() // stop and retry with clean state - if err == errorStopRequested { + if errors.Is(err, errorStopRequested) { return nil, nil } if isErrorRetriableWithSideEffectsFn(err) { @@ -683,7 +686,7 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) { } return nil, err } - if *bookmarkReceived { + if watchListBookmarkReceived { break } } @@ -697,8 +700,8 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) { // component as soon as it finishes replacing the content. checkWatchListDataConsistencyIfRequested(wait.ContextForChannel(stopCh), r.name, resourceVersion, wrapListFuncWithContext(r.listerWatcher.List), temporaryStore.List) - if err = r.store.Replace(temporaryStore.List(), resourceVersion); err != nil { - return nil, fmt.Errorf("unable to sync watch-list result: %v", err) + if err := r.store.Replace(temporaryStore.List(), resourceVersion); err != nil { + return nil, fmt.Errorf("unable to sync watch-list result: %w", err) } initTrace.Step("SyncWith done") r.setLastSyncResourceVersion(resourceVersion) @@ -715,8 +718,12 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err return r.store.Replace(found, resourceVersion) } -// watchHandler watches w and sets setLastSyncResourceVersion -func watchHandler(start time.Time, +// handleListWatch consumes events from w, updates the Store, and records the +// last seen ResourceVersion, to allow continuing from that ResourceVersion on +// retry. If successful, the watcher will be left open after receiving the +// initial set of objects, to allow watching for future events. +func handleListWatch( + start time.Time, w watch.Interface, store Store, expectedType reflect.Type, @@ -724,33 +731,77 @@ func watchHandler(start time.Time, name string, expectedTypeName string, setLastSyncResourceVersion func(string), - exitOnInitialEventsEndBookmark *bool, clock clock.Clock, - errc chan error, + errCh chan error, + stopCh <-chan struct{}, +) (bool, error) { + exitOnWatchListBookmarkReceived := true + return handleAnyWatch(start, w, store, expectedType, expectedGVK, name, expectedTypeName, + setLastSyncResourceVersion, exitOnWatchListBookmarkReceived, clock, errCh, stopCh) +} + +// handleListWatch consumes events from w, updates the Store, and records the +// last seen ResourceVersion, to allow continuing from that ResourceVersion on +// retry. The watcher will always be stopped on exit. +func handleWatch( + start time.Time, + w watch.Interface, + store Store, + expectedType reflect.Type, + expectedGVK *schema.GroupVersionKind, + name string, + expectedTypeName string, + setLastSyncResourceVersion func(string), + clock clock.Clock, + errCh chan error, stopCh <-chan struct{}, ) error { + exitOnWatchListBookmarkReceived := false + _, err := handleAnyWatch(start, w, store, expectedType, expectedGVK, name, expectedTypeName, + setLastSyncResourceVersion, exitOnWatchListBookmarkReceived, clock, errCh, stopCh) + return err +} + +// handleAnyWatch consumes events from w, updates the Store, and records the last +// seen ResourceVersion, to allow continuing from that ResourceVersion on retry. +// If exitOnWatchListBookmarkReceived is true, the watch events will be consumed +// until a bookmark event is received with the WatchList annotation present. +// Returns true (watchListBookmarkReceived) if the WatchList bookmark was +// received, even if exitOnWatchListBookmarkReceived is false. +// The watcher will always be stopped, unless exitOnWatchListBookmarkReceived is +// true and watchListBookmarkReceived is true. This allows the same watch stream +// to be re-used by the caller to continue watching for new events. +func handleAnyWatch(start time.Time, + w watch.Interface, + store Store, + expectedType reflect.Type, + expectedGVK *schema.GroupVersionKind, + name string, + expectedTypeName string, + setLastSyncResourceVersion func(string), + exitOnWatchListBookmarkReceived bool, + clock clock.Clock, + errCh chan error, + stopCh <-chan struct{}, +) (bool, error) { + watchListBookmarkReceived := false eventCount := 0 - initialEventsEndBookmarkWarningTicker := newInitialEventsEndBookmarkTicker(name, clock, start, exitOnInitialEventsEndBookmark != nil) + initialEventsEndBookmarkWarningTicker := newInitialEventsEndBookmarkTicker(name, clock, start, exitOnWatchListBookmarkReceived) defer initialEventsEndBookmarkWarningTicker.Stop() - if exitOnInitialEventsEndBookmark != nil { - // set it to false just in case somebody - // made it positive - *exitOnInitialEventsEndBookmark = false - } loop: for { select { case <-stopCh: - return errorStopRequested - case err := <-errc: - return err + return watchListBookmarkReceived, errorStopRequested + case err := <-errCh: + return watchListBookmarkReceived, err case event, ok := <-w.ResultChan(): if !ok { break loop } if event.Type == watch.Error { - return apierrors.FromObject(event.Object) + return watchListBookmarkReceived, apierrors.FromObject(event.Object) } if expectedType != nil { if e, a := expectedType, reflect.TypeOf(event.Object); e != a { @@ -792,9 +843,7 @@ loop: case watch.Bookmark: // A `Bookmark` means watch has synced here, just update the resourceVersion if meta.GetAnnotations()[metav1.InitialEventsAnnotationKey] == "true" { - if exitOnInitialEventsEndBookmark != nil { - *exitOnInitialEventsEndBookmark = true - } + watchListBookmarkReceived = true } default: utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event)) @@ -804,10 +853,10 @@ loop: rvu.UpdateResourceVersion(resourceVersion) } eventCount++ - if exitOnInitialEventsEndBookmark != nil && *exitOnInitialEventsEndBookmark { + if exitOnWatchListBookmarkReceived && watchListBookmarkReceived { watchDuration := clock.Since(start) klog.V(4).Infof("exiting %v Watch because received the bookmark that marks the end of initial events stream, total %v items received in %v", name, eventCount, watchDuration) - return nil + return watchListBookmarkReceived, nil } initialEventsEndBookmarkWarningTicker.observeLastEventTimeStamp(clock.Now()) case <-initialEventsEndBookmarkWarningTicker.C(): @@ -817,10 +866,10 @@ loop: watchDuration := clock.Since(start) if watchDuration < 1*time.Second && eventCount == 0 { - return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name) + return watchListBookmarkReceived, fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name) } klog.V(4).Infof("%s: Watch close - %v total %v items received", name, expectedTypeName, eventCount) - return nil + return watchListBookmarkReceived, nil } // LastSyncResourceVersion is the resource version observed when last sync with the underlying store @@ -962,14 +1011,14 @@ type initialEventsEndBookmarkTicker struct { // Note that the caller controls whether to call t.C() and t.Stop(). // // In practice, the reflector exits the watchHandler as soon as the bookmark event is received and calls the t.C() method. -func newInitialEventsEndBookmarkTicker(name string, c clock.Clock, watchStart time.Time, exitOnInitialEventsEndBookmarkRequested bool) *initialEventsEndBookmarkTicker { - return newInitialEventsEndBookmarkTickerInternal(name, c, watchStart, 10*time.Second, exitOnInitialEventsEndBookmarkRequested) +func newInitialEventsEndBookmarkTicker(name string, c clock.Clock, watchStart time.Time, exitOnWatchListBookmarkReceived bool) *initialEventsEndBookmarkTicker { + return newInitialEventsEndBookmarkTickerInternal(name, c, watchStart, 10*time.Second, exitOnWatchListBookmarkReceived) } -func newInitialEventsEndBookmarkTickerInternal(name string, c clock.Clock, watchStart time.Time, tickInterval time.Duration, exitOnInitialEventsEndBookmarkRequested bool) *initialEventsEndBookmarkTicker { +func newInitialEventsEndBookmarkTickerInternal(name string, c clock.Clock, watchStart time.Time, tickInterval time.Duration, exitOnWatchListBookmarkReceived bool) *initialEventsEndBookmarkTicker { clockWithTicker, ok := c.(clock.WithTicker) - if !ok || !exitOnInitialEventsEndBookmarkRequested { - if exitOnInitialEventsEndBookmarkRequested { + if !ok || !exitOnWatchListBookmarkReceived { + if exitOnWatchListBookmarkReceived { klog.Warningf("clock does not support WithTicker interface but exitOnInitialEventsEndBookmark was requested") } return &initialEventsEndBookmarkTicker{ diff --git a/tools/cache/reflector_test.go b/tools/cache/reflector_test.go index 32946345d3..1b4904a627 100644 --- a/tools/cache/reflector_test.go +++ b/tools/cache/reflector_test.go @@ -231,7 +231,7 @@ func TestReflectorHandleWatchStoppedBefore(t *testing.T) { return resultCh }, } - err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, stopCh) + err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, stopCh) if err == nil { t.Errorf("unexpected non-error") } @@ -267,7 +267,7 @@ func TestReflectorHandleWatchStoppedAfter(t *testing.T) { return resultCh }, } - err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, stopCh) + err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, stopCh) if err == nil { t.Errorf("unexpected non-error") } @@ -295,7 +295,7 @@ func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) { } // Simulate the result channel being closed by the producer before handleWatch is called. close(resultCh) - err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop) + err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, wait.NeverStop) if err == nil { t.Errorf("unexpected non-error") } @@ -328,7 +328,7 @@ func TestReflectorHandleWatchResultChanClosedAfter(t *testing.T) { return resultCh }, } - err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop) + err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, wait.NeverStop) if err == nil { t.Errorf("unexpected non-error") } @@ -362,8 +362,9 @@ func TestReflectorWatchHandler(t *testing.T) { fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}}) fw.Stop() }() - err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, setLastSyncResourceVersion, nil, g.clock, nevererrc, stopCh) - if !errors.Is(err, errorStopRequested) { + err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, setLastSyncResourceVersion, g.clock, nevererrc, stopCh) + // TODO(karlkfi): Fix FakeWatcher to avoid race condition between watcher.Stop() & close(stopCh) + if err != nil && !errors.Is(err, errorStopRequested) { t.Errorf("unexpected error %v", err) } @@ -406,7 +407,7 @@ func TestReflectorStopWatch(t *testing.T) { fw := watch.NewFake() stopWatch := make(chan struct{}) close(stopWatch) - err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, stopWatch) + err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, stopWatch) if err != errorStopRequested { t.Errorf("expected stop error, got %q", err) }