Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify fault and restart code (1/3) #1568

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 78 additions & 70 deletions upstairs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ impl DownstairsClient {
}

fn halt_io_task(&mut self, r: ClientStopReason) {
info!(self.log, "halting IO task due to {r:?}");
if let Some(t) = self.client_task.client_stop_tx.take() {
if let Err(_e) = t.send(r) {
warn!(self.log, "failed to send stop request")
Expand Down Expand Up @@ -847,17 +848,12 @@ impl DownstairsClient {
self.state
}

pub(crate) fn restart_connection(
pub(crate) fn abort_negotiation(
&mut self,
up_state: &UpstairsState,
reason: ClientStopReason,
reason: ClientNegotiationFailed,
) {
let new_state = match self.state {
DsState::Active | DsState::Offline
if matches!(reason, ClientStopReason::IneligibleForReplay) =>
{
DsState::Faulted
}
DsState::Active | DsState::Offline => DsState::Offline,
DsState::Faulted => DsState::Faulted,
DsState::Deactivated => DsState::New,
Expand All @@ -883,7 +879,7 @@ impl DownstairsClient {
);

self.checked_state_transition(up_state, new_state);
self.halt_io_task(reason);
self.halt_io_task(reason.into());
}

/// Sets the current state to `DsState::Active`
Expand Down Expand Up @@ -1185,26 +1181,8 @@ impl DownstairsClient {
}
}

/// Aborts an in-progress live repair, conditionally restarting the task
///
/// # Panics
/// If this client is not in `DsState::LiveRepair` and `restart_task` is
/// `true`, or vice versa.
pub(crate) fn abort_repair(
&mut self,
up_state: &UpstairsState,
restart_task: bool,
) {
if restart_task {
assert_eq!(self.state, DsState::LiveRepair);
self.checked_state_transition(up_state, DsState::Faulted);
self.halt_io_task(ClientStopReason::FailedLiveRepair);
} else {
// Someone else (i.e. receiving an error upon IO completion) already
// restarted the IO task and kicked us out of the live-repair state,
// but we'll do further cleanup here.
assert_ne!(self.state, DsState::LiveRepair);
}
/// Sets `repair_info` to `None` and increments `live_repair_aborted`
pub(crate) fn clear_repair_state(&mut self) {
self.repair_info = None;
self.stats.live_repair_aborted += 1;
}
Expand All @@ -1213,10 +1191,10 @@ impl DownstairsClient {
pub(crate) fn fault(
&mut self,
up_state: &UpstairsState,
reason: ClientStopReason,
reason: ClientFaultReason,
) {
self.checked_state_transition(up_state, DsState::Faulted);
self.halt_io_task(reason);
self.halt_io_task(reason.into());
}

/// Finishes an in-progress live repair, setting our state to `Active`
Expand Down Expand Up @@ -1522,9 +1500,9 @@ impl DownstairsClient {
} => {
if self.negotiation_state != NegotiationState::Start {
error!(self.log, "got version already");
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::BadNegotiationOrder,
ClientNegotiationFailed::BadNegotiationOrder,
);
return Ok(false);
}
Expand All @@ -1535,9 +1513,9 @@ impl DownstairsClient {
CRUCIBLE_MESSAGE_VERSION,
version
);
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Incompatible,
ClientNegotiationFailed::Incompatible,
);
return Ok(false);
}
Expand Down Expand Up @@ -1593,9 +1571,9 @@ impl DownstairsClient {
"downstairs version is {version}, \
ours is {CRUCIBLE_MESSAGE_VERSION}"
);
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Incompatible,
ClientNegotiationFailed::Incompatible,
);
}
Message::EncryptedMismatch { expected } => {
Expand All @@ -1604,9 +1582,9 @@ impl DownstairsClient {
"downstairs encrypted is {expected}, ours is {}",
self.cfg.encrypted()
);
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Incompatible,
ClientNegotiationFailed::Incompatible,
);
}
Message::ReadOnlyMismatch { expected } => {
Expand All @@ -1615,9 +1593,9 @@ impl DownstairsClient {
"downstairs read_only is {expected}, ours is {}",
self.cfg.read_only,
);
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Incompatible,
ClientNegotiationFailed::Incompatible,
);
}
Message::YouAreNowActive {
Expand All @@ -1631,9 +1609,9 @@ impl DownstairsClient {
"Received YouAreNowActive out of order! {:?}",
self.negotiation_state
);
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::BadNegotiationOrder,
ClientNegotiationFailed::BadNegotiationOrder,
);
return Ok(false);
}
Expand Down Expand Up @@ -1675,17 +1653,17 @@ impl DownstairsClient {
"Generation requested:{} found:{}",
gen, upstairs_gen,
);
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Incompatible,
ClientNegotiationFailed::Incompatible,
);
return Err(CrucibleError::GenerationNumberTooLow(
gen_error,
));
} else {
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Incompatible,
ClientNegotiationFailed::Incompatible,
);
return Err(CrucibleError::UuidMismatch);
}
Expand All @@ -1698,9 +1676,9 @@ impl DownstairsClient {
if self.negotiation_state != NegotiationState::WaitForRegionInfo
{
error!(self.log, "Received RegionInfo out of order!");
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::BadNegotiationOrder,
ClientNegotiationFailed::BadNegotiationOrder,
);
return Ok(false);
}
Expand All @@ -1715,9 +1693,9 @@ impl DownstairsClient {
// collection for each downstairs.
if region_def.get_encrypted() != self.cfg.encrypted() {
error!(self.log, "encryption expectation mismatch!");
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Incompatible,
ClientNegotiationFailed::Incompatible,
);
return Ok(false);
}
Expand Down Expand Up @@ -1861,9 +1839,9 @@ impl DownstairsClient {
self.log,
"exiting negotiation because we're replacing"
);
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Replacing,
ClientNegotiationFailed::Replacing,
);
}
bad_state => {
Expand All @@ -1880,9 +1858,9 @@ impl DownstairsClient {
Message::LastFlushAck { last_flush_number } => {
if self.negotiation_state != NegotiationState::GetLastFlush {
error!(self.log, "Received LastFlushAck out of order!");
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::BadNegotiationOrder,
ClientNegotiationFailed::BadNegotiationOrder,
);
return Ok(false); // TODO should we trigger set_inactive?
}
Expand All @@ -1893,9 +1871,9 @@ impl DownstairsClient {
"exiting negotiation due to LastFlushAck \
while replacing"
);
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Replacing,
ClientNegotiationFailed::Replacing,
);
return Ok(false); // TODO should we trigger set_inactive?
}
Expand All @@ -1922,9 +1900,9 @@ impl DownstairsClient {
if self.negotiation_state != NegotiationState::GetExtentVersions
{
error!(self.log, "Received ExtentVersions out of order!");
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::BadNegotiationOrder,
ClientNegotiationFailed::BadNegotiationOrder,
);
return Ok(false); // TODO should we trigger set_inactive?
}
Expand All @@ -1941,9 +1919,9 @@ impl DownstairsClient {
"exiting negotiation due to ExtentVersions while \
replacing"
);
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Replacing,
ClientNegotiationFailed::Replacing,
);
return Ok(false); // TODO should we trigger set_inactive?
}
Expand Down Expand Up @@ -2209,18 +2187,46 @@ pub(crate) enum ClientStopReason {
/// (for example, we have received `Message::YouAreNoLongerActive`)
Disabled,

/// The upstairs has requested that we deactivate
Deactivated,

/// Something went wrong during negotiation
#[allow(unused)] // logged in debug messages
NegotiationFailed(ClientNegotiationFailed),

/// We have explicitly faulted the client
#[allow(unused)] // logged in debug messages
Fault(ClientFaultReason),
}

