Skip to content

Commit

Permalink
Change: add Fatal::Panicked; record RaftCore panic as an error
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed May 9, 2022
1 parent 0867488 commit 8496a48
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 25 deletions.
4 changes: 1 addition & 3 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,7 @@ impl<NID: NodeId> Engine<NID> {
// // --- app API ---
//
// /// Write a new log entry.
// pub(crate) fn write(&mut self) -> Result<Vec<AlgoCmd<NID>>, ForwardToLeader<NID>> {
// todo!()
// }
// pub(crate) fn write(&mut self) -> Result<Vec<AlgoCmd<NID>>, ForwardToLeader<NID>> {}
//
// // --- raft protocol API ---
//
Expand Down
3 changes: 3 additions & 0 deletions openraft/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ pub enum Fatal<NID: NodeId> {
#[error(transparent)]
StorageError(#[from] StorageError<NID>),

#[error("panicked")]
Panicked,

#[error("raft stopped")]
Stopped,
}
Expand Down
69 changes: 47 additions & 22 deletions openraft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ struct RaftInner<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>>
tx_shutdown: Mutex<Option<oneshot::Sender<()>>>,
marker_n: std::marker::PhantomData<N>,
marker_s: std::marker::PhantomData<S>,

/// The error that cause RaftCore to quit.
core_error: std::sync::Mutex<Option<Fatal<C::NodeId>>>,
}

/// The Raft API.
Expand Down Expand Up @@ -176,6 +179,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
tx_shutdown: Mutex::new(Some(tx_shutdown)),
marker_n: std::marker::PhantomData,
marker_s: std::marker::PhantomData,

core_error: std::sync::Mutex::new(None),
};
Self { inner: Arc::new(inner) }
}
Expand Down Expand Up @@ -441,42 +446,62 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,

let send_res = self.inner.tx_api.send((mes, span));
if let Err(send_err) = send_res {
let last_err = self.inner.rx_metrics.borrow().running_state.clone();
let last_err = self.get_core_error().await;
tracing::error!(%send_err, mes=%sum.unwrap_or_default(), last_error=?last_err, "error send tx to RaftCore");

let err = match last_err {
Ok(_) => {
// normal shutdown, not caused by any error.
Fatal::Stopped
}
Err(e) => e,
};

return Err(err.into());
return Err(last_err.into());
}

let recv_res = rx.await;
let res = match recv_res {
Ok(x) => x,
Err(e) => {
let last_err = self.inner.rx_metrics.borrow().running_state.clone();
let last_err = self.get_core_error().await;
tracing::error!(%e, mes=%sum.unwrap_or_default(), last_error=?last_err, "error recv rx from RaftCore");

let err = match last_err {
Ok(_) => {
// normal shutdown, not caused by any error.
Fatal::Stopped
}
Err(e) => e,
};

Err(err.into())
Err(last_err.into())
}
};

res
}

async fn get_core_error(&self) -> Fatal<C::NodeId> {
// If there is an error recorded, return it.
{
let guard = self.inner.core_error.lock().unwrap();
if let Some(x) = &*guard {
return x.clone();
}
}

if let Some(h) = self.inner.raft_handle.lock().await.take() {
let res = h.await;
tracing::error!(res=?res, "RaftCore exited");

if let Err(err) = res {
let mut guard = self.inner.core_error.lock().unwrap();

if err.is_panic() {
*guard = Some(Fatal::Panicked);
return Fatal::Panicked;
} else if err.is_cancelled() {
*guard = Some(Fatal::Stopped);
return Fatal::Stopped;
}
}
}

// RaftCore encountered an un-handleable error
let last_err = self.inner.rx_metrics.borrow().running_state.clone();
if let Err(err) = last_err {
let mut guard = self.inner.core_error.lock().unwrap();
*guard = Some(err.clone());

return err;
}

unreachable!("no RaftCore error found")
}

/// Send a request to the Raft core loop in a fire-and-forget manner.
///
/// The request functor will be called with a mutable reference to both the state machine
Expand Down

0 comments on commit 8496a48

Please sign in to comment.