diff --git a/infrastructure/containerd/event_service_test.go b/infrastructure/containerd/event_service_test.go index 0ae12932..9daa4ab5 100644 --- a/infrastructure/containerd/event_service_test.go +++ b/infrastructure/containerd/event_service_test.go @@ -2,6 +2,7 @@ package containerd_test import ( "context" + "sync" "testing" . "github.com/onsi/gomega" @@ -12,6 +13,8 @@ import ( "github.com/weaveworks/reignite/infrastructure/containerd" ) +const numberOfSubscribers = 2 + func TestEventService_Integration(t *testing.T) { if !runContainerDTests() { t.Skip("skipping containerd event service integration test") @@ -27,61 +30,56 @@ func TestEventService_Integration(t *testing.T) { Namespace: testContainerdNs, }, client) - t.Log("creating subscribers") - - ctx1, cancel1 := context.WithCancel(ctx) - evt1, err1 := es.Subscribe(ctx1) - ctx2, cancel2 := context.WithCancel(ctx) - evt2, err2 := es.Subscribe(ctx2) - - errChan := make(chan error) - testEvents := []*events.MicroVMSpecCreated{ {ID: "vm1", Namespace: "ns1"}, {ID: "vm2", Namespace: "ns1"}, } - subscribers := []testSubscriber{ - {eventCh: evt1, eventErrCh: err1, cancel: cancelWrapper(t, 1, cancel1)}, - {eventCh: evt2, eventErrCh: err2, cancel: cancelWrapper(t, 2, cancel2)}, - } + var wg sync.WaitGroup - go func() { - defer close(errChan) + t.Log("creating subscribers") - for _, event := range testEvents { - if err := es.Publish(ctx, "/reignite/test", event); err != nil { - errChan <- err - return - } - } + for i := 0; i < numberOfSubscribers; i++ { + wg.Add(1) + go newSubscriber(t, es, ctx, i, len(testEvents), wg.Done) + } - t.Log("finished publishing events") - }() + t.Log("publishing events") - t.Log("subscribers waiting for events") - if err := <-errChan; err != nil { - t.Fatal(err) + for _, event := range testEvents { + if err := es.Publish(ctx, "/reignite/test", event); err != nil { + t.Error(err) + break + } } - for idx, subscriber := range subscribers { - t.Logf("start subscriber (%d) is ready to receive events", idx+1) - recvd, err := watch(t, &subscriber, len(testEvents)) - t.Logf("subscriber (%d) is done", idx+1) + t.Log("finished publishing events") - assert.NoError(t, err) - assert.Len(t, recvd, 2) - } + wg.Wait() } -func cancelWrapper(t *testing.T, id int, cancel context.CancelFunc) func() { - return func() { - t.Logf("context (%d) cancelled", id) - cancel() +func newSubscriber(t *testing.T, es ports.EventService, rootContext context.Context, id int, maxEvents int, done func()) { + ctx, cancel := context.WithCancel(rootContext) + evtChan, errChan := es.Subscribe(ctx) + + subscriber := testSubscriber{ + eventCh: evtChan, + eventErrCh: errChan, + cancel: cancel, } + + t.Logf("start subscriber (%d) is ready to receive events", id+1) + recvd, err := watch(&subscriber, maxEvents) + t.Logf("subscriber (%d) is done", id+1) + + assert.NoError(t, err) + assert.Len(t, recvd, 2) + + done() } -func watch(t *testing.T, subscriber *testSubscriber, maxEvents int) ([]interface{}, error) { +func watch(subscriber *testSubscriber, maxEvents int) ([]interface{}, error) { + recvd := []interface{}{} var err error @@ -89,16 +87,12 @@ func watch(t *testing.T, subscriber *testSubscriber, maxEvents int) ([]interface for { select { case env := <-subscriber.eventCh: - t.Logf("subscriber.eventCh received: %+v", env) - if env != nil { recvd = append(recvd, env.Event) } else { break } case err = <-subscriber.eventErrCh: - t.Logf("subscriber.eventErrCh received: %v", err) - break }