From cd2d23b9e852885ba55c9a4790f116700483b326 Mon Sep 17 00:00:00 2001 From: "oxide-renovate[bot]" <146848827+oxide-renovate[bot]@users.noreply.github.com> Date: Mon, 20 Nov 2023 22:20:33 +0000 Subject: [PATCH] Update Rust crate tough to 0.15 (#4477) Co-authored-by: Rain --- Cargo.lock | 60 +++-- Cargo.toml | 4 +- nexus/Cargo.toml | 1 + nexus/src/app/update/mod.rs | 16 +- nexus/src/updates.rs | 24 +- nexus/tests/integration_tests/updates.rs | 38 ++- tufaceous-lib/Cargo.toml | 4 + tufaceous-lib/src/artifact.rs | 14 +- tufaceous-lib/src/assemble/build.rs | 15 +- tufaceous-lib/src/key.rs | 13 +- tufaceous-lib/src/repository.rs | 67 +++-- tufaceous-lib/src/root.rs | 5 +- tufaceous/Cargo.toml | 2 + tufaceous/src/dispatch.rs | 20 +- tufaceous/src/main.rs | 5 +- .../tests/integration-tests/command_tests.rs | 8 +- wicketd/Cargo.toml | 1 + wicketd/src/artifacts/artifacts_with_plan.rs | 35 ++- wicketd/src/artifacts/error.rs | 8 + wicketd/src/artifacts/extracted_artifacts.rs | 41 ++- wicketd/src/artifacts/store.rs | 9 +- wicketd/src/artifacts/update_plan.rs | 239 +++++++++++++----- wicketd/tests/integration_tests/updates.rs | 18 +- workspace-hack/Cargo.toml | 4 + 24 files changed, 434 insertions(+), 217 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 113bc6f003..c8cfe908c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -259,6 +259,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-recursion" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fd55a5ba1179988837d24ab4c7cc8ed6efdeff578ede0416b4225a5fca35bd0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.32", +] + [[package]] name = "async-stream" version = "0.3.5" @@ -4577,6 +4588,7 @@ dependencies = [ "async-bb8-diesel", "async-trait", "base64 0.21.5", + "buf-list", "camino", "cancel-safe-futures", "chrono", @@ -4974,6 +4986,7 @@ dependencies = [ "signature 2.1.0", "similar", "slog", + "snafu", "spin 0.9.8", "string_cache", "subtle", @@ -4984,6 +4997,7 @@ dependencies = [ "tokio", "tokio-postgres", "tokio-stream", + "tokio-util", "toml 0.7.8", "toml_datetime", "toml_edit 0.19.15", @@ -5548,24 +5562,6 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" -[[package]] -name = "path-absolutize" -version = "3.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43eb3595c63a214e1b37b44f44b0a84900ef7ae0b4c5efce59e123d246d7a0de" -dependencies = [ - "path-dedot", -] - -[[package]] -name = "path-dedot" -version = "3.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d55e486337acb9973cdea3ec5638c1b3bcb22e573b2b7b41969e0c744d5a15e" -dependencies = [ - "once_cell", -] - [[package]] name = "path-slash" version = "0.1.5" @@ -7753,6 +7749,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e4de37ad025c587a29e8f3f5605c00f70b98715ef90b9061a815b9e59e9042d6" dependencies = [ "doc-comment", + "futures-core", + "pin-project", "snafu-derive", ] @@ -8649,18 +8647,22 @@ checksum = "ea68304e134ecd095ac6c3574494fc62b909f416c4fca77e440530221e549d3d" [[package]] name = "tough" -version = "0.14.0" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eda3efa9005cf9c1966984c3b9a44c3f37b7ed2c95ba338d6ad51bba70e989a0" +checksum = "d16dc5f42fc7ce7cb51eebc7a6ef91f4d69a6d41bb13f34a09674ec47e454d9b" dependencies = [ + "async-recursion", + "async-trait", + "bytes", "chrono", "dyn-clone", + "futures", + "futures-core", "globset", "hex", "log", "olpc-cjson", - "path-absolutize", - "pem 1.1.1", + "pem 3.0.2", "percent-encoding", "reqwest", "ring 0.16.20", @@ -8669,6 +8671,9 @@ dependencies = [ "serde_plain", "snafu", "tempfile", + "tokio", + "tokio-util", + "typed-path", "untrusted 0.7.1", "url", "walkdir", @@ -8843,6 +8848,7 @@ dependencies = [ "slog-envlogger", "slog-term", "tempfile", + "tokio", "tufaceous-lib", ] @@ -8851,6 +8857,7 @@ name = "tufaceous-lib" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "buf-list", "bytes", "bytesize", @@ -8860,6 +8867,7 @@ dependencies = [ "debug-ignore", "flate2", "fs-err", + "futures", "hex", "hubtools", "itertools 0.12.0", @@ -8874,6 +8882,7 @@ dependencies = [ "sha2", "slog", "tar", + "tokio", "toml 0.8.8", "tough", "url", @@ -8928,6 +8937,12 @@ dependencies = [ "utf-8", ] +[[package]] +name = "typed-path" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb9d13b8242894ff21f9990082b90a6410a43dcc6029ac4227a1467853ba781" + [[package]] name = "typenum" version = "1.16.0" @@ -9511,6 +9526,7 @@ dependencies = [ "async-trait", "base64 0.21.5", "bootstrap-agent-client", + "buf-list", "bytes", "camino", "camino-tempfile", diff --git a/Cargo.toml b/Cargo.toml index b55c4fca6a..b18b20aec7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -367,11 +367,11 @@ tokio = "1.33.0" tokio-postgres = { version = "0.7", features = [ "with-chrono-0_4", "with-uuid-1" ] } tokio-stream = "0.1.14" tokio-tungstenite = "0.18" -tokio-util = "0.7.10" +tokio-util = { version = "0.7.10", features = ["io", "io-util"] } toml = "0.8.8" toml_edit = "0.21.0" topological-sort = "0.2.2" -tough = { version = "0.14", features = [ "http" ] } +tough = { version = "0.15", features = [ "http" ] } trust-dns-client = "0.22" trust-dns-proto = "0.22" trust-dns-resolver = "0.22" diff --git a/nexus/Cargo.toml b/nexus/Cargo.toml index 4fc13a31d8..704a7ab7bd 100644 --- a/nexus/Cargo.toml +++ b/nexus/Cargo.toml @@ -12,6 +12,7 @@ anyhow.workspace = true assert_matches.workspace = true async-trait.workspace = true base64.workspace = true +buf-list.workspace = true cancel-safe-futures.workspace = true camino.workspace = true clap.workspace = true diff --git a/nexus/src/app/update/mod.rs b/nexus/src/app/update/mod.rs index 4196cd8a71..165a6ae23b 100644 --- a/nexus/src/app/update/mod.rs +++ b/nexus/src/app/update/mod.rs @@ -69,14 +69,14 @@ impl super::Nexus { ), })?; - let artifacts = tokio::task::spawn_blocking(move || { - crate::updates::read_artifacts(&trusted_root, base_url) - }) - .await - .unwrap() - .map_err(|e| Error::InternalError { - internal_message: format!("error trying to refresh updates: {}", e), - })?; + let artifacts = crate::updates::read_artifacts(&trusted_root, base_url) + .await + .map_err(|e| Error::InternalError { + internal_message: format!( + "error trying to refresh updates: {}", + e + ), + })?; // FIXME: if we hit an error in any of these database calls, the // available artifact table will be out of sync with the current diff --git a/nexus/src/updates.rs b/nexus/src/updates.rs index c2265096dc..2f57868acc 100644 --- a/nexus/src/updates.rs +++ b/nexus/src/updates.rs @@ -2,38 +2,38 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. +use buf_list::BufList; +use futures::TryStreamExt; use nexus_db_queries::db; use omicron_common::update::ArtifactsDocument; use std::convert::TryInto; -// TODO(iliana): make async/.await. awslabs/tough#213 -pub(crate) fn read_artifacts( +pub(crate) async fn read_artifacts( trusted_root: &[u8], mut base_url: String, ) -> Result< Vec, Box, > { - use std::io::Read; - if !base_url.ends_with('/') { base_url.push('/'); } let repository = tough::RepositoryLoader::new( - trusted_root, + &trusted_root, format!("{}metadata/", base_url).parse()?, format!("{}targets/", base_url).parse()?, ) - .load()?; + .load() + .await?; - let mut artifact_document = Vec::new(); - match repository.read_target(&"artifacts.json".parse()?)? { - Some(mut target) => target.read_to_end(&mut artifact_document)?, - None => return Err("artifacts.json missing".into()), - }; + let artifact_document = + match repository.read_target(&"artifacts.json".parse()?).await? { + Some(target) => target.try_collect::().await?, + None => return Err("artifacts.json missing".into()), + }; let artifacts: ArtifactsDocument = - serde_json::from_slice(&artifact_document)?; + serde_json::from_reader(buf_list::Cursor::new(&artifact_document))?; let valid_until = repository .root() diff --git a/nexus/tests/integration_tests/updates.rs b/nexus/tests/integration_tests/updates.rs index 918d5ac100..891166ed19 100644 --- a/nexus/tests/integration_tests/updates.rs +++ b/nexus/tests/integration_tests/updates.rs @@ -7,6 +7,7 @@ // - test that an unknown artifact returns 404, not 500 // - tests around target names and artifact names that contain dangerous paths like `../` +use async_trait::async_trait; use chrono::{Duration, Utc}; use dropshot::test_util::LogContext; use dropshot::{ @@ -45,17 +46,22 @@ const UPDATE_COMPONENT: &'static str = "omicron-test-component"; #[tokio::test] async fn test_update_end_to_end() { let mut config = load_test_config(); + let logctx = LogContext::new("test_update_end_to_end", &config.pkg.log); // build the TUF repo let rng = SystemRandom::new(); - let tuf_repo = new_tuf_repo(&rng); + let tuf_repo = new_tuf_repo(&rng).await; + slog::info!( + logctx.log, + "TUF repo created at {}", + tuf_repo.path().display() + ); // serve it over HTTP let dropshot_config = Default::default(); let mut api = ApiDescription::new(); api.register(static_content).unwrap(); let context = FileServerContext { base: tuf_repo.path().to_owned() }; - let logctx = LogContext::new("test_update_end_to_end", &config.pkg.log); let server = HttpServerStarter::new(&dropshot_config, api, context, &logctx.log) .unwrap() @@ -122,9 +128,14 @@ async fn static_content( for component in path.into_inner().path { fs_path.push(component); } - let body = tokio::fs::read(fs_path) - .await - .map_err(|e| HttpError::for_bad_request(None, e.to_string()))?; + let body = tokio::fs::read(fs_path).await.map_err(|e| { + // tough 0.15+ depend on ENOENT being translated into 404. + if e.kind() == std::io::ErrorKind::NotFound { + HttpError::for_not_found(None, e.to_string()) + } else { + HttpError::for_bad_request(None, e.to_string()) + } + })?; Ok(Response::builder().status(StatusCode::OK).body(body.into())?) } @@ -132,7 +143,7 @@ async fn static_content( const TARGET_CONTENTS: &[u8] = b"hello world".as_slice(); -fn new_tuf_repo(rng: &dyn SecureRandom) -> TempDir { +async fn new_tuf_repo(rng: &(dyn SecureRandom + Sync)) -> TempDir { let version = NonZeroU64::new(Utc::now().timestamp().try_into().unwrap()).unwrap(); let expires = Utc::now() + Duration::minutes(5); @@ -180,13 +191,14 @@ fn new_tuf_repo(rng: &dyn SecureRandom) -> TempDir { &signing_keys, rng, ) + .await .unwrap(); // TODO(iliana): there's no way to create a `RepositoryEditor` without having the root.json on // disk. this is really unergonomic. write and upstream a fix let mut root_tmp = NamedTempFile::new().unwrap(); root_tmp.as_file_mut().write_all(signed_root.buffer()).unwrap(); - let mut editor = RepositoryEditor::new(&root_tmp).unwrap(); + let mut editor = RepositoryEditor::new(&root_tmp).await.unwrap(); root_tmp.close().unwrap(); editor @@ -200,19 +212,20 @@ fn new_tuf_repo(rng: &dyn SecureRandom) -> TempDir { .timestamp_expires(expires); let (targets_dir, target_names) = generate_targets(); for target in target_names { - editor.add_target_path(targets_dir.path().join(target)).unwrap(); + editor.add_target_path(targets_dir.path().join(target)).await.unwrap(); } - let signed_repo = editor.sign(&signing_keys).unwrap(); + let signed_repo = editor.sign(&signing_keys).await.unwrap(); let repo = TempDir::new().unwrap(); - signed_repo.write(repo.path().join("metadata")).unwrap(); + signed_repo.write(repo.path().join("metadata")).await.unwrap(); signed_repo .copy_targets( targets_dir, repo.path().join("targets"), PathExists::Fail, ) + .await .unwrap(); repo @@ -257,8 +270,9 @@ impl Debug for KeyKeySource { } } +#[async_trait] impl KeySource for KeyKeySource { - fn as_sign( + async fn as_sign( &self, ) -> Result, Box> { @@ -267,7 +281,7 @@ impl KeySource for KeyKeySource { Ok(Box::new(Ed25519KeyPair::from_pkcs8(self.0.as_ref()).unwrap())) } - fn write( + async fn write( &self, _value: &str, _key_id_hex: &str, diff --git a/tufaceous-lib/Cargo.toml b/tufaceous-lib/Cargo.toml index bcfcee6b9c..0df3a33f98 100644 --- a/tufaceous-lib/Cargo.toml +++ b/tufaceous-lib/Cargo.toml @@ -7,6 +7,7 @@ publish = false [dependencies] anyhow = { workspace = true, features = ["backtrace"] } +async-trait.workspace = true buf-list.workspace = true bytes.workspace = true bytesize = { workspace = true, features = ["serde"] } @@ -16,6 +17,7 @@ chrono.workspace = true debug-ignore.workspace = true flate2.workspace = true fs-err.workspace = true +futures.workspace = true hex.workspace = true hubtools.workspace = true itertools.workspace = true @@ -28,6 +30,7 @@ serde_path_to_error.workspace = true sha2.workspace = true slog.workspace = true tar.workspace = true +tokio.workspace = true toml.workspace = true tough.workspace = true url = "2.4.1" @@ -36,3 +39,4 @@ omicron-workspace-hack.workspace = true [dev-dependencies] omicron-test-utils.workspace = true +tokio = { workspace = true, features = ["test-util"] } diff --git a/tufaceous-lib/src/artifact.rs b/tufaceous-lib/src/artifact.rs index 56f3e34ecb..23cf31e8c3 100644 --- a/tufaceous-lib/src/artifact.rs +++ b/tufaceous-lib/src/artifact.rs @@ -127,7 +127,7 @@ pub struct HostPhaseImages { } impl HostPhaseImages { - pub fn extract(reader: R) -> Result { + pub fn extract(reader: R) -> Result { let mut phase_1 = Vec::new(); let mut phase_2 = Vec::new(); Self::extract_into( @@ -138,13 +138,12 @@ impl HostPhaseImages { Ok(Self { phase_1: phase_1.into(), phase_2: phase_2.into() }) } - pub fn extract_into( + pub fn extract_into( reader: R, phase_1: W, phase_2: W, ) -> Result<()> { - let uncompressed = - flate2::bufread::GzDecoder::new(BufReader::new(reader)); + let uncompressed = flate2::bufread::GzDecoder::new(reader); let mut archive = tar::Archive::new(uncompressed); let mut oxide_json_found = false; @@ -248,7 +247,7 @@ pub struct RotArchives { } impl RotArchives { - pub fn extract(reader: R) -> Result { + pub fn extract(reader: R) -> Result { let mut archive_a = Vec::new(); let mut archive_b = Vec::new(); Self::extract_into( @@ -259,13 +258,12 @@ impl RotArchives { Ok(Self { archive_a: archive_a.into(), archive_b: archive_b.into() }) } - pub fn extract_into( + pub fn extract_into( reader: R, archive_a: W, archive_b: W, ) -> Result<()> { - let uncompressed = - flate2::bufread::GzDecoder::new(BufReader::new(reader)); + let uncompressed = flate2::bufread::GzDecoder::new(reader); let mut archive = tar::Archive::new(uncompressed); let mut oxide_json_found = false; diff --git a/tufaceous-lib/src/assemble/build.rs b/tufaceous-lib/src/assemble/build.rs index 081e2e10d5..4cb636c9d3 100644 --- a/tufaceous-lib/src/assemble/build.rs +++ b/tufaceous-lib/src/assemble/build.rs @@ -44,7 +44,7 @@ impl OmicronRepoAssembler { self } - pub fn build(&self) -> Result<()> { + pub async fn build(&self) -> Result<()> { let (build_dir, is_temp) = match &self.build_dir { Some(dir) => (dir.clone(), false), None => { @@ -61,7 +61,7 @@ impl OmicronRepoAssembler { slog::info!(self.log, "assembling repository in `{build_dir}`"); - match self.build_impl(&build_dir) { + match self.build_impl(&build_dir).await { Ok(()) => { if is_temp { slog::debug!(self.log, "assembly successful, cleaning up"); @@ -92,15 +92,17 @@ impl OmicronRepoAssembler { Ok(()) } - fn build_impl(&self, build_dir: &Utf8Path) -> Result<()> { + async fn build_impl(&self, build_dir: &Utf8Path) -> Result<()> { let mut repository = OmicronRepo::initialize( &self.log, build_dir, self.manifest.system_version.clone(), self.keys.clone(), self.expiry, - )? - .into_editor()?; + ) + .await? + .into_editor() + .await?; // Add all the artifacts. for (kind, entries) in &self.manifest.artifacts { @@ -118,10 +120,11 @@ impl OmicronRepoAssembler { } // Write out the repository. - repository.sign_and_finish(self.keys.clone(), self.expiry)?; + repository.sign_and_finish(self.keys.clone(), self.expiry).await?; // Now reopen the repository to archive it into a zip file. let repo2 = OmicronRepo::load_untrusted(&self.log, build_dir) + .await .context("error reopening repository to archive")?; repo2 .archive(&self.output_path) diff --git a/tufaceous-lib/src/key.rs b/tufaceous-lib/src/key.rs index 8a5054b331..96282ee377 100644 --- a/tufaceous-lib/src/key.rs +++ b/tufaceous-lib/src/key.rs @@ -5,6 +5,7 @@ use ring::rand::SecureRandom; use ring::signature::Ed25519KeyPair; use std::fmt::Display; use std::str::FromStr; +use tough::async_trait; use tough::key_source::KeySource; use tough::sign::{Sign, SignKeyPair}; @@ -38,30 +39,32 @@ impl Key { } } +#[async_trait] impl Sign for Key { fn tuf_key(&self) -> tough::schema::key::Key { self.as_sign().tuf_key() } - fn sign( + async fn sign( &self, msg: &[u8], - rng: &dyn SecureRandom, + rng: &(dyn SecureRandom + Sync), ) -> Result, Box> { - self.as_sign().sign(msg, rng) + self.as_sign().sign(msg, rng).await } } +#[async_trait] impl KeySource for Key { - fn as_sign( + async fn as_sign( &self, ) -> Result, Box> { Ok(Box::new(self.clone())) } - fn write( + async fn write( &self, _value: &str, _key_id_hex: &str, diff --git a/tufaceous-lib/src/repository.rs b/tufaceous-lib/src/repository.rs index 11a6064602..416d5c9990 100644 --- a/tufaceous-lib/src/repository.rs +++ b/tufaceous-lib/src/repository.rs @@ -4,9 +4,11 @@ use crate::{key::Key, target::TargetWriter, AddArtifact, ArchiveBuilder}; use anyhow::{anyhow, bail, Context, Result}; +use buf_list::BufList; use camino::{Utf8Path, Utf8PathBuf}; use chrono::{DateTime, Utc}; -use fs_err::{self as fs, File}; +use fs_err::{self as fs}; +use futures::TryStreamExt; use omicron_common::{ api::external::SemverVersion, update::{Artifact, ArtifactsDocument}, @@ -28,38 +30,41 @@ pub struct OmicronRepo { impl OmicronRepo { /// Initializes a new repository at the given path, writing it to disk. - pub fn initialize( + pub async fn initialize( log: &slog::Logger, repo_path: &Utf8Path, system_version: SemverVersion, keys: Vec, expiry: DateTime, ) -> Result { - let root = crate::root::new_root(keys.clone(), expiry)?; + let root = crate::root::new_root(keys.clone(), expiry).await?; let editor = OmicronRepoEditor::initialize( repo_path.to_owned(), root, system_version, - )?; + ) + .await?; editor .sign_and_finish(keys, expiry) + .await .context("error signing new repository")?; // In theory we "trust" the key we just used to sign this repository, // but the code path is equivalent to `load_untrusted`. - Self::load_untrusted(log, repo_path) + Self::load_untrusted(log, repo_path).await } /// Loads a repository from the given path. /// /// This method enforces expirations. To load without expiration enforcement, use /// [`Self::load_untrusted_ignore_expiration`]. - pub fn load_untrusted( + pub async fn load_untrusted( log: &slog::Logger, repo_path: &Utf8Path, ) -> Result { Self::load_untrusted_impl(log, repo_path, ExpirationEnforcement::Safe) + .await } /// Loads a repository from the given path, ignoring expiration. @@ -68,30 +73,36 @@ impl OmicronRepo { /// /// 1. When you're editing an existing repository and will re-sign it afterwards. /// 2. In an environment in which time isn't available. - pub fn load_untrusted_ignore_expiration( + pub async fn load_untrusted_ignore_expiration( log: &slog::Logger, repo_path: &Utf8Path, ) -> Result { Self::load_untrusted_impl(log, repo_path, ExpirationEnforcement::Unsafe) + .await } - fn load_untrusted_impl( + async fn load_untrusted_impl( log: &slog::Logger, repo_path: &Utf8Path, exp: ExpirationEnforcement, ) -> Result { let log = log.new(slog::o!("component" => "OmicronRepo")); let repo_path = repo_path.canonicalize_utf8()?; + let root_json = repo_path.join("metadata").join("1.root.json"); + let root = tokio::fs::read(&root_json) + .await + .with_context(|| format!("error reading from {root_json}"))?; let repo = RepositoryLoader::new( - File::open(repo_path.join("metadata").join("1.root.json"))?, + &root, Url::from_file_path(repo_path.join("metadata")) .expect("the canonical path is not absolute?"), Url::from_file_path(repo_path.join("targets")) .expect("the canonical path is not absolute?"), ) .expiration_enforcement(exp) - .load()?; + .load() + .await?; Ok(Self { log, repo, repo_path }) } @@ -107,12 +118,17 @@ impl OmicronRepo { } /// Reads the artifacts document from the repo. - pub fn read_artifacts(&self) -> Result { + pub async fn read_artifacts(&self) -> Result { let reader = self .repo - .read_target(&"artifacts.json".try_into()?)? + .read_target(&"artifacts.json".try_into()?) + .await? .ok_or_else(|| anyhow!("artifacts.json should be present"))?; - serde_json::from_reader(reader) + let buf_list = reader + .try_collect::() + .await + .context("error reading from artifacts.json")?; + serde_json::from_reader(buf_list::Cursor::new(&buf_list)) .context("error deserializing artifacts.json") } @@ -177,8 +193,8 @@ impl OmicronRepo { /// Converts `self` into an `OmicronRepoEditor`, which can be used to perform /// modifications to the repository. - pub fn into_editor(self) -> Result { - OmicronRepoEditor::new(self) + pub async fn into_editor(self) -> Result { + OmicronRepoEditor::new(self).await } /// Prepends the target digest to the name if using consistent snapshots. Returns both the @@ -210,8 +226,8 @@ pub struct OmicronRepoEditor { } impl OmicronRepoEditor { - fn new(repo: OmicronRepo) -> Result { - let artifacts = repo.read_artifacts()?; + async fn new(repo: OmicronRepo) -> Result { + let artifacts = repo.read_artifacts().await?; let existing_target_names = repo .repo @@ -226,7 +242,8 @@ impl OmicronRepoEditor { .join("metadata") .join(format!("{}.root.json", repo.repo.root().signed.version)), repo.repo, - )?; + ) + .await?; Ok(Self { editor, @@ -236,7 +253,7 @@ impl OmicronRepoEditor { }) } - fn initialize( + async fn initialize( repo_path: Utf8PathBuf, root: SignedRole, system_version: SemverVersion, @@ -250,7 +267,7 @@ impl OmicronRepoEditor { fs::create_dir_all(&targets_dir)?; fs::write(&root_path, root.buffer())?; - let editor = RepositoryEditor::new(&root_path)?; + let editor = RepositoryEditor::new(&root_path).await?; Ok(Self { editor, @@ -297,7 +314,7 @@ impl OmicronRepoEditor { } /// Consumes self, signing the repository and writing out this repository to disk. - pub fn sign_and_finish( + pub async fn sign_and_finish( mut self, keys: Vec, expiry: DateTime, @@ -313,9 +330,11 @@ impl OmicronRepoEditor { let signed = self .editor .sign(&crate::key::boxed_keys(keys)) + .await .context("error signing keys")?; signed .write(self.repo_path.join("metadata")) + .await .context("error writing repository")?; Ok(()) } @@ -346,8 +365,8 @@ mod tests { use chrono::Days; use omicron_test_utils::dev::test_setup_log; - #[test] - fn reject_artifacts_with_the_same_filename() { + #[tokio::test] + async fn reject_artifacts_with_the_same_filename() { let logctx = test_setup_log("reject_artifacts_with_the_same_filename"); let tempdir = Utf8TempDir::new().unwrap(); let mut repo = OmicronRepo::initialize( @@ -357,8 +376,10 @@ mod tests { vec![Key::generate_ed25519()], Utc::now() + Days::new(1), ) + .await .unwrap() .into_editor() + .await .unwrap(); // Targets are uniquely identified by their kind/name/version triple; diff --git a/tufaceous-lib/src/root.rs b/tufaceous-lib/src/root.rs index 8ecd1cdf9d..cf5f7129c5 100644 --- a/tufaceous-lib/src/root.rs +++ b/tufaceous-lib/src/root.rs @@ -8,7 +8,7 @@ use tough::editor::signed::SignedRole; use tough::schema::{KeyHolder, RoleKeys, RoleType, Root}; use tough::sign::Sign; -pub(crate) fn new_root( +pub(crate) async fn new_root( keys: Vec, expires: DateTime, ) -> Result> { @@ -47,5 +47,6 @@ pub(crate) fn new_root( &KeyHolder::Root(root), &keys, &SystemRandom::new(), - )?) + ) + .await?) } diff --git a/tufaceous/Cargo.toml b/tufaceous/Cargo.toml index e48513e24c..81248af57d 100644 --- a/tufaceous/Cargo.toml +++ b/tufaceous/Cargo.toml @@ -17,6 +17,7 @@ slog.workspace = true slog-async.workspace = true slog-envlogger.workspace = true slog-term.workspace = true +tokio.workspace = true tufaceous-lib.workspace = true omicron-workspace-hack.workspace = true @@ -27,6 +28,7 @@ fs-err.workspace = true omicron-test-utils.workspace = true predicates.workspace = true tempfile.workspace = true +tokio = { workspace = true, features = ["test-util"] } [[test]] name = "manifest-tests" diff --git a/tufaceous/src/dispatch.rs b/tufaceous/src/dispatch.rs index ea0db63fce..fc86c948df 100644 --- a/tufaceous/src/dispatch.rs +++ b/tufaceous/src/dispatch.rs @@ -36,7 +36,7 @@ pub struct Args { impl Args { /// Executes these arguments. - pub fn exec(self, log: &slog::Logger) -> Result<()> { + pub async fn exec(self, log: &slog::Logger) -> Result<()> { let repo_path = match self.repo { Some(repo) => repo, None => std::env::current_dir()?.try_into()?, @@ -52,7 +52,8 @@ impl Args { system_version, keys, self.expiry, - )?; + ) + .await?; slog::info!( log, "Initialized TUF repository in {}", @@ -87,8 +88,9 @@ impl Args { let repo = OmicronRepo::load_untrusted_ignore_expiration( &log, &repo_path, - )?; - let mut editor = repo.into_editor()?; + ) + .await?; + let mut editor = repo.into_editor().await?; let new_artifact = AddArtifact::from_path(kind, name, version, path)?; @@ -96,7 +98,7 @@ impl Args { editor .add_artifact(&new_artifact) .context("error adding artifact")?; - editor.sign_and_finish(self.keys, self.expiry)?; + editor.sign_and_finish(self.keys, self.expiry).await?; println!( "added {} {}, version {}", new_artifact.kind(), @@ -113,7 +115,8 @@ impl Args { let repo = OmicronRepo::load_untrusted_ignore_expiration( &log, &repo_path, - )?; + ) + .await?; repo.archive(&output_path)?; Ok(()) @@ -124,13 +127,14 @@ impl Args { // Now load the repository and ensure it's valid. let repo = OmicronRepo::load_untrusted(&log, &dest) + .await .with_context(|| { format!( "error loading extracted repository at `{dest}` \ (extracted files are still available)" ) })?; - repo.read_artifacts().with_context(|| { + repo.read_artifacts().await.with_context(|| { format!( "error loading artifacts.json from extracted archive \ at `{dest}`" @@ -169,7 +173,7 @@ impl Args { assembler.set_build_dir(dir); } - assembler.build()?; + assembler.build().await?; Ok(()) } diff --git a/tufaceous/src/main.rs b/tufaceous/src/main.rs index 30832cffbf..014817ee53 100644 --- a/tufaceous/src/main.rs +++ b/tufaceous/src/main.rs @@ -7,10 +7,11 @@ use clap::Parser; use slog::Drain; use tufaceous::Args; -fn main() -> Result<()> { +#[tokio::main] +async fn main() -> Result<()> { let log = setup_log(); let args = Args::parse(); - args.exec(&log) + args.exec(&log).await } fn setup_log() -> slog::Logger { diff --git a/tufaceous/tests/integration-tests/command_tests.rs b/tufaceous/tests/integration-tests/command_tests.rs index 73c94572eb..72c3a1a13a 100644 --- a/tufaceous/tests/integration-tests/command_tests.rs +++ b/tufaceous/tests/integration-tests/command_tests.rs @@ -14,8 +14,8 @@ use omicron_test_utils::dev::test_setup_log; use predicates::prelude::*; use tufaceous_lib::{Key, OmicronRepo}; -#[test] -fn test_init_and_add() -> Result<()> { +#[tokio::test] +async fn test_init_and_add() -> Result<()> { let logctx = test_setup_log("test_init_and_add"); let tempdir = tempfile::tempdir().unwrap(); let key = Key::generate_ed25519(); @@ -54,9 +54,9 @@ fn test_init_and_add() -> Result<()> { // Now read the repository and ensure the list of expected artifacts. let repo_path: Utf8PathBuf = tempdir.path().join("repo").try_into()?; - let repo = OmicronRepo::load_untrusted(&logctx.log, &repo_path)?; + let repo = OmicronRepo::load_untrusted(&logctx.log, &repo_path).await?; - let artifacts = repo.read_artifacts()?; + let artifacts = repo.read_artifacts().await?; assert_eq!( artifacts.artifacts.len(), 2, diff --git a/wicketd/Cargo.toml b/wicketd/Cargo.toml index db1ac9c04a..1360c28b19 100644 --- a/wicketd/Cargo.toml +++ b/wicketd/Cargo.toml @@ -8,6 +8,7 @@ license = "MPL-2.0" anyhow.workspace = true async-trait.workspace = true base64.workspace = true +buf-list.workspace = true bytes.workspace = true camino.workspace = true camino-tempfile.workspace = true diff --git a/wicketd/src/artifacts/artifacts_with_plan.rs b/wicketd/src/artifacts/artifacts_with_plan.rs index 331aecfc70..d3319d7f6b 100644 --- a/wicketd/src/artifacts/artifacts_with_plan.rs +++ b/wicketd/src/artifacts/artifacts_with_plan.rs @@ -50,7 +50,7 @@ pub(super) struct ArtifactsWithPlan { } impl ArtifactsWithPlan { - pub(super) fn from_zip( + pub(super) async fn from_zip( zip_data: T, log: &Logger, ) -> Result @@ -68,10 +68,12 @@ impl ArtifactsWithPlan { // anyone can sign the repositories and this code will accept that. let repository = OmicronRepo::load_untrusted_ignore_expiration(log, dir.path()) + .await .map_err(RepositoryError::LoadRepository)?; let artifacts = repository .read_artifacts() + .await .map_err(RepositoryError::ReadArtifactsDocument)?; // Create another temporary directory where we'll "permanently" (as long @@ -132,9 +134,10 @@ impl ArtifactsWithPlan { .map_err(RepositoryError::TargetHashLength)?, ); - let reader = repository + let stream = repository .repo() .read_target(&target_name) + .await .map_err(|error| RepositoryError::LocateTarget { target: artifact.target.clone(), error: Box::new(error), @@ -143,13 +146,15 @@ impl ArtifactsWithPlan { RepositoryError::MissingTarget(artifact.target.clone()) })?; - plan_builder.add_artifact( - artifact.into_id(), - artifact_hash, - io::BufReader::new(reader), - &mut by_id, - &mut by_hash, - )?; + plan_builder + .add_artifact( + artifact.into_id(), + artifact_hash, + stream, + &mut by_id, + &mut by_hash, + ) + .await?; } // Ensure we know how to apply updates from this set of artifacts; we'll @@ -218,8 +223,11 @@ mod tests { /// Test that `ArtifactsWithPlan` can extract the fake repository generated /// by tufaceous. - #[test] - fn test_extract_fake() -> Result<()> { + /// + /// See documentation for extract_nested_artifact_pair in update_plan.rs + /// for why multi_thread is required. + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_extract_fake() -> Result<()> { let logctx = test_setup_log("test_extract_fake"); let temp_dir = Utf8TempDir::new()?; let archive_path = temp_dir.path().join("archive.zip"); @@ -233,12 +241,15 @@ mod tests { ]) .context("error parsing args")?; - args.exec(&logctx.log).context("error executing assemble command")?; + args.exec(&logctx.log) + .await + .context("error executing assemble command")?; // Now check that it can be read by the archive extractor. let zip_bytes = std::fs::File::open(&archive_path) .context("error opening archive.zip")?; let plan = ArtifactsWithPlan::from_zip(zip_bytes, &logctx.log) + .await .context("error reading archive.zip")?; // Check that all known artifact kinds are present in the map. let by_id_kinds: BTreeSet<_> = diff --git a/wicketd/src/artifacts/error.rs b/wicketd/src/artifacts/error.rs index ef81ec66f3..ada8fbe011 100644 --- a/wicketd/src/artifacts/error.rs +++ b/wicketd/src/artifacts/error.rs @@ -57,6 +57,13 @@ pub(super) enum RepositoryError { )] MissingTarget(String), + #[error("error reading artifact of kind `{kind}` from repository")] + ReadArtifact { + kind: ArtifactKind, + #[source] + error: Box, + }, + #[error("error copying artifact of kind `{kind}` from repository")] CopyExtractedArtifact { kind: ArtifactKind, @@ -160,6 +167,7 @@ impl RepositoryError { | RepositoryError::LoadRepository(_) | RepositoryError::ReadArtifactsDocument(_) | RepositoryError::TargetHashRead { .. } + | RepositoryError::ReadArtifact { .. } | RepositoryError::CopyExtractedArtifact { .. } => { HttpError::for_bad_request(None, message) } diff --git a/wicketd/src/artifacts/extracted_artifacts.rs b/wicketd/src/artifacts/extracted_artifacts.rs index b796201936..5683cd1c13 100644 --- a/wicketd/src/artifacts/extracted_artifacts.rs +++ b/wicketd/src/artifacts/extracted_artifacts.rs @@ -7,6 +7,8 @@ use anyhow::Context; use camino::Utf8PathBuf; use camino_tempfile::NamedUtf8TempFile; use camino_tempfile::Utf8TempDir; +use futures::Stream; +use futures::StreamExt; use omicron_common::update::ArtifactHash; use omicron_common::update::ArtifactHashId; use omicron_common::update::ArtifactKind; @@ -14,13 +16,11 @@ use sha2::Digest; use sha2::Sha256; use slog::info; use slog::Logger; -use std::fs::File; use std::io; -use std::io::BufWriter; -use std::io::Read; use std::io::Write; use std::sync::Arc; use tokio::io::AsyncRead; +use tokio::io::AsyncWriteExt; use tokio_util::io::ReaderStream; /// Handle to the data of an extracted artifact. @@ -123,17 +123,18 @@ impl ExtractedArtifacts { self.tempdir.path().join(format!("{}", artifact_hash_id.hash)) } - /// Copy from `reader` into our temp directory, returning a handle to the + /// Copy from `stream` into our temp directory, returning a handle to the /// extracted artifact on success. - pub(super) fn store( + pub(super) async fn store( &mut self, artifact_hash_id: ArtifactHashId, - mut reader: impl Read, + stream: impl Stream>, ) -> Result { let output_path = self.path_for_artifact(&artifact_hash_id); - let mut writer = BufWriter::new( - File::create(&output_path) + let mut writer = tokio::io::BufWriter::new( + tokio::fs::File::create(&output_path) + .await .with_context(|| { format!("failed to create temp file {output_path}") }) @@ -143,15 +144,29 @@ impl ExtractedArtifacts { })?, ); - let file_size = io::copy(&mut reader, &mut writer) - .with_context(|| format!("failed writing to {output_path}")) - .map_err(|error| RepositoryError::CopyExtractedArtifact { + let mut stream = std::pin::pin!(stream); + + let mut file_size = 0; + + while let Some(res) = stream.next().await { + let chunk = res.map_err(|error| RepositoryError::ReadArtifact { kind: artifact_hash_id.kind.clone(), - error, - })? as usize; + error: Box::new(error), + })?; + file_size += chunk.len(); + writer + .write_all(&chunk) + .await + .with_context(|| format!("failed writing to {output_path}")) + .map_err(|error| RepositoryError::CopyExtractedArtifact { + kind: artifact_hash_id.kind.clone(), + error, + })?; + } writer .flush() + .await .with_context(|| format!("failed flushing {output_path}")) .map_err(|error| RepositoryError::CopyExtractedArtifact { kind: artifact_hash_id.kind.clone(), diff --git a/wicketd/src/artifacts/store.rs b/wicketd/src/artifacts/store.rs index 29e1ecef0a..2a7b4a646b 100644 --- a/wicketd/src/artifacts/store.rs +++ b/wicketd/src/artifacts/store.rs @@ -42,12 +42,9 @@ impl WicketdArtifactStore { slog::debug!(self.log, "adding repository"); let log = self.log.clone(); - let new_artifacts = tokio::task::spawn_blocking(move || { - ArtifactsWithPlan::from_zip(data, &log) - .map_err(|error| error.to_http_error()) - }) - .await - .unwrap()?; + let new_artifacts = ArtifactsWithPlan::from_zip(data, &log) + .await + .map_err(|error| error.to_http_error())?; self.replace(new_artifacts); Ok(()) diff --git a/wicketd/src/artifacts/update_plan.rs b/wicketd/src/artifacts/update_plan.rs index 5d7bee629a..c6db7c1b65 100644 --- a/wicketd/src/artifacts/update_plan.rs +++ b/wicketd/src/artifacts/update_plan.rs @@ -14,7 +14,10 @@ use super::extracted_artifacts::HashingNamedUtf8TempFile; use super::ArtifactIdData; use super::Board; use super::ExtractedArtifactDataHandle; -use anyhow::anyhow; +use bytes::Bytes; +use futures::Stream; +use futures::StreamExt; +use futures::TryStreamExt; use hubtools::RawHubrisArchive; use omicron_common::api::external::SemverVersion; use omicron_common::api::internal::nexus::KnownArtifactKind; @@ -28,7 +31,6 @@ use std::collections::btree_map; use std::collections::BTreeMap; use std::collections::HashMap; use std::io; -use std::io::Read; use tufaceous_lib::HostPhaseImages; use tufaceous_lib::RotArchives; @@ -143,24 +145,26 @@ impl<'a> UpdatePlanBuilder<'a> { }) } - pub(super) fn add_artifact( + pub(super) async fn add_artifact( &mut self, artifact_id: ArtifactId, artifact_hash: ArtifactHash, - reader: io::BufReader, + stream: impl Stream> + Send, by_id: &mut BTreeMap>, by_hash: &mut HashMap, ) -> Result<(), RepositoryError> { // If we don't know this artifact kind, we'll still serve it up by hash, // but we don't do any further processing on it. let Some(artifact_kind) = artifact_id.kind.to_known() else { - return self.add_unknown_artifact( - artifact_id, - artifact_hash, - reader, - by_id, - by_hash, - ); + return self + .add_unknown_artifact( + artifact_id, + artifact_hash, + stream, + by_id, + by_hash, + ) + .await; }; // If we do know the artifact kind, we may have additional work to do, @@ -170,48 +174,57 @@ impl<'a> UpdatePlanBuilder<'a> { match artifact_kind { KnownArtifactKind::GimletSp | KnownArtifactKind::PscSp - | KnownArtifactKind::SwitchSp => self.add_sp_artifact( - artifact_id, - artifact_kind, - artifact_hash, - reader, - by_id, - by_hash, - ), + | KnownArtifactKind::SwitchSp => { + self.add_sp_artifact( + artifact_id, + artifact_kind, + artifact_hash, + stream, + by_id, + by_hash, + ) + .await + } KnownArtifactKind::GimletRot | KnownArtifactKind::PscRot - | KnownArtifactKind::SwitchRot => self.add_rot_artifact( - artifact_id, - artifact_kind, - reader, - by_id, - by_hash, - ), + | KnownArtifactKind::SwitchRot => { + self.add_rot_artifact( + artifact_id, + artifact_kind, + stream, + by_id, + by_hash, + ) + .await + } KnownArtifactKind::Host => { - self.add_host_artifact(artifact_id, reader, by_id, by_hash) + self.add_host_artifact(artifact_id, stream, by_id, by_hash) } KnownArtifactKind::Trampoline => self.add_trampoline_artifact( artifact_id, - reader, - by_id, - by_hash, - ), - KnownArtifactKind::ControlPlane => self.add_control_plane_artifact( - artifact_id, - artifact_hash, - reader, + stream, by_id, by_hash, ), + KnownArtifactKind::ControlPlane => { + self.add_control_plane_artifact( + artifact_id, + artifact_hash, + stream, + by_id, + by_hash, + ) + .await + } } } - fn add_sp_artifact( + async fn add_sp_artifact( &mut self, artifact_id: ArtifactId, artifact_kind: KnownArtifactKind, artifact_hash: ArtifactHash, - mut reader: io::BufReader, + stream: impl Stream> + Send, by_id: &mut BTreeMap>, by_hash: &mut HashMap, ) -> Result<(), RepositoryError> { @@ -228,15 +241,18 @@ impl<'a> UpdatePlanBuilder<'a> { | KnownArtifactKind::SwitchRot => unreachable!(), }; + let mut stream = std::pin::pin!(stream); + // SP images are small, and hubtools wants a `&[u8]` to parse, so we'll // read the whole thing into memory. let mut data = Vec::new(); - reader.read_to_end(&mut data).map_err(|error| { - RepositoryError::CopyExtractedArtifact { + while let Some(res) = stream.next().await { + let chunk = res.map_err(|error| RepositoryError::ReadArtifact { kind: artifact_kind.into(), - error: anyhow!(error), - } - })?; + error: Box::new(error), + })?; + data.extend_from_slice(&chunk); + } let (artifact_id, board) = read_hubris_board_from_archive(artifact_id, data.clone())?; @@ -255,7 +271,11 @@ impl<'a> UpdatePlanBuilder<'a> { ArtifactHashId { kind: artifact_kind.into(), hash: artifact_hash }; let data = self .extracted_artifacts - .store(artifact_hash_id, io::Cursor::new(&data))?; + .store( + artifact_hash_id, + futures::stream::iter([Ok(Bytes::from(data))]), + ) + .await?; slot.insert(ArtifactIdData { id: artifact_id.clone(), data: data.clone(), @@ -273,11 +293,11 @@ impl<'a> UpdatePlanBuilder<'a> { Ok(()) } - fn add_rot_artifact( + async fn add_rot_artifact( &mut self, artifact_id: ArtifactId, artifact_kind: KnownArtifactKind, - reader: io::BufReader, + stream: impl Stream> + Send, by_id: &mut BTreeMap>, by_hash: &mut HashMap, ) -> Result<(), RepositoryError> { @@ -310,9 +330,12 @@ impl<'a> UpdatePlanBuilder<'a> { }; let (rot_a_data, rot_b_data) = Self::extract_nested_artifact_pair( + stream, &mut self.extracted_artifacts, artifact_kind, - |out_a, out_b| RotArchives::extract_into(reader, out_a, out_b), + |reader, out_a, out_b| { + RotArchives::extract_into(reader, out_a, out_b) + }, )?; // Technically we've done all we _need_ to do with the RoT images. We @@ -358,7 +381,7 @@ impl<'a> UpdatePlanBuilder<'a> { fn add_host_artifact( &mut self, artifact_id: ArtifactId, - reader: io::BufReader, + stream: impl Stream> + Send, by_id: &mut BTreeMap>, by_hash: &mut HashMap, ) -> Result<(), RepositoryError> { @@ -369,9 +392,12 @@ impl<'a> UpdatePlanBuilder<'a> { } let (phase_1_data, phase_2_data) = Self::extract_nested_artifact_pair( + stream, &mut self.extracted_artifacts, KnownArtifactKind::Host, - |out_1, out_2| HostPhaseImages::extract_into(reader, out_1, out_2), + |reader, out_1, out_2| { + HostPhaseImages::extract_into(reader, out_1, out_2) + }, )?; // Similarly to the RoT, we need to create new, non-conflicting artifact @@ -409,7 +435,7 @@ impl<'a> UpdatePlanBuilder<'a> { fn add_trampoline_artifact( &mut self, artifact_id: ArtifactId, - reader: io::BufReader, + stream: impl Stream> + Send, by_id: &mut BTreeMap>, by_hash: &mut HashMap, ) -> Result<(), RepositoryError> { @@ -422,9 +448,12 @@ impl<'a> UpdatePlanBuilder<'a> { } let (phase_1_data, phase_2_data) = Self::extract_nested_artifact_pair( + stream, &mut self.extracted_artifacts, KnownArtifactKind::Trampoline, - |out_1, out_2| HostPhaseImages::extract_into(reader, out_1, out_2), + |reader, out_1, out_2| { + HostPhaseImages::extract_into(reader, out_1, out_2) + }, )?; // Similarly to the RoT, we need to create new, non-conflicting artifact @@ -466,11 +495,11 @@ impl<'a> UpdatePlanBuilder<'a> { Ok(()) } - fn add_control_plane_artifact( + async fn add_control_plane_artifact( &mut self, artifact_id: ArtifactId, artifact_hash: ArtifactHash, - reader: io::BufReader, + stream: impl Stream> + Send, by_id: &mut BTreeMap>, by_hash: &mut HashMap, ) -> Result<(), RepositoryError> { @@ -487,7 +516,8 @@ impl<'a> UpdatePlanBuilder<'a> { hash: artifact_hash, }; - let data = self.extracted_artifacts.store(artifact_hash_id, reader)?; + let data = + self.extracted_artifacts.store(artifact_hash_id, stream).await?; self.control_plane_hash = Some(data.hash()); @@ -503,11 +533,11 @@ impl<'a> UpdatePlanBuilder<'a> { Ok(()) } - fn add_unknown_artifact( + async fn add_unknown_artifact( &mut self, artifact_id: ArtifactId, artifact_hash: ArtifactHash, - reader: io::BufReader, + stream: impl Stream> + Send, by_id: &mut BTreeMap>, by_hash: &mut HashMap, ) -> Result<(), RepositoryError> { @@ -515,7 +545,8 @@ impl<'a> UpdatePlanBuilder<'a> { let artifact_hash_id = ArtifactHashId { kind: artifact_kind.clone(), hash: artifact_hash }; - let data = self.extracted_artifacts.store(artifact_hash_id, reader)?; + let data = + self.extracted_artifacts.store(artifact_hash_id, stream).await?; record_extracted_artifact( artifact_id, @@ -529,11 +560,80 @@ impl<'a> UpdatePlanBuilder<'a> { Ok(()) } - // RoT, host OS, and trampoline OS artifacts all contain a pair of artifacts - // we actually care about (RoT: A/B images; host/trampoline: phase1/phase2 - // images). This method is a helper that converts a single artifact `reader` - // into a pair of extracted artifacts. + /// A helper that converts a single artifact `stream` into a pair of + /// extracted artifacts. + /// + /// RoT, host OS, and trampoline OS artifacts all contain a pair of + /// artifacts we actually care about (RoT: A/B images; host/trampoline: + /// phase1/phase2 images). This method is a helper to extract that. + /// + /// This method uses a `block_in_place` into synchronous code, because the + /// value of changing tufaceous to do async tarball extraction is honestly + /// pretty dubious. + /// + /// The main costs of this are that: + /// 1. This code can only be used with multithreaded Tokio executors. (This + /// is OK for production, but does require that our tests use `flavor = + /// "multi_thread`.) + /// 2. Parallelizing extraction is harder if we ever want to do that in the + /// future. (It can be done using the async-scoped crate, though.) + /// + /// Depending on how things shake out, we may want to revisit this in the + /// future. fn extract_nested_artifact_pair( + stream: impl Stream> + Send, + extracted_artifacts: &mut ExtractedArtifacts, + kind: KnownArtifactKind, + extract: F, + ) -> Result< + (ExtractedArtifactDataHandle, ExtractedArtifactDataHandle), + RepositoryError, + > + where + F: FnOnce( + &mut dyn io::BufRead, + &mut HashingNamedUtf8TempFile, + &mut HashingNamedUtf8TempFile, + ) -> anyhow::Result<()> + + Send, + { + // Since stream isn't guaranteed to be 'static, we have to use + // block_in_place here, not spawn_blocking. This does mean that the + // current task is taken over, and that this function can only be used + // from a multithreaded Tokio runtime. + // + // An alternative would be to use the `async-scoped` crate. However: + // + // - We would only spawn one task there. + // - The only safe use of async-scoped is with the `scope_and_block` + // call, which uses `tokio::task::block_in_place` anyway. + // - async-scoped also requires a multithreaded Tokio runtime. + // + // If we ever want to parallelize extraction across all the different + // artifacts, `async-scoped` would be a good fit. + tokio::task::block_in_place(|| { + let stream = std::pin::pin!(stream); + let reader = + tokio_util::io::StreamReader::new(stream.map_err(|error| { + // StreamReader requires a conversion from tough's errors to + // std::io::Error. + std::io::Error::new(io::ErrorKind::Other, error) + })); + + // RotArchives::extract_into takes a synchronous reader, so we need + // to use this bridge. The bridge can only be used from a blocking + // context. + let mut reader = tokio_util::io::SyncIoBridge::new(reader); + + Self::extract_nested_artifact_pair_impl( + extracted_artifacts, + kind, + |out_a, out_b| extract(&mut reader, out_a, out_b), + ) + }) + } + + fn extract_nested_artifact_pair_impl( extracted_artifacts: &mut ExtractedArtifacts, kind: KnownArtifactKind, extract: F, @@ -838,7 +938,9 @@ mod tests { builder.build_to_vec().unwrap() } - #[tokio::test] + // See documentation for extract_nested_artifact_pair for why multi_thread + // is required. + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_update_plan_from_artifacts() { const VERSION_0: SemverVersion = SemverVersion::new(0, 0, 0); @@ -867,10 +969,11 @@ mod tests { .add_artifact( id, hash, - io::BufReader::new(io::Cursor::new(&data)), + futures::stream::iter([Ok(Bytes::from(data))]), &mut by_id, &mut by_hash, ) + .await .unwrap(); } @@ -889,10 +992,11 @@ mod tests { .add_artifact( id, hash, - io::BufReader::new(io::Cursor::new(&data)), + futures::stream::iter([Ok(Bytes::from(data))]), &mut by_id, &mut by_hash, ) + .await .unwrap(); } @@ -917,10 +1021,11 @@ mod tests { .add_artifact( id, hash, - io::BufReader::new(io::Cursor::new(&data)), + futures::stream::iter([Ok(Bytes::from(data))]), &mut by_id, &mut by_hash, ) + .await .unwrap(); } } @@ -945,10 +1050,11 @@ mod tests { .add_artifact( id, hash, - io::BufReader::new(io::Cursor::new(data)), + futures::stream::iter([Ok(data.clone())]), &mut by_id, &mut by_hash, ) + .await .unwrap(); } @@ -972,10 +1078,11 @@ mod tests { .add_artifact( id, hash, - io::BufReader::new(io::Cursor::new(data)), + futures::stream::iter([Ok(data.clone())]), &mut by_id, &mut by_hash, ) + .await .unwrap(); } diff --git a/wicketd/tests/integration_tests/updates.rs b/wicketd/tests/integration_tests/updates.rs index fb1637f44e..b65833a74b 100644 --- a/wicketd/tests/integration_tests/updates.rs +++ b/wicketd/tests/integration_tests/updates.rs @@ -31,7 +31,9 @@ use wicketd_client::types::{ StartUpdateParams, }; -#[tokio::test] +// See documentation for extract_nested_artifact_pair in update_plan.rs for why +// multi_thread is required. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_updates() { let gateway = gateway_setup::test_setup("test_updates", SpPort::One).await; let wicketd_testctx = WicketdTestContext::setup(gateway).await; @@ -48,7 +50,7 @@ async fn test_updates() { ]) .expect("args parsed correctly"); - args.exec(log).expect("assemble command completed successfully"); + args.exec(log).await.expect("assemble command completed successfully"); // Read the archive and upload it to the server. let zip_bytes = @@ -258,7 +260,9 @@ async fn test_updates() { wicketd_testctx.teardown().await; } -#[tokio::test] +// See documentation for extract_nested_artifact_pair in update_plan.rs for why +// multi_thread is required. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_installinator_fetch() { let gateway = gateway_setup::test_setup("test_updates", SpPort::One).await; let wicketd_testctx = WicketdTestContext::setup(gateway).await; @@ -275,7 +279,7 @@ async fn test_installinator_fetch() { ]) .expect("args parsed correctly"); - args.exec(log).expect("assemble command completed successfully"); + args.exec(log).await.expect("assemble command completed successfully"); // Read the archive and upload it to the server. let zip_bytes = @@ -391,7 +395,9 @@ async fn test_installinator_fetch() { wicketd_testctx.teardown().await; } -#[tokio::test] +// See documentation for extract_nested_artifact_pair in update_plan.rs for why +// multi_thread is required. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_update_races() { let gateway = gateway_setup::test_setup( "test_artifact_upload_while_updating", @@ -412,7 +418,7 @@ async fn test_update_races() { ]) .expect("args parsed correctly"); - args.exec(log).expect("assemble command completed successfully"); + args.exec(log).await.expect("assemble command completed successfully"); // Read the archive and upload it to the server. let zip_bytes = diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index 4d416eca02..47ea83f8f2 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -85,6 +85,7 @@ sha2 = { version = "0.10.8", features = ["oid"] } signature = { version = "2.1.0", default-features = false, features = ["digest", "rand_core", "std"] } similar = { version = "2.2.1", features = ["inline", "unicode"] } slog = { version = "2.7.0", features = ["dynamic-keys", "max_level_trace", "release_max_level_debug", "release_max_level_trace"] } +snafu = { version = "0.7.5", features = ["futures"] } spin = { version = "0.9.8" } string_cache = { version = "0.8.7" } subtle = { version = "2.5.0" } @@ -94,6 +95,7 @@ time = { version = "0.3.27", features = ["formatting", "local-offset", "macros", tokio = { version = "1.33.0", features = ["full", "test-util"] } tokio-postgres = { version = "0.7.10", features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] } tokio-stream = { version = "0.1.14", features = ["net"] } +tokio-util = { version = "0.7.10", features = ["codec", "io-util"] } toml = { version = "0.7.8" } toml_edit-647d43efb71741da = { package = "toml_edit", version = "0.21.0", features = ["serde"] } tracing = { version = "0.1.37", features = ["log"] } @@ -178,6 +180,7 @@ sha2 = { version = "0.10.8", features = ["oid"] } signature = { version = "2.1.0", default-features = false, features = ["digest", "rand_core", "std"] } similar = { version = "2.2.1", features = ["inline", "unicode"] } slog = { version = "2.7.0", features = ["dynamic-keys", "max_level_trace", "release_max_level_debug", "release_max_level_trace"] } +snafu = { version = "0.7.5", features = ["futures"] } spin = { version = "0.9.8" } string_cache = { version = "0.8.7" } subtle = { version = "2.5.0" } @@ -188,6 +191,7 @@ time-macros = { version = "0.2.13", default-features = false, features = ["forma tokio = { version = "1.33.0", features = ["full", "test-util"] } tokio-postgres = { version = "0.7.10", features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] } tokio-stream = { version = "0.1.14", features = ["net"] } +tokio-util = { version = "0.7.10", features = ["codec", "io-util"] } toml = { version = "0.7.8" } toml_edit-647d43efb71741da = { package = "toml_edit", version = "0.21.0", features = ["serde"] } tracing = { version = "0.1.37", features = ["log"] }