Skip to content

Commit

Permalink
Fix EventService test
Browse files Browse the repository at this point in the history
I was able to reproduce the issue with a helping hand from cgroups.
Limiting the test to one CPU with a heavy quota (0.01% of my CPU)
revealed the issue.

Note before I describe what happened:
Containerd does not send messages to subscribers retrospectively; all
subscribers receive only events published after the point they subscribed.

The original test published N events and only after that created the
subscribers. The connection between the test and containerd is much
slower than the execution of the test, so by the time containerd wants
to send out the events to all subscribers, they are already there to
receive events. That's why it works on my machine and why it can pass on
GitHub Actions sometimes.

However, on a slow machine, with only one vcpu, the test and containerd
are racing for their own cpu share. In this scenario, the events are
already published before the subscribers are ready to receive them.

Solution:
Create subscribers first and then publish events.

Disclaimer:
There is a chance that all I wrote above is not entirely correct, but
that's how I understand it. It does not really matter if we only check
the logic in the test. Originally it was publish->subscribe, but that's
not correct; we need subscribers before we publish events.

Fixes liquidmetal-dev#115
  • Loading branch information
yitsushi committed Oct 20, 2021
1 parent 2941cce commit 2d86efb
Showing 1 changed file with 97 additions and 60 deletions.
157 changes: 97 additions & 60 deletions infrastructure/containerd/event_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,20 @@ package containerd_test

import (
"context"
"sync"
"testing"
"time"

. "github.com/onsi/gomega"
"github.com/stretchr/testify/assert"

"github.com/weaveworks/reignite/api/events"
"github.com/weaveworks/reignite/core/ports"
"github.com/weaveworks/reignite/infrastructure/containerd"
)

const numberOfSubscribers = 2

func TestEventService_Integration(t *testing.T) {
if !runContainerDTests() {
t.Skip("skipping containerd event service integration test")
Expand All @@ -26,84 +31,116 @@ func TestEventService_Integration(t *testing.T) {
Namespace: testContainerdNs,
}, client)

testEvents := []*events.MicroVMSpecCreated{
{ID: "vm1", Namespace: "ns1"},
{ID: "vm2", Namespace: "ns1"},
{ID: "vm1", Namespace: "ns2"},
}

var (
wgReady sync.WaitGroup
wgDone sync.WaitGroup
)

t.Log("creating subscribers")

ctx1, cancel1 := context.WithCancel(ctx)
evt1, err1 := es.Subscribe(ctx1)
ctx2, cancel2 := context.WithCancel(ctx)
evt2, err2 := es.Subscribe(ctx2)
for i := 0; i < numberOfSubscribers; i++ {
wgReady.Add(1)
wgDone.Add(1)

errChan := make(chan error)
data := subData{
ID: i,
ES: es,
MaxEvents: len(testEvents),
Ready: wgReady.Done,
Done: wgDone.Done,
}

testEvents := []*events.MicroVMSpecCreated{
{
ID: "vm1",
Namespace: "ns1",
},
{
ID: "vm2",
Namespace: "ns1",
},
go newSubscriber(t, ctx, data)
}

go func() {
defer close(errChan)
for _, event := range testEvents {
if err := es.Publish(ctx, "/reignite/test", event); err != nil {
errChan <- err
return
}
wgReady.Wait()

time.Sleep(time.Second)

t.Log("publishing events")

for _, event := range testEvents {
t.Logf("publishing event: %v", event)
if err := es.Publish(ctx, "/reignite/test", event); err != nil {
t.Fatal(err)
break
}
time.Sleep(time.Microsecond)
}

t.Log("finished publishing events")
}()
t.Log("finished publishing events")

t.Log("subscribers waiting for events")
if err := <-errChan; err != nil {
t.Fatal(err)
wgDone.Wait()
}

// subData carries everything a test subscriber goroutine needs:
// its identity, the service to subscribe to, how many events to
// expect, and callbacks to signal lifecycle transitions back to
// the coordinating test (typically WaitGroup.Done functions).
type subData struct {
	ID        int                // subscriber index, used only for log output
	ES        ports.EventService // event service to subscribe to
	MaxEvents int                // number of events to receive before stopping
	Ready     func()             // called once the subscription is established
	Done      func()             // called after all assertions have run
}

func newSubscriber(t *testing.T, rootContext context.Context, data subData) {
ctx, cancel := context.WithCancel(rootContext)
evtChan, errChan := data.ES.Subscribe(ctx)

subscriber := testSubscriber{
eventCh: evtChan,
eventErrCh: errChan,
cancel: cancel,
}

for _, subscriber := range []struct {
eventCh <-chan *ports.EventEnvelope
eventErrCh <-chan error
cancel func()
}{
{
eventCh: evt1,
eventErrCh: err1,
cancel: cancel1,
},
{
eventCh: evt2,
eventErrCh: err2,
cancel: cancel2,
},
} {
recvd := []interface{}{}
subscibercheck:
for {
select {
case env := <-subscriber.eventCh:
if env != nil {
recvd = append(recvd, env.Event)
} else {
break subscibercheck
}
case err := <-subscriber.eventErrCh:
if err != nil {
t.Fatal(err)
}
break subscibercheck
}
t.Logf("subscriber (%d) is ready to receive events", data.ID)
data.Ready()
recvd, err := watch(t, &subscriber, data.MaxEvents)
t.Logf("subscriber (%d) is done", data.ID)

assert.NoError(t, err)
assert.Len(t, recvd, data.MaxEvents)

data.Done()
}

func watch(t *testing.T, subscriber *testSubscriber, maxEvents int) ([]interface{}, error) {

if len(recvd) == len(testEvents) {
subscriber.cancel()
recvd := []interface{}{}

var err error

for {
select {
case env := <-subscriber.eventCh:
if env == nil {
break
}
recvd = append(recvd, env.Event)
case err = <-subscriber.eventErrCh:
break
}

if len(recvd) == maxEvents {
subscriber.cancel()
break
}
}

return recvd, err
}

// testEvent is a minimal event payload for tests.
// NOTE(review): it is not referenced by any code visible in this
// chunk — it may be used elsewhere in the file or be dead code;
// verify before removing.
type testEvent struct {
	Name  string
	Value string
}

// testSubscriber bundles the channels returned by
// ports.EventService.Subscribe together with the cancel function of
// the context used for that subscription, so a watcher can both
// receive events/errors and tear the subscription down.
type testSubscriber struct {
	eventCh    <-chan *ports.EventEnvelope // delivers published event envelopes
	eventErrCh <-chan error                // delivers subscription errors
	cancel     func()                      // cancels the subscription's context
}

0 comments on commit 2d86efb

Please sign in to comment.