/// Subset of [`ClientStopReason`] for faulting a client
#[derive(Debug)]
pub(crate) enum ClientNegotiationFailed {
/// Reconcile failed and we're restarting
FailedReconcile,

/// Received an error from some IO
IOError,

/// Negotiation message received out of order
BadNegotiationOrder,

/// Negotiation says that we are incompatible
Incompatible,

/// We are trying to replace the client task
Replacing,
}

impl From<ClientNegotiationFailed> for ClientStopReason {
fn from(f: ClientNegotiationFailed) -> ClientStopReason {
ClientStopReason::NegotiationFailed(f)
}
}

/// Subset of [`ClientStopReason`] for faulting a client
#[derive(Debug)]
pub(crate) enum ClientFaultReason {
/// Received an error from some non-recoverable IO (write or flush)
IOError,

/// Live-repair failed
FailedLiveRepair,

Expand All @@ -2230,18 +2236,20 @@ pub(crate) enum ClientStopReason {
/// Too many bytes in the queue
TooManyOutstandingBytes,

/// The upstairs has requested that we deactivate
Deactivated,

/// The test suite has requested a fault
#[cfg(test)]
RequestedFault,

/// The upstairs has requested that we deactivate when we were offline
OfflineDeactivated,

/// The Upstairs has dropped jobs that would be needed for replay
IneligibleForReplay,

#[cfg(test)]
RequestedFault,
}

impl From<ClientFaultReason> for ClientStopReason {
fn from(f: ClientFaultReason) -> ClientStopReason {
ClientStopReason::Fault(f)
}
}

/// Response received from the I/O task
Expand Down
Loading
Loading