Simplify handling of BlockReq at program exit (#1085)
#1064 added new logic to the
`BlockReq`: if the notification channel was dropped without a reply having been
sent, Crucible would panic.

After a bunch of discussion, I no longer think this extra complexity is worth
it, and would like to remove it.

## Why did we do this in the first place?
In general, we expect a `BlockReq` to _always_ be answered by Crucible.  If one
is dropped without a reply being sent, it indicates a programmer error.  This
behavior was inspired by similar logic in Propolis
(https://github.com/oxidecomputer/propolis/blob/85dc6db36709981c1d35b0651a18e20caee0b73f/lib/propolis/src/block/mod.rs#L176-L182).
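
For reference, the guard that #1064 added boiled down to an assertion in `BlockRes::drop` (the exact removed lines appear in the diff below). A minimal sketch, assuming the crate's `CrucibleError` as the reply payload:

```rust
use tokio::sync::oneshot;

// Simplified sketch of the #1064 check: BlockRes wraps the sending half of a
// oneshot channel, and its destructor asserts that a reply was sent before
// the handle was dropped. `CrucibleError` is the crate's error type.
struct BlockRes(Option<oneshot::Sender<Result<(), CrucibleError>>>);

impl Drop for BlockRes {
    fn drop(&mut self) {
        // `self.0` is cleared once a reply has been sent; anything else is
        // treated as a programmer error.
        assert!(self.0.is_none(), "result should be sent for BlockRes");
    }
}
```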

## Propolis and Crucible are different
The `Request` in Propolis is built around
a `device::TrackingMarker`.  If it is dropped without a reply, then the
listening side will **wait forever**; this is a potential liveness issue, so
turning it into a panic is sensible.
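
To make the liveness concern concrete, here is a minimal sketch (not Propolis's actual `TrackingMarker` code) of why a missed completion hangs the waiter when the signaling primitive doesn't report disconnection:

```rust
use std::sync::Arc;
use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let done = Arc::new(Notify::new());
    let completer = Arc::clone(&done);

    // The completing side is dropped without ever calling `notify_one()`...
    drop(completer);

    // ...and because Notify carries no notion of "the other side went away",
    // this await never returns: the waiter hangs forever.
    done.notified().await;
}
```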

The `BlockReq` in Crucible is built around a `tokio::sync::oneshot` channel.  If
it is dropped without a reply, then the `oneshot::Receiver` will return
immediately with an error.  In fact, we handle that case in our
`BlockReqWaiter`, translating this error into `CrucibleError::RecvDisconnected`
(https://github.com/oxidecomputer/crucible/blob/e813da3f/upstairs/src/block_req.rs#L67).
In other words, we're _already_ handling the case of a `BlockReq` being dropped
without a reply (by reporting an error).  There is no potential liveness issue.

(This also means that the block comment in `BlockRes::drop`,
https://github.com/oxidecomputer/crucible/blob/e813da3f/upstairs/src/block_req.rs#L39-L40
is incorrect.)
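
By contrast, a `tokio::sync::oneshot` receiver learns about a dropped sender immediately. A short, self-contained example of the behavior the `BlockReqWaiter` relies on (using `String` as a stand-in payload):

```rust
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<Result<(), String>>();

    // Drop the sender without ever sending a reply...
    drop(tx);

    // ...and the receiver resolves right away with a RecvError rather than
    // hanging. Crucible maps this case onto CrucibleError::RecvDisconnected.
    assert!(rx.await.is_err());
}
```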

It's arguably better to panic on a programmer error than to go through the
normal error-handling paths (which defer handling to the caller of the `Guest` API).
However...

## False positives during program exit
There is one time when a `BlockReq` can be dropped without a reply: during
program exit (or scope exit, for unit tests).

As far as I can tell, this panic **hasn't caught any true programmer errors**.
Instead, it's found a bunch of cases where `BlockReq`s are dropped during exit,
and we've had to add `Drop` implementations to prevent it from firing.  The
original PR added `impl Drop for Guest`
(https://github.com/oxidecomputer/crucible/blob/e813da3f/upstairs/src/lib.rs#L2424-L2446),
but it missed a sneaky case where `BlockReq`s were stored in the `UpstairsState`;
a later PR added `impl Drop for Upstairs`
(https://github.com/oxidecomputer/crucible/blob/e813da3f/upstairs/src/upstairs.rs#L194-L207)
to avoid panics during certain unit tests.
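
For reference, the workaround looked roughly like this (simplified from the `impl Drop for Guest` that this commit removes; the surrounding types are the crate's own):

```rust
impl Drop for Guest {
    fn drop(&mut self) {
        // Answer every still-pending BlockReq with an error so that no
        // BlockRes is dropped without a reply (which would have panicked).
        self.reqs.get_mut().drain(..).for_each(|req| {
            req.res.send_err(CrucibleError::GenericError(
                "Guest purging remaining BlockReqs during drop()".to_string(),
            ));
        });
    }
}
```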

## Future false positives
This check also makes it easy to accidentally create false positives when
writing new code.  The `offload-write-encryption` branch
(#1066) moves some
`BlockReq`s into a thread pool to perform encryption outside of the main task.
However, because we didn't add special handling for program exit, this caused a
panic at shutdown.

## This is hard to do by accident
Remember what we're actually trying to protect against: someone dropping a
`BlockReq` without replying.  Because we've annotated relevant functions with
`#[must_use]`, this would require either ignoring warnings or explicitly
discarding the `BlockReq`.  It's hard to do by accident!
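
As a sketch of why (the names below are illustrative, not the crate's exact API), `#[must_use]` means the compiler complains unless the caller explicitly throws the reply handle away:

```rust
use tokio::sync::oneshot;

#[must_use = "a BlockRes must be answered before it is dropped"]
struct BlockRes(Option<oneshot::Sender<Result<(), String>>>);

fn enqueue_request() -> BlockRes {
    let (tx, _rx) = oneshot::channel();
    BlockRes(Some(tx))
}

fn caller() {
    // warning: unused `BlockRes` that must be used
    enqueue_request();

    // Silencing the warning requires an explicit discard, which is hard to
    // do by accident:
    let _ = enqueue_request();
}
```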

## Summary
- This `panic!` guards against something that's hard to do by accident
- If we **do** make this mistake, it's reported as a distinct error to the
  caller
  - Unlike Propolis, there's no liveness concern
- So far, 100% of the issues it has discovered have been false positives at program exit
  - It is easy to introduce similar false positives when writing new code
  - New false positives will manifest as panics during program exit
  - If we introduce new false positives, we will not discover the issue until
    runtime (and it may be rare, because Tokio does not destroy tasks in a
    deterministic order)

## Proposal
- Remove the `panic!`
- Remove the `Drop` implementations which work around it at exit
- Add logging for this case:
  - Fire a DTrace probe if a `BlockReq` is dropped without a reply
  - Log when we see a `CrucibleError::RecvDisconnected`
mkeeter authored Jan 4, 2024
1 parent ceaf444 commit 7415c0d
Showing 3 changed files with 53 additions and 74 deletions.
25 changes: 18 additions & 7 deletions upstairs/src/block_req.rs
@@ -36,9 +36,11 @@ impl BlockRes {
}
impl Drop for BlockRes {
fn drop(&mut self) {
// Dropping a BlockRes without issuing a completion would mean the
// associated waiter would be stuck waiting forever for a result.
assert!(self.0.is_none(), "result should be sent for BlockRes");
if self.0.is_some() {
// During normal operation, we expect to reply to every BlockReq, so
// we'll fire a DTrace probe here.
cdt::up__block__req__dropped!();
}
}
}

@@ -61,10 +63,19 @@ impl BlockReqWaiter {
}

/// Consume this BlockReqWaiter and wait on the message
pub async fn wait(self) -> Result<(), CrucibleError> {
///
/// If the other side of the oneshot drops without a reply, log an error
pub async fn wait(self, log: &Logger) -> Result<(), CrucibleError> {
match self.recv.await {
Ok(v) => v,
Err(_) => crucible_bail!(RecvDisconnected),
Err(_) => {
warn!(
log,
"BlockReqWaiter disconnected; \
this should only happen at exit"
);
Err(CrucibleError::RecvDisconnected)
}
}
}

@@ -92,7 +103,7 @@ mod test {

res.send_ok();

brw.wait().await.unwrap();
brw.wait(&crucible_common::build_logger()).await.unwrap();
}

#[tokio::test]
Expand All @@ -101,6 +112,6 @@ mod test {

res.send_err(CrucibleError::UpstairsInactive);

assert!(brw.wait().await.is_err());
assert!(brw.wait(&crucible_common::build_logger()).await.is_err());
}
}
79 changes: 31 additions & 48 deletions upstairs/src/lib.rs
@@ -321,6 +321,7 @@ mod cdt {
fn up__to__ds__write__start(_: u64) {}
fn up__to__ds__write__unwritten__start(_: u64) {}
fn up__to__ds__flush__start(_: u64) {}
fn up__block__req__dropped() {}
fn ds__read__io__start(_: u64, _: u8) {}
fn ds__write__io__start(_: u64, _: u8) {}
fn ds__write__unwritten__io__start(_: u64, _: u8) {}
@@ -2211,6 +2212,11 @@ impl Guest {
brw
}

async fn send_and_wait(&self, op: BlockOp) -> Result<(), CrucibleError> {
let brw = self.send(op).await;
brw.wait(&self.log).await
}

/*
* A crucible task will listen for new work using this.
*/
@@ -2377,7 +2383,7 @@ impl Guest {
pub async fn query_extent_size(&self) -> Result<Block, CrucibleError> {
let data = Arc::new(Mutex::new(Block::new(0, 9)));
let extent_query = BlockOp::QueryExtentSize { data: data.clone() };
self.send(extent_query).await.wait().await?;
self.send_and_wait(extent_query).await?;

let result = *data.lock().await;
Ok(result)
@@ -2392,7 +2398,7 @@

let data = Arc::new(Mutex::new(wc));
let qwq = BlockOp::QueryWorkQueue { data: data.clone() };
self.send(qwq).await.wait().await.unwrap();
self.send_and_wait(qwq).await?;

let wc = data.lock().await;
Ok(*wc)
@@ -2404,9 +2410,15 @@
gen: u64,
) -> Result<(), CrucibleError> {
let waiter = self.send(BlockOp::GoActiveWithGen { gen }).await;
println!("The guest has requested activation with gen:{}", gen);
waiter.wait().await?;
println!("The guest has finished waiting for activation with:{}", gen);
info!(
self.log,
"The guest has requested activation with gen:{}", gen
);
waiter.wait(&self.log).await?;
info!(
self.log,
"The guest has finished waiting for activation with:{}", gen
);
Ok(())
}

@@ -2421,51 +2433,25 @@
}
}

impl Drop for Guest {
fn drop(&mut self) {
// Any BlockReqs or GuestWork which remains pending on this Guest when
// it is dropped should be issued an error completion. This avoids
// dropping BlockRes instances prior to completion (which results in a
// panic).

self.reqs.get_mut().drain(..).for_each(|req| {
req.res.send_err(CrucibleError::GenericError(
"Guest purging remaining BlockReqs during drop()".to_string(),
));
});
self.guest_work
.get_mut()
.active
.drain()
.for_each(|(_id, gtos)| {
gtos.notify(Err(CrucibleError::GenericError(
"Guest purging remaining GtoSs during drop()".to_string(),
)));
});
}
}

#[async_trait]
impl BlockIO for Guest {
async fn activate(&self) -> Result<(), CrucibleError> {
let waiter = self.send(BlockOp::GoActive).await;
println!("The guest has requested activation");
waiter.wait().await?;
println!("The guest has finished waiting for activation");
info!(self.log, "The guest has requested activation");
waiter.wait(&self.log).await?;
info!(self.log, "The guest has finished waiting for activation");
Ok(())
}

/// Disable any more IO from this guest and deactivate the downstairs.
async fn deactivate(&self) -> Result<(), CrucibleError> {
let waiter = self.send(BlockOp::Deactivate).await;
waiter.wait().await?;
Ok(())
self.send_and_wait(BlockOp::Deactivate).await
}

async fn query_is_active(&self) -> Result<bool, CrucibleError> {
let data = Arc::new(Mutex::new(false));
let active_query = BlockOp::QueryGuestIOReady { data: data.clone() };
self.send(active_query).await.wait().await?;
self.send_and_wait(active_query).await?;

let result = *data.lock().await;
Ok(result)
@@ -2474,7 +2460,7 @@ impl BlockIO for Guest {
async fn total_size(&self) -> Result<u64, CrucibleError> {
let data = Arc::new(Mutex::new(0));
let size_query = BlockOp::QueryTotalSize { data: data.clone() };
self.send(size_query).await.wait().await?;
self.send_and_wait(size_query).await?;

let result = *data.lock().await;
Ok(result)
@@ -2485,7 +2471,7 @@
if bs == 0 {
let data = Arc::new(Mutex::new(0));
let size_query = BlockOp::QueryBlockSize { data: data.clone() };
self.send(size_query).await.wait().await?;
self.send_and_wait(size_query).await?;

let result = *data.lock().await;
self.block_size
@@ -2499,7 +2485,7 @@
async fn get_uuid(&self) -> Result<Uuid, CrucibleError> {
let data = Arc::new(Mutex::new(Uuid::default()));
let uuid_query = BlockOp::QueryUpstairsUuid { data: data.clone() };
self.send(uuid_query).await.wait().await?;
self.send_and_wait(uuid_query).await?;

let result = *data.lock().await;
Ok(result)
@@ -2524,7 +2510,7 @@ impl BlockIO for Guest {
return Ok(());
}
let rio = BlockOp::Read { offset, data };
Ok(self.send(rio).await.wait().await?)
self.send_and_wait(rio).await
}

async fn write(
@@ -2548,7 +2534,7 @@
let wio = BlockOp::Write { offset, data };

self.backpressure_sleep().await;
Ok(self.send(wio).await.wait().await?)
self.send_and_wait(wio).await
}

async fn write_unwritten(
@@ -2572,18 +2558,15 @@
let wio = BlockOp::WriteUnwritten { offset, data };

self.backpressure_sleep().await;
Ok(self.send(wio).await.wait().await?)
self.send_and_wait(wio).await
}

async fn flush(
&self,
snapshot_details: Option<SnapshotDetails>,
) -> Result<(), CrucibleError> {
Ok(self
.send(BlockOp::Flush { snapshot_details })
self.send_and_wait(BlockOp::Flush { snapshot_details })
.await
.wait()
.await?)
}

async fn show_work(&self) -> Result<WQCounts, CrucibleError> {
Expand All @@ -2597,7 +2580,7 @@ impl BlockIO for Guest {

let data = Arc::new(Mutex::new(wc));
let sw = BlockOp::ShowWork { data: data.clone() };
self.send(sw).await.wait().await.unwrap();
self.send_and_wait(sw).await?;

let wc = data.lock().await;
Ok(*wc)
@@ -2617,7 +2600,7 @@
result: data.clone(),
};

self.send(sw).await.wait().await?;
self.send_and_wait(sw).await?;
let result = data.lock().await;
Ok(*result)
}
23 changes: 4 additions & 19 deletions upstairs/src/upstairs.rs
@@ -191,21 +191,6 @@ pub(crate) struct Upstairs {
pub(crate) control_tx: mpsc::Sender<ControlRequest>,
}

impl Drop for Upstairs {
fn drop(&mut self) {
// Reply to any `BlockRes` that we might be holding in our
// `UpstairsState`; otherwise, their destructor will panic.
match std::mem::replace(&mut self.state, UpstairsState::Initializing) {
UpstairsState::GoActive(res) | UpstairsState::Deactivating(res) => {
res.send_err(CrucibleError::GenericError(
"Upstairs purging BlockRes during drop()".to_string(),
));
}
_ => (),
}
}
}

/// Action to be taken which modifies the [`Upstairs`] state
#[derive(Debug)]
pub(crate) enum UpstairsAction {
@@ -1917,7 +1902,7 @@ pub(crate) mod test {
res: ds_done_res,
}))
.await;
assert!(ds_done_brw.wait().await.is_err());
assert!(ds_done_brw.wait(&up.log).await.is_err());

up.force_active().unwrap();

Expand All @@ -1927,15 +1912,15 @@ pub(crate) mod test {
res: ds_done_res,
}))
.await;
assert!(ds_done_brw.wait().await.is_ok());
assert!(ds_done_brw.wait(&up.log).await.is_ok());

let (ds_done_brw, ds_done_res) = BlockReqWaiter::pair();
up.apply(UpstairsAction::Guest(BlockReq {
op: BlockOp::Deactivate,
res: ds_done_res,
}))
.await;
assert!(ds_done_brw.wait().await.is_err())
assert!(ds_done_brw.wait(&up.log).await.is_err())
}

#[tokio::test]
@@ -1984,7 +1969,7 @@
assert!(matches!(up.state, UpstairsState::Initializing));
}
}
assert!(ds_done_brw.wait().await.is_ok());
assert!(ds_done_brw.wait(&up.log).await.is_ok());
}

// Job dependency tests
