From 4b91af87e91188c6397cf9c1dc4688edb67f6c71 Mon Sep 17 00:00:00 2001 From: Richard Case Date: Fri, 20 Aug 2021 14:30:37 +0100 Subject: [PATCH] feat: added containerd based event services Implemented an event service using the containerd event service. Also moved the event definitions out of the core folder and added to the api. Removed the old transport based event service. Signed-off-by: Richard Case --- api/events/register.go | 11 + {core => api}/events/spec.go | 0 cmd/dev-helper/main.go | 174 ++++---------- core/application/app_test.go | 2 +- core/application/commands.go | 2 +- core/ports/services.go | 30 +-- infrastructure/containerd/convert.go | 24 ++ infrastructure/containerd/event_service.go | 100 ++++++++ .../containerd/event_service_test.go | 104 ++++++++ infrastructure/containerd/image_service.go | 5 +- .../containerd/image_service_test.go | 3 +- infrastructure/containerd/repo.go | 27 ++- infrastructure/mock/mock.go | 58 ++--- infrastructure/transport/errors.go | 9 - infrastructure/transport/transport.go | 103 -------- infrastructure/transport/transport_test.go | 223 ------------------ internal/command/run/run.go | 11 +- 17 files changed, 352 insertions(+), 534 deletions(-) create mode 100644 api/events/register.go rename {core => api}/events/spec.go (100%) create mode 100644 infrastructure/containerd/event_service.go create mode 100644 infrastructure/containerd/event_service_test.go delete mode 100644 infrastructure/transport/errors.go delete mode 100644 infrastructure/transport/transport.go delete mode 100644 infrastructure/transport/transport_test.go diff --git a/api/events/register.go b/api/events/register.go new file mode 100644 index 00000000..76b8dd3a --- /dev/null +++ b/api/events/register.go @@ -0,0 +1,11 @@ +package events + +import ( + "github.com/containerd/typeurl" +) + +func init() { + typeurl.Register(&MicroVMSpecCreated{}, "microvm.services.api.events.microvmspeccreated") + typeurl.Register(&MicroVMSpecUpdated{}, "microvm.services.api.events.microvmspecupdated") + typeurl.Register(&MicroVMSpecDeleted{}, "microvm.services.api.events.microvmspecdeleted") +} diff --git a/core/events/spec.go b/api/events/spec.go similarity index 100% rename from core/events/spec.go rename to api/events/spec.go diff --git a/cmd/dev-helper/main.go b/cmd/dev-helper/main.go index bb55153c..d0c9488a 100644 --- a/cmd/dev-helper/main.go +++ b/cmd/dev-helper/main.go @@ -3,20 +3,16 @@ package main import ( "context" - "encoding/json" "fmt" "log" "github.com/sirupsen/logrus" _ "github.com/containerd/containerd/api/events" - "github.com/containerd/containerd/content" - "github.com/containerd/containerd/leases" - "github.com/containerd/containerd/namespaces" - "github.com/containerd/typeurl" ctr "github.com/containerd/containerd" + "github.com/weaveworks/reignite/api/events" "github.com/weaveworks/reignite/core/models" "github.com/weaveworks/reignite/core/ports" "github.com/weaveworks/reignite/infrastructure/containerd" @@ -45,6 +41,8 @@ func main() { logger := rlog.GetLogger(ctx) logger.Infof("reignite dev-helper, using containerd socket: %s", socketPath) + //eventPublishTest(ctx, socketPath, logger) + logger.Info("starting containerd event listener") go eventListener(ctx, socketPath, logger) @@ -56,16 +54,47 @@ func main() { fmt.Scanln() imageServiceTest(ctx, socketPath, logger) - //repoUpdateTest(ctx, socketPath) - //imageLeaseTest(ctx, socketPath) - //contentStoreTest(ctx, socketPath) - logger.Info("Press [enter] to exit") fmt.Scanln() cancel() } +func eventPublishTest(ctx context.Context, socketPath string, logger *logrus.Entry) { + cfg := &containerd.Config{ + SocketPath: socketPath, + } + logger.Info("creating event service") + + es, err := containerd.NewEventService(cfg) + if err != nil { + log.Fatal(err) + } + + evt := &events.MicroVMSpecCreated{ + ID: "abcdf", + Namespace: "ns1", + } + + ctx, cancel := context.WithCancel(ctx) + + evts, errs := es.Subscribe(ctx) + + err = es.Publish(ctx, "/test", evt) + if err != nil { + log.Fatal(err) + } + + select { + case evt := <-evts: + fmt.Printf("in dev-helper, got evtenr: %#v\n", evt.Event) + case evtErr := <-errs: + fmt.Println(evtErr) + } + + cancel() +} + func repoTest(ctx context.Context, socketPath string, logger *logrus.Entry) { client, err := ctr.New(socketPath) if err != nil { @@ -113,141 +142,30 @@ func imageServiceTest(ctx context.Context, socketPath string, logger *logrus.Ent } func eventListener(ctx context.Context, socketPath string, logger *logrus.Entry) { - client, err := ctr.New(socketPath) + cfg := &containerd.Config{ + SocketPath: socketPath, + } + logger.Info("creating event service") + + es, err := containerd.NewEventService(cfg) if err != nil { log.Fatal(err) } - es := client.EventService() ch, errsCh := es.Subscribe(ctx) - for { select { case <-ctx.Done(): logger.Info("Existing event listener") + return case evt := <-ch: - v, err := typeurl.UnmarshalAny(evt.Event) - if err != nil { - logger.Errorf("error unmarshalling: %s", err) - continue - } - out, err := json.Marshal(v) - if err != nil { - logger.Errorf("cannot marshal Any into JSON: %s", err) - continue - } - logger.Infof("event received, ns %s, topic %s, body: %s", evt.Namespace, evt.Topic, string(out)) + logger.Infof("event received, ns %s, topic %s, body: %#v", evt.Namespace, evt.Topic, evt.Event) case errEvt := <-errsCh: logger.Errorf("event error received: %s", errEvt) } } } -func imageLeaseTest(ctx context.Context, socketPath string) { - client, err := ctr.New(socketPath) - if err != nil { - log.Fatal(err) - } - - nsCtx := namespaces.WithNamespace(ctx, vmNamespace) - - leaseManager := client.LeasesService() - l, err := leaseManager.Create(nsCtx, leases.WithID("mytestlease")) - if err != nil { - log.Fatal(err) - } - - leaseCtx := leases.WithLease(nsCtx, l.ID) - - image, err := client.Pull(leaseCtx, imageName, ctr.WithPullUnpack) - if err != nil { - log.Fatal(err) - } - fmt.Printf("%#v\n", image) - fmt.Println("done with pull") -} - -func contentStoreTest(ctx context.Context, socketPath string) { - client, err := ctr.New(socketPath) - if err != nil { - log.Fatal(err) - } - - nsCtx := namespaces.WithNamespace(ctx, vmNamespace) - - leaseManager := client.LeasesService() - l, err := leaseManager.Create(nsCtx, leases.WithID("mytestlease")) - if err != nil { - log.Fatal(err) - } - - vmSpec := getTestSpec() - - leaseCtx := leases.WithLease(nsCtx, l.ID) - - store := client.ContentStore() - - refName := "mytestrefname" - writer, err := store.Writer(leaseCtx, content.WithRef(refName)) - if err != nil { - log.Fatal(err) - } - - data, err := json.Marshal(vmSpec) - if err != nil { - log.Fatal(err) - } - - _, err = writer.Write(data) - if err != nil { - log.Fatal(err) - } - - labels := map[string]string{ - "vmid": vmName, - "ns": vmNamespace, - } - err = writer.Commit(leaseCtx, 0, "", content.WithLabels(labels)) - if err != nil { - log.Fatal(err) - } - - writer.Close() -} - -func repoUpdateTest(ctx context.Context, socketPath string) { - client, err := ctr.New(socketPath) - if err != nil { - log.Fatal(err) - } - - repo := containerd.NewMicroVMRepoWithClient(client) - - vmSpec := getTestSpec() - - _, err = repo.Save(ctx, vmSpec) - if err != nil { - log.Fatal(err) - } - - vmSpec.Spec.MemoryInMb = 8096 - - _, err = repo.Save(ctx, vmSpec) - if err != nil { - log.Fatal(err) - } - - specs, err := repo.GetAll(ctx, vmNamespace) - if err != nil { - log.Fatal(err) - } - - for _, spec := range specs { - log.Printf("spec: %#v\n", spec) - } - -} - func getTestSpec() *models.MicroVM { return &models.MicroVM{ ID: vmName, diff --git a/core/application/app_test.go b/core/application/app_test.go index e5b77107..737fa711 100644 --- a/core/application/app_test.go +++ b/core/application/app_test.go @@ -7,8 +7,8 @@ import ( "github.com/golang/mock/gomock" . "github.com/onsi/gomega" + "github.com/weaveworks/reignite/api/events" "github.com/weaveworks/reignite/core/application" - "github.com/weaveworks/reignite/core/events" "github.com/weaveworks/reignite/core/models" "github.com/weaveworks/reignite/infrastructure/mock" "github.com/weaveworks/reignite/pkg/defaults" diff --git a/core/application/commands.go b/core/application/commands.go index 763720e3..7d983455 100644 --- a/core/application/commands.go +++ b/core/application/commands.go @@ -4,7 +4,7 @@ import ( "context" "fmt" - "github.com/weaveworks/reignite/core/events" + "github.com/weaveworks/reignite/api/events" "github.com/weaveworks/reignite/core/models" "github.com/weaveworks/reignite/pkg/defaults" "github.com/weaveworks/reignite/pkg/log" diff --git a/core/ports/services.go b/core/ports/services.go index fff1697d..045b700c 100644 --- a/core/ports/services.go +++ b/core/ports/services.go @@ -2,6 +2,7 @@ package ports import ( "context" + "time" mvmv1 "github.com/weaveworks/reignite/api/services/microvm/v1alpha1" "github.com/weaveworks/reignite/core/models" @@ -18,28 +19,21 @@ type IDService interface { GenerateRandom() (string, error) } -// EventHandler represents an event handling function. -type EventHandler func(e *models.EventEnvelope) - -// EventErrorHandler represents an error handling function. -type EventErrorHandler func(err error) - -// EventHandlers represents a pair of event/error handlers. -type EventHandlers struct { - // Event is the event handler function. - Event EventHandler - // Error is the error handler function. - Error EventErrorHandler -} - // EventService is a port for a service that acts as a event bus. type EventService interface { - // CreateTopic will create a named topic (a.k.a channel or queue) for events. - CreateTopic(ctx context.Context, topic string) error // Publish will publish an event to a specific topic. Publish(ctx context.Context, topic string, eventToPublish interface{}) error - // Subscribe will subscribe to events on a named topic and will call the relevant handlers. - Subscribe(ctx context.Context, topic string, handlers EventHandlers) error + // SubscribeTopic will subscribe to events on a named topic.. + SubscribeTopic(ctx context.Context, topic string) (ch <-chan *EventEnvelope, errs <-chan error) + // Subscribe will subscribe to events on all topics + Subscribe(ctx context.Context) (ch <-chan *EventEnvelope, errs <-chan error) +} + +type EventEnvelope struct { + Timestamp time.Time + Namespace string + Topic string + Event interface{} } // ImageService is a port for a service that interacts with OCI images. diff --git a/infrastructure/containerd/convert.go b/infrastructure/containerd/convert.go index 117bd2b8..eb265ef4 100644 --- a/infrastructure/containerd/convert.go +++ b/infrastructure/containerd/convert.go @@ -3,8 +3,12 @@ package containerd import ( "fmt" + "github.com/containerd/containerd/events" "github.com/containerd/containerd/mount" + "github.com/containerd/typeurl" + "github.com/weaveworks/reignite/core/models" + "github.com/weaveworks/reignite/core/ports" ) func convertMountToModel(m mount.Mount, snapshotter string) (models.Mount, error) { @@ -45,3 +49,23 @@ func convertMountsToModel(mounts []mount.Mount, snapshotter string) ([]models.Mo return convertedMounts, nil } + +func convertCtrEventEnvelope(evt *events.Envelope) (*ports.EventEnvelope, error) { + if evt == nil { + return nil, nil + } + + converted := &ports.EventEnvelope{ + Timestamp: evt.Timestamp, + Namespace: evt.Namespace, + Topic: evt.Topic, + } + + v, err := typeurl.UnmarshalAny(evt.Event) + if err != nil { + return nil, fmt.Errorf("unmarshalling event: %w", err) + } + converted.Event = v + + return converted, nil +} diff --git a/infrastructure/containerd/event_service.go b/infrastructure/containerd/event_service.go new file mode 100644 index 00000000..d02fe9e6 --- /dev/null +++ b/infrastructure/containerd/event_service.go @@ -0,0 +1,100 @@ +package containerd + +import ( + "context" + "errors" + "fmt" + + "github.com/containerd/containerd" + "github.com/containerd/containerd/events" + "github.com/containerd/containerd/namespaces" + + "github.com/weaveworks/reignite/core/ports" + "github.com/weaveworks/reignite/pkg/defaults" +) + +func NewEventService(cfg *Config) (ports.EventService, error) { + client, err := containerd.New(cfg.SocketPath) + if err != nil { + return nil, fmt.Errorf("creating containerd client: %w", err) + } + + return NewEventServiceWithClient(client), nil +} + +func NewEventServiceWithClient(client *containerd.Client) ports.EventService { + return &eventService{ + client: client, + } +} + +type eventService struct { + client *containerd.Client +} + +// Publish will publish an event to a specific topic. +func (es *eventService) Publish(ctx context.Context, topic string, eventToPublish interface{}) error { + namespaceCtx := namespaces.WithNamespace(ctx, defaults.ContainerdNamespace) + ctrEventSrv := es.client.EventService() + if err := ctrEventSrv.Publish(namespaceCtx, topic, eventToPublish); err != nil { + return fmt.Errorf("publishing event: %w", err) + } + + return nil +} + +// SubscribeTopic will subscribe to events on a named topic. +func (es *eventService) SubscribeTopic(ctx context.Context, topic string) (ch <-chan *ports.EventEnvelope, errs <-chan error) { + topicFilter := fmt.Sprintf("topic==\"%s\"", topic) + + return es.subscribe(ctx, topicFilter) +} + +// Subscribe will subscribe to events on all topics. +func (es *eventService) Subscribe(ctx context.Context) (ch <-chan *ports.EventEnvelope, errs <-chan error) { + return es.subscribe(ctx) +} + +func (es *eventService) subscribe(ctx context.Context, filters ...string) (ch <-chan *ports.EventEnvelope, errs <-chan error) { + var ( + evtCh = make(chan *ports.EventEnvelope) + evtErrCh = make(chan error, 1) + ) + errs = evtErrCh + ch = evtCh + + namespaceCtx := namespaces.WithNamespace(ctx, defaults.ContainerdNamespace) + + var ctrEvents <-chan *events.Envelope + var ctrErrs <-chan error + if len(filters) == 0 { + ctrEvents, ctrErrs = es.client.Subscribe(namespaceCtx) + } else { + ctrEvents, ctrErrs = es.client.Subscribe(namespaceCtx, filters...) + } + + go func() { + defer close(evtCh) + + for { + select { + case <-ctx.Done(): + if cerr := ctx.Err(); cerr != nil && !errors.Is(cerr, context.Canceled) { + evtErrCh <- cerr + } + + return + case ctrEvt := <-ctrEvents: + converted, err := convertCtrEventEnvelope(ctrEvt) + if err != nil { + evtErrCh <- fmt.Errorf("converting containerd event envelope: %w", err) + } + evtCh <- converted + case ctrErr := <-ctrErrs: + evtErrCh <- ctrErr + } + } + }() + + return ch, errs +} diff --git a/infrastructure/containerd/event_service_test.go b/infrastructure/containerd/event_service_test.go new file mode 100644 index 00000000..55420599 --- /dev/null +++ b/infrastructure/containerd/event_service_test.go @@ -0,0 +1,104 @@ +package containerd_test + +import ( + "context" + "testing" + + . "github.com/onsi/gomega" + "github.com/weaveworks/reignite/api/events" + "github.com/weaveworks/reignite/core/ports" + "github.com/weaveworks/reignite/infrastructure/containerd" +) + +func TestEventService_Integration(t *testing.T) { + if !runContainerDTests() { + t.Skip("skipping containerd event service integration test") + } + + RegisterTestingT(t) + + client, ctx := testCreateClient(t) + + es := containerd.NewEventServiceWithClient(client) + + t.Log("creating subscribers") + + ctx1, cancel1 := context.WithCancel(ctx) + evt1, err1 := es.Subscribe(ctx1) + ctx2, cancel2 := context.WithCancel(ctx) + evt2, err2 := es.Subscribe(ctx2) + + errChan := make(chan error) + + testEvents := []*events.MicroVMSpecCreated{ + { + ID: "vm1", + Namespace: "ns1", + }, + { + ID: "vm2", + Namespace: "ns1", + }, + } + + go func() { + defer close(errChan) + for _, event := range testEvents { + if err := es.Publish(ctx, "/reignite/test", event); err != nil { + errChan <- err + return + } + } + + t.Log("finished publishing events") + }() + + t.Log("subscribers waiting for events") + if err := <-errChan; err != nil { + t.Fatal(err) + } + + for _, subscriber := range []struct { + eventCh <-chan *ports.EventEnvelope + eventErrCh <-chan error + cancel func() + }{ + { + eventCh: evt1, + eventErrCh: err1, + cancel: cancel1, + }, + { + eventCh: evt2, + eventErrCh: err2, + cancel: cancel2, + }, + } { + recvd := []interface{}{} + subscibercheck: + for { + select { + case env := <-subscriber.eventCh: + if env != nil { + recvd = append(recvd, env.Event) + } else { + break subscibercheck + } + case err := <-subscriber.eventErrCh: + if err != nil { + t.Fatal(err) + } + break subscibercheck + } + + if len(recvd) == len(testEvents) { + subscriber.cancel() + } + } + } +} + +type testEvent struct { + Name string + Value string +} diff --git a/infrastructure/containerd/image_service.go b/infrastructure/containerd/image_service.go index 3521d87c..f2483668 100644 --- a/infrastructure/containerd/image_service.go +++ b/infrastructure/containerd/image_service.go @@ -11,6 +11,7 @@ import ( "github.com/weaveworks/reignite/core/models" "github.com/weaveworks/reignite/core/ports" + "github.com/weaveworks/reignite/pkg/defaults" "github.com/weaveworks/reignite/pkg/log" ) @@ -43,7 +44,7 @@ func (im *imageService) Get(ctx context.Context, input ports.GetImageInput) erro actionMessage := fmt.Sprintf("getting image %s for owner %s/%s", input.ImageName, input.OwnerNamespace, input.OwnerName) logger.Debugf(actionMessage) - nsCtx := namespaces.WithNamespace(ctx, input.OwnerNamespace) + nsCtx := namespaces.WithNamespace(ctx, defaults.ContainerdNamespace) _, err := im.getImage(nsCtx, input.ImageName, input.OwnerName, input.OwnerNamespace) if err != nil { @@ -59,7 +60,7 @@ func (im *imageService) GetAndMount(ctx context.Context, input ports.GetImageInp logger := log.GetLogger(ctx).WithField("service", "containerd_image") logger.Debugf("getting and mounting image %s for owner %s/%s", input.ImageName, input.OwnerNamespace, input.OwnerName) - nsCtx := namespaces.WithNamespace(ctx, input.OwnerNamespace) + nsCtx := namespaces.WithNamespace(ctx, defaults.ContainerdNamespace) image, err := im.getImage(nsCtx, input.ImageName, input.OwnerName, input.OwnerNamespace) if err != nil { diff --git a/infrastructure/containerd/image_service_test.go b/infrastructure/containerd/image_service_test.go index 2c79e456..d4f24933 100644 --- a/infrastructure/containerd/image_service_test.go +++ b/infrastructure/containerd/image_service_test.go @@ -13,6 +13,7 @@ import ( "github.com/weaveworks/reignite/core/models" "github.com/weaveworks/reignite/core/ports" "github.com/weaveworks/reignite/infrastructure/containerd" + "github.com/weaveworks/reignite/pkg/defaults" ctr "github.com/containerd/containerd" "github.com/containerd/containerd/namespaces" @@ -37,7 +38,7 @@ func TestImageService_Integration(t *testing.T) { RegisterTestingT(t) client, ctx := testCreateClient(t) - namespaceCtx := namespaces.WithNamespace(ctx, testOwnerNamespace) + namespaceCtx := namespaces.WithNamespace(ctx, defaults.ContainerdNamespace) imageSvc := containerd.NewImageServiceWithClient(&containerd.Config{ SnapshotterKernel: testSnapshotter, diff --git a/infrastructure/containerd/repo.go b/infrastructure/containerd/repo.go index 1eb98f67..a1326358 100644 --- a/infrastructure/containerd/repo.go +++ b/infrastructure/containerd/repo.go @@ -15,6 +15,7 @@ import ( "github.com/weaveworks/reignite/core/models" "github.com/weaveworks/reignite/core/ports" + "github.com/weaveworks/reignite/pkg/defaults" "github.com/weaveworks/reignite/pkg/log" ) @@ -52,7 +53,7 @@ func (r *containerdRepo) Save(ctx context.Context, microvm *models.MicroVM) (*mo mu.Lock() defer mu.Unlock() - namespaceCtx := namespaces.WithNamespace(ctx, microvm.Namespace) + namespaceCtx := namespaces.WithNamespace(ctx, defaults.ContainerdNamespace) store := r.client.ContentStore() microvm.Version++ @@ -88,9 +89,9 @@ func (r *containerdRepo) Get(ctx context.Context, name, namespace string) (*mode mu.RLock() defer mu.RUnlock() - namespaceCtx := namespaces.WithNamespace(ctx, namespace) + namespaceCtx := namespaces.WithNamespace(ctx, defaults.ContainerdNamespace) - digest, err := r.findLatestDigestForSpec(namespaceCtx, name) + digest, err := r.findLatestDigestForSpec(namespaceCtx, name, namespace) if err != nil { return nil, fmt.Errorf("finding content in store: %w", err) } @@ -103,7 +104,7 @@ func (r *containerdRepo) Get(ctx context.Context, name, namespace string) (*mode // GetAll will get a list of microvm details from the containerd content store. func (r *containerdRepo) GetAll(ctx context.Context, namespace string) ([]*models.MicroVM, error) { - namespaceCtx := namespaces.WithNamespace(ctx, namespace) + namespaceCtx := namespaces.WithNamespace(ctx, defaults.ContainerdNamespace) store := r.client.ContentStore() // NOTE: this seems redundant as we have the namespace based context @@ -153,10 +154,10 @@ func (r *containerdRepo) Delete(ctx context.Context, microvm *models.MicroVM) er mu.Lock() defer mu.Unlock() - namespaceCtx := namespaces.WithNamespace(ctx, microvm.Namespace) + namespaceCtx := namespaces.WithNamespace(ctx, defaults.ContainerdNamespace) store := r.client.ContentStore() - digests, err := r.findAllDigestForSpec(namespaceCtx, microvm.ID) + digests, err := r.findAllDigestForSpec(namespaceCtx, microvm.ID, microvm.Namespace) if err != nil { return fmt.Errorf("finding digests for %s: %w", microvm.ID, err) } @@ -180,9 +181,9 @@ func (r *containerdRepo) Exists(ctx context.Context, name, namespace string) (bo mu.RLock() defer mu.RUnlock() - namespaceCtx := namespaces.WithNamespace(ctx, namespace) + namespaceCtx := namespaces.WithNamespace(ctx, defaults.ContainerdNamespace) - digest, err := r.findLatestDigestForSpec(namespaceCtx, name) + digest, err := r.findLatestDigestForSpec(namespaceCtx, name, namespace) if err != nil { return false, fmt.Errorf("finding digest for %s/%s: %w", name, namespace, err) } @@ -210,8 +211,9 @@ func (r *containerdRepo) getWithDigest(ctx context.Context, metadigest *digest.D return microvm, nil } -func (r *containerdRepo) findLatestDigestForSpec(ctx context.Context, name string) (*digest.Digest, error) { +func (r *containerdRepo) findLatestDigestForSpec(ctx context.Context, name, namespace string) (*digest.Digest, error) { idLabelFilter := labelFilter(IDLabel, name) + nsFilter := labelFilter(NamespaceLabel, namespace) store := r.client.ContentStore() var digest *digest.Digest @@ -228,7 +230,7 @@ func (r *containerdRepo) findLatestDigestForSpec(ctx context.Context, name strin } return nil - }, idLabelFilter) + }, idLabelFilter, nsFilter) if err != nil { return nil, fmt.Errorf("walking content store for %s: %w", name, err) } @@ -236,8 +238,9 @@ func (r *containerdRepo) findLatestDigestForSpec(ctx context.Context, name strin return digest, nil } -func (r *containerdRepo) findAllDigestForSpec(ctx context.Context, name string) ([]*digest.Digest, error) { +func (r *containerdRepo) findAllDigestForSpec(ctx context.Context, name, namespace string) ([]*digest.Digest, error) { idLabelFilter := labelFilter(IDLabel, name) + nsLabelFilter := labelFilter(NamespaceLabel, namespace) store := r.client.ContentStore() digests := []*digest.Digest{} @@ -245,7 +248,7 @@ func (r *containerdRepo) findAllDigestForSpec(ctx context.Context, name string) digests = append(digests, &i.Digest) return nil - }, idLabelFilter) + }, idLabelFilter, nsLabelFilter) if err != nil { return nil, fmt.Errorf("walking content store for %s: %w", name, err) } diff --git a/infrastructure/mock/mock.go b/infrastructure/mock/mock.go index 97de8101..487957c8 100644 --- a/infrastructure/mock/mock.go +++ b/infrastructure/mock/mock.go @@ -270,20 +270,6 @@ func (m *MockEventService) EXPECT() *MockEventServiceMockRecorder { return m.recorder } -// CreateTopic mocks base method. -func (m *MockEventService) CreateTopic(arg0 context.Context, arg1 string) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CreateTopic", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// CreateTopic indicates an expected call of CreateTopic. -func (mr *MockEventServiceMockRecorder) CreateTopic(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateTopic", reflect.TypeOf((*MockEventService)(nil).CreateTopic), arg0, arg1) -} - // Publish mocks base method. func (m *MockEventService) Publish(arg0 context.Context, arg1 string, arg2 interface{}) error { m.ctrl.T.Helper() @@ -299,17 +285,33 @@ func (mr *MockEventServiceMockRecorder) Publish(arg0, arg1, arg2 interface{}) *g } // Subscribe mocks base method. -func (m *MockEventService) Subscribe(arg0 context.Context, arg1 string, arg2 ports.EventHandlers) error { +func (m *MockEventService) Subscribe(arg0 context.Context) (<-chan *ports.EventEnvelope, <-chan error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Subscribe", arg0, arg1, arg2) - ret0, _ := ret[0].(error) - return ret0 + ret := m.ctrl.Call(m, "Subscribe", arg0) + ret0, _ := ret[0].(<-chan *ports.EventEnvelope) + ret1, _ := ret[1].(<-chan error) + return ret0, ret1 } // Subscribe indicates an expected call of Subscribe. -func (mr *MockEventServiceMockRecorder) Subscribe(arg0, arg1, arg2 interface{}) *gomock.Call { +func (mr *MockEventServiceMockRecorder) Subscribe(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MockEventService)(nil).Subscribe), arg0) +} + +// SubscribeTopic mocks base method. +func (m *MockEventService) SubscribeTopic(arg0 context.Context, arg1 string) (<-chan *ports.EventEnvelope, <-chan error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SubscribeTopic", arg0, arg1) + ret0, _ := ret[0].(<-chan *ports.EventEnvelope) + ret1, _ := ret[1].(<-chan error) + return ret0, ret1 +} + +// SubscribeTopic indicates an expected call of SubscribeTopic. +func (mr *MockEventServiceMockRecorder) SubscribeTopic(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MockEventService)(nil).Subscribe), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeTopic", reflect.TypeOf((*MockEventService)(nil).SubscribeTopic), arg0, arg1) } // MockIDService is a mock of IDService interface. @@ -374,30 +376,30 @@ func (m *MockImageService) EXPECT() *MockImageServiceMockRecorder { } // Get mocks base method. -func (m *MockImageService) Get(arg0 context.Context, arg1, arg2, arg3 string) error { +func (m *MockImageService) Get(arg0 context.Context, arg1 ports.GetImageInput) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Get", arg0, arg1, arg2, arg3) + ret := m.ctrl.Call(m, "Get", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } // Get indicates an expected call of Get. -func (mr *MockImageServiceMockRecorder) Get(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { +func (mr *MockImageServiceMockRecorder) Get(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockImageService)(nil).Get), arg0, arg1, arg2, arg3) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockImageService)(nil).Get), arg0, arg1) } // GetAndMount mocks base method. -func (m *MockImageService) GetAndMount(arg0 context.Context, arg1, arg2, arg3 string) ([]models.Mount, error) { +func (m *MockImageService) GetAndMount(arg0 context.Context, arg1 ports.GetImageInput) ([]models.Mount, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetAndMount", arg0, arg1, arg2, arg3) + ret := m.ctrl.Call(m, "GetAndMount", arg0, arg1) ret0, _ := ret[0].([]models.Mount) ret1, _ := ret[1].(error) return ret0, ret1 } // GetAndMount indicates an expected call of GetAndMount. -func (mr *MockImageServiceMockRecorder) GetAndMount(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { +func (mr *MockImageServiceMockRecorder) GetAndMount(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAndMount", reflect.TypeOf((*MockImageService)(nil).GetAndMount), arg0, arg1, arg2, arg3) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAndMount", reflect.TypeOf((*MockImageService)(nil).GetAndMount), arg0, arg1) } diff --git a/infrastructure/transport/errors.go b/infrastructure/transport/errors.go deleted file mode 100644 index 574e9e1c..00000000 --- a/infrastructure/transport/errors.go +++ /dev/null @@ -1,9 +0,0 @@ -package transport - -import "errors" - -var ( - errHandlerRequired = errors.New("event handler is required") - errErrorHandlerRequired = errors.New("error handler is required") - errTopicRequired = errors.New("topic name is required") -) diff --git a/infrastructure/transport/transport.go b/infrastructure/transport/transport.go deleted file mode 100644 index b09b1390..00000000 --- a/infrastructure/transport/transport.go +++ /dev/null @@ -1,103 +0,0 @@ -package transport - -import ( - "context" - "fmt" - - "github.com/vmware/transport-go/bus" - "github.com/vmware/transport-go/model" - - event "github.com/weaveworks/reignite/core" - "github.com/weaveworks/reignite/core/models" - "github.com/weaveworks/reignite/core/ports" -) - -// New creates a new event service based on Transport (https://vmware.github.io/transport/). -func New() ports.EventService { - return &transportEvents{ - eventBus: bus.GetBus(), - } -} - -type transportEvents struct { - eventBus bus.EventBus -} - -// CreateTopic will create a named topic (a.k.a channel or queue) for events. -func (te *transportEvents) CreateTopic(ctx context.Context, topic string) error { - if topic == "" { - return errTopicRequired - } - - manager := te.eventBus.GetChannelManager() - - if !manager.CheckChannelExists(topic) { - manager.CreateChannel(topic) - } - - return nil -} - -// Publish will publish an event to a specific topic. -func (te *transportEvents) Publish(ctx context.Context, topic string, eventToPublish interface{}) error { - if topic == "" { - return errTopicRequired - } - - manager := te.eventBus.GetChannelManager() - - if !manager.CheckChannelExists(topic) { - return event.ErrTopicNotFound{Name: topic} - } - - if err := te.eventBus.SendRequestMessage(topic, eventToPublish, nil); err != nil { - return fmt.Errorf("sending message to channel: %w", err) - } - - return nil -} - -// Subscribe will subscribe to events on a named topic and will call the relevant handler. -func (te *transportEvents) Subscribe(ctx context.Context, topic string, handlers ports.EventHandlers) error { - if handlers.Event == nil { - return errHandlerRequired - } - if handlers.Error == nil { - return errErrorHandlerRequired - } - if topic == "" { - return errTopicRequired - } - - manager := te.eventBus.GetChannelManager() - - if !manager.CheckChannelExists(topic) { - return event.ErrTopicNotFound{Name: topic} - } - - h, err := te.eventBus.ListenRequestStream(topic) - if err != nil { - return fmt.Errorf("listening for transport events: %w", err) - } - h.Handle(te.subsciberHandler(handlers.Event), te.subsciberErrorHandler(handlers.Error)) - - return nil -} - -func (te *transportEvents) subsciberHandler(handler ports.EventHandler) bus.MessageHandlerFunction { - return func(msg *model.Message) { - evt := &models.EventEnvelope{ - ID: msg.Id.String(), - Topic: msg.Channel, - Event: msg.Payload, - } - - handler(evt) - } -} - -func (te *transportEvents) subsciberErrorHandler(errHandler ports.EventErrorHandler) bus.MessageErrorFunction { - return func(err error) { - errHandler(err) - } -} diff --git a/infrastructure/transport/transport_test.go b/infrastructure/transport/transport_test.go deleted file mode 100644 index 8412c880..00000000 --- a/infrastructure/transport/transport_test.go +++ /dev/null @@ -1,223 +0,0 @@ -package transport_test - -import ( - "context" - "testing" - "time" - - . "github.com/onsi/gomega" - - "github.com/weaveworks/reignite/core/models" - "github.com/weaveworks/reignite/core/ports" - "github.com/weaveworks/reignite/infrastructure/transport" -) - -func TestTransport_SimplePubSub(t *testing.T) { - RegisterTestingT(t) - - ctx := context.Background() - trans := transport.New() - - topicName := "test" - messageReceived := false - errorReceived := false - - handler := func(e *models.EventEnvelope) { - messageReceived = true - } - errHandler := func(err error) { - errorReceived = true - } - - err := trans.CreateTopic(ctx, topicName) - Expect(err).NotTo(HaveOccurred()) - - err = trans.Subscribe(ctx, topicName, ports.EventHandlers{ - Event: handler, - Error: errHandler, - }) - Expect(err).NotTo(HaveOccurred()) - - err = trans.Publish(ctx, topicName, "someevent") - Expect(err).NotTo(HaveOccurred()) - - time.Sleep(1 * time.Second) - - Expect(messageReceived).To(BeTrue()) - Expect(errorReceived).To(BeFalse()) -} - -func TestTransport_MultipleSubscribers(t *testing.T) { - RegisterTestingT(t) - - ctx := context.Background() - trans := transport.New() - - topicName := "test" - sub1MessageReceived := false - sub2MessageReceived := false - errorReceived := false - - handler1 := func(e *models.EventEnvelope) { - sub1MessageReceived = true - } - handler2 := func(e *models.EventEnvelope) { - sub2MessageReceived = true - } - errHandler := func(err error) { - errorReceived = true - } - - err := trans.CreateTopic(ctx, topicName) - Expect(err).NotTo(HaveOccurred()) - - err = trans.Subscribe(ctx, topicName, ports.EventHandlers{ - Event: handler1, - Error: errHandler, - }) - Expect(err).NotTo(HaveOccurred()) - - err = trans.Subscribe(ctx, topicName, ports.EventHandlers{ - Event: handler2, - Error: errHandler, - }) - Expect(err).NotTo(HaveOccurred()) - - err = trans.Publish(ctx, topicName, "someevent") - Expect(err).NotTo(HaveOccurred()) - - time.Sleep(1 * time.Second) - - Expect(sub1MessageReceived).To(BeTrue()) - Expect(sub2MessageReceived).To(BeTrue()) - Expect(errorReceived).To(BeFalse()) -} - -func TestTransport_SubscribeUnknownTopic(t *testing.T) { - RegisterTestingT(t) - - ctx := context.Background() - trans := transport.New() - - topicName := "doesntexist" - - handler := func(e *models.EventEnvelope) {} - - errHandler := func(err error) {} - - err := trans.Subscribe(ctx, topicName, ports.EventHandlers{ - Event: handler, - Error: errHandler, - }) - Expect(err).To(HaveOccurred()) -} - -func TestTransport_SubscribeEmptyTopic(t *testing.T) { - RegisterTestingT(t) - - ctx := context.Background() - trans := transport.New() - - topicName := "" - - handler := func(e *models.EventEnvelope) {} - - errHandler := func(err error) {} - - err := trans.Subscribe(ctx, topicName, ports.EventHandlers{ - Event: handler, - Error: errHandler, - }) - Expect(err).To(HaveOccurred()) -} - -func TestTransport_PublishUnknownTopic(t *testing.T) { - RegisterTestingT(t) - - ctx := context.Background() - trans := transport.New() - - topicName := "doesntexist" - - err := trans.Publish(ctx, topicName, "someevent") - Expect(err).To(HaveOccurred()) -} - -func TestTransport_PublishEmptyTopic(t *testing.T) { - RegisterTestingT(t) - - ctx := context.Background() - trans := transport.New() - - topicName := "" - - err := trans.Publish(ctx, topicName, "someevent") - Expect(err).To(HaveOccurred()) -} - -func TestTransport_IdempotentCreateTopic(t *testing.T) { - RegisterTestingT(t) - - ctx := context.Background() - trans := transport.New() - - topicName := "test" - - err := trans.CreateTopic(ctx, topicName) - Expect(err).NotTo(HaveOccurred(), "creating topic first time should succeed") - - err = trans.CreateTopic(ctx, topicName) - Expect(err).NotTo(HaveOccurred(), "creating topic again time should succeed") -} - -func TestTransport_CreateEmptyTopic(t *testing.T) { - RegisterTestingT(t) - - ctx := context.Background() - trans := transport.New() - - topicName := "" - - err := trans.CreateTopic(ctx, topicName) - Expect(err).To(HaveOccurred(), "creating topic with a blank name should fail") -} - -func TestTransport_SubscribeNilHandler(t *testing.T) { - RegisterTestingT(t) - - ctx := context.Background() - trans := transport.New() - - topicName := "test" - - if err := trans.CreateTopic(ctx, topicName); err != nil { - t.Fatal(err) - } - - errHandler := func(err error) {} - - err := trans.Subscribe(ctx, topicName, ports.EventHandlers{ - Error: errHandler, - }) - Expect(err).To(HaveOccurred()) -} - -func TestTransport_SubscribeNilErrorHandler(t *testing.T) { - RegisterTestingT(t) - - ctx := context.Background() - trans := transport.New() - - topicName := "test" - - if err := trans.CreateTopic(ctx, topicName); err != nil { - t.Fatal(err) - } - - handler := func(e *models.EventEnvelope) {} - - err := trans.Subscribe(ctx, topicName, ports.EventHandlers{ - Event: handler, - }) - Expect(err).To(HaveOccurred()) -} diff --git a/internal/command/run/run.go b/internal/command/run/run.go index 7a66edb1..26f6b427 100644 --- a/internal/command/run/run.go +++ b/internal/command/run/run.go @@ -17,14 +17,12 @@ import ( mvmv1 "github.com/weaveworks/reignite/api/services/microvm/v1alpha1" "github.com/weaveworks/reignite/core/application" - containerd_repo "github.com/weaveworks/reignite/infrastructure/containerd" + reignite_ctr "github.com/weaveworks/reignite/infrastructure/containerd" "github.com/weaveworks/reignite/infrastructure/firecracker" microvmgrpc "github.com/weaveworks/reignite/infrastructure/grpc" - "github.com/weaveworks/reignite/infrastructure/transport" "github.com/weaveworks/reignite/infrastructure/ulid" cmdflags "github.com/weaveworks/reignite/internal/command/flags" "github.com/weaveworks/reignite/internal/config" - "github.com/weaveworks/reignite/pkg/defaults" "github.com/weaveworks/reignite/pkg/flags" "github.com/weaveworks/reignite/pkg/log" ) @@ -92,11 +90,8 @@ func serveAPI(ctx context.Context, cfg *config.Config) error { if err != nil { return fmt.Errorf("creating containerd client: %w", err) } - repo := containerd_repo.NewMicroVMRepoWithClient(containerdClient) - eventSvc := transport.New() - if err := eventSvc.CreateTopic(ctx, defaults.TopicMicroVMEvents); err != nil { - return fmt.Errorf("creating %s topic: %w", defaults.TopicMicroVMEvents, err) - } + repo := reignite_ctr.NewMicroVMRepoWithClient(containerdClient) + eventSvc := reignite_ctr.NewEventServiceWithClient(containerdClient) idSvc := ulid.New() mvmprovider := firecracker.New(&cfg.Firecracker)