Skip to content

Commit

Permalink
Change: RaftNetwork return RPCError instead of anyhow::Error
Browse files Browse the repository at this point in the history
- When a remote error encountered when replication, the replication will
  be stopped at once.

- Fix: #140
  • Loading branch information
drmingdrmer committed Jan 23, 2022
1 parent 01867e8 commit f08a3e6
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 39 deletions.
37 changes: 27 additions & 10 deletions openraft/src/core/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::sync::Arc;

use anyhow::anyhow;
use futures::future::TryFutureExt;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
Expand All @@ -15,6 +14,8 @@ use crate::core::State;
use crate::error::ClientReadError;
use crate::error::ClientWriteError;
use crate::error::QuorumNotEnough;
use crate::error::RPCError;
use crate::error::Timeout;
use crate::raft::AppendEntriesRequest;
use crate::raft::ClientWriteRequest;
use crate::raft::ClientWriteResponse;
Expand All @@ -25,6 +26,7 @@ use crate::replication::RaftEvent;
use crate::AppData;
use crate::AppDataResponse;
use crate::MessageSummary;
use crate::RPCTypes;
use crate::RaftNetwork;
use crate::RaftStorage;
use crate::StorageError;
Expand Down Expand Up @@ -93,8 +95,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
let mut pending = FuturesUnordered::new();
let membership = &self.core.effective_membership.membership;

