From 2d56a022b7119552623d85680ab011521e0a0fa2 Mon Sep 17 00:00:00 2001 From: Naomi Seyfer Date: Thu, 7 Nov 2019 15:26:01 -0800 Subject: [PATCH 1/7] Fall back to polling-based watcher --- go.sum | 4 + pkg/serving/v1alpha1/client.go | 17 ++-- pkg/wait/poll_watcher.go | 164 ++++++++++++++++++++++++++++++++ pkg/wait/wait_for_ready.go | 25 ++--- pkg/wait/wait_for_ready_test.go | 3 +- vendor/modules.txt | 3 +- 6 files changed, 190 insertions(+), 26 deletions(-) create mode 100644 pkg/wait/poll_watcher.go diff --git a/go.sum b/go.sum index 9d264d55bd..08c1f926bd 100644 --- a/go.sum +++ b/go.sum @@ -188,6 +188,7 @@ github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc= github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/json-iterator/go v0.0.0-20180612202835-f2b4162afba3/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/json-iterator/go v0.0.0-20180701071628-ab8a2e0c74be h1:AHimNtVIpiBjPUhEF5KNCkrUyqTSA5zWUl8sQ2bfGBE= github.com/json-iterator/go v0.0.0-20180701071628-ab8a2e0c74be/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.7 h1:KfgG9LzI+pYjr4xvmz/5H4FXjokeP+rlHLhv3iH62Fo= github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -244,6 +245,7 @@ github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo= github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c h1:eSfnfIuwhxZyULg1NNuZycJcYkjYVGYe7FczwQReM6U= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= +github.com/onsi/gomega v0.0.0-20190113212917-5533ce8a0da3 h1:EooPXg51Tn+xmWPXJUGCnJhJSpeuMlBmfJVcqIRmmv8= github.com/onsi/gomega v0.0.0-20190113212917-5533ce8a0da3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME= @@ -480,6 +482,7 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/inf.v0 v0.9.0 h1:3zYtXIO92bvsdS3ggAdA8Gb4Azj0YU+TVY1uGYNFA8o= gopkg.in/inf.v0 v0.9.0/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= @@ -518,6 +521,7 @@ k8s.io/klog v0.3.1 h1:RVgyDHY/kFKtLqh67NvEWIgkMneNoIrdkN0CxDSQc68= k8s.io/klog v0.3.1/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8= k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= +k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30 h1:TRb4wNWoBVrH9plmkp2q86FIDppkbrEXdXlxU3a3BMI= k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc= k8s.io/kube-openapi v0.0.0-20190816220812-743ec37842bf h1:EYm5AW/UUDbnmnI+gK0TJDVK9qPLhM+sRHYanNKw0EQ= k8s.io/kube-openapi v0.0.0-20190816220812-743ec37842bf/go.mod h1:1TqjTSzOxsLGIKfj0lK8EeCP7K1iUG65v09OM0/WG5E= diff --git a/pkg/serving/v1alpha1/client.go b/pkg/serving/v1alpha1/client.go index 7dd62100ec..2f430aa8b7 100644 --- a/pkg/serving/v1alpha1/client.go +++ b/pkg/serving/v1alpha1/client.go @@ -29,6 +29,7 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" api_serving "knative.dev/serving/pkg/apis/serving" "knative.dev/serving/pkg/apis/serving/v1alpha1" client_v1alpha1 "knative.dev/serving/pkg/client/clientset/versioned/typed/serving/v1alpha1" @@ -159,6 +160,10 @@ func (cl *knServingClient) GetService(name string) (*v1alpha1.Service, error) { return service, nil } +func (cl *knServingClient) WatchService(name string, timeout time.Duration) (watch.Interface, error) { + return wait.NewWatcher(cl.client.RESTClient(), cl.namespace, "services", name, timeout) +} + // List services func (cl *knServingClient) ListServices(config ...ListConfig) (*v1alpha1.ServiceList, error) { serviceList, err := cl.client.Services(cl.namespace).List(ListConfigs(config).toListOptions()) @@ -216,7 +221,7 @@ func (cl *knServingClient) DeleteService(serviceName string) error { // Wait for a service to become ready, but not longer than provided timeout func (cl *knServingClient) WaitForService(name string, timeout time.Duration, msgCallback wait.MessageCallback) (error, time.Duration) { - waitForReady := newServiceWaitForReady(cl.client.Services(cl.namespace).Watch) + waitForReady := wait.NewWaitForReady("service", cl.WatchService, serviceConditionExtractor) return waitForReady.Wait(name, timeout, msgCallback) } @@ -390,16 +395,6 @@ func updateServingGvk(obj runtime.Object) error { return util.UpdateGroupVersionKindWithScheme(obj, v1alpha1.SchemeGroupVersion, scheme.Scheme) } -// Create wait arguments for a Knative service which can be used to wait for -// a create/update options to be finished -// Can be used by `service_create` and `service_update`, hence this extra file -func newServiceWaitForReady(watch wait.WatchFunc) wait.WaitForReady { - return wait.NewWaitForReady( - "service", - watch, - serviceConditionExtractor) -} - func serviceConditionExtractor(obj runtime.Object) (apis.Conditions, error) { service, ok := obj.(*v1alpha1.Service) if !ok { diff --git a/pkg/wait/poll_watcher.go b/pkg/wait/poll_watcher.go new file mode 100644 index 0000000000..04524310bc --- /dev/null +++ b/pkg/wait/poll_watcher.go @@ -0,0 +1,164 @@ +// Copyright © 2019 The Knative Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wait + +import ( + "fmt" + "sync" + "time" + + api_errors "k8s.io/apimachinery/pkg/api/errors" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/rest" + "knative.dev/serving/pkg/client/clientset/versioned/scheme" +) + +type PollingWatcher struct { + c rest.Interface + ns string + resource string + name string + timeout time.Duration + done chan bool + result chan watch.Event + wg *sync.WaitGroup +} + +func NewWatcher(c rest.Interface, ns string, resource string, name string, timeout time.Duration) (watch.Interface, error) { + native, err := nativeWatch(c, ns, resource, name, timeout) + if err == nil { + return native, nil + } + fmt.Println("falling back to polling") + polling := &PollingWatcher{c, ns, resource, name, timeout, make(chan bool), make(chan watch.Event), &sync.WaitGroup{}} + err = polling.start() + if err != nil { + return nil, err + } + return polling, nil +} + +func (w *PollingWatcher) poll() (runtime.Object, error) { + return w.c.Get(). + Namespace(w.ns). + Resource(w.resource). + Name(w.name). + Do(). + Get() +} + +func nativeWatch(c rest.Interface, ns string, resource string, name string, timeout time.Duration) (watch.Interface, error) { + opts := v1.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(), + } + opts.Watch = true + addWatchTimeout(&opts, timeout) + + return c.Get(). + Namespace(ns). + Resource(resource). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Watch() +} + +func (w *PollingWatcher) start() error { + w.wg.Add(1) + + go func() { + defer w.wg.Done() + var err error + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + var obj, newobj runtime.Object + gotNotFound := false + done := false + for !done { + obj = newobj + select { + case <-ticker.C: + newobj, err = w.poll() + if err != nil { + if api_errors.IsNotFound(err) { + // This is ok. It's either a delete or not created yet. + if obj != nil { + // a delete + w.result <- watch.Event{ + Type: watch.Deleted, + Object: obj, + } + } + gotNotFound = true + continue + } else { + // Send an error and then break + w.result <- watch.Event{ + Type: watch.Error, + } + break + } + } + if gotNotFound { + // Created + w.result <- watch.Event{ + Type: watch.Added, + Object: newobj, + } + gotNotFound = false + continue + } + gotNotFound = false + // This could still be a create. Are the uids the same? + newObj, ok1 := newobj.(v1.Object) + oldObj, ok2 := obj.(v1.Object) + if ok1 && ok2 && newObj.GetUID() != oldObj.GetUID() { + // It's a delete and recreate + w.result <- watch.Event{ + Type: watch.Deleted, + Object: obj, + } + w.result <- watch.Event{ + Type: watch.Added, + Object: newobj, + } + continue + } + if ok1 && ok2 && newObj.GetResourceVersion() != oldObj.GetResourceVersion() { + w.result <- watch.Event{ + Type: watch.Modified, + Object: newobj, + } + continue + } + case done = <-w.done: + break + } + } + }() + return nil +} + +func (w *PollingWatcher) ResultChan() <-chan watch.Event { + return w.result +} + +func (w *PollingWatcher) Stop() { + w.done <- true + w.wg.Wait() + close(w.result) +} diff --git a/pkg/wait/wait_for_ready.go b/pkg/wait/wait_for_ready.go index a4a67db421..e21ea25ffd 100644 --- a/pkg/wait/wait_for_ready.go +++ b/pkg/wait/wait_for_ready.go @@ -21,7 +21,6 @@ import ( corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "knative.dev/pkg/apis" @@ -29,7 +28,7 @@ import ( // Callbacks and configuration used while waiting type waitForReadyConfig struct { - watchFunc WatchFunc + watchMaker WatchMaker conditionsExtractor ConditionsExtractor kind string } @@ -43,9 +42,15 @@ type WaitForReady interface { Wait(name string, timeout time.Duration, msgCallback MessageCallback) (error, time.Duration) } -// Create watch which is used when waiting for Ready condition +// Utility alias for "A function like Watch from the client" type WatchFunc func(opts v1.ListOptions) (watch.Interface, error) +// Utility alias for "A function that takes the name and just gets the resource" +type GetFunc func(name string) (runtime.Object, error) + +// Create watch which is used when waiting for Ready condition +type WatchMaker func(name string, timeout time.Duration) (watch.Interface, error) + // Extract conditions from a runtime object type ConditionsExtractor func(obj runtime.Object) (apis.Conditions, error) @@ -53,10 +58,10 @@ type ConditionsExtractor func(obj runtime.Object) (apis.Conditions, error) type MessageCallback func(durationSinceState time.Duration, message string) // Constructor with resource type specific configuration -func NewWaitForReady(kind string, watchFunc WatchFunc, extractor ConditionsExtractor) WaitForReady { +func NewWaitForReady(kind string, watchMaker WatchMaker, extractor ConditionsExtractor) WaitForReady { return &waitForReadyConfig{ kind: kind, - watchFunc: watchFunc, + watchMaker: watchMaker, conditionsExtractor: extractor, } } @@ -85,15 +90,11 @@ func NoopMessageCallback() MessageCallback { // target state has been entered yet and `out` is used for printing out status messages // msgCallback gets called for every event with an 'Ready' condition == UNKNOWN with the event's message. func (w *waitForReadyConfig) Wait(name string, timeout time.Duration, msgCallback MessageCallback) (error, time.Duration) { - opts := v1.ListOptions{ - FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(), - } - addWatchTimeout(&opts, timeout) floatingTimeout := timeout for { start := time.Now() - retry, timeoutReached, err := w.waitForReadyCondition(start, opts, name, floatingTimeout, msgCallback) + retry, timeoutReached, err := w.waitForReadyCondition(start, name, floatingTimeout, msgCallback) if err != nil { return err, time.Since(start) } @@ -121,9 +122,9 @@ func addWatchTimeout(opts *v1.ListOptions, timeout time.Duration) { opts.TimeoutSeconds = &timeOutWatchSeconds } -func (w *waitForReadyConfig) waitForReadyCondition(start time.Time, opts v1.ListOptions, name string, timeout time.Duration, msgCallback MessageCallback) (retry bool, timeoutReached bool, err error) { +func (w *waitForReadyConfig) waitForReadyCondition(start time.Time, name string, timeout time.Duration, msgCallback MessageCallback) (retry bool, timeoutReached bool, err error) { - watcher, err := w.watchFunc(opts) + watcher, err := w.watchMaker(name, timeout) if err != nil { return false, false, err } diff --git a/pkg/wait/wait_for_ready_test.go b/pkg/wait/wait_for_ready_test.go index 242d7bae8a..02384e4b2c 100644 --- a/pkg/wait/wait_for_ready_test.go +++ b/pkg/wait/wait_for_ready_test.go @@ -21,7 +21,6 @@ import ( "gotest.tools/assert" "gotest.tools/assert/cmp" corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "knative.dev/pkg/apis" @@ -42,7 +41,7 @@ func TestAddWaitForReady(t *testing.T) { waitForReady := NewWaitForReady( "blub", - func(opts v1.ListOptions) (watch.Interface, error) { + func(name string, timeout time.Duration) (watch.Interface, error) { return fakeWatchApi, nil }, func(obj runtime.Object) (apis.Conditions, error) { diff --git a/vendor/modules.txt b/vendor/modules.txt index ddb03b29bc..7791216cb2 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -405,8 +405,8 @@ k8s.io/apimachinery/pkg/api/meta k8s.io/apimachinery/pkg/util/runtime k8s.io/apimachinery/pkg/fields k8s.io/apimachinery/pkg/labels -k8s.io/apimachinery/pkg/runtime/schema k8s.io/apimachinery/pkg/watch +k8s.io/apimachinery/pkg/runtime/schema k8s.io/apimachinery/pkg/util/validation/field k8s.io/apimachinery/pkg/conversion k8s.io/apimachinery/pkg/selection @@ -423,6 +423,7 @@ k8s.io/apimachinery/pkg/util/naming k8s.io/apimachinery/pkg/util/net k8s.io/apimachinery/pkg/util/yaml k8s.io/apimachinery/pkg/runtime/serializer +k8s.io/apimachinery/pkg/runtime/serializer/streaming k8s.io/apimachinery/third_party/forked/golang/reflect k8s.io/apimachinery/pkg/runtime/serializer/streaming k8s.io/apimachinery/pkg/apis/meta/v1/unstructured From 9a9e3376b20986ebbb5e27b11220bff747de4e79 Mon Sep 17 00:00:00 2001 From: Naomi Seyfer Date: Thu, 7 Nov 2019 17:33:23 -0800 Subject: [PATCH 2/7] Tests pass, much cleaner --- pkg/serving/v1alpha1/client.go | 3 +- pkg/wait/poll_watcher.go | 141 +++++++++++++++++---------------- pkg/wait/wait_for_ready.go | 18 ----- 3 files changed, 76 insertions(+), 86 deletions(-) diff --git a/pkg/serving/v1alpha1/client.go b/pkg/serving/v1alpha1/client.go index 2f430aa8b7..395ff7e058 100644 --- a/pkg/serving/v1alpha1/client.go +++ b/pkg/serving/v1alpha1/client.go @@ -161,7 +161,8 @@ func (cl *knServingClient) GetService(name string) (*v1alpha1.Service, error) { } func (cl *knServingClient) WatchService(name string, timeout time.Duration) (watch.Interface, error) { - return wait.NewWatcher(cl.client.RESTClient(), cl.namespace, "services", name, timeout) + return wait.NewWatcher(cl.client.Services(cl.namespace).Watch, + cl.client.RESTClient(), cl.namespace, "services", name, timeout) } // List services diff --git a/pkg/wait/poll_watcher.go b/pkg/wait/poll_watcher.go index 04524310bc..f95fb1beae 100644 --- a/pkg/wait/poll_watcher.go +++ b/pkg/wait/poll_watcher.go @@ -15,7 +15,6 @@ package wait import ( - "fmt" "sync" "time" @@ -28,7 +27,7 @@ import ( "knative.dev/serving/pkg/client/clientset/versioned/scheme" ) -type PollingWatcher struct { +type pollingWatcher struct { c rest.Interface ns string resource string @@ -39,13 +38,16 @@ type PollingWatcher struct { wg *sync.WaitGroup } -func NewWatcher(c rest.Interface, ns string, resource string, name string, timeout time.Duration) (watch.Interface, error) { - native, err := nativeWatch(c, ns, resource, name, timeout) +type WatchFunc func(v1.ListOptions) (watch.Interface, error) + +// NewWatcher makes a watch.Interface on the given resource in the client, +// falling back to polling if the server does not support Watch. +func NewWatcher(watchFunc WatchFunc, c rest.Interface, ns string, resource string, name string, timeout time.Duration) (watch.Interface, error) { + native, err := nativeWatch(watchFunc, c, ns, resource, name, timeout) if err == nil { return native, nil } - fmt.Println("falling back to polling") - polling := &PollingWatcher{c, ns, resource, name, timeout, make(chan bool), make(chan watch.Event), &sync.WaitGroup{}} + polling := &pollingWatcher{c, ns, resource, name, timeout, make(chan bool), make(chan watch.Event), &sync.WaitGroup{}} err = polling.start() if err != nil { return nil, err @@ -53,7 +55,7 @@ func NewWatcher(c rest.Interface, ns string, resource string, name string, timeo return polling, nil } -func (w *PollingWatcher) poll() (runtime.Object, error) { +func (w *pollingWatcher) poll() (runtime.Object, error) { return w.c.Get(). Namespace(w.ns). Resource(w.resource). @@ -62,22 +64,7 @@ func (w *PollingWatcher) poll() (runtime.Object, error) { Get() } -func nativeWatch(c rest.Interface, ns string, resource string, name string, timeout time.Duration) (watch.Interface, error) { - opts := v1.ListOptions{ - FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(), - } - opts.Watch = true - addWatchTimeout(&opts, timeout) - - return c.Get(). - Namespace(ns). - Resource(resource). - VersionedParams(&opts, scheme.ParameterCodec). - Timeout(timeout). - Watch() -} - -func (w *PollingWatcher) start() error { +func (w *pollingWatcher) start() error { w.wg.Add(1) go func() { @@ -85,65 +72,55 @@ func (w *PollingWatcher) start() error { var err error ticker := time.NewTicker(time.Second) defer ticker.Stop() - var obj, newobj runtime.Object - gotNotFound := false + var old, new runtime.Object done := false for !done { - obj = newobj + old = new + select { case <-ticker.C: - newobj, err = w.poll() - if err != nil { - if api_errors.IsNotFound(err) { - // This is ok. It's either a delete or not created yet. - if obj != nil { - // a delete - w.result <- watch.Event{ - Type: watch.Deleted, - Object: obj, - } - } - gotNotFound = true - continue - } else { - // Send an error and then break - w.result <- watch.Event{ - Type: watch.Error, - } - break + new, err = w.poll() + newObj, ok1 := new.(v1.Object) + oldObj, ok2 := old.(v1.Object) + + if old != nil && err != nil && api_errors.IsNotFound(err) { + // Deleted + w.result <- watch.Event{ + Type: watch.Deleted, + Object: old, } - } - if gotNotFound { - // Created + } else if err != nil { + // Just an error + w.result <- watch.Event{ + Type: watch.Error, + } + } else if old == nil && new != nil { + // Added w.result <- watch.Event{ Type: watch.Added, - Object: newobj, + Object: new, } - gotNotFound = false - continue - } - gotNotFound = false - // This could still be a create. Are the uids the same? - newObj, ok1 := newobj.(v1.Object) - oldObj, ok2 := obj.(v1.Object) - if ok1 && ok2 && newObj.GetUID() != oldObj.GetUID() { - // It's a delete and recreate + } else if !(ok1 && ok2) { + // Error wrong types + w.result <- watch.Event{ + Type: watch.Error, + } + } else if newObj.GetUID() != oldObj.GetUID() { + // Deleted and readded. w.result <- watch.Event{ Type: watch.Deleted, - Object: obj, + Object: old, } w.result <- watch.Event{ Type: watch.Added, - Object: newobj, + Object: new, } - continue - } - if ok1 && ok2 && newObj.GetResourceVersion() != oldObj.GetResourceVersion() { + } else if newObj.GetResourceVersion() != oldObj.GetResourceVersion() { + // Modified. w.result <- watch.Event{ Type: watch.Modified, - Object: newobj, + Object: new, } - continue } case done = <-w.done: break @@ -153,12 +130,42 @@ func (w *PollingWatcher) start() error { return nil } -func (w *PollingWatcher) ResultChan() <-chan watch.Event { +func (w *pollingWatcher) ResultChan() <-chan watch.Event { return w.result } -func (w *PollingWatcher) Stop() { +func (w *pollingWatcher) Stop() { w.done <- true w.wg.Wait() close(w.result) + close(w.done) +} + +func nativeWatch(watchFunc WatchFunc, c rest.Interface, ns string, resource string, name string, timeout time.Duration) (watch.Interface, error) { + opts := v1.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(), + } + opts.Watch = true + addWatchTimeout(&opts, timeout) + + if watchFunc != nil { + return watchFunc(opts) + } + return c.Get(). + Namespace(ns). + Resource(resource). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Watch() +} + +func addWatchTimeout(opts *v1.ListOptions, timeout time.Duration) { + if timeout == 0 { + return + } + // Wait for service to enter 'Ready' state, with a timeout of which is slightly larger than + // the provided timeout. We have our own timeout which fires after "timeout" seconds + // and stops the watch + timeOutWatchSeconds := int64((timeout + 30*time.Second) / time.Second) + opts.TimeoutSeconds = &timeOutWatchSeconds } diff --git a/pkg/wait/wait_for_ready.go b/pkg/wait/wait_for_ready.go index e21ea25ffd..957fa0f493 100644 --- a/pkg/wait/wait_for_ready.go +++ b/pkg/wait/wait_for_ready.go @@ -20,7 +20,6 @@ import ( "time" corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "knative.dev/pkg/apis" @@ -42,12 +41,6 @@ type WaitForReady interface { Wait(name string, timeout time.Duration, msgCallback MessageCallback) (error, time.Duration) } -// Utility alias for "A function like Watch from the client" -type WatchFunc func(opts v1.ListOptions) (watch.Interface, error) - -// Utility alias for "A function that takes the name and just gets the resource" -type GetFunc func(name string) (runtime.Object, error) - // Create watch which is used when waiting for Ready condition type WatchMaker func(name string, timeout time.Duration) (watch.Interface, error) @@ -111,17 +104,6 @@ func (w *waitForReadyConfig) Wait(name string, timeout time.Duration, msgCallbac } } -func addWatchTimeout(opts *v1.ListOptions, timeout time.Duration) { - if timeout == 0 { - return - } - // Wait for service to enter 'Ready' state, with a timeout of which is slightly larger than - // the provided timeout. We have our own timeout which fires after "timeout" seconds - // and stops the watch - timeOutWatchSeconds := int64((timeout + 30*time.Second) / time.Second) - opts.TimeoutSeconds = &timeOutWatchSeconds -} - func (w *waitForReadyConfig) waitForReadyCondition(start time.Time, name string, timeout time.Duration, msgCallback MessageCallback) (retry bool, timeoutReached bool, err error) { watcher, err := w.watchMaker(name, timeout) From eb3b0a259859c82b6a25e9ffb01d7a850a5f646f Mon Sep 17 00:00:00 2001 From: Naomi Seyfer Date: Sat, 9 Nov 2019 14:28:42 -0800 Subject: [PATCH 3/7] tests and fixes for polling watcher --- pkg/wait/poll_watcher.go | 65 +++++++++++++++------ pkg/wait/poll_watcher_test.go | 107 ++++++++++++++++++++++++++++++++++ 2 files changed, 154 insertions(+), 18 deletions(-) create mode 100644 pkg/wait/poll_watcher_test.go diff --git a/pkg/wait/poll_watcher.go b/pkg/wait/poll_watcher.go index f95fb1beae..187a72180c 100644 --- a/pkg/wait/poll_watcher.go +++ b/pkg/wait/poll_watcher.go @@ -27,6 +27,11 @@ import ( "knative.dev/serving/pkg/client/clientset/versioned/scheme" ) +type PollInterval interface { + PollChan() <-chan time.Time + Stop() +} + type pollingWatcher struct { c rest.Interface ns string @@ -36,10 +41,30 @@ type pollingWatcher struct { done chan bool result chan watch.Event wg *sync.WaitGroup + // we can mock the interface for testing. + pollInterval PollInterval + // mock hook for testing. + poll func() (runtime.Object, error) } type WatchFunc func(v1.ListOptions) (watch.Interface, error) +type tickerPollInterval struct { + t *time.Ticker +} + +func (t *tickerPollInterval) PollChan() <-chan time.Time { + return t.t.C +} + +func (t *tickerPollInterval) Stop() { + t.t.Stop() +} + +func newTickerPollInterval(d time.Duration) *tickerPollInterval { + return &tickerPollInterval{time.NewTicker(d)} +} + // NewWatcher makes a watch.Interface on the given resource in the client, // falling back to polling if the server does not support Watch. func NewWatcher(watchFunc WatchFunc, c rest.Interface, ns string, resource string, name string, timeout time.Duration) (watch.Interface, error) { @@ -47,7 +72,9 @@ func NewWatcher(watchFunc WatchFunc, c rest.Interface, ns string, resource strin if err == nil { return native, nil } - polling := &pollingWatcher{c, ns, resource, name, timeout, make(chan bool), make(chan watch.Event), &sync.WaitGroup{}} + polling := &pollingWatcher{ + c, ns, resource, name, timeout, make(chan bool), make(chan watch.Event), &sync.WaitGroup{}, + newTickerPollInterval(time.Second), nativePoll(c, ns, resource, name)} err = polling.start() if err != nil { return nil, err @@ -55,40 +82,33 @@ func NewWatcher(watchFunc WatchFunc, c rest.Interface, ns string, resource strin return polling, nil } -func (w *pollingWatcher) poll() (runtime.Object, error) { - return w.c.Get(). - Namespace(w.ns). - Resource(w.resource). - Name(w.name). - Do(). - Get() -} - func (w *pollingWatcher) start() error { w.wg.Add(1) go func() { defer w.wg.Done() + defer w.pollInterval.Stop() var err error - ticker := time.NewTicker(time.Second) - defer ticker.Stop() var old, new runtime.Object done := false for !done { old = new select { - case <-ticker.C: + case <-w.pollInterval.PollChan(): new, err = w.poll() newObj, ok1 := new.(v1.Object) oldObj, ok2 := old.(v1.Object) - if old != nil && err != nil && api_errors.IsNotFound(err) { - // Deleted - w.result <- watch.Event{ - Type: watch.Deleted, - Object: old, + if err != nil && api_errors.IsNotFound(err) { + if old != nil { + // Deleted + w.result <- watch.Event{ + Type: watch.Deleted, + Object: old, + } } + //... Otherwise maybe just doesn't exist. } else if err != nil { // Just an error w.result <- watch.Event{ @@ -151,6 +171,9 @@ func nativeWatch(watchFunc WatchFunc, c rest.Interface, ns string, resource stri if watchFunc != nil { return watchFunc(opts) } + // Technically the watchFunc isn't necessary, we could just do this. But the + // watchFunc is *much* easier to mock, so we might as well plumb it in for + // ease of testing. return c.Get(). Namespace(ns). Resource(resource). @@ -159,6 +182,12 @@ func nativeWatch(watchFunc WatchFunc, c rest.Interface, ns string, resource stri Watch() } +func nativePoll(c rest.Interface, ns, resource, name string) func() (runtime.Object, error) { + return func() (runtime.Object, error) { + return c.Get().Namespace(ns).Resource(resource).Name(name).Do().Get() + } +} + func addWatchTimeout(opts *v1.ListOptions, timeout time.Duration) { if timeout == 0 { return diff --git a/pkg/wait/poll_watcher_test.go b/pkg/wait/poll_watcher_test.go new file mode 100644 index 0000000000..5223148890 --- /dev/null +++ b/pkg/wait/poll_watcher_test.go @@ -0,0 +1,107 @@ +// Copyright © 2019 The Knative Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wait + +import ( + "fmt" + "sync" + "testing" + "time" + + "gotest.tools/assert" + api_errors "k8s.io/apimachinery/pkg/api/errors" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + + "knative.dev/serving/pkg/apis/serving/v1alpha1" +) + +type fakePollInterval struct { + c chan time.Time +} + +func (f *fakePollInterval) PollChan() <-chan time.Time { + return f.c +} + +func (f *fakePollInterval) Stop() {} + +func newFakePollInterval(n int) PollInterval { + c := make(chan time.Time, n) + t := time.Now() + for i := 0; i < n; i++ { + c <- t.Add(time.Duration(i) * time.Second) + } + return &fakePollInterval{c} +} + +func newWatcherForTest(pollResults []runtime.Object) watch.Interface { + i := 0 + poll := func() (runtime.Object, error) { + defer func() { i += 1 }() + if pollResults[i] == nil { + // 404 + return nil, api_errors.NewNotFound(schema.GroupResource{"thing", "stuff"}, "eggs") + } + return pollResults[i], nil + } + ret := &pollingWatcher{nil, "", "", "", time.Minute, make(chan bool), make(chan watch.Event), &sync.WaitGroup{}, + newFakePollInterval(len(pollResults)), poll} + ret.start() + return ret +} + +var a, aa, b, bb, c, cc, z, zz runtime.Object + +func init() { + a = &v1alpha1.Service{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "a", UID: "one"}} + aa = a.DeepCopyObject() + b = &v1alpha1.Service{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "b", UID: "one"}} + bb = b.DeepCopyObject() + c = &v1alpha1.Service{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "c", UID: "one"}} + cc = c.DeepCopyObject() + z = &v1alpha1.Service{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "z", UID: "two"}} + zz = z.DeepCopyObject() +} + +type testCase struct { + pollResults []runtime.Object + watchResults []watch.Event +} + +func TestPollWatcher(t *testing.T) { + cases := []testCase{ + {[]runtime.Object{nil, nil, a, aa, nil}, []watch.Event{{watch.Added, a}, {watch.Deleted, a}}}, + {[]runtime.Object{a, b}, []watch.Event{{watch.Added, a}, {watch.Modified, b}}}, + {[]runtime.Object{a, aa, b, bb, c, cc, nil}, + []watch.Event{{watch.Added, a}, {watch.Modified, b}, {watch.Modified, c}, {watch.Deleted, c}}}, + {[]runtime.Object{a, z}, []watch.Event{{watch.Added, a}, {watch.Deleted, a}, {watch.Added, z}}}, + } + for _, c := range cases { + w := newWatcherForTest(c.pollResults) + for _, expected := range c.watchResults { + actual := <-w.ResultChan() + assert.Equal(t, actual.Type, expected.Type) + if actual.Type == watch.Added || actual.Type == watch.Modified || actual.Type == watch.Deleted { + fmt.Printf("expected, %v, actual %v\n", expected, actual) + assert.Equal(t, actual.Object.(metav1.Object).GetResourceVersion(), expected.Object.(metav1.Object).GetResourceVersion()) + assert.Equal(t, actual.Object.(metav1.Object).GetUID(), expected.Object.(metav1.Object).GetUID()) + } + } + } +} From dbf5402da3e85801618c731b1bb640468b7c2242 Mon Sep 17 00:00:00 2001 From: Naomi Seyfer Date: Sat, 9 Nov 2019 14:40:11 -0800 Subject: [PATCH 4/7] Clean up a lil --- pkg/wait/poll_watcher.go | 19 +++---------------- pkg/wait/poll_watcher_test.go | 5 +++++ 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/pkg/wait/poll_watcher.go b/pkg/wait/poll_watcher.go index 187a72180c..c5cef14f93 100644 --- a/pkg/wait/poll_watcher.go +++ b/pkg/wait/poll_watcher.go @@ -24,7 +24,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/rest" - "knative.dev/serving/pkg/client/clientset/versioned/scheme" ) type PollInterval interface { @@ -68,7 +67,7 @@ func newTickerPollInterval(d time.Duration) *tickerPollInterval { // NewWatcher makes a watch.Interface on the given resource in the client, // falling back to polling if the server does not support Watch. func NewWatcher(watchFunc WatchFunc, c rest.Interface, ns string, resource string, name string, timeout time.Duration) (watch.Interface, error) { - native, err := nativeWatch(watchFunc, c, ns, resource, name, timeout) + native, err := nativeWatch(watchFunc, name, timeout) if err == nil { return native, nil } @@ -161,25 +160,13 @@ func (w *pollingWatcher) Stop() { close(w.done) } -func nativeWatch(watchFunc WatchFunc, c rest.Interface, ns string, resource string, name string, timeout time.Duration) (watch.Interface, error) { +func nativeWatch(watchFunc WatchFunc, name string, timeout time.Duration) (watch.Interface, error) { opts := v1.ListOptions{ FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(), } opts.Watch = true addWatchTimeout(&opts, timeout) - - if watchFunc != nil { - return watchFunc(opts) - } - // Technically the watchFunc isn't necessary, we could just do this. But the - // watchFunc is *much* easier to mock, so we might as well plumb it in for - // ease of testing. - return c.Get(). - Namespace(ns). - Resource(resource). - VersionedParams(&opts, scheme.ParameterCodec). - Timeout(timeout). - Watch() + return watchFunc(opts) } func nativePoll(c rest.Interface, ns, resource, name string) func() (runtime.Object, error) { diff --git a/pkg/wait/poll_watcher_test.go b/pkg/wait/poll_watcher_test.go index 5223148890..8ea4f8ea5f 100644 --- a/pkg/wait/poll_watcher_test.go +++ b/pkg/wait/poll_watcher_test.go @@ -86,10 +86,14 @@ type testCase struct { func TestPollWatcher(t *testing.T) { cases := []testCase{ + // Doesn't exist for a while, then does for a while. {[]runtime.Object{nil, nil, a, aa, nil}, []watch.Event{{watch.Added, a}, {watch.Deleted, a}}}, + // Changes. {[]runtime.Object{a, b}, []watch.Event{{watch.Added, a}, {watch.Modified, b}}}, + // Changes but stays the same a couple times too. {[]runtime.Object{a, aa, b, bb, c, cc, nil}, []watch.Event{{watch.Added, a}, {watch.Modified, b}, {watch.Modified, c}, {watch.Deleted, c}}}, + // Deleted and recreated between polls. {[]runtime.Object{a, z}, []watch.Event{{watch.Added, a}, {watch.Deleted, a}, {watch.Added, z}}}, } for _, c := range cases { @@ -103,5 +107,6 @@ func TestPollWatcher(t *testing.T) { assert.Equal(t, actual.Object.(metav1.Object).GetUID(), expected.Object.(metav1.Object).GetUID()) } } + w.Stop() } } From 0eb9188c0c582e200a75aa8d72c5a45ec2ad2235 Mon Sep 17 00:00:00 2001 From: Naomi Seyfer Date: Sat, 9 Nov 2019 14:53:21 -0800 Subject: [PATCH 5/7] lint --- pkg/wait/poll_watcher.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/wait/poll_watcher.go b/pkg/wait/poll_watcher.go index c5cef14f93..5d3efc9c79 100644 --- a/pkg/wait/poll_watcher.go +++ b/pkg/wait/poll_watcher.go @@ -26,6 +26,8 @@ import ( "k8s.io/client-go/rest" ) +// PollInterval determins when you should poll. Useful to mock out, or for +// replacing with exponential backoff later. type PollInterval interface { PollChan() <-chan time.Time Stop() @@ -46,7 +48,7 @@ type pollingWatcher struct { poll func() (runtime.Object, error) } -type WatchFunc func(v1.ListOptions) (watch.Interface, error) +type watchF func(v1.ListOptions) (watch.Interface, error) type tickerPollInterval struct { t *time.Ticker @@ -66,7 +68,7 @@ func newTickerPollInterval(d time.Duration) *tickerPollInterval { // NewWatcher makes a watch.Interface on the given resource in the client, // falling back to polling if the server does not support Watch. -func NewWatcher(watchFunc WatchFunc, c rest.Interface, ns string, resource string, name string, timeout time.Duration) (watch.Interface, error) { +func NewWatcher(watchFunc watchF, c rest.Interface, ns string, resource string, name string, timeout time.Duration) (watch.Interface, error) { native, err := nativeWatch(watchFunc, name, timeout) if err == nil { return native, nil @@ -160,7 +162,7 @@ func (w *pollingWatcher) Stop() { close(w.done) } -func nativeWatch(watchFunc WatchFunc, name string, timeout time.Duration) (watch.Interface, error) { +func nativeWatch(watchFunc watchF, name string, timeout time.Duration) (watch.Interface, error) { opts := v1.ListOptions{ FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(), } From fb36bb0eb9910975c9c1945b4e804a9ebba0cc95 Mon Sep 17 00:00:00 2001 From: Naomi Seyfer Date: Tue, 19 Nov 2019 10:00:01 -0800 Subject: [PATCH 6/7] Nits --- pkg/wait/poll_watcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/wait/poll_watcher.go b/pkg/wait/poll_watcher.go index 5d3efc9c79..9db78ee76b 100644 --- a/pkg/wait/poll_watcher.go +++ b/pkg/wait/poll_watcher.go @@ -26,7 +26,7 @@ import ( "k8s.io/client-go/rest" ) -// PollInterval determins when you should poll. Useful to mock out, or for +// PollInterval determines when you should poll. Useful to mock out, or for // replacing with exponential backoff later. type PollInterval interface { PollChan() <-chan time.Time From 2d62934c2d74be5d6562c17b3b25d13ee79c7227 Mon Sep 17 00:00:00 2001 From: Naomi Seyfer Date: Tue, 19 Nov 2019 10:19:07 -0800 Subject: [PATCH 7/7] build -u --- vendor/modules.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/vendor/modules.txt b/vendor/modules.txt index 7791216cb2..389d9eb466 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -425,7 +425,6 @@ k8s.io/apimachinery/pkg/util/yaml k8s.io/apimachinery/pkg/runtime/serializer k8s.io/apimachinery/pkg/runtime/serializer/streaming k8s.io/apimachinery/third_party/forked/golang/reflect -k8s.io/apimachinery/pkg/runtime/serializer/streaming k8s.io/apimachinery/pkg/apis/meta/v1/unstructured k8s.io/apimachinery/pkg/util/mergepatch k8s.io/apimachinery/third_party/forked/golang/json