Skip to content

Commit

Permalink
In waiting for ready, watch falls back to polling. (#491)
Browse files Browse the repository at this point in the history
* Fall back to polling-based watcher

* Tests pass, much cleaner

* tests and fixes for polling watcher

* Clean up a lil

* lint

* Nits

* build -u
  • Loading branch information
sixolet authored and knative-prow-robot committed Nov 20, 2019
1 parent cadc2d3 commit e07b5a9
Show file tree
Hide file tree
Showing 7 changed files with 322 additions and 39 deletions.
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc=
github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/json-iterator/go v0.0.0-20180612202835-f2b4162afba3/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v0.0.0-20180701071628-ab8a2e0c74be h1:AHimNtVIpiBjPUhEF5KNCkrUyqTSA5zWUl8sQ2bfGBE=
github.com/json-iterator/go v0.0.0-20180701071628-ab8a2e0c74be/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.7 h1:KfgG9LzI+pYjr4xvmz/5H4FXjokeP+rlHLhv3iH62Fo=
github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
Expand Down Expand Up @@ -244,6 +245,7 @@ github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo=
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c h1:eSfnfIuwhxZyULg1NNuZycJcYkjYVGYe7FczwQReM6U=
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v0.0.0-20190113212917-5533ce8a0da3 h1:EooPXg51Tn+xmWPXJUGCnJhJSpeuMlBmfJVcqIRmmv8=
github.com/onsi/gomega v0.0.0-20190113212917-5533ce8a0da3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
Expand Down Expand Up @@ -480,6 +482,7 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/inf.v0 v0.9.0 h1:3zYtXIO92bvsdS3ggAdA8Gb4Azj0YU+TVY1uGYNFA8o=
gopkg.in/inf.v0 v0.9.0/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
Expand Down Expand Up @@ -518,6 +521,7 @@ k8s.io/klog v0.3.1 h1:RVgyDHY/kFKtLqh67NvEWIgkMneNoIrdkN0CxDSQc68=
k8s.io/klog v0.3.1/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8=
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30 h1:TRb4wNWoBVrH9plmkp2q86FIDppkbrEXdXlxU3a3BMI=
k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc=
k8s.io/kube-openapi v0.0.0-20190816220812-743ec37842bf h1:EYm5AW/UUDbnmnI+gK0TJDVK9qPLhM+sRHYanNKw0EQ=
k8s.io/kube-openapi v0.0.0-20190816220812-743ec37842bf/go.mod h1:1TqjTSzOxsLGIKfj0lK8EeCP7K1iUG65v09OM0/WG5E=
Expand Down
18 changes: 7 additions & 11 deletions pkg/serving/v1alpha1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
api_serving "knative.dev/serving/pkg/apis/serving"
"knative.dev/serving/pkg/apis/serving/v1alpha1"
client_v1alpha1 "knative.dev/serving/pkg/client/clientset/versioned/typed/serving/v1alpha1"
Expand Down Expand Up @@ -159,6 +160,11 @@ func (cl *knServingClient) GetService(name string) (*v1alpha1.Service, error) {
return service, nil
}

func (cl *knServingClient) WatchService(name string, timeout time.Duration) (watch.Interface, error) {
return wait.NewWatcher(cl.client.Services(cl.namespace).Watch,
cl.client.RESTClient(), cl.namespace, "services", name, timeout)
}

// List services
func (cl *knServingClient) ListServices(config ...ListConfig) (*v1alpha1.ServiceList, error) {
serviceList, err := cl.client.Services(cl.namespace).List(ListConfigs(config).toListOptions())
Expand Down Expand Up @@ -216,7 +222,7 @@ func (cl *knServingClient) DeleteService(serviceName string) error {

// Wait for a service to become ready, but not longer than provided timeout
func (cl *knServingClient) WaitForService(name string, timeout time.Duration, msgCallback wait.MessageCallback) (error, time.Duration) {
waitForReady := newServiceWaitForReady(cl.client.Services(cl.namespace).Watch)
waitForReady := wait.NewWaitForReady("service", cl.WatchService, serviceConditionExtractor)
return waitForReady.Wait(name, timeout, msgCallback)
}

Expand Down Expand Up @@ -390,16 +396,6 @@ func updateServingGvk(obj runtime.Object) error {
return util.UpdateGroupVersionKindWithScheme(obj, v1alpha1.SchemeGroupVersion, scheme.Scheme)
}

// Create wait arguments for a Knative service which can be used to wait for
// a create/update options to be finished
// Can be used by `service_create` and `service_update`, hence this extra file
func newServiceWaitForReady(watch wait.WatchFunc) wait.WaitForReady {
return wait.NewWaitForReady(
"service",
watch,
serviceConditionExtractor)
}

func serviceConditionExtractor(obj runtime.Object) (apis.Conditions, error) {
service, ok := obj.(*v1alpha1.Service)
if !ok {
Expand Down
189 changes: 189 additions & 0 deletions pkg/wait/poll_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Copyright © 2019 The Knative Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package wait

import (
"sync"
"time"

api_errors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/rest"
)

// PollInterval determines when you should poll. Useful to mock out, or for
// replacing with exponential backoff later.
type PollInterval interface {
PollChan() <-chan time.Time
Stop()
}

type pollingWatcher struct {
c rest.Interface
ns string
resource string
name string
timeout time.Duration
done chan bool
result chan watch.Event
wg *sync.WaitGroup
// we can mock the interface for testing.
pollInterval PollInterval
// mock hook for testing.
poll func() (runtime.Object, error)
}

type watchF func(v1.ListOptions) (watch.Interface, error)

type tickerPollInterval struct {
t *time.Ticker
}

func (t *tickerPollInterval) PollChan() <-chan time.Time {
return t.t.C
}

func (t *tickerPollInterval) Stop() {
t.t.Stop()
}

func newTickerPollInterval(d time.Duration) *tickerPollInterval {
return &tickerPollInterval{time.NewTicker(d)}
}

// NewWatcher makes a watch.Interface on the given resource in the client,
// falling back to polling if the server does not support Watch.
func NewWatcher(watchFunc watchF, c rest.Interface, ns string, resource string, name string, timeout time.Duration) (watch.Interface, error) {
native, err := nativeWatch(watchFunc, name, timeout)
if err == nil {
return native, nil
}
polling := &pollingWatcher{
c, ns, resource, name, timeout, make(chan bool), make(chan watch.Event), &sync.WaitGroup{},
newTickerPollInterval(time.Second), nativePoll(c, ns, resource, name)}
err = polling.start()
if err != nil {
return nil, err
}
return polling, nil
}

func (w *pollingWatcher) start() error {
w.wg.Add(1)

go func() {
defer w.wg.Done()
defer w.pollInterval.Stop()
var err error
var old, new runtime.Object
done := false
for !done {
old = new

select {
case <-w.pollInterval.PollChan():
new, err = w.poll()
newObj, ok1 := new.(v1.Object)
oldObj, ok2 := old.(v1.Object)

if err != nil && api_errors.IsNotFound(err) {
if old != nil {
// Deleted
w.result <- watch.Event{
Type: watch.Deleted,
Object: old,
}
}
//... Otherwise maybe just doesn't exist.
} else if err != nil {
// Just an error
w.result <- watch.Event{
Type: watch.Error,
}
} else if old == nil && new != nil {
// Added
w.result <- watch.Event{
Type: watch.Added,
Object: new,
}
} else if !(ok1 && ok2) {
// Error wrong types
w.result <- watch.Event{
Type: watch.Error,
}
} else if newObj.GetUID() != oldObj.GetUID() {
// Deleted and readded.
w.result <- watch.Event{
Type: watch.Deleted,
Object: old,
}
w.result <- watch.Event{
Type: watch.Added,
Object: new,
}
} else if newObj.GetResourceVersion() != oldObj.GetResourceVersion() {
// Modified.
w.result <- watch.Event{
Type: watch.Modified,
Object: new,
}
}
case done = <-w.done:
break
}
}
}()
return nil
}

func (w *pollingWatcher) ResultChan() <-chan watch.Event {
return w.result
}

func (w *pollingWatcher) Stop() {
w.done <- true
w.wg.Wait()
close(w.result)
close(w.done)
}

func nativeWatch(watchFunc watchF, name string, timeout time.Duration) (watch.Interface, error) {
opts := v1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(),
}
opts.Watch = true
addWatchTimeout(&opts, timeout)
return watchFunc(opts)
}

func nativePoll(c rest.Interface, ns, resource, name string) func() (runtime.Object, error) {
return func() (runtime.Object, error) {
return c.Get().Namespace(ns).Resource(resource).Name(name).Do().Get()
}
}

func addWatchTimeout(opts *v1.ListOptions, timeout time.Duration) {
if timeout == 0 {
return
}
// Wait for service to enter 'Ready' state, with a timeout of which is slightly larger than
// the provided timeout. We have our own timeout which fires after "timeout" seconds
// and stops the watch
timeOutWatchSeconds := int64((timeout + 30*time.Second) / time.Second)
opts.TimeoutSeconds = &timeOutWatchSeconds
}
112 changes: 112 additions & 0 deletions pkg/wait/poll_watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright © 2019 The Knative Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package wait

import (
"fmt"
"sync"
"testing"
"time"

"gotest.tools/assert"
api_errors "k8s.io/apimachinery/pkg/api/errors"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"

"knative.dev/serving/pkg/apis/serving/v1alpha1"
)

type fakePollInterval struct {
c chan time.Time
}

func (f *fakePollInterval) PollChan() <-chan time.Time {
return f.c
}

func (f *fakePollInterval) Stop() {}

func newFakePollInterval(n int) PollInterval {
c := make(chan time.Time, n)
t := time.Now()
for i := 0; i < n; i++ {
c <- t.Add(time.Duration(i) * time.Second)
}
return &fakePollInterval{c}
}

func newWatcherForTest(pollResults []runtime.Object) watch.Interface {
i := 0
poll := func() (runtime.Object, error) {
defer func() { i += 1 }()
if pollResults[i] == nil {
// 404
return nil, api_errors.NewNotFound(schema.GroupResource{"thing", "stuff"}, "eggs")
}
return pollResults[i], nil
}
ret := &pollingWatcher{nil, "", "", "", time.Minute, make(chan bool), make(chan watch.Event), &sync.WaitGroup{},
newFakePollInterval(len(pollResults)), poll}
ret.start()
return ret
}

var a, aa, b, bb, c, cc, z, zz runtime.Object

func init() {
a = &v1alpha1.Service{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "a", UID: "one"}}
aa = a.DeepCopyObject()
b = &v1alpha1.Service{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "b", UID: "one"}}
bb = b.DeepCopyObject()
c = &v1alpha1.Service{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "c", UID: "one"}}
cc = c.DeepCopyObject()
z = &v1alpha1.Service{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "z", UID: "two"}}
zz = z.DeepCopyObject()
}

