From 7415c0d23797a19dd53127e1e851558e177a66c9 Mon Sep 17 00:00:00 2001
From: Matt Keeter
Date: Thu, 4 Jan 2024 13:39:40 -0500
Subject: [PATCH] Simplify handling of BlockReq at program exit (#1085)

https://github.com/oxidecomputer/crucible/pull/1064 added new logic to the `BlockReq`: if the notification channel was dropped without a reply having been sent, Crucible would panic. After a bunch of discussion, I no longer think this extra complexity is worth it, and would like to remove it.

## Why did we do this in the first place?

In general, we expect a `BlockReq` to _always_ be answered by Crucible. If one is dropped without a reply being sent, it indicates a programmer error. This behavior was inspired by similar logic in Propolis (https://github.com/oxidecomputer/propolis/blob/85dc6db36709981c1d35b0651a18e20caee0b73f/lib/propolis/src/block/mod.rs#L176-L182).

## Propolis and Crucible are different

The `Request` in Propolis is built around a `device::TrackingMarker`. If it is dropped without a reply, then the listening side will **wait forever**; this is a potential liveness issue, so turning it into a panic is sensible.

The `BlockReq` in Crucible is built around a `tokio::sync::oneshot` channel. If it is dropped without a reply, then the `oneshot::Receiver` will return immediately with an error. In fact, we handle that case in our `BlockReqWaiter`, translating this error into `CrucibleError::RecvDisconnected` (https://github.com/oxidecomputer/crucible/blob/e813da3f/upstairs/src/block_req.rs#L67).

In other words, we're _already_ handling the case of a `BlockReq` being dropped without a reply (by reporting an error). There is no potential liveness issue. (This also means that the block comment in `BlockRes::drop`, https://github.com/oxidecomputer/crucible/blob/e813da3f/upstairs/src/block_req.rs#L39-L40, is incorrect.)

It's arguably better to panic on a programmer error than to go through the normal error-handling paths (which defer handling to the caller of the `Guest` API). However...

## False positives during program exit

There is one time when a `BlockReq` can be dropped without a reply: during program exit (or scope exit, for unit tests). As far as I can tell, this panic **hasn't caught any true programmer errors**. Instead, it's found a bunch of cases where `BlockReq`s are dropped during exit, and we've had to add `Drop` implementations to prevent it from firing.

The original PR added `impl Drop for Guest` (https://github.com/oxidecomputer/crucible/blob/e813da3f/upstairs/src/lib.rs#L2424-L2446), but it missed a sneaky case where `BlockReq`s were stored in the `UpstairsState`; a later PR added `impl Drop for Upstairs` (https://github.com/oxidecomputer/crucible/blob/e813da3f/upstairs/src/upstairs.rs#L194-L207) to avoid panics during certain unit tests.

## Future false positives

This check also makes it easy to accidentally create new false positives when writing new code. The `offload-write-encryption` branch (https://github.com/oxidecomputer/crucible/pull/1066) moves some `BlockReq`s into a thread pool to perform encryption outside of the main task. However, because we didn't add special handling for program exit, this caused a panic at shutdown.

## This is hard to do by accident

Remember what we're actually trying to protect against: someone dropping a `BlockReq` without replying. Because we've annotated the relevant functions with `#[must_use]`, this would require either ignoring warnings or explicitly discarding the `BlockReq`. It's hard to do by accident!
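To make the oneshot point above concrete, here is a minimal standalone sketch (not Crucible code; the payload type and the `main` wrapper are just for illustration) showing that dropping the sending half of a `tokio::sync::oneshot` channel makes the receiver return an error immediately rather than wait forever:

```rust
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    // A oneshot pair, analogous to the channel shared by a BlockReq/BlockRes.
    let (tx, rx) = oneshot::channel::<Result<(), String>>();

    // Drop the sender without ever sending a reply, which is what happens
    // when a BlockRes is dropped at program exit.
    drop(tx);

    // The receiver does not hang: awaiting it yields a RecvError right away.
    // Crucible's BlockReqWaiter maps this case to CrucibleError::RecvDisconnected.
    match rx.await {
        Ok(_) => unreachable!("no reply was ever sent"),
        Err(e) => println!("receiver returned immediately with an error: {e}"),
    }
}
```

This is why there is no liveness concern on the Crucible side: the worst case is an error returned to the caller, not a hung waiter.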
## Summary

- This `panic!` guards against something that's hard to do by accident
- If we **do** make this mistake, it's reported as a distinct error to the caller
- Unlike Propolis, there's no liveness concern
- So far, 100% of the issues it has discovered are false positives at program exit
- It is easy to introduce similar false positives when writing new code
- New false positives will manifest as panics during program exit
- If we introduce new false positives, we will not discover the issue until runtime (and it may be rare, because Tokio does not destroy tasks in a deterministic order)

## Proposal

- Remove the `panic!`
- Remove the `Drop` implementations which work around it at exit
- Add logging for this case:
  - Fire a DTrace probe if a `BlockReq` is dropped without a reply
  - Log when we see a `CrucibleError::RecvDisconnected`
---
 upstairs/src/block_req.rs | 25 +++++++++----
 upstairs/src/lib.rs       | 79 +++++++++++++++------------------------
 upstairs/src/upstairs.rs  | 23 ++----------
 3 files changed, 53 insertions(+), 74 deletions(-)

diff --git a/upstairs/src/block_req.rs b/upstairs/src/block_req.rs
index 0a1f6e2f7..67f6245b7 100644
--- a/upstairs/src/block_req.rs
+++ b/upstairs/src/block_req.rs
@@ -36,9 +36,11 @@ impl BlockRes {
 }

 impl Drop for BlockRes {
     fn drop(&mut self) {
-        // Dropping a BlockRes without issuing a completion would mean the
-        // associated waiter would be stuck waiting forever for a result.
-        assert!(self.0.is_none(), "result should be sent for BlockRes");
+        if self.0.is_some() {
+            // During normal operation, we expect to reply to every BlockReq, so
+            // we'll fire a DTrace probe here.
+            cdt::up__block__req__dropped!();
+        }
     }
 }
@@ -61,10 +63,19 @@ impl BlockReqWaiter {
     }

     /// Consume this BlockReqWaiter and wait on the message
-    pub async fn wait(self) -> Result<(), CrucibleError> {
+    ///
+    /// If the other side of the oneshot drops without a reply, log an error
+    pub async fn wait(self, log: &Logger) -> Result<(), CrucibleError> {
         match self.recv.await {
             Ok(v) => v,
-            Err(_) => crucible_bail!(RecvDisconnected),
+            Err(_) => {
+                warn!(
+                    log,
+                    "BlockReqWaiter disconnected; \
+                     this should only happen at exit"
+                );
+                Err(CrucibleError::RecvDisconnected)
+            }
         }
     }

@@ -92,7 +103,7 @@ mod test {

         res.send_ok();

-        brw.wait().await.unwrap();
+        brw.wait(&crucible_common::build_logger()).await.unwrap();
     }

     #[tokio::test]
@@ -101,6 +112,6 @@ mod test {

         res.send_err(CrucibleError::UpstairsInactive);

-        assert!(brw.wait().await.is_err());
+        assert!(brw.wait(&crucible_common::build_logger()).await.is_err());
     }
 }
diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs
index 599ac6b54..f607f956f 100644
--- a/upstairs/src/lib.rs
+++ b/upstairs/src/lib.rs
@@ -321,6 +321,7 @@ mod cdt {
     fn up__to__ds__write__start(_: u64) {}
     fn up__to__ds__write__unwritten__start(_: u64) {}
     fn up__to__ds__flush__start(_: u64) {}
+    fn up__block__req__dropped() {}
     fn ds__read__io__start(_: u64, _: u8) {}
     fn ds__write__io__start(_: u64, _: u8) {}
     fn ds__write__unwritten__io__start(_: u64, _: u8) {}
@@ -2211,6 +2212,11 @@ impl Guest {
         brw
     }

+    async fn send_and_wait(&self, op: BlockOp) -> Result<(), CrucibleError> {
+        let brw = self.send(op).await;
+        brw.wait(&self.log).await
+    }
+
     /*
      * A crucible task will listen for new work using this.
      */
@@ -2377,7 +2383,7 @@ impl Guest {
     pub async fn query_extent_size(&self) -> Result<Block, CrucibleError> {
         let data = Arc::new(Mutex::new(Block::new(0, 9)));
         let extent_query = BlockOp::QueryExtentSize { data: data.clone() };
-        self.send(extent_query).await.wait().await?;
+        self.send_and_wait(extent_query).await?;

         let result = *data.lock().await;
         Ok(result)
@@ -2392,7 +2398,7 @@

         let data = Arc::new(Mutex::new(wc));
         let qwq = BlockOp::QueryWorkQueue { data: data.clone() };
-        self.send(qwq).await.wait().await.unwrap();
+        self.send_and_wait(qwq).await?;

         let wc = data.lock().await;
         Ok(*wc)
@@ -2404,9 +2410,15 @@
         gen: u64,
     ) -> Result<(), CrucibleError> {
         let waiter = self.send(BlockOp::GoActiveWithGen { gen }).await;
-        println!("The guest has requested activation with gen:{}", gen);
-        waiter.wait().await?;
-        println!("The guest has finished waiting for activation with:{}", gen);
+        info!(
+            self.log,
+            "The guest has requested activation with gen:{}", gen
+        );
+        waiter.wait(&self.log).await?;
+        info!(
+            self.log,
+            "The guest has finished waiting for activation with:{}", gen
+        );

         Ok(())
     }
@@ -2421,51 +2433,25 @@
     }
 }

-impl Drop for Guest {
-    fn drop(&mut self) {
-        // Any BlockReqs or GuestWork which remains pending on this Guest when
-        // it is dropped should be issued an error completion. This avoids
-        // dropping BlockRes instances prior to completion (which results in a
-        // panic).
-
-        self.reqs.get_mut().drain(..).for_each(|req| {
-            req.res.send_err(CrucibleError::GenericError(
-                "Guest purging remaining BlockReqs during drop()".to_string(),
-            ));
-        });
-        self.guest_work
-            .get_mut()
-            .active
-            .drain()
-            .for_each(|(_id, gtos)| {
-                gtos.notify(Err(CrucibleError::GenericError(
-                    "Guest purging remaining GtoSs during drop()".to_string(),
-                )));
-            });
-    }
-}
-
 #[async_trait]
 impl BlockIO for Guest {
     async fn activate(&self) -> Result<(), CrucibleError> {
         let waiter = self.send(BlockOp::GoActive).await;
-        println!("The guest has requested activation");
-        waiter.wait().await?;
-        println!("The guest has finished waiting for activation");
+        info!(self.log, "The guest has requested activation");
+        waiter.wait(&self.log).await?;
+        info!(self.log, "The guest has finished waiting for activation");

         Ok(())
     }

     /// Disable any more IO from this guest and deactivate the downstairs.
     async fn deactivate(&self) -> Result<(), CrucibleError> {
-        let waiter = self.send(BlockOp::Deactivate).await;
-        waiter.wait().await?;
-        Ok(())
+        self.send_and_wait(BlockOp::Deactivate).await
     }

     async fn query_is_active(&self) -> Result<bool, CrucibleError> {
         let data = Arc::new(Mutex::new(false));
         let active_query = BlockOp::QueryGuestIOReady { data: data.clone() };
-        self.send(active_query).await.wait().await?;
+        self.send_and_wait(active_query).await?;

         let result = *data.lock().await;
         Ok(result)
@@ -2474,7 +2460,7 @@
     async fn total_size(&self) -> Result<u64, CrucibleError> {
         let data = Arc::new(Mutex::new(0));
         let size_query = BlockOp::QueryTotalSize { data: data.clone() };
-        self.send(size_query).await.wait().await?;
+        self.send_and_wait(size_query).await?;

         let result = *data.lock().await;
         Ok(result)
@@ -2485,7 +2471,7 @@
         if bs == 0 {
             let data = Arc::new(Mutex::new(0));
             let size_query = BlockOp::QueryBlockSize { data: data.clone() };
-            self.send(size_query).await.wait().await?;
+            self.send_and_wait(size_query).await?;

             let result = *data.lock().await;
             self.block_size
@@ -2499,7 +2485,7 @@
     async fn get_uuid(&self) -> Result<Uuid, CrucibleError> {
         let data = Arc::new(Mutex::new(Uuid::default()));
         let uuid_query = BlockOp::QueryUpstairsUuid { data: data.clone() };
-        self.send(uuid_query).await.wait().await?;
+        self.send_and_wait(uuid_query).await?;

         let result = *data.lock().await;
         Ok(result)
@@ -2524,7 +2510,7 @@
             return Ok(());
         }
         let rio = BlockOp::Read { offset, data };
-        Ok(self.send(rio).await.wait().await?)
+        self.send_and_wait(rio).await
     }

     async fn write(
@@ -2548,7 +2534,7 @@
         let wio = BlockOp::Write { offset, data };

         self.backpressure_sleep().await;
-        Ok(self.send(wio).await.wait().await?)
+        self.send_and_wait(wio).await
     }

     async fn write_unwritten(
@@ -2572,18 +2558,15 @@
         let wio = BlockOp::WriteUnwritten { offset, data };

         self.backpressure_sleep().await;
-        Ok(self.send(wio).await.wait().await?)
+        self.send_and_wait(wio).await
     }

     async fn flush(
         &self,
         snapshot_details: Option<SnapshotDetails>,
     ) -> Result<(), CrucibleError> {
-        Ok(self
-            .send(BlockOp::Flush { snapshot_details })
+        self.send_and_wait(BlockOp::Flush { snapshot_details })
             .await
-            .wait()
-            .await?)
     }

     async fn show_work(&self) -> Result<WQCounts, CrucibleError> {
@@ -2597,7 +2580,7 @@

         let data = Arc::new(Mutex::new(wc));
         let sw = BlockOp::ShowWork { data: data.clone() };
-        self.send(sw).await.wait().await.unwrap();
+        self.send_and_wait(sw).await?;

         let wc = data.lock().await;
         Ok(*wc)
@@ -2617,7 +2600,7 @@
             result: data.clone(),
         };

-        self.send(sw).await.wait().await?;
+        self.send_and_wait(sw).await?;
         let result = data.lock().await;
         Ok(*result)
     }
diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs
index 0711d9b0f..8931112b1 100644
--- a/upstairs/src/upstairs.rs
+++ b/upstairs/src/upstairs.rs
@@ -191,21 +191,6 @@ pub(crate) struct Upstairs {
     pub(crate) control_tx: mpsc::Sender<ControlRequest>,
 }

-impl Drop for Upstairs {
-    fn drop(&mut self) {
-        // Reply to any `BlockRes` that we might be holding in our
-        // `UpstairsState`; otherwise, their destructor will panic.
-        match std::mem::replace(&mut self.state, UpstairsState::Initializing) {
-            UpstairsState::GoActive(res) | UpstairsState::Deactivating(res) => {
-                res.send_err(CrucibleError::GenericError(
-                    "Upstairs purging BlockRes during drop()".to_string(),
-                ));
-            }
-            _ => (),
-        }
-    }
-}
-
 /// Action to be taken which modifies the [`Upstairs`] state
 #[derive(Debug)]
 pub(crate) enum UpstairsAction {
@@ -1917,7 +1902,7 @@ pub(crate) mod test {
             res: ds_done_res,
         }))
         .await;
-        assert!(ds_done_brw.wait().await.is_err());
+        assert!(ds_done_brw.wait(&up.log).await.is_err());

         up.force_active().unwrap();

@@ -1927,7 +1912,7 @@
             res: ds_done_res,
         }))
         .await;
-        assert!(ds_done_brw.wait().await.is_ok());
+        assert!(ds_done_brw.wait(&up.log).await.is_ok());

         let (ds_done_brw, ds_done_res) = BlockReqWaiter::pair();
         up.apply(UpstairsAction::Guest(BlockReq {
@@ -1935,7 +1920,7 @@
             res: ds_done_res,
         }))
         .await;
-        assert!(ds_done_brw.wait().await.is_err())
+        assert!(ds_done_brw.wait(&up.log).await.is_err())
     }

     #[tokio::test]
@@ -1984,7 +1969,7 @@
             assert!(matches!(up.state, UpstairsState::Initializing));
         }
     }
-    assert!(ds_done_brw.wait().await.is_ok());
+    assert!(ds_done_brw.wait(&up.log).await.is_ok());
 }

 // Job dependency tests