Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Package Caching #60

Merged
merged 12 commits into from
Feb 1, 2024
12 changes: 9 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
[package]
name = "omicron-zone-package"
version = "0.10.1"
version = "0.11.0"
authors = ["Sean Klein <[email protected]>"]
edition = "2018"
edition = "2021"
#
# Report a specific error in the case that the toolchain is too old for
# let-else:
Expand All @@ -15,18 +15,24 @@ description = "Packaging tools for Oxide's control plane software"
[dependencies]
anyhow = "1.0"
async-trait = "0.1.67"
blake3 = { version = "1.5", features = ["mmap", "rayon"] }
camino = { version = "1.1", features = ["serde1"] }
camino-tempfile = "1.1"
chrono = "0.4.24"
filetime = "0.2"
flate2 = "1.0.25"
futures = "0.3"
futures-util = "0.3"
hex = "0.4.3"
once_cell = "1.19"
steveklabnik marked this conversation as resolved.
Show resolved Hide resolved
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls", "stream"] }
ring = "0.16.20"
semver = { version = "1.0.17", features = ["std", "serde"] }
serde = { version = "1.0", features = [ "derive" ] }
serde_derive = "1.0"
serde_json = "1.0"
slog = "2.7"
tar = "0.4"
tempfile = "3.4"
thiserror = "1.0"
tokio = { version = "1.26", features = [ "full" ] }
toml = "0.7.3"
Expand Down
154 changes: 154 additions & 0 deletions src/archive.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

//! Tools for creating and inserting into tarballs.

use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use camino::Utf8Path;
use flate2::write::GzEncoder;
use std::convert::TryInto;
use std::fs::{File, OpenOptions};
use tar::Builder;

#[async_trait]
pub trait AsyncAppendFile {
async fn append_file_async<P>(&mut self, path: P, file: &mut File) -> std::io::Result<()>
steveklabnik marked this conversation as resolved.
Show resolved Hide resolved
where
P: AsRef<Utf8Path> + Send;

async fn append_path_with_name_async<P, N>(&mut self, path: P, name: N) -> std::io::Result<()>
where
P: AsRef<Utf8Path> + Send,
N: AsRef<Utf8Path> + Send;

async fn append_dir_all_async<P, Q>(&mut self, path: P, src_path: Q) -> std::io::Result<()>
where
P: AsRef<Utf8Path> + Send,
Q: AsRef<Utf8Path> + Send;
}

#[async_trait]
impl<W: Encoder> AsyncAppendFile for Builder<W> {
async fn append_file_async<P>(&mut self, path: P, file: &mut File) -> std::io::Result<()>
where
P: AsRef<Utf8Path> + Send,
{
tokio::task::block_in_place(move || self.append_file(path.as_ref(), file))
steveklabnik marked this conversation as resolved.
Show resolved Hide resolved
}

async fn append_path_with_name_async<P, N>(&mut self, path: P, name: N) -> std::io::Result<()>
where
P: AsRef<Utf8Path> + Send,
N: AsRef<Utf8Path> + Send,
{
tokio::task::block_in_place(move || {
self.append_path_with_name(path.as_ref(), name.as_ref())
})
}

async fn append_dir_all_async<P, Q>(&mut self, path: P, src_path: Q) -> std::io::Result<()>
where
P: AsRef<Utf8Path> + Send,
Q: AsRef<Utf8Path> + Send,
{
tokio::task::block_in_place(move || self.append_dir_all(path.as_ref(), src_path.as_ref()))
}
}

/// Helper to open a tarfile for reading/writing.
pub fn create_tarfile<P: AsRef<Utf8Path> + std::fmt::Debug>(tarfile: P) -> Result<File> {
OpenOptions::new()
.write(true)
.read(true)
.truncate(true)
.create(true)
.open(tarfile.as_ref())
.map_err(|err| anyhow!("Cannot create tarfile {:?}: {}", tarfile, err))
}

/// Helper to open a tarfile for reading.
pub fn open_tarfile<P: AsRef<Utf8Path> + std::fmt::Debug>(tarfile: P) -> Result<File> {
OpenOptions::new()
.read(true)
.open(tarfile.as_ref())
.map_err(|err| anyhow!("Cannot open tarfile {:?}: {}", tarfile, err))
}

pub trait Encoder: std::io::Write + Send {}
impl<T> Encoder for T where T: std::io::Write + Send {}

pub struct ArchiveBuilder<E: Encoder> {
pub builder: tar::Builder<E>,
}

impl<E: Encoder> ArchiveBuilder<E> {
pub fn new(builder: tar::Builder<E>) -> Self {
Self { builder }
}

pub fn into_inner(self) -> Result<E> {
self.builder
.into_inner()
.with_context(|| "Finalizing archive")
steveklabnik marked this conversation as resolved.
Show resolved Hide resolved
}
}

