diff --git a/infrastructure/containerd/event_service_test.go b/infrastructure/containerd/event_service_test.go index d2da3b1f4..be8f43d2f 100644 --- a/infrastructure/containerd/event_service_test.go +++ b/infrastructure/containerd/event_service_test.go @@ -2,15 +2,20 @@ package containerd_test import ( "context" + "sync" "testing" + "time" . "github.com/onsi/gomega" + "github.com/stretchr/testify/assert" "github.com/weaveworks/reignite/api/events" "github.com/weaveworks/reignite/core/ports" "github.com/weaveworks/reignite/infrastructure/containerd" ) +const numberOfSubscribers = 2 + func TestEventService_Integration(t *testing.T) { if !runContainerDTests() { t.Skip("skipping containerd event service integration test") @@ -26,84 +31,116 @@ func TestEventService_Integration(t *testing.T) { Namespace: testContainerdNs, }, client) + testEvents := []*events.MicroVMSpecCreated{ + {ID: "vm1", Namespace: "ns1"}, + {ID: "vm2", Namespace: "ns1"}, + {ID: "vm1", Namespace: "ns2"}, + } + + var ( + wgReady sync.WaitGroup + wgDone sync.WaitGroup + ) + t.Log("creating subscribers") - ctx1, cancel1 := context.WithCancel(ctx) - evt1, err1 := es.Subscribe(ctx1) - ctx2, cancel2 := context.WithCancel(ctx) - evt2, err2 := es.Subscribe(ctx2) + for i := 0; i < numberOfSubscribers; i++ { + wgReady.Add(1) + wgDone.Add(1) - errChan := make(chan error) + data := subData{ + ID: i, + ES: es, + MaxEvents: len(testEvents), + Ready: wgReady.Done, + Done: wgDone.Done, + } - testEvents := []*events.MicroVMSpecCreated{ - { - ID: "vm1", - Namespace: "ns1", - }, - { - ID: "vm2", - Namespace: "ns1", - }, + go newSubscriber(t, ctx, data) } - go func() { - defer close(errChan) - for _, event := range testEvents { - if err := es.Publish(ctx, "/reignite/test", event); err != nil { - errChan <- err - return - } + wgReady.Wait() + + time.Sleep(time.Second) + + t.Log("publishing events") + + for _, event := range testEvents { + t.Logf("publishing event: %v", event) + if err := es.Publish(ctx, "/reignite/test", event); err != nil { + t.Fatal(err) + break } + time.Sleep(time.Microsecond) + } - t.Log("finished publishing events") - }() + t.Log("finished publishing events") - t.Log("subscribers waiting for events") - if err := <-errChan; err != nil { - t.Fatal(err) + wgDone.Wait() +} + +type subData struct { + ID int + ES ports.EventService + MaxEvents int + Ready func() + Done func() +} + +func newSubscriber(t *testing.T, rootContext context.Context, data subData) { + ctx, cancel := context.WithCancel(rootContext) + evtChan, errChan := data.ES.Subscribe(ctx) + + subscriber := testSubscriber{ + eventCh: evtChan, + eventErrCh: errChan, + cancel: cancel, } - for _, subscriber := range []struct { - eventCh <-chan *ports.EventEnvelope - eventErrCh <-chan error - cancel func() - }{ - { - eventCh: evt1, - eventErrCh: err1, - cancel: cancel1, - }, - { - eventCh: evt2, - eventErrCh: err2, - cancel: cancel2, - }, - } { - recvd := []interface{}{} - subscibercheck: - for { - select { - case env := <-subscriber.eventCh: - if env != nil { - recvd = append(recvd, env.Event) - } else { - break subscibercheck - } - case err := <-subscriber.eventErrCh: - if err != nil { - t.Fatal(err) - } - break subscibercheck - } + t.Logf("subscriber (%d) is ready to receive events", data.ID) + data.Ready() + recvd, err := watch(t, &subscriber, data.MaxEvents) + t.Logf("subscriber (%d) is done", data.ID) + + assert.NoError(t, err) + assert.Len(t, recvd, data.MaxEvents) + + data.Done() +} + +func watch(t *testing.T, subscriber *testSubscriber, maxEvents int) ([]interface{}, error) { - if len(recvd) == len(testEvents) { - subscriber.cancel() + recvd := []interface{}{} + + var err error + + for { + select { + case env := <-subscriber.eventCh: + if env == nil { + break } + recvd = append(recvd, env.Event) + case err = <-subscriber.eventErrCh: + break + } + + if len(recvd) == maxEvents { + subscriber.cancel() + break } } + + return recvd, err } type testEvent struct { Name string Value string } + +type testSubscriber struct { + eventCh <-chan *ports.EventEnvelope + eventErrCh <-chan error + cancel func() +}