Skip to content

Commit

Permalink
[release-0.19] Backport #1117 #1083 #1113 (#1135)
Browse files Browse the repository at this point in the history
* Embed the namespace in request body while creating channels (#1117)

* Embed the namespace in request body while creating channels

 since on the eventing side, defaulting for channel isnt picking
 the namespace from the context (see knative/eventing#4514)

 workaround for #1100
 this changeset should be reverted when eventing#4514 is resolved

* Add CHANGELOG

* Update CHANGELOG for v0.19.1

* Cross-compile the kn binary for linux/s390x (#1083)

* Update CHANGELOG for v0.19.1

* Fix date in changelog

* Fix race conditions when creating watches (#1113)

* Fix a race condition between creating a watch and initiating the action that emits the event it is watching for

* update changelog

* add PR ID to changelog entry

* Fix merge in Changelog

* Fix table format in Changelog
  • Loading branch information
navidshaikh authored Nov 25, 2020
1 parent 260970b commit 3fcd9b2
Show file tree
Hide file tree
Showing 13 changed files with 83 additions and 61 deletions.
18 changes: 18 additions & 0 deletions CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,24 @@
| https://github.com/knative/client/pull/[#]
////

### v0.19.1 (2020-11-25)
[cols="1,10,3", options="header", width="100%"]
|===
| | Description | PR

| 🐛
| Fix a race condition when using Kubernetes watches
| https://github.com/knative/client/pull/1113[#1113]

| 🐛
| Embed the namespace in request body while creating channels
| https://github.com/knative/client/pull/1117[#1117]

| 🎁
| Add kn _s390x_ binary to the release
| https://github.com/knative/client/pull/1083[#1083]
|===

### v0.19.0 (2020-11-11)
[cols="1,10,3", options="header", width="100%"]
|===
Expand Down
2 changes: 2 additions & 0 deletions hack/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,8 @@ cross_build() {
GOOS=darwin GOARCH=amd64 go build -mod=vendor -ldflags "${ld_flags}" -o ./kn-darwin-amd64 ./cmd/... || failed=1
echo " 🎠 kn-windows-amd64.exe"
GOOS=windows GOARCH=amd64 go build -mod=vendor -ldflags "${ld_flags}" -o ./kn-windows-amd64.exe ./cmd/... || failed=1
echo " Z kn-linux-s390x"
GOOS=linux GOARCH=s390x go build -mod=vendor -ldflags "${ld_flags}" -o ./kn-linux-s390x ./cmd/... || failed=1

return ${failed}
}
Expand Down
6 changes: 4 additions & 2 deletions hack/release.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ function build_release() {
GOOS=darwin GOARCH=amd64 go build -mod=vendor -ldflags "${ld_flags}" -o ./kn-darwin-amd64 ./cmd/...
echo "🚧 🎠 Building for Windows"
GOOS=windows GOARCH=amd64 go build -mod=vendor -ldflags "${ld_flags}" -o ./kn-windows-amd64.exe ./cmd/...
echo "🚧 Z Building for Linux(s390x)"
GOOS=linux GOARCH=s390x go build -mod=vendor -ldflags "${ld_flags}" -o ./kn-linux-s390x ./cmd/...
echo "🚧 🐳 Building the container image"
ko resolve --strict ${KO_FLAGS} -f config/ > kn-image-location.yaml
ARTIFACTS_TO_PUBLISH="kn-darwin-amd64 kn-linux-amd64 kn-linux-arm64 kn-windows-amd64.exe kn-image-location.yaml"
sha256sum ${ARTIFACTS_TO_PUBLISH} > checksums.txt
ARTIFACTS_TO_PUBLISH="kn-darwin-amd64 kn-linux-amd64 kn-linux-arm64 kn-windows-amd64.exe kn-linux-s390x kn-image-location.yaml"
sha256sum "${ARTIFACTS_TO_PUBLISH}" > checksums.txt
ARTIFACTS_TO_PUBLISH="${ARTIFACTS_TO_PUBLISH} checksums.txt"
echo "🧮 Checksum:"
cat checksums.txt
Expand Down
11 changes: 8 additions & 3 deletions pkg/eventing/v1beta1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,12 +245,17 @@ func (c *knEventingClient) DeleteBroker(name string, timeout time.Duration) erro
return c.deleteBroker(name, apis_v1.DeletePropagationBackground)
}
waitC := make(chan error)
watcher, err := c.WatchBroker(name, timeout)
if err != nil {
return nil
}
defer watcher.Stop()
go func() {
waitForEvent := wait.NewWaitForEvent("broker", c.WatchBroker, func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(name, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitForEvent := wait.NewWaitForEvent("broker", func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(watcher, name, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitC <- err
}()
err := c.deleteBroker(name, apis_v1.DeletePropagationForeground)
err = c.deleteBroker(name, apis_v1.DeletePropagationForeground)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kn/commands/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,6 @@ func cleanupChannelMockClient() {
channelClientFactory = nil
}

func createChannel(name string, gvk *schema.GroupVersionKind) *v1beta1.Channel {
return clientv1beta1.NewChannelBuilder(name).Type(gvk).Build()
func createChannel(name, namespace string, gvk *schema.GroupVersionKind) *v1beta1.Channel {
return clientv1beta1.NewChannelBuilder(name, namespace).Type(gvk).Build()
}
2 changes: 1 addition & 1 deletion pkg/kn/commands/channel/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func NewChannelCreateCommand(p *commands.KnParams) *cobra.Command {
return err
}

cb := knmessagingv1beta1.NewChannelBuilder(name)
cb := knmessagingv1beta1.NewChannelBuilder(name, namespace)

if cmd.Flag("type").Changed {
gvk, err := ctypeFlags.Parse()
Expand Down
4 changes: 2 additions & 2 deletions pkg/kn/commands/channel/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestCreateChannelErrorCaseTypeFormat(t *testing.T) {
func TestCreateChannelDefaultChannel(t *testing.T) {
cClient := v1beta1.NewMockKnChannelsClient(t)
cRecorder := cClient.Recorder()
cRecorder.CreateChannel(createChannel("pipe", nil), nil)
cRecorder.CreateChannel(createChannel("pipe", "default", nil), nil)
out, err := executeChannelCommand(cClient, "create", "pipe")
assert.NilError(t, err, "channel should be created")
assert.Assert(t, util.ContainsAll(out, "created", "pipe", "default"))
Expand All @@ -53,7 +53,7 @@ func TestCreateChannelDefaultChannel(t *testing.T) {
func TestCreateChannelWithTypeFlagInMemoryChannel(t *testing.T) {
cClient := v1beta1.NewMockKnChannelsClient(t)
cRecorder := cClient.Recorder()
cRecorder.CreateChannel(createChannel("pipe", &schema.GroupVersionKind{Group: "messaging.knative.dev", Version: "v1beta1", Kind: "InMemoryChannel"}), nil)
cRecorder.CreateChannel(createChannel("pipe", "default", &schema.GroupVersionKind{Group: "messaging.knative.dev", Version: "v1beta1", Kind: "InMemoryChannel"}), nil)
out, err := executeChannelCommand(cClient, "create", "pipe", "--type", "imcv1beta1")
assert.NilError(t, err, "channel should be created")
assert.Assert(t, util.ContainsAll(out, "created", "pipe", "default"))
Expand Down
2 changes: 1 addition & 1 deletion pkg/kn/commands/channel/describe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestDescribeChannelErrorCaseNotFound(t *testing.T) {
func TestDescribeChannel(t *testing.T) {
cClient := v1beta1.NewMockKnChannelsClient(t)
cRecorder := cClient.Recorder()
cRecorder.GetChannel("pipe", createChannel("pipe", &schema.GroupVersionKind{Group: "messaging.knative.dev", Version: "v1beta1", Kind: "InMemoryChannel"}), nil)
cRecorder.GetChannel("pipe", createChannel("pipe", "default", &schema.GroupVersionKind{Group: "messaging.knative.dev", Version: "v1beta1", Kind: "InMemoryChannel"}), nil)
out, err := executeChannelCommand(cClient, "describe", "pipe")
assert.NilError(t, err, "channel should be described")
assert.Assert(t, util.ContainsAll(out, "messaging.knative.dev", "v1beta1", "InMemoryChannel", "pipe"))
Expand Down
4 changes: 2 additions & 2 deletions pkg/kn/commands/channel/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func TestChannelList(t *testing.T) {
cRecorder := cClient.Recorder()
clist := &messagingv1beta1.ChannelList{}
clist.Items = []messagingv1beta1.Channel{
*createChannel("c0", &schema.GroupVersionKind{Group: "messaging.knative.dev", Version: "v1beta1", Kind: "InMemoryChannel"}),
*createChannel("c1", &schema.GroupVersionKind{Group: "messaging.knative.dev", Version: "v1beta1", Kind: "InMemoryChannel"}),
*createChannel("c0", "default", &schema.GroupVersionKind{Group: "messaging.knative.dev", Version: "v1beta1", Kind: "InMemoryChannel"}),
*createChannel("c1", "default", &schema.GroupVersionKind{Group: "messaging.knative.dev", Version: "v1beta1", Kind: "InMemoryChannel"}),
}
cRecorder.ListChannel(clist, nil)
out, err := executeChannelCommand(cClient, "list")
Expand Down
5 changes: 3 additions & 2 deletions pkg/messaging/v1beta1/channels_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,11 @@ type ChannelBuilder struct {
}

// NewChannelBuilder for building Channel object
func NewChannelBuilder(name string) *ChannelBuilder {
func NewChannelBuilder(name, namespace string) *ChannelBuilder {
return &ChannelBuilder{channel: &v1beta1.Channel{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Name: name,
Namespace: namespace,
},
}}
}
Expand Down
29 changes: 22 additions & 7 deletions pkg/serving/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,12 +319,17 @@ func (cl *knServingClient) DeleteService(serviceName string, timeout time.Durati
return cl.deleteService(serviceName, v1.DeletePropagationBackground)
}
waitC := make(chan error)
watcher, err := cl.WatchService(serviceName, timeout)
if err != nil {
return nil
}
defer watcher.Stop()
go func() {
waitForEvent := wait.NewWaitForEvent("service", cl.WatchService, func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(serviceName, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitForEvent := wait.NewWaitForEvent("service", func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(watcher, serviceName, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitC <- err
}()
err := cl.deleteService(serviceName, v1.DeletePropagationForeground)
err = cl.deleteService(serviceName, v1.DeletePropagationForeground)
if err != nil {
return err
}
Expand All @@ -346,8 +351,13 @@ func (cl *knServingClient) deleteService(serviceName string, propagationPolicy v

// Wait for a service to become ready, but not longer than provided timeout
func (cl *knServingClient) WaitForService(name string, timeout time.Duration, msgCallback wait.MessageCallback) (error, time.Duration) {
waitForReady := wait.NewWaitForReady("service", cl.WatchService, serviceConditionExtractor)
return waitForReady.Wait(name, wait.Options{Timeout: &timeout}, msgCallback)
watcher, err := cl.WatchService(name, timeout)
if err != nil {
return err, timeout
}
defer watcher.Stop()
waitForReady := wait.NewWaitForReady("service", serviceConditionExtractor)
return waitForReady.Wait(watcher, name, wait.Options{Timeout: &timeout}, msgCallback)
}

// Get the configuration for a service
Expand Down Expand Up @@ -460,9 +470,14 @@ func (cl *knServingClient) DeleteRevision(name string, timeout time.Duration) er
return cl.deleteRevision(name)
}
waitC := make(chan error)
watcher, err := cl.WatchRevision(name, timeout)
if err != nil {
return err
}
defer watcher.Stop()
go func() {
waitForEvent := wait.NewWaitForEvent("revision", cl.WatchRevision, func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(name, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitForEvent := wait.NewWaitForEvent("revision", func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(watcher, name, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitC <- err
}()
err = cl.deleteRevision(name)
Expand Down
39 changes: 12 additions & 27 deletions pkg/wait/wait_for_ready.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,14 @@ import (

// Callbacks and configuration used while waiting
type waitForReadyConfig struct {
watchMaker WatchMaker
conditionsExtractor ConditionsExtractor
kind string
}

// Callbacks and configuration used while waiting for event
type waitForEvent struct {
watchMaker WatchMaker
eventDone EventDone
kind string
eventDone EventDone
kind string
}

// EventDone is a marker to stop actual waiting on given event state
Expand All @@ -49,7 +47,7 @@ type Wait interface {
// Wait on resource the resource with this name
// and write event messages for unknown event to the status writer.
// Returns an error (if any) and the overall time it took to wait
Wait(name string, options Options, msgCallback MessageCallback) (error, time.Duration)
Wait(watcher watch.Interface, name string, options Options, msgCallback MessageCallback) (error, time.Duration)
}

type Options struct {
Expand All @@ -71,21 +69,19 @@ type ConditionsExtractor func(obj runtime.Object) (apis.Conditions, error)
type MessageCallback func(durationSinceState time.Duration, message string)

// NewWaitForReady waits until the condition is set to Ready == True
func NewWaitForReady(kind string, watchMaker WatchMaker, extractor ConditionsExtractor) Wait {
func NewWaitForReady(kind string, extractor ConditionsExtractor) Wait {
return &waitForReadyConfig{
kind: kind,
watchMaker: watchMaker,
conditionsExtractor: extractor,
}
}

// NewWaitForEvent creates a Wait object which waits until a specific event (i.e. when
// the EventDone function returns true)
func NewWaitForEvent(kind string, watchMaker WatchMaker, eventDone EventDone) Wait {
func NewWaitForEvent(kind string, eventDone EventDone) Wait {
return &waitForEvent{
kind: kind,
watchMaker: watchMaker,
eventDone: eventDone,
kind: kind,
eventDone: eventDone,
}
}

Expand All @@ -112,13 +108,13 @@ func NoopMessageCallback() MessageCallback {
// (e.g. "service"), `timeout` is a timeout after which the watch should be cancelled if no
// target state has been entered yet and `out` is used for printing out status messages
// msgCallback gets called for every event with an 'Ready' condition == UNKNOWN with the event's message.
func (w *waitForReadyConfig) Wait(name string, options Options, msgCallback MessageCallback) (error, time.Duration) {
func (w *waitForReadyConfig) Wait(watcher watch.Interface, name string, options Options, msgCallback MessageCallback) (error, time.Duration) {

timeout := options.timeoutWithDefault()
floatingTimeout := timeout
for {
start := time.Now()
retry, timeoutReached, err := w.waitForReadyCondition(start, name, floatingTimeout, options.errorWindowWithDefault(), msgCallback)
retry, timeoutReached, err := w.waitForReadyCondition(watcher, start, name, floatingTimeout, options.errorWindowWithDefault(), msgCallback)
if err != nil {
return err, time.Since(start)
}
Expand All @@ -141,13 +137,7 @@ func (w *waitForReadyConfig) Wait(name string, options Options, msgCallback Mess
// An errorWindow can be specified which takes into account of intermediate "false" ready conditions. So before returning
// an error, this methods waits for the errorWindow duration and if an "True" or "Unknown" event arrives in the meantime
// for the "Ready" condition, then the method continues to wait.
func (w *waitForReadyConfig) waitForReadyCondition(start time.Time, name string, timeout time.Duration, errorWindow time.Duration, msgCallback MessageCallback) (retry bool, timeoutReached bool, err error) {

watcher, err := w.watchMaker(name, timeout)
if err != nil {
return false, false, err
}
defer watcher.Stop()
func (w *waitForReadyConfig) waitForReadyCondition(watcher watch.Interface, start time.Time, name string, timeout time.Duration, errorWindow time.Duration, msgCallback MessageCallback) (retry bool, timeoutReached bool, err error) {

// channel used to transport the error that has been received
errChan := make(chan error)
Expand Down Expand Up @@ -239,13 +229,8 @@ func (w *waitForReadyConfig) waitForReadyCondition(start time.Time, name string,
}

// Wait until the expected EventDone is satisfied
func (w *waitForEvent) Wait(name string, options Options, msgCallback MessageCallback) (error, time.Duration) {
func (w *waitForEvent) Wait(watcher watch.Interface, name string, options Options, msgCallback MessageCallback) (error, time.Duration) {
timeout := options.timeoutWithDefault()
watcher, err := w.watchMaker(name, timeout)
if err != nil {
return err, 0
}
defer watcher.Stop()
start := time.Now()
// channel used to transport the error
errChan := make(chan error)
Expand All @@ -255,7 +240,7 @@ func (w *waitForEvent) Wait(name string, options Options, msgCallback MessageCal
select {
case <-timer.C:
return fmt.Errorf("timeout: %s '%s' not ready after %d seconds", w.kind, name, int(timeout/time.Second)), time.Since(start)
case err = <-errChan:
case err := <-errChan:
return err, time.Since(start)
case event := <-watcher.ResultChan():
if w.eventDone(&event) {
Expand Down
18 changes: 6 additions & 12 deletions pkg/wait/wait_for_ready_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,12 @@ func TestAddWaitForReady(t *testing.T) {

waitForReady := NewWaitForReady(
"blub",
func(name string, timeout time.Duration) (watch.Interface, error) {
return fakeWatchApi, nil
},
func(obj runtime.Object) (apis.Conditions, error) {
return apis.Conditions(obj.(*servingv1.Service).Status.Conditions), nil
})
fakeWatchApi.Start()
var msgs []string
err, _ := waitForReady.Wait("foobar", Options{Timeout: &tc.timeout}, func(_ time.Duration, msg string) {
err, _ := waitForReady.Wait(fakeWatchApi, "foobar", Options{Timeout: &tc.timeout}, func(_ time.Duration, msg string) {
msgs = append(msgs, msg)
})
close(fakeWatchApi.eventChan)
Expand All @@ -69,8 +66,8 @@ func TestAddWaitForReady(t *testing.T) {
// check messages
assert.Assert(t, cmp.DeepEqual(tc.messagesExpected, msgs), "%d: Messages expected to be equal", i)

if fakeWatchApi.StopCalled != 1 {
t.Errorf("%d: Exactly one 'stop' should be called, but got %d", i, fakeWatchApi.StopCalled)
if fakeWatchApi.StopCalled != 0 {
t.Errorf("%d: Exactly zero 'stop' should be called, but got %d", i, fakeWatchApi.StopCalled)
}

}
Expand All @@ -82,13 +79,10 @@ func TestAddWaitForDelete(t *testing.T) {

waitForEvent := NewWaitForEvent(
"blub",
func(name string, timeout time.Duration) (watch.Interface, error) {
return fakeWatchAPI, nil
},
func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
fakeWatchAPI.Start()

err, _ := waitForEvent.Wait("foobar", Options{Timeout: &tc.timeout}, NoopMessageCallback())
err, _ := waitForEvent.Wait(fakeWatchAPI, "foobar", Options{Timeout: &tc.timeout}, NoopMessageCallback())
close(fakeWatchAPI.eventChan)

if tc.errorText == "" && err != nil {
Expand All @@ -103,8 +97,8 @@ func TestAddWaitForDelete(t *testing.T) {
}
}

if fakeWatchAPI.StopCalled != 1 {
t.Errorf("%d: Exactly one 'stop' should be called, but got %d", i, fakeWatchAPI.StopCalled)
if fakeWatchAPI.StopCalled != 0 {
t.Errorf("%d: Exactly zero 'stop' should be called, but got %d", i, fakeWatchAPI.StopCalled)
}
}
}
Expand Down

0 comments on commit 3fcd9b2

Please sign in to comment.