Skip to content

Commit

Permalink
this one, I have faith in you
Browse files Browse the repository at this point in the history
  • Loading branch information
yitsushi committed Oct 20, 2021
1 parent c21b635 commit 6f4201b
Showing 1 changed file with 36 additions and 42 deletions.
78 changes: 36 additions & 42 deletions infrastructure/containerd/event_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package containerd_test

import (
"context"
"sync"
"testing"

. "github.com/onsi/gomega"
Expand All @@ -12,6 +13,8 @@ import (
"github.com/weaveworks/reignite/infrastructure/containerd"
)

const numberOfSubscribers = 2

func TestEventService_Integration(t *testing.T) {
if !runContainerDTests() {
t.Skip("skipping containerd event service integration test")
Expand All @@ -27,78 +30,69 @@ func TestEventService_Integration(t *testing.T) {
Namespace: testContainerdNs,
}, client)

t.Log("creating subscribers")

ctx1, cancel1 := context.WithCancel(ctx)
evt1, err1 := es.Subscribe(ctx1)
ctx2, cancel2 := context.WithCancel(ctx)
evt2, err2 := es.Subscribe(ctx2)

errChan := make(chan error)

testEvents := []*events.MicroVMSpecCreated{
{ID: "vm1", Namespace: "ns1"},
{ID: "vm2", Namespace: "ns1"},
}

subscribers := []testSubscriber{
{eventCh: evt1, eventErrCh: err1, cancel: cancelWrapper(t, 1, cancel1)},
{eventCh: evt2, eventErrCh: err2, cancel: cancelWrapper(t, 2, cancel2)},
}
var wg sync.WaitGroup

go func() {
defer close(errChan)
t.Log("creating subscribers")

for _, event := range testEvents {
if err := es.Publish(ctx, "/reignite/test", event); err != nil {
errChan <- err
return
}
}
for i := 0; i < numberOfSubscribers; i++ {
wg.Add(1)
go newSubscriber(t, es, ctx, i, len(testEvents), wg.Done)
}

t.Log("finished publishing events")
}()
t.Log("publishing events")

t.Log("subscribers waiting for events")
if err := <-errChan; err != nil {
t.Fatal(err)
for _, event := range testEvents {
if err := es.Publish(ctx, "/reignite/test", event); err != nil {
t.Error(err)
break
}
}

for idx, subscriber := range subscribers {
t.Logf("start subscriber (%d) is ready to receive events", idx+1)
recvd, err := watch(t, &subscriber, len(testEvents))
t.Logf("subscriber (%d) is done", idx+1)
t.Log("finished publishing events")

assert.NoError(t, err)
assert.Len(t, recvd, 2)
}
wg.Wait()
}

func cancelWrapper(t *testing.T, id int, cancel context.CancelFunc) func() {
return func() {
t.Logf("context (%d) cancelled", id)
cancel()
func newSubscriber(t *testing.T, es ports.EventService, rootContext context.Context, id int, maxEvents int, done func()) {
ctx, cancel := context.WithCancel(rootContext)
evtChan, errChan := es.Subscribe(ctx)

subscriber := testSubscriber{
eventCh: evtChan,
eventErrCh: errChan,
cancel: cancel,
}

t.Logf("start subscriber (%d) is ready to receive events", id+1)
recvd, err := watch(&subscriber, maxEvents)
t.Logf("subscriber (%d) is done", id+1)

assert.NoError(t, err)
assert.Len(t, recvd, 2)

done()
}

func watch(t *testing.T, subscriber *testSubscriber, maxEvents int) ([]interface{}, error) {
func watch(subscriber *testSubscriber, maxEvents int) ([]interface{}, error) {

recvd := []interface{}{}

var err error

for {
select {
case env := <-subscriber.eventCh:
t.Logf("subscriber.eventCh received: %+v", env)

if env != nil {
recvd = append(recvd, env.Event)
} else {
break
}
case err = <-subscriber.eventErrCh:
t.Logf("subscriber.eventErrCh received: %v", err)

break
}

Expand Down

0 comments on commit 6f4201b

Please sign in to comment.