Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Defer hash checking and rehashing off-thread #1190

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition = "2021"
[dependencies]
anyhow.workspace = true
atty.workspace = true
futures.workspace = true
nix.workspace = true
rusqlite.workspace = true
rustls-pemfile.workspace = true
Expand All @@ -21,6 +22,7 @@ slog-dtrace.workspace = true
slog-term.workspace = true
tempfile.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-rustls.workspace = true
toml.workspace = true
twox-hash.workspace = true
Expand Down
90 changes: 90 additions & 0 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,16 @@ use std::path::Path;
use ErrorKind::NotFound;

use anyhow::{anyhow, bail, Context, Result};
use futures::{
future::{ready, Either, Ready},
stream::FuturesOrdered,
StreamExt,
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use slog::Drain;
use tempfile::NamedTempFile;
use tokio::sync::oneshot;

mod region;
pub use region::{
Expand Down Expand Up @@ -413,3 +419,87 @@ impl From<CrucibleError> for dropshot::HttpError {
}
}
}

/// Future stored in a [`DeferredQueue`]
///
/// This is either an immediately-ready `T` or a oneshot channel which returns a
/// `T` when an off-task job finishes.
///
/// Both arms resolve to `Result<T, RecvError>` so they can share one concrete
/// type inside a `FuturesOrdered`; the `Ready` arm is always `Ok`.
type DeferredQueueFuture<T> =
    Either<Ready<Result<T, oneshot::error::RecvError>>, oneshot::Receiver<T>>;

/// A `DeferredQueue` stores pending work (optionally executed off-task)
///
/// Results are yielded in the order they were pushed, regardless of when the
/// underlying futures actually complete.
pub struct DeferredQueue<T> {
    /// Ordered stream of deferred futures
    stream: FuturesOrdered<DeferredQueueFuture<T>>,

    /// Stores whether it is known that there are no futures in `self.stream`
    ///
    /// This is tracked separately because `FuturesOrdered::next` will
    /// immediately return `None` if the queue is empty; we don't want that when
    /// it's one of many options in a `tokio::select!`.
    empty: bool,
}

impl<T> Default for DeferredQueue<T> {
fn default() -> Self {
Self::new()
}
}

impl<T> DeferredQueue<T> {
    /// Build a new empty `DeferredQueue`
    pub fn new() -> Self {
        Self {
            stream: FuturesOrdered::new(),
            empty: true,
        }
    }

    /// Stores a new future in the queue, marking it as non-empty
    fn push_back(&mut self, f: DeferredQueueFuture<T>) {
        self.stream.push_back(f);
        self.empty = false;
    }

    /// Returns the next future from the queue
    ///
    /// If the future is `None`, then the queue is marked as empty
    ///
    /// This function is cancel safe: if a result is taken from the internal
    /// `FuturesOrdered`, then it is guaranteed to be returned.
    pub async fn next(&mut self) -> Option<T> {
        // Early exit if we know the stream is empty
        if self.empty {
            return None;
        }

        // Cancel-safety: there can't be any yield points after this!
        let t = self.stream.next().await;
        self.empty |= t.is_none();

        // The oneshot is managed by a worker thread, which should never be
        // dropped, so we don't expect the oneshot to fail
        t.map(|t| t.expect("oneshot failed"))
    }

    /// Stores an immediately-ready value in the queue, marking it as non-empty
    pub fn push_immediate(&mut self, t: T) {
        self.push_back(Either::Left(ready(Ok(t))));
    }

    /// Stores a new pending oneshot in the queue, returning the sender
    ///
    /// The queue will yield the value once it is sent through the returned
    /// [`oneshot::Sender`].
    pub fn push_oneshot(&mut self) -> oneshot::Sender<T> {
        // `oneshot::channel()` returns `(Sender, Receiver)`; the previous
        // binding named them `(rx, tx)`, which compiled (the values were used
        // correctly) but had the names swapped.  The receiver is stored in
        // the queue, and the sender is handed back to the caller.
        let (tx, rx) = oneshot::channel();
        self.push_back(Either::Right(rx));
        tx
    }

    /// Check whether the queue is known to be empty
    ///
    /// It is possible for this to return `false` if the queue is actually
    /// empty; in that case, a subsequent call to `next()` will return `None`
    /// and *later* calls to `is_empty()` will return `true`.
    pub fn is_empty(&self) -> bool {
        self.empty
    }
}
95 changes: 95 additions & 0 deletions downstairs/src/deferred.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2023 Oxide Computer Company
use crate::extent::DownstairsBlockContext;
use crucible_common::{integrity_hash, CrucibleError};
use crucible_protocol::Message;

/// Result of a deferred `Message`
///
/// In most cases, this is simply the original `Message` (stored in
/// `DeferredMessage::Other`).  Writes additionally carry a
/// [`PrecomputedWrite`], holding data that was computed off-thread.
pub(crate) enum DeferredMessage {
    /// A write message, together with its precomputed data
    Write(Message, PrecomputedWrite),
    /// Any other message, passed through unchanged
    Other(Message),
}

