Skip to content

Commit

Permalink
feat: added image service and refactored
Browse files Browse the repository at this point in the history
Added a new imageservice that uses containerd to pull imagesand then
perform snapshotting. The service then return the mount point for
image that is specific to the owner (i.e. the microvm).

Signed-off-by: Richard Case <[email protected]>
  • Loading branch information
richardcase committed Aug 20, 2021
1 parent e2fb5ec commit 70b580f
Show file tree
Hide file tree
Showing 49 changed files with 1,720 additions and 975 deletions.
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
# Output of the go coverage tool
coverage.out

# GoReleaser dist folder
dist/
# Out folder
out/

# Bin
bin/
Expand Down
13 changes: 12 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ BUF_VERSION := v0.43.2
# Directories
REPO_ROOT := $(shell git rev-parse --show-toplevel)
BIN_DIR := bin
OUT_DIR := out
REIGNITED_CMD := cmd/reignited
TOOLS_DIR := hack/tools
TOOLS_BIN_DIR := $(TOOLS_DIR)/bin
Expand All @@ -28,6 +29,9 @@ $(TOOLS_SHARE_DIR):
$(BIN_DIR):
mkdir -p $@

$(OUT_DIR):
mkdir -p $@

# Binaries
GOLANGCI_LINT := $(TOOLS_BIN_DIR)/golangci-lint
GINKGO := $(TOOLS_BIN_DIR)/ginkgo
Expand Down Expand Up @@ -73,7 +77,14 @@ lint: $(GOLANGCI_LINT) $(BUF) ## Lint

.PHONY: test
test: ## Run unit tests
go test ./...
go test -v ./...

.PHONY: test-int
test-int: $(OUT_DIR) ## Run tests (including intengration tests)
CTR_ROOT_DIR=$(OUT_DIR)/containerd
mkdir -p $(CTR_ROOT_DIR)
sudo go test -v -count=1 ./...
sudo rm -rf $(CTR_ROOT_DIR)

.PHONY: test-e2e
test-e2e: ## Run e2e tests
Expand Down
291 changes: 291 additions & 0 deletions cmd/dev-helper/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
//nolint
package main

import (
"context"
"encoding/json"
"fmt"
"log"

"github.com/sirupsen/logrus"

_ "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/typeurl"

ctr "github.com/containerd/containerd"

"github.com/weaveworks/reignite/core/models"
"github.com/weaveworks/reignite/core/ports"
"github.com/weaveworks/reignite/infrastructure/containerd"
rlog "github.com/weaveworks/reignite/pkg/log"
)

//NOTE: this is a temporary app to help with development

const (
vmName = "vm1"
vmNamespace = "teamabc"
imageName = "docker.io/library/ubuntu:groovy"
)

func main() {
//socketPath := defaults.ContainerdSocket
socketPath := "/home/richard/code/scratch/containerdlocal/run/containerd.sock"

ctx, cancel := context.WithCancel(context.Background())

rlog.Configure(&rlog.Config{
Verbosity: 0,
Format: "text",
Output: "stderr",
})
logger := rlog.GetLogger(ctx)
logger.Infof("reignite dev-helper, using containerd socket: %s", socketPath)

logger.Info("starting containerd event listener")
go eventListener(ctx, socketPath, logger)

logger.Infof("Press [enter] to write vmspec to using containerd repo")
fmt.Scanln()
repoTest(ctx, socketPath, logger)

logger.Infof("Press [enter] to get image %s", imageName)
fmt.Scanln()
imageServiceTest(ctx, socketPath, logger)

//repoUpdateTest(ctx, socketPath)
//imageLeaseTest(ctx, socketPath)
//contentStoreTest(ctx, socketPath)

logger.Info("Press [enter] to exit")
fmt.Scanln()

cancel()
}

func repoTest(ctx context.Context, socketPath string, logger *logrus.Entry) {
client, err := ctr.New(socketPath)
if err != nil {
log.Fatal(err)
}

repo := containerd.NewMicroVMRepoWithClient(client)

vmSpec := getTestSpec()
logger.Infof("saving microvm spec %s/%s", vmSpec.Namespace, vmSpec.ID)

_, err = repo.Save(ctx, vmSpec)
if err != nil {
log.Fatal(err)
}
}

func imageServiceTest(ctx context.Context, socketPath string, logger *logrus.Entry) {
cfg := &containerd.Config{
//Snapshotter: defaults.ContainerdSnapshotter,
//Snapshotter: "overlayfs",
SnapshotterKernel: "native",
SnapshotterVolume: "native",
SocketPath: socketPath,
}
logger.Infof("using snapshotters %s & %s", cfg.SnapshotterKernel, cfg.SnapshotterVolume)

imageService, err := containerd.NewImageService(cfg)
if err != nil {
log.Fatal(err)
}

input := ports.GetImageInput{
ImageName: imageName,
OwnerName: vmName,
OwnerNamespace: vmNamespace,
Use: models.ImageUseVolume,
}
mountPoint, err := imageService.GetAndMount(ctx, input)
if err != nil {
log.Fatal(err)
}

logger.Infof("mounted image %s to %s (type %s)", imageName, mountPoint[0].Source, mountPoint[0].Type)
}

