Skip to content

Commit

Permalink
Support pulling Tart VM images that contain Linux VMs (#20)
Browse files Browse the repository at this point in the history
* Fix inverted condition in "vetu pull" that caused a pull only when the remote name already existed in the cache

* Support pulling Tart VM images that contain Linux VMs

* remote.gc(): remove debugging statements that were used in development

* Tart OCI puller: inject a single disk to the VM's configuration

* De-duplicate pullBlob()

* Fetch latest Hypervisor Firmware when pulling Tart VM image

* applestream: prevent high memory consumption

* Use a separate struct for Tart VM's config and validate it

* vetu run: validate VM's architecture before running

* LZ4 dictionary needs to be updated for the uncompressed blocks too

* Update LZ4 dictionary uniformly for compressed and uncompressed blocks

* Use EDK2 firmware instead of Rust Hypervisor Firmware

To work around the lack of an arm64 binary[1].

[1]: cloud-hypervisor/rust-hypervisor-firmware#294

* .goreleaser.yml: require {,edk2-}cloud-hypervisor packages

* .devcontainer/Dockerfile: install edk2-cloud-hypervisor

* vetu run: Cloud Hypervisor only supports a single --disk argument

* Remove Cloud Init config

We have distro-specific configs in cirruslabs/linux-image-templates now.
  • Loading branch information
edigaryev authored Nov 16, 2023
1 parent 24e1871 commit f41c506
Show file tree
Hide file tree
Showing 24 changed files with 23,201 additions and 300 deletions.
2 changes: 1 addition & 1 deletion .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ RUN apt-get update && apt-get -y install qemu-utils libcap2-bin iputils-ping gen
# Install Cloud Hypervisor
RUN echo 'deb http://download.opensuse.org/repositories/home:/cloud-hypervisor/Debian_12/ /' | tee /etc/apt/sources.list.d/home:cloud-hypervisor.list
RUN curl -fsSL https://download.opensuse.org/repositories/home:cloud-hypervisor/Debian_12/Release.key | apt-key add -
RUN apt-get update && apt-get -y install cloud-hypervisor
RUN apt-get update && apt-get -y install cloud-hypervisor edk2-cloud-hypervisor
3 changes: 3 additions & 0 deletions .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ nfpms:
maintainer: [email protected]
description: CLI for executing Cirrus tasks locally and in any CI
section: misc
dependencies:
- cloud-hypervisor
- edk2-cloud-hypervisor
formats:
- deb
- rpm
Expand Down
2 changes: 1 addition & 1 deletion internal/command/pull/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func runPull(cmd *cobra.Command, args []string) error {
fullyQualifiedRemoteName.Digest = manifest.GetDescriptor().Digest

// Pull the VM image if we don't have one already in cache
if remote.Exists(fullyQualifiedRemoteName) {
if !remote.Exists(fullyQualifiedRemoteName) {
if err := oci.PullVMDirectory(cmd.Context(), client, reference, manifest, vmDir, int(concurrency)); err != nil {
return err
}
Expand Down
19 changes: 16 additions & 3 deletions internal/command/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ import (
"github.com/cirruslabs/vetu/internal/network/bridged"
"github.com/cirruslabs/vetu/internal/network/software"
"github.com/cirruslabs/vetu/internal/storage/local"
"github.com/cirruslabs/vetu/internal/vmconfig"
"github.com/samber/lo"
"github.com/spf13/cobra"
"os"
"path/filepath"
"runtime"
"strings"
)

Expand Down Expand Up @@ -46,6 +49,12 @@ func runRun(cmd *cobra.Command, args []string) error {

vmConfig := vmDir.Config()

// Validate VM's architecture
if vmConfig.Arch != runtime.GOARCH {
return fmt.Errorf("this VM is built to run on %q, but you're running %q",
vmConfig.Arch, runtime.GOARCH)
}

// Initialize network
var network network.Network

Expand Down Expand Up @@ -75,9 +84,13 @@ func runRun(cmd *cobra.Command, args []string) error {
}

// Disks
for _, disk := range vmConfig.Disks {
targetDiskPath := filepath.Join(vmDir.Path(), disk.Name)
hvArgs = append(hvArgs, "--disk", fmt.Sprintf("path=%s", targetDiskPath))
diskArguments := lo.Map(vmConfig.Disks, func(disk vmconfig.Disk, index int) string {
path := filepath.Join(vmDir.Path(), disk.Name)
return fmt.Sprintf("path=%s", path)
})
if len(diskArguments) != 0 {
hvArgs = append(hvArgs, "--disk")
hvArgs = append(hvArgs, diskArguments...)
}

// CPU and memory
Expand Down
10 changes: 10 additions & 0 deletions internal/oci/annotations/annotations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package annotations

// Annotation keys in the Vetu ("org.cirruslabs.vetu") namespace.
const (
	// AnnotationName is the key under which a layer's name is stored.
	AnnotationName = "org.cirruslabs.vetu.name"

	// AnnotationUncompressedSize is the key holding a layer's size
	// before compression.
	AnnotationUncompressedSize = "org.cirruslabs.vetu.uncompressed-size"

	// AnnotationUncompressedDigest is the key holding the digest of
	// a layer's contents before compression.
	AnnotationUncompressedDigest = "org.cirruslabs.vetu.uncompressed-digest"
)

// Annotation keys in the Tart ("org.cirruslabs.tart") namespace,
// presumably the ones found on Tart VM images.
const (
	// AnnotationTartUncompressedSize is Tart's counterpart
	// of AnnotationUncompressedSize.
	AnnotationTartUncompressedSize = "org.cirruslabs.tart.uncompressed-size"

	// AnnotationTartUncompressedDigest is Tart's counterpart
	// of AnnotationUncompressedDigest; note the different key suffix.
	AnnotationTartUncompressedDigest = "org.cirruslabs.tart.uncompressed-content-digest"
)
177 changes: 177 additions & 0 deletions internal/oci/diskpuller/diskpuller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package diskpuller

import (
"context"
"fmt"
"github.com/cirruslabs/vetu/internal/vmdirectory"
"github.com/dustin/go-humanize"
"github.com/regclient/regclient"
"github.com/regclient/regclient/types"
"github.com/regclient/regclient/types/ref"
"github.com/samber/lo"
"github.com/schollz/progressbar/v3"
"io"
"os"
"path/filepath"
"strconv"
"sync"
)

type (
	// NameFromDiskDescriptorFunc maps a disk layer descriptor to the name
	// of the disk file that the layer belongs to. PullDisks joins this name
	// with the VM directory's path to locate the file on disk.
	NameFromDiskDescriptorFunc func(diskDescriptor types.Descriptor) (string, error)

	// InitializeDecompressorFunc wraps a reader over a compressed disk layer
	// and returns a reader whose output is written to the disk file.
	InitializeDecompressorFunc func(compressedReader io.Reader) io.Reader
)

// diskTask describes a single disk layer to be pulled and written
// into a disk file by one of the PullDisks workers.
type diskTask struct {
// Desc is the descriptor of the disk layer to fetch from the registry.
Desc types.Descriptor
// Path is the disk file that the layer's contents are written to.
Path string
// Offset is the position in the disk file at which writing starts,
// computed as the sum of the uncompressed sizes of the preceding
// layers that share the same disk name.
Offset int64
}

// PullDisks fetches the given disk layers from an OCI registry and writes
// their decompressed contents into disk files inside of vmDir.
//
// Each layer is mapped to a disk file name by nameFromDiskDescriptor. Layers
// that map to the same name are laid out back-to-back in that file, in the
// order they appear in disks, using the uncompressed size found in each
// layer's uncompressedSizeAnnotation annotation. Disk files are created and
// sized up-front, and the layers are then pulled by up to concurrency workers,
// each decompressing on-the-fly through initializeDecompressor.
//
// A concurrency of less than one is treated as one. The first error
// encountered by a worker cancels the remaining work and is returned.
func PullDisks(
	ctx context.Context,
	client *regclient.RegClient,
	reference ref.Ref,
	vmDir *vmdirectory.VMDirectory,
	concurrency int,
	disks []types.Descriptor,
	nameFromDiskDescriptor NameFromDiskDescriptorFunc,
	uncompressedSizeAnnotation string,
	initializeDecompressor InitializeDecompressorFunc,
) error {
	// A negative value would panic in wg.Add() and a zero value
	// would result in no disks being pulled without any error
	if concurrency < 1 {
		concurrency = 1
	}

	// Process VM's disks by converting them into
	// disk tasks for further parallel processing
	diskTaskCh := make(chan *diskTask, len(disks))
	diskNameToOffset := map[string]int64{}

	for _, disk := range disks {
		// Extract name
		diskName, err := nameFromDiskDescriptor(disk)
		if err != nil {
			return err
		}

		// Extract and parse uncompressed size
		uncompressedSizeRaw, ok := disk.Annotations[uncompressedSizeAnnotation]
		if !ok {
			return fmt.Errorf("disk layer has no %s annotation", uncompressedSizeAnnotation)
		}
		uncompressedSize, err := strconv.ParseInt(uncompressedSizeRaw, 10, 64)
		if err != nil {
			return fmt.Errorf("parsing %s annotation of a disk layer: %w",
				uncompressedSizeAnnotation, err)
		}

		diskTaskCh <- &diskTask{
			Desc:   disk,
			Path:   filepath.Join(vmDir.Path(), diskName),
			Offset: diskNameToOffset[diskName],
		}

		// The next layer of the same disk starts where this one ends
		diskNameToOffset[diskName] += uncompressedSize
	}

	// There will be no more disk tasks
	close(diskTaskCh)

	// Pre-create and truncate disk files so that the workers
	// can write to their offsets independently of each other
	for diskName, size := range diskNameToOffset {
		if err := createSizedDiskFile(filepath.Join(vmDir.Path(), diskName), size); err != nil {
			return err
		}
	}

	// Process disk tasks with the specified concurrency
	totalUncompressedDisksSizeBytes := lo.Sum(lo.Values(diskNameToOffset))
	totalCompressedDisksSizeBytes := lo.Sum(lo.Map(disks, func(diskDesc types.Descriptor, index int) int64 {
		return diskDesc.Size
	}))
	fmt.Printf("pulling %d disk(s) (%s compressed, %s uncompressed)...\n", len(diskNameToOffset),
		humanize.Bytes(uint64(totalCompressedDisksSizeBytes)),
		humanize.Bytes(uint64(totalUncompressedDisksSizeBytes)))

	progressBar := progressbar.DefaultBytes(totalCompressedDisksSizeBytes)

	var wg sync.WaitGroup
	wg.Add(concurrency)

	// Each worker sends at most one error before exiting, so a buffer
	// of this size guarantees that the sends below will never block
	diskTasksErrCh := make(chan error, concurrency)

	diskTasksCtx, diskTasksCtxCancel := context.WithCancel(ctx)
	defer diskTasksCtxCancel()

	for i := 0; i < concurrency; i++ {
		go func() {
			defer wg.Done()

			for task := range diskTaskCh {
				err := task.process(diskTasksCtx, client, reference, progressBar, initializeDecompressor)
				if err != nil {
					diskTasksErrCh <- err
					diskTasksCtxCancel()

					// Stop this worker: the pull has failed anyway, and continuing
					// could produce more errors than the error channel can hold,
					// blocking the worker forever and dead-locking wg.Wait()
					return
				}
			}
		}()
	}

	// Wait for the disk tasks to finish
	wg.Wait()

	// Since we've finished with pulling disks,
	// we can finish the associated progress bar
	finishErr := progressBar.Finish()

	// Check for errors, giving priority to the disk task
	// errors over the purely cosmetic progress bar error
	select {
	case err := <-diskTasksErrCh:
		return err
	default:
		return finishErr
	}
}

// createSizedDiskFile creates a file at path if it does not exist yet
// and sets its length to size bytes, closing it on all paths.
func createSizedDiskFile(path string, size int64) error {
	diskFile, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0600)
	if err != nil {
		return err
	}

	if err := diskFile.Truncate(size); err != nil {
		// Truncation error is the one worth reporting,
		// close the file only to avoid leaking it
		_ = diskFile.Close()

		return err
	}

	return diskFile.Close()
}

// process pulls the task's disk layer from the OCI registry, decompresses it
// on-the-fly using initializeDecompressor and writes the result into the
// task's disk file, starting at the task's offset.
//
// The number of compressed bytes read from the registry is reported
// to progressBar. The disk file is expected to already exist and is always
// closed before returning; a failure to close it is reported when no
// other error has occurred.
func (task *diskTask) process(
	ctx context.Context,
	client *regclient.RegClient,
	reference ref.Ref,
	progressBar *progressbar.ProgressBar,
	initializeDecompressor InitializeDecompressorFunc,
) (err error) {
	// Open disk file and seek to the specified offset
	diskFile, err := os.OpenFile(task.Path, os.O_WRONLY, 0600)
	if err != nil {
		return err
	}
	defer func() {
		// Close on every path to avoid leaking the file descriptor, but
		// only surface the close error when there's nothing else to report
		if closeErr := diskFile.Close(); closeErr != nil && err == nil {
			err = closeErr
		}
	}()

	if _, err := diskFile.Seek(task.Offset, io.SeekStart); err != nil {
		return err
	}

	// Pull disk layer from the OCI registry
	blobReader, err := client.BlobGet(ctx, reference, task.Desc)
	if err != nil {
		return err
	}
	defer blobReader.Close()

	// Decompress the disk data on-the-fly and write it to the disk file
	progressBarReader := progressbar.NewReader(blobReader, progressBar)
	decompressor := initializeDecompressor(&progressBarReader)

	_, err = io.Copy(diskFile, decompressor)

	return err
}
11 changes: 11 additions & 0 deletions internal/oci/mediatypes/mediatypes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package mediatypes

// Media types in the Vetu ("vnd.cirruslabs.vetu") namespace.
const (
	// MediaTypeConfig is the media type of the VM's configuration.
	MediaTypeConfig = "application/vnd.cirruslabs.vetu.config.v1"

	// MediaTypeKernel is the media type of the VM's kernel.
	MediaTypeKernel = "application/vnd.cirruslabs.vetu.kernel.v1"

	// MediaTypeInitramfs is the media type of the VM's initramfs.
	MediaTypeInitramfs = "application/vnd.cirruslabs.vetu.initramfs.v1"

	// MediaTypeDisk is the media type of a VM's disk layer.
	MediaTypeDisk = "application/vnd.cirruslabs.vetu.disk.v1"
)

// Media types in the Tart ("vnd.cirruslabs.tart") namespace,
// presumably the ones found in Tart VM images.
const (
	// MediaTypeTartConfig is Tart's counterpart of MediaTypeConfig.
	MediaTypeTartConfig = "application/vnd.cirruslabs.tart.config.v1"

	// MediaTypeTartDisk is Tart's counterpart of MediaTypeDisk;
	// note that it is at version 2.
	MediaTypeTartDisk = "application/vnd.cirruslabs.tart.disk.v2"
)
16 changes: 0 additions & 16 deletions internal/oci/oci.go

This file was deleted.

Loading

0 comments on commit f41c506

Please sign in to comment.