/// Data needed to perform a write, which can be computed off-thread
#[derive(Clone, Debug, PartialEq)]
pub(crate) struct PrecomputedWrite {
    /// Checks whether incoming hashes are valid
    ///
    /// `Err(CrucibleError::HashMismatch)` if any write's integrity hash did
    /// not match its data (and encryption context, when present).
    pub validate_hashes_result: Result<(), CrucibleError>,
    /// One context per write (in order), including the on-disk hash
    pub block_contexts: Vec<DownstairsBlockContext>,
}

impl PrecomputedWrite {
    /// Builds a `PrecomputedWrite` for zero writes (test-only helper)
    #[cfg(test)]
    pub(crate) fn empty() -> Self {
        PrecomputedWrite {
            validate_hashes_result: Ok(()),
            block_contexts: vec![],
        }
    }

    /// Precomputes relevant data from a set of writes
    pub(crate) fn from_writes(writes: &[crucible_protocol::Write]) -> Self {
        PrecomputedWrite {
            validate_hashes_result: Self::validate_hashes(writes),
            block_contexts: Self::compute_block_contexts(writes),
        }
    }

    /// Computes an on-disk hash and block context for every write, in order
    fn compute_block_contexts(
        writes: &[crucible_protocol::Write],
    ) -> Vec<DownstairsBlockContext> {
        let mut contexts = Vec::with_capacity(writes.len());
        for write in writes {
            // TODO it would be nice if we could profile what % of time we're
            // spending on hashes locally vs writing to disk
            let on_disk_hash = integrity_hash(&[&write.data[..]]);

            contexts.push(DownstairsBlockContext {
                block_context: write.block_context,
                block: write.offset.value,
                on_disk_hash,
            });
        }
        contexts
    }

    /// Checks every write's integrity hash, stopping at the first mismatch
    fn validate_hashes(
        writes: &[crucible_protocol::Write],
    ) -> Result<(), CrucibleError> {
        for write in writes {
            // Encrypted blocks hash the nonce and tag along with the data;
            // unencrypted blocks hash the data alone.
            let computed_hash =
                match &write.block_context.encryption_context {
                    Some(encryption_context) => integrity_hash(&[
                        &encryption_context.nonce[..],
                        &encryption_context.tag[..],
                        &write.data[..],
                    ]),
                    None => integrity_hash(&[&write.data[..]]),
                };

            if computed_hash != write.block_context.hash {
                // TODO: print out the extent and block where this failed!!
                return Err(CrucibleError::HashMismatch);
            }
        }

        Ok(())
    }
}

impl DeferredMessage {
    /// Consumes the deferred message, returning the original message and
    /// (for writes only) the precomputed write data
    ///
    /// NOTE(review): the previous comment claimed this "returns a reference",
    /// but it takes `self` by value and returns owned parts.
    pub(crate) fn into_parts(self) -> (Message, Option<PrecomputedWrite>) {
        match self {
            DeferredMessage::Write(msg, pre) => (msg, Some(pre)),
            DeferredMessage::Other(msg) => (msg, None),
        }
    }
}
5 changes: 4 additions & 1 deletion downstairs/src/dynamometer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ pub async fn dynamometer(
.collect();

let io_operation_time = Instant::now();
region.region_write(&writes, JobId(1000), false).await?;
let pre = PrecomputedWrite::from_writes(&writes);
region
.region_write_pre(&writes, &pre, JobId(1000), false)
.await?;

total_io_time += io_operation_time.elapsed();
io_operations_sent += num_writes;
Expand Down
31 changes: 28 additions & 3 deletions downstairs/src/extent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,29 @@ pub(crate) trait ExtentInner: Send + Sync + Debug {
&mut self,
job_id: JobId,
writes: &[crucible_protocol::Write],
ctxs: &[DownstairsBlockContext],
only_write_unwritten: bool,
iov_max: usize,
) -> Result<(), CrucibleError>;

#[cfg(test)]
fn write_without_precomputed_contexts(
&mut self,
job_id: JobId,
writes: &[crucible_protocol::Write],
only_write_unwritten: bool,
iov_max: usize,
) -> Result<(), CrucibleError> {
let pre = PrecomputedWrite::from_writes(writes);
self.write(
job_id,
writes,
&pre.block_contexts,
only_write_unwritten,
iov_max,
)
}

#[cfg(test)]
fn get_block_contexts(
&mut self,
Expand All @@ -75,7 +94,7 @@ pub(crate) trait ExtentInner: Send + Sync + Debug {
}

/// BlockContext, with the addition of block index and on_disk_hash
#[derive(Copy, Clone)]
#[derive(Debug, Copy, Clone, PartialEq)]
pub struct DownstairsBlockContext {
pub block_context: BlockContext,

Expand Down Expand Up @@ -536,6 +555,7 @@ impl Extent {
&mut self,
job_id: JobId,
writes: &[crucible_protocol::Write],
ctxs: &[DownstairsBlockContext],
only_write_unwritten: bool,
) -> Result<(), CrucibleError> {
if self.read_only {
Expand All @@ -546,8 +566,13 @@ impl Extent {
(job_id.0, self.number, writes.len() as u64)
});

self.inner
.write(job_id, writes, only_write_unwritten, self.iov_max)?;
self.inner.write(
job_id,
writes,
ctxs,
only_write_unwritten,
self.iov_max,
)?;

cdt::extent__write__done!(|| {
(job_id.0, self.number, writes.len() as u64)
Expand Down
Loading