for (id, node) in self.nodes.iter() {
if !membership.is_member(id) {
for (target, node) in self.nodes.iter() {
if !membership.is_member(target) {
continue;
}

Expand All @@ -106,22 +108,37 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
leader_commit: self.core.committed,
};

let target = *id;
let my_id = self.core.id;
let target = *target;
let network = self.core.network.clone();

let ttl = Duration::from_millis(self.core.config.heartbeat_interval);

let task = tokio::spawn(
async move {
match timeout(ttl, network.send_append_entries(target, rpc)).await {
Ok(Ok(data)) => Ok((target, data)),
Ok(Err(err)) => Err((target, err)),
Err(_timeout) => Err((target, anyhow!("timeout waiting for leadership confirmation"))),
let outer_res = timeout(ttl, network.send_append_entries(target, rpc)).await;
match outer_res {
Ok(append_res) => match append_res {
Ok(x) => Ok((target, x)),
Err(err) => Err((target, err)),
},
Err(_timeout) => {
let timeout_err = Timeout {
action: RPCTypes::AppendEntries,
id: my_id,
target,
timeout: ttl,
};

Err((target, RPCError::Timeout(timeout_err)))
}
}
}
.instrument(tracing::debug_span!("spawn")),
// TODO(xp): add target to span
.instrument(tracing::debug_span!("SPAWN_append_entries")),
)
.map_err(move |err| (*id, err));
.map_err(move |err| (target, err));

pending.push(task);
}

Expand Down
34 changes: 30 additions & 4 deletions openraft/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use serde::Serialize;
use crate::raft_types::SnapshotSegmentId;
use crate::LogId;
use crate::NodeId;
use crate::RPCTypes;
use crate::StorageError;

/// Fatal is unrecoverable and shuts down raft at once.
Expand Down Expand Up @@ -85,8 +86,6 @@ pub enum ClientReadError {
/// An error related to a client write request.
#[derive(Debug, Clone, thiserror::Error, derive_more::TryInto)]
pub enum ClientWriteError {
// #[error("{0}")]
// RaftError(#[from] RaftError),
#[error(transparent)]
ForwardToLeader(#[from] ForwardToLeader),

Expand Down Expand Up @@ -177,7 +176,6 @@ impl From<StorageError> for AddLearnerError {

/// Error variants related to the Replication.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
#[allow(clippy::large_enum_variant)]
pub enum ReplicationError {
#[error(transparent)]
Expand All @@ -202,6 +200,34 @@ pub enum ReplicationError {

#[error(transparent)]
Network(#[from] NetworkError),

#[error(transparent)]
RemoteError(#[from] RemoteError<AppendEntriesError>),
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
pub enum RPCError<T: Error> {
#[error(transparent)]
Timeout(#[from] Timeout),

#[error(transparent)]
Network(#[from] NetworkError),

#[error(transparent)]
RemoteError(#[from] RemoteError<T>),
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[error("error occur on remote peer {target}: {source}")]
pub struct RemoteError<T: std::error::Error> {
pub target: NodeId,
pub source: T,
}

impl<T: std::error::Error> RemoteError<T> {
pub fn new(target: NodeId, e: T) -> Self {
Self { target, source: e }
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
Expand Down Expand Up @@ -244,7 +270,7 @@ impl From<anyhow::Error> for NetworkError {
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[error("timeout after {timeout:?} when {action} {id}->{target}")]
pub struct Timeout {
pub action: String,
pub action: RPCTypes,
pub id: NodeId,
pub target: NodeId,
pub timeout: Duration,
Expand Down
1 change: 1 addition & 0 deletions openraft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub use crate::core::State;
pub use crate::defensive::DefensiveCheck;
pub use crate::membership::Membership;
pub use crate::metrics::RaftMetrics;
pub use crate::network::RPCTypes;
pub use crate::network::RaftNetwork;
pub use crate::raft::Raft;
pub use crate::raft_types::LogId;
Expand Down
32 changes: 28 additions & 4 deletions openraft/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
//! The Raft network interface.

use anyhow::Result;
use std::fmt::Formatter;

use async_trait::async_trait;
use serde::Deserialize;
use serde::Serialize;

use crate::error::AppendEntriesError;
use crate::error::InstallSnapshotError;
use crate::error::RPCError;
use crate::error::VoteError;
use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
use crate::raft::InstallSnapshotRequest;
Expand All @@ -12,6 +19,19 @@ use crate::raft::VoteResponse;
use crate::AppData;
use crate::NodeId;

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum RPCTypes {
Vote,
AppendEntries,
InstallSnapshot,
}

impl std::fmt::Display for RPCTypes {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}

/// A trait defining the interface for a Raft network between cluster members.
///
/// See the [network chapter of the guide](https://datafuselabs.github.io/openraft/getting-started.html#3-impl-raftnetwork)
Expand All @@ -21,15 +41,19 @@ pub trait RaftNetwork<D>: Send + Sync + 'static
where D: AppData
{
/// Send an AppendEntries RPC to the target Raft node (§5).
async fn send_append_entries(&self, target: NodeId, rpc: AppendEntriesRequest<D>) -> Result<AppendEntriesResponse>;
async fn send_append_entries(
&self,
target: NodeId,
rpc: AppendEntriesRequest<D>,
) -> Result<AppendEntriesResponse, RPCError<AppendEntriesError>>;

/// Send an InstallSnapshot RPC to the target Raft node (§7).
async fn send_install_snapshot(
&self,
target: NodeId,
rpc: InstallSnapshotRequest,
) -> Result<InstallSnapshotResponse>;
) -> Result<InstallSnapshotResponse, RPCError<InstallSnapshotError>>;

/// Send a RequestVote RPC to the target Raft node (§5).
async fn send_vote(&self, target: NodeId, rpc: VoteRequest) -> Result<VoteResponse>;
async fn send_vote(&self, target: NodeId, rpc: VoteRequest) -> Result<VoteResponse, RPCError<VoteError>>;
}
22 changes: 19 additions & 3 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ use tracing::Span;

use crate::config::Config;
use crate::config::SnapshotPolicy;
use crate::error::AppendEntriesError;
use crate::error::CommittedAdvanceTooMany;
use crate::error::HigherTerm;
use crate::error::LackEntry;
use crate::error::NetworkError;
use crate::error::RPCError;
use crate::error::ReplicationError;
use crate::error::Timeout;
use crate::raft::AppendEntriesRequest;
Expand All @@ -39,6 +40,7 @@ use crate::ErrorVerb;
use crate::LogId;
use crate::MessageSummary;
use crate::NodeId;
use crate::RPCTypes;
use crate::RaftNetwork;
use crate::RaftStorage;
use crate::ToStorageResult;
Expand Down Expand Up @@ -253,6 +255,15 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
ReplicationError::Network { .. } => {
// nothing to do
}
ReplicationError::RemoteError(remote_err) => {
tracing::error!(%remote_err, "remote peer error");
match remote_err.source {
AppendEntriesError::Fatal(fatal) => {
tracing::error!(%fatal, target=%remote_err.target, "remote fatal error, close replication");
return;
}
}
}
};
}
}
Expand Down Expand Up @@ -359,13 +370,18 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
Ok(res) => res,
Err(err) => {
tracing::warn!(error=%err, "error sending AppendEntries RPC to target");
return Err(ReplicationError::Network(NetworkError::from(err)));
let repl_err = match err {
RPCError::Timeout(e) => ReplicationError::Timeout(e),
RPCError::Network(e) => ReplicationError::Network(e),
RPCError::RemoteError(e) => ReplicationError::RemoteError(e),
};
return Err(repl_err);
}
},
Err(timeout_err) => {
tracing::warn!(error=%timeout_err, "timeout while sending AppendEntries RPC to target");
return Err(ReplicationError::Timeout(Timeout {
action: "send_append_entries".to_string(),
action: RPCTypes::AppendEntries,
id: self.id,
target: self.target,
timeout: the_timeout,
Expand Down
60 changes: 42 additions & 18 deletions openraft/tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,14 @@ use memstore::ClientResponse as MemClientResponse;
use memstore::MemStore;
use openraft::async_trait::async_trait;
use openraft::error::AddLearnerError;
use openraft::error::AppendEntriesError;
use openraft::error::ClientReadError;
use openraft::error::ClientWriteError;
use openraft::error::InstallSnapshotError;
use openraft::error::NetworkError;
use openraft::error::RPCError;
use openraft::error::RemoteError;
use openraft::error::VoteError;
use openraft::metrics::Wait;
use openraft::raft::AddLearnerResponse;
use openraft::raft::AppendEntriesRequest;
Expand Down Expand Up @@ -805,6 +811,18 @@ impl RaftRouter {

Ok(())
}

pub async fn check_reachable(&self, id: NodeId, target: NodeId) -> std::result::Result<(), NetworkError> {
let isolated = self.isolated_nodes.read().await;

if isolated.contains(&target) || isolated.contains(&id) {
let err = anyhow!("target node is isolated");
let network_err = NetworkError::from(err);
return Err(network_err);
}

Ok(())
}
}

#[async_trait]
Expand All @@ -814,46 +832,52 @@ impl RaftNetwork<MemClientRequest> for RaftRouter {
&self,
target: u64,
rpc: AppendEntriesRequest<MemClientRequest>,
) -> Result<AppendEntriesResponse> {
) -> std::result::Result<AppendEntriesResponse, RPCError<AppendEntriesError>> {
tracing::debug!("append_entries to id={} {:?}", target, rpc);
self.rand_send_delay().await;

self.check_reachable(rpc.leader_id, target).await?;

let rt = self.routing_table.read().await;
let isolated = self.isolated_nodes.read().await;
let addr = rt.get(&target).expect("target node not found in routing table");
if isolated.contains(&target) || isolated.contains(&rpc.leader_id) {
return Err(anyhow!("target node is isolated"));
}

let resp = addr.0.append_entries(rpc).await;

tracing::debug!("append_entries: recv resp from id={} {:?}", target, resp);
Ok(resp?)
let resp = resp.map_err(|e| RemoteError::new(target, e))?;
Ok(resp)
}

/// Send an InstallSnapshot RPC to the target Raft node (§7).
async fn send_install_snapshot(&self, target: u64, rpc: InstallSnapshotRequest) -> Result<InstallSnapshotResponse> {
async fn send_install_snapshot(
&self,
target: u64,
rpc: InstallSnapshotRequest,
) -> std::result::Result<InstallSnapshotResponse, RPCError<InstallSnapshotError>> {
self.rand_send_delay().await;

self.check_reachable(rpc.leader_id, target).await?;

let rt = self.routing_table.read().await;
let isolated = self.isolated_nodes.read().await;
let addr = rt.get(&target).expect("target node not found in routing table");
if isolated.contains(&target) || isolated.contains(&rpc.leader_id) {
return Err(anyhow!("target node is isolated"));
}
Ok(addr.0.install_snapshot(rpc).await?)

let resp = addr.0.install_snapshot(rpc).await;
let resp = resp.map_err(|e| RemoteError::new(target, e))?;
Ok(resp)
}

/// Send a RequestVote RPC to the target Raft node (§5).
async fn send_vote(&self, target: u64, rpc: VoteRequest) -> Result<VoteResponse> {
async fn send_vote(&self, target: u64, rpc: VoteRequest) -> std::result::Result<VoteResponse, RPCError<VoteError>> {
self.rand_send_delay().await;

self.check_reachable(rpc.candidate_id, target).await?;

let rt = self.routing_table.read().await;
let isolated = self.isolated_nodes.read().await;
let addr = rt.get(&target).expect("target node not found in routing table");
if isolated.contains(&target) || isolated.contains(&rpc.candidate_id) {
return Err(anyhow!("target node is isolated"));
}
Ok(addr.0.vote(rpc).await?)

let resp = addr.0.vote(rpc).await;
let resp = resp.map_err(|e| RemoteError::new(target, e))?;
Ok(resp)
}
}

Expand Down

0 comments on commit f08a3e6

Please sign in to comment.