func eventListener(ctx context.Context, socketPath string, logger *logrus.Entry) {
client, err := ctr.New(socketPath)
if err != nil {
log.Fatal(err)
}

es := client.EventService()
ch, errsCh := es.Subscribe(ctx)

for {
select {
case <-ctx.Done():
logger.Info("Existing event listener")
case evt := <-ch:
v, err := typeurl.UnmarshalAny(evt.Event)
if err != nil {
logger.Errorf("error unmarshalling: %s", err)
continue
}
out, err := json.Marshal(v)
if err != nil {
logger.Errorf("cannot marshal Any into JSON: %s", err)
continue
}
logger.Infof("event received, ns %s, topic %s, body: %s", evt.Namespace, evt.Topic, string(out))
case errEvt := <-errsCh:
logger.Errorf("event error received: %s", errEvt)
}
}
}

func imageLeaseTest(ctx context.Context, socketPath string) {
client, err := ctr.New(socketPath)
if err != nil {
log.Fatal(err)
}

nsCtx := namespaces.WithNamespace(ctx, vmNamespace)

leaseManager := client.LeasesService()
l, err := leaseManager.Create(nsCtx, leases.WithID("mytestlease"))
if err != nil {
log.Fatal(err)
}

leaseCtx := leases.WithLease(nsCtx, l.ID)

image, err := client.Pull(leaseCtx, imageName, ctr.WithPullUnpack)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%#v\n", image)
fmt.Println("done with pull")
}

func contentStoreTest(ctx context.Context, socketPath string) {
client, err := ctr.New(socketPath)
if err != nil {
log.Fatal(err)
}

nsCtx := namespaces.WithNamespace(ctx, vmNamespace)

leaseManager := client.LeasesService()
l, err := leaseManager.Create(nsCtx, leases.WithID("mytestlease"))
if err != nil {
log.Fatal(err)
}

vmSpec := getTestSpec()

leaseCtx := leases.WithLease(nsCtx, l.ID)

store := client.ContentStore()

refName := "mytestrefname"
writer, err := store.Writer(leaseCtx, content.WithRef(refName))
if err != nil {
log.Fatal(err)
}

data, err := json.Marshal(vmSpec)
if err != nil {
log.Fatal(err)
}

_, err = writer.Write(data)
if err != nil {
log.Fatal(err)
}

labels := map[string]string{
"vmid": vmName,
"ns": vmNamespace,
}
err = writer.Commit(leaseCtx, 0, "", content.WithLabels(labels))
if err != nil {
log.Fatal(err)
}

writer.Close()
}

func repoUpdateTest(ctx context.Context, socketPath string) {
client, err := ctr.New(socketPath)
if err != nil {
log.Fatal(err)
}

repo := containerd.NewMicroVMRepoWithClient(client)

vmSpec := getTestSpec()

_, err = repo.Save(ctx, vmSpec)
if err != nil {
log.Fatal(err)
}

vmSpec.Spec.MemoryInMb = 8096

_, err = repo.Save(ctx, vmSpec)
if err != nil {
log.Fatal(err)
}

specs, err := repo.GetAll(ctx, vmNamespace)
if err != nil {
log.Fatal(err)
}

for _, spec := range specs {
log.Printf("spec: %#v\n", spec)
}

}

func getTestSpec() *models.MicroVM {
return &models.MicroVM{
ID: vmName,
Namespace: vmNamespace,
Spec: models.MicroVMSpec{
MemoryInMb: 2048,
VCPU: 4,
Kernel: models.Kernel{
Image: "docker.io/linuxkit/kernel:5.4.129",
CmdLine: "console=ttyS0 reboot=k panic=1 pci=off i8042.noaux i8042.nomux i8042.nopnp i8042.dumbkbd ds=nocloud-net;s=http://169.254.169.254/latest/ network-config=ASDFGFDFG",
},
NetworkInterfaces: []models.NetworkInterface{
{
AllowMetadataRequests: false,
GuestMAC: "AA:FF:00:00:00:01",
HostDeviceName: "tap1",
GuestDeviceName: "eth0",
},
{
AllowMetadataRequests: false,
HostDeviceName: "/dev/tap55",
GuestDeviceName: "eth1",
},
},
Volumes: []models.Volume{
{
ID: "root",
IsRoot: true,
IsReadOnly: false,
MountPoint: "/",
Source: models.VolumeSource{
Container: &models.ContainerVolumeSource{
Image: imageName,
},
},
Size: 20000,
},
},
},
}
}
Loading

0 comments on commit 70b580f

Please sign in to comment.