Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[repo depot 2/n] sled agent APIs to manage update artifact storage #6764

Merged
merged 33 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
a5052d0
sled agent APIs to manage update artifact storage
iliana Oct 2, 2024
ce1bc42
fn datasets -> fn dataset_mountpoints
iliana Oct 4, 2024
5ad16a1
be more resilient in the face of io errors
iliana Oct 4, 2024
485ee40
clean up temporary files on startup
iliana Oct 4, 2024
26f4107
naming consistency
iliana Oct 4, 2024
893980e
log.cleanup_successful();
iliana Oct 4, 2024
03a51c7
Merge remote-tracking branch 'origin/main' into iliana/tuf-repo-depot
iliana Oct 15, 2024
efcfb92
document ArtifactStore
iliana Oct 11, 2024
e8b2673
fn put -> put_impl
iliana Oct 11, 2024
3b15f3b
copy_from_depot should take a URL
iliana Oct 15, 2024
c909649
reduce semantic satiation
iliana Oct 15, 2024
4325077
remove default type parameter
iliana Oct 15, 2024
86b8047
StorageBackend -> DatasetsManager; attempt clean up
iliana Oct 15, 2024
599089a
create reqwest client at startup, not on first use
iliana Oct 15, 2024
f624e5d
don't embed source error strings
iliana Oct 15, 2024
b44d06b
fewer contextless errors
iliana Oct 15, 2024
013e67f
another docstring
iliana Oct 15, 2024
5eefb6e
add the repo depot API to api-manifest.toml
iliana Oct 15, 2024
aebfe32
add list artifacts operation
iliana Oct 16, 2024
d743727
Merge remote-tracking branch 'origin/main' into iliana/tuf-repo-depot
iliana Oct 28, 2024
2ae3743
create an update artifact dataset on both M.2s
iliana Oct 28, 2024
45c4cac
PUT artifacts to all artifact datasets
iliana Oct 28, 2024
3c2f866
change list API to return a count of each artifact
iliana Oct 29, 2024
743a67b
make copy_from_depot create a task and return
iliana Oct 29, 2024
58b9cbe
ls-apis expectorate
iliana Oct 29, 2024
6906679
Merge remote-tracking branch 'origin/main' into iliana/tuf-repo-depot
iliana Oct 30, 2024
cd7dc7b
review comments
iliana Oct 30, 2024
e967f02
propagate non-fatal write errors to `finalize()`
iliana Oct 30, 2024
508861d
expectoraaaaate
iliana Oct 30, 2024
9f96f71
Merge remote-tracking branch 'origin/main' into iliana/tuf-repo-depot
iliana Oct 31, 2024
32afac5
improved API responses for PUT/POST
iliana Oct 31, 2024
4d00c16
document ArtifactPutResponse fields
iliana Oct 31, 2024
9b3691a
Merge remote-tracking branch 'origin/main' into iliana/tuf-repo-depot
iliana Oct 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dev-tools/ls-apis/tests/api_dependencies.out
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ Crucible Repair (client: repair-client)
consumed by: crucible-downstairs (crucible/downstairs) via 1 path

Repo Depot API (client: repo-depot-client)
consumed by: omicron-sled-agent (omicron/sled-agent)
consumed by: omicron-sled-agent (omicron/sled-agent) via 1 path

Sled Agent (client: sled-agent-client)
consumed by: dpd (dendrite/dpd) via 1 path
Expand Down
112 changes: 60 additions & 52 deletions sled-agent/src/artifact_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
//! it does not have from another Repo Depot that does have them (at Nexus's
//! direction). This API's implementation is also part of this module.
//!
//! POST, PUT, and DELETE operations are handled by the Sled Agent API.
//! POST, PUT, and DELETE operations are called by Nexus and handled by the Sled
//! Agent API.

use std::collections::BTreeMap;
use std::io::ErrorKind;
Expand Down Expand Up @@ -271,16 +272,23 @@ impl<T: DatasetsManager> ArtifactStore<T> {
}

