Skip to content

Commit

Permalink
Merge pull request #63 from weaveworks/ctr_event_service
Browse files Browse the repository at this point in the history
feat: added containerd based event services
  • Loading branch information
richardcase authored Aug 20, 2021
2 parents 5ad582b + 4b91af8 commit b7fe083
Show file tree
Hide file tree
Showing 17 changed files with 352 additions and 534 deletions.
11 changes: 11 additions & 0 deletions api/events/register.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package events

import (
"github.com/containerd/typeurl"
)

func init() {
typeurl.Register(&MicroVMSpecCreated{}, "microvm.services.api.events.microvmspeccreated")
typeurl.Register(&MicroVMSpecUpdated{}, "microvm.services.api.events.microvmspecupdated")
typeurl.Register(&MicroVMSpecDeleted{}, "microvm.services.api.events.microvmspecdeleted")
}
File renamed without changes.
174 changes: 46 additions & 128 deletions cmd/dev-helper/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,16 @@ package main

import (
"context"
"encoding/json"
"fmt"
"log"

"github.com/sirupsen/logrus"

_ "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/typeurl"

ctr "github.com/containerd/containerd"

"github.com/weaveworks/reignite/api/events"
"github.com/weaveworks/reignite/core/models"
"github.com/weaveworks/reignite/core/ports"
"github.com/weaveworks/reignite/infrastructure/containerd"
Expand Down Expand Up @@ -45,6 +41,8 @@ func main() {
logger := rlog.GetLogger(ctx)
logger.Infof("reignite dev-helper, using containerd socket: %s", socketPath)

//eventPublishTest(ctx, socketPath, logger)

logger.Info("starting containerd event listener")
go eventListener(ctx, socketPath, logger)

Expand All @@ -56,16 +54,47 @@ func main() {
fmt.Scanln()
imageServiceTest(ctx, socketPath, logger)

//repoUpdateTest(ctx, socketPath)
//imageLeaseTest(ctx, socketPath)
//contentStoreTest(ctx, socketPath)

logger.Info("Press [enter] to exit")
fmt.Scanln()

cancel()
}

func eventPublishTest(ctx context.Context, socketPath string, logger *logrus.Entry) {
cfg := &containerd.Config{
SocketPath: socketPath,
}
logger.Info("creating event service")

es, err := containerd.NewEventService(cfg)
if err != nil {
log.Fatal(err)
}

evt := &events.MicroVMSpecCreated{
ID: "abcdf",
Namespace: "ns1",
}

ctx, cancel := context.WithCancel(ctx)

evts, errs := es.Subscribe(ctx)

err = es.Publish(ctx, "/test", evt)
if err != nil {
log.Fatal(err)
}

select {
case evt := <-evts:
fmt.Printf("in dev-helper, got evtenr: %#v\n", evt.Event)
case evtErr := <-errs:
fmt.Println(evtErr)
}

cancel()
}

func repoTest(ctx context.Context, socketPath string, logger *logrus.Entry) {
client, err := ctr.New(socketPath)
if err != nil {
Expand Down Expand Up @@ -113,141 +142,30 @@ func imageServiceTest(ctx context.Context, socketPath string, logger *logrus.Ent
}

func eventListener(ctx context.Context, socketPath string, logger *logrus.Entry) {
client, err := ctr.New(socketPath)
cfg := &containerd.Config{
SocketPath: socketPath,
}
logger.Info("creating event service")

es, err := containerd.NewEventService(cfg)
if err != nil {
log.Fatal(err)
}

es := client.EventService()
ch, errsCh := es.Subscribe(ctx)

for {
select {
case <-ctx.Done():
logger.Info("Existing event listener")
return
case evt := <-ch:
v, err := typeurl.UnmarshalAny(evt.Event)
if err != nil {
logger.Errorf("error unmarshalling: %s", err)
continue
}
out, err := json.Marshal(v)
if err != nil {
logger.Errorf("cannot marshal Any into JSON: %s", err)
continue
}
logger.Infof("event received, ns %s, topic %s, body: %s", evt.Namespace, evt.Topic, string(out))
logger.Infof("event received, ns %s, topic %s, body: %#v", evt.Namespace, evt.Topic, evt.Event)
case errEvt := <-errsCh:
logger.Errorf("event error received: %s", errEvt)
}
}
}

func imageLeaseTest(ctx context.Context, socketPath string) {
client, err := ctr.New(socketPath)
if err != nil {
log.Fatal(err)
}

nsCtx := namespaces.WithNamespace(ctx, vmNamespace)

leaseManager := client.LeasesService()
l, err := leaseManager.Create(nsCtx, leases.WithID("mytestlease"))
if err != nil {
log.Fatal(err)
}

leaseCtx := leases.WithLease(nsCtx, l.ID)

image, err := client.Pull(leaseCtx, imageName, ctr.WithPullUnpack)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%#v\n", image)
fmt.Println("done with pull")
}

func contentStoreTest(ctx context.Context, socketPath string) {
client, err := ctr.New(socketPath)
if err != nil {
log.Fatal(err)
}

nsCtx := namespaces.WithNamespace(ctx, vmNamespace)

leaseManager := client.LeasesService()
l, err := leaseManager.Create(nsCtx, leases.WithID("mytestlease"))
if err != nil {
log.Fatal(err)
}

vmSpec := getTestSpec()

leaseCtx := leases.WithLease(nsCtx, l.ID)

store := client.ContentStore()

refName := "mytestrefname"
writer, err := store.Writer(leaseCtx, content.WithRef(refName))
if err != nil {
log.Fatal(err)
}

data, err := json.Marshal(vmSpec)
if err != nil {
log.Fatal(err)
}

_, err = writer.Write(data)
if err != nil {
log.Fatal(err)
}

labels := map[string]string{
"vmid": vmName,
"ns": vmNamespace,
}
err = writer.Commit(leaseCtx, 0, "", content.WithLabels(labels))
if err != nil {
log.Fatal(err)
}

writer.Close()
}

func repoUpdateTest(ctx context.Context, socketPath string) {
client, err := ctr.New(socketPath)
if err != nil {
log.Fatal(err)
}

repo := containerd.NewMicroVMRepoWithClient(client)

vmSpec := getTestSpec()

_, err = repo.Save(ctx, vmSpec)
if err != nil {
log.Fatal(err)
}

vmSpec.Spec.MemoryInMb = 8096

_, err = repo.Save(ctx, vmSpec)
if err != nil {
log.Fatal(err)
}

specs, err := repo.GetAll(ctx, vmNamespace)
if err != nil {
log.Fatal(err)
}

for _, spec := range specs {
log.Printf("spec: %#v\n", spec)
}

}

func getTestSpec() *models.MicroVM {
return &models.MicroVM{
ID: vmName,
Expand Down
2 changes: 1 addition & 1 deletion core/application/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"github.com/golang/mock/gomock"
. "github.com/onsi/gomega"

"github.com/weaveworks/reignite/api/events"
"github.com/weaveworks/reignite/core/application"
"github.com/weaveworks/reignite/core/events"
"github.com/weaveworks/reignite/core/models"
"github.com/weaveworks/reignite/infrastructure/mock"
"github.com/weaveworks/reignite/pkg/defaults"
Expand Down
2 changes: 1 addition & 1 deletion core/application/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"fmt"

"github.com/weaveworks/reignite/core/events"
"github.com/weaveworks/reignite/api/events"
"github.com/weaveworks/reignite/core/models"
"github.com/weaveworks/reignite/pkg/defaults"
"github.com/weaveworks/reignite/pkg/log"
Expand Down
30 changes: 12 additions & 18 deletions core/ports/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ports

import (
"context"
"time"

mvmv1 "github.com/weaveworks/reignite/api/services/microvm/v1alpha1"
"github.com/weaveworks/reignite/core/models"
Expand All @@ -18,28 +19,21 @@ type IDService interface {
GenerateRandom() (string, error)
}

// EventHandler represents an event handling function.
type EventHandler func(e *models.EventEnvelope)

// EventErrorHandler represents an error handling function.
type EventErrorHandler func(err error)

// EventHandlers represents a pair of event/error handlers.
type EventHandlers struct {
// Event is the event handler function.
Event EventHandler
// Error is the error handler function.
Error EventErrorHandler
}

// EventService is a port for a service that acts as a event bus.
type EventService interface {
// CreateTopic will create a named topic (a.k.a channel or queue) for events.
CreateTopic(ctx context.Context, topic string) error
// Publish will publish an event to a specific topic.
Publish(ctx context.Context, topic string, eventToPublish interface{}) error
// Subscribe will subscribe to events on a named topic and will call the relevant handlers.
Subscribe(ctx context.Context, topic string, handlers EventHandlers) error
// SubscribeTopic will subscribe to events on a named topic..
SubscribeTopic(ctx context.Context, topic string) (ch <-chan *EventEnvelope, errs <-chan error)
// Subscribe will subscribe to events on all topics
Subscribe(ctx context.Context) (ch <-chan *EventEnvelope, errs <-chan error)
}

type EventEnvelope struct {
Timestamp time.Time
Namespace string
Topic string
Event interface{}
}

// ImageService is a port for a service that interacts with OCI images.
Expand Down
24 changes: 24 additions & 0 deletions infrastructure/containerd/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ package containerd
import (
"fmt"

"github.com/containerd/containerd/events"
"github.com/containerd/containerd/mount"
"github.com/containerd/typeurl"

"github.com/weaveworks/reignite/core/models"
"github.com/weaveworks/reignite/core/ports"
)

func convertMountToModel(m mount.Mount, snapshotter string) (models.Mount, error) {
Expand Down Expand Up @@ -45,3 +49,23 @@ func convertMountsToModel(mounts []mount.Mount, snapshotter string) ([]models.Mo

return convertedMounts, nil
}

func convertCtrEventEnvelope(evt *events.Envelope) (*ports.EventEnvelope, error) {
if evt == nil {
return nil, nil
}

converted := &ports.EventEnvelope{
Timestamp: evt.Timestamp,
Namespace: evt.Namespace,
Topic: evt.Topic,
}

v, err := typeurl.UnmarshalAny(evt.Event)
if err != nil {
return nil, fmt.Errorf("unmarshalling event: %w", err)
}
converted.Event = v

return converted, nil
}
Loading

0 comments on commit b7fe083

Please sign in to comment.