Skip to content

Commit

Permalink
Merge e645e2e into 7bfe7cd
Browse files Browse the repository at this point in the history
  • Loading branch information
yitsushi authored Oct 21, 2021
2 parents 7bfe7cd + e645e2e commit e6ce829
Showing 1 changed file with 100 additions and 60 deletions.
160 changes: 100 additions & 60 deletions infrastructure/containerd/event_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package containerd_test

import (
"context"
"sync"
"testing"
"time"

. "github.com/onsi/gomega"

Expand All @@ -11,6 +13,11 @@ import (
"github.com/weaveworks/reignite/infrastructure/containerd"
)

const (
numberOfSubscribers = 2
sleepTime = 10
)

func TestEventService_Integration(t *testing.T) {
if !runContainerDTests() {
t.Skip("skipping containerd event service integration test")
Expand All @@ -26,84 +33,117 @@ func TestEventService_Integration(t *testing.T) {
Namespace: testContainerdNs,
}, client)

testEvents := []*events.MicroVMSpecCreated{
{ID: "vm1", Namespace: "ns1"},
{ID: "vm2", Namespace: "ns1"},
{ID: "vm1", Namespace: "ns2"},
}

var (
wgReady sync.WaitGroup
wgDone sync.WaitGroup
)

t.Log("creating subscribers")

ctx1, cancel1 := context.WithCancel(ctx)
evt1, err1 := es.Subscribe(ctx1)
ctx2, cancel2 := context.WithCancel(ctx)
evt2, err2 := es.Subscribe(ctx2)
for i := 0; i < numberOfSubscribers; i++ {
wgReady.Add(1)
wgDone.Add(1)

errChan := make(chan error)
data := subData{
ID: i,
ES: es,
MaxEvents: len(testEvents),
Ready: wgReady.Done,
Done: wgDone.Done,
}

testEvents := []*events.MicroVMSpecCreated{
{
ID: "vm1",
Namespace: "ns1",
},
{
ID: "vm2",
Namespace: "ns1",
},
go newSubscriber(t, ctx, data)
}

go func() {
defer close(errChan)
for _, event := range testEvents {
if err := es.Publish(ctx, "/reignite/test", event); err != nil {
errChan <- err
return
}
wgReady.Wait()

// Without this, it's still possible we publish the first ever before the
// connection is read.
time.Sleep(time.Microsecond * sleepTime)

t.Log("publishing events")

for _, event := range testEvents {
t.Logf("publishing event: %v", event)
if err := es.Publish(ctx, "/reignite/test", event); err != nil {
t.Fatal(err)
break
}
}

t.Log("finished publishing events")

wgDone.Wait()
}

type subData struct {
ID int
ES ports.EventService
MaxEvents int
Ready func()
Done func()
}

t.Log("finished publishing events")
}()
func newSubscriber(t *testing.T, rootContext context.Context, data subData) {
ctx, cancel := context.WithCancel(rootContext)
evtChan, errChan := data.ES.Subscribe(ctx)

t.Log("subscribers waiting for events")
if err := <-errChan; err != nil {
t.Fatal(err)
subscriber := testSubscriber{
eventCh: evtChan,
eventErrCh: errChan,
cancel: cancel,
}

for _, subscriber := range []struct {
eventCh <-chan *ports.EventEnvelope
eventErrCh <-chan error
cancel func()
}{
{
eventCh: evt1,
eventErrCh: err1,
cancel: cancel1,
},
{
eventCh: evt2,
eventErrCh: err2,
cancel: cancel2,
},
} {
recvd := []interface{}{}
subscibercheck:
for {
select {
case env := <-subscriber.eventCh:
if env != nil {
recvd = append(recvd, env.Event)
} else {
break subscibercheck
}
case err := <-subscriber.eventErrCh:
if err != nil {
t.Fatal(err)
}
break subscibercheck
}
t.Logf("subscriber (%d) is ready to receive events", data.ID)
data.Ready()
recvd, err := watch(t, &subscriber, data.MaxEvents)
t.Logf("subscriber (%d) is done", data.ID)

Expect(err).To(BeNil())
Expect(recvd).To(HaveLen(data.MaxEvents))

data.Done()
}

func watch(t *testing.T, subscriber *testSubscriber, maxEvents int) ([]interface{}, error) {

if len(recvd) == len(testEvents) {
subscriber.cancel()
recvd := []interface{}{}

var err error

for {
select {
case env := <-subscriber.eventCh:
if env == nil {
break
}
recvd = append(recvd, env.Event)
case err = <-subscriber.eventErrCh:
break
}

if len(recvd) == maxEvents {
subscriber.cancel()
break
}
}

return recvd, err
}

type testEvent struct {
Name string
Value string
}

type testSubscriber struct {
eventCh <-chan *ports.EventEnvelope
eventErrCh <-chan error
cancel func()
}

0 comments on commit e6ce829

Please sign in to comment.