Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try to fix EventService test failures #154

Merged
merged 2 commits into from
Oct 21, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 100 additions & 60 deletions infrastructure/containerd/event_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package containerd_test

import (
"context"
"sync"
"testing"
"time"

. "github.com/onsi/gomega"

Expand All @@ -11,6 +13,11 @@ import (
"github.com/weaveworks/reignite/infrastructure/containerd"
)

const (
numberOfSubscribers = 2
sleepTime = 10
)

func TestEventService_Integration(t *testing.T) {
if !runContainerDTests() {
t.Skip("skipping containerd event service integration test")
Expand All @@ -26,84 +33,117 @@ func TestEventService_Integration(t *testing.T) {
Namespace: testContainerdNs,
}, client)

testEvents := []*events.MicroVMSpecCreated{
{ID: "vm1", Namespace: "ns1"},
{ID: "vm2", Namespace: "ns1"},
{ID: "vm1", Namespace: "ns2"},
}

var (
wgReady sync.WaitGroup
wgDone sync.WaitGroup
)

t.Log("creating subscribers")

ctx1, cancel1 := context.WithCancel(ctx)
evt1, err1 := es.Subscribe(ctx1)
ctx2, cancel2 := context.WithCancel(ctx)
evt2, err2 := es.Subscribe(ctx2)
for i := 0; i < numberOfSubscribers; i++ {
wgReady.Add(1)
wgDone.Add(1)

errChan := make(chan error)
data := subData{
ID: i,
ES: es,
MaxEvents: len(testEvents),
Ready: wgReady.Done,
Done: wgDone.Done,
}

testEvents := []*events.MicroVMSpecCreated{
{
ID: "vm1",
Namespace: "ns1",
},
{
ID: "vm2",
Namespace: "ns1",
},
go newSubscriber(t, ctx, data)
}

go func() {
defer close(errChan)
for _, event := range testEvents {
if err := es.Publish(ctx, "/reignite/test", event); err != nil {
errChan <- err
return
}
wgReady.Wait()

// Without this, it's still possible we publish the first ever before the
// connection is read.
time.Sleep(time.Microsecond * sleepTime)

t.Log("publishing events")

for _, event := range testEvents {
t.Logf("publishing event: %v", event)
if err := es.Publish(ctx, "/reignite/test", event); err != nil {
t.Fatal(err)
break
}
}

t.Log("finished publishing events")

wgDone.Wait()
}

type subData struct {
ID int
ES ports.EventService
MaxEvents int
Ready func()
Done func()
}

t.Log("finished publishing events")
}()
func newSubscriber(t *testing.T, rootContext context.Context, data subData) {
ctx, cancel := context.WithCancel(rootContext)
evtChan, errChan := data.ES.Subscribe(ctx)

t.Log("subscribers waiting for events")
if err := <-errChan; err != nil {
t.Fatal(err)
subscriber := testSubscriber{
eventCh: evtChan,
eventErrCh: errChan,
cancel: cancel,
}

for _, subscriber := range []struct {
eventCh <-chan *ports.EventEnvelope
eventErrCh <-chan error
cancel func()
}{
{
eventCh: evt1,
eventErrCh: err1,
cancel: cancel1,
},
{
eventCh: evt2,
eventErrCh: err2,
cancel: cancel2,
},
} {
recvd := []interface{}{}
subscibercheck:
for {
select {
case env := <-subscriber.eventCh:
if env != nil {
recvd = append(recvd, env.Event)
} else {
break subscibercheck
}
case err := <-subscriber.eventErrCh:
if err != nil {
t.Fatal(err)
}
break subscibercheck
}
t.Logf("subscriber (%d) is ready to receive events", data.ID)
data.Ready()
recvd, err := watch(t, &subscriber, data.MaxEvents)
t.Logf("subscriber (%d) is done", data.ID)

Expect(err).To(BeNil())
Expect(recvd).To(HaveLen(data.MaxEvents))

data.Done()
}

func watch(t *testing.T, subscriber *testSubscriber, maxEvents int) ([]interface{}, error) {

if len(recvd) == len(testEvents) {
subscriber.cancel()
recvd := []interface{}{}

var err error

for {
select {
case env := <-subscriber.eventCh:
if env == nil {
break
}
recvd = append(recvd, env.Event)
case err = <-subscriber.eventErrCh:
break
}

if len(recvd) == maxEvents {
subscriber.cancel()
break
}
}

return recvd, err
}

type testEvent struct {
Name string
Value string
}

type testSubscriber struct {
eventCh <-chan *ports.EventEnvelope
eventErrCh <-chan error
cancel func()
}