Skip to content

Commit

Permalink
Merge pull request ostreedev#181 from cgwalters/deploy-bug
Browse files Browse the repository at this point in the history
 containers: Better handle errors from worker and/or driver
  • Loading branch information
cgwalters authored Dec 10, 2021
2 parents fd6dc4c + a27dac8 commit 896fd4b
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 10 deletions.
2 changes: 2 additions & 0 deletions lib/src/container/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use super::OstreeImageReference;
use crate::container::store::PrepareResult;
use anyhow::Result;
use fn_error_context::context;
use ostree::glib;

/// The key in the OSTree origin which holds a serialized [`super::OstreeImageReference`].
Expand Down Expand Up @@ -30,6 +31,7 @@ pub struct DeployOpts<'a> {
/// Write a container image to an OSTree deployment.
///
/// This API is currently intended for only an initial deployment.
#[context("Performing deployment")]
pub async fn deploy(
sysroot: &ostree::Sysroot,
stateroot: &str,
Expand Down
13 changes: 6 additions & 7 deletions lib/src/container/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,9 @@ impl LayeredImageImporter {
)
.await?;
let importer = crate::tar::import_tar(&self.repo, blob, None);
let (commit, driver) = tokio::join!(importer, driver);
driver?;
let commit =
commit.with_context(|| format!("Parsing blob {}", base_layer_ref.digest()))?;
let commit = super::unencapsulate::join_fetch(importer, driver)
.await
.with_context(|| format!("Parsing blob {}", base_layer_ref.digest()))?;
// TODO support ref writing in tar import
self.repo.set_ref_immediate(
None,
Expand Down Expand Up @@ -314,9 +313,9 @@ impl LayeredImageImporter {
};
let w =
crate::tar::write_tar(&self.repo, blob, layer.ostree_ref.as_str(), Some(opts));
let (r, driver) = tokio::join!(w, driver);
let r = r.with_context(|| format!("Parsing layer blob {}", layer.digest()))?;
driver?;
let r = super::unencapsulate::join_fetch(w, driver)
.await
.with_context(|| format!("Parsing layer blob {}", layer.digest()))?;
layer_commits.push(r.commit);
if !r.filtered.is_empty() {
let filtered = HashMap::from_iter(r.filtered.into_iter());
Expand Down
41 changes: 38 additions & 3 deletions lib/src/container/unencapsulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,41 @@ fn require_one_layer_blob(manifest: &oci_image::ImageManifest) -> Result<&oci_im
}
}

/// Use this to process potential errors from a worker and a driver.
/// This is really a brutal hack around the fact that an error can occur
/// on either our side or in the proxy. But if an error occurs on our
/// side, then we will close the pipe, which will *also* cause the proxy
/// to error out.
///
/// What we really want is for the proxy to tell us when it got an
/// error from us closing the pipe. Or, we could store that state
/// on our side. Both are slightly tricky, so we have this (again)
/// hacky thing where we just search for `broken pipe` in the error text.
///
/// Or to restate all of the above - what this function does is check
/// to see if the worker function had an error *and* if the proxy
/// had an error, but if the proxy's error ends in `broken pipe`
/// then it means the real only error is from the worker.
pub(crate) async fn join_fetch<T: std::fmt::Debug>(
worker: impl Future<Output = Result<T>>,
driver: impl Future<Output = Result<()>>,
) -> Result<T> {
let (worker, driver) = tokio::join!(worker, driver);
match (worker, driver) {
(Ok(t), Ok(())) => Ok(t),
(Err(worker), Err(driver)) => {
let text = driver.root_cause().to_string();
if text.ends_with("broken pipe") {
Err(worker)
} else {
Err(worker.context(format!("proxy failure: {} and client error", text)))
}
}
(Ok(_), Err(driver)) => Err(driver),
(Err(worker), Ok(())) => Err(worker),
}
}

/// Configuration for container fetches.
#[derive(Debug, Default)]
pub struct UnencapsulateOptions {
Expand Down Expand Up @@ -219,9 +254,9 @@ async fn unencapsulate_from_manifest_impl(
SignatureSource::ContainerPolicy | SignatureSource::ContainerPolicyAllowInsecure => {}
}
let import = crate::tar::import_tar(repo, blob, Some(taropts));
let (import, driver) = tokio::join!(import, driver);
driver?;
let ostree_commit = import.with_context(|| format!("Parsing blob {}", layer.digest()))?;
let ostree_commit = join_fetch(import, driver)
.await
.with_context(|| format!("Parsing blob {}", layer.digest()))?;

event!(Level::DEBUG, "created commit {}", ostree_commit);
Ok(ostree_commit)
Expand Down

0 comments on commit 896fd4b

Please sign in to comment.