diff --git a/cmon/src/main.rs b/cmon/src/main.rs index 1f3595808..bc1e2371e 100644 --- a/cmon/src/main.rs +++ b/cmon/src/main.rs @@ -8,7 +8,7 @@ use strum::IntoEnumIterator; use strum_macros::EnumIter; use tokio::time::{sleep, Duration}; -use crucible::{Arg, DsState}; +use crucible::{Arg, ClientStopReason, DsState}; /// Connect to crucible control server #[derive(Parser, Debug)] @@ -87,18 +87,23 @@ enum Action { // Translate a DsState into a three letter string for printing. fn short_state(dss: DsState) -> String { match dss { - DsState::New => "NEW".to_string(), + DsState::New + | DsState::Stopping(ClientStopReason::NegotiationFailed(..)) => { + "NEW".to_string() + } DsState::WaitActive => "WAC".to_string(), DsState::WaitQuorum => "WAQ".to_string(), DsState::Reconcile => "REC".to_string(), DsState::Active => "ACT".to_string(), - DsState::Faulted => "FLT".to_string(), + DsState::Faulted | DsState::Stopping(ClientStopReason::Fault(..)) => { + "FLT".to_string() + } DsState::LiveRepairReady => "LRR".to_string(), DsState::LiveRepair => "LR".to_string(), DsState::Offline => "OFF".to_string(), - DsState::Deactivated => "DAV".to_string(), - DsState::Disabled => "DIS".to_string(), - DsState::Replacing => "RPC".to_string(), + DsState::Stopping(ClientStopReason::Deactivated) => "DAV".to_string(), + DsState::Stopping(ClientStopReason::Disabled) => "DIS".to_string(), + DsState::Stopping(ClientStopReason::Replacing) => "RPC".to_string(), DsState::Replaced => "RPD".to_string(), } } diff --git a/openapi/crucible-control.json b/openapi/crucible-control.json index 5e193f1c7..4ff8f0e10 100644 --- a/openapi/crucible-control.json +++ b/openapi/crucible-control.json @@ -65,6 +65,173 @@ "acked" ] }, + "ClientFaultReason": { + "description": "Subset of [`ClientStopReason`] for faulting a client", + "oneOf": [ + { + "type": "string", + "enum": [ + "requested_fault" + ] + }, + { + "description": "Received an error from some non-recoverable IO (write or flush)", + "type": "string", + 
"enum": [ + "i_o_error" + ] + }, + { + "description": "Live-repair failed", + "type": "string", + "enum": [ + "failed_live_repair" + ] + }, + { + "description": "Too many jobs in the queue", + "type": "string", + "enum": [ + "too_many_outstanding_jobs" + ] + }, + { + "description": "Too many bytes in the queue", + "type": "string", + "enum": [ + "too_many_outstanding_bytes" + ] + }, + { + "description": "The upstairs has requested that we deactivate when we were offline", + "type": "string", + "enum": [ + "offline_deactivated" + ] + }, + { + "description": "The Upstairs has dropped jobs that would be needed for replay", + "type": "string", + "enum": [ + "ineligible_for_replay" + ] + } + ] + }, + "ClientNegotiationFailed": { + "description": "Subset of [`ClientStopReason`] for faulting a client", + "oneOf": [ + { + "description": "Reconcile failed and we're restarting", + "type": "string", + "enum": [ + "failed_reconcile" + ] + }, + { + "description": "Negotiation message received out of order", + "type": "string", + "enum": [ + "bad_negotiation_order" + ] + }, + { + "description": "Negotiation says that we are incompatible", + "type": "string", + "enum": [ + "incompatible" + ] + } + ] + }, + "ClientStopReason": { + "description": "When the upstairs halts the IO client task, it must provide a reason", + "oneOf": [ + { + "description": "We are about to replace the client task", + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "replacing" + ] + } + }, + "required": [ + "type" + ] + }, + { + "description": "We have disabled the downstairs client for some reason\n\n(for example, we have received `Message::YouAreNoLongerActive`)", + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "disabled" + ] + } + }, + "required": [ + "type" + ] + }, + { + "description": "The upstairs has requested that we deactivate", + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "deactivated" + 
] + } + }, + "required": [ + "type" + ] + }, + { + "description": "Something went wrong during negotiation", + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "negotiation_failed" + ] + }, + "value": { + "$ref": "#/components/schemas/ClientNegotiationFailed" + } + }, + "required": [ + "type", + "value" + ] + }, + { + "description": "We have explicitly faulted the client", + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "fault" + ] + }, + "value": { + "$ref": "#/components/schemas/ClientFaultReason" + } + }, + "required": [ + "type", + "value" + ] + } + ] + }, "DownstairsWork": { "description": "`DownstairsWork` holds the information gathered from the downstairs", "type": "object", @@ -88,21 +255,166 @@ ] }, "DsState": { - "type": "string", - "enum": [ - "new", - "wait_active", - "wait_quorum", - "reconcile", - "active", - "faulted", - "live_repair_ready", - "live_repair", - "offline", - "deactivated", - "disabled", - "replacing", - "replaced" + "oneOf": [ + { + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "new" + ] + } + }, + "required": [ + "type" + ] + }, + { + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "wait_active" + ] + } + }, + "required": [ + "type" + ] + }, + { + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "wait_quorum" + ] + } + }, + "required": [ + "type" + ] + }, + { + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "reconcile" + ] + } + }, + "required": [ + "type" + ] + }, + { + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "active" + ] + } + }, + "required": [ + "type" + ] + }, + { + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "faulted" + ] + } + }, + "required": [ + "type" + ] + }, + { + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": 
[ + "live_repair_ready" + ] + } + }, + "required": [ + "type" + ] + }, + { + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "live_repair" + ] + } + }, + "required": [ + "type" + ] + }, + { + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "offline" + ] + } + }, + "required": [ + "type" + ] + }, + { + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "replaced" + ] + } + }, + "required": [ + "type" + ] + }, + { + "description": "The IO task for the client is being stopped", + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "stopping" + ] + }, + "value": { + "$ref": "#/components/schemas/ClientStopReason" + } + }, + "required": [ + "type", + "value" + ] + } ] }, "Error": { diff --git a/tools/test_fail_live_repair.sh b/tools/test_fail_live_repair.sh index 333f95ade..0a71a8765 100755 --- a/tools/test_fail_live_repair.sh +++ b/tools/test_fail_live_repair.sh @@ -53,6 +53,12 @@ for bin in $cds $crucible_test $dsc; do fi done +# The jq program is required for processing the /info endpoint +if ! jq --version > /dev/null; then + echo "Can't find jq program, required for this test" + exit 1 +fi + # Verify there is not a downstairs already running. 
if pgrep -fl -U "$(id -u)" "$cds"; then echo "Downstairs already running" >&2 @@ -160,13 +166,7 @@ while [[ $count -le $loops ]]; do choice_state="undefined" while [[ "$choice_state" != "faulted" ]]; do sleep 3 - if [[ $choice -eq 0 ]]; then - choice_state=$(curl -s http://127.0.0.1:7890/info | awk -F\" '{print $8}') - elif [[ $choice -eq 1 ]]; then - choice_state=$(curl -s http://127.0.0.1:7890/info | awk -F\" '{print $10}') - else - choice_state=$(curl -s http://127.0.0.1:7890/info | awk -F\" '{print $12}') - fi + choice_state=$(curl -s http://127.0.0.1:7890/info | jq -r .ds_state[$choice].type) done if [[ $pstop -eq 0 ]]; then @@ -180,13 +180,7 @@ while [[ $count -le $loops ]]; do choice_state="undefined" while [[ "$choice_state" != "live_repair" ]]; do sleep 2 - if [[ $choice -eq 0 ]]; then - choice_state=$(curl -s http://127.0.0.1:7890/info | awk -F\" '{print $8}') - elif [[ $choice -eq 1 ]]; then - choice_state=$(curl -s http://127.0.0.1:7890/info | awk -F\" '{print $10}') - else - choice_state=$(curl -s http://127.0.0.1:7890/info | awk -F\" '{print $12}') - fi + choice_state=$(curl -s http://127.0.0.1:7890/info | jq -r .ds_state[$choice].type) done # Give the live repair between 5 and 10 seconds to start repairing. 
@@ -204,13 +198,7 @@ while [[ $count -le $loops ]]; do choice_state="undefined" while [[ "$choice_state" != "faulted" ]]; do sleep 3 - if [[ $choice -eq 0 ]]; then - choice_state=$(curl -s http://127.0.0.1:7890/info | awk -F\" '{print $8}') - elif [[ $choice -eq 1 ]]; then - choice_state=$(curl -s http://127.0.0.1:7890/info | awk -F\" '{print $10}') - else - choice_state=$(curl -s http://127.0.0.1:7890/info | awk -F\" '{print $12}') - fi + choice_state=$(curl -s http://127.0.0.1:7890/info | jq -r .ds_state[$choice].type) done sleep 2 @@ -223,10 +211,11 @@ while [[ $count -le $loops ]]; do # Now wait for all downstairs to be active echo Now wait for all downstairs to be active | tee -a "$test_log" - all_state=$(curl -s http://127.0.0.1:7890/info | awk -F\" '{print $8","$10","$12}') - while [[ "${all_state}" != "active,active,active" ]]; do + all_state=$(curl -s http://127.0.0.1:7890/info | jq -r .ds_state[].type | tr '\n' ',') + # The trailing comma here is required + while [[ "${all_state}" != "active,active,active," ]]; do sleep 5 - all_state=$(curl -s http://127.0.0.1:7890/info | awk -F\" '{print $8","$10","$12}') + all_state=$(curl -s http://127.0.0.1:7890/info | jq -r .ds_state[].type | tr '\n' ',') done echo All downstairs active, now stop IO test and wait for it to finish | tee -a "$test_log" diff --git a/upstairs/src/client.rs b/upstairs/src/client.rs index a890248e6..4b2f9b3a0 100644 --- a/upstairs/src/client.rs +++ b/upstairs/src/client.rs @@ -21,6 +21,8 @@ use std::{ }; use futures::StreamExt; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; use slog::{debug, error, info, o, warn, Logger}; use tokio::{ net::{TcpSocket, TcpStream}, @@ -317,12 +319,13 @@ impl DownstairsClient { }); } - fn halt_io_task(&mut self, r: ClientStopReason) { + fn halt_io_task(&mut self, up_state: &UpstairsState, r: ClientStopReason) { info!(self.log, "halting IO task due to {r:?}"); if let Some(t) = self.client_task.client_stop_tx.take() { if let Err(_e) = 
t.send(r) { warn!(self.log, "failed to send stop request") } + self.checked_state_transition(up_state, DsState::Stopping(r)); } else { warn!(self.log, "client task is already stopping") } @@ -546,8 +549,7 @@ impl DownstairsClient { /// Switches the client state to Deactivated and stops the IO task pub(crate) fn deactivate(&mut self, up_state: &UpstairsState) { - self.checked_state_transition(up_state, DsState::Deactivated); - self.halt_io_task(ClientStopReason::Deactivated) + self.halt_io_task(up_state, ClientStopReason::Deactivated) } /// Resets this Downstairs and start a fresh connection @@ -584,19 +586,27 @@ impl DownstairsClient { DsState::Active | DsState::Offline if !can_replay => { Some(DsState::Faulted) } - DsState::Active => Some(DsState::Offline), - DsState::LiveRepair | DsState::LiveRepairReady => { + DsState::LiveRepair + | DsState::LiveRepairReady + | DsState::Stopping(ClientStopReason::Fault(..)) => { Some(DsState::Faulted) } - DsState::Deactivated - | DsState::Reconcile + DsState::Active => Some(DsState::Offline), + + DsState::Reconcile | DsState::WaitQuorum | DsState::WaitActive - | DsState::Disabled => Some(DsState::New), + | DsState::Stopping(ClientStopReason::NegotiationFailed(..)) + | DsState::Stopping(ClientStopReason::Disabled) + | DsState::Stopping(ClientStopReason::Deactivated) => { + Some(DsState::New) + } // If we have replaced a downstairs, don't forget that. 
- DsState::Replacing => Some(DsState::Replaced), + DsState::Stopping(ClientStopReason::Replacing) => { + Some(DsState::Replaced) + } // We stay in these states through the task restart DsState::Offline @@ -853,33 +863,7 @@ impl DownstairsClient { up_state: &UpstairsState, reason: ClientNegotiationFailed, ) { - let new_state = match self.state { - DsState::Active | DsState::Offline => DsState::Offline, - DsState::Faulted => DsState::Faulted, - DsState::Deactivated => DsState::New, - DsState::Reconcile => DsState::New, - DsState::LiveRepair => DsState::Faulted, - DsState::LiveRepairReady => DsState::Faulted, - DsState::Replacing => DsState::Replaced, - _ => { - /* - * Any other state means we had not yet enabled this - * downstairs to receive IO, so we go to the back of the - * line and have to re-verify it again. - */ - DsState::New - } - }; - - info!( - self.log, - "restarting connection, transition from {} to {}", - self.state, - new_state, - ); - - self.checked_state_transition(up_state, new_state); - self.halt_io_task(reason.into()); + self.halt_io_task(up_state, reason.into()); } /// Sets the current state to `DsState::Active` @@ -938,8 +922,13 @@ impl DownstairsClient { // We never send jobs if we're in certain inactive states DsState::Faulted | DsState::Replaced - | DsState::Replacing - | DsState::LiveRepairReady => EnqueueResult::Skip, + | DsState::LiveRepairReady + | DsState::Stopping( + ClientStopReason::Fault(..) + | ClientStopReason::Disabled + | ClientStopReason::Replacing + | ClientStopReason::NegotiationFailed(..), + ) => EnqueueResult::Skip, // We conditionally send jobs if we're in live-repair, depending on // the current extent. 
@@ -969,8 +958,7 @@ impl DownstairsClient { | DsState::WaitActive | DsState::WaitQuorum | DsState::Reconcile - | DsState::Deactivated - | DsState::Disabled => panic!( + | DsState::Stopping(ClientStopReason::Deactivated) => panic!( "enqueue should not be called from state {:?}", self.state ), @@ -986,10 +974,9 @@ impl DownstairsClient { self.target_addr = Some(new); self.region_metadata = None; - self.checked_state_transition(up_state, DsState::Replacing); self.stats.replaced += 1; - self.halt_io_task(ClientStopReason::Replacing); + self.halt_io_task(up_state, ClientStopReason::Replacing); } /// Sets `self.state` to `new_state`, with logging and validity checking @@ -1032,11 +1019,11 @@ impl DownstairsClient { ) }; match new_state { - DsState::Replacing => { - // A downstairs can be replaced at any time. - } DsState::Replaced => { - assert_eq!(old_state, DsState::Replacing); + assert_eq!( + old_state, + DsState::Stopping(ClientStopReason::Replacing) + ); } DsState::WaitActive => { if old_state == DsState::Offline { @@ -1072,7 +1059,8 @@ impl DownstairsClient { | DsState::Reconcile | DsState::LiveRepair | DsState::LiveRepairReady - | DsState::Offline => {} // Okay + | DsState::Offline + | DsState::Stopping(ClientStopReason::Fault(..)) => {} // Okay _ => { panic_invalid(); } @@ -1102,29 +1090,6 @@ impl DownstairsClient { assert!(!matches!(up_state, UpstairsState::Active)); } } - DsState::Deactivated => { - // We only go deactivated if we were actually active, or - // somewhere past active. - // if deactivate is requested before active, the downstairs - // state should just go back to NEW and re-require an - // activation. 
- match old_state { - DsState::Active - | DsState::LiveRepair - | DsState::LiveRepairReady - | DsState::Offline - | DsState::Reconcile => {} // Okay - DsState::Faulted => { - if matches!(up_state, UpstairsState::Active) { - // Can't transition like this when active - panic_invalid(); - } - } - _ => { - panic_invalid(); - } - } - } DsState::LiveRepair => { assert_eq!(old_state, DsState::LiveRepairReady); } @@ -1141,10 +1106,14 @@ impl DownstairsClient { // on of these states. match old_state { DsState::Active - | DsState::Deactivated | DsState::Faulted | DsState::Reconcile - | DsState::Disabled => {} // Okay + | DsState::Stopping( + ClientStopReason::Deactivated + | ClientStopReason::Disabled + | ClientStopReason::Replacing + | ClientStopReason::NegotiationFailed(..), + ) => {} // Okay _ => { panic_invalid(); } @@ -1158,9 +1127,50 @@ impl DownstairsClient { } } } - DsState::Disabled => { - // A move to Disabled can happen at any time we are talking - // to a downstairs. + + // We only go deactivated if we were actually active, or + // somewhere past active. + // if deactivate is requested before active, the downstairs + // state should just go back to NEW and re-require an + // activation. + DsState::Stopping(ClientStopReason::Deactivated) => { + match old_state { + DsState::Active + | DsState::LiveRepair + | DsState::LiveRepairReady + | DsState::Offline + | DsState::Reconcile => {} // Okay + DsState::Faulted => { + if matches!(up_state, UpstairsState::Active) { + // Can't transition like this when active + panic_invalid(); + } + } + _ => { + panic_invalid(); + } + } + } + + // Some stop reasons may occur at any time + DsState::Stopping( + ClientStopReason::Fault(..) 
+ | ClientStopReason::Replacing + | ClientStopReason::Disabled, + ) => {} + + // The client may undergo negotiation for many reasons + DsState::Stopping(ClientStopReason::NegotiationFailed(..)) => { + match old_state { + DsState::New + | DsState::WaitActive + | DsState::WaitQuorum + | DsState::Reconcile + | DsState::Offline + | DsState::Faulted + | DsState::LiveRepairReady => {} + _ => panic_invalid(), + } } } @@ -1193,8 +1203,7 @@ impl DownstairsClient { up_state: &UpstairsState, reason: ClientFaultReason, ) { - self.checked_state_transition(up_state, DsState::Faulted); - self.halt_io_task(reason.into()); + self.halt_io_task(up_state, reason.into()); } /// Finishes an in-progress live repair, setting our state to `Active` @@ -1365,8 +1374,7 @@ impl DownstairsClient { /// /// The IO task will automatically restart in the main event handler pub(crate) fn disable(&mut self, up_state: &UpstairsState) { - self.checked_state_transition(up_state, DsState::Disabled); - self.halt_io_task(ClientStopReason::Disabled); + self.halt_io_task(up_state, ClientStopReason::Disabled); } /// Skips from `LiveRepairReady` to `Active`; a no-op otherwise @@ -1834,16 +1842,6 @@ impl DownstairsClient { NegotiationState::GetExtentVersions; self.send(Message::ExtentVersionsPlease); } - DsState::Replacing => { - warn!( - self.log, - "exiting negotiation because we're replacing" - ); - self.abort_negotiation( - up_state, - ClientNegotiationFailed::Replacing, - ); - } bad_state => { panic!( "[{}] join from invalid state {} {} {:?}", @@ -1865,18 +1863,6 @@ impl DownstairsClient { return Ok(false); // TODO should we trigger set_inactive? } match self.state { - DsState::Replacing => { - error!( - self.log, - "exiting negotiation due to LastFlushAck \ - while replacing" - ); - self.abort_negotiation( - up_state, - ClientNegotiationFailed::Replacing, - ); - return Ok(false); // TODO should we trigger set_inactive? 
- } DsState::Offline => (), s => panic!("got LastFlushAck in bad state {s:?}"), } @@ -1913,18 +1899,6 @@ impl DownstairsClient { DsState::WaitQuorum, ); } - DsState::Replacing => { - warn!( - self.log, - "exiting negotiation due to ExtentVersions while \ - replacing" - ); - self.abort_negotiation( - up_state, - ClientNegotiationFailed::Replacing, - ); - return Ok(false); // TODO should we trigger set_inactive? - } DsState::Faulted | DsState::Replaced => { self.checked_state_transition( up_state, @@ -2177,8 +2151,11 @@ pub(crate) struct DownstairsStats { } /// When the upstairs halts the IO client task, it must provide a reason -#[derive(Debug)] -pub(crate) enum ClientStopReason { +#[derive( + Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, JsonSchema, +)] +#[serde(rename_all = "snake_case", tag = "type", content = "value")] +pub enum ClientStopReason { /// We are about to replace the client task Replacing, @@ -2200,8 +2177,11 @@ pub(crate) enum ClientStopReason { } /// Subset of [`ClientStopReason`] for faulting a client -#[derive(Debug)] -pub(crate) enum ClientNegotiationFailed { +#[derive( + Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, JsonSchema, +)] +#[serde(rename_all = "snake_case")] +pub enum ClientNegotiationFailed { /// Reconcile failed and we're restarting FailedReconcile, @@ -2210,9 +2190,6 @@ pub(crate) enum ClientNegotiationFailed { /// Negotiation says that we are incompatible Incompatible, - - /// We are trying to replace the client task - Replacing, } impl From for ClientStopReason { @@ -2222,8 +2199,11 @@ impl From for ClientStopReason { } /// Subset of [`ClientStopReason`] for faulting a client -#[derive(Debug)] -pub(crate) enum ClientFaultReason { +#[derive( + Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, JsonSchema, +)] +#[serde(rename_all = "snake_case")] +pub enum ClientFaultReason { /// Received an error from some non-recoverable IO (write or flush) IOError, @@ -2933,7 +2913,7 @@ mod test { ); 
client.checked_state_transition( &UpstairsState::Active, - DsState::Deactivated, + DsState::Stopping(ClientStopReason::Deactivated), ); client.checked_state_transition(&UpstairsState::Active, DsState::New); } @@ -2945,7 +2925,7 @@ mod test { let mut client = DownstairsClient::test_default(); client.checked_state_transition( &UpstairsState::Initializing, - DsState::Deactivated, + DsState::Stopping(ClientStopReason::Deactivated), ); } @@ -2960,7 +2940,7 @@ mod test { ); client.checked_state_transition( &UpstairsState::Initializing, - DsState::Deactivated, + DsState::Stopping(ClientStopReason::Deactivated), ); } @@ -2979,7 +2959,7 @@ mod test { ); client.checked_state_transition( &UpstairsState::Initializing, - DsState::Deactivated, + DsState::Stopping(ClientStopReason::Deactivated), ); } @@ -3020,7 +3000,7 @@ mod test { ); client.checked_state_transition( &UpstairsState::Initializing, - DsState::Deactivated, + DsState::Stopping(ClientStopReason::Deactivated), ); client.checked_state_transition( &UpstairsState::Initializing, diff --git a/upstairs/src/downstairs.rs b/upstairs/src/downstairs.rs index d16c9b637..21eaad871 100644 --- a/upstairs/src/downstairs.rs +++ b/upstairs/src/downstairs.rs @@ -10,7 +10,7 @@ use crate::{ cdt, client::{ ClientAction, ClientFaultReason, ClientNegotiationFailed, - DownstairsClient, EnqueueResult, + ClientStopReason, DownstairsClient, EnqueueResult, }, guest::GuestBlockRes, io_limits::{IOLimitGuard, IOLimits}, @@ -2556,7 +2556,7 @@ impl Downstairs { // We don't really know if the "old" matches what was old, // as that info is gone to us now, so assume it was true. 
match self.clients[new_client_id].state() { - DsState::Replacing + DsState::Stopping(ClientStopReason::Replacing) | DsState::Replaced | DsState::LiveRepairReady | DsState::LiveRepair => { @@ -2586,7 +2586,9 @@ impl Downstairs { continue; } match self.clients[client_id].state() { - DsState::Replacing + // XXX there are a bunch of states that aren't ready for IO but + // aren't listed here, e.g. all of the negotiation states. + DsState::Stopping(..) | DsState::Replaced | DsState::LiveRepairReady | DsState::LiveRepair => { @@ -2706,29 +2708,36 @@ impl Downstairs { /// The live-repair may continue after this point to clean up reserved jobs, /// to avoid blocking dependencies, but jobs are replaced with no-ops. fn abort_repair(&mut self, up_state: &UpstairsState) { - assert!(self.clients.iter().any(|c| { - c.state() == DsState::LiveRepair || - // If connection aborted, and restarted, then the re-negotiation - // could have won this race, and transitioned the reconnecting - // downstairs from LiveRepair to Faulted to LiveRepairReady. - c.state() == DsState::LiveRepairReady || - // If just a single IO reported failure, we will fault this - // downstairs and it won't yet have had a chance to move back - // around to LiveRepairReady yet. - c.state() == DsState::Faulted - })); + let mut found_valid_state = false; for i in ClientId::iter() { match self.clients[i].state() { DsState::LiveRepair => { + found_valid_state = true; self.fault_client( i, up_state, ClientFaultReason::FailedLiveRepair, ); } - DsState::LiveRepairReady => { - // TODO I don't think this is necessary - self.skip_all_jobs(i); + // If connection aborted, and restarted, then the re-negotiation + // could have won this race, and transitioned the reconnecting + // downstairs from LiveRepair to Faulted to LiveRepairReady. 
+ DsState::LiveRepairReady => found_valid_state = true, + + // If just a single IO reported failure, we will fault this + // downstairs and it won't yet have had a chance to move back + // around to LiveRepairReady yet. + DsState::Faulted => found_valid_state = true, + + // It's also possible for a Downstairs to be in the process of + // stopping, due to a fault or disconnection + DsState::Stopping( + ClientStopReason::Replacing + | ClientStopReason::Disabled + | ClientStopReason::Deactivated + | ClientStopReason::Fault(..), + ) => { + found_valid_state = true; } _ => {} } @@ -2738,6 +2747,11 @@ impl Downstairs { // always clear it. self.clients[i].clear_repair_state(); } + assert!( + found_valid_state, + "abort_repair called without a valid client state: {:?}", + self.clients.iter().map(|c| c.state()).collect::<Vec<_>>(), + ); if let Some(repair) = &mut self.repair { repair.aborting_repair = true; @@ -3356,9 +3370,7 @@ impl Downstairs { "Saw CrucibleError::UpstairsInactive on client {}!", client_id ); - self.clients[client_id] - .checked_state_transition(up_state, DsState::Disabled); - // TODO should we also restart the IO task here? + self.clients[client_id].disable(up_state); } Some(CrucibleError::DecryptionError) => { // We should always be able to decrypt the data. 
If we @@ -4341,7 +4353,10 @@ struct DownstairsBackpressureConfig { #[cfg(test)] pub(crate) mod test { - use super::{ClientFaultReason, Downstairs, PendingJob}; + use super::{ + ClientFaultReason, ClientNegotiationFailed, ClientStopReason, + Downstairs, PendingJob, + }; use crate::{ downstairs::{LiveRepairData, LiveRepairState, ReconcileData}, live_repair::ExtentInfo, @@ -4364,6 +4379,17 @@ pub(crate) mod test { use uuid::Uuid; + // Helper constants for verbose stopping states + const STOP_FAULT_REQUESTED: DsState = DsState::Stopping( + ClientStopReason::Fault(ClientFaultReason::RequestedFault), + ); + const STOP_IO_ERROR: DsState = + DsState::Stopping(ClientStopReason::Fault(ClientFaultReason::IOError)); + const STOP_FAILED_RECONCILE: DsState = + DsState::Stopping(ClientStopReason::NegotiationFailed( + ClientNegotiationFailed::FailedReconcile, + )); + /// Builds a single-block reply from the given request and response data pub fn build_read_response(data: &[u8]) -> RawReadResponse { RawReadResponse { @@ -6069,9 +6095,10 @@ pub(crate) mod test { // Fault client 1, so that later event handling will kick us out of // repair - ds.clients[ClientId::new(1)].checked_state_transition( - &UpstairsState::Initializing, - DsState::Faulted, + ds.fault_client( + ClientId::new(1), + &UpstairsState::Active, + ClientFaultReason::RequestedFault, ); // Send an ack to trigger the reconciliation state check @@ -6083,9 +6110,9 @@ pub(crate) mod test { assert!(!nw); // The two troublesome tasks will end up in DsState::New. 
- assert_eq!(ds.clients[ClientId::new(0)].state(), DsState::New); - assert_eq!(ds.clients[ClientId::new(1)].state(), DsState::Faulted); - assert_eq!(ds.clients[ClientId::new(2)].state(), DsState::New); + assert_eq!(ds.clients[ClientId::new(0)].state(), STOP_FAILED_RECONCILE,); + assert_eq!(ds.clients[ClientId::new(1)].state(), STOP_FAULT_REQUESTED); + assert_eq!(ds.clients[ClientId::new(2)].state(), STOP_FAILED_RECONCILE); // Verify that reconciliation has been stopped assert!(ds.reconcile.is_none()); @@ -6126,9 +6153,9 @@ pub(crate) mod test { // Getting the next work to do should verify the previous is done, // and handle a state change for a downstairs. - assert_eq!(ds.clients[ClientId::new(0)].state(), DsState::New); - assert_eq!(ds.clients[ClientId::new(1)].state(), DsState::New); - assert_eq!(ds.clients[ClientId::new(2)].state(), DsState::New); + for c in ds.clients.iter() { + assert_eq!(c.state(), STOP_FAILED_RECONCILE); + } // Verify that reconciliation has stopped assert!(ds.reconcile.is_none()); @@ -6587,7 +6614,7 @@ pub(crate) mod test { None, )); // client 0 is failed, the others should be okay still - assert_eq!(ds.clients[ClientId::new(0)].state(), DsState::Faulted); + assert_eq!(ds.clients[ClientId::new(0)].state(), STOP_IO_ERROR,); assert_eq!(ds.clients[ClientId::new(1)].state(), DsState::Active); assert_eq!(ds.clients[ClientId::new(2)].state(), DsState::Active); @@ -6599,8 +6626,8 @@ pub(crate) mod test { &UpstairsState::Active, None, )); - assert_eq!(ds.clients[ClientId::new(0)].state(), DsState::Faulted); - assert_eq!(ds.clients[ClientId::new(1)].state(), DsState::Faulted); + assert_eq!(ds.clients[ClientId::new(0)].state(), STOP_IO_ERROR,); + assert_eq!(ds.clients[ClientId::new(1)].state(), STOP_IO_ERROR,); assert_eq!(ds.clients[ClientId::new(2)].state(), DsState::Active); // Three failures, But since this is a write we already have marked @@ -6613,9 +6640,9 @@ pub(crate) mod test { &UpstairsState::Active, None, )); - 
assert_eq!(ds.clients[ClientId::new(0)].state(), DsState::Faulted); - assert_eq!(ds.clients[ClientId::new(1)].state(), DsState::Faulted); - assert_eq!(ds.clients[ClientId::new(2)].state(), DsState::Faulted); + assert_eq!(ds.clients[ClientId::new(0)].state(), STOP_IO_ERROR,); + assert_eq!(ds.clients[ClientId::new(1)].state(), STOP_IO_ERROR,); + assert_eq!(ds.clients[ClientId::new(2)].state(), STOP_IO_ERROR,); // Verify that this work should have been fast-acked assert!(ds.ds_active.get(&next_id).unwrap().acked); @@ -6646,7 +6673,7 @@ pub(crate) mod test { None )); // client 0 should be marked failed. - assert_eq!(ds.clients[ClientId::new(0)].state(), DsState::Faulted); + assert_eq!(ds.clients[ClientId::new(0)].state(), STOP_IO_ERROR,); let ok_response = || Ok(Default::default()); // Process the good operation for client 1 @@ -6666,7 +6693,7 @@ pub(crate) mod test { &UpstairsState::Active, None )); - assert_eq!(ds.clients[ClientId::new(0)].state(), DsState::Faulted); + assert_eq!(ds.clients[ClientId::new(0)].state(), STOP_IO_ERROR,); assert_eq!(ds.clients[ClientId::new(1)].state(), DsState::Active); assert_eq!(ds.clients[ClientId::new(2)].state(), DsState::Active); @@ -6775,7 +6802,7 @@ pub(crate) mod test { None, )); // client 0 is failed, the others should be okay still - assert_eq!(ds.clients[ClientId::new(0)].state(), DsState::Faulted); + assert_eq!(ds.clients[ClientId::new(0)].state(), STOP_IO_ERROR,); assert_eq!(ds.clients[ClientId::new(1)].state(), DsState::Active); assert_eq!(ds.clients[ClientId::new(2)].state(), DsState::Active); @@ -6787,8 +6814,8 @@ pub(crate) mod test { &UpstairsState::Active, None )); - assert_eq!(ds.clients[ClientId::new(0)].state(), DsState::Faulted); - assert_eq!(ds.clients[ClientId::new(1)].state(), DsState::Faulted); + assert_eq!(ds.clients[ClientId::new(0)].state(), STOP_IO_ERROR,); + assert_eq!(ds.clients[ClientId::new(1)].state(), STOP_IO_ERROR,); assert_eq!(ds.clients[ClientId::new(2)].state(), DsState::Active); let ok_response 
= Ok(Default::default()); @@ -6800,8 +6827,8 @@ pub(crate) mod test { &UpstairsState::Active, None )); - assert_eq!(ds.clients[ClientId::new(0)].state(), DsState::Faulted); - assert_eq!(ds.clients[ClientId::new(1)].state(), DsState::Faulted); + assert_eq!(ds.clients[ClientId::new(0)].state(), STOP_IO_ERROR,); + assert_eq!(ds.clients[ClientId::new(1)].state(), STOP_IO_ERROR,); assert_eq!(ds.clients[ClientId::new(2)].state(), DsState::Active); // Verify we should have fast-ackd this work @@ -6880,7 +6907,7 @@ pub(crate) mod test { // Verify client states assert_eq!(ds.clients[ClientId::new(0)].state(), DsState::Active); - assert_eq!(ds.clients[ClientId::new(1)].state(), DsState::Faulted); + assert_eq!(ds.clients[ClientId::new(1)].state(), STOP_IO_ERROR,); assert_eq!(ds.clients[ClientId::new(2)].state(), DsState::Active); // A faulted write won't change skipped job count. @@ -7022,7 +7049,7 @@ pub(crate) mod test { // Verify client states assert_eq!(ds.clients[ClientId::new(0)].state(), DsState::Active); - assert_eq!(ds.clients[ClientId::new(1)].state(), DsState::Faulted); + assert_eq!(ds.clients[ClientId::new(1)].state(), STOP_IO_ERROR,); assert_eq!(ds.clients[ClientId::new(2)].state(), DsState::Active); // Verify the read switched from InProgress to Skipped @@ -7087,7 +7114,7 @@ pub(crate) mod test { // Verify client states assert_eq!(ds.clients[ClientId::new(0)].state(), DsState::Active); - assert_eq!(ds.clients[ClientId::new(1)].state(), DsState::Faulted); + assert_eq!(ds.clients[ClientId::new(1)].state(), STOP_IO_ERROR,); assert_eq!(ds.clients[ClientId::new(2)].state(), DsState::Active); // The write was fast-acked, and the read is still going @@ -7197,7 +7224,7 @@ pub(crate) mod test { // Verify client states assert_eq!(ds.clients[ClientId::new(0)].state(), DsState::Active); assert_eq!(ds.clients[ClientId::new(1)].state(), DsState::Active); - assert_eq!(ds.clients[ClientId::new(2)].state(), DsState::Faulted); + assert_eq!(ds.clients[ClientId::new(2)].state(), 
STOP_IO_ERROR,); // Verify all IOs are done for cid in ClientId::iter() { @@ -7306,7 +7333,7 @@ pub(crate) mod test { // Verify client states assert_eq!(ds.clients[ClientId::new(0)].state(), DsState::Active); assert_eq!(ds.clients[ClientId::new(1)].state(), DsState::Active); - assert_eq!(ds.clients[ClientId::new(2)].state(), DsState::Faulted); + assert_eq!(ds.clients[ClientId::new(2)].state(), STOP_IO_ERROR,); // Verify all IOs are done for cid in ClientId::iter() { @@ -9628,14 +9655,14 @@ pub(crate) mod test { &UpstairsState::Active, ClientFaultReason::RequestedFault, ); - ds.clients[to_repair].checked_state_transition( - &UpstairsState::Active, + for s in [ + DsState::Faulted, DsState::LiveRepairReady, - ); - ds.clients[to_repair].checked_state_transition( - &UpstairsState::Active, DsState::LiveRepair, - ); + ] { + ds.clients[to_repair] + .checked_state_transition(&UpstairsState::Active, s); + } let next_id = ds.peek_next_id().0; ds.repair = Some(LiveRepairData { @@ -9797,14 +9824,14 @@ pub(crate) mod test { &UpstairsState::Active, ClientFaultReason::RequestedFault, ); - ds.clients[to_repair].checked_state_transition( - &UpstairsState::Active, + for s in [ + DsState::Faulted, DsState::LiveRepairReady, - ); - ds.clients[to_repair].checked_state_transition( - &UpstairsState::Active, DsState::LiveRepair, - ); + ] { + ds.clients[to_repair] + .checked_state_transition(&UpstairsState::Active, s); + } let next_id = ds.peek_next_id().0; @@ -9953,10 +9980,10 @@ pub(crate) mod test { &UpstairsState::Active, ClientFaultReason::RequestedFault, ); - ds.clients[to_repair].checked_state_transition( - &UpstairsState::Active, - DsState::LiveRepairReady, - ); + for s in [DsState::Faulted, DsState::LiveRepairReady] { + ds.clients[to_repair] + .checked_state_transition(&UpstairsState::Active, s); + } // Start the repair normally. 
This enqueues the close & reopen jobs, and // reserves Job IDs for the repair/noop diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs index 09508ef0f..e5a105e79 100644 --- a/upstairs/src/lib.rs +++ b/upstairs/src/lib.rs @@ -65,6 +65,9 @@ use guest::{GuestBlockRes, GuestIoHandle}; mod stats; +pub use client::{ + ClientFaultReason, ClientNegotiationFailed, ClientStopReason, +}; pub use crucible_common::impacted_blocks::*; mod deferred; @@ -785,6 +788,7 @@ pub(crate) struct RawReadResponse { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] #[serde(rename_all = "snake_case")] +#[serde(tag = "type", content = "value")] pub enum DsState { /* * New connection @@ -827,26 +831,14 @@ pub enum DsState { * if this downstairs reconnects in time. */ Offline, - /* - * A guest requested deactivation, this downstairs has completed all - * its outstanding work and is now waiting for the upstairs to - * transition back to initializing. - */ - Deactivated, - /* - * Another Upstairs has connected and is now active. - */ - Disabled, - /* - * This downstairs is being replaced, Any active task needs to clear - * any state and exit. - */ - Replacing, /* * The current downstairs tasks have ended and the replacement has * begun. */ Replaced, + + /// The IO task for the client is being stopped + Stopping(crate::client::ClientStopReason), } impl std::fmt::Display for DsState { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -878,18 +870,12 @@ impl std::fmt::Display for DsState { DsState::Offline => { write!(f, "Offline") } - DsState::Deactivated => { - write!(f, "Deactivated") - } - DsState::Disabled => { - write!(f, "Disabled") - } - DsState::Replacing => { - write!(f, "Replacing") - } DsState::Replaced => { write!(f, "Replaced") } + DsState::Stopping(..) 
=> { + write!(f, "Stopping") + } } } } diff --git a/upstairs/src/live_repair.rs b/upstairs/src/live_repair.rs index 5f1d2b0af..543d8c3e9 100644 --- a/upstairs/src/live_repair.rs +++ b/upstairs/src/live_repair.rs @@ -94,6 +94,13 @@ pub mod repair_test { }, }; + const STOP_IO_ERROR: DsState = + DsState::Stopping(ClientStopReason::Fault(ClientFaultReason::IOError)); + + const STOP_FAILED_LR: DsState = DsState::Stopping(ClientStopReason::Fault( + ClientFaultReason::FailedLiveRepair, + )); + /// Test function to send fake replies for the given job, completing it /// /// Fake replies are applied through `Upstairs::apply`, so the reply may @@ -486,7 +493,7 @@ pub mod repair_test { } // process_ds_completion should force the downstairs to fail - assert_eq!(up.downstairs.clients[err_ds].state(), DsState::Faulted); + assert_eq!(up.downstairs.clients[err_ds].state(), STOP_IO_ERROR); info!(up.log, "repair job should have got here, move it forward"); // The repair (NoOp) job should have shown up. Move it forward. @@ -581,8 +588,10 @@ pub mod repair_test { // Because the repair has failed, the extent that was under // repair should also now be faulted. - assert_eq!(up.downstairs.clients[err_ds].state(), DsState::Faulted); - assert_eq!(up.downstairs.clients[or_ds].state(), DsState::Faulted); + assert_eq!(up.downstairs.clients[err_ds].state(), STOP_IO_ERROR); + if err_ds != or_ds { + assert_eq!(up.downstairs.clients[or_ds].state(), STOP_FAILED_LR); + } let job = up.downstairs.get_job(&ds_flush_id).unwrap(); match &job.work { @@ -658,8 +667,10 @@ pub mod repair_test { // process_ds_completion should force both the downstairs that // reported the error, and the downstairs that is under repair to // fail. 
- assert_eq!(up.downstairs.clients[err_ds].state(), DsState::Faulted); - assert_eq!(up.downstairs.clients[or_ds].state(), DsState::Faulted); + assert_eq!(up.downstairs.clients[err_ds].state(), STOP_IO_ERROR); + if err_ds != or_ds { + assert_eq!(up.downstairs.clients[or_ds].state(), STOP_FAILED_LR); + } // When we completed the repair jobs, the repair_extent should // have gone ahead and issued the NoOp that should be issued next. @@ -727,8 +738,10 @@ pub mod repair_test { assert_eq!(job.state_count().skipped, 1); } - assert_eq!(up.downstairs.clients[err_ds].state(), DsState::Faulted); - assert_eq!(up.downstairs.clients[or_ds].state(), DsState::Faulted); + assert_eq!(up.downstairs.clients[err_ds].state(), STOP_IO_ERROR); + if err_ds != or_ds { + assert_eq!(up.downstairs.clients[or_ds].state(), STOP_FAILED_LR); + } let job = up.downstairs.get_job(&ds_flush_id).unwrap(); match &job.work { @@ -840,8 +853,10 @@ pub mod repair_test { assert_eq!(job.state_count().skipped, 1); } - assert_eq!(up.downstairs.clients[err_ds].state(), DsState::Faulted); - assert_eq!(up.downstairs.clients[or_ds].state(), DsState::Faulted); + assert_eq!(up.downstairs.clients[err_ds].state(), STOP_IO_ERROR); + if err_ds != or_ds { + assert_eq!(up.downstairs.clients[or_ds].state(), STOP_FAILED_LR); + } let job = up.downstairs.get_job(&ds_flush_id).unwrap(); match &job.work { @@ -960,8 +975,10 @@ pub mod repair_test { assert_eq!(job.state_count().done, 2); assert_eq!(job.state_count().skipped, 1); - assert_eq!(up.downstairs.clients[err_ds].state(), DsState::Faulted); - assert_eq!(up.downstairs.clients[or_ds].state(), DsState::Faulted); + assert_eq!(up.downstairs.clients[err_ds].state(), STOP_IO_ERROR); + if err_ds != or_ds { + assert_eq!(up.downstairs.clients[or_ds].state(), STOP_FAILED_LR); + } let job = up.downstairs.get_job(&ds_flush_id).unwrap(); match &job.work { @@ -1060,8 +1077,10 @@ pub mod repair_test { assert_eq!(job.state_count().done, 2); assert_eq!(job.state_count().error, 1); - 
assert_eq!(up.downstairs.clients[err_ds].state(), DsState::Faulted); - assert_eq!(up.downstairs.clients[or_ds].state(), DsState::Faulted); + assert_eq!(up.downstairs.clients[err_ds].state(), STOP_IO_ERROR); + if err_ds != or_ds { + assert_eq!(up.downstairs.clients[or_ds].state(), STOP_FAILED_LR); + } let job = up.downstairs.get_job(&ds_flush_id).unwrap(); match &job.work { diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs index 7d845bff1..8c7c71a20 100644 --- a/upstairs/src/upstairs.rs +++ b/upstairs/src/upstairs.rs @@ -2,7 +2,7 @@ //! Data structures specific to Crucible's `struct Upstairs` use crate::{ cdt, - client::{ClientAction, ClientRunResult}, + client::{ClientAction, ClientRunResult, ClientStopReason}, control::ControlRequest, deferred::{ DeferredBlockOp, DeferredMessage, DeferredQueue, DeferredRead, @@ -616,12 +616,13 @@ impl Upstairs { if matches!(&self.state, UpstairsState::Deactivating(..)) { info!(self.log, "checking for deactivation"); for i in ClientId::iter() { - // Clients become Deactivated, then New (when the IO task + // Clients become Stopping, then New (when the IO task // completes and the client is restarted). We don't try to // deactivate them _again_ in such cases. if matches!( self.downstairs.clients[i].state(), - DsState::Deactivated | DsState::New + DsState::Stopping(ClientStopReason::Deactivated) + | DsState::New ) { debug!(self.log, "already deactivated {i}"); } else if self.downstairs.try_deactivate(i, &self.state) { @@ -2226,7 +2227,10 @@ pub(crate) mod test { // Make sure the correct DS have changed state. for client_id in ClientId::iter() { // The downstairs is already deactivated - assert_eq!(up.ds_state(client_id), DsState::Deactivated); + assert_eq!( + up.ds_state(client_id), + DsState::Stopping(ClientStopReason::Deactivated) + ); // Push the event loop forward with the info that the IO task has // now stopped. 
@@ -3591,8 +3595,14 @@ pub(crate) mod test { } // These downstairs should now be deactivated now - assert_eq!(up.ds_state(ClientId::new(0)), DsState::Deactivated); - assert_eq!(up.ds_state(ClientId::new(2)), DsState::Deactivated); + assert_eq!( + up.ds_state(ClientId::new(0)), + DsState::Stopping(ClientStopReason::Deactivated) + ); + assert_eq!( + up.ds_state(ClientId::new(2)), + DsState::Stopping(ClientStopReason::Deactivated) + ); // Verify the remaining DS is still running assert_eq!(up.ds_state(ClientId::new(1)), DsState::Active); @@ -3612,7 +3622,10 @@ pub(crate) mod test { }), })); - assert_eq!(up.ds_state(ClientId::new(1)), DsState::Deactivated); + assert_eq!( + up.ds_state(ClientId::new(1)), + DsState::Stopping(ClientStopReason::Deactivated) + ); // Report all three DS as missing, which moves them to New and finishes // deactivation