diff --git a/go.sum b/go.sum index 9d264d55bd..08c1f926bd 100644 --- a/go.sum +++ b/go.sum @@ -188,6 +188,7 @@ github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc= github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/json-iterator/go v0.0.0-20180612202835-f2b4162afba3/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/json-iterator/go v0.0.0-20180701071628-ab8a2e0c74be h1:AHimNtVIpiBjPUhEF5KNCkrUyqTSA5zWUl8sQ2bfGBE= github.com/json-iterator/go v0.0.0-20180701071628-ab8a2e0c74be/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.7 h1:KfgG9LzI+pYjr4xvmz/5H4FXjokeP+rlHLhv3iH62Fo= github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -244,6 +245,7 @@ github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo= github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c h1:eSfnfIuwhxZyULg1NNuZycJcYkjYVGYe7FczwQReM6U= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= +github.com/onsi/gomega v0.0.0-20190113212917-5533ce8a0da3 h1:EooPXg51Tn+xmWPXJUGCnJhJSpeuMlBmfJVcqIRmmv8= github.com/onsi/gomega v0.0.0-20190113212917-5533ce8a0da3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME= @@ -480,6 +482,7 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/inf.v0 v0.9.0 h1:3zYtXIO92bvsdS3ggAdA8Gb4Azj0YU+TVY1uGYNFA8o= gopkg.in/inf.v0 v0.9.0/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= @@ -518,6 +521,7 @@ k8s.io/klog v0.3.1 h1:RVgyDHY/kFKtLqh67NvEWIgkMneNoIrdkN0CxDSQc68= k8s.io/klog v0.3.1/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8= k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= +k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30 h1:TRb4wNWoBVrH9plmkp2q86FIDppkbrEXdXlxU3a3BMI= k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc= k8s.io/kube-openapi v0.0.0-20190816220812-743ec37842bf h1:EYm5AW/UUDbnmnI+gK0TJDVK9qPLhM+sRHYanNKw0EQ= k8s.io/kube-openapi v0.0.0-20190816220812-743ec37842bf/go.mod h1:1TqjTSzOxsLGIKfj0lK8EeCP7K1iUG65v09OM0/WG5E= diff --git a/pkg/serving/v1alpha1/client.go b/pkg/serving/v1alpha1/client.go index 7dd62100ec..395ff7e058 100644 --- a/pkg/serving/v1alpha1/client.go +++ b/pkg/serving/v1alpha1/client.go @@ -29,6 +29,7 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" api_serving "knative.dev/serving/pkg/apis/serving" "knative.dev/serving/pkg/apis/serving/v1alpha1" client_v1alpha1 "knative.dev/serving/pkg/client/clientset/versioned/typed/serving/v1alpha1" @@ -159,6 +160,11 @@ func (cl *knServingClient) GetService(name string) (*v1alpha1.Service, error) { return service, nil } +func (cl *knServingClient) WatchService(name string, timeout time.Duration) (watch.Interface, error) { + return wait.NewWatcher(cl.client.Services(cl.namespace).Watch, + cl.client.RESTClient(), cl.namespace, "services", name, timeout) +} + // List services func (cl *knServingClient) ListServices(config ...ListConfig) (*v1alpha1.ServiceList, error) { serviceList, err := cl.client.Services(cl.namespace).List(ListConfigs(config).toListOptions()) @@ -216,7 +222,7 @@ func (cl *knServingClient) DeleteService(serviceName string) error { // Wait for a service to become ready, but not longer than provided timeout func (cl *knServingClient) WaitForService(name string, timeout time.Duration, msgCallback wait.MessageCallback) (error, time.Duration) { - waitForReady := newServiceWaitForReady(cl.client.Services(cl.namespace).Watch) + waitForReady := wait.NewWaitForReady("service", cl.WatchService, serviceConditionExtractor) return waitForReady.Wait(name, timeout, msgCallback) } @@ -390,16 +396,6 @@ func updateServingGvk(obj runtime.Object) error { return util.UpdateGroupVersionKindWithScheme(obj, v1alpha1.SchemeGroupVersion, scheme.Scheme) } -// Create wait arguments for a Knative service which can be used to wait for -// a create/update options to be finished -// Can be used by `service_create` and `service_update`, hence this extra file -func newServiceWaitForReady(watch wait.WatchFunc) wait.WaitForReady { - return wait.NewWaitForReady( - "service", - watch, - serviceConditionExtractor) -} - func serviceConditionExtractor(obj runtime.Object) (apis.Conditions, error) { service, ok := obj.(*v1alpha1.Service) if !ok { diff --git a/pkg/wait/poll_watcher.go b/pkg/wait/poll_watcher.go new file mode 100644 index 0000000000..9db78ee76b --- /dev/null +++ b/pkg/wait/poll_watcher.go @@ -0,0 +1,189 @@ +// Copyright © 2019 The Knative Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wait + +import ( + "sync" + "time" + + api_errors "k8s.io/apimachinery/pkg/api/errors" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/rest" +) + +// PollInterval determines when you should poll. Useful to mock out, or for +// replacing with exponential backoff later. +type PollInterval interface { + PollChan() <-chan time.Time + Stop() +} + +type pollingWatcher struct { + c rest.Interface + ns string + resource string + name string + timeout time.Duration + done chan bool + result chan watch.Event + wg *sync.WaitGroup + // we can mock the interface for testing. + pollInterval PollInterval + // mock hook for testing. + poll func() (runtime.Object, error) +} + +type watchF func(v1.ListOptions) (watch.Interface, error) + +type tickerPollInterval struct { + t *time.Ticker +} + +func (t *tickerPollInterval) PollChan() <-chan time.Time { + return t.t.C +} + +func (t *tickerPollInterval) Stop() { + t.t.Stop() +} + +func newTickerPollInterval(d time.Duration) *tickerPollInterval { + return &tickerPollInterval{time.NewTicker(d)} +} + +// NewWatcher makes a watch.Interface on the given resource in the client, +// falling back to polling if the server does not support Watch. +func NewWatcher(watchFunc watchF, c rest.Interface, ns string, resource string, name string, timeout time.Duration) (watch.Interface, error) { + native, err := nativeWatch(watchFunc, name, timeout) + if err == nil { + return native, nil + } + polling := &pollingWatcher{ + c, ns, resource, name, timeout, make(chan bool), make(chan watch.Event), &sync.WaitGroup{}, + newTickerPollInterval(time.Second), nativePoll(c, ns, resource, name)} + err = polling.start() + if err != nil { + return nil, err + } + return polling, nil +} + +func (w *pollingWatcher) start() error { + w.wg.Add(1) + + go func() { + defer w.wg.Done() + defer w.pollInterval.Stop() + var err error + var old, new runtime.Object + done := false + for !done { + old = new + + select { + case <-w.pollInterval.PollChan(): + new, err = w.poll() + newObj, ok1 := new.(v1.Object) + oldObj, ok2 := old.(v1.Object) + + if err != nil && api_errors.IsNotFound(err) { + if old != nil { + // Deleted + w.result <- watch.Event{ + Type: watch.Deleted, + Object: old, + } + } + //... Otherwise maybe just doesn't exist. + } else if err != nil { + // Just an error + w.result <- watch.Event{ + Type: watch.Error, + } + } else if old == nil && new != nil { + // Added + w.result <- watch.Event{ + Type: watch.Added, + Object: new, + } + } else if !(ok1 && ok2) { + // Error wrong types + w.result <- watch.Event{ + Type: watch.Error, + } + } else if newObj.GetUID() != oldObj.GetUID() { + // Deleted and readded. + w.result <- watch.Event{ + Type: watch.Deleted, + Object: old, + } + w.result <- watch.Event{ + Type: watch.Added, + Object: new, + } + } else if newObj.GetResourceVersion() != oldObj.GetResourceVersion() { + // Modified. + w.result <- watch.Event{ + Type: watch.Modified, + Object: new, + } + } + case done = <-w.done: + break + } + } + }() + return nil +} + +func (w *pollingWatcher) ResultChan() <-chan watch.Event { + return w.result +} + +func (w *pollingWatcher) Stop() { + w.done <- true + w.wg.Wait() + close(w.result) + close(w.done) +} + +func nativeWatch(watchFunc watchF, name string, timeout time.Duration) (watch.Interface, error) { + opts := v1.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(), + } + opts.Watch = true + addWatchTimeout(&opts, timeout) + return watchFunc(opts) +} + +func nativePoll(c rest.Interface, ns, resource, name string) func() (runtime.Object, error) { + return func() (runtime.Object, error) { + return c.Get().Namespace(ns).Resource(resource).Name(name).Do().Get() + } +} + +func addWatchTimeout(opts *v1.ListOptions, timeout time.Duration) { + if timeout == 0 { + return + } + // Wait for service to enter 'Ready' state, with a timeout of which is slightly larger than + // the provided timeout. We have our own timeout which fires after "timeout" seconds + // and stops the watch + timeOutWatchSeconds := int64((timeout + 30*time.Second) / time.Second) + opts.TimeoutSeconds = &timeOutWatchSeconds +} diff --git a/pkg/wait/poll_watcher_test.go b/pkg/wait/poll_watcher_test.go new file mode 100644 index 0000000000..8ea4f8ea5f --- /dev/null +++ b/pkg/wait/poll_watcher_test.go @@ -0,0 +1,112 @@ +// Copyright © 2019 The Knative Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wait + +import ( + "fmt" + "sync" + "testing" + "time" + + "gotest.tools/assert" + api_errors "k8s.io/apimachinery/pkg/api/errors" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + + "knative.dev/serving/pkg/apis/serving/v1alpha1" +) + +type fakePollInterval struct { + c chan time.Time +} + +func (f *fakePollInterval) PollChan() <-chan time.Time { + return f.c +} + +func (f *fakePollInterval) Stop() {} + +func newFakePollInterval(n int) PollInterval { + c := make(chan time.Time, n) + t := time.Now() + for i := 0; i < n; i++ { + c <- t.Add(time.Duration(i) * time.Second) + } + return &fakePollInterval{c} +} + +func newWatcherForTest(pollResults []runtime.Object) watch.Interface { + i := 0 + poll := func() (runtime.Object, error) { + defer func() { i += 1 }() + if pollResults[i] == nil { + // 404 + return nil, api_errors.NewNotFound(schema.GroupResource{"thing", "stuff"}, "eggs") + } + return pollResults[i], nil + } + ret := &pollingWatcher{nil, "", "", "", time.Minute, make(chan bool), make(chan watch.Event), &sync.WaitGroup{}, + newFakePollInterval(len(pollResults)), poll} + ret.start() + return ret +} + +var a, aa, b, bb, c, cc, z, zz runtime.Object + +func init() { + a = &v1alpha1.Service{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "a", UID: "one"}} + aa = a.DeepCopyObject() + b = &v1alpha1.Service{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "b", UID: "one"}} + bb = b.DeepCopyObject() + c = &v1alpha1.Service{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "c", UID: "one"}} + cc = c.DeepCopyObject() + z = &v1alpha1.Service{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "z", UID: "two"}} + zz = z.DeepCopyObject() +} + +type testCase struct { + pollResults []runtime.Object + watchResults []watch.Event +} + +func TestPollWatcher(t *testing.T) { + cases := []testCase{ + // Doesn't exist for a while, then does for a while. + {[]runtime.Object{nil, nil, a, aa, nil}, []watch.Event{{watch.Added, a}, {watch.Deleted, a}}}, + // Changes. + {[]runtime.Object{a, b}, []watch.Event{{watch.Added, a}, {watch.Modified, b}}}, + // Changes but stays the same a couple times too. + {[]runtime.Object{a, aa, b, bb, c, cc, nil}, + []watch.Event{{watch.Added, a}, {watch.Modified, b}, {watch.Modified, c}, {watch.Deleted, c}}}, + // Deleted and recreated between polls. + {[]runtime.Object{a, z}, []watch.Event{{watch.Added, a}, {watch.Deleted, a}, {watch.Added, z}}}, + } + for _, c := range cases { + w := newWatcherForTest(c.pollResults) + for _, expected := range c.watchResults { + actual := <-w.ResultChan() + assert.Equal(t, actual.Type, expected.Type) + if actual.Type == watch.Added || actual.Type == watch.Modified || actual.Type == watch.Deleted { + fmt.Printf("expected, %v, actual %v\n", expected, actual) + assert.Equal(t, actual.Object.(metav1.Object).GetResourceVersion(), expected.Object.(metav1.Object).GetResourceVersion()) + assert.Equal(t, actual.Object.(metav1.Object).GetUID(), expected.Object.(metav1.Object).GetUID()) + } + } + w.Stop() + } +} diff --git a/pkg/wait/wait_for_ready.go b/pkg/wait/wait_for_ready.go index a4a67db421..957fa0f493 100644 --- a/pkg/wait/wait_for_ready.go +++ b/pkg/wait/wait_for_ready.go @@ -20,8 +20,6 @@ import ( "time" corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "knative.dev/pkg/apis" @@ -29,7 +27,7 @@ import ( // Callbacks and configuration used while waiting type waitForReadyConfig struct { - watchFunc WatchFunc + watchMaker WatchMaker conditionsExtractor ConditionsExtractor kind string } @@ -44,7 +42,7 @@ type WaitForReady interface { } // Create watch which is used when waiting for Ready condition -type WatchFunc func(opts v1.ListOptions) (watch.Interface, error) +type WatchMaker func(name string, timeout time.Duration) (watch.Interface, error) // Extract conditions from a runtime object type ConditionsExtractor func(obj runtime.Object) (apis.Conditions, error) @@ -53,10 +51,10 @@ type ConditionsExtractor func(obj runtime.Object) (apis.Conditions, error) type MessageCallback func(durationSinceState time.Duration, message string) // Constructor with resource type specific configuration -func NewWaitForReady(kind string, watchFunc WatchFunc, extractor ConditionsExtractor) WaitForReady { +func NewWaitForReady(kind string, watchMaker WatchMaker, extractor ConditionsExtractor) WaitForReady { return &waitForReadyConfig{ kind: kind, - watchFunc: watchFunc, + watchMaker: watchMaker, conditionsExtractor: extractor, } } @@ -85,15 +83,11 @@ func NoopMessageCallback() MessageCallback { // target state has been entered yet and `out` is used for printing out status messages // msgCallback gets called for every event with an 'Ready' condition == UNKNOWN with the event's message. func (w *waitForReadyConfig) Wait(name string, timeout time.Duration, msgCallback MessageCallback) (error, time.Duration) { - opts := v1.ListOptions{ - FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(), - } - addWatchTimeout(&opts, timeout) floatingTimeout := timeout for { start := time.Now() - retry, timeoutReached, err := w.waitForReadyCondition(start, opts, name, floatingTimeout, msgCallback) + retry, timeoutReached, err := w.waitForReadyCondition(start, name, floatingTimeout, msgCallback) if err != nil { return err, time.Since(start) } @@ -110,20 +104,9 @@ func (w *waitForReadyConfig) Wait(name string, timeout time.Duration, msgCallbac } } -func addWatchTimeout(opts *v1.ListOptions, timeout time.Duration) { - if timeout == 0 { - return - } - // Wait for service to enter 'Ready' state, with a timeout of which is slightly larger than - // the provided timeout. We have our own timeout which fires after "timeout" seconds - // and stops the watch - timeOutWatchSeconds := int64((timeout + 30*time.Second) / time.Second) - opts.TimeoutSeconds = &timeOutWatchSeconds -} - -func (w *waitForReadyConfig) waitForReadyCondition(start time.Time, opts v1.ListOptions, name string, timeout time.Duration, msgCallback MessageCallback) (retry bool, timeoutReached bool, err error) { +func (w *waitForReadyConfig) waitForReadyCondition(start time.Time, name string, timeout time.Duration, msgCallback MessageCallback) (retry bool, timeoutReached bool, err error) { - watcher, err := w.watchFunc(opts) + watcher, err := w.watchMaker(name, timeout) if err != nil { return false, false, err } diff --git a/pkg/wait/wait_for_ready_test.go b/pkg/wait/wait_for_ready_test.go index 242d7bae8a..02384e4b2c 100644 --- a/pkg/wait/wait_for_ready_test.go +++ b/pkg/wait/wait_for_ready_test.go @@ -21,7 +21,6 @@ import ( "gotest.tools/assert" "gotest.tools/assert/cmp" corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "knative.dev/pkg/apis" @@ -42,7 +41,7 @@ func TestAddWaitForReady(t *testing.T) { waitForReady := NewWaitForReady( "blub", - func(opts v1.ListOptions) (watch.Interface, error) { + func(name string, timeout time.Duration) (watch.Interface, error) { return fakeWatchApi, nil }, func(obj runtime.Object) (apis.Conditions, error) { diff --git a/vendor/modules.txt b/vendor/modules.txt index ddb03b29bc..389d9eb466 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -405,8 +405,8 @@ k8s.io/apimachinery/pkg/api/meta k8s.io/apimachinery/pkg/util/runtime k8s.io/apimachinery/pkg/fields k8s.io/apimachinery/pkg/labels -k8s.io/apimachinery/pkg/runtime/schema k8s.io/apimachinery/pkg/watch +k8s.io/apimachinery/pkg/runtime/schema k8s.io/apimachinery/pkg/util/validation/field k8s.io/apimachinery/pkg/conversion k8s.io/apimachinery/pkg/selection @@ -423,8 +423,8 @@ k8s.io/apimachinery/pkg/util/naming k8s.io/apimachinery/pkg/util/net k8s.io/apimachinery/pkg/util/yaml k8s.io/apimachinery/pkg/runtime/serializer -k8s.io/apimachinery/third_party/forked/golang/reflect k8s.io/apimachinery/pkg/runtime/serializer/streaming +k8s.io/apimachinery/third_party/forked/golang/reflect k8s.io/apimachinery/pkg/apis/meta/v1/unstructured k8s.io/apimachinery/pkg/util/mergepatch k8s.io/apimachinery/third_party/forked/golang/json