Skip to content

Commit

Permalink
Try to fix EventService test failures
Browse files Browse the repository at this point in the history
I was not able to reproduce the issue on my machine (why would I), first
I did a bit of cleanup to reduce the nested definitions. It's a bit
easier to follow now.

Added extra logging about contexts, so we can know which one throws the
`code = Canceled desc = context canceled` error.

Created a PR on my fork and restarted the test job 4 times, none of them
failed.

I assume the real fix is break after `subscriber.cancel()`. I'm not 100%
convinced, but potentially when we close the context and next time
checking for `eventCh` or `eventErrCh`, theoretically both of them are
closed when the next loop starts, but we are talking very short CPU
cycles, so anything can happen.

Related to liquidmetal-dev#115

Intentionally not marking wth `Fixes #`.
  • Loading branch information
yitsushi committed Oct 20, 2021
1 parent 2941cce commit 7583cf4
Showing 1 changed file with 56 additions and 42 deletions.
98 changes: 56 additions & 42 deletions infrastructure/containerd/event_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

. "github.com/onsi/gomega"
"github.com/stretchr/testify/assert"

"github.com/weaveworks/reignite/api/events"
"github.com/weaveworks/reignite/core/ports"
Expand Down Expand Up @@ -36,18 +37,18 @@ func TestEventService_Integration(t *testing.T) {
errChan := make(chan error)

testEvents := []*events.MicroVMSpecCreated{
{
ID: "vm1",
Namespace: "ns1",
},
{
ID: "vm2",
Namespace: "ns1",
},
{ID: "vm1", Namespace: "ns1"},
{ID: "vm2", Namespace: "ns1"},
}

subscribers := []testSubscriber{
{eventCh: evt1, eventErrCh: err1, cancel: cancelWrapper(t, 1, cancel1)},
{eventCh: evt2, eventErrCh: err2, cancel: cancelWrapper(t, 2, cancel2)},
}

go func() {
defer close(errChan)

for _, event := range testEvents {
if err := es.Publish(ctx, "/reignite/test", event); err != nil {
errChan <- err
Expand All @@ -63,47 +64,60 @@ func TestEventService_Integration(t *testing.T) {
t.Fatal(err)
}

for _, subscriber := range []struct {
eventCh <-chan *ports.EventEnvelope
eventErrCh <-chan error
cancel func()
}{
{
eventCh: evt1,
eventErrCh: err1,
cancel: cancel1,
},
{
eventCh: evt2,
eventErrCh: err2,
cancel: cancel2,
},
} {
recvd := []interface{}{}
subscibercheck:
for {
select {
case env := <-subscriber.eventCh:
if env != nil {
recvd = append(recvd, env.Event)
} else {
break subscibercheck
}
case err := <-subscriber.eventErrCh:
if err != nil {
t.Fatal(err)
}
break subscibercheck
}
for idx, subscriber := range subscribers {
t.Logf("start subscriber (%d) is ready to receive events", idx+1)
recvd, err := watch(&subscriber, len(testEvents))
t.Logf("subscriber (%d) is done", idx+1)

assert.NoError(t, err)
assert.Len(t, recvd, 2)

// if len(recvd) == len(testEvents) {
// subscriber.cancel()
// }
}
}

func cancelWrapper(t *testing.T, id int, cancel context.CancelFunc) func() {
return func() {
t.Logf("context (%d) cancelled", id)
cancel()
}
}

func watch(subscriber *testSubscriber, maxEvents int) ([]interface{}, error) {
recvd := []interface{}{}

if len(recvd) == len(testEvents) {
subscriber.cancel()
var err error

for {
select {
case env := <-subscriber.eventCh:
if env != nil {
recvd = append(recvd, env.Event)
} else {
break
}
case err = <-subscriber.eventErrCh:
break
}

if len(recvd) == maxEvents {
subscriber.cancel()
break
}
}

return recvd, err
}

type testEvent struct {
Name string
Value string
}

type testSubscriber struct {
eventCh <-chan *ports.EventEnvelope
eventErrCh <-chan error
cancel func()
}

0 comments on commit 7583cf4

Please sign in to comment.