From b6f1379fbe03ac721ba59cba8da7912433de7fb5 Mon Sep 17 00:00:00 2001
From: Balazs Nadasdi
Date: Wed, 20 Oct 2021 15:42:33 +0200
Subject: [PATCH] Fix EventService test

I was able to reproduce the issue with a helping hand from cgroups.
Limiting the test to one CPU with a heavy quota (0.01% of my CPU)
revealed the issue.

Note before I describe what happened: containerd does not send messages
to subscribers retrospectively; subscribers only receive events that are
published after the point at which they subscribed.

The original test published N events and only after that created the
subscribers. The connection between the test and containerd is much
slower than the execution of the test, so by the time containerd sends
the events out, the subscribers are already there to receive them.
That's why it works on my machine and why it can sometimes pass on
GitHub Actions. However, on a slow machine with only one vCPU, the test
and containerd race for their share of the CPU. In this scenario, the
events are already published before the subscribers are ready to
receive them.

Solution: create the subscribers first and only then publish events.

Disclaimer: there is a chance that all I wrote above is not entirely
correct, but that's how I understand it. It does not really matter if we
only look at the logic in the test: originally it was publish->subscribe,
which is not correct; we need subscribers in place before we publish
events.

Fixes #115
---
 .../containerd/event_service_test.go          | 161 +++++++++++-------
 1 file changed, 101 insertions(+), 60 deletions(-)

diff --git a/infrastructure/containerd/event_service_test.go b/infrastructure/containerd/event_service_test.go
index d2da3b1f..b97c9133 100644
--- a/infrastructure/containerd/event_service_test.go
+++ b/infrastructure/containerd/event_service_test.go
@@ -2,15 +2,23 @@ package containerd_test
 
 import (
 	"context"
+	"sync"
 	"testing"
+	"time"
 
 	. "github.com/onsi/gomega"
+	"github.com/stretchr/testify/assert"
 
 	"github.com/weaveworks/reignite/api/events"
 	"github.com/weaveworks/reignite/core/ports"
 	"github.com/weaveworks/reignite/infrastructure/containerd"
 )
 
+const (
+	numberOfSubscribers = 2
+	sleepTime           = 10
+)
+
 func TestEventService_Integration(t *testing.T) {
 	if !runContainerDTests() {
 		t.Skip("skipping containerd event service integration test")
@@ -26,84 +34,117 @@ func TestEventService_Integration(t *testing.T) {
 		Namespace: testContainerdNs,
 	}, client)
 
+	testEvents := []*events.MicroVMSpecCreated{
+		{ID: "vm1", Namespace: "ns1"},
+		{ID: "vm2", Namespace: "ns1"},
+		{ID: "vm1", Namespace: "ns2"},
+	}
+
+	var (
+		wgReady sync.WaitGroup
+		wgDone  sync.WaitGroup
+	)
+
 	t.Log("creating subscribers")
 
-	ctx1, cancel1 := context.WithCancel(ctx)
-	evt1, err1 := es.Subscribe(ctx1)
-	ctx2, cancel2 := context.WithCancel(ctx)
-	evt2, err2 := es.Subscribe(ctx2)
+	for i := 0; i < numberOfSubscribers; i++ {
+		wgReady.Add(1)
+		wgDone.Add(1)
 
-	errChan := make(chan error)
+		data := subData{
+			ID:        i,
+			ES:        es,
+			MaxEvents: len(testEvents),
+			Ready:     wgReady.Done,
+			Done:      wgDone.Done,
+		}
 
-	testEvents := []*events.MicroVMSpecCreated{
-		{
-			ID:        "vm1",
-			Namespace: "ns1",
-		},
-		{
-			ID:        "vm2",
-			Namespace: "ns1",
-		},
+		go newSubscriber(t, ctx, data)
 	}
 
-	go func() {
-		defer close(errChan)
-		for _, event := range testEvents {
-			if err := es.Publish(ctx, "/reignite/test", event); err != nil {
-				errChan <- err
-				return
-			}
+	wgReady.Wait()
+
+	// Without this, it's still possible we publish the first event before the
+	// connection is ready.
+	time.Sleep(time.Microsecond * sleepTime)
+
+	t.Log("publishing events")
+
+	for _, event := range testEvents {
+		t.Logf("publishing event: %v", event)
+		if err := es.Publish(ctx, "/reignite/test", event); err != nil {
+			t.Fatal(err)
+			break
 		}
+	}
+
+	t.Log("finished publishing events")
+
+	wgDone.Wait()
+}
+
+type subData struct {
+	ID        int
+	ES        ports.EventService
+	MaxEvents int
+	Ready     func()
+	Done      func()
+}
 
-		t.Log("finished publishing events")
-	}()
+func newSubscriber(t *testing.T, rootContext context.Context, data subData) {
+	ctx, cancel := context.WithCancel(rootContext)
+	evtChan, errChan := data.ES.Subscribe(ctx)
 
-	t.Log("subscribers waiting for events")
-	if err := <-errChan; err != nil {
-		t.Fatal(err)
+	subscriber := testSubscriber{
+		eventCh:    evtChan,
+		eventErrCh: errChan,
+		cancel:     cancel,
 	}
 
-	for _, subscriber := range []struct {
-		eventCh    <-chan *ports.EventEnvelope
-		eventErrCh <-chan error
-		cancel     func()
-	}{
-		{
-			eventCh:    evt1,
-			eventErrCh: err1,
-			cancel:     cancel1,
-		},
-		{
-			eventCh:    evt2,
-			eventErrCh: err2,
-			cancel:     cancel2,
-		},
-	} {
-		recvd := []interface{}{}
-	subscibercheck:
-		for {
-			select {
-			case env := <-subscriber.eventCh:
-				if env != nil {
-					recvd = append(recvd, env.Event)
-				} else {
-					break subscibercheck
-				}
-			case err := <-subscriber.eventErrCh:
-				if err != nil {
-					t.Fatal(err)
-				}
-				break subscibercheck
-			}
+	t.Logf("subscriber (%d) is ready to receive events", data.ID)
+	data.Ready()
+	recvd, err := watch(t, &subscriber, data.MaxEvents)
+	t.Logf("subscriber (%d) is done", data.ID)
+
+	assert.NoError(t, err)
+	assert.Len(t, recvd, data.MaxEvents)
+
+	data.Done()
+}
+
+func watch(t *testing.T, subscriber *testSubscriber, maxEvents int) ([]interface{}, error) {
 
-			if len(recvd) == len(testEvents) {
-				subscriber.cancel()
+	recvd := []interface{}{}
+
+	var err error
+
+	for {
+		select {
+		case env := <-subscriber.eventCh:
+			if env == nil {
+				break
 			}
+			recvd = append(recvd, env.Event)
+		case err = <-subscriber.eventErrCh:
+			break
+		}
+
+		if len(recvd) == maxEvents {
+			subscriber.cancel()
+			break
 		}
 	}
+
+	return recvd, err
 }
 
 type testEvent struct {
 	Name  string
 	Value string
 }
+
+type testSubscriber struct {
+	eventCh    <-chan *ports.EventEnvelope
+	eventErrCh <-chan error
+	cancel     func()
+}
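
Note (not part of the patch): below is a minimal, self-contained sketch of the
subscribe-before-publish ordering the commit message describes. The broker type
is a hypothetical stand-in for containerd's event service, which only delivers
events to subscribers that already exist at publish time; none of these names
come from the reignite codebase.

package main

import (
	"fmt"
	"sync"
)

// broker is a hypothetical stand-in for containerd's event service: events are
// delivered only to subscribers that already exist at publish time.
type broker struct {
	mu   sync.Mutex
	subs []chan string
}

// Subscribe registers a new subscriber and returns its receive channel.
func (b *broker) Subscribe() <-chan string {
	b.mu.Lock()
	defer b.mu.Unlock()

	ch := make(chan string, 16)
	b.subs = append(b.subs, ch)

	return ch
}

// Publish sends the event to the subscribers registered so far; anyone who
// subscribes later never sees it (no retrospective delivery).
func (b *broker) Publish(event string) {
	b.mu.Lock()
	defer b.mu.Unlock()

	for _, ch := range b.subs {
		ch <- event
	}
}

func main() {
	b := &broker{}
	testEvents := []string{"vm1/ns1", "vm2/ns1", "vm1/ns2"}

	var (
		wgReady sync.WaitGroup // all subscribers registered
		wgDone  sync.WaitGroup // all subscribers received every event
	)

	for i := 0; i < 2; i++ {
		wgReady.Add(1)
		wgDone.Add(1)

		go func(id int) {
			defer wgDone.Done()

			ch := b.Subscribe()
			wgReady.Done() // signal readiness only after Subscribe has returned

			for range testEvents {
				fmt.Printf("subscriber %d received %q\n", id, <-ch)
			}
		}(i)
	}

	// Publish only after every subscriber is registered; publishing earlier is
	// exactly the race the patch removes from the test.
	wgReady.Wait()

	for _, e := range testEvents {
		b.Publish(e)
	}

	wgDone.Wait()
}

Even with this ordering, the test in the patch still sleeps briefly after the
subscribers report ready, because (as its comment notes) Subscribe returning
does not guarantee the connection to containerd is fully established yet.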