From c53ba669f0006fc294c2d5faaa2c76557b2225b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roland=20Hu=C3=9F?= Date: Wed, 4 Mar 2020 20:47:18 +0100 Subject: [PATCH] fix(wait): Only use MODIFIED events when waiting for Read == True --- pkg/serving/v1/client.go | 6 +-- pkg/wait/wait_for_ready.go | 65 ++++++++++++++++++++++++++------- pkg/wait/wait_for_ready_test.go | 16 ++++---- 3 files changed, 62 insertions(+), 25 deletions(-) diff --git a/pkg/serving/v1/client.go b/pkg/serving/v1/client.go index 003199be28..a4bb53a1ac 100644 --- a/pkg/serving/v1/client.go +++ b/pkg/serving/v1/client.go @@ -267,7 +267,7 @@ func (cl *knServingClient) DeleteService(serviceName string, timeout time.Durati waitC := make(chan error) go func() { waitForEvent := wait.NewWaitForEvent("service", cl.WatchService, func(evt *watch.Event) bool { return evt.Type == watch.Deleted }) - err, _ := waitForEvent.Wait(serviceName, timeout, wait.NoopMessageCallback()) + err, _ := waitForEvent.Wait(serviceName, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback()) waitC <- err }() err := cl.deleteService(serviceName, v1.DeletePropagationForeground) @@ -292,7 +292,7 @@ func (cl *knServingClient) deleteService(serviceName string, propagationPolicy v // Wait for a service to become ready, but not longer than provided timeout func (cl *knServingClient) WaitForService(name string, timeout time.Duration, msgCallback wait.MessageCallback) (error, time.Duration) { waitForReady := wait.NewWaitForReady("service", cl.WatchService, serviceConditionExtractor) - return waitForReady.Wait(name, timeout, msgCallback) + return waitForReady.Wait(name, wait.Options{Timeout: &timeout}, msgCallback) } // Get the configuration for a service @@ -382,7 +382,7 @@ func (cl *knServingClient) DeleteRevision(name string, timeout time.Duration) er waitC := make(chan error) go func() { waitForEvent := wait.NewWaitForEvent("revision", cl.WatchRevision, func(evt *watch.Event) bool { return evt.Type == watch.Deleted }) - err, _ := waitForEvent.Wait(name, timeout, wait.NoopMessageCallback()) + err, _ := waitForEvent.Wait(name, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback()) waitC <- err }() err := cl.deleteRevision(name) diff --git a/pkg/wait/wait_for_ready.go b/pkg/wait/wait_for_ready.go index 479b543903..52a50c4c17 100644 --- a/pkg/wait/wait_for_ready.go +++ b/pkg/wait/wait_for_ready.go @@ -25,10 +25,6 @@ import ( "knative.dev/pkg/apis" ) -// Window for how long a ReadyCondition == false has to stay -// for being considered as an error -var ErrorWindow = 2 * time.Second - // Callbacks and configuration used while waiting type waitForReadyConfig struct { watchMaker WatchMaker @@ -50,10 +46,19 @@ type EventDone func(ev *watch.Event) bool // state in its "Ready" condition. type Wait interface { - // Wait on resource the resource with this name until a given timeout + // Wait on resource the resource with this name // and write event messages for unknown event to the status writer. // Returns an error (if any) and the overall time it took to wait - Wait(name string, timeout time.Duration, msgCallback MessageCallback) (error, time.Duration) + Wait(name string, options Options, msgCallback MessageCallback) (error, time.Duration) +} + +type Options struct { + // Window for how long a ReadyCondition == false has to stay + // for being considered as an error (useful for flaky reconciliation + ErrorWindow *time.Duration + + // Timeout for how long to wait at maximum + Timeout *time.Duration } // Create watch which is used when waiting for Ready condition @@ -65,7 +70,7 @@ type ConditionsExtractor func(obj runtime.Object) (apis.Conditions, error) // Callback for event messages type MessageCallback func(durationSinceState time.Duration, message string) -// Constructor with resource type specific configuration +// NewWaitForReady waits until the condition is set to Ready == True func NewWaitForReady(kind string, watchMaker WatchMaker, extractor ConditionsExtractor) Wait { return &waitForReadyConfig{ kind: kind, @@ -74,6 +79,8 @@ func NewWaitForReady(kind string, watchMaker WatchMaker, extractor ConditionsExt } } +// NewWaitForEvent creates a Wait object which waits until a specific event (i.e. when +// the EventDone function returns true) func NewWaitForEvent(kind string, watchMaker WatchMaker, eventDone EventDone) Wait { return &waitForEvent{ kind: kind, @@ -82,7 +89,7 @@ func NewWaitForEvent(kind string, watchMaker WatchMaker, eventDone EventDone) Wa } } -// A simple message callback which prints out messages line by line +// SimpleMessageCallback returns a callback which prints out a simple event message to a given writer func SimpleMessageCallback(out io.Writer) MessageCallback { oldMessage := "" return func(duration time.Duration, message string) { @@ -95,7 +102,7 @@ func SimpleMessageCallback(out io.Writer) MessageCallback { } } -// Noop-callback +// NoopMessageCallback is callback which does nothing func NoopMessageCallback() MessageCallback { return func(durationSinceState time.Duration, message string) {} } @@ -105,12 +112,13 @@ func NoopMessageCallback() MessageCallback { // (e.g. "service"), `timeout` is a timeout after which the watch should be cancelled if no // target state has been entered yet and `out` is used for printing out status messages // msgCallback gets called for every event with an 'Ready' condition == UNKNOWN with the event's message. -func (w *waitForReadyConfig) Wait(name string, timeout time.Duration, msgCallback MessageCallback) (error, time.Duration) { +func (w *waitForReadyConfig) Wait(name string, options Options, msgCallback MessageCallback) (error, time.Duration) { + timeout := options.timeoutWithDefault() floatingTimeout := timeout for { start := time.Now() - retry, timeoutReached, err := w.waitForReadyCondition(start, name, floatingTimeout, ErrorWindow, msgCallback) + retry, timeoutReached, err := w.waitForReadyCondition(start, name, floatingTimeout, options.errorWindowWithDefault(), msgCallback) if err != nil { return err, time.Since(start) } @@ -158,8 +166,22 @@ func (w *waitForReadyConfig) waitForReadyCondition(start time.Time, name string, return true, false, nil } + // Skip event if its not a MODIFIED event, as only MODIFIED events update the condition + // we are looking for. + // This will filter out all synthetic ADDED events that created bt the API server for + // the initial state. See https://kubernetes.io/docs/reference/using-api/api-concepts/#the-resourceversion-parameter + // for details: + // "Get State and Start at Most Recent: Start a watch at the most recent resource version, + // which must be consistent (i.e. served from etcd via a quorum read). To establish initial state, + // the watch begins with synthetic “Added” events of all resources instances that exist at the starting + // resource version. All following watch events are for all changes that occurred after the resource + // version the watch started at." + if event.Type != watch.Modified { + continue + } + // Skip event if generations has not yet been consolidated - inSync, err := isGivenEqualsObservedGeneration(event.Object) + inSync, err := generationCheck(event.Object) if err != nil { return false, false, err } @@ -198,7 +220,8 @@ func (w *waitForReadyConfig) waitForReadyCondition(start time.Time, name string, } // Wait until the expected EventDone is satisfied -func (w *waitForEvent) Wait(name string, timeout time.Duration, msgCallback MessageCallback) (error, time.Duration) { +func (w *waitForEvent) Wait(name string, options Options, msgCallback MessageCallback) (error, time.Duration) { + timeout := options.timeoutWithDefault() watcher, err := w.watchMaker(name, timeout) if err != nil { return err, 0 @@ -223,7 +246,7 @@ func (w *waitForEvent) Wait(name string, timeout time.Duration, msgCallback Mess } } -func isGivenEqualsObservedGeneration(object runtime.Object) (bool, error) { +func generationCheck(object runtime.Object) (bool, error) { unstructured, err := runtime.DefaultUnstructuredConverter.ToUnstructured(object) if err != nil { return false, err @@ -247,3 +270,17 @@ func isGivenEqualsObservedGeneration(object runtime.Object) (bool, error) { } return givenGeneration == observedGeneration, nil } + +func (o Options) timeoutWithDefault() time.Duration { + if o.Timeout != nil { + return *o.Timeout + } + return 60 * time.Second +} + +func (o Options) errorWindowWithDefault() time.Duration { + if o.ErrorWindow != nil { + return *o.ErrorWindow + } + return 2 * time.Second +} diff --git a/pkg/wait/wait_for_ready_test.go b/pkg/wait/wait_for_ready_test.go index cf1ebdde31..7c376051a2 100644 --- a/pkg/wait/wait_for_ready_test.go +++ b/pkg/wait/wait_for_ready_test.go @@ -49,7 +49,7 @@ func TestAddWaitForReady(t *testing.T) { }) fakeWatchApi.Start() var msgs []string - err, _ := waitForReady.Wait("foobar", tc.timeout, func(_ time.Duration, msg string) { + err, _ := waitForReady.Wait("foobar", Options{Timeout: &tc.timeout}, func(_ time.Duration, msg string) { msgs = append(msgs, msg) }) close(fakeWatchApi.eventChan) @@ -88,7 +88,7 @@ func TestAddWaitForDelete(t *testing.T) { func(evt *watch.Event) bool { return evt.Type == watch.Deleted }) fakeWatchAPI.Start() - err, _ := waitForEvent.Wait("foobar", tc.timeout, NoopMessageCallback()) + err, _ := waitForEvent.Wait("foobar", Options{Timeout: &tc.timeout}, NoopMessageCallback()) close(fakeWatchAPI.eventChan) if tc.errorText == "" && err != nil { @@ -106,7 +106,6 @@ func TestAddWaitForDelete(t *testing.T) { if fakeWatchAPI.StopCalled != 1 { t.Errorf("%d: Exactly one 'stop' should be called, but got %d", i, fakeWatchAPI.StopCalled) } - } } @@ -130,7 +129,7 @@ func prepareDeleteTestCases(name string) []waitForReadyTestCase { func errorTest(name string) waitForReadyTestCase { events := []watch.Event{ - {watch.Added, CreateTestServiceWithConditions(name, corev1.ConditionUnknown, corev1.ConditionUnknown, "", "msg1")}, + {watch.Modified, CreateTestServiceWithConditions(name, corev1.ConditionUnknown, corev1.ConditionUnknown, "", "msg1")}, {watch.Modified, CreateTestServiceWithConditions(name, corev1.ConditionFalse, corev1.ConditionTrue, "FakeError", "Test Error")}, } @@ -164,6 +163,7 @@ func peNormal(name string) ([]watch.Event, int) { messages := pMessages(2) return []watch.Event{ {watch.Added, CreateTestServiceWithConditions(name, corev1.ConditionUnknown, corev1.ConditionUnknown, "", messages[0])}, + {watch.Modified, CreateTestServiceWithConditions(name, corev1.ConditionUnknown, corev1.ConditionUnknown, "", messages[0])}, {watch.Modified, CreateTestServiceWithConditions(name, corev1.ConditionUnknown, corev1.ConditionTrue, "", messages[1])}, {watch.Modified, CreateTestServiceWithConditions(name, corev1.ConditionTrue, corev1.ConditionTrue, "", "")}, }, len(messages) @@ -172,14 +172,14 @@ func peNormal(name string) ([]watch.Event, int) { func peTimeout(name string) ([]watch.Event, int) { messages := pMessages(1) return []watch.Event{ - {watch.Added, CreateTestServiceWithConditions(name, corev1.ConditionUnknown, corev1.ConditionUnknown, "", messages[0])}, + {watch.Modified, CreateTestServiceWithConditions(name, corev1.ConditionUnknown, corev1.ConditionUnknown, "", messages[0])}, }, len(messages) } func peWrongGeneration(name string) ([]watch.Event, int) { messages := pMessages(1) return []watch.Event{ - {watch.Added, CreateTestServiceWithConditions(name, corev1.ConditionUnknown, corev1.ConditionUnknown, "", messages[0])}, + {watch.Modified, CreateTestServiceWithConditions(name, corev1.ConditionUnknown, corev1.ConditionUnknown, "", messages[0])}, {watch.Modified, CreateTestServiceWithConditions(name, corev1.ConditionTrue, corev1.ConditionTrue, "", "", 1, 2)}, }, len(messages) } @@ -187,7 +187,7 @@ func peWrongGeneration(name string) ([]watch.Event, int) { func peReadyFalseWithinErrorWindow(name string) ([]watch.Event, int) { messages := pMessages(1) return []watch.Event{ - {watch.Added, CreateTestServiceWithConditions(name, corev1.ConditionFalse, corev1.ConditionFalse, "Route not ready", messages[0])}, + {watch.Modified, CreateTestServiceWithConditions(name, corev1.ConditionFalse, corev1.ConditionFalse, "Route not ready", messages[0])}, {watch.Modified, CreateTestServiceWithConditions(name, corev1.ConditionTrue, corev1.ConditionTrue, "Route ready", "")}, }, len(messages) } @@ -195,7 +195,7 @@ func peReadyFalseWithinErrorWindow(name string) ([]watch.Event, int) { func deNormal(name string) ([]watch.Event, int) { messages := pMessages(2) return []watch.Event{ - {watch.Added, CreateTestServiceWithConditions(name, corev1.ConditionUnknown, corev1.ConditionUnknown, "", messages[0])}, + {watch.Modified, CreateTestServiceWithConditions(name, corev1.ConditionUnknown, corev1.ConditionUnknown, "", messages[0])}, {watch.Modified, CreateTestServiceWithConditions(name, corev1.ConditionUnknown, corev1.ConditionTrue, "", messages[1])}, {watch.Deleted, CreateTestServiceWithConditions(name, corev1.ConditionTrue, corev1.ConditionTrue, "", "")}, }, len(messages)