type testCase struct {
pollResults []runtime.Object
watchResults []watch.Event
}

func TestPollWatcher(t *testing.T) {
cases := []testCase{
// Doesn't exist for a while, then does for a while.
{[]runtime.Object{nil, nil, a, aa, nil}, []watch.Event{{watch.Added, a}, {watch.Deleted, a}}},
// Changes.
{[]runtime.Object{a, b}, []watch.Event{{watch.Added, a}, {watch.Modified, b}}},
// Changes but stays the same a couple times too.
{[]runtime.Object{a, aa, b, bb, c, cc, nil},
[]watch.Event{{watch.Added, a}, {watch.Modified, b}, {watch.Modified, c}, {watch.Deleted, c}}},
// Deleted and recreated between polls.
{[]runtime.Object{a, z}, []watch.Event{{watch.Added, a}, {watch.Deleted, a}, {watch.Added, z}}},
}
for _, c := range cases {
w := newWatcherForTest(c.pollResults)
for _, expected := range c.watchResults {
actual := <-w.ResultChan()
assert.Equal(t, actual.Type, expected.Type)
if actual.Type == watch.Added || actual.Type == watch.Modified || actual.Type == watch.Deleted {
fmt.Printf("expected, %v, actual %v\n", expected, actual)
assert.Equal(t, actual.Object.(metav1.Object).GetResourceVersion(), expected.Object.(metav1.Object).GetResourceVersion())
assert.Equal(t, actual.Object.(metav1.Object).GetUID(), expected.Object.(metav1.Object).GetUID())
}
}
w.Stop()
}
}
Loading

0 comments on commit e07b5a9

Please sign in to comment.