Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try to fix EventService test failures #154

Merged
merged 2 commits into from
Oct 21, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 101 additions & 60 deletions infrastructure/containerd/event_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,23 @@ package containerd_test

import (
"context"
"sync"
"testing"
"time"

. "github.com/onsi/gomega"
"github.com/stretchr/testify/assert"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need 2 assertion packages. If the consensus is testtify/assert then i'm good with that. I like gomega but not so much that i wouldn't be happy using something else.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used it because it was already in there (no addition in go.mod)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My only issue with the way gomega is used, I don't like the . imports because they can mess with the environment, they are packages and should be used as packages and not a pack of function injected into our test module.

Copy link
Contributor Author

@yitsushi yitsushi Oct 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the test to use only Gomega.


"github.com/weaveworks/reignite/api/events"
"github.com/weaveworks/reignite/core/ports"
"github.com/weaveworks/reignite/infrastructure/containerd"
)

const (
numberOfSubscribers = 2
sleepTime = 10
)

func TestEventService_Integration(t *testing.T) {
if !runContainerDTests() {
t.Skip("skipping containerd event service integration test")
Expand All @@ -26,84 +34,117 @@ func TestEventService_Integration(t *testing.T) {
Namespace: testContainerdNs,
}, client)

testEvents := []*events.MicroVMSpecCreated{
{ID: "vm1", Namespace: "ns1"},
{ID: "vm2", Namespace: "ns1"},
{ID: "vm1", Namespace: "ns2"},
}

var (
wgReady sync.WaitGroup
wgDone sync.WaitGroup
)

t.Log("creating subscribers")

ctx1, cancel1 := context.WithCancel(ctx)
evt1, err1 := es.Subscribe(ctx1)
ctx2, cancel2 := context.WithCancel(ctx)
evt2, err2 := es.Subscribe(ctx2)
for i := 0; i < numberOfSubscribers; i++ {
wgReady.Add(1)
wgDone.Add(1)

errChan := make(chan error)
data := subData{
ID: i,
ES: es,
MaxEvents: len(testEvents),
Ready: wgReady.Done,
Done: wgDone.Done,
}

testEvents := []*events.MicroVMSpecCreated{
{
ID: "vm1",
Namespace: "ns1",
},
{
ID: "vm2",
Namespace: "ns1",
},
go newSubscriber(t, ctx, data)
}

go func() {
defer close(errChan)
for _, event := range testEvents {
if err := es.Publish(ctx, "/reignite/test", event); err != nil {
errChan <- err
return
}
wgReady.Wait()

// Without this, it's still possible we publish the first ever before the
// connection is read.
time.Sleep(time.Microsecond * sleepTime)

t.Log("publishing events")

for _, event := range testEvents {
t.Logf("publishing event: %v", event)
if err := es.Publish(ctx, "/reignite/test", event); err != nil {
t.Fatal(err)
break
}
}

t.Log("finished publishing events")

wgDone.Wait()
}

type subData struct {
ID int
ES ports.EventService
MaxEvents int
Ready func()
Done func()
}

t.Log("finished publishing events")
}()
func newSubscriber(t *testing.T, rootContext context.Context, data subData) {
ctx, cancel := context.WithCancel(rootContext)
evtChan, errChan := data.ES.Subscribe(ctx)

t.Log("subscribers waiting for events")
if err := <-errChan; err != nil {
t.Fatal(err)
subscriber := testSubscriber{
eventCh: evtChan,
eventErrCh: errChan,
cancel: cancel,
}

for _, subscriber := range []struct {
eventCh <-chan *ports.EventEnvelope
eventErrCh <-chan error
cancel func()
}{
{
eventCh: evt1,
eventErrCh: err1,
cancel: cancel1,
},
{
eventCh: evt2,
eventErrCh: err2,
cancel: cancel2,
},
} {
recvd := []interface{}{}
subscibercheck:
for {
select {
case env := <-subscriber.eventCh:
if env != nil {
recvd = append(recvd, env.Event)
} else {
break subscibercheck
}
case err := <-subscriber.eventErrCh:
if err != nil {
t.Fatal(err)
}
break subscibercheck
}
t.Logf("subscriber (%d) is ready to receive events", data.ID)
data.Ready()
recvd, err := watch(t, &subscriber, data.MaxEvents)
t.Logf("subscriber (%d) is done", data.ID)

assert.NoError(t, err)
assert.Len(t, recvd, data.MaxEvents)

data.Done()
}

func watch(t *testing.T, subscriber *testSubscriber, maxEvents int) ([]interface{}, error) {

if len(recvd) == len(testEvents) {
subscriber.cancel()
recvd := []interface{}{}

var err error

for {
select {
case env := <-subscriber.eventCh:
if env == nil {
break
}
recvd = append(recvd, env.Event)
case err = <-subscriber.eventErrCh:
break
}

if len(recvd) == maxEvents {
subscriber.cancel()
break
}
}

return recvd, err
}

type testEvent struct {
Name string
Value string
}

type testSubscriber struct {
eventCh <-chan *ports.EventEnvelope
eventErrCh <-chan error
cancel func()
}