Skip to content

Commit

Permalink
Change: after shutdown(), it should return an error when accessing Ra…
Browse files Browse the repository at this point in the history
…ft, instead of panicking.

- Fix: #373
  • Loading branch information
devillove084 authored and drmingdrmer committed Jun 21, 2022
1 parent d69a20b commit d81c727
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 53 deletions.
1 change: 1 addition & 0 deletions openraft/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ pub enum CheckIsLeaderError<NID: NodeId> {

/// An error related to a client write request.
#[derive(Debug, Clone, thiserror::Error, derive_more::TryInto)]
#[derive(PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub enum ClientWriteError<NID: NodeId> {
#[error(transparent)]
Expand Down
131 changes: 78 additions & 53 deletions openraft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::fmt::Debug;
use std::fmt::Display;
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -106,17 +107,24 @@ macro_rules! declare_raft_types {
};
}

/// The running state of RaftCore
///
/// Tracks whether the spawned RaftCore task is still alive or has already been joined.
/// `join_core_task()` awaits the handle held in `Running` and then replaces the state
/// with `Done`, so the task's outcome stays available to later callers.
enum CoreState<NID: NodeId> {
/// The RaftCore task is still running.
///
/// Holds the join handle of the task; awaiting it yields the task's return value.
Running(JoinHandle<Result<(), Fatal<NID>>>),

/// The RaftCore task has finished. The return value of the task is stored.
///
/// `Ok(())` is stored when the task returned without an error; otherwise the `Fatal`
/// describing why it ended is stored.
Done(Result<(), Fatal<NID>>),
}

struct RaftInner<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> {
tx_api: mpsc::UnboundedSender<(RaftMsg<C, N, S>, Span)>,
rx_metrics: watch::Receiver<RaftMetrics<C>>,
// TODO(xp): it does not need to be a async mutex.
#[allow(clippy::type_complexity)]
raft_handle: Mutex<Option<JoinHandle<Result<(), Fatal<C::NodeId>>>>>,
tx_shutdown: Mutex<Option<oneshot::Sender<()>>>,
marker_n: std::marker::PhantomData<N>,
marker_s: std::marker::PhantomData<S>,

/// The error that cause RaftCore to quit.
core_error: std::sync::Mutex<Option<Fatal<C::NodeId>>>,
core_state: Mutex<CoreState<C::NodeId>>,
}

/// The Raft API.
Expand Down Expand Up @@ -183,12 +191,10 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
let inner = RaftInner {
tx_api,
rx_metrics,
raft_handle: Mutex::new(Some(raft_handle)),
tx_shutdown: Mutex::new(Some(tx_shutdown)),
marker_n: std::marker::PhantomData,
marker_s: std::marker::PhantomData,

core_error: std::sync::Mutex::new(None),
core_state: Mutex::new(CoreState::Running(raft_handle)),
};
Self { inner: Arc::new(inner) }
}
Expand Down Expand Up @@ -455,61 +461,76 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
let sum = if span.is_disabled() { None } else { Some(mes.summary()) };

let send_res = self.inner.tx_api.send((mes, span));
if let Err(send_err) = send_res {
let last_err = self.get_core_error().await;
tracing::error!(%send_err, mes=%sum.unwrap_or_default(), last_error=?last_err, "error send tx to RaftCore");
return Err(last_err.into());

if send_res.is_err() {
let fatal = self.get_core_stopped_error("sending tx to RaftCore", sum).await;
return Err(fatal.into());
}

let recv_res = rx.await;
let res = match recv_res {
Ok(x) => x,
Err(e) => {
let last_err = self.get_core_error().await;
tracing::error!(%e, mes=%sum.unwrap_or_default(), last_error=?last_err, "error recv rx from RaftCore");
Err(last_err.into())
}
};

res
}

