diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index e09bdda42..c3be6772d 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -334,7 +334,7 @@ where C: RaftTypeConfig tracing::debug!(rpc = display(rpc.summary()), "Raft::append_entries"); let (tx, rx) = oneshot::channel(); - self.call_core(RaftMsg::AppendEntries { rpc, tx }, rx).await + self.inner.call_core(RaftMsg::AppendEntries { rpc, tx }, rx).await } /// Submit a VoteRequest (RequestVote in the spec) RPC to this Raft node. @@ -346,7 +346,7 @@ where C: RaftTypeConfig tracing::info!(rpc = display(rpc.summary()), "Raft::vote()"); let (tx, rx) = oneshot::channel(); - self.call_core(RaftMsg::RequestVote { rpc, tx }, rx).await + self.inner.call_core(RaftMsg::RequestVote { rpc, tx }, rx).await } /// Get the latest snapshot from the state machine. @@ -359,7 +359,7 @@ where C: RaftTypeConfig let (tx, rx) = oneshot::channel(); let cmd = ExternalCommand::GetSnapshot { tx }; - self.call_core(RaftMsg::ExternalCommand { cmd }, rx).await + self.inner.call_core(RaftMsg::ExternalCommand { cmd }, rx).await } /// Install a completely received snapshot to the state machine. @@ -376,7 +376,7 @@ where C: RaftTypeConfig tracing::debug!("Raft::install_complete_snapshot()"); let (tx, rx) = oneshot::channel(); - self.call_core(RaftMsg::InstallCompleteSnapshot { vote, snapshot, tx }, rx).await + self.inner.call_core(RaftMsg::InstallCompleteSnapshot { vote, snapshot, tx }, rx).await } /// Submit an InstallSnapshot RPC to this Raft node. @@ -391,7 +391,7 @@ where C: RaftTypeConfig tracing::debug!(rpc = display(rpc.summary()), "Raft::install_snapshot()"); let (tx, rx) = oneshot::channel(); - self.call_core(RaftMsg::InstallSnapshot { rpc, tx }, rx).await + self.inner.call_core(RaftMsg::InstallSnapshot { rpc, tx }, rx).await } /// Get the ID of the current leader from this Raft node. @@ -413,7 +413,7 @@ where C: RaftTypeConfig #[tracing::instrument(level = "debug", skip(self))] pub async fn is_leader(&self) -> Result<(), RaftError>> { let (tx, rx) = oneshot::channel(); - let _ = self.call_core(RaftMsg::CheckIsLeaderRequest { tx }, rx).await?; + let _ = self.inner.call_core(RaftMsg::CheckIsLeaderRequest { tx }, rx).await?; Ok(()) } @@ -497,7 +497,7 @@ where C: RaftTypeConfig RaftError>, > { let (tx, rx) = oneshot::channel(); - let (read_log_id, applied) = self.call_core(RaftMsg::CheckIsLeaderRequest { tx }, rx).await?; + let (read_log_id, applied) = self.inner.call_core(RaftMsg::CheckIsLeaderRequest { tx }, rx).await?; Ok((read_log_id, applied)) } @@ -525,7 +525,7 @@ where C: RaftTypeConfig app_data: C::D, ) -> Result, RaftError>> { let (tx, rx) = oneshot::channel(); - self.call_core(RaftMsg::ClientWriteRequest { app_data, tx }, rx).await + self.inner.call_core(RaftMsg::ClientWriteRequest { app_data, tx }, rx).await } /// Initialize a pristine Raft node with the given config. @@ -558,14 +558,15 @@ where C: RaftTypeConfig T: IntoNodes + Debug, { let (tx, rx) = oneshot::channel(); - self.call_core( - RaftMsg::Initialize { - members: members.into_nodes(), - tx, - }, - rx, - ) - .await + self.inner + .call_core( + RaftMsg::Initialize { + members: members.into_nodes(), + tx, + }, + rx, + ) + .await } /// Add a new learner raft node, optionally, blocking until up-to-speed. @@ -593,6 +594,7 @@ where C: RaftTypeConfig ) -> Result, RaftError>> { let (tx, rx) = oneshot::channel(); let resp = self + .inner .call_core( RaftMsg::ChangeMembership { changes: ChangeMembers::AddNodes(btreemap! {id=>node}), @@ -724,6 +726,7 @@ where C: RaftTypeConfig // res is error if membership can not be changed. // If no error, it will enter a joint state let res = self + .inner .call_core( RaftMsg::ChangeMembership { changes: changes.clone(), @@ -751,7 +754,7 @@ where C: RaftTypeConfig tracing::debug!("the second step is to change to uniform config: {:?}", changes); let (tx, rx) = oneshot::channel(); - let res = self.call_core(RaftMsg::ChangeMembership { changes, retain, tx }, rx).await; + let res = self.inner.call_core(RaftMsg::ChangeMembership { changes, retain, tx }, rx).await; if let Err(e) = &res { tracing::error!("the second step error: {}", e); @@ -763,42 +766,6 @@ where C: RaftTypeConfig Ok(res) } - /// Invoke RaftCore by sending a RaftMsg and blocks waiting for response. - #[tracing::instrument(level = "debug", skip(self, mes, rx))] - pub(crate) async fn call_core( - &self, - mes: RaftMsg, - rx: oneshot::Receiver>, - ) -> Result> - where - E: Debug, - { - let sum = if tracing::enabled!(Level::DEBUG) { - Some(mes.summary()) - } else { - None - }; - - let send_res = self.inner.tx_api.send(mes); - - if send_res.is_err() { - let fatal = self.inner.get_core_stopped_error("sending tx to RaftCore", sum).await; - return Err(RaftError::Fatal(fatal)); - } - - let recv_res = rx.await; - tracing::debug!("call_core receives result is error: {:?}", recv_res.is_err()); - - match recv_res { - Ok(x) => x.map_err(|e| RaftError::APIError(e)), - Err(_) => { - let fatal = self.inner.get_core_stopped_error("receiving rx from RaftCore", sum).await; - tracing::error!(error = debug(&fatal), "core_call fatal error"); - Err(RaftError::Fatal(fatal)) - } - } - } - /// Provides read-only access to [`RaftState`] through a user-provided function. /// /// The function `func` is applied to the current [`RaftState`]. The result of this function, diff --git a/openraft/src/raft/raft_inner.rs b/openraft/src/raft/raft_inner.rs index 4c958d96a..c5b6e7aba 100644 --- a/openraft/src/raft/raft_inner.rs +++ b/openraft/src/raft/raft_inner.rs @@ -1,21 +1,25 @@ use std::fmt; +use std::fmt::Debug; use std::sync::Arc; use tokio::sync::mpsc; use tokio::sync::oneshot; use tokio::sync::watch; use tokio::sync::Mutex; +use tracing::Level; use crate::config::RuntimeConfig; use crate::core::raft_msg::external_command::ExternalCommand; use crate::core::raft_msg::RaftMsg; use crate::core::TickHandle; use crate::error::Fatal; +use crate::error::RaftError; use crate::metrics::RaftDataMetrics; use crate::metrics::RaftServerMetrics; use crate::raft::core_state::CoreState; use crate::AsyncRuntime; use crate::Config; +use crate::MessageSummary; use crate::RaftMetrics; use crate::RaftTypeConfig; @@ -42,6 +46,42 @@ where C: RaftTypeConfig impl RaftInner where C: RaftTypeConfig { + /// Invoke RaftCore by sending a RaftMsg and blocks waiting for response. + #[tracing::instrument(level = "debug", skip_all)] + pub(crate) async fn call_core( + &self, + mes: RaftMsg, + rx: oneshot::Receiver>, + ) -> Result> + where + E: Debug, + { + let sum = if tracing::enabled!(Level::DEBUG) { + Some(mes.summary()) + } else { + None + }; + + let send_res = self.tx_api.send(mes); + + if send_res.is_err() { + let fatal = self.get_core_stopped_error("sending tx to RaftCore", sum).await; + return Err(RaftError::Fatal(fatal)); + } + + let recv_res = rx.await; + tracing::debug!("call_core receives result is error: {:?}", recv_res.is_err()); + + match recv_res { + Ok(x) => x.map_err(|e| RaftError::APIError(e)), + Err(_) => { + let fatal = self.get_core_stopped_error("receiving rx from RaftCore", sum).await; + tracing::error!(error = debug(&fatal), "core_call fatal error"); + Err(RaftError::Fatal(fatal)) + } + } + } + /// Send an [`ExternalCommand`] to RaftCore to execute in the `RaftCore` thread. /// /// It returns at once.