Skip to content

Commit

Permalink
Fix EventService test
Browse files Browse the repository at this point in the history
I was able to reproduce the issue with a helping hand from cgroups.
Limiting the test to one CPU with a heavy quota (0.01% of my CPU) revealed
the issue.

Note before I describe what happened:
Containerd does not send messages to subscribers retroactively; each
subscriber only receives events published after the point it subscribed.

The original test published N events and only after that created the
subscribers. The connection between the test and containerd is much
slower than the execution of the test, so by the time containerd wants to
send out the events to all subscribers, they are already there to receive
them. That's why it works on my machine and that's why it can sometimes
pass on GitHub Actions.

However, on a slow machine, with only one vcpu, the test and containerd
are racing for their own cpu share. In this scenario, the events are
already published before the subscribers are ready to receive them.

Solution:
Create subscribers first and then publish events.

Disclaimer:
There is a chance that what I wrote above is not entirely correct, but
that's how I understand it. It does not really matter if we only look at
the logic of the test: originally it was publish->subscribe, which is not
correct; we need the subscribers in place before we publish events.

Fixes liquidmetal-dev#115
  • Loading branch information
yitsushi committed Oct 20, 2021
1 parent 2941cce commit b70bad3
Showing 1 changed file with 69 additions and 61 deletions.
130 changes: 69 additions & 61 deletions infrastructure/containerd/event_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@ package containerd_test

import (
"context"
"sync"
"testing"

. "github.com/onsi/gomega"
"github.com/stretchr/testify/assert"

"github.com/weaveworks/reignite/api/events"
"github.com/weaveworks/reignite/core/ports"
"github.com/weaveworks/reignite/infrastructure/containerd"
)

// numberOfSubscribers is the number of subscriber goroutines the
// integration test starts before it publishes events.
const numberOfSubscribers = 2

func TestEventService_Integration(t *testing.T) {
if !runContainerDTests() {
t.Skip("skipping containerd event service integration test")
Expand All @@ -26,84 +30,88 @@ func TestEventService_Integration(t *testing.T) {
Namespace: testContainerdNs,
}, client)

t.Log("creating subscribers")
testEvents := []*events.MicroVMSpecCreated{
{ID: "vm1", Namespace: "ns1"},
{ID: "vm2", Namespace: "ns1"},
}

ctx1, cancel1 := context.WithCancel(ctx)
evt1, err1 := es.Subscribe(ctx1)
ctx2, cancel2 := context.WithCancel(ctx)
evt2, err2 := es.Subscribe(ctx2)
var wg sync.WaitGroup

errChan := make(chan error)
t.Log("creating subscribers")

testEvents := []*events.MicroVMSpecCreated{
{
ID: "vm1",
Namespace: "ns1",
},
{
ID: "vm2",
Namespace: "ns1",
},
for i := 0; i < numberOfSubscribers; i++ {
wg.Add(1)
go newSubscriber(t, es, ctx, i, len(testEvents), wg.Done)
}

go func() {
defer close(errChan)
for _, event := range testEvents {
if err := es.Publish(ctx, "/reignite/test", event); err != nil {
errChan <- err
return
}
t.Log("publishing events")

for _, event := range testEvents {
if err := es.Publish(ctx, "/reignite/test", event); err != nil {
t.Error(err)
break
}
}

t.Log("finished publishing events")

wg.Wait()
}

t.Log("finished publishing events")
}()
func newSubscriber(t *testing.T, es ports.EventService, rootContext context.Context, id int, maxEvents int, done func()) {
ctx, cancel := context.WithCancel(rootContext)
evtChan, errChan := es.Subscribe(ctx)

t.Log("subscribers waiting for events")
if err := <-errChan; err != nil {
t.Fatal(err)
subscriber := testSubscriber{
eventCh: evtChan,
eventErrCh: errChan,
cancel: cancel,
}

for _, subscriber := range []struct {
eventCh <-chan *ports.EventEnvelope
eventErrCh <-chan error
cancel func()
}{
{
eventCh: evt1,
eventErrCh: err1,
cancel: cancel1,
},
{
eventCh: evt2,
eventErrCh: err2,
cancel: cancel2,
},
} {
recvd := []interface{}{}
subscibercheck:
for {
select {
case env := <-subscriber.eventCh:
if env != nil {
recvd = append(recvd, env.Event)
} else {
break subscibercheck
}
case err := <-subscriber.eventErrCh:
if err != nil {
t.Fatal(err)
}
break subscibercheck
}
t.Logf("start subscriber (%d) is ready to receive events", id+1)
recvd, err := watch(&subscriber, maxEvents)
t.Logf("subscriber (%d) is done", id+1)

assert.NoError(t, err)
assert.Len(t, recvd, 2)

done()
}

func watch(subscriber *testSubscriber, maxEvents int) ([]interface{}, error) {

recvd := []interface{}{}

if len(recvd) == len(testEvents) {
subscriber.cancel()
var err error

for {
select {
case env := <-subscriber.eventCh:
if env != nil {
recvd = append(recvd, env.Event)
} else {
break
}
case err = <-subscriber.eventErrCh:
break
}

if len(recvd) == maxEvents {
subscriber.cancel()
break
}
}

return recvd, err
}

// testEvent is a simple name/value pair.
// NOTE(review): nothing in the visible part of this file references it;
// confirm whether it is still needed.
type testEvent struct {
Name string
Value string
}

// testSubscriber bundles the two channels returned by EventService.Subscribe
// with the cancel func of the context the subscription was created with.
type testSubscriber struct {
// eventCh delivers the event envelopes for this subscription.
eventCh <-chan *ports.EventEnvelope
// eventErrCh delivers errors reported for this subscription.
eventErrCh <-chan error
// cancel cancels the subscription's context.
cancel func()
}

0 comments on commit b70bad3

Please sign in to comment.