diff --git a/Cargo.lock b/Cargo.lock
index 6155edb12..544a124b3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -691,6 +691,7 @@ dependencies = [
  "proptest",
  "rand 0.8.5",
  "rand_chacha 0.3.1",
+ "rayon",
  "reqwest",
  "ringbuffer",
  "schemars",
diff --git a/Cargo.toml b/Cargo.toml
index 4c38130bf..a90da3fa0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -68,6 +68,7 @@ opentelemetry = "0.21.0"
 opentelemetry-jaeger = { version = "0.17.0" }
 percent-encoding = "2.3"
 proptest = "1.4.0"
+rayon = "1.8.0"
 rand = { version = "0.8.5", features = ["min_const_gen", "small_rng"] }
 rand_chacha = "0.3.1"
 reedline = "0.27.1"
diff --git a/upstairs/Cargo.toml b/upstairs/Cargo.toml
index 5a40f0f15..3fa6f8a53 100644
--- a/upstairs/Cargo.toml
+++ b/upstairs/Cargo.toml
@@ -31,6 +31,7 @@ omicron-common.workspace = true
 oximeter-producer.workspace = true
 oximeter.workspace = true
 rand.workspace = true
+rayon.workspace = true
 ringbuffer.workspace = true
 schemars.workspace = true
 serde.workspace = true
diff --git a/upstairs/src/deferred.rs b/upstairs/src/deferred.rs
new file mode 100644
index 000000000..f0d9e2493
--- /dev/null
+++ b/upstairs/src/deferred.rs
@@ -0,0 +1,186 @@
+// Copyright 2023 Oxide Computer Company
+//! Tools to defer encryption work to a separate thread pool
+use std::sync::Arc;
+
+use crate::{
+    upstairs::UpstairsConfig, BlockContext, BlockReq, BlockRes, ImpactedBlocks,
+};
+use bytes::Bytes;
+use crucible_common::{integrity_hash, CrucibleError, RegionDefinition};
+use futures::{
+    future::{Either, Ready},
+    stream::FuturesOrdered,
+    StreamExt,
+};
+use tokio::sync::oneshot;
+
+/// Future stored in a [`DeferredQueue`]
+///
+/// This is either an immediately-ready `T` or a oneshot channel which returns a
+/// `T` when an off-task job finishes.
+type DeferredQueueFuture<T> =
+    Either<Ready<Result<T, oneshot::error::RecvError>>, oneshot::Receiver<T>>;
+
+/// A `DeferredQueue` stores pending work (optionally executed off-task)
+pub(crate) struct DeferredQueue<T> {
+    /// Ordered stream of deferred futures
+    stream: FuturesOrdered<DeferredQueueFuture<T>>,
+
+    /// Stores whether it is known that there are no futures in `self.stream`
+    ///
+    /// This is tracked separately because `FuturesOrdered::next` will
+    /// immediately return `None` if the queue is empty; we don't want that when
+    /// it's one of many options in a `tokio::select!`.
+    empty: bool,
+}
+
+impl<T> DeferredQueue<T> {
+    /// Build a new empty `FuturesOrdered`
+    pub fn new() -> Self {
+        Self {
+            stream: FuturesOrdered::new(),
+            empty: true,
+        }
+    }
+
+    /// Stores a new future in the queue, marking it as non-empty
+    pub fn push_back(&mut self, f: DeferredQueueFuture<T>) {
+        self.stream.push_back(f);
+        self.empty = false;
+    }
+
+    /// Returns the next future from the queue
+    ///
+    /// If the future is `None`, then the queue is marked as empty
+    ///
+    /// This function is cancel safe: if a result is taken from the internal
+    /// `FuturesOrdered`, then it is guaranteed to be returned.
+    pub async fn next(&mut self) -> Option<T> {
+        // Early exit if we know the stream is empty
+        if self.empty {
+            return None;
+        }
+
+        // Cancel-safety: there can't be any yield points after this!
+        let t = self.stream.next().await;
+        self.empty |= t.is_none();
+
+        // The oneshot is managed by a worker thread, which should never be
+        // dropped, so we don't expect the oneshot to fail
+        t.map(|t| t.expect("oneshot failed"))
+    }
+
+    /// Check whether the queue is known to be empty
+    ///
+    /// It is possible for this to return `false` if the queue is actually
+    /// empty; in that case, a subsequent call to `next()` will return `None`
+    /// and *later* calls to `is_empty()` will return `true`.
+    pub fn is_empty(&self) -> bool {
+        self.empty
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+/// All of the information needed to encrypt a write operation
+///
+/// The `DeferredWrite` is standalone so that it can either be executed locally
+/// or in a separate worker thread.
+pub(crate) struct DeferredWrite {
+    pub ddef: RegionDefinition,
+    pub impacted_blocks: ImpactedBlocks,
+    pub data: Bytes,
+    pub res: Option<BlockRes>,
+    pub is_write_unwritten: bool,
+    pub cfg: Arc<UpstairsConfig>,
+}
+
+/// Result of a deferred `BlockReq`
+///
+/// In most cases, this is simply the original `BlockReq` (stored in
+/// `DeferredBlockReq::Other`). The exception is `BlockReq::Write` and
+/// `BlockReq::WriteUnwritten`, which require encryption; in these cases,
+/// encryption is done off-thread and the result is a `DeferredBlockReq::Write`.
+#[derive(Debug)]
+pub(crate) enum DeferredBlockReq {
+    Write(EncryptedWrite),
+    Other(BlockReq),
+}
+
+#[derive(Debug)]
+pub(crate) struct EncryptedWrite {
+    pub writes: Vec<crucible_protocol::Write>,
+    pub impacted_blocks: ImpactedBlocks,
+    pub res: Option<BlockRes>,
+    pub is_write_unwritten: bool,
+}
+
+impl DeferredWrite {
+    pub fn run(self) -> Option<EncryptedWrite> {
+        // Build up all of the Write operations, encrypting data here
+        let mut writes: Vec<crucible_protocol::Write> =
+            Vec::with_capacity(self.impacted_blocks.len(&self.ddef));
+
+        let mut cur_offset: usize = 0;
+        let byte_len: usize = self.ddef.block_size() as usize;
+        for (eid, offset) in self.impacted_blocks.blocks(&self.ddef) {
+            let (sub_data, encryption_context, hash) = if let Some(context) =
+                &self.cfg.encryption_context
+            {
+                // Encrypt here
+                let mut mut_data = self
+                    .data
+                    .slice(cur_offset..(cur_offset + byte_len))
+                    .to_vec();
+
+                let (nonce, tag, hash) =
+                    match context.encrypt_in_place(&mut mut_data[..]) {
+                        Err(e) => {
+                            if let Some(res) = self.res {
+                                res.send_err(CrucibleError::EncryptionError(
+                                    e.to_string(),
+                                ));
+                            }
+                            return None;
+                        }
+
+                        Ok(v) => v,
+                    };
+
+                (
+                    Bytes::copy_from_slice(&mut_data),
+                    Some(crucible_protocol::EncryptionContext {
+                        nonce: nonce.into(),
+                        tag: tag.into(),
+                    }),
+                    hash,
+                )
+            } else {
+                // Unencrypted
+                let sub_data =
+                    self.data.slice(cur_offset..(cur_offset + byte_len));
+                let hash = integrity_hash(&[&sub_data[..]]);
+
+                (sub_data, None, hash)
+            };
+
+            writes.push(crucible_protocol::Write {
+                eid,
+                offset,
+                data: sub_data,
+                block_context: BlockContext {
+                    hash,
+                    encryption_context,
+                },
+            });
+
+            cur_offset += byte_len;
+        }
+        Some(EncryptedWrite {
+            writes,
+            impacted_blocks: self.impacted_blocks,
+            res: self.res,
+            is_write_unwritten: self.is_write_unwritten,
+        })
+    }
+}
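The `DeferredQueueFuture` alias above is what lets one queue hold both kinds of work: a result that is already known and a result that a worker thread will produce later. A minimal standalone sketch of the same shape (the `immediate` helper and `main` are invented for illustration, not part of the patch):

```rust
use futures::future::{ready, Either, Ready};
use tokio::sync::oneshot;

// Both halves resolve to Result<T, RecvError>, so the Either is itself a
// future with that output and can sit in a single FuturesOrdered.
type QueueFuture<T> =
    Either<Ready<Result<T, oneshot::error::RecvError>>, oneshot::Receiver<T>>;

/// Wrap an already-computed value so it completes on first poll
fn immediate<T>(t: T) -> QueueFuture<T> {
    Either::Left(ready(Ok(t)))
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
    let slow: QueueFuture<u64> = Either::Right(rx); // filled in by a worker
    let fast: QueueFuture<u64> = immediate(2);

    // Stand-in for a rayon worker thread finishing its job
    std::thread::spawn(move || {
        let _ = tx.send(1);
    });

    assert_eq!(slow.await.unwrap(), 1);
    assert_eq!(fast.await.unwrap(), 2);
}
```

`Either` only implements `Future` when both variants share an output type, which is exactly why the ready value is wrapped in `Ok`.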
diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs
index d6fd1d680..f884d0531 100644
--- a/upstairs/src/lib.rs
+++ b/upstairs/src/lib.rs
@@ -59,6 +59,7 @@ mod stats;
 mod impacted_blocks;
 pub use impacted_blocks::*;
 
+mod deferred;
 mod live_repair;
 
 #[cfg(test)]
diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs
index 0629da139..778d8e4ec 100644
--- a/upstairs/src/upstairs.rs
+++ b/upstairs/src/upstairs.rs
@@ -5,11 +5,14 @@ use crate::{
     client::{ClientAction, ClientRunResult, ClientStopReason},
     control::ControlRequest,
     deadline_secs,
+    deferred::{
+        DeferredBlockReq, DeferredQueue, DeferredWrite, EncryptedWrite,
+    },
     downstairs::{Downstairs, DownstairsAction},
-    extent_from_offset, integrity_hash,
+    extent_from_offset,
     stats::UpStatOuter,
-    Block, BlockContext, BlockOp, BlockReq, BlockRes, Buffer, Bytes, ClientId,
-    ClientMap, CrucibleOpts, DsState, EncryptionContext, GtoS, Guest, Message,
+    Block, BlockOp, BlockReq, BlockRes, Buffer, Bytes, ClientId, ClientMap,
+    CrucibleOpts, DsState, EncryptionContext, GtoS, Guest, Message,
     RegionDefinition, RegionDefinitionStatus, SnapshotDetails, WQCounts,
 };
 use crucible_common::CrucibleError;
@@ -22,10 +25,11 @@ use std::{
         },
     },
 };
+use futures::future::{pending, ready, Either};
 use ringbuffer::RingBuffer;
 use slog::{debug, error, info, o, warn, Logger};
 use tokio::{
-    sync::mpsc,
+    sync::{mpsc, oneshot},
     time::{sleep_until, Instant},
 };
 use uuid::Uuid;
@@ -189,6 +193,9 @@
     ///
     /// This is public so that others can clone it to get a controller handle
     pub(crate) control_tx: mpsc::Sender<ControlRequest>,
+
+    /// Stream of post-processed `BlockOp` futures
+    deferred_reqs: DeferredQueue<Option<DeferredBlockReq>>,
 }
 
 impl Drop for Upstairs {
@@ -211,11 +218,18 @@ pub(crate) enum UpstairsAction {
     Downstairs(DownstairsAction),
     Guest(BlockReq),
+
+    /// A deferred block request has completed
+    DeferredBlockReq(DeferredBlockReq),
+
     LeakCheck,
     FlushCheck,
     StatUpdate,
     RepairCheck,
     Control(ControlRequest),
+
+    /// We received an event of some kind, but it requires no follow-up work
+    NoOp,
 }
 
 #[derive(Debug)]
@@ -324,6 +338,7 @@
         );
         let flush_timeout_secs = opt.flush_timeout.unwrap_or(0.5);
         let (control_tx, control_rx) = tokio::sync::mpsc::channel(500);
+
         Upstairs {
             state: UpstairsState::Initializing,
             cfg,
@@ -340,6 +355,7 @@
             downstairs,
             control_rx,
             control_tx,
+            deferred_reqs: DeferredQueue::new(),
         }
     }
 
@@ -374,7 +390,6 @@
     /// Select an event from possible actions
     async fn select(&mut self) -> UpstairsAction {
-        use futures::future::{pending, Either};
         tokio::select! {
             d = self.downstairs.select() => {
                 UpstairsAction::Downstairs(d)
             }
@@ -388,6 +403,29 @@
             => {
                 UpstairsAction::RepairCheck
             }
+            d = self.deferred_reqs.next(), if !self.deferred_reqs.is_empty()
+            => {
+                match d {
+                    // Normal operation: the deferred task gave us back a
+                    // DeferredBlockReq, which we need to handle.
+                    Some(Some(d)) => UpstairsAction::DeferredBlockReq(d),
+
+                    // The innermost Option is None if the deferred task handled
+                    // the request on its own (and replied to the `BlockReq`
+                    // already). This happens if encryption fails, which would
+                    // be odd, but possible?
+                    Some(None) => UpstairsAction::NoOp,
+
+                    // The outer Option is None if the FuturesOrdered is empty
+                    None => {
+                        // Calling `deferred_reqs.next()` on an empty queue must
+                        // set the flag marking the deferred futures list as
+                        // empty; assert that here as a sanity check.
+                        assert!(self.deferred_reqs.is_empty());
+                        UpstairsAction::NoOp
+                    }
+                }
+            }
             _ = sleep_until(self.leak_deadline) => {
                 UpstairsAction::LeakCheck
             }
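The guard on the new `select!` arm deserves a note: `FuturesOrdered::next` resolves to `None` immediately when the queue is empty, so an unguarded arm would win the race on every loop iteration and starve the other event sources. A sketch of the pattern in isolation (assuming the `DeferredQueue` from `deferred.rs` above; the sleep arm is a stand-in for the other Upstairs events):

```rust
use crate::deferred::DeferredQueue;

async fn tick(queue: &mut DeferredQueue<u64>) {
    tokio::select! {
        // Guard: only poll the queue when it may actually hold work
        d = queue.next(), if !queue.is_empty() => {
            match d {
                Some(v) => println!("deferred job finished: {v}"),
                // We raced with the queue draining: next() has now latched
                // the empty flag, matching the assert in the real arm
                None => debug_assert!(queue.is_empty()),
            }
        }
        _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
            // other event sources go here
        }
    }
}
```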
@@ -412,8 +450,10 @@
                 self.apply_downstairs_action(d).await
             }
             UpstairsAction::Guest(b) => {
-                self.apply_guest_request(b).await;
-                self.gone_too_long();
+                self.defer_guest_request(b).await;
+            }
+            UpstairsAction::DeferredBlockReq(req) => {
+                self.apply_guest_request(req).await;
             }
             UpstairsAction::LeakCheck => {
                 const LEAK_MS: usize = 1000;
@@ -448,8 +488,13 @@
             UpstairsAction::Control(c) => {
                 self.on_control_req(c).await;
             }
+            UpstairsAction::NoOp => (),
         }
 
+        // Check whether we need to mark a Downstairs as faulted because too
+        // many jobs have piled up.
+        self.gone_too_long();
+
         // Check to see whether live-repair can continue
         //
         // This must be called before acking jobs, because it looks in
@@ -515,6 +560,21 @@
         self.set_backpressure();
     }
 
+    /// Helper function to await all deferred block requests
+    ///
+    /// This is only useful in tests because it **only** processes deferred
+    /// block requests (doing no other Upstairs work). In production, there
+    /// could be other events that need handling simultaneously, so we do not
+    /// want to stall the Upstairs.
+    #[cfg(test)]
+    async fn await_deferred_reqs(&mut self) {
+        while let Some(req) = self.deferred_reqs.next().await {
+            let req = req.unwrap(); // the deferred request should not fail
+            self.apply(UpstairsAction::DeferredBlockReq(req)).await;
+        }
+        assert!(self.deferred_reqs.is_empty());
+    }
+
     /// Check outstanding IOops for each downstairs.
     ///
     /// If the number is too high, then mark that downstairs as failed, scrub
@@ -736,7 +796,34 @@
         matches!(self.state, UpstairsState::Active)
     }
 
-    /// Apply a guest request
+    /// When a `BlockReq` arrives, defer it as a future
+    async fn defer_guest_request(&mut self, req: BlockReq) {
+        match req.op {
+            // All Write operations are deferred, because they will offload
+            // encryption to a separate thread pool.
+            BlockOp::Write { offset, data } => {
+                self.submit_deferred_write(offset, data, req.res, false);
+            }
+            BlockOp::WriteUnwritten { offset, data } => {
+                self.submit_deferred_write(offset, data, req.res, true);
+            }
+            // If we have any deferred requests in the FuturesOrdered, then we
+            // have to keep using it for subsequent requests (even ones that are
+            // not writes) to preserve FIFO ordering
+            _ if !self.deferred_reqs.is_empty() => {
+                self.deferred_reqs.push_back(Either::Left(ready(Ok(Some(
+                    DeferredBlockReq::Other(req),
+                )))));
+            }
+            // Otherwise, we can apply a non-write operation immediately, saving
+            // a trip through the FuturesOrdered
+            _ => {
+                self.apply_guest_request_inner(req).await;
+            }
+        }
+    }
+
+    /// Apply a deferred guest request
     ///
     /// For IO operations, we build the downstairs work and if required split
     /// the single IO into multiple IOs to the downstairs. The IO operations are
@@ -746,7 +833,22 @@
     /// This function can be called before the upstairs is active, so any
     /// operation that requires the upstairs to be active should check that
     /// and report an error.
-    async fn apply_guest_request(&mut self, req: BlockReq) {
+    async fn apply_guest_request(&mut self, req: DeferredBlockReq) {
+        match req {
+            DeferredBlockReq::Write(req) => self.submit_write(req).await,
+            DeferredBlockReq::Other(req) => {
+                self.apply_guest_request_inner(req).await
+            }
+        }
+    }
+
+    /// Does the actual work for a (non-write) guest request
+    ///
+    /// # Panics
+    /// This function assumes that `BlockReq::Write` and
+    /// `BlockReq::WriteUnwritten` are always deferred and handled separately;
+    /// it will panic if `req` matches either of them.
+    async fn apply_guest_request_inner(&mut self, req: BlockReq) {
        // If any of the submit_* functions fail to send to the downstairs,
        // they return an error. These are reported to the Guest.
        let BlockReq { op, res } = req;
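The FIFO-preservation trick in `defer_guest_request` relies on a property of `FuturesOrdered`: results come out in push order, not completion order, so an instantly-ready non-write pushed behind a pending write still waits its turn. A standalone demonstration (names invented; this is not patch code):

```rust
use futures::future::BoxFuture;
use futures::stream::{FuturesOrdered, StreamExt};
use futures::FutureExt;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let mut queue: FuturesOrdered<BoxFuture<'static, &'static str>> =
        FuturesOrdered::new();

    // A slow "write" goes in first...
    queue.push_back(
        async {
            tokio::time::sleep(Duration::from_millis(50)).await;
            "write"
        }
        .boxed(),
    );
    // ...then an instantly-ready "flush"
    queue.push_back(async { "flush" }.boxed());

    // Prints "write" then "flush": the ready future still waits its turn,
    // so ops queued behind a pending write keep their submission order.
    while let Some(op) = queue.next().await {
        println!("{op}");
    }
}
```

This is also why the fast path (applying a non-write directly) is only taken when the queue is empty: jumping the queue would reorder the guest's requests.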
@@ -854,11 +956,8 @@
             BlockOp::Read { offset, data } => {
                 self.submit_read(offset, data, res).await
             }
-            BlockOp::Write { offset, data } => {
-                self.submit_write(offset, data, res, false).await
-            }
-            BlockOp::WriteUnwritten { offset, data } => {
-                self.submit_write(offset, data, res, true).await
+            BlockOp::Write { .. } | BlockOp::WriteUnwritten { .. } => {
+                panic!("writes must always be deferred")
             }
             BlockOp::Flush { snapshot_details } => {
                 /*
@@ -1137,18 +1236,28 @@
     }
 
     /// Submits a new write job to the upstairs
-    async fn submit_write(
+    ///
+    /// This function **defers** the write job submission, because writes
+    /// require encrypting data (which is expensive) and we'd like to return as
+    /// quickly as possible.
+    fn submit_deferred_write(
         &mut self,
         offset: Block,
         data: Bytes,
         res: BlockRes,
         is_write_unwritten: bool,
     ) {
-        self.submit_write_inner(offset, data, Some(res), is_write_unwritten)
-            .await
+        self.submit_deferred_write_inner(
+            offset,
+            data,
+            Some(res),
+            is_write_unwritten,
+        )
     }
 
     /// Submits a dummy write (without an associated `BlockReq`)
+    ///
+    /// This **does not** go through the deferred-write pipeline
     #[cfg(test)]
     pub(crate) async fn submit_dummy_write(
         &mut self,
@@ -1156,21 +1265,48 @@
         data: Bytes,
         is_write_unwritten: bool,
     ) {
-        self.submit_write_inner(offset, data, None, is_write_unwritten)
-            .await
+        if let Some(w) = self
+            .compute_deferred_write(offset, data, None, is_write_unwritten)
+            .and_then(DeferredWrite::run)
+        {
+            self.apply_guest_request(DeferredBlockReq::Write(w)).await
+        }
     }
 
     /// Submits a new write job to the upstairs, optionally without a
     /// `BlockReq`
     ///
     /// # Panics
     /// If `req` is `None` and this isn't running in the test suite
-    async fn submit_write_inner(
+    fn submit_deferred_write_inner(
         &mut self,
         offset: Block,
         data: Bytes,
         res: Option<BlockRes>,
         is_write_unwritten: bool,
     ) {
+        // It's possible for the write to be invalid out of the gate, in which
+        // case `compute_deferred_write` replies to the `req` itself and returns
+        // `None`. Otherwise, we have to store a future to process the write
+        // result.
+        if let Some(w) =
+            self.compute_deferred_write(offset, data, res, is_write_unwritten)
+        {
+            let (tx, rx) = oneshot::channel();
+            rayon::spawn(move || {
+                let out = w.run().map(DeferredBlockReq::Write);
+                let _ = tx.send(out);
+            });
+            self.deferred_reqs.push_back(Either::Right(rx));
+        }
+    }
+
+    fn compute_deferred_write(
+        &mut self,
+        offset: Block,
+        data: Bytes,
+        res: Option<BlockRes>,
+        is_write_unwritten: bool,
+    ) -> Option<DeferredWrite> {
         #[cfg(not(test))]
         assert!(res.is_some());
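The `rayon::spawn` / `oneshot` pairing above is the bridge between the two worlds: the CPU-bound closure runs on rayon's thread pool, while the receiver half is an ordinary future that the tokio task can queue and poll. The same handoff reduced to its essentials (a sketch; `checksum` stands in for `DeferredWrite::run`):

```rust
use tokio::sync::oneshot;

fn spawn_on_rayon(data: Vec<u8>) -> oneshot::Receiver<u64> {
    let (tx, rx) = oneshot::channel();
    rayon::spawn(move || {
        // CPU-bound work (encryption, hashing) runs on the rayon pool,
        // keeping the tokio executor free to make progress elsewhere
        let checksum: u64 = data.iter().map(|&b| u64::from(b)).sum();
        // If the receiver was dropped there is nobody to notify; ignore
        let _ = tx.send(checksum);
    });
    rx
}
```

The returned receiver is exactly what gets stored in the `DeferredQueue` as `Either::Right(rx)`.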
@@ -1178,13 +1314,13 @@
             if let Some(res) = res {
                 res.send_err(CrucibleError::UpstairsInactive);
             }
-            return;
+            return None;
         }
         if self.cfg.read_only {
             if let Some(res) = res {
                 res.send_err(CrucibleError::ModifyingReadOnlyRegion);
             }
-            return;
+            return None;
         }
 
         /*
@@ -1195,7 +1331,7 @@
             if let Some(res) = res {
                 res.send_err(e);
             }
-            return;
+            return None;
         }
 
         /*
@@ -1209,64 +1345,17 @@
             Block::from_bytes(data.len(), &ddef),
         );
 
-        // Build up all of the Write operations, encrypting data here
-        let mut writes: Vec<crucible_protocol::Write> =
-            Vec::with_capacity(impacted_blocks.len(&ddef));
-
-        let mut cur_offset: usize = 0;
-        for (eid, offset) in impacted_blocks.blocks(&ddef) {
-            let byte_len: usize = ddef.block_size() as usize;
-
-            let (sub_data, encryption_context, hash) = if let Some(context) =
-                &self.cfg.encryption_context
-            {
-                // Encrypt here
-                let mut mut_data =
-                    data.slice(cur_offset..(cur_offset + byte_len)).to_vec();
-
-                let (nonce, tag, hash) =
-                    match context.encrypt_in_place(&mut mut_data[..]) {
-                        Err(e) => {
-                            if let Some(res) = res {
-                                res.send_err(CrucibleError::EncryptionError(
-                                    e.to_string(),
-                                ));
-                            }
-                            return;
-                        }
-
-                        Ok(v) => v,
-                    };
-
-                (
-                    Bytes::copy_from_slice(&mut_data),
-                    Some(crucible_protocol::EncryptionContext {
-                        nonce: nonce.into(),
-                        tag: tag.into(),
-                    }),
-                    hash,
-                )
-            } else {
-                // Unencrypted
-                let sub_data = data.slice(cur_offset..(cur_offset + byte_len));
-                let hash = integrity_hash(&[&sub_data[..]]);
-
-                (sub_data, None, hash)
-            };
-
-            writes.push(crucible_protocol::Write {
-                eid,
-                offset,
-                data: sub_data,
-                block_context: BlockContext {
-                    hash,
-                    encryption_context,
-                },
-            });
-
-            cur_offset += byte_len;
-        }
+        Some(DeferredWrite {
+            ddef,
+            impacted_blocks,
+            data,
+            res,
+            is_write_unwritten,
+            cfg: self.cfg.clone(),
+        })
+    }
 
+    async fn submit_write(&mut self, write: EncryptedWrite) {
         /*
          * Get the next ID for the guest work struct we will make at the
          * end. This ID is also put into the IO struct we create that
@@ -1280,7 +1369,7 @@
          * want to create a gap in the IDs.
          */
         let gw_id = gw.next_gw_id();
-        if is_write_unwritten {
+        if write.is_write_unwritten {
             cdt::gw__write__unwritten__start!(|| (gw_id.0));
         } else {
             cdt::gw__write__start!(|| (gw_id.0));
@@ -1288,16 +1377,16 @@
         }
 
         let next_id = self.downstairs.submit_write(
             gw_id,
-            impacted_blocks,
-            writes,
-            is_write_unwritten,
+            write.impacted_blocks,
+            write.writes,
+            write.is_write_unwritten,
         );
         // New work created, add to the guest_work HM
-        let new_gtos = GtoS::new(next_id, None, res);
+        let new_gtos = GtoS::new(next_id, None, write.res);
         gw.active.insert(gw_id, new_gtos);
 
-        if is_write_unwritten {
+        if write.is_write_unwritten {
             cdt::up__to__ds__write__unwritten__start!(|| (gw_id.0));
         } else {
             cdt::up__to__ds__write__start!(|| (gw_id.0));
         }
@@ -1826,9 +1915,10 @@ pub(crate) mod test {
     use crate::{
         downstairs::test::set_all_active,
         test::{make_encrypted_upstairs, make_upstairs},
-        BlockReq, BlockReqWaiter, DsState, JobId,
+        BlockContext, BlockReq, BlockReqWaiter, DsState, JobId,
     };
     use bytes::BytesMut;
+    use crucible_common::integrity_hash;
     use crucible_protocol::ReadResponse;
     use futures::FutureExt;
@@ -3387,6 +3477,7 @@
         let (_write_brw, write_res) = BlockReqWaiter::pair();
         up.apply(UpstairsAction::Guest(BlockReq { op, res: write_res }))
             .await;
+        up.await_deferred_reqs().await;
         let id1 = JobId(1000); // We know that job IDs start at 1000
 
         // Create and enqueue the flush by setting deactivate
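One consequence shows up in the final hunk: because writes now complete asynchronously with respect to `apply`, tests that submit a write and then inspect job state must first drain the deferred queue, which is what `await_deferred_reqs` does. Its skeleton, reduced to the queue alone (a sketch, not patch code):

```rust
use crate::deferred::DeferredQueue;

async fn drain<T>(queue: &mut DeferredQueue<T>) -> Vec<T> {
    let mut out = Vec::new();
    while let Some(t) = queue.next().await {
        out.push(t);
    }
    // next() returned None, so the empty flag is now latched
    assert!(queue.is_empty());
    out
}
```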