From 907f5f7494f073a6d1a844cf4ef05d314d76d7a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Fri, 16 Feb 2024 22:42:04 +0800 Subject: [PATCH] Feature: feature flag `generic-snapshot-data` Add feature flag `generic-snapshot-data`: when enabled, `SnapshotData` does not have `AsyncSeek + AsyncRead + AsyncWrite` bound. This enables application to define their own snapshot format and transmission protocol. If this feature flag is not eabled, no changes are required for application to upgrade Openraft. On the sending end(leader that sends snapshot to follower): - Without `generic-snapshot-data`: `RaftNetwork::snapshot()` provides a default implementation that invokes the chunk based API `RaftNetwork::install_snapshot()` for transmit. - With `generic-snapshot-data` enabled: `RaftNetwork::snapshot()` must be implemented to provide application customized snapshot transmission. Application does not need to implement `RaftNetwork::install_snapshot()`. On the receiving end(follower): - `Raft::install_snapshot()` is available only when `generic-snapshot-data` is disabled. Add an example `examples/raft-kv-memstore-generic-snapshot-data` with `generic-snapshot-data` enabled. In this example snapshot is transmitted without fragmentation, i.e., via `RaftNetwork::snapshot()`. The chunk based API `RaftNetwork::install_snapshot()` is not used. In a production scenario, a snapshot can be transmitted in arbitrary manner. - Fix: #606 - Fix: #209 --- .github/workflows/ci.yaml | 1 + Cargo.toml | 1 + .../.gitignore | 5 + .../Cargo.toml | 35 ++ .../README.md | 17 + .../src/api.rs | 104 ++++++ .../src/app.rs | 73 ++++ .../src/lib.rs | 107 ++++++ .../src/network.rs | 76 ++++ .../src/router.rs | 44 +++ .../src/store.rs | 344 ++++++++++++++++++ .../test-cluster.sh | 3 + .../tests/cluster/main.rs | 3 + .../tests/cluster/test_cluster.rs | 137 +++++++ openraft/Cargo.toml | 23 +- openraft/src/core/mod.rs | 3 - openraft/src/core/snapshot_state.rs | 22 -- openraft/src/core/streaming_state.rs | 66 ---- .../src/docs/feature_flags/feature-flags.md | 29 ++ openraft/src/network/mod.rs | 2 +- openraft/src/network/network.rs | 19 +- openraft/src/network/stream_snapshot.rs | 85 +++-- openraft/src/raft/mod.rs | 28 +- openraft/src/raft/raft_inner.rs | 4 +- openraft/src/storage/mod.rs | 1 + openraft/src/type_config.rs | 15 +- 26 files changed, 1106 insertions(+), 141 deletions(-) create mode 100644 examples/raft-kv-memstore-generic-snapshot-data/.gitignore create mode 100644 examples/raft-kv-memstore-generic-snapshot-data/Cargo.toml create mode 100644 examples/raft-kv-memstore-generic-snapshot-data/README.md create mode 100644 examples/raft-kv-memstore-generic-snapshot-data/src/api.rs create mode 100644 examples/raft-kv-memstore-generic-snapshot-data/src/app.rs create mode 100644 examples/raft-kv-memstore-generic-snapshot-data/src/lib.rs create mode 100644 examples/raft-kv-memstore-generic-snapshot-data/src/network.rs create mode 100644 examples/raft-kv-memstore-generic-snapshot-data/src/router.rs create mode 100644 examples/raft-kv-memstore-generic-snapshot-data/src/store.rs create mode 100755 examples/raft-kv-memstore-generic-snapshot-data/test-cluster.sh create mode 100644 examples/raft-kv-memstore-generic-snapshot-data/tests/cluster/main.rs create mode 100644 examples/raft-kv-memstore-generic-snapshot-data/tests/cluster/test_cluster.rs delete mode 100644 openraft/src/core/snapshot_state.rs delete mode 100644 openraft/src/core/streaming_state.rs diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index ca4ff705d..e4e3c8a93 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -405,6 +405,7 @@ jobs: - "nightly" example: - "raft-kv-memstore" + - "raft-kv-memstore-generic-snapshot-data" - "raft-kv-memstore-singlethreaded" - "raft-kv-rocksdb" diff --git a/Cargo.toml b/Cargo.toml index 7ea4db3db..17dd84df9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,5 +58,6 @@ exclude = [ "stores/rocksstore-v2", "examples/raft-kv-memstore", "examples/raft-kv-memstore-singlethreaded", + "examples/raft-kv-memstore-generic-snapshot-data", "examples/raft-kv-rocksdb", ] diff --git a/examples/raft-kv-memstore-generic-snapshot-data/.gitignore b/examples/raft-kv-memstore-generic-snapshot-data/.gitignore new file mode 100644 index 000000000..cb4025390 --- /dev/null +++ b/examples/raft-kv-memstore-generic-snapshot-data/.gitignore @@ -0,0 +1,5 @@ +target +vendor +.idea + +/*.log diff --git a/examples/raft-kv-memstore-generic-snapshot-data/Cargo.toml b/examples/raft-kv-memstore-generic-snapshot-data/Cargo.toml new file mode 100644 index 000000000..fb2a1814f --- /dev/null +++ b/examples/raft-kv-memstore-generic-snapshot-data/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "raft-kv-memstore-generic-snapshot-data" +version = "0.1.0" +readme = "README.md" + +edition = "2021" +authors = [ + "drdr xp ", + "Pedro Paulo de Amorim ", +] +categories = ["algorithms", "asynchronous", "data-structures"] +description = "An example distributed key-value store built upon `openraft`." +homepage = "https://github.com/datafuselabs/openraft" +keywords = ["raft", "consensus"] +license = "MIT OR Apache-2.0" +repository = "https://github.com/datafuselabs/openraft" + +[dependencies] +openraft = { path = "../../openraft", features = ["serde", "storage-v2", "generic-snapshot-data"] } + +clap = { version = "4.1.11", features = ["derive", "env"] } +reqwest = { version = "0.11.9", features = ["json"] } +serde = { version = "1.0.114", features = ["derive"] } +serde_json = "1.0.57" +tokio = { version = "1.0", default-features = false, features = ["sync"] } +tracing = "0.1.29" +tracing-subscriber = { version = "0.3.0", features = ["env-filter"] } + +[dev-dependencies] +maplit = "1.0.2" + +[features] + +[package.metadata.docs.rs] +all-features = true diff --git a/examples/raft-kv-memstore-generic-snapshot-data/README.md b/examples/raft-kv-memstore-generic-snapshot-data/README.md new file mode 100644 index 000000000..da9d63e07 --- /dev/null +++ b/examples/raft-kv-memstore-generic-snapshot-data/README.md @@ -0,0 +1,17 @@ +# Example Openraft kv-store with `generic-snapshot-data` enabled + +With `generic-snapshot-data` feature flag enabled, Openraft allows application to use any data type for snapshot data, +instead of a single-file like data format with `AsyncSeek + AsyncRead + AsyncWrite + Unpin` bounds. + +This example is similar to the basic raft-kv-memstore example +but focuses on how to handle snapshot with `generic-snapshot-data` enabled. +Other aspects are minimized. + +To send a complete snapshot, Refer to implementation of `RaftNetwork::snapshot()` in this example. + +To receive a complete snapshot, Refer to implementation of `api::snapshot()` in this example. + + +## Run it + +Run it with `cargo test -- --nocaputre`. \ No newline at end of file diff --git a/examples/raft-kv-memstore-generic-snapshot-data/src/api.rs b/examples/raft-kv-memstore-generic-snapshot-data/src/api.rs new file mode 100644 index 000000000..f3e610f6a --- /dev/null +++ b/examples/raft-kv-memstore-generic-snapshot-data/src/api.rs @@ -0,0 +1,104 @@ +//! This mod implements a network API for raft node. + +use std::collections::BTreeMap; +use std::collections::BTreeSet; + +use openraft::error::CheckIsLeaderError; +use openraft::error::Infallible; +use openraft::error::RaftError; +use openraft::BasicNode; +use openraft::RaftMetrics; + +use crate::app::App; +use crate::decode; +use crate::encode; +use crate::typ; +use crate::NodeId; + +pub async fn write(app: &mut App, req: String) -> String { + let res = app.raft.client_write(decode(&req)).await; + encode(res) +} + +pub async fn read(app: &mut App, req: String) -> String { + let key: String = decode(&req); + + let ret = app.raft.ensure_linearizable().await; + + let res = match ret { + Ok(_) => { + let state_machine = app.state_machine.state_machine.lock().unwrap(); + let value = state_machine.data.get(&key).cloned(); + + let res: Result>> = + Ok(value.unwrap_or_default()); + res + } + Err(e) => Err(e), + }; + encode(res) +} + +// Raft API + +pub async fn vote(app: &mut App, req: String) -> String { + let res = app.raft.vote(decode(&req)).await; + encode(res) +} + +pub async fn append(app: &mut App, req: String) -> String { + let res = app.raft.append_entries(decode(&req)).await; + encode(res) +} + +/// Receive a snapshot and install it. +pub async fn snapshot(app: &mut App, req: String) -> String { + let (vote, snapshot_meta, snapshot_data): (typ::Vote, typ::SnapshotMeta, typ::SnapshotData) = decode(&req); + let snapshot = typ::Snapshot { + meta: snapshot_meta, + snapshot: Box::new(snapshot_data), + }; + let res = app + .raft + .install_complete_snapshot(vote, snapshot) + .await + .map_err(typ::RaftError::::Fatal); + encode(res) +} + +// Management API + +/// Add a node as **Learner**. +/// +/// A Learner receives log replication from the leader but does not vote. +/// This should be done before adding a node as a member into the cluster +/// (by calling `change-membership`) +pub async fn add_learner(app: &mut App, req: String) -> String { + let node_id: NodeId = decode(&req); + let node = BasicNode { addr: "".to_string() }; + let res = app.raft.add_learner(node_id, node, true).await; + encode(res) +} + +/// Changes specified learners to members, or remove members. +pub async fn change_membership(app: &mut App, req: String) -> String { + let node_ids: BTreeSet = decode(&req); + let res = app.raft.change_membership(node_ids, false).await; + encode(res) +} + +/// Initialize a single-node cluster. +pub async fn init(app: &mut App) -> String { + let mut nodes = BTreeMap::new(); + nodes.insert(app.id, BasicNode { addr: "".to_string() }); + let res = app.raft.initialize(nodes).await; + encode(res) +} + +/// Get the latest metrics of the cluster +pub async fn metrics(app: &mut App) -> String { + let metrics = app.raft.metrics().borrow().clone(); + + let res: Result, Infallible> = Ok(metrics); + encode(res) +} diff --git a/examples/raft-kv-memstore-generic-snapshot-data/src/app.rs b/examples/raft-kv-memstore-generic-snapshot-data/src/app.rs new file mode 100644 index 000000000..5d5a05fe6 --- /dev/null +++ b/examples/raft-kv-memstore-generic-snapshot-data/src/app.rs @@ -0,0 +1,73 @@ +use std::sync::Arc; + +use tokio::sync::mpsc; +use tokio::sync::oneshot; + +use crate::api; +use crate::router::Router; +use crate::typ; +use crate::NodeId; +use crate::StateMachineStore; + +pub type Path = String; +pub type Payload = String; +pub type ResponseTx = oneshot::Sender; +pub type RequestTx = mpsc::UnboundedSender<(Path, Payload, ResponseTx)>; + +/// Representation of an application state. +pub struct App { + pub id: NodeId, + pub raft: typ::Raft, + + /// Receive application requests, Raft protocol request or management requests. + pub rx: mpsc::UnboundedReceiver<(Path, Payload, ResponseTx)>, + pub router: Router, + + pub state_machine: Arc, +} + +impl App { + pub fn new(id: NodeId, raft: typ::Raft, router: Router, state_machine: Arc) -> Self { + let (tx, rx) = mpsc::unbounded_channel(); + + { + let mut targets = router.targets.lock().unwrap(); + targets.insert(id, tx); + } + + Self { + id, + raft, + rx, + router, + state_machine, + } + } + + pub async fn run(mut self) -> Option<()> { + loop { + let (path, payload, response_tx) = self.rx.recv().await?; + + let res = match path.as_str() { + // Application API + "/app/write" => api::write(&mut self, payload).await, + "/app/read" => api::read(&mut self, payload).await, + + // Raft API + "/raft/append" => api::append(&mut self, payload).await, + "/raft/snapshot" => api::snapshot(&mut self, payload).await, + "/raft/vote" => api::vote(&mut self, payload).await, + + // Management API + "/mng/add-learner" => api::add_learner(&mut self, payload).await, + "/mng/change-membership" => api::change_membership(&mut self, payload).await, + "/mng/init" => api::init(&mut self).await, + "/mng/metrics" => api::metrics(&mut self).await, + + _ => panic!("unknown path: {}", path), + }; + + response_tx.send(res).unwrap(); + } + } +} diff --git a/examples/raft-kv-memstore-generic-snapshot-data/src/lib.rs b/examples/raft-kv-memstore-generic-snapshot-data/src/lib.rs new file mode 100644 index 000000000..4d4c9d7ec --- /dev/null +++ b/examples/raft-kv-memstore-generic-snapshot-data/src/lib.rs @@ -0,0 +1,107 @@ +#![allow(clippy::uninlined_format_args)] +#![deny(unused_qualifications)] + +use std::sync::Arc; + +use openraft::BasicNode; +use openraft::Config; +use openraft::TokioRuntime; + +use crate::app::App; +use crate::router::Router; +use crate::store::Request; +use crate::store::Response; +use crate::store::StateMachineData; + +pub mod router; + +pub mod api; +pub mod app; +pub mod network; +pub mod store; + +pub type NodeId = u64; + +openraft::declare_raft_types!( + /// Declare the type configuration for example K/V store. + pub TypeConfig: + D = Request, + R = Response, + NodeId = NodeId, + Node = BasicNode, + Entry = openraft::Entry, + // In this example, snapshot is just a copy of the state machine. + // And it can be any type. + SnapshotData = StateMachineData, + AsyncRuntime = TokioRuntime +); + +pub type LogStore = crate::store::LogStore; +pub type StateMachineStore = crate::store::StateMachineStore; + +pub mod typ { + use openraft::BasicNode; + + use crate::NodeId; + use crate::TypeConfig; + + pub type Raft = openraft::Raft; + + pub type Vote = openraft::Vote; + pub type SnapshotMeta = openraft::SnapshotMeta; + pub type SnapshotData = ::SnapshotData; + pub type Snapshot = openraft::Snapshot; + + pub type Infallible = openraft::error::Infallible; + pub type Fatal = openraft::error::Fatal; + pub type RaftError = openraft::error::RaftError; + pub type RPCError = openraft::error::RPCError>; + pub type StreamingError = openraft::error::StreamingError; + + pub type RaftMetrics = openraft::RaftMetrics; + + pub type ClientWriteError = openraft::error::ClientWriteError; + pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError; + pub type ForwardToLeader = openraft::error::ForwardToLeader; + pub type InitializeError = openraft::error::InitializeError; + + pub type ClientWriteResponse = openraft::raft::ClientWriteResponse; +} + +pub fn encode(t: T) -> String { + serde_json::to_string(&t).unwrap() +} + +pub fn decode(s: &str) -> T { + serde_json::from_str(s).unwrap() +} + +pub async fn new_raft(node_id: NodeId, router: Router) -> (typ::Raft, App) { + // Create a configuration for the raft instance. + let config = Config { + heartbeat_interval: 500, + election_timeout_min: 1500, + election_timeout_max: 3000, + // Once snapshot is built, delete the logs at once. + // So that all further replication will be based on the snapshot. + max_in_snapshot_log_to_keep: 0, + ..Default::default() + }; + + let config = Arc::new(config.validate().unwrap()); + + // Create a instance of where the Raft logs will be stored. + let log_store = Arc::new(LogStore::default()); + + // Create a instance of where the state machine data will be stored. + let state_machine_store = Arc::new(StateMachineStore::default()); + + // Create a local raft instance. + let raft = openraft::Raft::new(node_id, config, router.clone(), log_store, state_machine_store.clone()) + .await + .unwrap(); + + let app = App::new(node_id, raft.clone(), router, state_machine_store); + + (raft, app) +} diff --git a/examples/raft-kv-memstore-generic-snapshot-data/src/network.rs b/examples/raft-kv-memstore-generic-snapshot-data/src/network.rs new file mode 100644 index 000000000..7d1d59a02 --- /dev/null +++ b/examples/raft-kv-memstore-generic-snapshot-data/src/network.rs @@ -0,0 +1,76 @@ +use std::future::Future; + +use openraft::error::RemoteError; +use openraft::error::ReplicationClosed; +use openraft::network::RPCOption; +use openraft::raft::AppendEntriesRequest; +use openraft::raft::AppendEntriesResponse; +use openraft::raft::SnapshotResponse; +use openraft::raft::VoteRequest; +use openraft::raft::VoteResponse; +use openraft::OptionalSend; +use openraft::RaftNetwork; +use openraft::RaftNetworkFactory; +use openraft::Snapshot; +use openraft::Vote; + +use crate::router::Router; +use crate::typ; +use crate::BasicNode; +use crate::NodeId; +use crate::TypeConfig; + +pub struct Connection { + router: Router, + target: NodeId, +} + +impl RaftNetworkFactory for Router { + type Network = Connection; + + async fn new_client(&mut self, target: NodeId, _node: &BasicNode) -> Self::Network { + Connection { + router: self.clone(), + target, + } + } +} + +impl RaftNetwork for Connection { + async fn send_append_entries( + &mut self, + req: AppendEntriesRequest, + ) -> Result, typ::RPCError> { + let resp = self + .router + .send(self.target, "/raft/append", req) + .await + .map_err(|e| RemoteError::new(self.target, e))?; + Ok(resp) + } + + /// A real application should replace this method with customized implementation. + async fn snapshot( + &mut self, + vote: Vote, + snapshot: Snapshot, + _cancel: impl Future + OptionalSend, + _option: RPCOption, + ) -> Result, typ::StreamingError> { + let resp = self + .router + .send::<_, _, typ::Infallible>(self.target, "/raft/snapshot", (vote, snapshot.meta, snapshot.snapshot)) + .await + .map_err(|e| RemoteError::new(self.target, e.into_fatal().unwrap()))?; + Ok(resp) + } + + async fn send_vote(&mut self, req: VoteRequest) -> Result, typ::RPCError> { + let resp = self + .router + .send(self.target, "/raft/vote", req) + .await + .map_err(|e| RemoteError::new(self.target, e))?; + Ok(resp) + } +} diff --git a/examples/raft-kv-memstore-generic-snapshot-data/src/router.rs b/examples/raft-kv-memstore-generic-snapshot-data/src/router.rs new file mode 100644 index 000000000..c9bf0c54a --- /dev/null +++ b/examples/raft-kv-memstore-generic-snapshot-data/src/router.rs @@ -0,0 +1,44 @@ +use std::collections::BTreeMap; +use std::sync::Arc; +use std::sync::Mutex; + +use tokio::sync::oneshot; + +use crate::app::RequestTx; +use crate::decode; +use crate::encode; +use crate::typ::RaftError; +use crate::NodeId; + +/// Simulate a network router. +#[derive(Debug, Clone)] +#[derive(Default)] +pub struct Router { + pub targets: Arc>>, +} + +impl Router { + /// Send request `Req` to target node `to`, and wait for response `Result>`. + pub async fn send(&self, to: NodeId, path: &str, req: Req) -> Result> + where + Req: serde::Serialize, + Result>: serde::de::DeserializeOwned, + { + let (resp_tx, resp_rx) = oneshot::channel(); + + let encoded_req = encode(req); + tracing::debug!("send to: {}, {}, {}", to, path, encoded_req); + + { + let mut targets = self.targets.lock().unwrap(); + let tx = targets.get_mut(&to).unwrap(); + + tx.send((path.to_string(), encoded_req, resp_tx)).unwrap(); + } + + let resp_str = resp_rx.await.unwrap(); + tracing::debug!("resp from: {}, {}, {}", to, path, resp_str); + + decode::>>(&resp_str) + } +} diff --git a/examples/raft-kv-memstore-generic-snapshot-data/src/store.rs b/examples/raft-kv-memstore-generic-snapshot-data/src/store.rs new file mode 100644 index 000000000..98d36aa8b --- /dev/null +++ b/examples/raft-kv-memstore-generic-snapshot-data/src/store.rs @@ -0,0 +1,344 @@ +use std::collections::BTreeMap; +use std::fmt::Debug; +use std::ops::RangeBounds; +use std::sync::Arc; +use std::sync::Mutex; + +use openraft::storage::LogFlushed; +use openraft::storage::LogState; +use openraft::storage::RaftLogStorage; +use openraft::storage::RaftStateMachine; +use openraft::storage::Snapshot; +use openraft::BasicNode; +use openraft::Entry; +use openraft::EntryPayload; +use openraft::LogId; +use openraft::RaftLogReader; +use openraft::RaftSnapshotBuilder; +use openraft::RaftTypeConfig; +use openraft::SnapshotMeta; +use openraft::StorageError; +use openraft::StoredMembership; +use openraft::Vote; +use serde::Deserialize; +use serde::Serialize; + +use crate::typ; +use crate::NodeId; +use crate::TypeConfig; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum Request { + Set { key: String, value: String }, +} + +impl Request { + pub fn set(key: impl ToString, value: impl ToString) -> Self { + Self::Set { + key: key.to_string(), + value: value.to_string(), + } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Response { + pub value: Option, +} + +#[derive(Debug)] +pub struct StoredSnapshot { + pub meta: SnapshotMeta, + + /// The data of the state machine at the time of this snapshot. + pub data: Box, +} + +/// Data contained in the Raft state machine. Note that we are using `serde` to serialize the +/// `data`, which has a implementation to be serialized. Note that for this test we set both the key +/// and value as String, but you could set any type of value that has the serialization impl. +#[derive(Serialize, Deserialize, Debug, Default, Clone)] +pub struct StateMachineData { + pub last_applied: Option>, + + pub last_membership: StoredMembership, + + /// Application data. + pub data: BTreeMap, +} + +/// Defines a state machine for the Raft cluster. This state machine represents a copy of the +/// data for this node. Additionally, it is responsible for storing the last snapshot of the data. +#[derive(Debug, Default)] +pub struct StateMachineStore { + /// The Raft state machine. + pub state_machine: Mutex, + + snapshot_idx: Mutex, + + /// The last received snapshot. + current_snapshot: Mutex>, +} + +#[derive(Debug, Default)] +pub struct LogStore { + last_purged_log_id: Mutex>>, + + /// The Raft log. + log: Mutex>>, + + committed: Mutex>>, + + /// The current granted vote. + vote: Mutex>>, +} + +impl RaftLogReader for Arc { + async fn try_get_log_entries + Clone + Debug>( + &mut self, + range: RB, + ) -> Result>, StorageError> { + let log = self.log.lock().unwrap(); + let response = log.range(range.clone()).map(|(_, val)| val.clone()).collect::>(); + Ok(response) + } +} + +impl RaftSnapshotBuilder for Arc { + #[tracing::instrument(level = "trace", skip(self))] + async fn build_snapshot(&mut self) -> Result, StorageError> { + let data; + let last_applied_log; + let last_membership; + + { + // Serialize the data of the state machine. + let state_machine = self.state_machine.lock().unwrap().clone(); + + last_applied_log = state_machine.last_applied; + last_membership = state_machine.last_membership.clone(); + data = state_machine; + } + + let snapshot_idx = { + let mut l = self.snapshot_idx.lock().unwrap(); + *l += 1; + *l + }; + + let snapshot_id = if let Some(last) = last_applied_log { + format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx) + } else { + format!("--{}", snapshot_idx) + }; + + let meta = SnapshotMeta { + last_log_id: last_applied_log, + last_membership, + snapshot_id, + }; + + let snapshot = StoredSnapshot { + meta: meta.clone(), + data: Box::new(data.clone()), + }; + + { + let mut current_snapshot = self.current_snapshot.lock().unwrap(); + *current_snapshot = Some(snapshot); + } + + Ok(Snapshot { + meta, + snapshot: Box::new(data), + }) + } +} + +impl RaftStateMachine for Arc { + type SnapshotBuilder = Self; + + async fn applied_state( + &mut self, + ) -> Result<(Option>, StoredMembership), StorageError> { + let state_machine = self.state_machine.lock().unwrap(); + Ok((state_machine.last_applied, state_machine.last_membership.clone())) + } + + #[tracing::instrument(level = "trace", skip(self, entries))] + async fn apply(&mut self, entries: I) -> Result, StorageError> + where I: IntoIterator> { + let mut res = Vec::new(); //No `with_capacity`; do not know `len` of iterator + + let mut sm = self.state_machine.lock().unwrap(); + + for entry in entries { + tracing::debug!(%entry.log_id, "replicate to sm"); + + sm.last_applied = Some(entry.log_id); + + match entry.payload { + EntryPayload::Blank => res.push(Response { value: None }), + EntryPayload::Normal(ref req) => match req { + Request::Set { key, value, .. } => { + sm.data.insert(key.clone(), value.clone()); + res.push(Response { + value: Some(value.clone()), + }) + } + }, + EntryPayload::Membership(ref mem) => { + sm.last_membership = StoredMembership::new(Some(entry.log_id), mem.clone()); + res.push(Response { value: None }) + } + }; + } + Ok(res) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn begin_receiving_snapshot( + &mut self, + ) -> Result::SnapshotData>, StorageError> { + Ok(Box::default()) + } + + #[tracing::instrument(level = "trace", skip(self, snapshot))] + async fn install_snapshot( + &mut self, + meta: &SnapshotMeta, + snapshot: Box<::SnapshotData>, + ) -> Result<(), StorageError> { + tracing::info!("install snapshot"); + + let new_snapshot = StoredSnapshot { + meta: meta.clone(), + data: snapshot, + }; + + // Update the state machine. + { + let updated_state_machine: StateMachineData = *new_snapshot.data.clone(); + let mut state_machine = self.state_machine.lock().unwrap(); + *state_machine = updated_state_machine; + } + + // Update current snapshot. + let mut current_snapshot = self.current_snapshot.lock().unwrap(); + *current_snapshot = Some(new_snapshot); + Ok(()) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn get_current_snapshot(&mut self) -> Result>, StorageError> { + match &*self.current_snapshot.lock().unwrap() { + Some(snapshot) => { + let data = snapshot.data.clone(); + Ok(Some(Snapshot { + meta: snapshot.meta.clone(), + snapshot: data, + })) + } + None => Ok(None), + } + } + + async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder { + self.clone() + } +} + +impl RaftLogStorage for Arc { + type LogReader = Self; + + async fn get_log_state(&mut self) -> Result, StorageError> { + let log = self.log.lock().unwrap(); + let last = log.iter().next_back().map(|(_, ent)| ent.log_id); + + let last_purged = *self.last_purged_log_id.lock().unwrap(); + + let last = match last { + None => last_purged, + Some(x) => Some(x), + }; + + Ok(LogState { + last_purged_log_id: last_purged, + last_log_id: last, + }) + } + + async fn save_committed(&mut self, committed: Option>) -> Result<(), StorageError> { + let mut c = self.committed.lock().unwrap(); + *c = committed; + Ok(()) + } + + async fn read_committed(&mut self) -> Result>, StorageError> { + let committed = self.committed.lock().unwrap(); + Ok(*committed) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { + let mut v = self.vote.lock().unwrap(); + *v = Some(*vote); + Ok(()) + } + + async fn read_vote(&mut self) -> Result>, StorageError> { + Ok(*self.vote.lock().unwrap()) + } + + #[tracing::instrument(level = "trace", skip(self, entries, callback))] + async fn append(&mut self, entries: I, callback: LogFlushed) -> Result<(), StorageError> + where I: IntoIterator> { + // Simple implementation that calls the flush-before-return `append_to_log`. + let mut log = self.log.lock().unwrap(); + for entry in entries { + log.insert(entry.log_id.index, entry); + } + callback.log_io_completed(Ok(())); + + Ok(()) + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> { + tracing::debug!("delete_log: [{:?}, +oo)", log_id); + + let mut log = self.log.lock().unwrap(); + let keys = log.range(log_id.index..).map(|(k, _v)| *k).collect::>(); + for key in keys { + log.remove(&key); + } + + Ok(()) + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { + tracing::debug!("delete_log: (-oo, {:?}]", log_id); + + { + let mut ld = self.last_purged_log_id.lock().unwrap(); + assert!(*ld <= Some(log_id)); + *ld = Some(log_id); + } + + { + let mut log = self.log.lock().unwrap(); + + let keys = log.range(..=log_id.index).map(|(k, _v)| *k).collect::>(); + for key in keys { + log.remove(&key); + } + } + + Ok(()) + } + + async fn get_log_reader(&mut self) -> Self::LogReader { + self.clone() + } +} diff --git a/examples/raft-kv-memstore-generic-snapshot-data/test-cluster.sh b/examples/raft-kv-memstore-generic-snapshot-data/test-cluster.sh new file mode 100755 index 000000000..9b582da4a --- /dev/null +++ b/examples/raft-kv-memstore-generic-snapshot-data/test-cluster.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +echo "No shell test script for this example" \ No newline at end of file diff --git a/examples/raft-kv-memstore-generic-snapshot-data/tests/cluster/main.rs b/examples/raft-kv-memstore-generic-snapshot-data/tests/cluster/main.rs new file mode 100644 index 000000000..5148911f9 --- /dev/null +++ b/examples/raft-kv-memstore-generic-snapshot-data/tests/cluster/main.rs @@ -0,0 +1,3 @@ +#![allow(clippy::uninlined_format_args)] + +mod test_cluster; diff --git a/examples/raft-kv-memstore-generic-snapshot-data/tests/cluster/test_cluster.rs b/examples/raft-kv-memstore-generic-snapshot-data/tests/cluster/test_cluster.rs new file mode 100644 index 000000000..9f14ee922 --- /dev/null +++ b/examples/raft-kv-memstore-generic-snapshot-data/tests/cluster/test_cluster.rs @@ -0,0 +1,137 @@ +use std::backtrace::Backtrace; +use std::collections::BTreeMap; +use std::panic::PanicInfo; +use std::time::Duration; + +use openraft::BasicNode; +use raft_kv_memstore_generic_snapshot_data::new_raft; +use raft_kv_memstore_generic_snapshot_data::router::Router; +use raft_kv_memstore_generic_snapshot_data::store::Request; +use raft_kv_memstore_generic_snapshot_data::typ; +use tokio::task; +use tokio::task::LocalSet; +use tracing_subscriber::EnvFilter; + +pub fn log_panic(panic: &PanicInfo) { + let backtrace = format!("{:?}", Backtrace::force_capture()); + + eprintln!("{}", panic); + + if let Some(location) = panic.location() { + tracing::error!( + message = %panic, + backtrace = %backtrace, + panic.file = location.file(), + panic.line = location.line(), + panic.column = location.column(), + ); + eprintln!("{}:{}:{}", location.file(), location.line(), location.column()); + } else { + tracing::error!(message = %panic, backtrace = %backtrace); + } + + eprintln!("{}", backtrace); +} + +/// This test shows how to transfer a snapshot from one node to another: +/// +/// - Setup a single node cluster, write some logs, take a snapshot; +/// - Add a learner node-2 to receive snapshot replication, via the complete-snapshot API: +/// - The sending end sends snapshot with `RaftNetwork::snapshot()`; +/// - The receiving end deliver the received snapshot to `Raft` with +/// `Raft::install_complete_snapshot()`. +#[tokio::test] +async fn test_cluster() { + std::panic::set_hook(Box::new(|panic| { + log_panic(panic); + })); + + tracing_subscriber::fmt() + .with_target(true) + .with_thread_ids(true) + .with_level(true) + .with_ansi(false) + .with_env_filter(EnvFilter::from_default_env()) + .init(); + + let router = Router::default(); + + let local = LocalSet::new(); + + let (raft1, app1) = new_raft(1, router.clone()).await; + let (raft2, app2) = new_raft(2, router.clone()).await; + + let rafts = [raft1, raft2]; + + local + .run_until(async move { + task::spawn_local(app1.run()); + task::spawn_local(app2.run()); + + run_test(&rafts, router).await; + }) + .await; +} + +async fn run_test(rafts: &[typ::Raft], router: Router) { + let _ = router; + + // Wait for server to start up. + tokio::time::sleep(Duration::from_millis(200)).await; + + let raft1 = &rafts[0]; + let raft2 = &rafts[1]; + + println!("=== init single node cluster"); + { + let mut nodes = BTreeMap::new(); + nodes.insert(1, BasicNode { addr: "".to_string() }); + raft1.initialize(nodes).await.unwrap(); + } + + println!("=== write 2 logs"); + { + let resp = raft1.client_write(Request::set("foo1", "bar1")).await.unwrap(); + println!("write resp: {:#?}", resp); + let resp = raft1.client_write(Request::set("foo2", "bar2")).await.unwrap(); + println!("write resp: {:#?}", resp); + } + + println!("=== let node-1 take a snapshot"); + { + raft1.trigger().snapshot().await.unwrap(); + + // Wait for a while to let the snapshot get done. + tokio::time::sleep(Duration::from_millis(500)).await; + } + + println!("=== metrics after building snapshot"); + { + let metrics = raft1.metrics().borrow().clone(); + println!("node 1 metrics: {:#?}", metrics); + assert_eq!(Some(3), metrics.snapshot.map(|x| x.index)); + assert_eq!(Some(3), metrics.purged.map(|x| x.index)); + } + + println!("=== add-learner node-2"); + { + let node = BasicNode { addr: "".to_string() }; + let resp = raft1.add_learner(2, node, true).await.unwrap(); + println!("add-learner node-2 resp: {:#?}", resp); + } + + // Wait for a while to let the node 2 to receive snapshot replication. + tokio::time::sleep(Duration::from_millis(500)).await; + + println!("=== metrics of node 2 that received snapshot"); + { + let metrics = raft2.metrics().borrow().clone(); + println!("node 2 metrics: {:#?}", metrics); + assert_eq!(Some(3), metrics.snapshot.map(|x| x.index)); + assert_eq!(Some(3), metrics.purged.map(|x| x.index)); + } + + // In this example, the snapshot is just a copy of the state machine. + let snapshot = raft2.get_snapshot().await.unwrap(); + println!("node 2 received snapshot: {:#?}", snapshot); +} diff --git a/openraft/Cargo.toml b/openraft/Cargo.toml index c96a0b4cc..4b3ad2d4b 100644 --- a/openraft/Cargo.toml +++ b/openraft/Cargo.toml @@ -87,11 +87,28 @@ storage-v2 = [] singlethreaded = ["macros/singlethreaded"] -# Permit the follower's log to roll back to an earlier state without causing the leader to panic. -# Although log state reversion is typically seen as a bug, enabling it can be useful for testing or other special scenarios. -# For instance, in an even number nodes cluster, erasing a node's data and then rebooting it(log reverts to empty) will not result in data loss. +# Permit the follower's log to roll back to an earlier state without causing the +# leader to panic. +# +# Although log state reversion is typically seen as a bug, enabling it can be +# useful for testing or other special scenarios. +# For instance, in an even number nodes cluster, erasing a node's data and then +# rebooting it(log reverts to empty) will not result in data loss. loosen-follower-log-revert = [] + +# Enable this feature flag to eliminate the `AsyncRead + AsyncWrite + AsyncSeek +# + Unpin` bound from `RaftTypeConfig::SnapshotData`. +# +# Enabling this feature allows applications to use a custom snapshot data format +# and transport fragmentation, diverging from the default implementation which +# typically relies on a single-file structure . +# +# By default it is off. +# This feature is introduced in 0.9.0 +generic-snapshot-data = [] + + # Enables "log" feature in `tracing` crate, to let tracing events emit log # record. # See: https://docs.rs/tracing/latest/tracing/#emitting-log-records diff --git a/openraft/src/core/mod.rs b/openraft/src/core/mod.rs index c10d9ddc3..d401e4707 100644 --- a/openraft/src/core/mod.rs +++ b/openraft/src/core/mod.rs @@ -12,11 +12,8 @@ pub(crate) mod raft_msg; mod replication_state; mod server_state; pub(crate) mod sm; -pub(crate) mod streaming_state; mod tick; -pub(crate) mod snapshot_state; - pub(crate) use raft_core::ApplyResult; pub(crate) use raft_core::ApplyingEntry; pub use raft_core::RaftCore; diff --git a/openraft/src/core/snapshot_state.rs b/openraft/src/core/snapshot_state.rs deleted file mode 100644 index 10c437ca5..000000000 --- a/openraft/src/core/snapshot_state.rs +++ /dev/null @@ -1,22 +0,0 @@ -use crate::LeaderId; -use crate::NodeId; -use crate::SnapshotId; - -/// A global unique id of install-snapshot request. -#[derive(Debug, Clone)] -#[derive(PartialEq, Eq)] -pub(crate) struct SnapshotRequestId { - pub(crate) leader_id: LeaderId, - pub(crate) snapshot_id: SnapshotId, - pub(crate) offset: u64, -} - -impl SnapshotRequestId { - pub(crate) fn new(leader_id: LeaderId, snapshot_id: SnapshotId, offset: u64) -> Self { - Self { - leader_id, - snapshot_id, - offset, - } - } -} diff --git a/openraft/src/core/streaming_state.rs b/openraft/src/core/streaming_state.rs deleted file mode 100644 index 53528e086..000000000 --- a/openraft/src/core/streaming_state.rs +++ /dev/null @@ -1,66 +0,0 @@ -use std::io::SeekFrom; - -use tokio::io::AsyncSeekExt; -use tokio::io::AsyncWriteExt; - -use crate::raft::InstallSnapshotRequest; -use crate::ErrorSubject; -use crate::ErrorVerb; -use crate::RaftTypeConfig; -use crate::SnapshotId; -use crate::StorageError; - -/// The Raft node is streaming in a snapshot from the leader. -pub(crate) struct Streaming -where C: RaftTypeConfig -{ - /// The offset of the last byte written to the snapshot. - pub(crate) offset: u64, - - /// The ID of the snapshot being written. - pub(crate) snapshot_id: SnapshotId, - - /// A handle to the snapshot writer. - pub(crate) snapshot_data: Box, -} - -impl Streaming -where C: RaftTypeConfig -{ - pub(crate) fn new(snapshot_id: SnapshotId, snapshot_data: Box) -> Self { - Self { - offset: 0, - snapshot_id, - snapshot_data, - } - } - - /// Receive a chunk of snapshot data. - pub(crate) async fn receive(&mut self, req: InstallSnapshotRequest) -> Result> { - // TODO: check id? - - // Always seek to the target offset if not an exact match. - if req.offset != self.offset { - if let Err(err) = self.snapshot_data.as_mut().seek(SeekFrom::Start(req.offset)).await { - return Err(StorageError::from_io_error( - ErrorSubject::Snapshot(Some(req.meta.signature())), - ErrorVerb::Seek, - err, - )); - } - self.offset = req.offset; - } - - // Write the next segment & update offset. - let res = self.snapshot_data.as_mut().write_all(&req.data).await; - if let Err(err) = res { - return Err(StorageError::from_io_error( - ErrorSubject::Snapshot(Some(req.meta.signature())), - ErrorVerb::Write, - err, - )); - } - self.offset += req.data.len() as u64; - Ok(req.done) - } -} diff --git a/openraft/src/docs/feature_flags/feature-flags.md b/openraft/src/docs/feature_flags/feature-flags.md index 79ed37a9b..a4eca5358 100644 --- a/openraft/src/docs/feature_flags/feature-flags.md +++ b/openraft/src/docs/feature_flags/feature-flags.md @@ -55,6 +55,35 @@ By default openraft enables no features. In order to use the feature, `AsyncRuntime::spawn` should invoke `tokio::task::spawn_local` or equivalents.

