diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index ca4ff705d..e4e3c8a93 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -405,6 +405,7 @@ jobs: - "nightly" example: - "raft-kv-memstore" + - "raft-kv-memstore-generic-snapshot-data" - "raft-kv-memstore-singlethreaded" - "raft-kv-rocksdb" diff --git a/Cargo.toml b/Cargo.toml index 7ea4db3db..17dd84df9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,5 +58,6 @@ exclude = [ "stores/rocksstore-v2", "examples/raft-kv-memstore", "examples/raft-kv-memstore-singlethreaded", + "examples/raft-kv-memstore-generic-snapshot-data", "examples/raft-kv-rocksdb", ] diff --git a/examples/raft-kv-memstore-generic-snapshot-data/.gitignore b/examples/raft-kv-memstore-generic-snapshot-data/.gitignore new file mode 100644 index 000000000..cb4025390 --- /dev/null +++ b/examples/raft-kv-memstore-generic-snapshot-data/.gitignore @@ -0,0 +1,5 @@ +target +vendor +.idea + +/*.log diff --git a/examples/raft-kv-memstore-generic-snapshot-data/Cargo.toml b/examples/raft-kv-memstore-generic-snapshot-data/Cargo.toml new file mode 100644 index 000000000..fb2a1814f --- /dev/null +++ b/examples/raft-kv-memstore-generic-snapshot-data/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "raft-kv-memstore-generic-snapshot-data" +version = "0.1.0" +readme = "README.md" + +edition = "2021" +authors = [ + "drdr xp ", + "Pedro Paulo de Amorim ", +] +categories = ["algorithms", "asynchronous", "data-structures"] +description = "An example distributed key-value store built upon `openraft`." +homepage = "https://github.com/datafuselabs/openraft" +keywords = ["raft", "consensus"] +license = "MIT OR Apache-2.0" +repository = "https://github.com/datafuselabs/openraft" + +[dependencies] +openraft = { path = "../../openraft", features = ["serde", "storage-v2", "generic-snapshot-data"] } + +clap = { version = "4.1.11", features = ["derive", "env"] } +reqwest = { version = "0.11.9", features = ["json"] } +serde = { version = "1.0.114", features = ["derive"] } +serde_json = "1.0.57" +tokio = { version = "1.0", default-features = false, features = ["sync"] } +tracing = "0.1.29" +tracing-subscriber = { version = "0.3.0", features = ["env-filter"] } + +[dev-dependencies] +maplit = "1.0.2" + +[features] + +[package.metadata.docs.rs] +all-features = true diff --git a/examples/raft-kv-memstore-generic-snapshot-data/README.md b/examples/raft-kv-memstore-generic-snapshot-data/README.md new file mode 100644 index 000000000..da9d63e07 --- /dev/null +++ b/examples/raft-kv-memstore-generic-snapshot-data/README.md @@ -0,0 +1,17 @@ +# Example Openraft kv-store with `generic-snapshot-data` enabled + +With `generic-snapshot-data` feature flag enabled, Openraft allows application to use any data type for snapshot data, +instead of a single-file like data format with `AsyncSeek + AsyncRead + AsyncWrite + Unpin` bounds. + +This example is similar to the basic raft-kv-memstore example +but focuses on how to handle snapshot with `generic-snapshot-data` enabled. +Other aspects are minimized. + +To send a complete snapshot, Refer to implementation of `RaftNetwork::snapshot()` in this example. + +To receive a complete snapshot, Refer to implementation of `api::snapshot()` in this example. + + +## Run it + +Run it with `cargo test -- --nocaputre`. \ No newline at end of file diff --git a/examples/raft-kv-memstore-generic-snapshot-data/src/api.rs b/examples/raft-kv-memstore-generic-snapshot-data/src/api.rs new file mode 100644 index 000000000..f3e610f6a --- /dev/null +++ b/examples/raft-kv-memstore-generic-snapshot-data/src/api.rs @@ -0,0 +1,104 @@ +//! This mod implements a network API for raft node. + +use std::collections::BTreeMap; +use std::collections::BTreeSet; + +use openraft::error::CheckIsLeaderError; +use openraft::error::Infallible; +use openraft::error::RaftError; +use openraft::BasicNode; +use openraft::RaftMetrics; + +use crate::app::App; +use crate::decode; +use crate::encode; +use crate::typ; +use crate::NodeId; + +pub async fn write(app: &mut App, req: String) -> String { + let res = app.raft.client_write(decode(&req)).await; + encode(res) +} + +pub async fn read(app: &mut App, req: String) -> String { + let key: String = decode(&req); + + let ret = app.raft.ensure_linearizable().await; + + let res = match ret { + Ok(_) => { + let state_machine = app.state_machine.state_machine.lock().unwrap(); + let value = state_machine.data.get(&key).cloned(); + + let res: Result>> = + Ok(value.unwrap_or_default()); + res + } + Err(e) => Err(e), + }; + encode(res) +} + +// Raft API + +pub async fn vote(app: &mut App, req: String) -> String { + let res = app.raft.vote(decode(&req)).await; + encode(res) +} + +pub async fn append(app: &mut App, req: String) -> String { + let res = app.raft.append_entries(decode(&req)).await; + encode(res) +} + +/// Receive a snapshot and install it. +pub async fn snapshot(app: &mut App, req: String) -> String { + let (vote, snapshot_meta, snapshot_data): (typ::Vote, typ::SnapshotMeta, typ::SnapshotData) = decode(&req); + let snapshot = typ::Snapshot { + meta: snapshot_meta, + snapshot: Box::new(snapshot_data), + }; + let res = app + .raft + .install_complete_snapshot(vote, snapshot) + .await + .map_err(typ::RaftError::::Fatal); + encode(res) +} + +// Management API + +/// Add a node as **Learner**. +/// +/// A Learner receives log replication from the leader but does not vote. +/// This should be done before adding a node as a member into the cluster +/// (by calling `change-membership`) +pub async fn add_learner(app: &mut App, req: String) -> String { + let node_id: NodeId = decode(&req); + let node = BasicNode { addr: "".to_string() }; + let res = app.raft.add_learner(node_id, node, true).await; + encode(res) +} + +/// Changes specified learners to members, or remove members. +pub async fn change_membership(app: &mut App, req: String) -> String { + let node_ids: BTreeSet = decode(&req); + let res = app.raft.change_membership(node_ids, false).await; + encode(res) +} + +/// Initialize a single-node cluster. +pub async fn init(app: &mut App) -> String { + let mut nodes = BTreeMap::new(); + nodes.insert(app.id, BasicNode { addr: "".to_string() }); + let res = app.raft.initialize(nodes).await; + encode(res) +} + +/// Get the latest metrics of the cluster +pub async fn metrics(app: &mut App) -> String { + let metrics = app.raft.metrics().borrow().clone(); + + let res: Result, Infallible> = Ok(metrics); + encode(res) +} diff --git a/examples/raft-kv-memstore-generic-snapshot-data/src/app.rs b/examples/raft-kv-memstore-generic-snapshot-data/src/app.rs new file mode 100644 index 000000000..5d5a05fe6 --- /dev/null +++ b/examples/raft-kv-memstore-generic-snapshot-data/src/app.rs @@ -0,0 +1,73 @@ +use std::sync::Arc; + +use tokio::sync::mpsc; +use tokio::sync::oneshot; + +use crate::api; +use crate::router::Router; +use crate::typ; +use crate::NodeId; +use crate::StateMachineStore; + +pub type Path = String; +pub type Payload = String; +pub type ResponseTx = oneshot::Sender; +pub type RequestTx = mpsc::UnboundedSender<(Path, Payload, ResponseTx)>; + +/// Representation of an application state. +pub struct App { + pub id: NodeId, + pub raft: typ::Raft, + + /// Receive application requests, Raft protocol request or management requests. + pub rx: mpsc::UnboundedReceiver<(Path, Payload, ResponseTx)>, + pub router: Router, + + pub state_machine: Arc, +} + +impl App { + pub fn new(id: NodeId, raft: typ::Raft, router: Router, state_machine: Arc) -> Self { + let (tx, rx) = mpsc::unbounded_channel(); + + { + let mut targets = router.targets.lock().unwrap(); + targets.insert(id, tx); + } + + Self { + id, + raft, + rx, + router, + state_machine, + } + } + + pub async fn run(mut self) -> Option<()> { + loop { + let (path, payload, response_tx) = self.rx.recv().await?; + + let res = match path.as_str() { + // Application API + "/app/write" => api::write(&mut self, payload).await, + "/app/read" => api::read(&mut self, payload).await, + + // Raft API + "/raft/append" => api::append(&mut self, payload).await, + "/raft/snapshot" => api::snapshot(&mut self, payload).await, + "/raft/vote" => api::vote(&mut self, payload).await, + + // Management API + "/mng/add-learner" => api::add_learner(&mut self, payload).await, + "/mng/change-membership" => api::change_membership(&mut self, payload).await, + "/mng/init" => api::init(&mut self).await, + "/mng/metrics" => api::metrics(&mut self).await, + + _ => panic!("unknown path: {}", path), + }; + + response_tx.send(res).unwrap(); + } + } +} diff --git a/examples/raft-kv-memstore-generic-snapshot-data/src/lib.rs b/examples/raft-kv-memstore-generic-snapshot-data/src/lib.rs new file mode 100644 index 000000000..4d4c9d7ec --- /dev/null +++ b/examples/raft-kv-memstore-generic-snapshot-data/src/lib.rs @@ -0,0 +1,107 @@ +#![allow(clippy::uninlined_format_args)] +#![deny(unused_qualifications)] + +use std::sync::Arc; + +use openraft::BasicNode; +use openraft::Config; +use openraft::TokioRuntime; + +use crate::app::App; +use crate::router::Router; +use crate::store::Request; +use crate::store::Response; +use crate::store::StateMachineData; + +pub mod router; + +pub mod api; +pub mod app; +pub mod network; +pub mod store; + +pub type NodeId = u64; + +openraft::declare_raft_types!( + /// Declare the type configuration for example K/V store. + pub TypeConfig: + D = Request, + R = Response, + NodeId = NodeId, + Node = BasicNode, + Entry = openraft::Entry, + // In this example, snapshot is just a copy of the state machine. + // And it can be any type. + SnapshotData = StateMachineData, + AsyncRuntime = TokioRuntime +); + +pub type LogStore = crate::store::LogStore; +pub type StateMachineStore = crate::store::StateMachineStore; + +pub mod typ { + use openraft::BasicNode; + + use crate::NodeId; + use crate::TypeConfig; + + pub type Raft = openraft::Raft; + + pub type Vote = openraft::Vote; + pub type SnapshotMeta = openraft::SnapshotMeta; + pub type SnapshotData = ::SnapshotData; + pub type Snapshot = openraft::Snapshot; + + pub type Infallible = openraft::error::Infallible; + pub type Fatal = openraft::error::Fatal; + pub type RaftError = openraft::error::RaftError; + pub type RPCError = openraft::error::RPCError>; + pub type StreamingError = openraft::error::StreamingError; + + pub type RaftMetrics = openraft::RaftMetrics; + + pub type ClientWriteError = openraft::error::ClientWriteError; + pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError; + pub type ForwardToLeader = openraft::error::ForwardToLeader; + pub type InitializeError = openraft::error::InitializeError; + + pub type ClientWriteResponse = openraft::raft::ClientWriteResponse; +} + +pub fn encode(t: T) -> String { + serde_json::to_string(&t).unwrap() +} + +pub fn decode(s: &str) -> T { + serde_json::from_str(s).unwrap() +} + +pub async fn new_raft(node_id: NodeId, router: Router) -> (typ::Raft, App) { + // Create a configuration for the raft instance. + let config = Config { + heartbeat_interval: 500, + election_timeout_min: 1500, + election_timeout_max: 3000, + // Once snapshot is built, delete the logs at once. + // So that all further replication will be based on the snapshot. + max_in_snapshot_log_to_keep: 0, + ..Default::default() + }; + + let config = Arc::new(config.validate().unwrap()); + + // Create a instance of where the Raft logs will be stored. + let log_store = Arc::new(LogStore::default()); + + // Create a instance of where the state machine data will be stored. + let state_machine_store = Arc::new(StateMachineStore::default()); + + // Create a local raft instance. + let raft = openraft::Raft::new(node_id, config, router.clone(), log_store, state_machine_store.clone()) + .await + .unwrap(); + + let app = App::new(node_id, raft.clone(), router, state_machine_store); + + (raft, app) +} diff --git a/examples/raft-kv-memstore-generic-snapshot-data/src/network.rs b/examples/raft-kv-memstore-generic-snapshot-data/src/network.rs new file mode 100644 index 000000000..7d1d59a02 --- /dev/null +++ b/examples/raft-kv-memstore-generic-snapshot-data/src/network.rs @@ -0,0 +1,76 @@ +use std::future::Future; + +use openraft::error::RemoteError; +use openraft::error::ReplicationClosed; +use openraft::network::RPCOption; +use openraft::raft::AppendEntriesRequest; +use openraft::raft::AppendEntriesResponse; +use openraft::raft::SnapshotResponse; +use openraft::raft::VoteRequest; +use openraft::raft::VoteResponse; +use openraft::OptionalSend; +use openraft::RaftNetwork; +use openraft::RaftNetworkFactory; +use openraft::Snapshot; +use openraft::Vote; + +use crate::router::Router; +use crate::typ; +use crate::BasicNode; +use crate::NodeId; +use crate::TypeConfig; + +pub struct Connection { + router: Router, + target: NodeId, +} + +impl RaftNetworkFactory for Router { + type Network = Connection; + + async fn new_client(&mut self, target: NodeId, _node: &BasicNode) -> Self::Network { + Connection { + router: self.clone(), + target, + } + } +} + +impl RaftNetwork for Connection { + async fn send_append_entries( + &mut self, + req: AppendEntriesRequest, + ) -> Result, typ::RPCError> { + let resp = self + .router + .send(self.target, "/raft/append", req) + .await + .map_err(|e| RemoteError::new(self.target, e))?; + Ok(resp) + } + + /// A real application should replace this method with customized implementation. + async fn snapshot( + &mut self, + vote: Vote, + snapshot: Snapshot, + _cancel: impl Future + OptionalSend, + _option: RPCOption, + ) -> Result, typ::StreamingError> { + let resp = self + .router + .send::<_, _, typ::Infallible>(self.target, "/raft/snapshot", (vote, snapshot.meta, snapshot.snapshot)) + .await + .map_err(|e| RemoteError::new(self.target, e.into_fatal().unwrap()))?; + Ok(resp) + } + + async fn send_vote(&mut self, req: VoteRequest) -> Result, typ::RPCError> { + let resp = self + .router + .send(self.target, "/raft/vote", req) + .await + .map_err(|e| RemoteError::new(self.target, e))?; + Ok(resp) + } +} diff --git a/examples/raft-kv-memstore-generic-snapshot-data/src/router.rs b/examples/raft-kv-memstore-generic-snapshot-data/src/router.rs new file mode 100644 index 000000000..c9bf0c54a --- /dev/null +++ b/examples/raft-kv-memstore-generic-snapshot-data/src/router.rs @@ -0,0 +1,44 @@ +use std::collections::BTreeMap; +use std::sync::Arc; +use std::sync::Mutex; + +use tokio::sync::oneshot; + +use crate::app::RequestTx; +use crate::decode; +use crate::encode; +use crate::typ::RaftError; +use crate::NodeId; + +/// Simulate a network router. +#[derive(Debug, Clone)] +#[derive(Default)] +pub struct Router { + pub targets: Arc>>, +} + +impl Router { + /// Send request `Req` to target node `to`, and wait for response `Result>`. + pub async fn send(&self, to: NodeId, path: &str, req: Req) -> Result> + where + Req: serde::Serialize, + Result>: serde::de::DeserializeOwned, + { + let (resp_tx, resp_rx) = oneshot::channel(); + + let encoded_req = encode(req); + tracing::debug!("send to: {}, {}, {}", to, path, encoded_req); + + { + let mut targets = self.targets.lock().unwrap(); + let tx = targets.get_mut(&to).unwrap(); + + tx.send((path.to_string(), encoded_req, resp_tx)).unwrap(); + } + + let resp_str = resp_rx.await.unwrap(); + tracing::debug!("resp from: {}, {}, {}", to, path, resp_str); + + decode::>>(&resp_str) + } +} diff --git a/examples/raft-kv-memstore-generic-snapshot-data/src/store.rs b/examples/raft-kv-memstore-generic-snapshot-data/src/store.rs new file mode 100644 index 000000000..98d36aa8b --- /dev/null +++ b/examples/raft-kv-memstore-generic-snapshot-data/src/store.rs @@ -0,0 +1,344 @@ +use std::collections::BTreeMap; +use std::fmt::Debug; +use std::ops::RangeBounds; +use std::sync::Arc; +use std::sync::Mutex; + +use openraft::storage::LogFlushed; +use openraft::storage::LogState; +use openraft::storage::RaftLogStorage; +use openraft::storage::RaftStateMachine; +use openraft::storage::Snapshot; +use openraft::BasicNode; +use openraft::Entry; +use openraft::EntryPayload; +use openraft::LogId; +use openraft::RaftLogReader; +use openraft::RaftSnapshotBuilder; +use openraft::RaftTypeConfig; +use openraft::SnapshotMeta; +use openraft::StorageError; +use openraft::StoredMembership; +use openraft::Vote; +use serde::Deserialize; +use serde::Serialize; + +use crate::typ; +use crate::NodeId; +use crate::TypeConfig; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum Request { + Set { key: String, value: String }, +} + +impl Request { + pub fn set(key: impl ToString, value: impl ToString) -> Self { + Self::Set { + key: key.to_string(), + value: value.to_string(), + } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Response { + pub value: Option, +} + +#[derive(Debug)] +pub struct StoredSnapshot { + pub meta: SnapshotMeta, + + /// The data of the state machine at the time of this snapshot. + pub data: Box, +} + +/// Data contained in the Raft state machine. Note that we are using `serde` to serialize the +/// `data`, which has a implementation to be serialized. Note that for this test we set both the key +/// and value as String, but you could set any type of value that has the serialization impl. +#[derive(Serialize, Deserialize, Debug, Default, Clone)] +pub struct StateMachineData { + pub last_applied: Option>, + + pub last_membership: StoredMembership, + + /// Application data. + pub data: BTreeMap, +} + +/// Defines a state machine for the Raft cluster. This state machine represents a copy of the +/// data for this node. Additionally, it is responsible for storing the last snapshot of the data. +#[derive(Debug, Default)] +pub struct StateMachineStore { + /// The Raft state machine. + pub state_machine: Mutex, + + snapshot_idx: Mutex, + + /// The last received snapshot. + current_snapshot: Mutex>, +} + +#[derive(Debug, Default)] +pub struct LogStore { + last_purged_log_id: Mutex>>, + + /// The Raft log. + log: Mutex>>, + + committed: Mutex>>, + + /// The current granted vote. + vote: Mutex>>, +} + +impl RaftLogReader for Arc { + async fn try_get_log_entries + Clone + Debug>( + &mut self, + range: RB, + ) -> Result>, StorageError> { + let log = self.log.lock().unwrap(); + let response = log.range(range.clone()).map(|(_, val)| val.clone()).collect::>(); + Ok(response) + } +} + +impl RaftSnapshotBuilder for Arc { + #[tracing::instrument(level = "trace", skip(self))] + async fn build_snapshot(&mut self) -> Result, StorageError> { + let data; + let last_applied_log; + let last_membership; + + { + // Serialize the data of the state machine. + let state_machine = self.state_machine.lock().unwrap().clone(); + + last_applied_log = state_machine.last_applied; + last_membership = state_machine.last_membership.clone(); + data = state_machine; + } + + let snapshot_idx = { + let mut l = self.snapshot_idx.lock().unwrap(); + *l += 1; + *l + }; + + let snapshot_id = if let Some(last) = last_applied_log { + format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx) + } else { + format!("--{}", snapshot_idx) + }; + + let meta = SnapshotMeta { + last_log_id: last_applied_log, + last_membership, + snapshot_id, + }; + + let snapshot = StoredSnapshot { + meta: meta.clone(), + data: Box::new(data.clone()), + }; + + { + let mut current_snapshot = self.current_snapshot.lock().unwrap(); + *current_snapshot = Some(snapshot); + } + + Ok(Snapshot { + meta, + snapshot: Box::new(data), + }) + } +} + +impl RaftStateMachine for Arc { + type SnapshotBuilder = Self; + + async fn applied_state( + &mut self, + ) -> Result<(Option>, StoredMembership), StorageError> { + let state_machine = self.state_machine.lock().unwrap(); + Ok((state_machine.last_applied, state_machine.last_membership.clone())) + } + + #[tracing::instrument(level = "trace", skip(self, entries))] + async fn apply(&mut self, entries: I) -> Result, StorageError> + where I: IntoIterator> { + let mut res = Vec::new(); //No `with_capacity`; do not know `len` of iterator + + let mut sm = self.state_machine.lock().unwrap(); + + for entry in entries { + tracing::debug!(%entry.log_id, "replicate to sm"); + + sm.last_applied = Some(entry.log_id); + + match entry.payload { + EntryPayload::Blank => res.push(Response { value: None }), + EntryPayload::Normal(ref req) => match req { + Request::Set { key, value, .. } => { + sm.data.insert(key.clone(), value.clone()); + res.push(Response { + value: Some(value.clone()), + }) + } + }, + EntryPayload::Membership(ref mem) => { + sm.last_membership = StoredMembership::new(Some(entry.log_id), mem.clone()); + res.push(Response { value: None }) + } + }; + } + Ok(res) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn begin_receiving_snapshot( + &mut self, + ) -> Result::SnapshotData>, StorageError> { + Ok(Box::default()) + } + + #[tracing::instrument(level = "trace", skip(self, snapshot))] + async fn install_snapshot( + &mut self, + meta: &SnapshotMeta, + snapshot: Box<::SnapshotData>, + ) -> Result<(), StorageError> { + tracing::info!("install snapshot"); + + let new_snapshot = StoredSnapshot { + meta: meta.clone(), + data: snapshot, + }; + + // Update the state machine. + { + let updated_state_machine: StateMachineData = *new_snapshot.data.clone(); + let mut state_machine = self.state_machine.lock().unwrap(); + *state_machine = updated_state_machine; + } + + // Update current snapshot. + let mut current_snapshot = self.current_snapshot.lock().unwrap(); + *current_snapshot = Some(new_snapshot); + Ok(()) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn get_current_snapshot(&mut self) -> Result>, StorageError> { + match &*self.current_snapshot.lock().unwrap() { + Some(snapshot) => { + let data = snapshot.data.clone(); + Ok(Some(Snapshot { + meta: snapshot.meta.clone(), + snapshot: data, + })) + } + None => Ok(None), + } + } + + async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder { + self.clone() + } +} + +impl RaftLogStorage for Arc { + type LogReader = Self; + + async fn get_log_state(&mut self) -> Result, StorageError> { + let log = self.log.lock().unwrap(); + let last = log.iter().next_back().map(|(_, ent)| ent.log_id); + + let last_purged = *self.last_purged_log_id.lock().unwrap(); + + let last = match last { + None => last_purged, + Some(x) => Some(x), + }; + + Ok(LogState { + last_purged_log_id: last_purged, + last_log_id: last, + }) + } + + async fn save_committed(&mut self, committed: Option>) -> Result<(), StorageError> { + let mut c = self.committed.lock().unwrap(); + *c = committed; + Ok(()) + } + + async fn read_committed(&mut self) -> Result>, StorageError> { + let committed = self.committed.lock().unwrap(); + Ok(*committed) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { + let mut v = self.vote.lock().unwrap(); + *v = Some(*vote); + Ok(()) + } + + async fn read_vote(&mut self) -> Result>, StorageError> { + Ok(*self.vote.lock().unwrap()) + } + + #[tracing::instrument(level = "trace", skip(self, entries, callback))] + async fn append(&mut self, entries: I, callback: LogFlushed) -> Result<(), StorageError> + where I: IntoIterator> { + // Simple implementation that calls the flush-before-return `append_to_log`. + let mut log = self.log.lock().unwrap(); + for entry in entries { + log.insert(entry.log_id.index, entry); + } + callback.log_io_completed(Ok(())); + + Ok(()) + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> { + tracing::debug!("delete_log: [{:?}, +oo)", log_id); + + let mut log = self.log.lock().unwrap(); + let keys = log.range(log_id.index..).map(|(k, _v)| *k).collect::>(); + for key in keys { + log.remove(&key); + } + + Ok(()) + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { + tracing::debug!("delete_log: (-oo, {:?}]", log_id); + + { + let mut ld = self.last_purged_log_id.lock().unwrap(); + assert!(*ld <= Some(log_id)); + *ld = Some(log_id); + } + + { + let mut log = self.log.lock().unwrap(); + + let keys = log.range(..=log_id.index).map(|(k, _v)| *k).collect::>(); + for key in keys { + log.remove(&key); + } + } + + Ok(()) + } + + async fn get_log_reader(&mut self) -> Self::LogReader { + self.clone() + } +} diff --git a/examples/raft-kv-memstore-generic-snapshot-data/test-cluster.sh b/examples/raft-kv-memstore-generic-snapshot-data/test-cluster.sh new file mode 100755 index 000000000..9b582da4a --- /dev/null +++ b/examples/raft-kv-memstore-generic-snapshot-data/test-cluster.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +echo "No shell test script for this example" \ No newline at end of file diff --git a/examples/raft-kv-memstore-generic-snapshot-data/tests/cluster/main.rs b/examples/raft-kv-memstore-generic-snapshot-data/tests/cluster/main.rs new file mode 100644 index 000000000..5148911f9 --- /dev/null +++ b/examples/raft-kv-memstore-generic-snapshot-data/tests/cluster/main.rs @@ -0,0 +1,3 @@ +#![allow(clippy::uninlined_format_args)] + +mod test_cluster; diff --git a/examples/raft-kv-memstore-generic-snapshot-data/tests/cluster/test_cluster.rs b/examples/raft-kv-memstore-generic-snapshot-data/tests/cluster/test_cluster.rs new file mode 100644 index 000000000..9f14ee922 --- /dev/null +++ b/examples/raft-kv-memstore-generic-snapshot-data/tests/cluster/test_cluster.rs @@ -0,0 +1,137 @@ +use std::backtrace::Backtrace; +use std::collections::BTreeMap; +use std::panic::PanicInfo; +use std::time::Duration; + +use openraft::BasicNode; +use raft_kv_memstore_generic_snapshot_data::new_raft; +use raft_kv_memstore_generic_snapshot_data::router::Router; +use raft_kv_memstore_generic_snapshot_data::store::Request; +use raft_kv_memstore_generic_snapshot_data::typ; +use tokio::task; +use tokio::task::LocalSet; +use tracing_subscriber::EnvFilter; + +pub fn log_panic(panic: &PanicInfo) { + let backtrace = format!("{:?}", Backtrace::force_capture()); + + eprintln!("{}", panic); + + if let Some(location) = panic.location() { + tracing::error!( + message = %panic, + backtrace = %backtrace, + panic.file = location.file(), + panic.line = location.line(), + panic.column = location.column(), + ); + eprintln!("{}:{}:{}", location.file(), location.line(), location.column()); + } else { + tracing::error!(message = %panic, backtrace = %backtrace); + } + + eprintln!("{}", backtrace); +} + +/// This test shows how to transfer a snapshot from one node to another: +/// +/// - Setup a single node cluster, write some logs, take a snapshot; +/// - Add a learner node-2 to receive snapshot replication, via the complete-snapshot API: +/// - The sending end sends snapshot with `RaftNetwork::snapshot()`; +/// - The receiving end deliver the received snapshot to `Raft` with +/// `Raft::install_complete_snapshot()`. +#[tokio::test] +async fn test_cluster() { + std::panic::set_hook(Box::new(|panic| { + log_panic(panic); + })); + + tracing_subscriber::fmt() + .with_target(true) + .with_thread_ids(true) + .with_level(true) + .with_ansi(false) + .with_env_filter(EnvFilter::from_default_env()) + .init(); + + let router = Router::default(); + + let local = LocalSet::new(); + + let (raft1, app1) = new_raft(1, router.clone()).await; + let (raft2, app2) = new_raft(2, router.clone()).await; + + let rafts = [raft1, raft2]; + + local + .run_until(async move { + task::spawn_local(app1.run()); + task::spawn_local(app2.run()); + + run_test(&rafts, router).await; + }) + .await; +} + +async fn run_test(rafts: &[typ::Raft], router: Router) { + let _ = router; + + // Wait for server to start up. + tokio::time::sleep(Duration::from_millis(200)).await; + + let raft1 = &rafts[0]; + let raft2 = &rafts[1]; + + println!("=== init single node cluster"); + { + let mut nodes = BTreeMap::new(); + nodes.insert(1, BasicNode { addr: "".to_string() }); + raft1.initialize(nodes).await.unwrap(); + } + + println!("=== write 2 logs"); + { + let resp = raft1.client_write(Request::set("foo1", "bar1")).await.unwrap(); + println!("write resp: {:#?}", resp); + let resp = raft1.client_write(Request::set("foo2", "bar2")).await.unwrap(); + println!("write resp: {:#?}", resp); + } + + println!("=== let node-1 take a snapshot"); + { + raft1.trigger().snapshot().await.unwrap(); + + // Wait for a while to let the snapshot get done. + tokio::time::sleep(Duration::from_millis(500)).await; + } + + println!("=== metrics after building snapshot"); + { + let metrics = raft1.metrics().borrow().clone(); + println!("node 1 metrics: {:#?}", metrics); + assert_eq!(Some(3), metrics.snapshot.map(|x| x.index)); + assert_eq!(Some(3), metrics.purged.map(|x| x.index)); + } + + println!("=== add-learner node-2"); + { + let node = BasicNode { addr: "".to_string() }; + let resp = raft1.add_learner(2, node, true).await.unwrap(); + println!("add-learner node-2 resp: {:#?}", resp); + } + + // Wait for a while to let the node 2 to receive snapshot replication. + tokio::time::sleep(Duration::from_millis(500)).await; + + println!("=== metrics of node 2 that received snapshot"); + { + let metrics = raft2.metrics().borrow().clone(); + println!("node 2 metrics: {:#?}", metrics); + assert_eq!(Some(3), metrics.snapshot.map(|x| x.index)); + assert_eq!(Some(3), metrics.purged.map(|x| x.index)); + } + + // In this example, the snapshot is just a copy of the state machine. + let snapshot = raft2.get_snapshot().await.unwrap(); + println!("node 2 received snapshot: {:#?}", snapshot); +} diff --git a/openraft/Cargo.toml b/openraft/Cargo.toml index c96a0b4cc..4b3ad2d4b 100644 --- a/openraft/Cargo.toml +++ b/openraft/Cargo.toml @@ -87,11 +87,28 @@ storage-v2 = [] singlethreaded = ["macros/singlethreaded"] -# Permit the follower's log to roll back to an earlier state without causing the leader to panic. -# Although log state reversion is typically seen as a bug, enabling it can be useful for testing or other special scenarios. -# For instance, in an even number nodes cluster, erasing a node's data and then rebooting it(log reverts to empty) will not result in data loss. +# Permit the follower's log to roll back to an earlier state without causing the +# leader to panic. +# +# Although log state reversion is typically seen as a bug, enabling it can be +# useful for testing or other special scenarios. +# For instance, in an even number nodes cluster, erasing a node's data and then +# rebooting it(log reverts to empty) will not result in data loss. loosen-follower-log-revert = [] + +# Enable this feature flag to eliminate the `AsyncRead + AsyncWrite + AsyncSeek +# + Unpin` bound from `RaftTypeConfig::SnapshotData`. +# +# Enabling this feature allows applications to use a custom snapshot data format +# and transport fragmentation, diverging from the default implementation which +# typically relies on a single-file structure . +# +# By default it is off. +# This feature is introduced in 0.9.0 +generic-snapshot-data = [] + + # Enables "log" feature in `tracing` crate, to let tracing events emit log # record. # See: https://docs.rs/tracing/latest/tracing/#emitting-log-records diff --git a/openraft/src/core/mod.rs b/openraft/src/core/mod.rs index c10d9ddc3..d401e4707 100644 --- a/openraft/src/core/mod.rs +++ b/openraft/src/core/mod.rs @@ -12,11 +12,8 @@ pub(crate) mod raft_msg; mod replication_state; mod server_state; pub(crate) mod sm; -pub(crate) mod streaming_state; mod tick; -pub(crate) mod snapshot_state; - pub(crate) use raft_core::ApplyResult; pub(crate) use raft_core::ApplyingEntry; pub use raft_core::RaftCore; diff --git a/openraft/src/core/snapshot_state.rs b/openraft/src/core/snapshot_state.rs deleted file mode 100644 index 10c437ca5..000000000 --- a/openraft/src/core/snapshot_state.rs +++ /dev/null @@ -1,22 +0,0 @@ -use crate::LeaderId; -use crate::NodeId; -use crate::SnapshotId; - -/// A global unique id of install-snapshot request. -#[derive(Debug, Clone)] -#[derive(PartialEq, Eq)] -pub(crate) struct SnapshotRequestId { - pub(crate) leader_id: LeaderId, - pub(crate) snapshot_id: SnapshotId, - pub(crate) offset: u64, -} - -impl SnapshotRequestId { - pub(crate) fn new(leader_id: LeaderId, snapshot_id: SnapshotId, offset: u64) -> Self { - Self { - leader_id, - snapshot_id, - offset, - } - } -} diff --git a/openraft/src/core/streaming_state.rs b/openraft/src/core/streaming_state.rs deleted file mode 100644 index 53528e086..000000000 --- a/openraft/src/core/streaming_state.rs +++ /dev/null @@ -1,66 +0,0 @@ -use std::io::SeekFrom; - -use tokio::io::AsyncSeekExt; -use tokio::io::AsyncWriteExt; - -use crate::raft::InstallSnapshotRequest; -use crate::ErrorSubject; -use crate::ErrorVerb; -use crate::RaftTypeConfig; -use crate::SnapshotId; -use crate::StorageError; - -/// The Raft node is streaming in a snapshot from the leader. -pub(crate) struct Streaming -where C: RaftTypeConfig -{ - /// The offset of the last byte written to the snapshot. - pub(crate) offset: u64, - - /// The ID of the snapshot being written. - pub(crate) snapshot_id: SnapshotId, - - /// A handle to the snapshot writer. - pub(crate) snapshot_data: Box, -} - -impl Streaming -where C: RaftTypeConfig -{ - pub(crate) fn new(snapshot_id: SnapshotId, snapshot_data: Box) -> Self { - Self { - offset: 0, - snapshot_id, - snapshot_data, - } - } - - /// Receive a chunk of snapshot data. - pub(crate) async fn receive(&mut self, req: InstallSnapshotRequest) -> Result> { - // TODO: check id? - - // Always seek to the target offset if not an exact match. - if req.offset != self.offset { - if let Err(err) = self.snapshot_data.as_mut().seek(SeekFrom::Start(req.offset)).await { - return Err(StorageError::from_io_error( - ErrorSubject::Snapshot(Some(req.meta.signature())), - ErrorVerb::Seek, - err, - )); - } - self.offset = req.offset; - } - - // Write the next segment & update offset. - let res = self.snapshot_data.as_mut().write_all(&req.data).await; - if let Err(err) = res { - return Err(StorageError::from_io_error( - ErrorSubject::Snapshot(Some(req.meta.signature())), - ErrorVerb::Write, - err, - )); - } - self.offset += req.data.len() as u64; - Ok(req.done) - } -} diff --git a/openraft/src/docs/feature_flags/feature-flags.md b/openraft/src/docs/feature_flags/feature-flags.md index 79ed37a9b..a4eca5358 100644 --- a/openraft/src/docs/feature_flags/feature-flags.md +++ b/openraft/src/docs/feature_flags/feature-flags.md @@ -55,6 +55,35 @@ By default openraft enables no features. In order to use the feature, `AsyncRuntime::spawn` should invoke `tokio::task::spawn_local` or equivalents.

+- `generic-snapshot-data`: Enable this feature flag + to eliminate the `AsyncRead + AsyncWrite + AsyncSeek + Unpin` bound + from [`RaftTypeConfig::SnapshotData`](crate::RaftTypeConfig::SnapshotData) + Enabling this feature allows applications to use a custom snapshot data format and transport fragmentation, + diverging from the default implementation which typically relies on a single-file structure. + + By default, it is off. + This feature is introduced in 0.9.0 + + On the sending end (leader that sends snapshot to follower): + + - Without `generic-snapshot-data`: [`RaftNetwork::snapshot()`] + provides a default implementation that invokes the chunk-based API + [`RaftNetwork::install_snapshot()`] for transmit. + + - With `generic-snapshot-data` enabled: [`RaftNetwork::snapshot()`] + must be implemented to provide application customized snapshot transmission. + Application does not need to implement [`RaftNetwork::install_snapshot()`]. + + On the receiving end(follower): + + - `Raft::install_snapshot()` is available only when `generic-snapshot-data` is disabled. + + Refer to example `examples/raft-kv-memstore-generic-snapshot-data` with `generic-snapshot-data` enabled. +

+ - `tracing-log`: enables "log" feature in `tracing` crate, to let tracing events emit log record. See: [tracing doc: emitting-log-records](https://docs.rs/tracing/latest/tracing/#emitting-log-records) + +[`RaftNetwork::snapshot()`]: crate::network::RaftNetwork::snapshot +[`RaftNetwork::install_snapshot()`]: crate::network::RaftNetwork::install_snapshot \ No newline at end of file diff --git a/openraft/src/network/mod.rs b/openraft/src/network/mod.rs index 1552c273d..c2c7a2ec1 100644 --- a/openraft/src/network/mod.rs +++ b/openraft/src/network/mod.rs @@ -5,7 +5,7 @@ mod factory; #[allow(clippy::module_inception)] mod network; mod rpc_option; mod rpc_type; -pub(crate) mod stream_snapshot; +#[cfg(not(feature = "generic-snapshot-data"))] pub(crate) mod stream_snapshot; pub use backoff::Backoff; pub use factory::RaftNetworkFactory; diff --git a/openraft/src/network/network.rs b/openraft/src/network/network.rs index 8ca51cfb7..de3dd5f3a 100644 --- a/openraft/src/network/network.rs +++ b/openraft/src/network/network.rs @@ -10,8 +10,6 @@ use crate::error::RaftError; use crate::error::ReplicationClosed; use crate::error::StreamingError; use crate::network::rpc_option::RPCOption; -use crate::network::stream_snapshot; -use crate::network::stream_snapshot::SnapshotTransport; use crate::network::Backoff; use crate::raft::AppendEntriesRequest; use crate::raft::AppendEntriesResponse; @@ -108,8 +106,21 @@ where C: RaftTypeConfig cancel: impl Future + OptionalSend, option: RPCOption, ) -> Result, StreamingError>> { - let resp = stream_snapshot::Chunked::send_snapshot(self, vote, snapshot, cancel, option).await?; - Ok(resp) + #[cfg(not(feature = "generic-snapshot-data"))] + { + use crate::network::stream_snapshot; + use crate::network::stream_snapshot::SnapshotTransport; + + let resp = stream_snapshot::Chunked::send_snapshot(self, vote, snapshot, cancel, option).await?; + Ok(resp) + } + #[cfg(feature = "generic-snapshot-data")] + { + let _ = (vote, snapshot, cancel, option); + unimplemented!( + "no default implementation for RaftNetwork::snapshot() if `generic-snapshot-data` feature is enabled" + ) + } } /// Send an AppendEntries RPC to the target Raft node (ยง5). diff --git a/openraft/src/network/stream_snapshot.rs b/openraft/src/network/stream_snapshot.rs index 4ecfdae34..c625ea52d 100644 --- a/openraft/src/network/stream_snapshot.rs +++ b/openraft/src/network/stream_snapshot.rs @@ -1,19 +1,13 @@ use std::future::Future; use std::io::SeekFrom; -use std::pin::Pin; use std::time::Duration; use futures::FutureExt; use macros::add_async_trait; -use tokio::io::AsyncRead; use tokio::io::AsyncReadExt; -use tokio::io::AsyncSeek; use tokio::io::AsyncSeekExt; -use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; -use crate::core::snapshot_state::SnapshotRequestId; -use crate::core::streaming_state::Streaming; use crate::error::Fatal; use crate::error::ReplicationClosed; use crate::error::StreamingError; @@ -25,10 +19,10 @@ use crate::AsyncRuntime; use crate::ErrorSubject; use crate::ErrorVerb; use crate::OptionalSend; -use crate::OptionalSync; use crate::RaftNetwork; use crate::RaftTypeConfig; use crate::Snapshot; +use crate::SnapshotId; use crate::StorageError; use crate::StorageIOError; use crate::ToStorageResult; @@ -60,9 +54,7 @@ pub(crate) trait SnapshotTransport { /// Send and Receive snapshot by chunks. pub(crate) struct Chunked {} -impl SnapshotTransport for Chunked -where C::SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + OptionalSend + OptionalSync + Unpin + 'static -{ +impl SnapshotTransport for Chunked { /// Stream snapshot by chunks. /// /// This function is for backward compatibility and provides a default implement for @@ -89,12 +81,10 @@ where C::SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + OptionalSend + Optio let mut offset = 0; let end = snapshot.snapshot.seek(SeekFrom::End(0)).await.sto_res(subject_verb)?; + let mut c = std::pin::pin!(cancel); loop { - // Safety: `cancel` is a future that is polled only by this function. - let c = unsafe { Pin::new_unchecked(&mut cancel) }; - // If canceled, return at once - if let Some(err) = c.now_or_never() { + if let Some(err) = c.as_mut().now_or_never() { return Err(err.into()); } @@ -173,23 +163,15 @@ where C::SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + OptionalSend + Optio ) -> Result>, StorageError> { let snapshot_meta = req.meta.clone(); let done = req.done; - let offset = req.offset; - - let req_id = SnapshotRequestId::new(*req.vote.leader_id(), snapshot_meta.snapshot_id.clone(), offset); - tracing::info!( - req = display(&req), - snapshot_req_id = debug(&req_id), - "{}", - func_name!() - ); + tracing::info!(req = display(&req), "{}", func_name!()); { let s = streaming.as_mut().unwrap(); s.receive(req).await?; } - tracing::info!(snapshot_req_id = debug(&req_id), "received snapshot chunk"); + tracing::info!("Done received snapshot chunk"); if done { let streaming = streaming.take().unwrap(); @@ -207,3 +189,58 @@ where C::SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + OptionalSend + Optio Ok(None) } } + +/// The Raft node is streaming in a snapshot from the leader. +pub(crate) struct Streaming +where C: RaftTypeConfig +{ + /// The offset of the last byte written to the snapshot. + pub(crate) offset: u64, + + /// The ID of the snapshot being written. + pub(crate) snapshot_id: SnapshotId, + + /// A handle to the snapshot writer. + pub(crate) snapshot_data: Box, +} + +impl Streaming +where C: RaftTypeConfig +{ + pub(crate) fn new(snapshot_id: SnapshotId, snapshot_data: Box) -> Self { + Self { + offset: 0, + snapshot_id, + snapshot_data, + } + } + + /// Receive a chunk of snapshot data. + pub(crate) async fn receive(&mut self, req: InstallSnapshotRequest) -> Result> { + // TODO: check id? + + // Always seek to the target offset if not an exact match. + if req.offset != self.offset { + if let Err(err) = self.snapshot_data.as_mut().seek(SeekFrom::Start(req.offset)).await { + return Err(StorageError::from_io_error( + ErrorSubject::Snapshot(Some(req.meta.signature())), + ErrorVerb::Seek, + err, + )); + } + self.offset = req.offset; + } + + // Write the next segment & update offset. + let res = self.snapshot_data.as_mut().write_all(&req.data).await; + if let Err(err) = res { + return Err(StorageError::from_io_error( + ErrorSubject::Snapshot(Some(req.meta.signature())), + ErrorVerb::Write, + err, + )); + } + self.offset += req.data.len() as u64; + Ok(req.done) + } +} diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index aae4eea03..5972bdf55 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -42,7 +42,6 @@ use crate::core::raft_msg::external_command::ExternalCommand; use crate::core::raft_msg::RaftMsg; use crate::core::replication_lag; use crate::core::sm; -use crate::core::streaming_state::Streaming; use crate::core::RaftCore; use crate::core::Tick; use crate::engine::Engine; @@ -52,17 +51,13 @@ use crate::error::ClientWriteError; use crate::error::Fatal; use crate::error::HigherVote; use crate::error::InitializeError; -use crate::error::InstallSnapshotError; use crate::error::RaftError; -use crate::error::SnapshotMismatch; use crate::membership::IntoNodes; use crate::metrics::RaftDataMetrics; use crate::metrics::RaftMetrics; use crate::metrics::RaftServerMetrics; use crate::metrics::Wait; use crate::metrics::WaitError; -use crate::network::stream_snapshot::Chunked; -use crate::network::stream_snapshot::SnapshotTransport; use crate::network::RaftNetworkFactory; use crate::raft::raft_inner::RaftInner; use crate::raft::runtime_config_handle::RuntimeConfigHandle; @@ -78,7 +73,6 @@ use crate::MessageSummary; use crate::RaftState; pub use crate::RaftTypeConfig; use crate::Snapshot; -use crate::SnapshotSegmentId; use crate::StorageHelper; use crate::Vote; @@ -261,6 +255,8 @@ where C: RaftTypeConfig rx_server_metrics, tx_shutdown: Mutex::new(Some(tx_shutdown)), core_state: Mutex::new(CoreState::Running(core_handle)), + + #[cfg(not(feature = "generic-snapshot-data"))] snapshot: Mutex::new(None), }; @@ -415,11 +411,18 @@ where C: RaftTypeConfig /// /// If receiving is finished `done == true`, it installs the snapshot to the state machine. /// Nothing will be done if the input snapshot is older than the state machine. + #[cfg(not(feature = "generic-snapshot-data"))] #[tracing::instrument(level = "debug", skip_all)] pub async fn install_snapshot( &self, req: InstallSnapshotRequest, - ) -> Result, RaftError> { + ) -> Result, RaftError> + where + C::SnapshotData: tokio::io::AsyncRead + tokio::io::AsyncWrite + tokio::io::AsyncSeek + Unpin, + { + use crate::network::stream_snapshot::Chunked; + use crate::network::stream_snapshot::SnapshotTransport; + tracing::debug!(req = display(&req), "Raft::install_snapshot()"); let req_vote = req.vote; @@ -431,12 +434,12 @@ where C: RaftTypeConfig if curr_id != Some(&req.meta.snapshot_id) { if req.offset != 0 { - let mismatch = InstallSnapshotError::SnapshotMismatch(SnapshotMismatch { - expect: SnapshotSegmentId { + let mismatch = crate::error::InstallSnapshotError::SnapshotMismatch(crate::error::SnapshotMismatch { + expect: crate::SnapshotSegmentId { id: snapshot_id.clone(), offset: 0, }, - got: SnapshotSegmentId { + got: crate::SnapshotSegmentId { id: snapshot_id.clone(), offset: req.offset, }, @@ -456,7 +459,10 @@ where C: RaftTypeConfig } } }; - *streaming = Some(Streaming::new(req.meta.snapshot_id.clone(), snapshot_data)); + *streaming = Some(crate::network::stream_snapshot::Streaming::new( + req.meta.snapshot_id.clone(), + snapshot_data, + )); } let snapshot = Chunked::receive_snapshot(&mut *streaming, req).await?; diff --git a/openraft/src/raft/raft_inner.rs b/openraft/src/raft/raft_inner.rs index 2141fa5a8..ceecf7f4c 100644 --- a/openraft/src/raft/raft_inner.rs +++ b/openraft/src/raft/raft_inner.rs @@ -11,7 +11,6 @@ use tracing::Level; use crate::config::RuntimeConfig; use crate::core::raft_msg::external_command::ExternalCommand; use crate::core::raft_msg::RaftMsg; -use crate::core::streaming_state::Streaming; use crate::core::TickHandle; use crate::error::Fatal; use crate::error::RaftError; @@ -44,7 +43,8 @@ where C: RaftTypeConfig pub(in crate::raft) core_state: Mutex>, /// The ongoing snapshot transmission. - pub(in crate::raft) snapshot: Mutex>>, + #[cfg(not(feature = "generic-snapshot-data"))] + pub(in crate::raft) snapshot: Mutex>>, } impl RaftInner diff --git a/openraft/src/storage/mod.rs b/openraft/src/storage/mod.rs index f17fcb1ef..742370157 100644 --- a/openraft/src/storage/mod.rs +++ b/openraft/src/storage/mod.rs @@ -113,6 +113,7 @@ where C: RaftTypeConfig impl Snapshot where C: RaftTypeConfig { + #[allow(dead_code)] pub(crate) fn new(meta: SnapshotMeta, snapshot: Box) -> Self { Self { meta, snapshot } } diff --git a/openraft/src/type_config.rs b/openraft/src/type_config.rs index 421f87539..7271d4662 100644 --- a/openraft/src/type_config.rs +++ b/openraft/src/type_config.rs @@ -1,9 +1,5 @@ use std::fmt::Debug; -use tokio::io::AsyncRead; -use tokio::io::AsyncSeek; -use tokio::io::AsyncWrite; - use crate::entry::FromAppData; use crate::entry::RaftEntry; use crate::AppData; @@ -58,11 +54,20 @@ pub trait RaftTypeConfig: /// Raft log entry, which can be built from an AppData. type Entry: RaftEntry + FromAppData; + // TODO: fix the doc address /// Snapshot data for exposing a snapshot for reading & writing. /// /// See the [storage chapter of the guide](https://datafuselabs.github.io/openraft/getting-started.html#implement-raftstorage) /// for details on where and how this is used. - type SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + OptionalSend + OptionalSync + Unpin + 'static; + #[cfg(not(feature = "generic-snapshot-data"))] + type SnapshotData: tokio::io::AsyncRead + + tokio::io::AsyncWrite + + tokio::io::AsyncSeek + + OptionalSend + + Unpin + + 'static; + #[cfg(feature = "generic-snapshot-data")] + type SnapshotData: OptionalSend + 'static; /// Asynchronous runtime type. type AsyncRuntime: AsyncRuntime;