From efe4f5dcfc105725dbd77beffd458d46cdbe85a5 Mon Sep 17 00:00:00 2001 From: Balazs Nadasdi Date: Tue, 9 Nov 2021 14:20:26 +0100 Subject: [PATCH] chore(fmt): Enable disabled linters in global scope == wsl * Fixed everywhere. == lll * Fix everywhere. * Added exception on `https://` links. == gochecknoglobals * Fixed everywhere. == gci * Fixed everywhere (with `golangci-lint run --fix`) == godox No TODO comments should live in the code without filed issues to track them. The reason is simple: if we have a comment with "todo", it has the same value as not having that comment at all, because no one will care about it. * No TODO, BUG, and FIXME comments in the code. * Except if it has a filed github issue reference. == gosec * Removed all nolint comments for gosec. * Added 2 items to the exclude list for gosec. --- .golangci.yml | 50 ++++++++++------- core/application/commands.go | 4 +- core/application/reconcile.go | 4 +- core/models/network.go | 4 +- core/models/vmid.go | 2 + core/models/volumes.go | 2 - core/plans/microvm_create_update.go | 16 +++++- core/plans/microvm_delete.go | 6 ++- core/steps/event/publish.go | 1 - core/steps/microvm/start.go | 1 - core/steps/network/interface_create.go | 7 ++- core/steps/network/interface_delete.go | 6 ++- core/steps/runtime/dir_create.go | 7 ++- core/steps/runtime/dir_delete.go | 3 +- core/steps/runtime/initrd_mount.go | 3 +- core/steps/runtime/kernel_mount.go | 3 +- core/steps/runtime/repo_release.go | 1 - core/steps/runtime/volume_mount.go | 9 +++- infrastructure/containerd/content.go | 35 ++++++++---- infrastructure/containerd/convert.go | 4 +- infrastructure/containerd/event_service.go | 24 +++++---- infrastructure/containerd/image_service.go | 27 +++++++--- infrastructure/containerd/lease.go | 3 +- infrastructure/containerd/repo.go | 54 +++++++++++-------- infrastructure/containerd/snapshot.go | 1 + .../controllers/microvm_controller.go | 21 +++++--- infrastructure/firecracker/config.go | 20 ++++--- infrastructure/firecracker/create.go | 15 +++++- infrastructure/firecracker/provider.go | 20 ++++--- infrastructure/firecracker/state.go | 7 ++- infrastructure/firecracker/types.go | 53 +++++++++--------- infrastructure/grpc/convert.go | 7 +++ infrastructure/grpc/server.go | 41 +++++++++----- infrastructure/network/network_service.go | 21 +++++--- infrastructure/ulid/ulid.go | 9 ++-- internal/command/flags/flags.go | 3 +- internal/command/gw/gw.go | 31 ++--------- internal/command/root.go | 2 +- internal/command/run/run.go | 24 ++++++--- pkg/log/log.go | 4 ++ pkg/planner/actuator.go | 8 ++- pkg/queue/queue.go | 3 +- pkg/validation/validate.go | 3 +- pkg/wait/wait.go | 4 +- test/e2e/utils/runner.go | 22 +++++++- 45 files changed, 392 insertions(+), 203 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index eb85a8564..1a3350057 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -28,6 +28,9 @@ issues: - text: "local replacement are not allowed: github.com/weaveworks/flintlock/" linters: - gomoddirectives + - source: "https://" + linters: + - lll - path: _test\.go linters: - goerr113 @@ -45,7 +48,9 @@ issues: linters: - exhaustivestruct - lll - + - source: "// .* #\\d+" + linters: + - godox linters-settings: funlen: lines: 110 @@ -57,43 +62,51 @@ linters-settings: cyclop: max-complexity: 12 skip-tests: true + gosec: + excludes: + - G204 # Subprocess launched with function call as argument or cmd arguments + - G404 # Use of weak random number generator (math/rand instead of crypto/rand) + exclude-generated: true + lll: + line-length: 120 + misspell: + locale: GB linters: disable-all: true enable: - - deadcode - - errcheck - - gosimple - - govet - - ineffassign - - staticcheck - - structcheck - - typecheck - - unused - - varcheck - bodyclose + - deadcode - depguard - dogsled - dupl + - errcheck - exhaustive - exportloopref + - exportloopref - funlen + - gci + - gochecknoglobals - gochecknoinits - gocognit - goconst - gocritic - gocyclo - godot + - godox - goerr113 - gofmt - gofumpt - goheader - goimports - - revive - gomnd - gomodguard - goprintffuncname - gosec + - gosimple + - govet + - ineffassign + - lll - misspell - nakedret - nestif @@ -101,17 +114,18 @@ linters: - noctx - nolintlint - prealloc + - revive - rowserrcheck - - exportloopref - sqlclosecheck + - staticcheck + - structcheck - stylecheck - testpackage + - typecheck - unconvert - unparam + - unused + - varcheck - whitespace - disabled: - - gci - - godox - - gochecknoglobals - - lll - wsl + disabled: [] diff --git a/core/application/commands.go b/core/application/commands.go index e13d5b232..227a2c007 100644 --- a/core/application/commands.go +++ b/core/application/commands.go @@ -25,10 +25,12 @@ func (a *app) CreateMicroVM(ctx context.Context, mvm *models.MicroVM) (*models.M if err != nil { return nil, fmt.Errorf("generating random name for microvm: %w", err) } + vmid, err := models.NewVMID(name, defaults.MicroVMNamespace) if err != nil { return nil, fmt.Errorf("creating vmid: %w", err) } + mvm.ID = *vmid } @@ -49,8 +51,6 @@ func (a *app) CreateMicroVM(ctx context.Context, mvm *models.MicroVM) (*models.M } } - // TODO: validate the spec - // Set the timestamp when the VMspec was created. mvm.Spec.CreatedAt = a.ports.Clock().Unix() mvm.Status.State = models.PendingState diff --git a/core/application/reconcile.go b/core/application/reconcile.go index cebf89e0d..09eb02cf6 100644 --- a/core/application/reconcile.go +++ b/core/application/reconcile.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/sirupsen/logrus" - "github.com/weaveworks/flintlock/core/models" "github.com/weaveworks/flintlock/core/plans" "github.com/weaveworks/flintlock/core/ports" @@ -18,6 +17,7 @@ func (a *app) ReconcileMicroVM(ctx context.Context, id, namespace string) error logger := log.GetLogger(ctx).WithField("action", "reconcile") logger.Debugf("Getting spec for %s/%s", namespace, id) + spec, err := a.ports.Repo.Get(ctx, ports.RepositoryGetOptions{ Name: id, Namespace: namespace, @@ -35,8 +35,8 @@ func (a *app) ResyncMicroVMs(ctx context.Context, namespace string) error { "namespace": "ns", }) logger.Info("Resyncing specs") - logger.Debug("Getting all specs") + specs, err := a.ports.Repo.GetAll(ctx, namespace) if err != nil { return fmt.Errorf("getting all microvm specs for resync: %w", err) diff --git a/core/models/network.go b/core/models/network.go index e134564b0..ef09ad91d 100644 --- a/core/models/network.go +++ b/core/models/network.go @@ -5,7 +5,7 @@ type NetworkInterface struct { // GuestDeviceName is the name of the network interface to create in the microvm. GuestDeviceName string `json:"guest_device_name" validate:"required,excludesall=/@,guestDeviceName"` // AllowMetadataRequests indicates that this interface can be used for metadata requests. - // TODO: we may hide this within the firecracker plugin. + // TODO: we may hide this within the firecracker plugin. #179 AllowMetadataRequests bool `json:"allow_mmds,omitempty"` // GuestMAC allows the specifying of a specifi MAC address to use for the interface. If // not supplied a autogenerated MAC address will be used. @@ -14,8 +14,6 @@ type NetworkInterface struct { Type IfaceType `json:"type" validate:"oneof=tap macvtap unsupported"` // Address is an optional IP address to assign to this interface. If not supplied then DHCP will be used. Address string `json:"address,omitempty" validate:"omitempty,cidr"` - // TODO: add rate limiting. - // TODO: add CNI. } type NetworkInterfaceStatus struct { diff --git a/core/models/vmid.go b/core/models/vmid.go index da861418d..895ef21f2 100644 --- a/core/models/vmid.go +++ b/core/models/vmid.go @@ -29,6 +29,7 @@ func NewVMID(name, namespace string) (*VMID, error) { if name == "" { return nil, coreerrs.ErrNameRequired } + if namespace == "" { return nil, coreerrs.ErrNamespaceRequired } @@ -98,6 +99,7 @@ func splitVMIDFromString(id string) (namespace string, name string, err error) { if parts[0] == "" { return "", "", coreerrs.ErrNamespaceRequired } + if parts[1] == "" { return "", "", coreerrs.ErrNameRequired } diff --git a/core/models/volumes.go b/core/models/volumes.go index 328eb2910..9ea0a6c4a 100644 --- a/core/models/volumes.go +++ b/core/models/volumes.go @@ -17,7 +17,6 @@ type Volume struct { PartitionID string `json:"partition_id,omitempty"` // Size is the size to resize this volume to. Size int32 `json:"size,omitempty"` - // TODO: add rate limiting. } // Volumes represents a collection of volumes. @@ -38,7 +37,6 @@ func (v Volumes) GetByID(id string) *Volume { type VolumeSource struct { // Container is used to specify a source of a volume as a OCI container. Container *ContainerVolumeSource `json:"container,omitempty"` - // TODO: add CSI. } // ContainerDriveSource represents the details of a volume coming from a OCI image. diff --git a/core/plans/microvm_create_update.go b/core/plans/microvm_create_update.go index 1fc2f0fec..238c5d771 100644 --- a/core/plans/microvm_create_update.go +++ b/core/plans/microvm_create_update.go @@ -107,25 +107,32 @@ func (p *microvmCreateOrUpdatePlan) addStep(ctx context.Context, step planner.Pr return nil } -func (p *microvmCreateOrUpdatePlan) addImageSteps(ctx context.Context, vm *models.MicroVM, imageSvc ports.ImageService) error { +func (p *microvmCreateOrUpdatePlan) addImageSteps(ctx context.Context, + vm *models.MicroVM, + imageSvc ports.ImageService, +) error { for i := range vm.Spec.Volumes { vol := vm.Spec.Volumes[i] + status, ok := vm.Status.Volumes[vol.ID] if !ok { status = &models.VolumeStatus{} vm.Status.Volumes[vol.ID] = status } + if vol.Source.Container != nil { if err := p.addStep(ctx, runtime.NewVolumeMount(&vm.ID, &vol, status, imageSvc)); err != nil { return fmt.Errorf("adding volume mount step: %w", err) } } } + if string(vm.Spec.Kernel.Image) != "" { if err := p.addStep(ctx, runtime.NewKernelMount(vm, imageSvc)); err != nil { return fmt.Errorf("adding kernel mount step: %w", err) } } + if vm.Spec.Initrd != nil { if err := p.addStep(ctx, runtime.NewInitrdMount(vm, imageSvc)); err != nil { return fmt.Errorf("adding initrd mount step: %w", err) @@ -135,14 +142,19 @@ func (p *microvmCreateOrUpdatePlan) addImageSteps(ctx context.Context, vm *model return nil } -func (p *microvmCreateOrUpdatePlan) addNetworkSteps(ctx context.Context, vm *models.MicroVM, networkSvc ports.NetworkService) error { +func (p *microvmCreateOrUpdatePlan) addNetworkSteps(ctx context.Context, + vm *models.MicroVM, + networkSvc ports.NetworkService, +) error { for i := range vm.Spec.NetworkInterfaces { iface := vm.Spec.NetworkInterfaces[i] + status, ok := vm.Status.NetworkInterfaces[iface.GuestDeviceName] if !ok { status = &models.NetworkInterfaceStatus{} vm.Status.NetworkInterfaces[iface.GuestDeviceName] = status } + if err := p.addStep(ctx, network.NewNetworkInterface(&vm.ID, &iface, status, networkSvc)); err != nil { return fmt.Errorf("adding create network interface step: %w", err) } diff --git a/core/plans/microvm_delete.go b/core/plans/microvm_delete.go index a9611d6c4..f88bd70d7 100644 --- a/core/plans/microvm_delete.go +++ b/core/plans/microvm_delete.go @@ -119,7 +119,11 @@ func (p *microvmDeletePlan) addStep(ctx context.Context, step planner.Procedure) return nil } -func (p *microvmDeletePlan) addNetworkSteps(ctx context.Context, vm *models.MicroVM, networkSvc ports.NetworkService) error { +func (p *microvmDeletePlan) addNetworkSteps( + ctx context.Context, + vm *models.MicroVM, + networkSvc ports.NetworkService, +) error { for i := range vm.Spec.NetworkInterfaces { iface := vm.Spec.NetworkInterfaces[i] diff --git a/core/steps/event/publish.go b/core/steps/event/publish.go index 9678db680..b1dc72c3f 100644 --- a/core/steps/event/publish.go +++ b/core/steps/event/publish.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/sirupsen/logrus" - "github.com/weaveworks/flintlock/core/ports" "github.com/weaveworks/flintlock/pkg/log" "github.com/weaveworks/flintlock/pkg/planner" diff --git a/core/steps/microvm/start.go b/core/steps/microvm/start.go index 0dab0421a..61c52e5c8 100644 --- a/core/steps/microvm/start.go +++ b/core/steps/microvm/start.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/sirupsen/logrus" - "github.com/weaveworks/flintlock/core/errors" "github.com/weaveworks/flintlock/core/models" "github.com/weaveworks/flintlock/core/ports" diff --git a/core/steps/network/interface_create.go b/core/steps/network/interface_create.go index 96b55a161..1667757ff 100644 --- a/core/steps/network/interface_create.go +++ b/core/steps/network/interface_create.go @@ -12,7 +12,11 @@ import ( "github.com/weaveworks/flintlock/pkg/planner" ) -func NewNetworkInterface(vmid *models.VMID, iface *models.NetworkInterface, status *models.NetworkInterfaceStatus, svc ports.NetworkService) planner.Procedure { +func NewNetworkInterface(vmid *models.VMID, + iface *models.NetworkInterface, + status *models.NetworkInterfaceStatus, + svc ports.NetworkService, +) planner.Procedure { return &createInterface{ vmid: vmid, iface: iface, @@ -77,6 +81,7 @@ func (s *createInterface) Do(ctx context.Context) ([]planner.Procedure, error) { if err != nil { return nil, fmt.Errorf("checking if networking interface exists: %w", err) } + if exists { // This whole block is unreachable right now, because // the Do function is called only if ShouldDo returns diff --git a/core/steps/network/interface_delete.go b/core/steps/network/interface_delete.go index 38275aac5..c7bcb4063 100644 --- a/core/steps/network/interface_delete.go +++ b/core/steps/network/interface_delete.go @@ -11,7 +11,11 @@ import ( "github.com/weaveworks/flintlock/pkg/planner" ) -func DeleteNetworkInterface(vmid *models.VMID, iface *models.NetworkInterface, svc ports.NetworkService) planner.Procedure { +func DeleteNetworkInterface( + vmid *models.VMID, + iface *models.NetworkInterface, + svc ports.NetworkService, +) planner.Procedure { return deleteInterface{ vmid: vmid, iface: iface, diff --git a/core/steps/runtime/dir_create.go b/core/steps/runtime/dir_create.go index 70affbc49..b602c1a69 100644 --- a/core/steps/runtime/dir_create.go +++ b/core/steps/runtime/dir_create.go @@ -7,7 +7,6 @@ import ( "github.com/sirupsen/logrus" "github.com/spf13/afero" - "github.com/weaveworks/flintlock/pkg/log" "github.com/weaveworks/flintlock/pkg/planner" ) @@ -38,8 +37,8 @@ func (s *createDirectory) ShouldDo(ctx context.Context) (bool, error) { "mode": s.mode.String(), }) logger.Debug("checking if procedure should be run") - logger.Trace("checking if directory exists") + exists, err := s.directoryExists() if err != nil { return false, err @@ -50,6 +49,7 @@ func (s *createDirectory) ShouldDo(ctx context.Context) (bool, error) { } logger.Trace("checking directory permissions") + info, err := s.fs.Stat(s.dir) if err != nil { return false, fmt.Errorf("doing stat on %s: %w", s.dir, err) @@ -78,14 +78,17 @@ func (s *createDirectory) Do(ctx context.Context) ([]planner.Procedure, error) { if err != nil { return nil, err } + if !exists { logger.Trace("creating directory") + if err := s.fs.Mkdir(s.dir, s.mode); err != nil { return nil, fmt.Errorf("creating directory %s: %w", s.dir, err) } } logger.Trace("setting permissions for directory") + if err := s.fs.Chmod(s.dir, s.mode); err != nil { return nil, fmt.Errorf("changing directory permissions for %s: %w", s.dir, err) } diff --git a/core/steps/runtime/dir_delete.go b/core/steps/runtime/dir_delete.go index 3e7ff3876..b005046af 100644 --- a/core/steps/runtime/dir_delete.go +++ b/core/steps/runtime/dir_delete.go @@ -6,7 +6,6 @@ import ( "github.com/sirupsen/logrus" "github.com/spf13/afero" - "github.com/weaveworks/flintlock/pkg/log" "github.com/weaveworks/flintlock/pkg/planner" ) @@ -52,8 +51,10 @@ func (s *deleteDirectory) Do(ctx context.Context) ([]planner.Procedure, error) { if err != nil { return nil, err } + if exists { logger.Trace("deleting directory") + if err := s.fs.RemoveAll(s.dir); err != nil { return nil, fmt.Errorf("deleting directory %s: %w", s.dir, err) } diff --git a/core/steps/runtime/initrd_mount.go b/core/steps/runtime/initrd_mount.go index 464fd5536..062453e10 100644 --- a/core/steps/runtime/initrd_mount.go +++ b/core/steps/runtime/initrd_mount.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/sirupsen/logrus" - cerrs "github.com/weaveworks/flintlock/core/errors" "github.com/weaveworks/flintlock/core/models" "github.com/weaveworks/flintlock/core/ports" @@ -50,6 +49,7 @@ func (s *initrdMount) ShouldDo(ctx context.Context) (bool, error) { } input := s.getMountSpec() + mounted, err := s.imageSvc.IsMounted(ctx, input) if err != nil { return false, fmt.Errorf("checking if image %s is mounted: %w", input.ImageName, err) @@ -76,6 +76,7 @@ func (s *initrdMount) Do(ctx context.Context) ([]planner.Procedure, error) { if err != nil { return nil, fmt.Errorf("mount images %s for initrd use: %w", input.ImageName, err) } + if len(mounts) == 0 { return nil, cerrs.ErrNoMount } diff --git a/core/steps/runtime/kernel_mount.go b/core/steps/runtime/kernel_mount.go index d7a7cbd18..e801f7f18 100644 --- a/core/steps/runtime/kernel_mount.go +++ b/core/steps/runtime/kernel_mount.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/sirupsen/logrus" - cerrs "github.com/weaveworks/flintlock/core/errors" "github.com/weaveworks/flintlock/core/models" "github.com/weaveworks/flintlock/core/ports" @@ -46,6 +45,7 @@ func (s *kernelMount) ShouldDo(ctx context.Context) (bool, error) { } input := s.getMountSpec() + mounted, err := s.imageSvc.IsMounted(ctx, input) if err != nil { return false, fmt.Errorf("checking if image %s is mounted: %w", input.ImageName, err) @@ -76,6 +76,7 @@ func (s *kernelMount) Do(ctx context.Context) ([]planner.Procedure, error) { if err != nil { return nil, fmt.Errorf("mount images %s for kernel use: %w", input.ImageName, err) } + if len(mounts) == 0 { return nil, cerrs.ErrNoMount } diff --git a/core/steps/runtime/repo_release.go b/core/steps/runtime/repo_release.go index 182b907f8..3075aa8ce 100644 --- a/core/steps/runtime/repo_release.go +++ b/core/steps/runtime/repo_release.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/sirupsen/logrus" - "github.com/weaveworks/flintlock/core/errors" "github.com/weaveworks/flintlock/core/models" "github.com/weaveworks/flintlock/core/ports" diff --git a/core/steps/runtime/volume_mount.go b/core/steps/runtime/volume_mount.go index 71b0ed225..a95a482d0 100644 --- a/core/steps/runtime/volume_mount.go +++ b/core/steps/runtime/volume_mount.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/sirupsen/logrus" - cerrs "github.com/weaveworks/flintlock/core/errors" "github.com/weaveworks/flintlock/core/models" "github.com/weaveworks/flintlock/core/ports" @@ -13,7 +12,11 @@ import ( "github.com/weaveworks/flintlock/pkg/planner" ) -func NewVolumeMount(vmid *models.VMID, volume *models.Volume, status *models.VolumeStatus, imageService ports.ImageService) planner.Procedure { +func NewVolumeMount(vmid *models.VMID, + volume *models.Volume, + status *models.VolumeStatus, + imageService ports.ImageService, +) planner.Procedure { return &volumeMount{ vmid: vmid, volume: volume, @@ -46,6 +49,7 @@ func (s *volumeMount) ShouldDo(ctx context.Context) (bool, error) { } input := s.getMountSpec() + mounted, err := s.imageSvc.IsMounted(ctx, input) if err != nil { return false, fmt.Errorf("checking if image %s is mounted: %w", input.ImageName, err) @@ -72,6 +76,7 @@ func (s *volumeMount) Do(ctx context.Context) ([]planner.Procedure, error) { if err != nil { return nil, fmt.Errorf("mount images %s for volume use: %w", input.ImageName, err) } + if len(mounts) == 0 { return nil, cerrs.ErrNoVolumeMount } diff --git a/infrastructure/containerd/content.go b/infrastructure/containerd/content.go index 503705d26..1c2fd2592 100644 --- a/infrastructure/containerd/content.go +++ b/infrastructure/containerd/content.go @@ -7,17 +7,14 @@ import ( "github.com/weaveworks/flintlock/pkg/defaults" ) -var ( - // NameLabel is the name of the containerd content store label used for the microvm name. - NameLabel = fmt.Sprintf("%s/name", defaults.Domain) - // NamespaceLabel is the name of the containerd content store label used for the microvm namespace. - NamespaceLabel = fmt.Sprintf("%s/ns", defaults.Domain) - // TypeLabel is the name of the containerd content store label used to denote the type of content. - TypeLabel = fmt.Sprintf("%s/type", defaults.Domain) - // VersionLabel is the name of the containerd content store label to hold version of the content. - VersionLabel = fmt.Sprintf("%s/version", defaults.Domain) +const ( // MicroVMSpecType is the type name for a microvm spec. MicroVMSpecType = "microvm" + + nameLabelFormat = "%s/name" + namespaceLabelFormat = "%s/ns" + typeLabelFormat = "%s/type" + versionLabelFormat = "%s/version" ) func contentRefName(microvm *models.MicroVM) string { @@ -27,3 +24,23 @@ func contentRefName(microvm *models.MicroVM) string { func labelFilter(name, value string) string { return fmt.Sprintf("labels.\"%s\"==\"%s\"", name, value) } + +// NameLabel is the name of the containerd content store label used for the microvm name. +func NameLabel() string { + return fmt.Sprintf(nameLabelFormat, defaults.Domain) +} + +// NamespaceLabel is the name of the containerd content store label used for the microvm namespace. +func NamespaceLabel() string { + return fmt.Sprintf(namespaceLabelFormat, defaults.Domain) +} + +// TypeLabel is the name of the containerd content store label used to denote the type of content. +func TypeLabel() string { + return fmt.Sprintf(typeLabelFormat, defaults.Domain) +} + +// VersionLabel is the name of the containerd content store label to hold version of the content. +func VersionLabel() string { + return fmt.Sprintf(versionLabelFormat, defaults.Domain) +} diff --git a/infrastructure/containerd/convert.go b/infrastructure/containerd/convert.go index 98eebf40e..d3f9255f5 100644 --- a/infrastructure/containerd/convert.go +++ b/infrastructure/containerd/convert.go @@ -6,7 +6,6 @@ import ( "github.com/containerd/containerd/events" "github.com/containerd/containerd/mount" "github.com/containerd/typeurl" - "github.com/weaveworks/flintlock/core/models" "github.com/weaveworks/flintlock/core/ports" ) @@ -39,11 +38,13 @@ func getOverlayMountPath(m mount.Mount) string { func convertMountsToModel(mounts []mount.Mount, snapshotter string) ([]models.Mount, error) { convertedMounts := []models.Mount{} + for _, m := range mounts { counvertedMount, err := convertMountToModel(m, snapshotter) if err != nil { return nil, fmt.Errorf("converting mount: %w", err) } + convertedMounts = append(convertedMounts, counvertedMount) } @@ -65,6 +66,7 @@ func convertCtrEventEnvelope(evt *events.Envelope) (*ports.EventEnvelope, error) if err != nil { return nil, fmt.Errorf("unmarshalling event: %w", err) } + converted.Event = v return converted, nil diff --git a/infrastructure/containerd/event_service.go b/infrastructure/containerd/event_service.go index 2f2d0c598..cf4d774bf 100644 --- a/infrastructure/containerd/event_service.go +++ b/infrastructure/containerd/event_service.go @@ -8,7 +8,6 @@ import ( "github.com/containerd/containerd" "github.com/containerd/containerd/events" "github.com/containerd/containerd/namespaces" - "github.com/weaveworks/flintlock/core/ports" ) @@ -37,6 +36,7 @@ type eventService struct { func (es *eventService) Publish(ctx context.Context, topic string, eventToPublish interface{}) error { namespaceCtx := namespaces.WithNamespace(ctx, es.cfg.Namespace) ctrEventSrv := es.client.EventService() + if err := ctrEventSrv.Publish(namespaceCtx, topic, eventToPublish); err != nil { return fmt.Errorf("publishing event: %w", err) } @@ -45,14 +45,18 @@ func (es *eventService) Publish(ctx context.Context, topic string, eventToPublis } // SubscribeTopic will subscribe to events on a named topic. -func (es *eventService) SubscribeTopic(ctx context.Context, topic string) (ch <-chan *ports.EventEnvelope, errs <-chan error) { +func (es *eventService) SubscribeTopic(ctx context.Context, + topic string, +) (ch <-chan *ports.EventEnvelope, errs <-chan error) { topicFilter := topicFilter(topic) return es.subscribe(ctx, topicFilter) } // SubscribeTopics will subscribe to events on a set of named topics. -func (es *eventService) SubscribeTopics(ctx context.Context, topics []string) (ch <-chan *ports.EventEnvelope, errs <-chan error) { +func (es *eventService) SubscribeTopics(ctx context.Context, + topics []string, +) (ch <-chan *ports.EventEnvelope, errs <-chan error) { topicFilters := []string{} for _, topic := range topics { @@ -67,18 +71,20 @@ func (es *eventService) Subscribe(ctx context.Context) (ch <-chan *ports.EventEn return es.subscribe(ctx) } -func (es *eventService) subscribe(ctx context.Context, filters ...string) (ch <-chan *ports.EventEnvelope, errs <-chan error) { +func (es *eventService) subscribe(ctx context.Context, + filters ...string, +) (ch <-chan *ports.EventEnvelope, errs <-chan error) { var ( - evtCh = make(chan *ports.EventEnvelope) - evtErrCh = make(chan error, 1) + evtCh = make(chan *ports.EventEnvelope) + evtErrCh = make(chan error, 1) + ctrEvents <-chan *events.Envelope + ctrErrs <-chan error ) + errs = evtErrCh ch = evtCh - namespaceCtx := namespaces.WithNamespace(ctx, es.cfg.Namespace) - var ctrEvents <-chan *events.Envelope - var ctrErrs <-chan error if len(filters) == 0 { ctrEvents, ctrErrs = es.client.Subscribe(namespaceCtx) } else { diff --git a/infrastructure/containerd/image_service.go b/infrastructure/containerd/image_service.go index 277341129..155936089 100644 --- a/infrastructure/containerd/image_service.go +++ b/infrastructure/containerd/image_service.go @@ -4,14 +4,12 @@ import ( "context" "fmt" - "github.com/containerd/containerd/snapshots" - "github.com/containerd/containerd" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/snapshots" "github.com/sirupsen/logrus" - "github.com/weaveworks/flintlock/core/models" "github.com/weaveworks/flintlock/core/ports" "github.com/weaveworks/flintlock/pkg/log" @@ -62,12 +60,14 @@ func (im *imageService) PullAndMount(ctx context.Context, input *ports.ImageMoun logger.Debugf("getting and mounting image %s for owner %s", input.ImageName, input.Owner) nsCtx := namespaces.WithNamespace(ctx, im.config.Namespace) + leaseCtx, err := withOwnerLease(nsCtx, input.Owner, im.client) if err != nil { return nil, fmt.Errorf("getting lease for image pulling and mounting: %w", err) } var image containerd.Image + exists, image, err := im.imageExists(leaseCtx, input.ImageName, input.Owner) if err != nil { return nil, fmt.Errorf("checking if image %s exists for owner %s: %w", input.ImageName, input.Owner, err) @@ -79,6 +79,7 @@ func (im *imageService) PullAndMount(ctx context.Context, input *ports.ImageMoun return nil, fmt.Errorf("getting image %s for owner %s: %w", input.ImageName, input.Owner, err) } } + ss := im.getSnapshotter(input.Use) return im.snapshotAndMount(leaseCtx, image, input.Owner, input.OwnerUsageID, ss, logger) @@ -110,6 +111,7 @@ func (im *imageService) IsMounted(ctx context.Context, input *ports.ImageMountSp if err != nil { return false, fmt.Errorf("checking image exists: %w", err) } + if !exists { return false, nil } @@ -126,7 +128,7 @@ func (im *imageService) IsMounted(ctx context.Context, input *ports.ImageMountSp return snapshotExists, nil } -func (im *imageService) imageExists(ctx context.Context, imageName string, owner string) (bool, containerd.Image, error) { +func (im *imageService) imageExists(ctx context.Context, imageName, owner string) (bool, containerd.Image, error) { leaseCtx, err := withOwnerLease(ctx, owner, im.client) if err != nil { return false, nil, fmt.Errorf("getting lease for owner: %w", err) @@ -158,13 +160,23 @@ func (im *imageService) pullImage(ctx context.Context, imageName string, owner s return image, nil } -func (im *imageService) snapshotAndMount(ctx context.Context, image containerd.Image, owner, ownerUsageID, snapshotter string, logger *logrus.Entry) ([]models.Mount, error) { +func (im *imageService) snapshotAndMount(ctx context.Context, + image containerd.Image, + owner, ownerUsageID, snapshotter string, + logger *logrus.Entry, +) ([]models.Mount, error) { unpacked, err := image.IsUnpacked(ctx, snapshotter) if err != nil { - return nil, fmt.Errorf("checking if image %s has been unpacked with snapshotter %s: %w", image.Name(), snapshotter, err) + return nil, fmt.Errorf("checking if image %s has been unpacked with snapshotter %s: %w", + image.Name(), + snapshotter, + err, + ) } + if !unpacked { logger.Debugf("image %s isn't unpacked, unpacking using %s snapshotter", image.Name(), snapshotter) + if unpackErr := image.Unpack(ctx, snapshotter); unpackErr != nil { return nil, fmt.Errorf("unpacking %s with snapshotter %s: %w", image.Name(), snapshotter, err) } @@ -174,6 +186,7 @@ func (im *imageService) snapshotAndMount(ctx context.Context, image containerd.I if err != nil { return nil, fmt.Errorf("getting rootfs content for %s: %w", image.Name(), err) } + parent := imageContent[0].String() snapshotKey := snapshotKey(owner, ownerUsageID) @@ -186,11 +199,13 @@ func (im *imageService) snapshotAndMount(ctx context.Context, image containerd.I } var mounts []mount.Mount + if !snapshotExists { labels := map[string]string{ "flintlock/owner": owner, "flintlock/owner-usage": ownerUsageID, } + mounts, err = ss.Prepare(ctx, snapshotKey, parent, snapshots.WithLabels(labels)) if err != nil { return nil, fmt.Errorf("preparing snapshot of %s: %w", image.Name(), err) diff --git a/infrastructure/containerd/lease.go b/infrastructure/containerd/lease.go index 18fb69151..4f8729186 100644 --- a/infrastructure/containerd/lease.go +++ b/infrastructure/containerd/lease.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/containerd/containerd" - "github.com/containerd/containerd/leases" ) @@ -22,6 +21,7 @@ func withOwnerLease(ctx context.Context, owner string, client *containerd.Client func getExistingOrCreateLease(ctx context.Context, name string, manager leases.Manager) (*leases.Lease, error) { filter := fmt.Sprintf("id==%s", name) + existingLeases, err := manager.List(ctx, filter) if err != nil { return nil, fmt.Errorf("listing existing containerd leases: %w", err) @@ -44,6 +44,7 @@ func getExistingOrCreateLease(ctx context.Context, name string, manager leases.M func deleteLease(ctx context.Context, owner string, client *containerd.Client) error { leaseName := getLeaseNameForOwner(owner) lease := leases.Lease{ID: leaseName} + err := client.LeasesService().Delete(ctx, lease, leases.SynchronousDelete) if err != nil { return fmt.Errorf("delete lease %s: %w", leaseName, err) diff --git a/infrastructure/containerd/repo.go b/infrastructure/containerd/repo.go index 36dd035c8..2e71568fa 100644 --- a/infrastructure/containerd/repo.go +++ b/infrastructure/containerd/repo.go @@ -14,7 +14,6 @@ import ( "github.com/google/go-cmp/cmp" "github.com/opencontainers/go-digest" v1 "github.com/opencontainers/image-spec/specs-go/v1" - "github.com/weaveworks/flintlock/core/errors" "github.com/weaveworks/flintlock/core/models" "github.com/weaveworks/flintlock/core/ports" @@ -64,9 +63,11 @@ func (r *containerdRepo) Save(ctx context.Context, microvm *models.MicroVM) (*mo if err != nil { return nil, fmt.Errorf("getting vm spec from store: %w", err) } + if existingSpec != nil { specDiff := cmp.Diff(existingSpec.Spec, microvm.Spec) statusDiff := cmp.Diff(existingSpec.Status, microvm.Status) + if specDiff == "" && statusDiff == "" { logger.Debug("microvm specs have no diff, skipping save") @@ -75,15 +76,18 @@ func (r *containerdRepo) Save(ctx context.Context, microvm *models.MicroVM) (*mo } namespaceCtx := namespaces.WithNamespace(ctx, r.config.Namespace) + leaseCtx, err := withOwnerLease(namespaceCtx, microvm.ID.String(), r.client) if err != nil { return nil, fmt.Errorf("getting lease for owner: %w", err) } + store := r.client.ContentStore() microvm.Version++ refName := contentRefName(microvm) + writer, err := store.Writer(leaseCtx, content.WithRef(refName)) if err != nil { return nil, fmt.Errorf("getting containerd writer: %w", err) @@ -100,6 +104,7 @@ func (r *containerdRepo) Save(ctx context.Context, microvm *models.MicroVM) (*mo } labels := getVMLabels(microvm) + err = writer.Commit(namespaceCtx, 0, "", content.WithLabels(labels)) if err != nil { return nil, fmt.Errorf("committing content to store: %w", err) @@ -119,6 +124,7 @@ func (r *containerdRepo) Get(ctx context.Context, options ports.RepositoryGetOpt if err != nil { return nil, fmt.Errorf("getting vm spec from store: %w", err) } + if spec == nil { return nil, errors.NewSpecNotFound(options.Name, options.Namespace, options.Version) } @@ -131,17 +137,17 @@ func (r *containerdRepo) Get(ctx context.Context, options ports.RepositoryGetOpt func (r *containerdRepo) GetAll(ctx context.Context, namespace string) ([]*models.MicroVM, error) { namespaceCtx := namespaces.WithNamespace(ctx, r.config.Namespace) store := r.client.ContentStore() + filters := []string{labelFilter(TypeLabel(), MicroVMSpecType)} + versions := map[string]int{} + digests := map[string]*digest.Digest{} - filters := []string{labelFilter(TypeLabel, MicroVMSpecType)} if namespace != "" { - filters = append(filters, labelFilter(NamespaceLabel, namespace)) + filters = append(filters, labelFilter(NamespaceLabel(), namespace)) } - versions := map[string]int{} - digests := map[string]*digest.Digest{} err := store.Walk(namespaceCtx, func(i content.Info) error { - name := i.Labels[NameLabel] - version, err := strconv.Atoi(i.Labels[VersionLabel]) + name := i.Labels[NameLabel()] + version, err := strconv.Atoi(i.Labels[VersionLabel()]) if err != nil { return fmt.Errorf("parsing version number: %w", err) } @@ -163,6 +169,7 @@ func (r *containerdRepo) GetAll(ctx context.Context, namespace string) ([]*model } items := []*models.MicroVM{} + for _, d := range digests { vm, getErr := r.getWithDigest(namespaceCtx, d) if getErr != nil { @@ -200,6 +207,7 @@ func (r *containerdRepo) Delete(ctx context.Context, microvm *models.MicroVM) er if err != nil { return fmt.Errorf("finding digests for %s: %w", microvm.ID, err) } + if len(digests) == 0 { // Ignore not found return nil @@ -244,6 +252,7 @@ func (r *containerdRepo) get(ctx context.Context, options ports.RepositoryGetOpt if err != nil { return nil, fmt.Errorf("finding content in store: %w", err) } + if digest == nil { return nil, nil } @@ -260,6 +269,7 @@ func (r *containerdRepo) getWithDigest(ctx context.Context, metadigest *digest.D } microvm := &models.MicroVM{} + err = json.Unmarshal(readData, microvm) if err != nil { return nil, fmt.Errorf("unmarshalling json content to microvm: %w", err) @@ -268,10 +278,14 @@ func (r *containerdRepo) getWithDigest(ctx context.Context, metadigest *digest.D return microvm, nil } -func (r *containerdRepo) findDigestForSpec(ctx context.Context, options ports.RepositoryGetOptions) (*digest.Digest, error) { - idLabelFilter := labelFilter(NameLabel, options.Name) - nsFilter := labelFilter(NamespaceLabel, options.Namespace) - versionFilter := labelFilter(VersionLabel, options.Version) +func (r *containerdRepo) findDigestForSpec(ctx context.Context, + options ports.RepositoryGetOptions, +) (*digest.Digest, error) { + var digest *digest.Digest + + idLabelFilter := labelFilter(NameLabel(), options.Name) + nsFilter := labelFilter(NamespaceLabel(), options.Namespace) + versionFilter := labelFilter(VersionLabel(), options.Version) combinedFilters := []string{idLabelFilter, nsFilter} @@ -281,14 +295,12 @@ func (r *containerdRepo) findDigestForSpec(ctx context.Context, options ports.Re allFilters := strings.Join(combinedFilters, ",") store := r.client.ContentStore() - - var digest *digest.Digest highestVersion := 0 err := store.Walk( ctx, func(i content.Info) error { - version, err := strconv.Atoi(i.Labels[VersionLabel]) + version, err := strconv.Atoi(i.Labels[VersionLabel()]) if err != nil { return fmt.Errorf("parsing version number: %w", err) } @@ -311,12 +323,12 @@ func (r *containerdRepo) findDigestForSpec(ctx context.Context, options ports.Re func (r *containerdRepo) findAllDigestForSpec(ctx context.Context, name, namespace string) ([]*digest.Digest, error) { store := r.client.ContentStore() - idLabelFilter := labelFilter(NameLabel, name) - nsLabelFilter := labelFilter(NamespaceLabel, namespace) + idLabelFilter := labelFilter(NameLabel(), name) + nsLabelFilter := labelFilter(NamespaceLabel(), namespace) combinedFilters := []string{idLabelFilter, nsLabelFilter} allFilters := strings.Join(combinedFilters, ",") - digests := []*digest.Digest{} + err := store.Walk( ctx, func(i content.Info) error { @@ -350,10 +362,10 @@ func (r *containerdRepo) getMutex(name string) *sync.RWMutex { func getVMLabels(microvm *models.MicroVM) map[string]string { labels := map[string]string{ - NameLabel: microvm.ID.Name(), - NamespaceLabel: microvm.ID.Namespace(), - TypeLabel: MicroVMSpecType, - VersionLabel: strconv.Itoa(microvm.Version), + NameLabel(): microvm.ID.Name(), + NamespaceLabel(): microvm.ID.Namespace(), + TypeLabel(): MicroVMSpecType, + VersionLabel(): strconv.Itoa(microvm.Version), } return labels diff --git a/infrastructure/containerd/snapshot.go b/infrastructure/containerd/snapshot.go index 80d115502..6db0de7a7 100644 --- a/infrastructure/containerd/snapshot.go +++ b/infrastructure/containerd/snapshot.go @@ -13,6 +13,7 @@ func snapshotKey(owner, ownerUsageID string) string { func snapshotExists(ctx context.Context, key string, ss snapshots.Snapshotter) (bool, error) { snapshotExists := false + err := ss.Walk(ctx, func(walkCtx context.Context, info snapshots.Info) error { if info.Name == key { snapshotExists = true diff --git a/infrastructure/controllers/microvm_controller.go b/infrastructure/controllers/microvm_controller.go index f8fd874b7..119c359d2 100644 --- a/infrastructure/controllers/microvm_controller.go +++ b/infrastructure/controllers/microvm_controller.go @@ -8,7 +8,6 @@ import ( "time" "github.com/sirupsen/logrus" - "github.com/weaveworks/flintlock/api/events" "github.com/weaveworks/flintlock/core/models" "github.com/weaveworks/flintlock/core/ports" @@ -32,7 +31,11 @@ type MicroVMController struct { queue queue.Queue } -func (r *MicroVMController) Run(ctx context.Context, numWorkers int, resyncPeriod time.Duration, resyncOnStart bool) error { +func (r *MicroVMController) Run(ctx context.Context, + numWorkers int, + resyncPeriod time.Duration, + resyncOnStart bool, +) error { logger := log.GetLogger(ctx).WithField("controller", "microvm") ctx = log.WithLogger(ctx, logger) logger.Infof("starting microvm controller with %d workers", numWorkers) @@ -44,14 +47,15 @@ func (r *MicroVMController) Run(ctx context.Context, numWorkers int, resyncPerio if resyncOnStart { if err := r.resyncSpecs(ctx, logger); err != nil { - // TODO: should we just log here? return fmt.Errorf("resyncing specs on start: %w", err) } } wg := &sync.WaitGroup{} + logger.Info("starting event listener") wg.Add(1) + go func() { defer wg.Done() r.runEventListener(ctx, resyncPeriod) @@ -59,9 +63,11 @@ func (r *MicroVMController) Run(ctx context.Context, numWorkers int, resyncPerio logger.Info("Starting workers", "num_workers", numWorkers) wg.Add(numWorkers) + for i := 0; i < numWorkers; i++ { go func() { defer wg.Done() + for r.processQueueItem(ctx) { } }() @@ -90,23 +96,24 @@ func (r *MicroVMController) runEventListener(ctx context.Context, resyncPeriod t return case evt := <-evtCh: if err := r.handleEvent(evt, logger); err != nil { + // TODO: should we exit here? #233 logger.Errorf("handling events: %s", err) - // TODO: should we exit here } case <-ticker.C: if err := r.resyncSpecs(ctx, logger); err != nil { + // TODO: should we exit here? #233 logger.Errorf("resyncing specs: %s", err) - // TODO: should we exit here } case evtErr := <-errCh: + // TODO: should we exit here? #233 logger.Errorf("error from event service: %s", evtErr) - // TODO: should we exit here? } } } func (r *MicroVMController) processQueueItem(ctx context.Context) bool { logger := log.GetLogger(ctx) + item, shutdown := r.queue.Dequeue() if shutdown { return false @@ -118,6 +125,7 @@ func (r *MicroVMController) processQueueItem(ctx context.Context) bool { return true } + vmid, err := models.NewVMIDFromString(id) if err != nil { logger.Errorf("failed to parse id into vmid %s, skipping: %s", id, err) @@ -138,6 +146,7 @@ func (r *MicroVMController) processQueueItem(ctx context.Context) bool { func (r *MicroVMController) handleEvent(envelope *ports.EventEnvelope, logger *logrus.Entry) error { var name, namespace string + switch v := envelope.Event.(type) { case *events.MicroVMSpecCreated: created, _ := envelope.Event.(*events.MicroVMSpecCreated) diff --git a/infrastructure/firecracker/config.go b/infrastructure/firecracker/config.go index 524bac544..8cc267394 100644 --- a/infrastructure/firecracker/config.go +++ b/infrastructure/firecracker/config.go @@ -5,15 +5,13 @@ import ( "encoding/base64" "fmt" - "github.com/weaveworks/flintlock/pkg/ptr" - "github.com/firecracker-microvm/firecracker-go-sdk" fcmodels "github.com/firecracker-microvm/firecracker-go-sdk/client/models" - "gopkg.in/yaml.v3" - "github.com/weaveworks/flintlock/core/errors" "github.com/weaveworks/flintlock/core/models" "github.com/weaveworks/flintlock/pkg/cloudinit" + "github.com/weaveworks/flintlock/pkg/ptr" + "gopkg.in/yaml.v3" ) const ( @@ -31,8 +29,6 @@ func CreateConfig(opts ...ConfigOption) (*VmmConfig, error) { } } - // TODO: do we need to add validation? - return cfg, nil } @@ -49,6 +45,7 @@ func WithMicroVM(vm *models.MicroVM) ConfigOption { } cfg.NetDevices = []NetworkInterfaceConfig{} + for i := range vm.Spec.NetworkInterfaces { iface := vm.Spec.NetworkInterfaces[i] @@ -62,6 +59,7 @@ func WithMicroVM(vm *models.MicroVM) ConfigOption { } cfg.BlockDevices = []BlockDeviceConfig{} + for _, vol := range vm.Spec.Volumes { status, ok := vm.Status.Volumes[vol.ID] if !ok { @@ -80,11 +78,13 @@ func WithMicroVM(vm *models.MicroVM) ConfigOption { } kernelArgs := vm.Spec.Kernel.CmdLine + if vm.Spec.Kernel.AddNetworkConfig { networkConfig, err := generateNetworkConfig(vm) if err != nil { return fmt.Errorf("generating kernel network-config: %w", err) } + kernelArgs = fmt.Sprintf("%s network-config=%s", kernelArgs, networkConfig) } @@ -132,6 +132,7 @@ func ApplyConfig(ctx context.Context, cfg *VmmConfig, client *firecracker.Client if err != nil { return fmt.Errorf("failed to put machine configuration: %w", err) } + for _, drive := range cfg.BlockDevices { _, err := client.PutGuestDriveByID(ctx, drive.ID, &fcmodels.Drive{ DriveID: &drive.ID, @@ -145,8 +146,10 @@ func ApplyConfig(ctx context.Context, cfg *VmmConfig, client *firecracker.Client return fmt.Errorf("putting drive configuration: %w", err) } } + for i, netInt := range cfg.NetDevices { guestIfaceName := fmt.Sprintf("eth%d", i) + _, err := client.PutGuestNetworkInterfaceByID(ctx, guestIfaceName, &fcmodels.NetworkInterface{ IfaceID: &guestIfaceName, GuestMac: netInt.GuestMAC, @@ -188,6 +191,7 @@ func ApplyConfig(ctx context.Context, cfg *VmmConfig, client *firecracker.Client return fmt.Errorf("failed to put logging configuration: %w", err) } } + if cfg.Metrics != nil { _, err = client.PutMetrics(ctx, &fcmodels.Metrics{ MetricsPath: &cfg.Metrics.Path, @@ -210,6 +214,7 @@ func ApplyMetadata(ctx context.Context, metadata map[string]string, client *fire meta := &Metadata{ Latest: map[string]string{}, } + for metadataKey, metadataVal := range metadata { encodedVal, err := base64.StdEncoding.DecodeString(metadataVal) if err != nil { @@ -232,6 +237,7 @@ func createNetworkIface(iface *models.NetworkInterface, status *models.NetworkIn if iface.Type == models.IfaceTypeMacvtap { hostDevName = fmt.Sprintf("/dev/tap%d", status.Index) + if macAddr == "" { macAddr = status.MACAddress } @@ -255,10 +261,12 @@ func generateNetworkConfig(vm *models.MicroVM) (string, error) { for i := range vm.Spec.NetworkInterfaces { iface := vm.Spec.NetworkInterfaces[i] + status, ok := vm.Status.NetworkInterfaces[iface.GuestDeviceName] if !ok { return "", errors.NewNetworkInterfaceStatusMissing(iface.GuestDeviceName) } + macAdress := getMacAddress(&iface, status) eth := &cloudinit.Ethernet{ diff --git a/infrastructure/firecracker/create.go b/infrastructure/firecracker/create.go index 5084462da..ebd3e7abe 100644 --- a/infrastructure/firecracker/create.go +++ b/infrastructure/firecracker/create.go @@ -12,7 +12,6 @@ import ( "github.com/firecracker-microvm/firecracker-go-sdk" "github.com/sirupsen/logrus" "github.com/spf13/afero" - "github.com/weaveworks/flintlock/core/models" "github.com/weaveworks/flintlock/pkg/defaults" "github.com/weaveworks/flintlock/pkg/log" @@ -34,6 +33,7 @@ func (p *fcProvider) Create(ctx context.Context, vm *models.MicroVM) error { logger.Debugf("creating microvm") vmState := NewState(vm.ID, p.config.StateRoot, p.fs) + if err := p.ensureState(vmState); err != nil { return fmt.Errorf("ensuring state dir: %w", err) } @@ -42,12 +42,14 @@ func (p *fcProvider) Create(ctx context.Context, vm *models.MicroVM) error { if err != nil { return fmt.Errorf("creating firecracker config: %w", err) } + if err := vmState.SetConfig(config); err != nil { return fmt.Errorf("saving firecracker config: %w", err) } id := strings.ReplaceAll(vm.ID.String(), "/", "-") args := []string{"--id", id, "--boot-timer"} + if !p.config.APIConfig { args = append(args, "--config-file", vmState.ConfigPath()) } @@ -67,7 +69,11 @@ func (p *fcProvider) Create(ctx context.Context, vm *models.MicroVM) error { return fmt.Errorf("saving pid %d to file: %w", proc.Pid, err) } - err = wait.ForCondition(wait.FileExistsCondition(vmState.SockPath(), p.fs), socketTimeoutInSec*time.Second, socketPollInMs*time.Millisecond) + err = wait.ForCondition( + wait.FileExistsCondition(vmState.SockPath(), p.fs), + socketTimeoutInSec*time.Second, + socketPollInMs*time.Millisecond, + ) if err != nil { return fmt.Errorf("waiting for sock file to exist: %w", err) } @@ -77,6 +83,7 @@ func (p *fcProvider) Create(ctx context.Context, vm *models.MicroVM) error { if err := ApplyConfig(ctx, config, client); err != nil { return fmt.Errorf("applying firecracker configuration: %w", err) } + if err := ApplyMetadata(ctx, vm.Spec.Metadata, client); err != nil { return fmt.Errorf("applying metadata to mmds: %w", err) } @@ -95,6 +102,7 @@ func (p *fcProvider) startFirecracker(cmd *exec.Cmd, vmState State, detached boo if err != nil { return nil, fmt.Errorf("opening sterr file %s: %w", vmState.StderrPath(), err) } + cmd.Stderr = stdErrFile cmd.Stdout = stdOutFile cmd.Stdin = &bytes.Buffer{} @@ -130,6 +138,7 @@ func (p *fcProvider) ensureState(vmState State) error { if err != nil { return fmt.Errorf("checking if sock dir exists: %w", err) } + if sockExists { if delErr := p.fs.Remove(vmState.SockPath()); delErr != nil { return fmt.Errorf("deleting existing sock file: %w", err) @@ -140,12 +149,14 @@ func (p *fcProvider) ensureState(vmState State) error { if err != nil { return fmt.Errorf("opening log file %s: %w", vmState.LogPath(), err) } + logFile.Close() metricsFile, err := p.fs.OpenFile(vmState.MetricsPath(), os.O_WRONLY|os.O_CREATE|os.O_APPEND, defaults.DataFilePerm) if err != nil { return fmt.Errorf("opening metrics file %s: %w", vmState.MetricsPath(), err) } + metricsFile.Close() return nil diff --git a/infrastructure/firecracker/provider.go b/infrastructure/firecracker/provider.go index 884b6863f..44f9480ca 100644 --- a/infrastructure/firecracker/provider.go +++ b/infrastructure/firecracker/provider.go @@ -7,15 +7,13 @@ import ( "net/url" "os" - "github.com/go-openapi/strfmt" - "github.com/sirupsen/logrus" - "github.com/spf13/afero" - "github.com/firecracker-microvm/firecracker-go-sdk" "github.com/firecracker-microvm/firecracker-go-sdk/client" fcmodels "github.com/firecracker-microvm/firecracker-go-sdk/client/models" "github.com/firecracker-microvm/firecracker-go-sdk/client/operations" - + "github.com/go-openapi/strfmt" + "github.com/sirupsen/logrus" + "github.com/spf13/afero" "github.com/weaveworks/flintlock/core/models" "github.com/weaveworks/flintlock/core/ports" "github.com/weaveworks/flintlock/pkg/log" @@ -78,6 +76,7 @@ func (p *fcProvider) Start(ctx context.Context, id string) error { if err != nil { return fmt.Errorf("checking if instance is running: %w", err) } + if state == ports.MicroVMStateRunning { logger.Debug("instance is already running, not starting") @@ -88,15 +87,17 @@ func (p *fcProvider) Start(ctx context.Context, id string) error { if err != nil { return fmt.Errorf("parsing vmid: %w", err) } - vmState := NewState(*vmid, p.config.StateRoot, p.fs) + vmState := NewState(*vmid, p.config.StateRoot, p.fs) socketPath := vmState.SockPath() logger.Tracef("using socket %s", socketPath) client := firecracker.NewClient(socketPath, logger, true) + _, err = client.CreateSyncAction(ctx, &fcmodels.InstanceActionInfo{ ActionType: firecracker.String("InstanceStart"), }) + if err != nil { return fmt.Errorf("failed to create start action: %w", err) } @@ -139,8 +140,8 @@ func (p *fcProvider) Delete(ctx context.Context, id string) error { if err != nil { return fmt.Errorf("parsing vmid: %w", err) } - vmState := NewState(*vmid, p.config.StateRoot, p.fs) + vmState := NewState(*vmid, p.config.StateRoot, p.fs) socketPath := vmState.SockPath() logger.Tracef("using socket %s", socketPath) @@ -193,13 +194,15 @@ func (p *fcProvider) State(ctx context.Context, id string) (ports.MicroVMState, if err != nil { return ports.MicroVMStateUnknown, fmt.Errorf("parsing vmid: %w", err) } - vmState := NewState(*vmid, p.config.StateRoot, p.fs) + vmState := NewState(*vmid, p.config.StateRoot, p.fs) pidPath := vmState.PIDPath() + exists, err := afero.Exists(p.fs, pidPath) if err != nil { return ports.MicroVMStateUnknown, fmt.Errorf("checking pid file exists: %w", err) } + if !exists { return ports.MicroVMStatePending, nil } @@ -213,6 +216,7 @@ func (p *fcProvider) State(ctx context.Context, id string) (ports.MicroVMState, if err != nil { return ports.MicroVMStateUnknown, fmt.Errorf("checking if firecracker process is running: %w", err) } + if !processExists { return ports.MicroVMStatePending, nil } diff --git a/infrastructure/firecracker/state.go b/infrastructure/firecracker/state.go index 7981e50fd..e4b7f1e2e 100644 --- a/infrastructure/firecracker/state.go +++ b/infrastructure/firecracker/state.go @@ -9,7 +9,6 @@ import ( "strconv" "github.com/spf13/afero" - "github.com/weaveworks/flintlock/core/models" "github.com/weaveworks/flintlock/pkg/defaults" ) @@ -113,11 +112,12 @@ func (s *fsState) pidReadFromFile(pidFile string) (int, error) { func (s *fsState) pidWriteToFile(pid int, pidFile string) error { file, err := s.fs.OpenFile(pidFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, defaults.DataFilePerm) - defer file.Close() //nolint: staticcheck if err != nil { return fmt.Errorf("opening pid file %s: %w", pidFile, err) } + defer file.Close() + _, err = fmt.Fprintf(file, "%d", pid) if err != nil { return fmt.Errorf("writing pid %d to file %s: %w", pid, pidFile, err) @@ -138,6 +138,7 @@ func (s *fsState) cfgReadFromFile(cfgFile string) (*VmmConfig, error) { } cfg := &VmmConfig{} + err = json.Unmarshal(data, cfg) if err != nil { return nil, fmt.Errorf("unmarshalling firecracker config: %w", err) @@ -153,7 +154,9 @@ func (s *fsState) cfgWriteToFile(cfg *VmmConfig, cfgFile string) error { } file, err := s.fs.OpenFile(cfgFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, defaults.DataFilePerm) + defer file.Close() //nolint: staticcheck + if err != nil { return fmt.Errorf("opening firecracker config file %s: %w", cfgFile, err) } diff --git a/infrastructure/firecracker/types.go b/infrastructure/firecracker/types.go index 7169bea02..f70abde88 100644 --- a/infrastructure/firecracker/types.go +++ b/infrastructure/firecracker/types.go @@ -1,7 +1,30 @@ package firecracker -// VmmConfig contains the configuration of the microvm. Based on the rust structure -// from here https://github.com/firecracker-microvm/firecracker/blob/0690010524001b606f67c1a65c67f3c27883183f/src/vmm/src/resources.rs#L51. +const ( + // CacheTypeUnsafe indovates the flushing mechanic will be advertised to + // the guest driver, but the operation will be a noop. + CacheTypeUnsafe CacheType = "Unsafe" + // CacheTypeWriteBack indicates the flushing mechanic will be advertised + // to the guest driver and flush requests coming from the guest will be + // performed using `fsync`. + CacheTypeWriteBack CacheType = "WriteBack" + + LogLevelError LogLevel = "Error" + LogLevelWarning LogLevel = "Warning" + LogLevelInfo LogLevel = "Info" + LogLevelDebug LogLevel = "Debug" + + // InstanceStateNotStarted the instance hasn't started running yet. + InstanceStateNotStarted InstanceState = "Not started" + // InstanceStateRunning the instance is running. + InstanceStateRunning InstanceState = "Running" + // InstanceStatePaused the instance is currently paused. + InstanceStatePaused InstanceState = "Paused" +) + +// VmmConfig contains the configuration of the microvm. +// Based on the rust structure from firecracker: +// https://github.com/firecracker-microvm/firecracker/blob/0690010524001b606f67c1a65c67f3c27883183f/src/vmm/src/resources.rs#L51. type VmmConfig struct { // Balloon hols the balloon device configuration. Balloon *BalloonDeviceConfig `json:"balloon,omitempty"` @@ -38,16 +61,6 @@ type VMConfig struct { type CacheType string -var ( - // CacheTypeUnsafe indovates the flushing mechanic will be advertised to - // the guest driver, but the operation will be a noop. - CacheTypeUnsafe CacheType = "Unsafe" - // CacheTypeWriteBack indicates the flushing mechanic will be advertised - // to the guest driver and flush requests coming from the guest will be - // performed using `fsync`. - CacheTypeWriteBack CacheType = "WriteBack" -) - // BlockDeviceConfig contains the configuration for a microvm block device. type BlockDeviceConfig struct { // ID is the unique identifier of the drive. @@ -104,13 +117,6 @@ type NetworkInterfaceConfig struct { type LogLevel string -var ( - LogLevelError LogLevel = "Error" - LogLevelWarning LogLevel = "Warning" - LogLevelInfo LogLevel = "Info" - LogLevelDebug LogLevel = "Debug" -) - // LoggerConfig holds the configuration for the logger. type LoggerConfig struct { // LogPath is the named pipe or file used as output for logs. @@ -161,12 +167,3 @@ type Metadata struct { // InstanceState is a type that represents the running state of a Firecracker instance. type InstanceState string - -var ( - // InstanceStateNotStarted the instance hasn't started running yet. - InstanceStateNotStarted InstanceState = "Not started" - // InstanceStateRunning the instance is running. - InstanceStateRunning InstanceState = "Running" - // InstanceStatePaused the instance is currently paused. - InstanceStatePaused InstanceState = "Paused" -) diff --git a/infrastructure/grpc/convert.go b/infrastructure/grpc/convert.go index fb1724823..d573e21cb 100644 --- a/infrastructure/grpc/convert.go +++ b/infrastructure/grpc/convert.go @@ -12,6 +12,7 @@ func convertMicroVMToModel(spec *types.MicroVMSpec) (*models.MicroVM, error) { if err != nil { return nil, fmt.Errorf("creating vmid from spec: %w", err) } + convertedModel := &models.MicroVM{ ID: *vmid, // Labels @@ -28,6 +29,7 @@ func convertMicroVMToModel(spec *types.MicroVMSpec) (*models.MicroVM, error) { if spec.Kernel.Filename != nil { convertedModel.Spec.Kernel.Filename = *spec.Kernel.Filename } + if spec.Initrd != nil { convertedModel.Spec.Initrd = &models.Initrd{ Image: models.ContainerImage(spec.Initrd.Image), @@ -60,9 +62,11 @@ func convertNetworkInterfaceToModel(netInt *types.NetworkInterface) *models.Netw AllowMetadataRequests: netInt.AllowMetadataReq, GuestDeviceName: netInt.GuestDeviceName, } + if netInt.GuestMac != nil { converted.GuestMAC = *netInt.GuestMac } + if netInt.Address != nil { converted.Address = *netInt.Address } @@ -84,9 +88,11 @@ func convertVolumeToModel(volume *types.Volume) *models.Volume { IsRoot: volume.IsRoot, IsReadOnly: volume.IsReadOnly, } + if volume.PartitionId != nil { convertedVol.PartitionID = *volume.PartitionId } + if volume.SizeInMb != nil { convertedVol.Size = *volume.SizeInMb } @@ -135,6 +141,7 @@ func convertModelToMicroVM(mvm *models.MicroVM) *types.MicroVMSpec { } converted.Metadata = map[string]string{} + for metadataKey, metadataValue := range mvm.Spec.Metadata { converted.Metadata[metadataKey] = metadataValue } diff --git a/infrastructure/grpc/server.go b/infrastructure/grpc/server.go index b5038ad07..165f359de 100644 --- a/infrastructure/grpc/server.go +++ b/infrastructure/grpc/server.go @@ -5,16 +5,15 @@ import ( "errors" "fmt" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "google.golang.org/protobuf/types/known/emptypb" - "github.com/go-playground/validator/v10" mvmv1 "github.com/weaveworks/flintlock/api/services/microvm/v1alpha1" "github.com/weaveworks/flintlock/api/types" "github.com/weaveworks/flintlock/core/ports" "github.com/weaveworks/flintlock/pkg/log" "github.com/weaveworks/flintlock/pkg/validation" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/emptypb" ) // NewServer creates a new server instance. @@ -33,27 +32,36 @@ type server struct { validator validation.Validator } -func (s *server) CreateMicroVM(ctx context.Context, req *mvmv1.CreateMicroVMRequest) (*mvmv1.CreateMicroVMResponse, error) { +func (s *server) CreateMicroVM(ctx context.Context, + req *mvmv1.CreateMicroVMRequest, +) (*mvmv1.CreateMicroVMResponse, error) { logger := log.GetLogger(ctx) logger.Trace("converting request to model") + modelSpec, err := convertMicroVMToModel(req.Microvm) if err != nil { return nil, fmt.Errorf("converting request: %w", err) } logger.Trace("validating model") - err = s.validator.ValidateStruct(modelSpec) + var valErrors validator.ValidationErrors - if err != nil { + + if err := s.validator.ValidateStruct(modelSpec); err != nil { if errors.As(err, &valErrors) { - return nil, status.Errorf(codes.InvalidArgument, "an error occurred when attempting to validate the request: %v", err) + return nil, status.Errorf( + codes.InvalidArgument, + "an error occurred when attempting to validate the request: %v", + err, + ) } return nil, status.Errorf(codes.Internal, "an error occurred: %v", err) } logger.Infof("creating microvm %s", modelSpec.ID) + createdModel, err := s.commandUC.CreateMicroVM(ctx, modelSpec) if err != nil { logger.Errorf("failed to create microvm: %s", err) @@ -62,6 +70,7 @@ func (s *server) CreateMicroVM(ctx context.Context, req *mvmv1.CreateMicroVMRequ } logger.Trace("converting model to response") + resp := &mvmv1.CreateMicroVMResponse{ Microvm: convertModelToMicroVM(createdModel), } @@ -73,8 +82,8 @@ func (s *server) DeleteMicroVM(ctx context.Context, req *mvmv1.DeleteMicroVMRequ logger := log.GetLogger(ctx) logger.Infof("deleting microvm %s/%s", req.Id, req.Namespace) - err := s.commandUC.DeleteMicroVM(ctx, req.Id, req.Namespace) - if err != nil { + + if err := s.commandUC.DeleteMicroVM(ctx, req.Id, req.Namespace); err != nil { logger.Errorf("failed to delete microvm: %s", err) return nil, fmt.Errorf("deleting microvm: %w", err) @@ -85,8 +94,8 @@ func (s *server) DeleteMicroVM(ctx context.Context, req *mvmv1.DeleteMicroVMRequ func (s *server) GetMicroVM(ctx context.Context, req *mvmv1.GetMicroVMRequest) (*mvmv1.GetMicroVMResponse, error) { logger := log.GetLogger(ctx) - logger.Infof("getting microvm %s/%s", req.Namespace, req.Id) + foundMicrovm, err := s.queryUC.GetMicroVM(ctx, req.Id, req.Namespace) if err != nil { logger.Errorf("failed to get microvm: %s", err) @@ -95,6 +104,7 @@ func (s *server) GetMicroVM(ctx context.Context, req *mvmv1.GetMicroVMRequest) ( } logger.Trace("converting model to response") + resp := &mvmv1.GetMicroVMResponse{ Microvm: &types.MicroVM{ Version: int32(foundMicrovm.Version), @@ -106,10 +116,12 @@ func (s *server) GetMicroVM(ctx context.Context, req *mvmv1.GetMicroVMRequest) ( return resp, nil } -func (s *server) ListMicroVMs(ctx context.Context, req *mvmv1.ListMicroVMsRequest) (*mvmv1.ListMicroVMsResponse, error) { +func (s *server) ListMicroVMs(ctx context.Context, + req *mvmv1.ListMicroVMsRequest, +) (*mvmv1.ListMicroVMsResponse, error) { logger := log.GetLogger(ctx) - logger.Infof("getting all microvms in %s", req.Namespace) + foundMicrovms, err := s.queryUC.GetAllMicroVM(ctx, req.Namespace) if err != nil { logger.Errorf("failed to getting all microvm: %s", err) @@ -118,6 +130,7 @@ func (s *server) ListMicroVMs(ctx context.Context, req *mvmv1.ListMicroVMsReques } logger.Trace("converting model to response") + resp := &mvmv1.ListMicroVMsResponse{ Microvm: []*types.MicroVM{}, } @@ -139,6 +152,7 @@ func (s *server) ListMicroVMsStream(req *mvmv1.ListMicroVMsRequest, ss mvmv1.Mic logger := log.GetLogger(ctx) logger.Infof("getting all microvms in %s", req.Namespace) + foundMicrovms, err := s.queryUC.GetAllMicroVM(ctx, req.Namespace) if err != nil { logger.Errorf("failed to getting all microvm: %s", err) @@ -147,6 +161,7 @@ func (s *server) ListMicroVMsStream(req *mvmv1.ListMicroVMsRequest, ss mvmv1.Mic } logger.Info("streaming found microvm results") + for _, mvm := range foundMicrovms { resp := &mvmv1.ListMessage{ Microvm: convertModelToMicroVM(mvm), diff --git a/infrastructure/network/network_service.go b/infrastructure/network/network_service.go index 6c0965baa..45276a78a 100644 --- a/infrastructure/network/network_service.go +++ b/infrastructure/network/network_service.go @@ -7,7 +7,6 @@ import ( "github.com/sirupsen/logrus" "github.com/vishvananda/netlink" - "github.com/weaveworks/flintlock/core/errors" "github.com/weaveworks/flintlock/core/models" "github.com/weaveworks/flintlock/core/ports" @@ -34,10 +33,18 @@ func (n *networkService) IfaceCreate(ctx context.Context, input ports.IfaceCreat "service": "netlink_network", "iface": input.DeviceName, }) - logger.Debugf("creating network interface with type %s and MAC %s using parent %s", input.Type, input.MAC, n.parentDeviceName) + logger.Debugf( + "creating network interface with type %s and MAC %s using parent %s", + input.Type, + input.MAC, + n.parentDeviceName, + ) + + var ( + parentLink netlink.Link + err error + ) - var parentLink netlink.Link - var err error if input.Type == models.IfaceTypeMacvtap { if n.parentDeviceName == "" { return nil, errors.ErrParentIfaceRequired @@ -50,12 +57,13 @@ func (n *networkService) IfaceCreate(ctx context.Context, input ports.IfaceCreat } var link netlink.Link + switch input.Type { case models.IfaceTypeTap: link = &netlink.Tuntap{ LinkAttrs: netlink.LinkAttrs{ Name: input.DeviceName, - // TODO: add Namespace + // TODO: add Namespace #237 }, Mode: netlink.TUNTAP_MODE_TAP, } @@ -66,7 +74,7 @@ func (n *networkService) IfaceCreate(ctx context.Context, input ports.IfaceCreat Name: input.DeviceName, MTU: parentLink.Attrs().MTU, ParentIndex: parentLink.Attrs().Index, - Namespace: parentLink.Attrs().Namespace, // TODO: add namespace specific to vm + Namespace: parentLink.Attrs().Namespace, // TODO: add namespace specific to vm #237 TxQLen: parentLink.Attrs().TxQLen, }, Mode: netlink.MACVLAN_MODE_BRIDGE, @@ -151,6 +159,7 @@ func (n *networkService) IfaceDetails(ctx context.Context, name string) (*ports. if err != nil { return nil, fmt.Errorf("getting interface %s: %w", name, err) } + if !found { return nil, errors.ErrIfaceNotFound } diff --git a/infrastructure/ulid/ulid.go b/infrastructure/ulid/ulid.go index 13b582501..288b5c848 100644 --- a/infrastructure/ulid/ulid.go +++ b/infrastructure/ulid/ulid.go @@ -6,17 +6,17 @@ import ( "time" "github.com/oklog/ulid" - "github.com/weaveworks/flintlock/core/ports" ) -// DefaultRand is a random source based on the unix time not. -var DefaultRand = rand.New(rand.NewSource(time.Now().UnixNano())) //nolint:gosec +func randomSource() *rand.Rand { + return rand.New(rand.NewSource(time.Now().UnixNano())) +} // New will create a new ulid based ID service using the default random source. func New() ports.IDService { return &ulidIDService{ - rnd: DefaultRand, + rnd: randomSource(), } } @@ -34,6 +34,7 @@ type ulidIDService struct { // GenerateRandom will generate a random identifier using ulid. func (u *ulidIDService) GenerateRandom() (string, error) { entropy := ulid.Monotonic(u.rnd, 0) + newID, err := ulid.New(ulid.Now(), entropy) if err != nil { return "", fmt.Errorf("generating microvm id: %w", err) diff --git a/internal/command/flags/flags.go b/internal/command/flags/flags.go index 26cd9d34d..f6cf479e0 100644 --- a/internal/command/flags/flags.go +++ b/internal/command/flags/flags.go @@ -4,7 +4,6 @@ import ( "fmt" "github.com/spf13/cobra" - "github.com/weaveworks/flintlock/internal/config" "github.com/weaveworks/flintlock/pkg/defaults" ) @@ -78,9 +77,11 @@ func AddHiddenFlagsToCommand(cmd *cobra.Command, cfg *config.Config) error { if err := cmd.Flags().MarkHidden(disableReconcileFlag); err != nil { return fmt.Errorf("setting %s as hidden: %w", disableReconcileFlag, err) } + if err := cmd.Flags().MarkHidden(maximumRetryFlag); err != nil { return fmt.Errorf("setting %s as hidden: %w", maximumRetryFlag, err) } + if err := cmd.Flags().MarkHidden(disableAPIFlag); err != nil { return fmt.Errorf("setting %s as hidden: %w", disableAPIFlag, err) } diff --git a/internal/command/gw/gw.go b/internal/command/gw/gw.go index cc5a70dae..49894fef0 100644 --- a/internal/command/gw/gw.go +++ b/internal/command/gw/gw.go @@ -10,14 +10,13 @@ import ( "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/spf13/cobra" - "google.golang.org/grpc" - mvmv1 "github.com/weaveworks/flintlock/api/services/microvm/v1alpha1" cmdflags "github.com/weaveworks/flintlock/internal/command/flags" "github.com/weaveworks/flintlock/internal/config" "github.com/weaveworks/flintlock/internal/version" "github.com/weaveworks/flintlock/pkg/flags" "github.com/weaveworks/flintlock/pkg/log" + "google.golang.org/grpc" ) // NewCommand creates a new cobra command for running the gRPC HTTP gateway. @@ -59,8 +58,10 @@ func runGWServer(ctx context.Context, cfg *config.Config) error { ctx, cancel := context.WithCancel(log.WithLogger(ctx, logger)) wg.Add(1) + go func() { defer wg.Done() + if err := serveAPI(ctx, cfg); err != nil { logger.Errorf("failed serving api: %v", err) } @@ -81,16 +82,6 @@ func serveAPI(ctx context.Context, cfg *config.Config) error { logger := log.GetLogger(ctx) mux := runtime.NewServeMux() - // TODO: create the dependencies for the server - - // grpcServer := grpc.NewServer( - // grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor), - // grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor), - // ) - // mvmv1.RegisterMicroVMServer(grpcServer, server.NewServer()) - // grpc_prometheus.Register(grpcServer) - // http.Handle("/metrics", promhttp.Handler()) - opts := []grpc.DialOption{ grpc.WithInsecure(), } @@ -107,26 +98,14 @@ func serveAPI(ctx context.Context, cfg *config.Config) error { go func() { <-ctx.Done() logger.Infof("shutting down the http gateway server") + if err := s.Shutdown(context.Background()); err != nil { logger.Errorf("failed to shutdown http gateway server: %v", err) } - // logger.Infof("shutting down grpc server") - // grpcServer.GracefulStop() }() - // logger.Debugf("starting grpc server listening on endpoint %s", cfg.GRPCAPIEndpoint) - // l, err := net.Listen("tcp", cfg.GRPCAPIEndpoint) - // if err != nil { - // return fmt.Errorf("setting up gRPC api listener: %w", err) - // } - // defer l.Close() - // go func() { - // if err := grpcServer.Serve(l); err != nil { - // logger.Fatalf("serving grpc api: %v", err) // TODO: remove this fatal - // } - // }() - logger.Debugf("starting http server listening on endpoint %s", cfg.HTTPAPIEndpoint) + if err := s.ListenAndServe(); err != nil { return fmt.Errorf("listening and serving http api: %w", err) } diff --git a/internal/command/root.go b/internal/command/root.go index 40bfbe208..878d71c08 100644 --- a/internal/command/root.go +++ b/internal/command/root.go @@ -6,7 +6,6 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" - "github.com/weaveworks/flintlock/internal/command/gw" "github.com/weaveworks/flintlock/internal/command/run" "github.com/weaveworks/flintlock/internal/config" @@ -37,6 +36,7 @@ func NewRootCommand() (*cobra.Command, error) { } log.AddFlagsToCommand(cmd, &cfg.Logging) + if err := addRootSubCommands(cmd, cfg); err != nil { return nil, fmt.Errorf("adding subcommands: %w", err) } diff --git a/internal/command/run/run.go b/internal/command/run/run.go index 0ff3cca7e..1de29c6bb 100644 --- a/internal/command/run/run.go +++ b/internal/command/run/run.go @@ -12,9 +12,6 @@ import ( grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/spf13/cobra" - "google.golang.org/grpc" - "google.golang.org/grpc/reflection" - mvmv1 "github.com/weaveworks/flintlock/api/services/microvm/v1alpha1" cmdflags "github.com/weaveworks/flintlock/internal/command/flags" "github.com/weaveworks/flintlock/internal/config" @@ -23,6 +20,8 @@ import ( "github.com/weaveworks/flintlock/pkg/defaults" "github.com/weaveworks/flintlock/pkg/flags" "github.com/weaveworks/flintlock/pkg/log" + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" ) // NewCommand creates a new cobra command for running flintlock. @@ -49,18 +48,23 @@ func NewCommand(cfg *config.Config) (*cobra.Command, error) { } cmdflags.AddGRPCServerFlagsToCommand(cmd, cfg) + if err := cmdflags.AddContainerDFlagsToCommand(cmd, cfg); err != nil { return nil, fmt.Errorf("adding containerd flags to run command: %w", err) } + if err := cmdflags.AddFirecrackerFlagsToCommand(cmd, cfg); err != nil { return nil, fmt.Errorf("adding firecracker flags to run command: %w", err) } + if err := cmdflags.AddNetworkFlagsToCommand(cmd, cfg); err != nil { return nil, fmt.Errorf("adding network flags to run command: %w", err) } + if err := cmdflags.AddHiddenFlagsToCommand(cmd, cfg); err != nil { return nil, fmt.Errorf("adding hidden flags to run command: %w", err) } + cmd.Flags().StringVar(&cfg.StateRootDir, "state-dir", defaults.StateRootDir, "The directory to use for the as the root for runtime state.") cmd.Flags().DurationVar(&cfg.ResyncPeriod, "resync-period", defaults.ResyncPeriod, "Reconcile the specs to resynchronise them based on this period.") @@ -79,8 +83,10 @@ func runServer(ctx context.Context, cfg *config.Config) error { if !cfg.DisableAPI { wg.Add(1) + go func() { defer wg.Done() + if err := serveAPI(ctx, cfg); err != nil { logger.Errorf("failed serving api: %v", err) } @@ -89,8 +95,10 @@ func runServer(ctx context.Context, cfg *config.Config) error { if !cfg.DisableReconcile { wg.Add(1) + go func() { defer wg.Done() + if err := runControllers(ctx, cfg); err != nil { logger.Errorf("failed running controllers: %v", err) } @@ -113,8 +121,9 @@ func serveAPI(ctx context.Context, cfg *config.Config) error { ports, err := inject.InitializePorts(cfg) if err != nil { - return fmt.Errorf("initializing ports for application: %w", err) + return fmt.Errorf("initialising ports for application: %w", err) } + app := inject.InitializeApp(cfg, ports) server := inject.InitializeGRPCServer(app) @@ -133,6 +142,7 @@ func serveAPI(ctx context.Context, cfg *config.Config) error { }() logger.Debugf("starting grpc server listening on endpoint %s", cfg.GRPCAPIEndpoint) + l, err := net.Listen("tcp", cfg.GRPCAPIEndpoint) if err != nil { return fmt.Errorf("setting up gRPC api listener: %w", err) @@ -142,7 +152,7 @@ func serveAPI(ctx context.Context, cfg *config.Config) error { reflection.Register(grpcServer) if err := grpcServer.Serve(l); err != nil { - logger.Fatalf("serving grpc api: %v", err) // TODO: remove this fatal + logger.Fatalf("serving grpc api: %v", err) // TODO: remove this fatal #235 } return nil @@ -153,12 +163,14 @@ func runControllers(ctx context.Context, cfg *config.Config) error { ports, err := inject.InitializePorts(cfg) if err != nil { - return fmt.Errorf("initializing ports for controller: %w", err) + return fmt.Errorf("initialising ports for controller: %w", err) } + app := inject.InitializeApp(cfg, ports) mvmControllers := inject.InializeController(app, ports) logger.Info("starting microvm controller") + if err := mvmControllers.Run(ctx, 1, cfg.ResyncPeriod, true); err != nil { logger.Fatalf("starting microvm controller: %v", err) } diff --git a/pkg/log/log.go b/pkg/log/log.go index 643504dab..ba3a84eb2 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -44,9 +44,11 @@ type Config struct { // Configure will configure the logger from the supplied config. func Configure(logConfig *Config) error { configureVerbosity(logConfig) + if err := configureFormatter(logConfig); err != nil { return fmt.Errorf("configuring log formatter: %w", err) } + if err := configureOutput(logConfig); err != nil { return fmt.Errorf("configuring log output: %w", err) } @@ -69,6 +71,7 @@ func configureFormatter(logConfig *Config) error { func configureVerbosity(logConfig *Config) { logrus.SetLevel(logrus.InfoLevel) + if logConfig.Verbosity >= LogVerbosityDebug && logConfig.Verbosity < LogVerbosityTrace { logrus.SetLevel(logrus.DebugLevel) } else if logConfig.Verbosity >= LogVerbosityTrace { @@ -90,6 +93,7 @@ func configureOutput(logConfig *Config) error { if err != nil { return fmt.Errorf("opening log file %s: %w", output, err) } + logrus.SetOutput(file) } diff --git a/pkg/planner/actuator.go b/pkg/planner/actuator.go index f65b32d2e..f9fe135d1 100644 --- a/pkg/planner/actuator.go +++ b/pkg/planner/actuator.go @@ -6,7 +6,6 @@ import ( "time" "github.com/sirupsen/logrus" - "github.com/weaveworks/flintlock/pkg/log" ) @@ -54,11 +53,13 @@ func (e *actuatorImpl) Execute(ctx context.Context, p Plan, executionID string) func (e *actuatorImpl) executePlan(ctx context.Context, p Plan, logger *logrus.Entry) (int, error) { numStepsExecuted := 0 + for { steps, err := p.Create(ctx) if err != nil { return numStepsExecuted, fmt.Errorf("creating plan for %s: %w", p.Name(), err) } + if len(steps) == 0 { logger.Debug("no more steps to execute") @@ -67,6 +68,7 @@ func (e *actuatorImpl) executePlan(ctx context.Context, p Plan, logger *logrus.E executed, err := e.react(ctx, steps, logger) numStepsExecuted += executed + if err != nil { return numStepsExecuted, fmt.Errorf("executing steps: %w", err) } @@ -75,6 +77,7 @@ func (e *actuatorImpl) executePlan(ctx context.Context, p Plan, logger *logrus.E func (e *actuatorImpl) react(ctx context.Context, steps []Procedure, logger *logrus.Entry) (int, error) { var childSteps []Procedure + numStepsExecuted := 0 for _, step := range steps { @@ -88,6 +91,7 @@ func (e *actuatorImpl) react(ctx context.Context, steps []Procedure, logger *log if err != nil { return numStepsExecuted, fmt.Errorf("checking if step %s should be executed: %w", step.Name(), err) } + if shouldDo { logger.WithField("step", step.Name()).Debug("execute step") @@ -99,9 +103,11 @@ func (e *actuatorImpl) react(ctx context.Context, steps []Procedure, logger *log } } } + if len(childSteps) > 0 { executed, err := e.react(ctx, childSteps, logger) numStepsExecuted += executed + if err != nil { return numStepsExecuted, err } diff --git a/pkg/queue/queue.go b/pkg/queue/queue.go index 44b58f05b..66900f28f 100644 --- a/pkg/queue/queue.go +++ b/pkg/queue/queue.go @@ -6,7 +6,8 @@ import ( "github.com/google/go-cmp/cmp" ) -// NOTE: this is heavily based on the workerqueue from client-go: https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go +// NOTE: this is heavily based on the workerqueue from client-go: +// https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go // Queue is the interface for a queue. type Queue interface { diff --git a/pkg/validation/validate.go b/pkg/validation/validate.go index cc815c688..6c41dcffe 100644 --- a/pkg/validation/validate.go +++ b/pkg/validation/validate.go @@ -21,7 +21,7 @@ type validate struct { func NewValidator() Validator { v := validator.New() - // TODO(@jmickey): Do something with this error maybe? + // TODO(@jmickey): Do something with this error maybe? #236 _ = v.RegisterValidation("imageURI", customImageURIValidator, false) _ = v.RegisterValidation("datetimeInPast", customTimestampValidator, false) _ = v.RegisterValidation("guestDeviceName", customNetworkGuestDeviceNameValidator, false) @@ -79,6 +79,7 @@ func customMicroVMSpecStructLevelValidation(sl validator.StructLevel) { // that a root device has been configured. var found bool + for _, vol := range spec.Volumes { if vol.IsRoot { if found { diff --git a/pkg/wait/wait.go b/pkg/wait/wait.go index f07c4ad36..695433484 100644 --- a/pkg/wait/wait.go +++ b/pkg/wait/wait.go @@ -17,15 +17,17 @@ type ConditionFunc func() (bool, error) // ForCondition will wait for the specified condition to be true until the max duration. func ForCondition(conditionFn ConditionFunc, maxWait time.Duration, checkInternal time.Duration) error { timeout := time.NewTimer(maxWait) + defer timeout.Stop() + checkTicker := time.NewTicker(checkInternal) defer checkTicker.Stop() - defer timeout.Stop() for { conditionMet, err := conditionFn() if err != nil { return fmt.Errorf("checking if condition met: %w", err) } + if conditionMet { return nil } diff --git a/test/e2e/utils/runner.go b/test/e2e/utils/runner.go index ecc155568..19450d604 100644 --- a/test/e2e/utils/runner.go +++ b/test/e2e/utils/runner.go @@ -109,6 +109,7 @@ func createThinPools() { scriptPath := filepath.Join(baseDir(), "hack", "scripts", "devpool.sh") command := exec.Command(scriptPath, thinpoolName, loopDeviceTag) session, err := gexec.Start(command, gk.GinkgoWriter, gk.GinkgoWriter) + gm.Expect(err).NotTo(gm.HaveOccurred()) gm.Eventually(session).Should(gexec.Exit(0)) } @@ -118,6 +119,7 @@ func cleanupThinPools() { cmd := exec.Command("losetup") loopDevices := grep(cmd, loopDeviceTag, 0) + for _, dev := range loopDevices { command := exec.Command("losetup", "-d", dev) session, err := gexec.Start(command, gk.GinkgoWriter, gk.GinkgoWriter) @@ -135,6 +137,7 @@ func writeContainerdConfig() { } pluginTree, err := toml.TreeFromMap(dmplug) gm.Expect(err).NotTo(gm.HaveOccurred()) + cfg := ccfg.Config{ Version: 2, //nolint:gomnd Root: containerdRootDir, @@ -155,13 +158,16 @@ func writeContainerdConfig() { f, err := os.Create(containerdCfg) gm.Expect(err).NotTo(gm.HaveOccurred()) + defer f.Close() + gm.Expect(toml.NewEncoder(f).Encode(cfg)).To(gm.Succeed()) } func (r *Runner) buildFLBinary() { flBin, err := gexec.Build(flintlockCmdDir) gm.Expect(err).NotTo(gm.HaveOccurred()) + r.flintlockdBin = flBin } @@ -169,27 +175,39 @@ func (r *Runner) startContainerd() { ctrdCmd := exec.Command(containerdBin, "--config", containerdCfg) ctrdSess, err := gexec.Start(ctrdCmd, gk.GinkgoWriter, gk.GinkgoWriter) gm.Expect(err).NotTo(gm.HaveOccurred()) + r.containerdSession = ctrdSess } func (r *Runner) startFlintlockd() { parentIface, err := getParentInterface() gm.Expect(err).NotTo(gm.HaveOccurred()) - flCmd := exec.Command(r.flintlockdBin, "run", "--containerd-socket", containerdSocket, "--parent-iface", parentIface) //nolint:gosec + + flCmd := exec.Command( + r.flintlockdBin, + "run", + "--containerd-socket", + containerdSocket, + "--parent-iface", + parentIface, + ) flSess, err := gexec.Start(flCmd, gk.GinkgoWriter, gk.GinkgoWriter) gm.Expect(err).NotTo(gm.HaveOccurred()) + r.flintlockdSession = flSess } func (r *Runner) dialGRPCServer() { conn, err := grpc.Dial(grpcDialTarget, grpc.WithInsecure(), grpc.WithBlock()) gm.Expect(err).NotTo(gm.HaveOccurred()) + r.flintlockdConn = conn } func getParentInterface() (string, error) { cmd := exec.Command("ip", "route", "show") iface := grep(cmd, "default", 4) //nolint:gomnd + if len(iface) == 0 { return "", errors.New("parent interface not found") //nolint:goerr113 } @@ -200,8 +218,10 @@ func getParentInterface() (string, error) { func grep(cmd *exec.Cmd, match string, loc int) []string { output, err := cmd.Output() gm.Expect(err).NotTo(gm.HaveOccurred()) + scanner := bufio.NewScanner(strings.NewReader(string(output))) out := []string{} + for scanner.Scan() { line := scanner.Text() if strings.Contains(line, match) {