Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve: Make serde optional, a dirty work #243

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions example-raft-kv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
name = "example-raft-key-value"
version = "0.1.0"
edition = "2021"
authors = [
"drdr xp <[email protected]>",
authors = [
"drdr xp <[email protected]>",
"Pedro Paulo de Amorim <[email protected]>"
]
categories = ["algorithms", "asynchronous", "data-structures"]
Expand All @@ -23,7 +23,7 @@ actix-web = "4.0.0-rc.2"
async-trait = "0.1.36"
clap = { version = "3.0.13", features = ["derive", "env"] }
env_logger = "0.9.0"
openraft = { version="0.6", path= "../openraft" }
openraft = { version="0.6", path= "../openraft", features = ["serde_impl"] }
reqwest = { version = "0.11.9", features = ["json"] }
serde = { version="1.0.114", features=["derive"] }
serde_json = "1.0.57"
Expand Down
2 changes: 1 addition & 1 deletion memstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ repository = "https://github.com/datafuselabs/openraft"
readme = "README.md"

[dependencies]
openraft = { version="0.6", path= "../openraft" }
openraft = { version="0.6", path= "../openraft", features = ["serde_impl"] }
async-trait = "0.1.36"
serde = { version="1.0.114", features=["derive"] }
serde_json = "1.0.57"
Expand Down
4 changes: 3 additions & 1 deletion openraft/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ derive_more = { version="0.99.9" }
futures = "0.3"
maplit = "1.0.2"
rand = "0.8"
serde = { version="1", features=["derive", "rc"] }
serde = { version="1", features=["derive", "rc"], optional = true }
clap = { version = "3.0.7", features = ["derive", "env"] }
thiserror = "1.0.29"
tokio = { version="1.8", default-features=false, features=["fs", "io-util", "macros", "rt", "rt-multi-thread", "sync", "time"] }
Expand All @@ -45,6 +45,8 @@ tracing-subscriber = { version = "0.3.3", features=["env-filter"] }


[features]
serde_impl = ["serde"]

docinclude = [] # Used only for activating `doc(include="...")` on nightly.

# Enable backtrace when generating an error.
Expand Down
8 changes: 4 additions & 4 deletions openraft/src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
use clap::Parser;
use rand::thread_rng;
use rand::Rng;
use serde::Deserialize;
use serde::Serialize;

use crate::config::error::ConfigError;

Expand All @@ -14,7 +12,8 @@ use crate::config::error::ConfigError;
/// would cause a leader to send an `InstallSnapshot` RPC to a follower based on replication lag.
///
/// Additional policies may become available in the future.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
pub enum SnapshotPolicy {
/// A snapshot will be generated once the log has grown the specified number of logs since
/// the last snapshot.
Expand Down Expand Up @@ -78,7 +77,8 @@ fn parse_snapshot_policy(src: &str) -> Result<SnapshotPolicy, ConfigError> {
/// What does all of this mean? Simply keep your election timeout settings high enough that the
/// performance of your network will not cause election timeouts, but don't keep it so high that
/// a real leader crash would cause prolonged downtime. See the Raft spec §5.6 for more details.
#[derive(Clone, Debug, Serialize, Deserialize, Parser)]
#[derive(Clone, Debug, Parser)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
pub struct Config {
/// The application specific name of this Raft cluster
#[clap(long, env = "RAFT_CLUSTER_NAME", default_value = "foo")]
Expand Down
8 changes: 4 additions & 4 deletions openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ use futures::future::Abortable;
use maplit::btreeset;
use rand::thread_rng;
use rand::Rng;
use serde::Deserialize;
use serde::Serialize;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
Expand Down Expand Up @@ -74,7 +72,8 @@ use crate::Update;
/// - and the config.
///
/// An active config is just the last seen config in raft spec.
#[derive(Clone, Default, Eq, Serialize, Deserialize)]
#[derive(Clone, Default, Eq)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
pub struct EffectiveMembership<C: RaftTypeConfig> {
/// The id of the log that applies this membership config
pub log_id: Option<LogId<C::NodeId>>,
Expand Down Expand Up @@ -738,7 +737,8 @@ pub(self) enum SnapshotUpdate<C: RaftTypeConfig> {
///////////////////////////////////////////////////////////////////////////////////////////////////

/// All possible states of a Raft node.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
pub enum State {
/// The node is completely passive; replicating entries, but neither voting nor timing out.
Learner,
Expand Down
98 changes: 61 additions & 37 deletions openraft/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ use std::fmt::Debug;
use std::time::Duration;

use anyerror::AnyError;
use serde::Deserialize;
use serde::Serialize;

use crate::raft_types::SnapshotSegmentId;
use crate::LogId;
Expand All @@ -19,8 +17,9 @@ use crate::StorageError;
use crate::Vote;

/// Fatal is unrecoverable and shuts down raft at once.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[serde(bound = "")]
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(feature = "serde_impl", serde(bound = ""))]
pub enum Fatal<NID: NodeId> {
#[error(transparent)]
StorageError(#[from] StorageError<NID>),
Expand Down Expand Up @@ -53,19 +52,22 @@ where E: TryInto<Fatal<NID>> + Clone
}
}

#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error, derive_more::TryInto)]
#[derive(Debug, Clone, thiserror::Error, derive_more::TryInto)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
pub enum AppendEntriesError<C: RaftTypeConfig> {
#[error(transparent)]
Fatal(#[from] Fatal<C::NodeId>),
}

#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error, derive_more::TryInto)]
#[derive(Debug, Clone, thiserror::Error, derive_more::TryInto)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
pub enum VoteError<C: RaftTypeConfig> {
#[error(transparent)]
Fatal(#[from] Fatal<C::NodeId>),
}

#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error, derive_more::TryInto)]
#[derive(Debug, Clone, thiserror::Error, derive_more::TryInto)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
pub enum InstallSnapshotError<C: RaftTypeConfig> {
#[error(transparent)]
SnapshotMismatch(#[from] SnapshotMismatch),
Expand All @@ -75,7 +77,8 @@ pub enum InstallSnapshotError<C: RaftTypeConfig> {
}

/// An error related to a is_leader request.
#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error, derive_more::TryInto)]
#[derive(Debug, Clone, thiserror::Error, derive_more::TryInto)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
pub enum CheckIsLeaderError<C: RaftTypeConfig> {
#[error(transparent)]
ForwardToLeader(#[from] ForwardToLeader<C::NodeId>),
Expand All @@ -88,7 +91,8 @@ pub enum CheckIsLeaderError<C: RaftTypeConfig> {
}

/// An error related to a client write request.
#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error, derive_more::TryInto)]
#[derive(Debug, Clone, thiserror::Error, derive_more::TryInto)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
pub enum ClientWriteError<C: RaftTypeConfig> {
#[error(transparent)]
ForwardToLeader(#[from] ForwardToLeader<C::NodeId>),
Expand All @@ -102,8 +106,9 @@ pub enum ClientWriteError<C: RaftTypeConfig> {
}

/// The set of errors which may take place when requesting to propose a config change.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[serde(bound = "")]
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(feature = "serde_impl", serde(bound = ""))]
pub enum ChangeMembershipError<NID: NodeId> {
#[error(transparent)]
InProgress(#[from] InProgress<NID>),
Expand All @@ -122,8 +127,9 @@ pub enum ChangeMembershipError<NID: NodeId> {
MissingNodeInfo(#[from] MissingNodeInfo<NID>),
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[serde(bound = "")]
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(feature = "serde_impl", serde(bound = ""))]
pub enum AddLearnerError<NID: NodeId> {
#[error(transparent)]
ForwardToLeader(#[from] ForwardToLeader<NID>),
Expand All @@ -150,7 +156,8 @@ impl<NID: NodeId> TryFrom<AddLearnerError<NID>> for ForwardToLeader<NID> {
}

/// The set of errors which may take place when initializing a pristine Raft node.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
pub enum InitializeError<C: RaftTypeConfig> {
/// The requested action is not allowed due to the Raft node's current state.
#[error("the requested action is not allowed due to the Raft node's current state")]
Expand Down Expand Up @@ -234,7 +241,8 @@ pub enum ReplicationError<C: RaftTypeConfig> {
RemoteError(#[from] RemoteError<C, AppendEntriesError<C>>),
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
pub enum RPCError<C: RaftTypeConfig, T: Error> {
#[error(transparent)]
NodeNotFound(#[from] NodeNotFound<C::NodeId>),
Expand All @@ -249,7 +257,8 @@ pub enum RPCError<C: RaftTypeConfig, T: Error> {
RemoteError(#[from] RemoteError<C, T>),
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
#[error("error occur on remote peer {target}: {source}")]
pub struct RemoteError<C: RaftTypeConfig, T: std::error::Error> {
pub target: C::NodeId,
Expand All @@ -274,21 +283,24 @@ impl<C: RaftTypeConfig, T: std::error::Error> RemoteError<C, T> {
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
#[error("seen a higher vote: {higher} GT mine: {mine}")]
pub struct HigherVote<C: RaftTypeConfig> {
pub higher: Vote<C::NodeId>,
pub mine: Vote<C::NodeId>,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
#[error("leader committed index {committed_index} advances target log index {target_index} too many")]
pub struct CommittedAdvanceTooMany {
pub committed_index: u64,
pub target_index: u64,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
#[error(transparent)]
pub struct NetworkError {
#[from]
Expand All @@ -303,7 +315,8 @@ impl NetworkError {
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
#[error("timeout after {timeout:?} when {action} {id}->{target}")]
pub struct Timeout<C: RaftTypeConfig> {
pub action: RPCTypes,
Expand All @@ -312,78 +325,89 @@ pub struct Timeout<C: RaftTypeConfig> {
pub timeout: Duration,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
#[error("store has no log at: {index:?}, last purged: {last_purged_log_id:?}")]
pub struct LackEntry<C: RaftTypeConfig> {
pub index: Option<u64>,
pub last_purged_log_id: Option<LogId<C::NodeId>>,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[serde(bound = "")]
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(feature = "serde_impl", serde(bound = ""))]
#[error("has to forward request to: {leader_id:?}, {leader_node:?}")]
pub struct ForwardToLeader<NID: NodeId> {
pub leader_id: Option<NID>,
pub leader_node: Option<Node>,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
#[error("snapshot segment id mismatch, expect: {expect}, got: {got}")]
pub struct SnapshotMismatch {
pub expect: SnapshotSegmentId,
pub got: SnapshotSegmentId,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
#[error("not enough for a quorum, cluster: {cluster}, got: {got:?}")]
pub struct QuorumNotEnough<C: RaftTypeConfig> {
pub cluster: String,
pub got: BTreeSet<C::NodeId>,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[serde(bound = "")]
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(feature = "serde_impl", serde(bound = ""))]
#[error("the cluster is already undergoing a configuration change at log {membership_log_id}")]
pub struct InProgress<NID: NodeId> {
pub membership_log_id: LogId<NID>,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[serde(bound = "")]
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(feature = "serde_impl", serde(bound = ""))]
#[error("to add a member {node_id} first need to add it as learner")]
pub struct LearnerNotFound<NID: NodeId> {
pub node_id: NID,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[serde(bound = "")]
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(feature = "serde_impl", serde(bound = ""))]
#[error("replication to learner {node_id} is lagging {distance}, matched: {matched:?}, can not add as member")]
pub struct LearnerIsLagging<NID: NodeId> {
pub node_id: NID,
pub matched: Option<LogId<NID>>,
pub distance: u64,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[serde(bound = "")]
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(feature = "serde_impl", serde(bound = ""))]
#[error("node {node_id} {reason}")]
pub struct MissingNodeInfo<NID: NodeId> {
pub node_id: NID,
pub reason: String,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
#[error("new membership can not be empty")]
pub struct EmptyMembership {}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[serde(bound = "")]
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(feature = "serde_impl", serde(bound = ""))]
#[error("node not found: {node_id}, source: {source}")]
pub struct NodeNotFound<NID: NodeId> {
pub node_id: NID,
pub source: AnyError,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde_impl", derive(serde::Deserialize, serde::Serialize))]
#[error("infallible")]
pub enum Infallible {}
Loading