diff --git a/upstairs/src/client.rs b/upstairs/src/client.rs
index 1f75ae013..a1a4b7551 100644
--- a/upstairs/src/client.rs
+++ b/upstairs/src/client.rs
@@ -1919,7 +1919,19 @@ impl DownstairsClient {
         self.client_delay_us.load(Ordering::Relaxed)
     }
 
-    /// Looks up the region UUID
+    /// Checks whether the client is in a state where it can accept IO
+    pub(crate) fn is_accepting_io(&self) -> bool {
+        matches!(
+            self.state,
+            DsState::Active
+                | DsState::LiveRepair
+                | DsState::Connecting {
+                    mode: ConnectionMode::Offline,
+                    ..
+                }
+        )
+    }
+
     pub(crate) fn id(&self) -> Option<Uuid> {
         self.region_uuid
     }
diff --git a/upstairs/src/downstairs.rs b/upstairs/src/downstairs.rs
index b1f7499c3..89fdd311e 100644
--- a/upstairs/src/downstairs.rs
+++ b/upstairs/src/downstairs.rs
@@ -3292,6 +3292,13 @@ impl Downstairs {
         }
     }
 
+    /// Returns the number of clients that can accept IO
+    ///
+    /// A client can accept IO if it is `Active`, in `LiveRepair`, or `Offline`.
+    pub fn active_client_count(&self) -> usize {
+        self.clients.iter().filter(|c| c.is_accepting_io()).count()
+    }
+
     /// Wrapper for marking a single job as done from the given client
     ///
     /// This can be used to test handling of job acks, etc
diff --git a/upstairs/src/dummy_downstairs_tests.rs b/upstairs/src/dummy_downstairs_tests.rs
index 984d12d69..d7557dd73 100644
--- a/upstairs/src/dummy_downstairs_tests.rs
+++ b/upstairs/src/dummy_downstairs_tests.rs
@@ -12,6 +12,8 @@ use crate::guest::Guest;
 use crate::up_main;
 use crate::BlockIO;
 use crate::Buffer;
+use crate::ClientFaultReason;
+use crate::ClientStopReason;
 use crate::ConnectionMode;
 use crate::CrucibleError;
 use crate::DsState;
@@ -33,6 +35,7 @@ use crucible_protocol::JobId;
 use crucible_protocol::Message;
 use crucible_protocol::ReadBlockContext;
 use crucible_protocol::ReadResponseHeader;
+use crucible_protocol::SnapshotDetails;
 use crucible_protocol::WriteHeader;
 
 use bytes::BytesMut;
@@ -291,6 +294,35 @@ impl DownstairsHandle {
         }
     }
 
+    /// Awaits a `Message::Flush` and sends a `FlushAck` with an `IoError`
+    ///
+    /// Returns the flush number for further checks.
+    ///
+    /// # Panics
+    /// If a non-flush message arrives
+    pub async fn err_flush(&mut self) -> u64 {
+        match self.recv().await.unwrap() {
+            Message::Flush {
+                job_id,
+                flush_number,
+                upstairs_id,
+                ..
+            } => {
+                self.send(Message::FlushAck {
+                    upstairs_id,
+                    session_id: self.upstairs_session_id.unwrap(),
+                    job_id,
+                    result: Err(CrucibleError::IoError("oh no".to_string())),
+                })
+                .unwrap();
+                flush_number
+            }
+            m => {
+                panic!("saw non flush {m:?}");
+            }
+        }
+    }
+
     /// Awaits a `Message::Write { .. }` and sends a `WriteAck`
     ///
     /// Returns the job ID for further checks.
@@ -313,6 +345,23 @@ impl DownstairsHandle {
         }
     }
 
+    /// Awaits a `Message::Write` and sends a `WriteAck` with `IoError`
+    pub async fn err_write(&mut self) -> JobId {
+        match self.recv().await.unwrap() {
+            Message::Write { header, .. } => {
+                self.send(Message::WriteAck {
+                    upstairs_id: header.upstairs_id,
+                    session_id: self.upstairs_session_id.unwrap(),
+                    job_id: header.job_id,
+                    result: Err(CrucibleError::IoError("oh no".to_string())),
+                })
+                .unwrap();
+                header.job_id
+            }
+            m => panic!("saw non write: {m:?}"),
+        }
+    }
+
     /// Awaits a `Message::Barrier { .. }` and sends a `BarrierAck`
     ///
     /// Returns the job ID for further checks.
@@ -360,7 +409,7 @@ impl DownstairsHandle {
                         job_id,
                         blocks: Ok(vec![block]),
                     },
-                    data: data.clone(),
+                    data,
                 })
                 .unwrap();
                 job_id
@@ -813,7 +862,7 @@ async fn run_live_repair(mut harness: TestHarness) {
             job_id,
             blocks: Ok(vec![block]),
         },
-        data: data.clone(),
+        data,
     }) {
         Ok(()) => panic!("DS1 should be disconnected"),
         Err(e) => {
@@ -2964,3 +3013,204 @@ async fn test_bytes_based_barrier() {
     harness.ds2.ack_flush().await;
     harness.ds3.ack_flush().await;
 }
+
+fn assert_faulted(s: &DsState) {
+    match s {
+        DsState::Stopping(ClientStopReason::Fault(
+            ClientFaultReason::RequestedFault,
+        ))
+        | DsState::Connecting {
+            mode: ConnectionMode::Faulted,
+            ..
+        } => (),
+        _ => panic!("invalid state: expected faulted, got {s:?}"),
+    }
+}
+
+/// Test for early rejection of writes if > 1 Downstairs is unavailable
+#[tokio::test]
+async fn fast_write_rejection() {
+    let mut harness = TestHarness::new().await;
+
+    let write_buf = BytesMut::from(vec![1; 4096].as_slice());
+    harness
+        .guest
+        .write(BlockIndex(0), write_buf.clone())
+        .await
+        .unwrap();
+
+    harness.ds1().err_write().await;
+    harness.ds2.ack_write().await;
+    harness.ds3.ack_write().await;
+    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    let ds = harness.guest.downstairs_state().await.unwrap();
+    assert_faulted(&ds[ClientId::new(0)]);
+    assert_eq!(ds[ClientId::new(1)], DsState::Active);
+    assert_eq!(ds[ClientId::new(2)], DsState::Active);
+
+    // Send a second write, which should still work (because we have 2/3 ds)
+    harness
+        .guest
+        .write(BlockIndex(0), write_buf.clone())
+        .await
+        .unwrap();
+    harness.ds2.err_write().await;
+    harness.ds3.ack_write().await;
+    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    let ds = harness.guest.downstairs_state().await.unwrap();
+    assert_faulted(&ds[ClientId::new(0)]);
+    assert_faulted(&ds[ClientId::new(1)]);
+    assert_eq!(ds[ClientId::new(2)], DsState::Active);
+
+    // Subsequent writes should be rejected immediately
+    let r = harness.guest.write(BlockIndex(0), write_buf.clone()).await;
+    assert!(
+        matches!(r, Err(CrucibleError::IoError(..))),
+        "expected IoError, got {r:?}"
+    );
+}
+
+/// Make sure reads work with only 1x Downstairs
+#[tokio::test]
+async fn read_with_one_fault() {
+    let mut harness = TestHarness::new().await;
+
+    // Use a write to fault DS0 (XXX why do read errors not fault a DS?)
+    let write_buf = BytesMut::from(vec![1; 4096].as_slice());
+    harness
+        .guest
+        .write(BlockIndex(0), write_buf.clone())
+        .await
+        .unwrap();
+    harness.ds1().err_write().await;
+    harness.ds2.ack_write().await;
+    harness.ds3.ack_write().await;
+    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    let ds = harness.guest.downstairs_state().await.unwrap();
+    assert_faulted(&ds[ClientId::new(0)]);
+    assert_eq!(ds[ClientId::new(1)], DsState::Active);
+    assert_eq!(ds[ClientId::new(2)], DsState::Active);
+
+    // Check that reads still work
+    let h = harness.spawn(|guest| async move {
+        let mut buffer = Buffer::new(1, 512);
+        guest.read(BlockIndex(0), &mut buffer).await.unwrap();
+    });
+    harness.ds2.ack_read().await;
+    h.await.unwrap(); // a single successful reply is enough for the read to return
+    harness.ds3.ack_read().await;
+
+    // Take out DS1 next
+    harness
+        .guest
+        .write(BlockIndex(0), write_buf.clone())
+        .await
+        .unwrap();
+    harness.ds2.err_write().await;
+    harness.ds3.ack_write().await;
+    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    let ds = harness.guest.downstairs_state().await.unwrap();
+    assert_faulted(&ds[ClientId::new(0)]);
+    assert_faulted(&ds[ClientId::new(1)]);
+    assert_eq!(ds[ClientId::new(2)], DsState::Active);
+
+    // Reads still work with 1x Downstairs
+    let h = harness.spawn(|guest| async move {
+        let mut buffer = Buffer::new(1, 512);
+        guest.read(BlockIndex(0), &mut buffer).await.unwrap();
+    });
+    harness.ds3.ack_read().await;
+    h.await.unwrap(); // a single successful reply is enough for the read to return
+}
+
+/// Test early rejection of reads with 0x running Downstairs
+#[tokio::test]
+async fn fast_read_rejection() {
+    let mut harness = TestHarness::new().await;
+
+    // Use a write to fault DS0 (XXX why do read errors not fault a DS?)
+    let write_buf = BytesMut::from(vec![1; 4096].as_slice());
+    harness
+        .guest
+        .write(BlockIndex(0), write_buf.clone())
+        .await
+        .unwrap();
+    harness.ds1().err_write().await;
+    harness.ds2.err_write().await;
+    harness.ds3.err_write().await;
+    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    let ds = harness.guest.downstairs_state().await.unwrap();
+    assert_faulted(&ds[ClientId::new(0)]);
+    assert_faulted(&ds[ClientId::new(1)]);
+    assert_faulted(&ds[ClientId::new(2)]);
+
+    // Reads should return errors immediately
+    let mut buffer = Buffer::new(1, 512);
+    match harness.guest.read(BlockIndex(0), &mut buffer).await {
+        Err(CrucibleError::IoError(s)) => {
+            assert!(s.contains("too many inactive clients"))
+        }
+        r => panic!("expected IoError, got {r:?}"),
+    }
+}
+
+/// Test for early rejection of flushes
+#[tokio::test]
+async fn fast_flush_rejection() {
+    let mut harness = TestHarness::new().await;
+
+    let h = harness.spawn(|guest| async move {
+        guest.flush(None).await.unwrap();
+    });
+    harness.ds1().err_flush().await;
+    harness.ds2.ack_flush().await;
+    harness.ds3.ack_flush().await;
+    h.await.unwrap();
+    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    let ds = harness.guest.downstairs_state().await.unwrap();
+    assert_faulted(&ds[ClientId::new(0)]);
+    assert_eq!(ds[ClientId::new(1)], DsState::Active);
+    assert_eq!(ds[ClientId::new(2)], DsState::Active);
+
+    // A flush with snapshot should fail immediately
+    match harness
+        .guest
+        .flush(Some(SnapshotDetails {
+            snapshot_name: "hiiiii".to_string(),
+        }))
+        .await
+    {
+        Err(CrucibleError::IoError(s)) => {
+            assert!(s.contains("too many inactive clients"))
+        }
+        r => panic!("expected IoError, got {r:?}"),
+    }
+
+    // A non-snapshot flush should still succeed
+    let h = harness.spawn(|guest| async move {
+        guest.flush(None).await.unwrap();
+    });
+    harness.ds2.ack_flush().await;
+    harness.ds3.ack_flush().await;
+    h.await.unwrap();
+
+    // Use a flush to take out another downstairs
+    let h = harness.spawn(|guest| async move { guest.flush(None).await });
+    harness.ds2.ack_flush().await;
+    harness.ds3.err_flush().await;
+    let r = h.await.unwrap();
+    assert!(r.is_err());
+    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    let ds = harness.guest.downstairs_state().await.unwrap();
+    assert_faulted(&ds[ClientId::new(0)]);
+    assert_eq!(ds[ClientId::new(1)], DsState::Active);
+    assert_faulted(&ds[ClientId::new(2)]);
+
+    // Subsequent flushes should fail immediately
+    match harness.guest.flush(None).await {
+        Err(CrucibleError::IoError(s)) => {
+            assert!(s.contains("too many inactive clients"))
+        }
+        r => panic!("expected IoError, got {r:?}"),
+    }
+}
diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs
index 4bc371e7a..c1e3712f5 100644
--- a/upstairs/src/lib.rs
+++ b/upstairs/src/lib.rs
@@ -953,6 +953,11 @@ impl DownstairsIO {
 
         let bad_job = match &self.work {
             IOop::Read { .. } => wc.done == 0,
+            // Flushes with snapshots must be good on all 3x Downstairs
+            IOop::Flush {
+                snapshot_details: Some(..),
+                ..
+            } => wc.skipped + wc.error > 0,
             IOop::Write { .. }
             | IOop::WriteUnwritten { .. }
             | IOop::Flush { .. }
diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs
index ff4d7555b..57ee1f52b 100644
--- a/upstairs/src/upstairs.rs
+++ b/upstairs/src/upstairs.rs
@@ -1185,6 +1185,15 @@ impl Upstairs {
                     done.send_err(CrucibleError::UpstairsInactive);
                     return;
                 }
+
+                let n = self.downstairs.active_client_count();
+                let required = if snapshot_details.is_some() { 3 } else { 2 };
+                if n < required {
+                    done.send_err(CrucibleError::IoError(format!(
+                        "too many inactive clients: need {required}, got {n}"
+                    )));
+                    return;
+                }
                 self.submit_flush(Some(done), snapshot_details, Some(io_guard));
             }
             BlockOp::ReplaceDownstairs { id, old, new, done } => {
@@ -1396,6 +1405,17 @@ impl Upstairs {
             return;
         }
 
+        let n = self.downstairs.active_client_count();
+        if n < 1 {
+            res.send_err((
+                data,
+                CrucibleError::IoError(format!(
+                    "too many inactive clients: need 1, got {n}"
+                )),
+            ));
+            return;
+        }
+
         /*
          * Get the next ID for the guest work struct we will make at the
          * end. This ID is also put into the IO struct we create that
@@ -1512,6 +1532,14 @@ impl Upstairs {
             return None;
         }
 
+        let n = self.downstairs.active_client_count();
+        if n < 2 {
+            res.send_err(CrucibleError::IoError(format!(
+                "too many inactive clients: need 2, got {n}"
+            )));
+            return None;
+        }
+
         /*
          * Verify IO is in range for our region
         */
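
For reference, the thresholds enforced by the new checks in upstairs.rs can be summarized in a small standalone sketch. This is not part of the diff above; the `IoKind`, `required_clients`, and `check_fast_reject` names are hypothetical and used only for illustration, while the real code calls `Downstairs::active_client_count()` and returns `CrucibleError::IoError` from the `Upstairs` submit paths.

// Editor's sketch (hypothetical helpers, not Crucible source): reads need at
// least one Downstairs that is accepting IO, writes and plain flushes need
// two, and a flush carrying a snapshot needs all three.

/// The three IO paths that gained an early-rejection check.
#[derive(Debug, Clone, Copy)]
enum IoKind {
    Read,
    Write,
    Flush { has_snapshot: bool },
}

/// How many of the three Downstairs must be accepting IO for the operation
/// to be submitted instead of failing fast.
fn required_clients(kind: IoKind) -> usize {
    match kind {
        // A read can be served by a single healthy Downstairs
        IoKind::Read => 1,
        // Writes and plain flushes need two out of three
        IoKind::Write | IoKind::Flush { has_snapshot: false } => 2,
        // A flush with a snapshot must land on all three
        IoKind::Flush { has_snapshot: true } => 3,
    }
}

/// Mirrors the early-return pattern added to the submit paths: reject the
/// guest request before creating a job if too many clients are inactive.
fn check_fast_reject(kind: IoKind, active_clients: usize) -> Result<(), String> {
    let required = required_clients(kind);
    if active_clients < required {
        Err(format!(
            "too many inactive clients: need {required}, got {active_clients}"
        ))
    } else {
        Ok(())
    }
}

fn main() {
    // With one client faulted (2 accepting IO), everything except a
    // snapshot flush still goes through.
    assert!(check_fast_reject(IoKind::Write, 2).is_ok());
    assert!(check_fast_reject(IoKind::Flush { has_snapshot: true }, 2).is_err());

    // With two clients faulted (1 accepting IO), only reads are accepted.
    assert!(check_fast_reject(IoKind::Read, 1).is_ok());
    assert!(check_fast_reject(IoKind::Write, 1).is_err());
}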