Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

In waiting for ready, watch falls back to polling. #491

Merged
merged 7 commits into from
Nov 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc=
github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/json-iterator/go v0.0.0-20180612202835-f2b4162afba3/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v0.0.0-20180701071628-ab8a2e0c74be h1:AHimNtVIpiBjPUhEF5KNCkrUyqTSA5zWUl8sQ2bfGBE=
github.com/json-iterator/go v0.0.0-20180701071628-ab8a2e0c74be/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.7 h1:KfgG9LzI+pYjr4xvmz/5H4FXjokeP+rlHLhv3iH62Fo=
github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
Expand Down Expand Up @@ -244,6 +245,7 @@ github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo=
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c h1:eSfnfIuwhxZyULg1NNuZycJcYkjYVGYe7FczwQReM6U=
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v0.0.0-20190113212917-5533ce8a0da3 h1:EooPXg51Tn+xmWPXJUGCnJhJSpeuMlBmfJVcqIRmmv8=
github.com/onsi/gomega v0.0.0-20190113212917-5533ce8a0da3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
Expand Down Expand Up @@ -480,6 +482,7 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/inf.v0 v0.9.0 h1:3zYtXIO92bvsdS3ggAdA8Gb4Azj0YU+TVY1uGYNFA8o=
gopkg.in/inf.v0 v0.9.0/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
Expand Down Expand Up @@ -518,6 +521,7 @@ k8s.io/klog v0.3.1 h1:RVgyDHY/kFKtLqh67NvEWIgkMneNoIrdkN0CxDSQc68=
k8s.io/klog v0.3.1/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8=
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30 h1:TRb4wNWoBVrH9plmkp2q86FIDppkbrEXdXlxU3a3BMI=
k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc=
k8s.io/kube-openapi v0.0.0-20190816220812-743ec37842bf h1:EYm5AW/UUDbnmnI+gK0TJDVK9qPLhM+sRHYanNKw0EQ=
k8s.io/kube-openapi v0.0.0-20190816220812-743ec37842bf/go.mod h1:1TqjTSzOxsLGIKfj0lK8EeCP7K1iUG65v09OM0/WG5E=
Expand Down
18 changes: 7 additions & 11 deletions pkg/serving/v1alpha1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
api_serving "knative.dev/serving/pkg/apis/serving"
"knative.dev/serving/pkg/apis/serving/v1alpha1"
client_v1alpha1 "knative.dev/serving/pkg/client/clientset/versioned/typed/serving/v1alpha1"
Expand Down Expand Up @@ -159,6 +160,11 @@ func (cl *knServingClient) GetService(name string) (*v1alpha1.Service, error) {
return service, nil
}

func (cl *knServingClient) WatchService(name string, timeout time.Duration) (watch.Interface, error) {
return wait.NewWatcher(cl.client.Services(cl.namespace).Watch,
cl.client.RESTClient(), cl.namespace, "services", name, timeout)
}

