Skip to content

Commit

Permalink
Separate ClientNegotiationFailure
Browse files Browse the repository at this point in the history
  • Loading branch information
mkeeter committed Dec 2, 2024
1 parent 2fa8e3a commit de39d6e
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 53 deletions.
118 changes: 69 additions & 49 deletions upstairs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ impl DownstairsClient {
}

fn halt_io_task(&mut self, r: ClientStopReason) {
info!(self.log, "halting IO task due to {r:?}");
if let Some(t) = self.client_task.client_stop_tx.take() {
if let Err(_e) = t.send(r) {
warn!(self.log, "failed to send stop request")
Expand Down Expand Up @@ -847,22 +848,12 @@ impl DownstairsClient {
self.state
}

pub(crate) fn restart_connection(
pub(crate) fn abort_negotiation(
&mut self,
up_state: &UpstairsState,
reason: ClientStopReason,
reason: ClientNegotiationFailed,
) {
let new_state = match self.state {
DsState::Active | DsState::Offline
if matches!(
reason,
ClientStopReason::Fault(
ClientFaultReason::IneligibleForReplay
)
) =>
{
DsState::Faulted
}
DsState::Active | DsState::Offline => DsState::Offline,
DsState::Faulted => DsState::Faulted,
DsState::Deactivated => DsState::New,
Expand All @@ -888,7 +879,7 @@ impl DownstairsClient {
);

self.checked_state_transition(up_state, new_state);
self.halt_io_task(reason);
self.halt_io_task(reason.into());
}

/// Sets the current state to `DsState::Active`
Expand Down Expand Up @@ -998,7 +989,7 @@ impl DownstairsClient {
self.checked_state_transition(up_state, DsState::Replacing);
self.stats.replaced += 1;

self.halt_io_task(ClientStopReason::Replacing);
self.halt_io_task(ClientStopReason::Replaced);
}

/// Sets `self.state` to `new_state`, with logging and validity checking
Expand Down Expand Up @@ -1509,9 +1500,9 @@ impl DownstairsClient {
} => {
if self.negotiation_state != NegotiationState::Start {
error!(self.log, "got version already");
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::BadNegotiationOrder,
ClientNegotiationFailed::BadNegotiationOrder,
);
return Ok(false);
}
Expand All @@ -1522,9 +1513,9 @@ impl DownstairsClient {
CRUCIBLE_MESSAGE_VERSION,
version
);
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Incompatible,
ClientNegotiationFailed::Incompatible,
);
return Ok(false);
}
Expand Down Expand Up @@ -1580,9 +1571,9 @@ impl DownstairsClient {
"downstairs version is {version}, \
ours is {CRUCIBLE_MESSAGE_VERSION}"
);
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Incompatible,
ClientNegotiationFailed::Incompatible,
);
}
Message::EncryptedMismatch { expected } => {
Expand All @@ -1591,9 +1582,9 @@ impl DownstairsClient {
"downstairs encrypted is {expected}, ours is {}",
self.cfg.encrypted()
);
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Incompatible,
ClientNegotiationFailed::Incompatible,
);
}
Message::ReadOnlyMismatch { expected } => {
Expand All @@ -1602,9 +1593,9 @@ impl DownstairsClient {
"downstairs read_only is {expected}, ours is {}",
self.cfg.read_only,
);
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Incompatible,
ClientNegotiationFailed::Incompatible,
);
}
Message::YouAreNowActive {
Expand All @@ -1618,9 +1609,9 @@ impl DownstairsClient {
"Received YouAreNowActive out of order! {:?}",
self.negotiation_state
);
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::BadNegotiationOrder,
ClientNegotiationFailed::BadNegotiationOrder,
);
return Ok(false);
}
Expand Down Expand Up @@ -1662,17 +1653,17 @@ impl DownstairsClient {
"Generation requested:{} found:{}",
gen, upstairs_gen,
);
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Incompatible,
ClientNegotiationFailed::Incompatible,
);
return Err(CrucibleError::GenerationNumberTooLow(
gen_error,
));
} else {
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Incompatible,
ClientNegotiationFailed::Incompatible,
);
return Err(CrucibleError::UuidMismatch);
}
Expand All @@ -1685,9 +1676,9 @@ impl DownstairsClient {
if self.negotiation_state != NegotiationState::WaitForRegionInfo
{
error!(self.log, "Received RegionInfo out of order!");
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::BadNegotiationOrder,
ClientNegotiationFailed::BadNegotiationOrder,
);
return Ok(false);
}
Expand All @@ -1702,9 +1693,9 @@ impl DownstairsClient {
// collection for each downstairs.
if region_def.get_encrypted() != self.cfg.encrypted() {
error!(self.log, "encryption expectation mismatch!");
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Incompatible,
ClientNegotiationFailed::Incompatible,
);
return Ok(false);
}
Expand Down Expand Up @@ -1848,9 +1839,9 @@ impl DownstairsClient {
self.log,
"exiting negotiation because we're replacing"
);
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Replacing,
ClientNegotiationFailed::Replacing,
);
}
bad_state => {
Expand All @@ -1867,9 +1858,9 @@ impl DownstairsClient {
Message::LastFlushAck { last_flush_number } => {
if self.negotiation_state != NegotiationState::GetLastFlush {
error!(self.log, "Received LastFlushAck out of order!");
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::BadNegotiationOrder,
ClientNegotiationFailed::BadNegotiationOrder,
);
return Ok(false); // TODO should we trigger set_inactive?
}
Expand All @@ -1880,9 +1871,9 @@ impl DownstairsClient {
"exiting negotiation due to LastFlushAck \
while replacing"
);
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Replacing,
ClientNegotiationFailed::Replacing,
);
return Ok(false); // TODO should we trigger set_inactive?
}
Expand All @@ -1909,9 +1900,9 @@ impl DownstairsClient {
if self.negotiation_state != NegotiationState::GetExtentVersions
{
error!(self.log, "Received ExtentVersions out of order!");
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::BadNegotiationOrder,
ClientNegotiationFailed::BadNegotiationOrder,
);
return Ok(false); // TODO should we trigger set_inactive?
}
Expand All @@ -1928,9 +1919,9 @@ impl DownstairsClient {
"exiting negotiation due to ExtentVersions while \
replacing"
);
self.restart_connection(
self.abort_negotiation(
up_state,
ClientStopReason::Replacing,
ClientNegotiationFailed::Replacing,
);
return Ok(false); // TODO should we trigger set_inactive?
}
Expand Down Expand Up @@ -2189,13 +2180,28 @@ pub(crate) struct DownstairsStats {
#[derive(Debug)]
pub(crate) enum ClientStopReason {
/// We are about to replace the client task
Replacing,
Replaced,

/// We have disabled the downstairs client for some reason
///
/// (for example, we have received `Message::YouAreNoLongerActive`)
Disabled,

/// The upstairs has requested that we deactivate
Deactivated,

/// Something went wrong during negotiation
#[allow(unused)] // logged in debug messages
NegotiationFailed(ClientNegotiationFailed),

/// We have explicitly faulted the client
#[allow(unused)] // logged in debug messages
Fault(ClientFaultReason),
}

/// Subset of [`ClientStopReason`] for faulting a client
#[derive(Debug)]
pub(crate) enum ClientNegotiationFailed {
/// Reconcile failed and we're restarting
FailedReconcile,

Expand All @@ -2205,21 +2211,35 @@ pub(crate) enum ClientStopReason {
/// Negotiation says that we are incompatible
Incompatible,

/// The upstairs has requested that we deactivate
Deactivated,
/// We are trying to replace the client task
Replacing,
}

/// We have explicitly faulted the client
Fault(ClientFaultReason),
impl From<ClientNegotiationFailed> for ClientStopReason {
fn from(f: ClientNegotiationFailed) -> ClientStopReason {
ClientStopReason::NegotiationFailed(f)
}
}

/// Subset of [`ClientStopReason`] for faulting a client
#[derive(Debug)]
pub(crate) enum ClientFaultReason {
/// Received an error from some non-recoverable IO (write or flush)
IOError,

/// Live-repair failed
FailedLiveRepair,

/// Too many jobs in the queue
TooManyOutstandingJobs,

/// Too many bytes in the queue
TooManyOutstandingBytes,

/// The upstairs has requested that we deactivate when we were offline
OfflineDeactivated,

/// The Upstairs has dropped jobs that would be needed for replay
IneligibleForReplay,

#[cfg(test)]
Expand Down
8 changes: 4 additions & 4 deletions upstairs/src/downstairs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use std::{
use crate::{
cdt,
client::{
ClientAction, ClientFaultReason, ClientStopReason, DownstairsClient,
EnqueueResult,
ClientAction, ClientFaultReason, ClientNegotiationFailed,
DownstairsClient, EnqueueResult,
},
guest::GuestBlockRes,
io_limits::{IOLimitGuard, IOLimits},
Expand Down Expand Up @@ -1909,9 +1909,9 @@ impl Downstairs {
if c.state() == DsState::Reconcile {
// Restart the IO task. This will cause the Upstairs to
// deactivate through a ClientAction::TaskStopped.
c.restart_connection(
c.abort_negotiation(
up_state,
ClientStopReason::FailedReconcile,
ClientNegotiationFailed::FailedReconcile,
);
error!(self.log, "Mark {} as FAILED REPAIR", i);
}
Expand Down

0 comments on commit de39d6e

Please sign in to comment.