async fn get_core_error(&self) -> Fatal<C::NodeId> {
// If there is an error recorded, return it.
{
let guard = self.inner.core_error.lock().unwrap();
if let Some(x) = &*guard {
return x.clone();
match recv_res {
Ok(x) => x,
Err(_) => {
let fatal = self.get_core_stopped_error("receiving rx from RaftCore", sum).await;
Err(fatal.into())
}
}
}

if let Some(h) = self.inner.raft_handle.lock().await.take() {
let res = h.await;
tracing::error!(res=?res, "RaftCore exited");

if let Err(err) = res {
let mut guard = self.inner.core_error.lock().unwrap();
async fn get_core_stopped_error(&self, when: impl Display, message_summary: Option<String>) -> Fatal<C::NodeId> {
// Wait for the core task to finish.
self.join_core_task().await;

if err.is_panic() {
*guard = Some(Fatal::Panicked);
return Fatal::Panicked;
} else if err.is_cancelled() {
*guard = Some(Fatal::Stopped);
return Fatal::Stopped;
}
// Retrieve the result.
let core_res = {
let state = self.inner.core_state.lock().await;
if let CoreState::Done(core_task_res) = &*state {
core_task_res.clone()
} else {
unreachable!("RaftCore should have already quit")
}
}
};

// RaftCore encountered an un-handleable error
let last_err = self.inner.rx_metrics.borrow().running_state.clone();
if let Err(err) = last_err {
let mut guard = self.inner.core_error.lock().unwrap();
*guard = Some(err.clone());
tracing::error!(
core_result = debug(&core_res),
"failure {}; message: {:?}",
when,
message_summary
);

return err;
match core_res {
// A normal quit is still an unexpected "stop" to the caller.
Ok(_) => Fatal::Stopped,
Err(e) => e,
}
}

unreachable!("no RaftCore error found")
/// Join the RaftCore task, if it has not been joined yet, and remember its outcome.
///
/// The `core_state` lock is held for the whole call, including while the task handle is
/// awaited. If the state is already `CoreState::Done` this returns immediately.
///
/// A join failure is translated into a `Fatal`: a panic inside the task becomes
/// `Fatal::Panicked`, any other join failure becomes `Fatal::Stopped`. Otherwise the
/// value returned by the task itself is recorded unchanged.
async fn join_core_task(&self) {
    let mut state = self.inner.core_state.lock().await;

    let handle = match &mut *state {
        // RaftCore has already quit and its result is recorded; nothing to do.
        CoreState::Done(_) => return,
        CoreState::Running(handle) => handle,
    };

    let res = handle.await;
    tracing::info!(res = debug(&res), "RaftCore exited");

    let core_task_res = match res {
        Ok(returned_res) => returned_res,
        Err(err) if err.is_panic() => Err(Fatal::Panicked),
        Err(_) => Err(Fatal::Stopped),
    };

    // Swap the handle for the recorded result so later calls take the early-return path.
    *state = CoreState::Done(core_task_res);
}

/// Send a request to the Raft core loop in a fire-and-forget manner.
Expand Down Expand Up @@ -567,11 +588,15 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
/// Shutdown this Raft node.
pub async fn shutdown(&self) -> Result<(), JoinError> {
if let Some(tx) = self.inner.tx_shutdown.lock().await.take() {
let _ = tx.send(());
}
if let Some(handle) = self.inner.raft_handle.lock().await.take() {
let _ = handle.await?;
// A failure to send means the RaftCore is already shutdown. Continue to check the task return value.
let send_res = tx.send(());
tracing::info!("sending shutdown signal to RaftCore, sending res: {:?}", send_res);
}

self.join_core_task().await;

// TODO(xp): API change: replace `JoinError` with `Fatal`,
// to let the caller know the return value of RaftCore task.
Ok(())
}
}
Expand Down
55 changes: 55 additions & 0 deletions openraft/tests/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use anyhow::anyhow;
use anyhow::Result;
use fixtures::RaftRouter;
use maplit::btreeset;
use openraft::error::ClientWriteError;
use openraft::error::Fatal;
use openraft::Config;
use openraft::ServerState;

Expand Down Expand Up @@ -62,3 +64,56 @@ async fn initialization() -> Result<()> {
/// Returns a fixed timeout of one second (1000 ms), wrapped in `Some`.
fn timeout() -> Option<Duration> {
    let one_second = Duration::from_secs(1);
    Some(one_second)
}

/// Once RaftCore has panicked, a later call through `Raft` must report
/// `Fatal::Panicked` as an error instead of panicking in the caller.
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn return_error_after_panic() -> Result<()> {
    let config = Arc::new(Config::default().validate()?);
    let mut router = RaftRouter::new(config.clone());

    tracing::info!("--- initializing cluster");
    // The returned log index is not needed by this test.
    let _ = router.new_nodes_from_single(btreeset! {0}, btreeset! {}).await?;

    tracing::info!("--- panic the RaftCore");
    // The submitted closure panics; presumably it runs inside node 0's RaftCore task.
    router.external_request(0, |_s, _sto, _net| {
        panic!("foo");
    });

    tracing::info!("--- calls the panicked raft should get a Fatal::Panicked error");
    let write_res = router.client_request(0, "foo", 2).await;
    let expected = ClientWriteError::<u64>::Fatal(Fatal::Panicked);
    assert_eq!(expected, write_res.unwrap_err());

    Ok(())
}

/// After shutdown(), access to Raft should return a Fatal::Stopped error.
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn return_error_after_shutdown() -> Result<()> {
let config = Arc::new(Config::default().validate()?);
let mut router = RaftRouter::new(config.clone());

tracing::info!("--- initializing cluster");
let log_index = router.new_nodes_from_single(btreeset! {0}, btreeset! {}).await?;
let _ = log_index; // unused;

tracing::info!("--- shutdown the raft");
{
let n = router.get_raft_handle(&0)?;
n.shutdown().await?;
}

tracing::info!("--- calls the panicked raft should get a Fatal::Panicked error");
{
let res = router.client_request(0, "foo", 2).await;
let err = res.unwrap_err();
assert_eq!(ClientWriteError::<u64>::Fatal(Fatal::Stopped), err);
}

Ok(())
}

0 comments on commit d81c727

Please sign in to comment.