From f5c4387c7eff3cd114f54d6acb7e91b63d28ae2e Mon Sep 17 00:00:00 2001
From: Matt Keeter
Date: Tue, 19 Nov 2024 10:40:16 -0500
Subject: [PATCH 1/2] Early rejection of IOs with insufficient live Downstairs

---
 upstairs/src/downstairs.rs | 12 ++++++++++++
 upstairs/src/lib.rs        |  5 +++++
 upstairs/src/upstairs.rs   | 28 ++++++++++++++++++++++++++++
 3 files changed, 45 insertions(+)

diff --git a/upstairs/src/downstairs.rs b/upstairs/src/downstairs.rs
index fed5619ad..1a2f2fb30 100644
--- a/upstairs/src/downstairs.rs
+++ b/upstairs/src/downstairs.rs
@@ -3261,6 +3261,18 @@ impl Downstairs {
         }
     }
 
+    /// Returns the number of clients that can accept IO
+    ///
+    /// A client can accept IO if it is in the `Active` or `LiveRepair` state.
+    pub fn active_client_count(&self) -> usize {
+        self.clients
+            .iter()
+            .filter(|c| {
+                matches!(c.state(), DsState::Active | DsState::LiveRepair)
+            })
+            .count()
+    }
+
     /// Wrapper for marking a single job as done from the given client
     ///
     /// This can be used to test handling of job acks, etc
diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs
index df0643c14..821129b9c 100644
--- a/upstairs/src/lib.rs
+++ b/upstairs/src/lib.rs
@@ -1009,6 +1009,11 @@ impl DownstairsIO {
 
         let bad_job = match &self.work {
             IOop::Read { .. } => wc.done == 0,
+            // Flushes with snapshots must be good on all 3x Downstairs
+            IOop::Flush {
+                snapshot_details: Some(..),
+                ..
+            } => wc.skipped + wc.error > 0,
             IOop::Write { .. }
             | IOop::WriteUnwritten { .. }
             | IOop::Flush { .. }
diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs
index 58ce09638..bf7edd277 100644
--- a/upstairs/src/upstairs.rs
+++ b/upstairs/src/upstairs.rs
@@ -1140,6 +1140,15 @@ impl Upstairs {
                     done.send_err(CrucibleError::UpstairsInactive);
                     return;
                 }
+
+                let n = self.downstairs.active_client_count();
+                let required = if snapshot_details.is_some() { 3 } else { 2 };
+                if n < required {
+                    done.send_err(CrucibleError::IoError(format!(
+                        "too many inactive clients: need {required}, got {n}"
+                    )));
+                    return;
+                }
                 self.submit_flush(Some(done), snapshot_details, Some(io_guard));
             }
             BlockOp::ReplaceDownstairs { id, old, new, done } => {
@@ -1354,6 +1363,17 @@ impl Upstairs {
             return;
         }
 
+        let n = self.downstairs.active_client_count();
+        if n < 1 {
+            res.send_err((
+                data,
+                CrucibleError::IoError(format!(
+                    "too many inactive clients: need 1, got {n}"
+                )),
+            ));
+            return;
+        }
+
         /*
          * Get the next ID for the guest work struct we will make at the
          * end. This ID is also put into the IO struct we create that
@@ -1470,6 +1490,14 @@ impl Upstairs {
             return None;
         }
 
+        let n = self.downstairs.active_client_count();
+        if n < 2 {
+            res.send_err(CrucibleError::IoError(format!(
+                "too many inactive clients: need 2, got {n}"
+            )));
+            return None;
+        }
+
         /*
          * Verify IO is in range for our region
          */

From 775749f049384d6df9adbcf0df5fb1eb2a3a0b7e Mon Sep 17 00:00:00 2001
From: Matt Keeter
Date: Tue, 19 Nov 2024 11:03:09 -0500
Subject: [PATCH 2/2] Unit tests for early rejection

---
 upstairs/src/dummy_downstairs_tests.rs | 239 ++++++++++++++++++++++++-
 1 file changed, 237 insertions(+), 2 deletions(-)

diff --git a/upstairs/src/dummy_downstairs_tests.rs b/upstairs/src/dummy_downstairs_tests.rs
index fb7254fbd..bf74d5a59 100644
--- a/upstairs/src/dummy_downstairs_tests.rs
+++ b/upstairs/src/dummy_downstairs_tests.rs
@@ -31,6 +31,7 @@ use crucible_protocol::JobId;
 use crucible_protocol::Message;
 use crucible_protocol::ReadBlockContext;
 use crucible_protocol::ReadResponseHeader;
+use crucible_protocol::SnapshotDetails;
 use crucible_protocol::WriteHeader;
 
 use bytes::BytesMut;
@@ -289,6 +290,35 @@ impl DownstairsHandle {
         }
     }
 
+    /// Awaits a `Message::Flush` and sends a `FlushAck` with an `IoError`
+    ///
+    /// Returns the flush number for further checks.
+    ///
+    /// # Panics
+    /// If a non-flush message arrives
+    pub async fn err_flush(&mut self) -> u64 {
+        match self.recv().await.unwrap() {
+            Message::Flush {
+                job_id,
+                flush_number,
+                upstairs_id,
+                ..
+            } => {
+                self.send(Message::FlushAck {
+                    upstairs_id,
+                    session_id: self.upstairs_session_id.unwrap(),
+                    job_id,
+                    result: Err(CrucibleError::IoError("oh no".to_string())),
+                })
+                .unwrap();
+                flush_number
+            }
+            m => {
+                panic!("saw non flush {m:?}");
+            }
+        }
+    }
+
     /// Awaits a `Message::Write { .. }` and sends a `WriteAck`
     ///
     /// Returns the job ID for further checks.
@@ -311,6 +341,23 @@ impl DownstairsHandle {
         }
     }
 
+    /// Awaits a `Message::Write` and sends a `WriteAck` with `IOError`
+    pub async fn err_write(&mut self) -> JobId {
+        match self.recv().await.unwrap() {
+            Message::Write { header, .. } => {
+                self.send(Message::WriteAck {
+                    upstairs_id: header.upstairs_id,
+                    session_id: self.upstairs_session_id.unwrap(),
+                    job_id: header.job_id,
+                    result: Err(CrucibleError::IoError("oh no".to_string())),
+                })
+                .unwrap();
+                header.job_id
+            }
+            m => panic!("saw non write: {m:?}"),
+        }
+    }
+
     /// Awaits a `Message::Barrier { .. }` and sends a `BarrierAck`
     ///
     /// Returns the job ID for further checks.
@@ -358,7 +405,7 @@ impl DownstairsHandle {
                     job_id,
                     blocks: Ok(vec![block]),
                 },
-                data: data.clone(),
+                data,
             })
             .unwrap();
             job_id
@@ -811,7 +858,7 @@ async fn run_live_repair(mut harness: TestHarness) {
                     job_id,
                     blocks: Ok(vec![block]),
                 },
-                data: data.clone(),
+                data,
             }) {
                 Ok(()) => panic!("DS1 should be disconnected"),
                 Err(e) => {
@@ -2890,3 +2937,191 @@ async fn test_bytes_based_barrier() {
     harness.ds2.ack_flush().await;
     harness.ds3.ack_flush().await;
 }
+
+/// Test for early rejection of writes if > 1 Downstairs is unavailable
+#[tokio::test]
+async fn fast_write_rejection() {
+    let mut harness = TestHarness::new().await;
+
+    let write_buf = BytesMut::from(vec![1; 4096].as_slice());
+    harness
+        .guest
+        .write(BlockIndex(0), write_buf.clone())
+        .await
+        .unwrap();
+
+    harness.ds1().err_write().await;
+    harness.ds2.ack_write().await;
+    harness.ds3.ack_write().await;
+    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    let ds = harness.guest.downstairs_state().await.unwrap();
+    assert_eq!(ds[ClientId::new(0)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(1)], DsState::Active);
+    assert_eq!(ds[ClientId::new(2)], DsState::Active);
+
+    // Send a second write, which should still work (because we have 2/3 ds)
+    harness
+        .guest
+        .write(BlockIndex(0), write_buf.clone())
+        .await
+        .unwrap();
+    harness.ds2.err_write().await;
+    harness.ds3.ack_write().await;
+    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    let ds = harness.guest.downstairs_state().await.unwrap();
+    assert_eq!(ds[ClientId::new(0)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(1)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(2)], DsState::Active);
+
+    // Subsequent writes should be rejected immediately
+    let r = harness.guest.write(BlockIndex(0), write_buf.clone()).await;
+    assert!(
+        matches!(r, Err(CrucibleError::IoError(..))),
+        "expected IoError, got {r:?}"
+    );
+}
+
+/// Make sure reads work with only 1x Downstairs
+#[tokio::test]
+async fn read_with_one_fault() {
+    let mut harness = TestHarness::new().await;
+
+    // Use a write to fault DS0 (XXX why do read errors not fault a DS?)
+    let write_buf = BytesMut::from(vec![1; 4096].as_slice());
+    harness
+        .guest
+        .write(BlockIndex(0), write_buf.clone())
+        .await
+        .unwrap();
+    harness.ds1().err_write().await;
+    harness.ds2.ack_write().await;
+    harness.ds3.ack_write().await;
+    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    let ds = harness.guest.downstairs_state().await.unwrap();
+    assert_eq!(ds[ClientId::new(0)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(1)], DsState::Active);
+    assert_eq!(ds[ClientId::new(2)], DsState::Active);
+
+    // Check that reads still work
+    let h = harness.spawn(|guest| async move {
+        let mut buffer = Buffer::new(1, 512);
+        guest.read(BlockIndex(0), &mut buffer).await.unwrap();
+    });
+    harness.ds2.ack_read().await;
+    h.await.unwrap(); // we have > 1x reply, so the read will return
+    harness.ds3.ack_read().await;
+
+    // Take out DS1 next
+    harness
+        .guest
+        .write(BlockIndex(0), write_buf.clone())
+        .await
+        .unwrap();
+    harness.ds2.err_write().await;
+    harness.ds3.ack_write().await;
+    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    let ds = harness.guest.downstairs_state().await.unwrap();
+    assert_eq!(ds[ClientId::new(0)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(1)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(2)], DsState::Active);
+
+    // Reads still work with 1x Downstairs
+    let h = harness.spawn(|guest| async move {
+        let mut buffer = Buffer::new(1, 512);
+        guest.read(BlockIndex(0), &mut buffer).await.unwrap();
+    });
+    harness.ds3.ack_read().await;
+    h.await.unwrap(); // we have > 1x reply, so the read will return
+}
+
+/// Test early rejection of reads with 0x running Downstairs
+#[tokio::test]
+async fn fast_read_rejection() {
+    let mut harness = TestHarness::new().await;
+
+    // Use a write to fault DS0 (XXX why do read errors not fault a DS?)
+    let write_buf = BytesMut::from(vec![1; 4096].as_slice());
+    harness
+        .guest
+        .write(BlockIndex(0), write_buf.clone())
+        .await
+        .unwrap();
+    harness.ds1().err_write().await;
+    harness.ds2.err_write().await;
+    harness.ds3.err_write().await;
+    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    let ds = harness.guest.downstairs_state().await.unwrap();
+    assert_eq!(ds[ClientId::new(0)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(1)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(2)], DsState::Faulted);
+
+    // Reads should return errors immediately
+    let mut buffer = Buffer::new(1, 512);
+    match harness.guest.read(BlockIndex(0), &mut buffer).await {
+        Err(CrucibleError::IoError(s)) => {
+            assert!(s.contains("too many inactive clients"))
+        }
+        r => panic!("expected IoError, got {r:?}"),
+    }
+}
+
+/// Test for early rejection of flushes
+#[tokio::test]
+async fn fast_flush_rejection() {
+    let mut harness = TestHarness::new().await;
+
+    let h = harness.spawn(|guest| async move {
+        guest.flush(None).await.unwrap();
+    });
+    harness.ds1().err_flush().await;
+    harness.ds2.ack_flush().await;
+    harness.ds3.ack_flush().await;
+    h.await.unwrap();
+    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    let ds = harness.guest.downstairs_state().await.unwrap();
+    assert_eq!(ds[ClientId::new(0)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(1)], DsState::Active);
+    assert_eq!(ds[ClientId::new(2)], DsState::Active);
+
+    // A flush with snapshot should fail immediately
+    match harness
+        .guest
+        .flush(Some(SnapshotDetails {
+            snapshot_name: "hiiiii".to_string(),
+        }))
+        .await
+    {
+        Err(CrucibleError::IoError(s)) => {
+            assert!(s.contains("too many inactive clients"))
+        }
+        r => panic!("expected IoError, got {r:?}"),
+    }
+
+    // A non-snapshot flush should still succeed
+    let h = harness.spawn(|guest| async move {
+        guest.flush(None).await.unwrap();
+    });
+    harness.ds2.ack_flush().await;
+    harness.ds3.ack_flush().await;
+    h.await.unwrap();
+
+    // Use a flush to take out another downstairs
+    let h = harness.spawn(|guest| async move { guest.flush(None).await });
+    harness.ds2.ack_flush().await;
+    harness.ds3.err_flush().await;
+    let r = h.await.unwrap();
+    assert!(r.is_err());
+    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    let ds = harness.guest.downstairs_state().await.unwrap();
+    assert_eq!(ds[ClientId::new(0)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(1)], DsState::Active);
+    assert_eq!(ds[ClientId::new(2)], DsState::Faulted);
+
+    // Subsequent flushes should fail immediately
+    match harness.guest.flush(None).await {
+        Err(CrucibleError::IoError(s)) => {
+            assert!(s.contains("too many inactive clients"))
+        }
+        r => panic!("expected IoError, got {r:?}"),
+    }
+}
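
Summary of the acceptance rule the series implements and tests, as a rough standalone Rust sketch. This is illustrative only and not part of either patch: the `IoKind` enum and `required_live_clients` function below are hypothetical names, not identifiers from the Crucible codebase; the patches implement the same rule inline in upstairs.rs.

// Illustrative sketch (hypothetical names): the minimum number of clients in
// Active or LiveRepair that each IO type needs before the Upstairs submits it.
enum IoKind {
    Read,
    Write,
    Flush { has_snapshot: bool },
}

fn required_live_clients(io: &IoKind) -> usize {
    match io {
        // A read can be served by any single live Downstairs
        IoKind::Read => 1,
        // Writes and plain flushes need at least 2 of 3 Downstairs
        IoKind::Write | IoKind::Flush { has_snapshot: false } => 2,
        // A flush carrying a snapshot must land on all three Downstairs
        IoKind::Flush { has_snapshot: true } => 3,
    }
}

fn main() {
    let active_client_count = 2; // e.g. one Downstairs is Faulted
    let io = IoKind::Flush { has_snapshot: true };
    let required = required_live_clients(&io);
    if active_client_count < required {
        // Mirrors the early-rejection error string added in the first patch
        eprintln!(
            "too many inactive clients: need {required}, got {active_client_count}"
        );
    }
}

The snapshot case is also why the first patch tightens DownstairsIO::result: a flush with snapshot_details counts as a bad job if any Downstairs skipped or errored it, rather than only when fewer than two completed.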