// List services
func (cl *knServingClient) ListServices(config ...ListConfig) (*v1alpha1.ServiceList, error) {
serviceList, err := cl.client.Services(cl.namespace).List(ListConfigs(config).toListOptions())
Expand Down Expand Up @@ -216,7 +222,7 @@ func (cl *knServingClient) DeleteService(serviceName string) error {

// Wait for a service to become ready, but not longer than provided timeout
func (cl *knServingClient) WaitForService(name string, timeout time.Duration, msgCallback wait.MessageCallback) (error, time.Duration) {
waitForReady := newServiceWaitForReady(cl.client.Services(cl.namespace).Watch)
waitForReady := wait.NewWaitForReady("service", cl.WatchService, serviceConditionExtractor)
return waitForReady.Wait(name, timeout, msgCallback)
}

Expand Down Expand Up @@ -390,16 +396,6 @@ func updateServingGvk(obj runtime.Object) error {
return util.UpdateGroupVersionKindWithScheme(obj, v1alpha1.SchemeGroupVersion, scheme.Scheme)
}

// Create wait arguments for a Knative service which can be used to wait for
// a create/update options to be finished
// Can be used by `service_create` and `service_update`, hence this extra file
func newServiceWaitForReady(watch wait.WatchFunc) wait.WaitForReady {
return wait.NewWaitForReady(
"service",
watch,
serviceConditionExtractor)
}

func serviceConditionExtractor(obj runtime.Object) (apis.Conditions, error) {
service, ok := obj.(*v1alpha1.Service)
if !ok {
Expand Down
189 changes: 189 additions & 0 deletions pkg/wait/poll_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Copyright © 2019 The Knative Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package wait

import (
"sync"
"time"

api_errors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/rest"
)

// PollInterval determines when you should poll. Useful to mock out, or for
// replacing with exponential backoff later.
type PollInterval interface {
sixolet marked this conversation as resolved.
Show resolved Hide resolved
PollChan() <-chan time.Time
Stop()
}

type pollingWatcher struct {
c rest.Interface
ns string
resource string
name string
timeout time.Duration
done chan bool
result chan watch.Event
wg *sync.WaitGroup
// we can mock the interface for testing.
pollInterval PollInterval
// mock hook for testing.
poll func() (runtime.Object, error)
}

type watchF func(v1.ListOptions) (watch.Interface, error)

type tickerPollInterval struct {
t *time.Ticker
}

func (t *tickerPollInterval) PollChan() <-chan time.Time {
return t.t.C
}

func (t *tickerPollInterval) Stop() {
t.t.Stop()
}

func newTickerPollInterval(d time.Duration) *tickerPollInterval {
return &tickerPollInterval{time.NewTicker(d)}
}

// NewWatcher makes a watch.Interface on the given resource in the client,
// falling back to polling if the server does not support Watch.
func NewWatcher(watchFunc watchF, c rest.Interface, ns string, resource string, name string, timeout time.Duration) (watch.Interface, error) {
native, err := nativeWatch(watchFunc, name, timeout)
if err == nil {
return native, nil
}
polling := &pollingWatcher{
c, ns, resource, name, timeout, make(chan bool), make(chan watch.Event), &sync.WaitGroup{},
newTickerPollInterval(time.Second), nativePoll(c, ns, resource, name)}
err = polling.start()
if err != nil {
return nil, err
}
return polling, nil
}

func (w *pollingWatcher) start() error {
w.wg.Add(1)

go func() {
defer w.wg.Done()
defer w.pollInterval.Stop()
var err error
var old, new runtime.Object
done := false
for !done {
old = new

select {
case <-w.pollInterval.PollChan():
new, err = w.poll()
newObj, ok1 := new.(v1.Object)
oldObj, ok2 := old.(v1.Object)

if err != nil && api_errors.IsNotFound(err) {
if old != nil {
// Deleted
w.result <- watch.Event{
Type: watch.Deleted,
Object: old,
}
}
//... Otherwise maybe just doesn't exist.
} else if err != nil {
// Just an error
w.result <- watch.Event{
Type: watch.Error,
}
} else if old == nil && new != nil {
// Added
w.result <- watch.Event{
Type: watch.Added,
Object: new,
}
} else if !(ok1 && ok2) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we separate the if check for err, ok1 and ok2 ? (OR may be a nested switch)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried it that way, and it was less clear and more error prone, so I returned it to this way.

// Error wrong types
w.result <- watch.Event{
Type: watch.Error,
}
} else if newObj.GetUID() != oldObj.GetUID() {
// Deleted and readded.
w.result <- watch.Event{
Type: watch.Deleted,
Object: old,
}
w.result <- watch.Event{
Type: watch.Added,
Object: new,
}
} else if newObj.GetResourceVersion() != oldObj.GetResourceVersion() {
// Modified.
w.result <- watch.Event{
Type: watch.Modified,
Object: new,
}
}
case done = <-w.done:
break
}
}
}()
return nil
}

func (w *pollingWatcher) ResultChan() <-chan watch.Event {
return w.result
}

func (w *pollingWatcher) Stop() {
w.done <- true
w.wg.Wait()
close(w.result)
close(w.done)
}

func nativeWatch(watchFunc watchF, name string, timeout time.Duration) (watch.Interface, error) {
opts := v1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(),
}
opts.Watch = true
addWatchTimeout(&opts, timeout)
return watchFunc(opts)
}

func nativePoll(c rest.Interface, ns, resource, name string) func() (runtime.Object, error) {
return func() (runtime.Object, error) {
return c.Get().Namespace(ns).Resource(resource).Name(name).Do().Get()
}
}

func addWatchTimeout(opts *v1.ListOptions, timeout time.Duration) {
if timeout == 0 {
return
}
// Wait for service to enter 'Ready' state, with a timeout of which is slightly larger than
// the provided timeout. We have our own timeout which fires after "timeout" seconds
// and stops the watch
timeOutWatchSeconds := int64((timeout + 30*time.Second) / time.Second)
opts.TimeoutSeconds = &timeOutWatchSeconds
}
112 changes: 112 additions & 0 deletions pkg/wait/poll_watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright © 2019 The Knative Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package wait

import (
"fmt"
"sync"
"testing"
"time"

"gotest.tools/assert"
api_errors "k8s.io/apimachinery/pkg/api/errors"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"

"knative.dev/serving/pkg/apis/serving/v1alpha1"
)

type fakePollInterval struct {
c chan time.Time
}

func (f *fakePollInterval) PollChan() <-chan time.Time {
return f.c
}

func (f *fakePollInterval) Stop() {}

func newFakePollInterval(n int) PollInterval {
c := make(chan time.Time, n)
t := time.Now()
for i := 0; i < n; i++ {
c <- t.Add(time.Duration(i) * time.Second)
}
return &fakePollInterval{c}
}

func newWatcherForTest(pollResults []runtime.Object) watch.Interface {
i := 0
poll := func() (runtime.Object, error) {
defer func() { i += 1 }()
sixolet marked this conversation as resolved.
Show resolved Hide resolved
if pollResults[i] == nil {
// 404
return nil, api_errors.NewNotFound(schema.GroupResource{"thing", "stuff"}, "eggs")
}
return pollResults[i], nil
}
ret := &pollingWatcher{nil, "", "", "", time.Minute, make(chan bool), make(chan watch.Event), &sync.WaitGroup{},
newFakePollInterval(len(pollResults)), poll}
ret.start()
return ret
}

var a, aa, b, bb, c, cc, z, zz runtime.Object

func init() {
a = &v1alpha1.Service{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "a", UID: "one"}}
aa = a.DeepCopyObject()
b = &v1alpha1.Service{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "b", UID: "one"}}
bb = b.DeepCopyObject()
c = &v1alpha1.Service{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "c", UID: "one"}}
cc = c.DeepCopyObject()
z = &v1alpha1.Service{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "z", UID: "two"}}
zz = z.DeepCopyObject()
}