/// Adds a package at `package_path` to a new zone image
/// being built using the `archive` builder.
pub async fn add_package_to_zone_archive<E: Encoder>(
archive: &mut ArchiveBuilder<E>,
package_path: &Utf8Path,
) -> Result<()> {
let tmp = camino_tempfile::tempdir()?;
let gzr = flate2::read::GzDecoder::new(open_tarfile(package_path)?);
if gzr.header().is_none() {
return Err(anyhow!(
steveklabnik marked this conversation as resolved.
Show resolved Hide resolved
"Missing gzip header from {} - cannot add it to zone image",
package_path,
));
}
let mut component_reader = tar::Archive::new(gzr);
let entries = component_reader.entries()?;

// First, unpack the existing entries
for entry in entries {
let mut entry = entry?;

// Ignore the JSON header files
let entry_path = entry.path()?;
if entry_path == Utf8Path::new("oxide.json") {
continue;
}

let entry_path: &Utf8Path = entry_path.strip_prefix("root/")?.try_into()?;
let entry_unpack_path = tmp.path().join(entry_path);
entry.unpack(&entry_unpack_path)?;

let entry_path = entry.path()?.into_owned();
let entry_path: &Utf8Path = entry_path.as_path().try_into()?;
assert!(entry_unpack_path.exists());

archive
.builder
.append_path_with_name_async(entry_unpack_path, entry_path)
.await?;
}
Ok(())
}

pub async fn new_compressed_archive_builder(
path: &Utf8Path,
) -> Result<ArchiveBuilder<GzEncoder<File>>> {
let file = create_tarfile(path)?;
// TODO: Consider using async compression, async tar.
// It's not the *worst* thing in the world for a packaging tool to block
// here, but it would help the other async threads remain responsive if
// we avoided blocking.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 full agreement that this isn't needed but could be cool (I know this comment was moved from old code, but imho it's still true and so worth remaking on)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Totally! With the block_on calls this is slightly mitigated, but it still has a small cost.

let gzw = GzEncoder::new(file, flate2::Compression::fast());
let mut archive = Builder::new(gzw);
archive.mode(tar::HeaderMode::Deterministic);

Ok(ArchiveBuilder::new(archive))
}
37 changes: 20 additions & 17 deletions src/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
//! Tools for downloading blobs

use anyhow::{anyhow, Context, Result};
use camino::{Utf8Path, Utf8PathBuf};
use chrono::{DateTime, FixedOffset, Utc};
use futures_util::StreamExt;
use reqwest::header::{CONTENT_LENGTH, LAST_MODIFIED};
use ring::digest::{Context as DigestContext, Digest, SHA256};
use std::path::{Path, PathBuf};
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};

Expand All @@ -20,16 +21,16 @@ const S3_BUCKET: &str = "https://oxide-omicron-build.s3.amazonaws.com";
// Name for the directory component where downloaded blobs are stored.
pub(crate) const BLOB: &str = "blob";

#[derive(Debug)]
pub enum Source<'a> {
S3(&'a PathBuf),
Buildomat(&'a crate::package::PrebuiltBlob),
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Source {
S3(Utf8PathBuf),
Buildomat(crate::package::PrebuiltBlob),
}

impl<'a> Source<'a> {
impl Source {
pub(crate) fn get_url(&self) -> String {
match self {
Self::S3(s) => format!("{}/{}", S3_BUCKET, s.to_string_lossy()),
Self::S3(s) => format!("{}/{}", S3_BUCKET, s),
Self::Buildomat(spec) => {
format!(
"https://buildomat.eng.oxide.computer/public/file/oxidecomputer/{}/{}/{}/{}",
Expand All @@ -43,7 +44,7 @@ impl<'a> Source<'a> {
&self,
url: &str,
client: &reqwest::Client,
destination: &Path,
destination: &Utf8Path,
) -> Result<bool> {
if !destination.exists() {
return Ok(true);
Expand Down Expand Up @@ -90,14 +91,16 @@ impl<'a> Source<'a> {
}

// Downloads "source" from S3_BUCKET to "destination".
pub async fn download<'a>(
progress: &impl Progress,
source: &Source<'a>,
destination: &Path,
pub async fn download(
progress: &dyn Progress,
source: &Source,
destination: &Utf8Path,
) -> Result<()> {
let blob = destination
.file_name()
.ok_or_else(|| anyhow!("missing blob filename"))?;
.as_ref()
.ok_or_else(|| anyhow!("missing blob filename"))?
.to_string();

let url = source.get_url();
let client = reqwest::Client::new();
Expand Down Expand Up @@ -134,15 +137,15 @@ pub async fn download<'a>(
let blob_progress = if let Some(length) = content_length {
progress.sub_progress(length)
} else {
Box::new(NoProgress)
Box::new(NoProgress::new())
};
blob_progress.set_message(blob.to_string_lossy().into_owned().into());
blob_progress.set_message(blob.into());

let mut stream = response.bytes_stream();
while let Some(chunk) = stream.next().await {
let chunk = chunk?;
file.write_all(&chunk).await?;
blob_progress.increment(chunk.len() as u64);
blob_progress.increment_completed(chunk.len() as u64);
}
drop(blob_progress);

Expand Down Expand Up @@ -170,7 +173,7 @@ pub async fn download<'a>(
Ok(())
}

async fn get_sha256_digest(path: &Path) -> Result<Digest> {
async fn get_sha256_digest(path: &Utf8Path) -> Result<Digest> {
let mut reader = BufReader::new(
tokio::fs::File::open(path)
.await
Expand Down
Loading
Loading