+- `generic-snapshot-data`: Enable this feature flag + to eliminate the `AsyncRead + AsyncWrite + AsyncSeek + Unpin` bound + from [`RaftTypeConfig::SnapshotData`](crate::RaftTypeConfig::SnapshotData) + Enabling this feature allows applications to use a custom snapshot data format and transport fragmentation, + diverging from the default implementation which typically relies on a single-file structure. + + By default, it is off. + This feature is introduced in 0.9.0 + + On the sending end (leader that sends snapshot to follower): + + - Without `generic-snapshot-data`: [`RaftNetwork::snapshot()`] + provides a default implementation that invokes the chunk-based API + [`RaftNetwork::install_snapshot()`] for transmit. + + - With `generic-snapshot-data` enabled: [`RaftNetwork::snapshot()`] + must be implemented to provide application customized snapshot transmission. + Application does not need to implement [`RaftNetwork::install_snapshot()`]. + + On the receiving end(follower): + + - `Raft::install_snapshot()` is available only when `generic-snapshot-data` is disabled. + + Refer to example `examples/raft-kv-memstore-generic-snapshot-data` with `generic-snapshot-data` enabled. +

+ - `tracing-log`: enables "log" feature in `tracing` crate, to let tracing events emit log record. See: [tracing doc: emitting-log-records](https://docs.rs/tracing/latest/tracing/#emitting-log-records) + +[`RaftNetwork::snapshot()`]: crate::network::RaftNetwork::snapshot +[`RaftNetwork::install_snapshot()`]: crate::network::RaftNetwork::install_snapshot \ No newline at end of file diff --git a/openraft/src/network/mod.rs b/openraft/src/network/mod.rs index 1552c273d..c2c7a2ec1 100644 --- a/openraft/src/network/mod.rs +++ b/openraft/src/network/mod.rs @@ -5,7 +5,7 @@ mod factory; #[allow(clippy::module_inception)] mod network; mod rpc_option; mod rpc_type; -pub(crate) mod stream_snapshot; +#[cfg(not(feature = "generic-snapshot-data"))] pub(crate) mod stream_snapshot; pub use backoff::Backoff; pub use factory::RaftNetworkFactory; diff --git a/openraft/src/network/network.rs b/openraft/src/network/network.rs index 8ca51cfb7..de3dd5f3a 100644 --- a/openraft/src/network/network.rs +++ b/openraft/src/network/network.rs @@ -10,8 +10,6 @@ use crate::error::RaftError; use crate::error::ReplicationClosed; use crate::error::StreamingError; use crate::network::rpc_option::RPCOption; -use crate::network::stream_snapshot; -use crate::network::stream_snapshot::SnapshotTransport; use crate::network::Backoff; use crate::raft::AppendEntriesRequest; use crate::raft::AppendEntriesResponse; @@ -108,8 +106,21 @@ where C: RaftTypeConfig cancel: impl Future + OptionalSend, option: RPCOption, ) -> Result, StreamingError>> { - let resp = stream_snapshot::Chunked::send_snapshot(self, vote, snapshot, cancel, option).await?; - Ok(resp) + #[cfg(not(feature = "generic-snapshot-data"))] + { + use crate::network::stream_snapshot; + use crate::network::stream_snapshot::SnapshotTransport; + + let resp = stream_snapshot::Chunked::send_snapshot(self, vote, snapshot, cancel, option).await?; + Ok(resp) + } + #[cfg(feature = "generic-snapshot-data")] + { + let _ = (vote, snapshot, cancel, option); + unimplemented!( + "no default implementation for RaftNetwork::snapshot() if `generic-snapshot-data` feature is enabled" + ) + } } /// Send an AppendEntries RPC to the target Raft node (ยง5). diff --git a/openraft/src/network/stream_snapshot.rs b/openraft/src/network/stream_snapshot.rs index 4ecfdae34..c625ea52d 100644 --- a/openraft/src/network/stream_snapshot.rs +++ b/openraft/src/network/stream_snapshot.rs @@ -1,19 +1,13 @@ use std::future::Future; use std::io::SeekFrom; -use std::pin::Pin; use std::time::Duration; use futures::FutureExt; use macros::add_async_trait; -use tokio::io::AsyncRead; use tokio::io::AsyncReadExt; -use tokio::io::AsyncSeek; use tokio::io::AsyncSeekExt; -use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; -use crate::core::snapshot_state::SnapshotRequestId; -use crate::core::streaming_state::Streaming; use crate::error::Fatal; use crate::error::ReplicationClosed; use crate::error::StreamingError; @@ -25,10 +19,10 @@ use crate::AsyncRuntime; use crate::ErrorSubject; use crate::ErrorVerb; use crate::OptionalSend; -use crate::OptionalSync; use crate::RaftNetwork; use crate::RaftTypeConfig; use crate::Snapshot; +use crate::SnapshotId; use crate::StorageError; use crate::StorageIOError; use crate::ToStorageResult; @@ -60,9 +54,7 @@ pub(crate) trait SnapshotTransport { /// Send and Receive snapshot by chunks. pub(crate) struct Chunked {} -impl SnapshotTransport for Chunked -where C::SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + OptionalSend + OptionalSync + Unpin + 'static -{ +impl SnapshotTransport for Chunked { /// Stream snapshot by chunks. /// /// This function is for backward compatibility and provides a default implement for @@ -89,12 +81,10 @@ where C::SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + OptionalSend + Optio let mut offset = 0; let end = snapshot.snapshot.seek(SeekFrom::End(0)).await.sto_res(subject_verb)?; + let mut c = std::pin::pin!(cancel); loop { - // Safety: `cancel` is a future that is polled only by this function. - let c = unsafe { Pin::new_unchecked(&mut cancel) }; - // If canceled, return at once - if let Some(err) = c.now_or_never() { + if let Some(err) = c.as_mut().now_or_never() { return Err(err.into()); } @@ -173,23 +163,15 @@ where C::SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + OptionalSend + Optio ) -> Result>, StorageError> { let snapshot_meta = req.meta.clone(); let done = req.done; - let offset = req.offset; - - let req_id = SnapshotRequestId::new(*req.vote.leader_id(), snapshot_meta.snapshot_id.clone(), offset); - tracing::info!( - req = display(&req), - snapshot_req_id = debug(&req_id), - "{}", - func_name!() - ); + tracing::info!(req = display(&req), "{}", func_name!()); { let s = streaming.as_mut().unwrap(); s.receive(req).await?; } - tracing::info!(snapshot_req_id = debug(&req_id), "received snapshot chunk"); + tracing::info!("Done received snapshot chunk"); if done { let streaming = streaming.take().unwrap(); @@ -207,3 +189,58 @@ where C::SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + OptionalSend + Optio Ok(None) } } + +/// The Raft node is streaming in a snapshot from the leader. +pub(crate) struct Streaming +where C: RaftTypeConfig +{ + /// The offset of the last byte written to the snapshot. + pub(crate) offset: u64, + + /// The ID of the snapshot being written. + pub(crate) snapshot_id: SnapshotId, + + /// A handle to the snapshot writer. + pub(crate) snapshot_data: Box, +} + +impl Streaming +where C: RaftTypeConfig +{ + pub(crate) fn new(snapshot_id: SnapshotId, snapshot_data: Box) -> Self { + Self { + offset: 0, + snapshot_id, + snapshot_data, + } + } + + /// Receive a chunk of snapshot data. + pub(crate) async fn receive(&mut self, req: InstallSnapshotRequest) -> Result> { + // TODO: check id? + + // Always seek to the target offset if not an exact match. + if req.offset != self.offset { + if let Err(err) = self.snapshot_data.as_mut().seek(SeekFrom::Start(req.offset)).await { + return Err(StorageError::from_io_error( + ErrorSubject::Snapshot(Some(req.meta.signature())), + ErrorVerb::Seek, + err, + )); + } + self.offset = req.offset; + } + + // Write the next segment & update offset. + let res = self.snapshot_data.as_mut().write_all(&req.data).await; + if let Err(err) = res { + return Err(StorageError::from_io_error( + ErrorSubject::Snapshot(Some(req.meta.signature())), + ErrorVerb::Write, + err, + )); + } + self.offset += req.data.len() as u64; + Ok(req.done) + } +} diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index aae4eea03..5972bdf55 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -42,7 +42,6 @@ use crate::core::raft_msg::external_command::ExternalCommand; use crate::core::raft_msg::RaftMsg; use crate::core::replication_lag; use crate::core::sm; -use crate::core::streaming_state::Streaming; use crate::core::RaftCore; use crate::core::Tick; use crate::engine::Engine; @@ -52,17 +51,13 @@ use crate::error::ClientWriteError; use crate::error::Fatal; use crate::error::HigherVote; use crate::error::InitializeError; -use crate::error::InstallSnapshotError; use crate::error::RaftError; -use crate::error::SnapshotMismatch; use crate::membership::IntoNodes; use crate::metrics::RaftDataMetrics; use crate::metrics::RaftMetrics; use crate::metrics::RaftServerMetrics; use crate::metrics::Wait; use crate::metrics::WaitError; -use crate::network::stream_snapshot::Chunked; -use crate::network::stream_snapshot::SnapshotTransport; use crate::network::RaftNetworkFactory; use crate::raft::raft_inner::RaftInner; use crate::raft::runtime_config_handle::RuntimeConfigHandle; @@ -78,7 +73,6 @@ use crate::MessageSummary; use crate::RaftState; pub use crate::RaftTypeConfig; use crate::Snapshot; -use crate::SnapshotSegmentId; use crate::StorageHelper; use crate::Vote; @@ -261,6 +255,8 @@ where C: RaftTypeConfig rx_server_metrics, tx_shutdown: Mutex::new(Some(tx_shutdown)), core_state: Mutex::new(CoreState::Running(core_handle)), + + #[cfg(not(feature = "generic-snapshot-data"))] snapshot: Mutex::new(None), }; @@ -415,11 +411,18 @@ where C: RaftTypeConfig /// /// If receiving is finished `done == true`, it installs the snapshot to the state machine. /// Nothing will be done if the input snapshot is older than the state machine. + #[cfg(not(feature = "generic-snapshot-data"))] #[tracing::instrument(level = "debug", skip_all)] pub async fn install_snapshot( &self, req: InstallSnapshotRequest, - ) -> Result, RaftError> { + ) -> Result, RaftError> + where + C::SnapshotData: tokio::io::AsyncRead + tokio::io::AsyncWrite + tokio::io::AsyncSeek + Unpin, + { + use crate::network::stream_snapshot::Chunked; + use crate::network::stream_snapshot::SnapshotTransport; + tracing::debug!(req = display(&req), "Raft::install_snapshot()"); let req_vote = req.vote; @@ -431,12 +434,12 @@ where C: RaftTypeConfig if curr_id != Some(&req.meta.snapshot_id) { if req.offset != 0 { - let mismatch = InstallSnapshotError::SnapshotMismatch(SnapshotMismatch { - expect: SnapshotSegmentId { + let mismatch = crate::error::InstallSnapshotError::SnapshotMismatch(crate::error::SnapshotMismatch { + expect: crate::SnapshotSegmentId { id: snapshot_id.clone(), offset: 0, }, - got: SnapshotSegmentId { + got: crate::SnapshotSegmentId { id: snapshot_id.clone(), offset: req.offset, }, @@ -456,7 +459,10 @@ where C: RaftTypeConfig } } }; - *streaming = Some(Streaming::new(req.meta.snapshot_id.clone(), snapshot_data)); + *streaming = Some(crate::network::stream_snapshot::Streaming::new( + req.meta.snapshot_id.clone(), + snapshot_data, + )); } let snapshot = Chunked::receive_snapshot(&mut *streaming, req).await?; diff --git a/openraft/src/raft/raft_inner.rs b/openraft/src/raft/raft_inner.rs index 2141fa5a8..ceecf7f4c 100644 --- a/openraft/src/raft/raft_inner.rs +++ b/openraft/src/raft/raft_inner.rs @@ -11,7 +11,6 @@ use tracing::Level; use crate::config::RuntimeConfig; use crate::core::raft_msg::external_command::ExternalCommand; use crate::core::raft_msg::RaftMsg; -use crate::core::streaming_state::Streaming; use crate::core::TickHandle; use crate::error::Fatal; use crate::error::RaftError; @@ -44,7 +43,8 @@ where C: RaftTypeConfig pub(in crate::raft) core_state: Mutex>, /// The ongoing snapshot transmission. - pub(in crate::raft) snapshot: Mutex>>, + #[cfg(not(feature = "generic-snapshot-data"))] + pub(in crate::raft) snapshot: Mutex>>, } impl RaftInner diff --git a/openraft/src/storage/mod.rs b/openraft/src/storage/mod.rs index f17fcb1ef..742370157 100644 --- a/openraft/src/storage/mod.rs +++ b/openraft/src/storage/mod.rs @@ -113,6 +113,7 @@ where C: RaftTypeConfig impl Snapshot where C: RaftTypeConfig { + #[allow(dead_code)] pub(crate) fn new(meta: SnapshotMeta, snapshot: Box) -> Self { Self { meta, snapshot } } diff --git a/openraft/src/type_config.rs b/openraft/src/type_config.rs index 421f87539..7271d4662 100644 --- a/openraft/src/type_config.rs +++ b/openraft/src/type_config.rs @@ -1,9 +1,5 @@ use std::fmt::Debug; -use tokio::io::AsyncRead; -use tokio::io::AsyncSeek; -use tokio::io::AsyncWrite; - use crate::entry::FromAppData; use crate::entry::RaftEntry; use crate::AppData; @@ -58,11 +54,20 @@ pub trait RaftTypeConfig: /// Raft log entry, which can be built from an AppData. type Entry: RaftEntry + FromAppData; + // TODO: fix the doc address /// Snapshot data for exposing a snapshot for reading & writing. /// /// See the [storage chapter of the guide](https://datafuselabs.github.io/openraft/getting-started.html#implement-raftstorage) /// for details on where and how this is used. - type SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + OptionalSend + OptionalSync + Unpin + 'static; + #[cfg(not(feature = "generic-snapshot-data"))] + type SnapshotData: tokio::io::AsyncRead + + tokio::io::AsyncWrite + + tokio::io::AsyncSeek + + OptionalSend + + Unpin + + 'static; + #[cfg(feature = "generic-snapshot-data")] + type SnapshotData: OptionalSend + 'static; /// Asynchronous runtime type. type AsyncRuntime: AsyncRuntime;