/// Common implementation for all artifact write operations that creates
/// a temporary file on all datasets.
/// a temporary file on all datasets. Returns an [`ArtifactWriter`] that
/// can be used to write the artifact to all temporary files, then move all
/// temporary files to their final paths.
///
/// Errors are logged and ignored unless a temporary file already exists
/// (another task is writing to this artifact) or no temporary files could
/// be created.
/// Most errors during the write process are considered non-fatal errors.
/// All non-fatal errors are logged, and the most recently-seen non-fatal
/// error is returned by [`ArtifactWriter::finalize`].
///
/// In this method, possible fatal errors are:
/// - No temporary files could be created.
/// - A temporary file already exists (another task is writing to this
/// artifact).
async fn writer(
&self,
sha256: ArtifactHash,
) -> Result<ArtifactWriter, Error> {
let mut inner = Vec::new();
let mut files = Vec::new();
let mut last_error = None;
for mountpoint in self.storage.artifact_storage_paths().await? {
let temp_dir = mountpoint.join(TEMP_SUBDIR);
Expand Down Expand Up @@ -316,16 +324,17 @@ impl<T: DatasetsManager> ArtifactStore<T> {
};
let file = NamedUtf8TempFile::from_parts(file, temp_path);

inner.push(Some((file, mountpoint)));
files.push(Some((file, mountpoint)));
}
if inner.is_empty() {
if files.is_empty() {
Err(last_error.unwrap_or(Error::NoUpdateDataset))
} else {
Ok(ArtifactWriter {
hasher: Sha256::new(),
files: inner,
files,
log: self.log.clone(),
sha256,
last_error,
})
}
}
Expand Down Expand Up @@ -471,9 +480,13 @@ struct ArtifactWriter {
hasher: Sha256,
log: Logger,
sha256: ArtifactHash,
last_error: Option<Error>,
}

impl ArtifactWriter {
/// Calls [`ArtifactWriter::write`] for each chunk in the stream, then
/// [`ArtifactWriter::finalize`]. See the documentation for these functions
/// for error handling information.
async fn write_stream(
self,
stream: impl Stream<Item = Result<impl AsRef<[u8]>, Error>>,
Expand All @@ -487,13 +500,17 @@ impl ArtifactWriter {
writer.finalize().await
}

/// Write `chunk` to all files. If an error occurs, it is logged and the
/// temporary file is dropped. If there are no files left to write to, the
/// most recently-seen error is returned.
/// Write `chunk` to all temporary files.
///
/// Errors in this method are considered non-fatal errors. All non-fatal
/// errors are logged, and the most recently-seen non-fatal error is
/// returned by [`ArtifactWriter::finalize`].
///
/// If all files have failed, this method returns the most recently-seen
/// non-fatal error as a fatal error.
async fn write(&mut self, chunk: impl AsRef<[u8]>) -> Result<(), Error> {
self.hasher.update(&chunk);

let mut last_error = None;
for option in &mut self.files {
if let Some((mut file, mountpoint)) = option.take() {
match file.as_file_mut().write_all(chunk.as_ref()).await {
Expand All @@ -503,7 +520,11 @@ impl ArtifactWriter {
Err(err) => {
let path = file.path().to_owned();
log_and_store!(
last_error, &self.log, "write to", path, err
self.last_error,
&self.log,
"write to",
path,
err
);
// `file` and `final_path` are dropped here, cleaning up
// the file
Expand All @@ -514,16 +535,18 @@ impl ArtifactWriter {

self.files.retain(Option::is_some);
if self.files.is_empty() {
Err(last_error.unwrap_or(Error::NoUpdateDataset))
Err(self.last_error.take().unwrap_or(Error::NoUpdateDataset))
} else {
Ok(())
}
}

/// Rename all files to their final paths. If an error occurs, it is logged.
/// If none of the files are renamed successfully, the most recently-seen
/// error is returned.
async fn finalize(self) -> Result<(), Error> {
/// Rename all files to their final paths.
///
/// Errors in this method are considered non-fatal errors, but this method
/// will return the most recently-seen error by any method in the write
/// process.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, to clarify end-to-end behavior for the PUT operation from Nexus:

  • If we fail to write to either of the M.2s, Nexus will see an error
  • If we successfully write to one of the M.2s, we'll still see an error, but it'll finish the write to one of them?

From Nexus's perspective, it seems like we can't really distinguish between these cases through the PUT API. Do you think this matters?

My concern is mostly "do we keep operating successfully, even in a scenario where we have reduced M.2 capacity".

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW one possible solution here would be to propagate a result back through the API, that indicates:

  • "We wrote to two M.2s successfully"
  • "We wrote to one M.2 successfully, and the other failed (here's the error)"
  • "Neither write completed successfully, here are the errors (or the most recent error)"

Then this decision is punted up to Nexus, and Nexus could decide "at least one write counts as success, but I'll log the error and keep moving".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nexus could call the list API to see if there was a partial success, I suppose; or maybe it's worth instead returning something like {"datasets": 2, "successful_writes": 1} as a non-4xx error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the actual answer to this question depends on the exact design of how Nexus is going to replicate artifacts across sleds. If Nexus is able to try another sled, maybe this current design is fine. But if it's a saga where the sleds are picked out in advance and there's no retry flow (this seems like a poor design) then it would be better to return OK here; at least there's a copy on this one sled.

Copy link
Collaborator

@smklein smklein Oct 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a hypothetical world where we have all sleds operating with one M.2 - shouldn't this be able to succeed? We are operating at reduced redundancy, but we do have a copy that got successfully written.

(Agreed that we could make all PUT calls query the list API afterwards? But if that's our inclination, this also seems like it should be part of the error result)

basically, I think it's critical for Nexus to be able to distinguish between the cases of:

  • We successfully wrote to at least one M.2, and
  • Every other possible outcome

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll implement the {"datasets": 2, "successful_writes": 1} 200 OK response so that Nexus can make a decision with that information. I don't think it matters to return the error since sled agent is logging all I/O errors it runs into.

async fn finalize(mut self) -> Result<(), Error> {
let digest = self.hasher.finalize();
if digest.as_slice() != self.sha256.as_ref() {
return Err(Error::HashMismatch {
Expand All @@ -532,41 +555,29 @@ impl ArtifactWriter {
});
}

let mut last_error = None;
let mut any_success = false;
for (mut file, mountpoint) in self.files.into_iter().flatten() {
// 1. Open the mountpoint and its temp dir so we can fsync them at
// the end.
// 1. fsync the temporary file.
if let Err(err) = file.as_file_mut().sync_all().await {
let path = file.path().to_owned();
log_and_store!(self.last_error, &self.log, "sync", path, err);
continue;
}
// 2. Open the parent directory so we can fsync it.
let parent_dir = match File::open(&mountpoint).await {
Ok(dir) => dir,
Err(err) => {
log_and_store!(
last_error, &self.log, "open", mountpoint, err
);
continue;
}
};
let temp_dir_path = mountpoint.join(TEMP_SUBDIR);
let temp_dir = match File::open(&temp_dir_path).await {
Ok(dir) => dir,
Err(err) => {
log_and_store!(
last_error,
self.last_error,
&self.log,
"open",
temp_dir_path,
mountpoint,
err
);
continue;
}
};
// 2. fsync the file.
if let Err(err) = file.as_file_mut().sync_all().await {
let path = file.path().to_owned();
log_and_store!(last_error, &self.log, "sync", path, err);
continue;
}
// 3. Rename temporary file.
// 3. Rename the temporary file.
let final_path = mountpoint.join(self.sha256.to_string());
let moved_final_path = final_path.clone();
if let Err(err) = tokio::task::spawn_blocking(move || {
Expand All @@ -581,25 +592,20 @@ impl ArtifactWriter {
"from" => err.file.path().as_str(),
"to" => final_path.as_str(),
);
last_error = Some(Error::FileRename {
self.last_error = Some(Error::FileRename {
from: err.file.path().to_owned(),
to: final_path,
err: err.error,
});
continue;
}
// 4. fsync the parent directory for both the final path and its
// previous path.
// 4. fsync the parent directory.
if let Err(err) = parent_dir.sync_all().await {
log_and_store!(last_error, &self.log, "sync", mountpoint, err);
continue;
}
if let Err(err) = temp_dir.sync_all().await {
log_and_store!(
last_error,
self.last_error,
&self.log,
"sync",
temp_dir_path,
mountpoint,
err
);
continue;
Expand All @@ -608,15 +614,17 @@ impl ArtifactWriter {
any_success = true;
}

if any_success {
if let Some(last_error) = self.last_error {
Err(last_error)
} else if any_success {
info!(
&self.log,
"Wrote artifact";
"sha256" => &self.sha256.to_string(),
);
Ok(())
} else {
Err(last_error.unwrap_or(Error::NoUpdateDataset))
Err(Error::NoUpdateDataset)
}
}
}
Expand Down
Loading