diff --git a/upstairs/src/block_req.rs b/upstairs/src/block_req.rs index 0a1f6e2f7..67f6245b7 100644 --- a/upstairs/src/block_req.rs +++ b/upstairs/src/block_req.rs @@ -36,9 +36,11 @@ impl BlockRes { } impl Drop for BlockRes { fn drop(&mut self) { - // Dropping a BlockRes without issuing a completion would mean the - // associated waiter would be stuck waiting forever for a result. - assert!(self.0.is_none(), "result should be sent for BlockRes"); + if self.0.is_some() { + // During normal operation, we expect to reply to every BlockReq, so + // we'll fire a DTrace probe here. + cdt::up__block__req__dropped!(); + } } } @@ -61,10 +63,19 @@ impl BlockReqWaiter { } /// Consume this BlockReqWaiter and wait on the message - pub async fn wait(self) -> Result<(), CrucibleError> { + /// + /// If the other side of the oneshot drops without a reply, log an error + pub async fn wait(self, log: &Logger) -> Result<(), CrucibleError> { match self.recv.await { Ok(v) => v, - Err(_) => crucible_bail!(RecvDisconnected), + Err(_) => { + warn!( + log, + "BlockReqWaiter disconnected; \ + this should only happen at exit" + ); + Err(CrucibleError::RecvDisconnected) + } } } @@ -92,7 +103,7 @@ mod test { res.send_ok(); - brw.wait().await.unwrap(); + brw.wait(&crucible_common::build_logger()).await.unwrap(); } #[tokio::test] @@ -101,6 +112,6 @@ mod test { res.send_err(CrucibleError::UpstairsInactive); - assert!(brw.wait().await.is_err()); + assert!(brw.wait(&crucible_common::build_logger()).await.is_err()); } } diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs index 599ac6b54..f607f956f 100644 --- a/upstairs/src/lib.rs +++ b/upstairs/src/lib.rs @@ -321,6 +321,7 @@ mod cdt { fn up__to__ds__write__start(_: u64) {} fn up__to__ds__write__unwritten__start(_: u64) {} fn up__to__ds__flush__start(_: u64) {} + fn up__block__req__dropped() {} fn ds__read__io__start(_: u64, _: u8) {} fn ds__write__io__start(_: u64, _: u8) {} fn ds__write__unwritten__io__start(_: u64, _: u8) {} @@ -2211,6 +2212,11 @@ impl Guest { brw } + async fn send_and_wait(&self, op: BlockOp) -> Result<(), CrucibleError> { + let brw = self.send(op).await; + brw.wait(&self.log).await + } + /* * A crucible task will listen for new work using this. */ @@ -2377,7 +2383,7 @@ impl Guest { pub async fn query_extent_size(&self) -> Result { let data = Arc::new(Mutex::new(Block::new(0, 9))); let extent_query = BlockOp::QueryExtentSize { data: data.clone() }; - self.send(extent_query).await.wait().await?; + self.send_and_wait(extent_query).await?; let result = *data.lock().await; Ok(result) @@ -2392,7 +2398,7 @@ impl Guest { let data = Arc::new(Mutex::new(wc)); let qwq = BlockOp::QueryWorkQueue { data: data.clone() }; - self.send(qwq).await.wait().await.unwrap(); + self.send_and_wait(qwq).await?; let wc = data.lock().await; Ok(*wc) @@ -2404,9 +2410,15 @@ impl Guest { gen: u64, ) -> Result<(), CrucibleError> { let waiter = self.send(BlockOp::GoActiveWithGen { gen }).await; - println!("The guest has requested activation with gen:{}", gen); - waiter.wait().await?; - println!("The guest has finished waiting for activation with:{}", gen); + info!( + self.log, + "The guest has requested activation with gen:{}", gen + ); + waiter.wait(&self.log).await?; + info!( + self.log, + "The guest has finished waiting for activation with:{}", gen + ); Ok(()) } @@ -2421,51 +2433,25 @@ impl Guest { } } -impl Drop for Guest { - fn drop(&mut self) { - // Any BlockReqs or GuestWork which remains pending on this Guest when - // it is dropped should be issued an error completion. This avoids - // dropping BlockRes instances prior to completion (which results in a - // panic). - - self.reqs.get_mut().drain(..).for_each(|req| { - req.res.send_err(CrucibleError::GenericError( - "Guest purging remaining BlockReqs during drop()".to_string(), - )); - }); - self.guest_work - .get_mut() - .active - .drain() - .for_each(|(_id, gtos)| { - gtos.notify(Err(CrucibleError::GenericError( - "Guest purging remaining GtoSs during drop()".to_string(), - ))); - }); - } -} - #[async_trait] impl BlockIO for Guest { async fn activate(&self) -> Result<(), CrucibleError> { let waiter = self.send(BlockOp::GoActive).await; - println!("The guest has requested activation"); - waiter.wait().await?; - println!("The guest has finished waiting for activation"); + info!(self.log, "The guest has requested activation"); + waiter.wait(&self.log).await?; + info!(self.log, "The guest has finished waiting for activation"); Ok(()) } /// Disable any more IO from this guest and deactivate the downstairs. async fn deactivate(&self) -> Result<(), CrucibleError> { - let waiter = self.send(BlockOp::Deactivate).await; - waiter.wait().await?; - Ok(()) + self.send_and_wait(BlockOp::Deactivate).await } async fn query_is_active(&self) -> Result { let data = Arc::new(Mutex::new(false)); let active_query = BlockOp::QueryGuestIOReady { data: data.clone() }; - self.send(active_query).await.wait().await?; + self.send_and_wait(active_query).await?; let result = *data.lock().await; Ok(result) @@ -2474,7 +2460,7 @@ impl BlockIO for Guest { async fn total_size(&self) -> Result { let data = Arc::new(Mutex::new(0)); let size_query = BlockOp::QueryTotalSize { data: data.clone() }; - self.send(size_query).await.wait().await?; + self.send_and_wait(size_query).await?; let result = *data.lock().await; Ok(result) @@ -2485,7 +2471,7 @@ impl BlockIO for Guest { if bs == 0 { let data = Arc::new(Mutex::new(0)); let size_query = BlockOp::QueryBlockSize { data: data.clone() }; - self.send(size_query).await.wait().await?; + self.send_and_wait(size_query).await?; let result = *data.lock().await; self.block_size @@ -2499,7 +2485,7 @@ impl BlockIO for Guest { async fn get_uuid(&self) -> Result { let data = Arc::new(Mutex::new(Uuid::default())); let uuid_query = BlockOp::QueryUpstairsUuid { data: data.clone() }; - self.send(uuid_query).await.wait().await?; + self.send_and_wait(uuid_query).await?; let result = *data.lock().await; Ok(result) @@ -2524,7 +2510,7 @@ impl BlockIO for Guest { return Ok(()); } let rio = BlockOp::Read { offset, data }; - Ok(self.send(rio).await.wait().await?) + self.send_and_wait(rio).await } async fn write( @@ -2548,7 +2534,7 @@ impl BlockIO for Guest { let wio = BlockOp::Write { offset, data }; self.backpressure_sleep().await; - Ok(self.send(wio).await.wait().await?) + self.send_and_wait(wio).await } async fn write_unwritten( @@ -2572,18 +2558,15 @@ impl BlockIO for Guest { let wio = BlockOp::WriteUnwritten { offset, data }; self.backpressure_sleep().await; - Ok(self.send(wio).await.wait().await?) + self.send_and_wait(wio).await } async fn flush( &self, snapshot_details: Option, ) -> Result<(), CrucibleError> { - Ok(self - .send(BlockOp::Flush { snapshot_details }) + self.send_and_wait(BlockOp::Flush { snapshot_details }) .await - .wait() - .await?) } async fn show_work(&self) -> Result { @@ -2597,7 +2580,7 @@ impl BlockIO for Guest { let data = Arc::new(Mutex::new(wc)); let sw = BlockOp::ShowWork { data: data.clone() }; - self.send(sw).await.wait().await.unwrap(); + self.send_and_wait(sw).await?; let wc = data.lock().await; Ok(*wc) @@ -2617,7 +2600,7 @@ impl BlockIO for Guest { result: data.clone(), }; - self.send(sw).await.wait().await?; + self.send_and_wait(sw).await?; let result = data.lock().await; Ok(*result) } diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs index 0711d9b0f..8931112b1 100644 --- a/upstairs/src/upstairs.rs +++ b/upstairs/src/upstairs.rs @@ -191,21 +191,6 @@ pub(crate) struct Upstairs { pub(crate) control_tx: mpsc::Sender, } -impl Drop for Upstairs { - fn drop(&mut self) { - // Reply to any `BlockRes` that we might be holding in our - // `UpstairsState`; otherwise, their destructor will panic. - match std::mem::replace(&mut self.state, UpstairsState::Initializing) { - UpstairsState::GoActive(res) | UpstairsState::Deactivating(res) => { - res.send_err(CrucibleError::GenericError( - "Upstairs purging BlockRes during drop()".to_string(), - )); - } - _ => (), - } - } -} - /// Action to be taken which modifies the [`Upstairs`] state #[derive(Debug)] pub(crate) enum UpstairsAction { @@ -1917,7 +1902,7 @@ pub(crate) mod test { res: ds_done_res, })) .await; - assert!(ds_done_brw.wait().await.is_err()); + assert!(ds_done_brw.wait(&up.log).await.is_err()); up.force_active().unwrap(); @@ -1927,7 +1912,7 @@ pub(crate) mod test { res: ds_done_res, })) .await; - assert!(ds_done_brw.wait().await.is_ok()); + assert!(ds_done_brw.wait(&up.log).await.is_ok()); let (ds_done_brw, ds_done_res) = BlockReqWaiter::pair(); up.apply(UpstairsAction::Guest(BlockReq { @@ -1935,7 +1920,7 @@ pub(crate) mod test { res: ds_done_res, })) .await; - assert!(ds_done_brw.wait().await.is_err()) + assert!(ds_done_brw.wait(&up.log).await.is_err()) } #[tokio::test] @@ -1984,7 +1969,7 @@ pub(crate) mod test { assert!(matches!(up.state, UpstairsState::Initializing)); } } - assert!(ds_done_brw.wait().await.is_ok()); + assert!(ds_done_brw.wait(&up.log).await.is_ok()); } // Job dependency tests