Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support pulling Tart VM images that contain Linux VMs #20

Merged
merged 16 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ RUN apt-get update && apt-get -y install qemu-utils libcap2-bin iputils-ping gen
# Install Cloud Hypervisor
RUN echo 'deb http://download.opensuse.org/repositories/home:/cloud-hypervisor/Debian_12/ /' | tee /etc/apt/sources.list.d/home:cloud-hypervisor.list
RUN curl -fsSL https://download.opensuse.org/repositories/home:cloud-hypervisor/Debian_12/Release.key | apt-key add -
RUN apt-get update && apt-get -y install cloud-hypervisor
RUN apt-get update && apt-get -y install cloud-hypervisor edk2-cloud-hypervisor
3 changes: 3 additions & 0 deletions .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ nfpms:
maintainer: [email protected]
description: CLI for executing Cirrus tasks locally and in any CI
section: misc
dependencies:
- cloud-hypervisor
- edk2-cloud-hypervisor
formats:
- deb
- rpm
Expand Down
2 changes: 1 addition & 1 deletion internal/command/pull/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func runPull(cmd *cobra.Command, args []string) error {
fullyQualifiedRemoteName.Digest = manifest.GetDescriptor().Digest

// Pull the VM image if we don't have one already in cache
if remote.Exists(fullyQualifiedRemoteName) {
if !remote.Exists(fullyQualifiedRemoteName) {
if err := oci.PullVMDirectory(cmd.Context(), client, reference, manifest, vmDir, int(concurrency)); err != nil {
return err
}
Expand Down
19 changes: 16 additions & 3 deletions internal/command/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ import (
"github.com/cirruslabs/vetu/internal/network/bridged"
"github.com/cirruslabs/vetu/internal/network/software"
"github.com/cirruslabs/vetu/internal/storage/local"
"github.com/cirruslabs/vetu/internal/vmconfig"
"github.com/samber/lo"
"github.com/spf13/cobra"
"os"
"path/filepath"
"runtime"
"strings"
)

Expand Down Expand Up @@ -46,6 +49,12 @@ func runRun(cmd *cobra.Command, args []string) error {

vmConfig := vmDir.Config()

// Validate VM's architecture
if vmConfig.Arch != runtime.GOARCH {
return fmt.Errorf("this VM is built to run on %q, but you're running %q",
vmConfig.Arch, runtime.GOARCH)
}

// Initialize network
var network network.Network

Expand Down Expand Up @@ -75,9 +84,13 @@ func runRun(cmd *cobra.Command, args []string) error {
}

// Disks
for _, disk := range vmConfig.Disks {
targetDiskPath := filepath.Join(vmDir.Path(), disk.Name)
hvArgs = append(hvArgs, "--disk", fmt.Sprintf("path=%s", targetDiskPath))
diskArguments := lo.Map(vmConfig.Disks, func(disk vmconfig.Disk, index int) string {
path := filepath.Join(vmDir.Path(), disk.Name)
return fmt.Sprintf("path=%s", path)
})
if len(diskArguments) != 0 {
hvArgs = append(hvArgs, "--disk")
hvArgs = append(hvArgs, diskArguments...)
}

// CPU and memory
Expand Down
10 changes: 10 additions & 0 deletions internal/oci/annotations/annotations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package annotations

const (
AnnotationName = "org.cirruslabs.vetu.name"
AnnotationUncompressedSize = "org.cirruslabs.vetu.uncompressed-size"
AnnotationUncompressedDigest = "org.cirruslabs.vetu.uncompressed-digest"

AnnotationTartUncompressedSize = "org.cirruslabs.tart.uncompressed-size"
AnnotationTartUncompressedDigest = "org.cirruslabs.tart.uncompressed-content-digest"
)
177 changes: 177 additions & 0 deletions internal/oci/diskpuller/diskpuller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package diskpuller

import (
"context"
"fmt"
"github.com/cirruslabs/vetu/internal/vmdirectory"
"github.com/dustin/go-humanize"
"github.com/regclient/regclient"
"github.com/regclient/regclient/types"
"github.com/regclient/regclient/types/ref"
"github.com/samber/lo"
"github.com/schollz/progressbar/v3"
"io"
"os"
"path/filepath"
"strconv"
"sync"
)

type NameFromDiskDescriptorFunc func(diskDescriptor types.Descriptor) (string, error)
type InitializeDecompressorFunc func(compressedReader io.Reader) io.Reader

type diskTask struct {
Desc types.Descriptor
Path string
Offset int64
}

func PullDisks(
ctx context.Context,
client *regclient.RegClient,
reference ref.Ref,
vmDir *vmdirectory.VMDirectory,
concurrency int,
disks []types.Descriptor,
nameFromDiskDescriptor NameFromDiskDescriptorFunc,
uncompressedSizeAnnotation string,
initializeDecompressor InitializeDecompressorFunc,
) error {
// Process VM's disks by converting them into
// disk tasks for further parallel processing
diskTaskCh := make(chan *diskTask, len(disks))
diskNameToOffset := map[string]int64{}

for _, disk := range disks {
// Extract name
diskName, err := nameFromDiskDescriptor(disk)
if err != nil {
return err
}

// Extract and parse uncompressed size
uncompressedSizeRaw, ok := disk.Annotations[uncompressedSizeAnnotation]
if !ok {
return fmt.Errorf("disk layer has no %s annotation", uncompressedSizeAnnotation)
}
uncompressedSize, err := strconv.ParseInt(uncompressedSizeRaw, 10, 64)
if err != nil {
return err
}

diskTaskCh <- &diskTask{
Desc: disk,
Path: filepath.Join(vmDir.Path(), diskName),
Offset: diskNameToOffset[diskName],
}

diskNameToOffset[diskName] += uncompressedSize
}

// There will be no more disk tasks
close(diskTaskCh)

// Pre-create and truncate disk files
for diskName, offset := range diskNameToOffset {
diskFile, err := os.OpenFile(filepath.Join(vmDir.Path(), diskName), os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return err
}

if err := diskFile.Truncate(offset); err != nil {
return err
}

if err := diskFile.Close(); err != nil {
return err
}
}

// Process disk tasks with the specified concurrency
totalUncompressedDisksSizeBytes := lo.Sum(lo.Values(diskNameToOffset))
totalCompressedDisksSizeBytes := lo.Sum(lo.Map(disks, func(diskDesc types.Descriptor, index int) int64 {
return diskDesc.Size
}))
fmt.Printf("pulling %d disk(s) (%s compressed, %s uncompressed)...\n", len(diskNameToOffset),
humanize.Bytes(uint64(totalCompressedDisksSizeBytes)),
humanize.Bytes(uint64(totalUncompressedDisksSizeBytes)))

progressBar := progressbar.DefaultBytes(totalCompressedDisksSizeBytes)

var wg sync.WaitGroup
wg.Add(concurrency)

diskTasksErrCh := make(chan error, concurrency)

diskTasksCtx, diskTasksCtxCancel := context.WithCancel(ctx)
defer diskTasksCtxCancel()

for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()

for {
diskTask, ok := <-diskTaskCh
if !ok {
return
}

if err := diskTask.process(diskTasksCtx, client, reference, progressBar, initializeDecompressor); err != nil {
diskTasksErrCh <- err
diskTasksCtxCancel()
}
}
}()
}

// Wait for the disk tasks to finish
wg.Wait()

// Since we've finished with pulling disks,
// we can finish the associated progress bar
if err := progressBar.Finish(); err != nil {
return err
}

// Check for errors
select {
case err := <-diskTasksErrCh:
return err
default:
return nil
}
}

func (diskTask *diskTask) process(
ctx context.Context,
client *regclient.RegClient,
reference ref.Ref,
progressBar *progressbar.ProgressBar,
initializeDecompressor InitializeDecompressorFunc,
) error {
// Open disk file and seek to the specified offset
diskFile, err := os.OpenFile(diskTask.Path, os.O_WRONLY, 0600)
if err != nil {
return err
}
if _, err := diskFile.Seek(diskTask.Offset, io.SeekStart); err != nil {
return err
}

// Pull disk layer from the OCI registry
blobReader, err := client.BlobGet(ctx, reference, diskTask.Desc)
if err != nil {
return err
}
defer blobReader.Close()

// Decompress the disk data on-the-fly and write it to the disk file
progressBarReader := progressbar.NewReader(blobReader, progressBar)
decompressor := initializeDecompressor(&progressBarReader)

if _, err := io.Copy(diskFile, decompressor); err != nil {
return err
}

return diskFile.Close()
}
11 changes: 11 additions & 0 deletions internal/oci/mediatypes/mediatypes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package mediatypes

const (
MediaTypeConfig = "application/vnd.cirruslabs.vetu.config.v1"
MediaTypeKernel = "application/vnd.cirruslabs.vetu.kernel.v1"
MediaTypeInitramfs = "application/vnd.cirruslabs.vetu.initramfs.v1"
MediaTypeDisk = "application/vnd.cirruslabs.vetu.disk.v1"

MediaTypeTartConfig = "application/vnd.cirruslabs.tart.config.v1"
MediaTypeTartDisk = "application/vnd.cirruslabs.tart.disk.v2"
)
16 changes: 0 additions & 16 deletions internal/oci/oci.go

This file was deleted.

Loading