type testCase struct {
pollResults []runtime.Object
watchResults []watch.Event
}

func TestPollWatcher(t *testing.T) {
cases := []testCase{
// Doesn't exist for a while, then does for a while.
{[]runtime.Object{nil, nil, a, aa, nil}, []watch.Event{{watch.Added, a}, {watch.Deleted, a}}},
// Changes.
{[]runtime.Object{a, b}, []watch.Event{{watch.Added, a}, {watch.Modified, b}}},
// Changes but stays the same a couple times too.
{[]runtime.Object{a, aa, b, bb, c, cc, nil},
[]watch.Event{{watch.Added, a}, {watch.Modified, b}, {watch.Modified, c}, {watch.Deleted, c}}},
// Deleted and recreated between polls.
{[]runtime.Object{a, z}, []watch.Event{{watch.Added, a}, {watch.Deleted, a}, {watch.Added, z}}},
}
for _, c := range cases {
w := newWatcherForTest(c.pollResults)
for _, expected := range c.watchResults {
actual := <-w.ResultChan()
assert.Equal(t, actual.Type, expected.Type)
if actual.Type == watch.Added || actual.Type == watch.Modified || actual.Type == watch.Deleted {
fmt.Printf("expected, %v, actual %v\n", expected, actual)
assert.Equal(t, actual.Object.(metav1.Object).GetResourceVersion(), expected.Object.(metav1.Object).GetResourceVersion())
assert.Equal(t, actual.Object.(metav1.Object).GetUID(), expected.Object.(metav1.Object).GetUID())
}
}
w.Stop()
}
}
Loading