diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index ca4ff705d..6b1d9d7b3 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -405,6 +405,7 @@ jobs: - "nightly" example: - "raft-kv-memstore" + - "raft-kv-memstore-general-snapshot-data" - "raft-kv-memstore-singlethreaded" - "raft-kv-rocksdb" diff --git a/Cargo.toml b/Cargo.toml index 7ea4db3db..5fc0c4baf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,5 +58,6 @@ exclude = [ "stores/rocksstore-v2", "examples/raft-kv-memstore", "examples/raft-kv-memstore-singlethreaded", + "examples/raft-kv-memstore-general-snapshot-data", "examples/raft-kv-rocksdb", ] diff --git a/examples/raft-kv-memstore-general-snapshot-data/.gitignore b/examples/raft-kv-memstore-general-snapshot-data/.gitignore new file mode 100644 index 000000000..cb4025390 --- /dev/null +++ b/examples/raft-kv-memstore-general-snapshot-data/.gitignore @@ -0,0 +1,5 @@ +target +vendor +.idea + +/*.log diff --git a/examples/raft-kv-memstore-general-snapshot-data/Cargo.toml b/examples/raft-kv-memstore-general-snapshot-data/Cargo.toml new file mode 100644 index 000000000..a6714959a --- /dev/null +++ b/examples/raft-kv-memstore-general-snapshot-data/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "raft-kv-memstore-general-snapshot-data" +version = "0.1.0" +readme = "README.md" + +edition = "2021" +authors = [ + "drdr xp ", + "Pedro Paulo de Amorim ", +] +categories = ["algorithms", "asynchronous", "data-structures"] +description = "An example distributed key-value store built upon `openraft`." 
+homepage = "https://github.com/datafuselabs/openraft" +keywords = ["raft", "consensus"] +license = "MIT OR Apache-2.0" +repository = "https://github.com/datafuselabs/openraft" + +[dependencies] +openraft = { path = "../../openraft", features = ["serde", "storage-v2", "general-snapshot-data"] } + +clap = { version = "4.1.11", features = ["derive", "env"] } +reqwest = { version = "0.11.9", features = ["json"] } +serde = { version = "1.0.114", features = ["derive"] } +serde_json = "1.0.57" +tokio = { version = "1.0", default-features = false, features = ["sync"] } +tracing = "0.1.29" +tracing-subscriber = { version = "0.3.0", features = ["env-filter"] } + +[dev-dependencies] +maplit = "1.0.2" + +[features] + +[package.metadata.docs.rs] +all-features = true diff --git a/examples/raft-kv-memstore-general-snapshot-data/README.md b/examples/raft-kv-memstore-general-snapshot-data/README.md new file mode 100644 index 000000000..6f9e1eadd --- /dev/null +++ b/examples/raft-kv-memstore-general-snapshot-data/README.md @@ -0,0 +1,17 @@ +# Example Openraft kv-store with `general-snapshot-data` enabled + +With the `general-snapshot-data` feature flag enabled, Openraft allows applications to use any data type for snapshot data, +instead of a single-file-like data format with `AsyncSeek + AsyncRead + AsyncWrite + Unpin` bounds. + +This example is similar to the basic raft-kv-memstore example +but focuses on how to handle snapshots with `general-snapshot-data` enabled. +Other aspects are minimized. + +To send a complete snapshot, refer to the implementation of `RaftNetwork::snapshot()` in this example. + +To receive a complete snapshot, refer to the implementation of `api::snapshot()` in this example. + + +## Run it + +Run it with `cargo test -- --nocapture`. 
\ No newline at end of file diff --git a/examples/raft-kv-memstore-general-snapshot-data/src/api.rs b/examples/raft-kv-memstore-general-snapshot-data/src/api.rs new file mode 100644 index 000000000..2e87f16d6 --- /dev/null +++ b/examples/raft-kv-memstore-general-snapshot-data/src/api.rs @@ -0,0 +1,104 @@ +//! This mod implements a network API for raft node. + +use std::collections::BTreeMap; +use std::collections::BTreeSet; + +use openraft::error::CheckIsLeaderError; +use openraft::error::Infallible; +use openraft::error::RaftError; +use openraft::BasicNode; +use openraft::RaftMetrics; + +use crate::app::App; +use crate::decode; +use crate::encode; +use crate::typ; +use crate::NodeId; + +pub async fn write(app: &mut App, req: String) -> String { + let res = app.raft.client_write(decode(&req)).await; + encode(res) +} + +pub async fn read(app: &mut App, req: String) -> String { + let key: String = decode(&req); + + let ret = app.raft.ensure_linearizable().await; + + let res = match ret { + Ok(_) => { + let state_machine = app.state_machine.state_machine.lock().unwrap(); + let value = state_machine.data.get(&key).cloned(); + + let res: Result>> = + Ok(value.unwrap_or_default()); + res + } + Err(e) => Err(e), + }; + encode(res) +} + +// Raft API + +pub async fn vote(app: &mut App, req: String) -> String { + let res = app.raft.vote(decode(&req)).await; + encode(res) +} + +pub async fn append(app: &mut App, req: String) -> String { + let res = app.raft.append_entries(decode(&req)).await; + encode(res) +} + +/// Receive a snapshot and install it. 
+pub async fn snapshot(app: &mut App, req: String) -> String { + let (vote, snapshot_meta, snapshot_data): (typ::Vote, typ::SnapshotMeta, typ::SnapshotData) = decode(&req); + let snapshot = typ::Snapshot { + meta: snapshot_meta, + snapshot: Box::new(snapshot_data), + }; + let res = app + .raft + .install_complete_snapshot(vote, snapshot) + .await + .map_err(|e| typ::RaftError::::Fatal(e)); + encode(res) +} + +// Management API + +/// Add a node as **Learner**. +/// +/// A Learner receives log replication from the leader but does not vote. +/// This should be done before adding a node as a member into the cluster +/// (by calling `change-membership`) +pub async fn add_learner(app: &mut App, req: String) -> String { + let node_id: NodeId = decode(&req); + let node = BasicNode { addr: "".to_string() }; + let res = app.raft.add_learner(node_id, node, true).await; + encode(res) +} + +/// Changes specified learners to members, or remove members. +pub async fn change_membership(app: &mut App, req: String) -> String { + let node_ids: BTreeSet = decode(&req); + let res = app.raft.change_membership(node_ids, false).await; + encode(res) +} + +/// Initialize a single-node cluster. 
+pub async fn init(app: &mut App) -> String { + let mut nodes = BTreeMap::new(); + nodes.insert(app.id, BasicNode { addr: "".to_string() }); + let res = app.raft.initialize(nodes).await; + encode(res) +} + +/// Get the latest metrics of the cluster +pub async fn metrics(app: &mut App) -> String { + let metrics = app.raft.metrics().borrow().clone(); + + let res: Result, Infallible> = Ok(metrics); + encode(res) +} diff --git a/examples/raft-kv-memstore-general-snapshot-data/src/app.rs b/examples/raft-kv-memstore-general-snapshot-data/src/app.rs new file mode 100644 index 000000000..5d5a05fe6 --- /dev/null +++ b/examples/raft-kv-memstore-general-snapshot-data/src/app.rs @@ -0,0 +1,73 @@ +use std::sync::Arc; + +use tokio::sync::mpsc; +use tokio::sync::oneshot; + +use crate::api; +use crate::router::Router; +use crate::typ; +use crate::NodeId; +use crate::StateMachineStore; + +pub type Path = String; +pub type Payload = String; +pub type ResponseTx = oneshot::Sender; +pub type RequestTx = mpsc::UnboundedSender<(Path, Payload, ResponseTx)>; + +/// Representation of an application state. +pub struct App { + pub id: NodeId, + pub raft: typ::Raft, + + /// Receive application requests, Raft protocol request or management requests. 
+ pub rx: mpsc::UnboundedReceiver<(Path, Payload, ResponseTx)>, + pub router: Router, + + pub state_machine: Arc, +} + +impl App { + pub fn new(id: NodeId, raft: typ::Raft, router: Router, state_machine: Arc) -> Self { + let (tx, rx) = mpsc::unbounded_channel(); + + { + let mut targets = router.targets.lock().unwrap(); + targets.insert(id, tx); + } + + Self { + id, + raft, + rx, + router, + state_machine, + } + } + + pub async fn run(mut self) -> Option<()> { + loop { + let (path, payload, response_tx) = self.rx.recv().await?; + + let res = match path.as_str() { + // Application API + "/app/write" => api::write(&mut self, payload).await, + "/app/read" => api::read(&mut self, payload).await, + + // Raft API + "/raft/append" => api::append(&mut self, payload).await, + "/raft/snapshot" => api::snapshot(&mut self, payload).await, + "/raft/vote" => api::vote(&mut self, payload).await, + + // Management API + "/mng/add-learner" => api::add_learner(&mut self, payload).await, + "/mng/change-membership" => api::change_membership(&mut self, payload).await, + "/mng/init" => api::init(&mut self).await, + "/mng/metrics" => api::metrics(&mut self).await, + + _ => panic!("unknown path: {}", path), + }; + + response_tx.send(res).unwrap(); + } + } +} diff --git a/examples/raft-kv-memstore-general-snapshot-data/src/lib.rs b/examples/raft-kv-memstore-general-snapshot-data/src/lib.rs new file mode 100644 index 000000000..05006bd4c --- /dev/null +++ b/examples/raft-kv-memstore-general-snapshot-data/src/lib.rs @@ -0,0 +1,107 @@ +#![allow(clippy::uninlined_format_args)] +#![deny(unused_qualifications)] + +use std::sync::Arc; + +use openraft::BasicNode; +use openraft::Config; +use openraft::TokioRuntime; + +use crate::app::App; +use crate::router::Router; +use crate::store::Request; +use crate::store::Response; +use crate::store::StateMachineData; + +pub mod router; + +pub mod api; +pub mod app; +pub mod network; +pub mod store; + +pub type NodeId = u64; + +openraft::declare_raft_types!( 
+ /// Declare the type configuration for example K/V store. + pub TypeConfig: + D = Request, + R = Response, + NodeId = NodeId, + Node = BasicNode, + Entry = openraft::Entry, + // In this example, snapshot is just a copy of the state machine. + SnapshotData = StateMachineData, + AsyncRuntime = TokioRuntime +); + +pub type LogStore = crate::store::LogStore; +pub type StateMachineStore = crate::store::StateMachineStore; + +pub mod typ { + use openraft::BasicNode; + + use crate::NodeId; + use crate::TypeConfig; + + pub type Raft = openraft::Raft; + + pub type Vote = openraft::Vote; + pub type SnapshotMeta = openraft::SnapshotMeta; + pub type SnapshotData = ::SnapshotData; + pub type Snapshot = openraft::Snapshot; + + pub type Infallible = openraft::error::Infallible; + pub type Fatal = openraft::error::Fatal; + pub type RaftError = openraft::error::RaftError; + pub type RPCError = openraft::error::RPCError>; + pub type StreamingError = openraft::error::StreamingError; + + pub type RaftMetrics = openraft::RaftMetrics; + + pub type ClientWriteError = openraft::error::ClientWriteError; + pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError; + pub type ForwardToLeader = openraft::error::ForwardToLeader; + pub type InitializeError = openraft::error::InitializeError; + + pub type ClientWriteResponse = openraft::raft::ClientWriteResponse; +} + +pub fn encode(t: T) -> String { + serde_json::to_string(&t).unwrap() +} + +pub fn decode(s: &str) -> T { + serde_json::from_str(s).unwrap() +} + +pub async fn new_raft(node_id: NodeId, router: Router) -> (typ::Raft, App) { + // Create a configuration for the raft instance. + let config = Config { + heartbeat_interval: 500, + election_timeout_min: 1500, + election_timeout_max: 3000, + // Once snapshot is built, delete the logs at once. + max_in_snapshot_log_to_keep: 0, + ..Default::default() + }; + + let config = Arc::new(config.validate().unwrap()); + + // Create a instance of where the Raft logs will be stored. 
+ let log_store = Arc::new(LogStore::default()); + + // Create a instance of where the state machine data will be stored. + let state_machine_store = Arc::new(StateMachineStore::default()); + + // Create a local raft instance. + let raft = openraft::Raft::new(node_id, config, router.clone(), log_store, state_machine_store.clone()) + .await + .unwrap(); + + // Create an application that will store all the instances created above, this will + // later be used on the actix-web services. + let app = App::new(node_id, raft.clone(), router, state_machine_store); + + (raft, app) +} diff --git a/examples/raft-kv-memstore-general-snapshot-data/src/network.rs b/examples/raft-kv-memstore-general-snapshot-data/src/network.rs new file mode 100644 index 000000000..3c783addc --- /dev/null +++ b/examples/raft-kv-memstore-general-snapshot-data/src/network.rs @@ -0,0 +1,79 @@ +use std::future::Future; + + + +use openraft::error::RemoteError; +use openraft::error::ReplicationClosed; +use openraft::network::RPCOption; +use openraft::raft::AppendEntriesRequest; +use openraft::raft::AppendEntriesResponse; + + +use openraft::raft::SnapshotResponse; +use openraft::raft::VoteRequest; +use openraft::raft::VoteResponse; +use openraft::OptionalSend; +use openraft::RaftNetwork; +use openraft::RaftNetworkFactory; +use openraft::Snapshot; +use openraft::Vote; + +use crate::router::Router; +use crate::typ; +use crate::BasicNode; +use crate::NodeId; +use crate::TypeConfig; + +pub struct Connection { + router: Router, + target: NodeId, +} + +impl RaftNetworkFactory for Router { + type Network = Connection; + + async fn new_client(&mut self, target: NodeId, _node: &BasicNode) -> Self::Network { + Connection { + router: self.clone(), + target, + } + } +} + +impl RaftNetwork for Connection { + async fn send_append_entries( + &mut self, + req: AppendEntriesRequest, + ) -> Result, typ::RPCError> { + let resp = self + .router + .send(self.target, "/raft/append", req) + .await + .map_err(|e| 
RemoteError::new(self.target, e))?; + Ok(resp) + } + + async fn snapshot( + &mut self, + vote: Vote, + snapshot: Snapshot, + _cancel: impl Future + OptionalSend, + _option: RPCOption, + ) -> Result, typ::StreamingError> { + let resp = self + .router + .send::<_, _, typ::Infallible>(self.target, "/raft/snapshot", (vote, snapshot.meta, snapshot.snapshot)) + .await + .map_err(|e| RemoteError::new(self.target, e.into_fatal().unwrap()))?; + Ok(resp) + } + + async fn send_vote(&mut self, req: VoteRequest) -> Result, typ::RPCError> { + let resp = self + .router + .send(self.target, "/raft/vote", req) + .await + .map_err(|e| RemoteError::new(self.target, e))?; + Ok(resp) + } +} diff --git a/examples/raft-kv-memstore-general-snapshot-data/src/router.rs b/examples/raft-kv-memstore-general-snapshot-data/src/router.rs new file mode 100644 index 000000000..8d8c54806 --- /dev/null +++ b/examples/raft-kv-memstore-general-snapshot-data/src/router.rs @@ -0,0 +1,46 @@ + +use std::collections::BTreeMap; + +use std::sync::Arc; +use std::sync::Mutex; + +use tokio::sync::oneshot; + +use crate::app::RequestTx; +use crate::decode; +use crate::encode; +use crate::typ::RaftError; +use crate::NodeId; + +/// Simulate a network router. +#[derive(Debug, Clone)] +#[derive(Default)] +pub struct Router { + pub targets: Arc>>, +} + +impl Router { + /// Send request `Req` to target node `to`, and wait for response `Result>`. 
+ pub async fn send(&self, to: NodeId, path: &str, req: Req) -> Result> + where + Req: serde::Serialize, + Result>: serde::de::DeserializeOwned, + { + let (resp_tx, resp_rx) = oneshot::channel(); + + let encoded_req = encode(req); + tracing::debug!("send to: {}, {}, {}", to, path, encoded_req); + + { + let mut targets = self.targets.lock().unwrap(); + let tx = targets.get_mut(&to).unwrap(); + + tx.send((path.to_string(), encoded_req, resp_tx)).unwrap(); + } + + let resp_str = resp_rx.await.unwrap(); + tracing::debug!("resp from: {}, {}, {}", to, path, resp_str); + + decode::>>(&resp_str) + } +} diff --git a/examples/raft-kv-memstore-general-snapshot-data/src/store.rs b/examples/raft-kv-memstore-general-snapshot-data/src/store.rs new file mode 100644 index 000000000..ed0d1b353 --- /dev/null +++ b/examples/raft-kv-memstore-general-snapshot-data/src/store.rs @@ -0,0 +1,367 @@ + +use std::collections::BTreeMap; +use std::fmt::Debug; + + +use std::ops::RangeBounds; + +use std::sync::Arc; +use std::sync::Mutex; + +use openraft::storage::LogFlushed; +use openraft::storage::LogState; +use openraft::storage::RaftLogStorage; +use openraft::storage::RaftStateMachine; +use openraft::storage::Snapshot; +use openraft::BasicNode; +use openraft::Entry; +use openraft::EntryPayload; +use openraft::LogId; +use openraft::RaftLogReader; +use openraft::RaftSnapshotBuilder; +use openraft::RaftTypeConfig; +use openraft::SnapshotMeta; +use openraft::StorageError; +use openraft::StoredMembership; +use openraft::Vote; +use serde::Deserialize; +use serde::Serialize; + +use crate::typ; +use crate::NodeId; +use crate::TypeConfig; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum Request { + Set { key: String, value: String }, +} + +impl Request { + pub fn set(key: impl ToString, value: impl ToString) -> Self { + Self::Set { + key: key.to_string(), + value: value.to_string(), + } + } +} + +#[cfg(test)] +mod tests { + use std::marker::PhantomData; + + use crate::store::Request; + + 
#[test] + fn test_serde() { + let a = Request::Set { + key: "foo".to_string(), + value: "bar".to_string(), + _p: PhantomData, + }; + + let b = serde_json::to_string(&a).unwrap(); + println!("{}", b); + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Response { + pub value: Option, +} + +#[derive(Debug)] +pub struct StoredSnapshot { + pub meta: SnapshotMeta, + + /// The data of the state machine at the time of this snapshot. + pub data: Box, +} + +/// Data contained in the Raft state machine. Note that we are using `serde` to serialize the +/// `data`, which has a implementation to be serialized. Note that for this test we set both the key +/// and value as String, but you could set any type of value that has the serialization impl. +#[derive(Serialize, Deserialize, Debug, Default, Clone)] +pub struct StateMachineData { + pub last_applied: Option>, + + pub last_membership: StoredMembership, + + /// Application data. + pub data: BTreeMap, +} + +/// Defines a state machine for the Raft cluster. This state machine represents a copy of the +/// data for this node. Additionally, it is responsible for storing the last snapshot of the data. +#[derive(Debug, Default)] +pub struct StateMachineStore { + /// The Raft state machine. + pub state_machine: Mutex, + + snapshot_idx: Mutex, + + /// The last received snapshot. + current_snapshot: Mutex>, +} + +#[derive(Debug, Default)] +pub struct LogStore { + last_purged_log_id: Mutex>>, + + /// The Raft log. + log: Mutex>>, + + committed: Mutex>>, + + /// The current granted vote. 
+ vote: Mutex>>, +} + +impl RaftLogReader for Arc { + async fn try_get_log_entries + Clone + Debug>( + &mut self, + range: RB, + ) -> Result>, StorageError> { + let log = self.log.lock().unwrap(); + let response = log.range(range.clone()).map(|(_, val)| val.clone()).collect::>(); + Ok(response) + } +} + +impl RaftSnapshotBuilder for Arc { + #[tracing::instrument(level = "trace", skip(self))] + async fn build_snapshot(&mut self) -> Result, StorageError> { + let data; + let last_applied_log; + let last_membership; + + { + // Serialize the data of the state machine. + let state_machine = self.state_machine.lock().unwrap().clone(); + + last_applied_log = state_machine.last_applied; + last_membership = state_machine.last_membership.clone(); + data = state_machine; + } + + let snapshot_idx = { + let mut l = self.snapshot_idx.lock().unwrap(); + *l += 1; + *l + }; + + let snapshot_id = if let Some(last) = last_applied_log { + format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx) + } else { + format!("--{}", snapshot_idx) + }; + + let meta = SnapshotMeta { + last_log_id: last_applied_log, + last_membership, + snapshot_id, + }; + + let snapshot = StoredSnapshot { + meta: meta.clone(), + data: Box::new(data.clone()), + }; + + { + let mut current_snapshot = self.current_snapshot.lock().unwrap(); + *current_snapshot = Some(snapshot); + } + + Ok(Snapshot { + meta, + snapshot: Box::new(data), + }) + } +} + +impl RaftStateMachine for Arc { + type SnapshotBuilder = Self; + + async fn applied_state( + &mut self, + ) -> Result<(Option>, StoredMembership), StorageError> { + let state_machine = self.state_machine.lock().unwrap(); + Ok((state_machine.last_applied, state_machine.last_membership.clone())) + } + + #[tracing::instrument(level = "trace", skip(self, entries))] + async fn apply(&mut self, entries: I) -> Result, StorageError> + where I: IntoIterator> { + let mut res = Vec::new(); //No `with_capacity`; do not know `len` of iterator + + let mut sm = 
self.state_machine.lock().unwrap(); + + for entry in entries { + tracing::debug!(%entry.log_id, "replicate to sm"); + + sm.last_applied = Some(entry.log_id); + + match entry.payload { + EntryPayload::Blank => res.push(Response { value: None }), + EntryPayload::Normal(ref req) => match req { + Request::Set { key, value, .. } => { + sm.data.insert(key.clone(), value.clone()); + res.push(Response { + value: Some(value.clone()), + }) + } + }, + EntryPayload::Membership(ref mem) => { + sm.last_membership = StoredMembership::new(Some(entry.log_id), mem.clone()); + res.push(Response { value: None }) + } + }; + } + Ok(res) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn begin_receiving_snapshot( + &mut self, + ) -> Result::SnapshotData>, StorageError> { + Ok(Box::new(StateMachineData::default())) + } + + #[tracing::instrument(level = "trace", skip(self, snapshot))] + async fn install_snapshot( + &mut self, + meta: &SnapshotMeta, + snapshot: Box<::SnapshotData>, + ) -> Result<(), StorageError> { + tracing::info!("install snapshot"); + + let new_snapshot = StoredSnapshot { + meta: meta.clone(), + data: snapshot, + }; + + // Update the state machine. + { + let updated_state_machine: StateMachineData = *new_snapshot.data.clone(); + let mut state_machine = self.state_machine.lock().unwrap(); + *state_machine = updated_state_machine; + } + + // Update current snapshot. 
+ let mut current_snapshot = self.current_snapshot.lock().unwrap(); + *current_snapshot = Some(new_snapshot); + Ok(()) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn get_current_snapshot(&mut self) -> Result>, StorageError> { + match &*self.current_snapshot.lock().unwrap() { + Some(snapshot) => { + let data = snapshot.data.clone(); + Ok(Some(Snapshot { + meta: snapshot.meta.clone(), + snapshot: data, + })) + } + None => Ok(None), + } + } + + async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder { + self.clone() + } +} + +impl RaftLogStorage for Arc { + type LogReader = Self; + + async fn get_log_state(&mut self) -> Result, StorageError> { + let log = self.log.lock().unwrap(); + let last = log.iter().next_back().map(|(_, ent)| ent.log_id); + + let last_purged = *self.last_purged_log_id.lock().unwrap(); + + let last = match last { + None => last_purged, + Some(x) => Some(x), + }; + + Ok(LogState { + last_purged_log_id: last_purged, + last_log_id: last, + }) + } + + async fn save_committed(&mut self, committed: Option>) -> Result<(), StorageError> { + let mut c = self.committed.lock().unwrap(); + *c = committed; + Ok(()) + } + + async fn read_committed(&mut self) -> Result>, StorageError> { + let committed = self.committed.lock().unwrap(); + Ok(*committed) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { + let mut v = self.vote.lock().unwrap(); + *v = Some(*vote); + Ok(()) + } + + async fn read_vote(&mut self) -> Result>, StorageError> { + Ok(*self.vote.lock().unwrap()) + } + + #[tracing::instrument(level = "trace", skip(self, entries, callback))] + async fn append(&mut self, entries: I, callback: LogFlushed) -> Result<(), StorageError> + where I: IntoIterator> { + // Simple implementation that calls the flush-before-return `append_to_log`. 
+ let mut log = self.log.lock().unwrap(); + for entry in entries { + log.insert(entry.log_id.index, entry); + } + callback.log_io_completed(Ok(())); + + Ok(()) + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> { + tracing::debug!("delete_log: [{:?}, +oo)", log_id); + + let mut log = self.log.lock().unwrap(); + let keys = log.range(log_id.index..).map(|(k, _v)| *k).collect::>(); + for key in keys { + log.remove(&key); + } + + Ok(()) + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { + tracing::debug!("delete_log: (-oo, {:?}]", log_id); + + { + let mut ld = self.last_purged_log_id.lock().unwrap(); + assert!(*ld <= Some(log_id)); + *ld = Some(log_id); + } + + { + let mut log = self.log.lock().unwrap(); + + let keys = log.range(..=log_id.index).map(|(k, _v)| *k).collect::>(); + for key in keys { + log.remove(&key); + } + } + + Ok(()) + } + + async fn get_log_reader(&mut self) -> Self::LogReader { + self.clone() + } +} diff --git a/examples/raft-kv-memstore-general-snapshot-data/test-cluster.sh b/examples/raft-kv-memstore-general-snapshot-data/test-cluster.sh new file mode 100755 index 000000000..9b582da4a --- /dev/null +++ b/examples/raft-kv-memstore-general-snapshot-data/test-cluster.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +echo "No shell test script for this example" \ No newline at end of file diff --git a/examples/raft-kv-memstore-general-snapshot-data/tests/cluster/main.rs b/examples/raft-kv-memstore-general-snapshot-data/tests/cluster/main.rs new file mode 100644 index 000000000..5148911f9 --- /dev/null +++ b/examples/raft-kv-memstore-general-snapshot-data/tests/cluster/main.rs @@ -0,0 +1,3 @@ +#![allow(clippy::uninlined_format_args)] + +mod test_cluster; diff --git a/examples/raft-kv-memstore-general-snapshot-data/tests/cluster/test_cluster.rs 
b/examples/raft-kv-memstore-general-snapshot-data/tests/cluster/test_cluster.rs new file mode 100644 index 000000000..fd1d0178d --- /dev/null +++ b/examples/raft-kv-memstore-general-snapshot-data/tests/cluster/test_cluster.rs @@ -0,0 +1,137 @@ +use std::backtrace::Backtrace; +use std::collections::BTreeMap; +use std::panic::PanicInfo; +use std::time::Duration; + +use openraft::BasicNode; +use raft_kv_memstore_general_snapshot_data::new_raft; +use raft_kv_memstore_general_snapshot_data::router::Router; +use raft_kv_memstore_general_snapshot_data::store::Request; +use raft_kv_memstore_general_snapshot_data::typ; +use tokio::task; +use tokio::task::LocalSet; +use tracing_subscriber::EnvFilter; + +pub fn log_panic(panic: &PanicInfo) { + let backtrace = format!("{:?}", Backtrace::force_capture()); + + eprintln!("{}", panic); + + if let Some(location) = panic.location() { + tracing::error!( + message = %panic, + backtrace = %backtrace, + panic.file = location.file(), + panic.line = location.line(), + panic.column = location.column(), + ); + eprintln!("{}:{}:{}", location.file(), location.line(), location.column()); + } else { + tracing::error!(message = %panic, backtrace = %backtrace); + } + + eprintln!("{}", backtrace); +} + +/// This test shows how to transfer a snapshot from one node to another: +/// +/// - Setup a single node cluster, write some logs, take a snapshot; +/// - Add a learner node-2 to receive snapshot replication, via the complete-snapshot API: +/// - The sending end sends snapshot with `RaftNetwork::snapshot()`; +/// - The receiving end deliver the received snapshot to `Raft` with +/// `Raft::install_complete_snapshot()`. 
+#[tokio::test] +async fn test_cluster() { + std::panic::set_hook(Box::new(|panic| { + log_panic(panic); + })); + + tracing_subscriber::fmt() + .with_target(true) + .with_thread_ids(true) + .with_level(true) + .with_ansi(false) + .with_env_filter(EnvFilter::from_default_env()) + .init(); + + let router = Router::default(); + + let local = LocalSet::new(); + + let (raft1, app1) = new_raft(1, router.clone()).await; + let (raft2, app2) = new_raft(2, router.clone()).await; + + let rafts = [raft1, raft2]; + + local + .run_until(async move { + task::spawn_local(app1.run()); + task::spawn_local(app2.run()); + + run_test(&rafts, router).await; + }) + .await; +} + +async fn run_test(rafts: &[typ::Raft], router: Router) { + let _ = router; + + // Wait for server to start up. + tokio::time::sleep(Duration::from_millis(200)).await; + + let raft1 = &rafts[0]; + let raft2 = &rafts[1]; + + println!("=== init single node cluster"); + { + let mut nodes = BTreeMap::new(); + nodes.insert(1, BasicNode { addr: "".to_string() }); + raft1.initialize(nodes).await.unwrap(); + } + + println!("=== write 2 logs"); + { + let resp = raft1.client_write(Request::set("foo1", "bar1")).await.unwrap(); + println!("write resp: {:#?}", resp); + let resp = raft1.client_write(Request::set("foo2", "bar2")).await.unwrap(); + println!("write resp: {:#?}", resp); + } + + println!("=== let node-1 take a snapshot"); + { + raft1.trigger().snapshot().await.unwrap(); + + // Wait for a while to let the snapshot get done. 
+ tokio::time::sleep(Duration::from_millis(500)).await; + } + + println!("=== metrics after building snapshot"); + { + let metrics = raft1.metrics().borrow().clone(); + println!("node 1 metrics: {:#?}", metrics); + assert_eq!(Some(3), metrics.snapshot.map(|x| x.index)); + assert_eq!(Some(3), metrics.purged.map(|x| x.index)); + } + + println!("=== add-learner node-2"); + { + let node = BasicNode { addr: "".to_string() }; + let resp = raft1.add_learner(2, node, true).await.unwrap(); + println!("add-learner node-2 resp: {:#?}", resp); + } + + // Wait for a while to let the node 2 to receive snapshot replication. + tokio::time::sleep(Duration::from_millis(500)).await; + + println!("=== metrics of node 2 that received snapshot"); + { + let metrics = raft2.metrics().borrow().clone(); + println!("node 2 metrics: {:#?}", metrics); + assert_eq!(Some(3), metrics.snapshot.map(|x| x.index)); + assert_eq!(Some(3), metrics.purged.map(|x| x.index)); + } + + // In this example, the snapshot is just a copy of the state machine. + let snapshot = raft2.get_snapshot().await.unwrap(); + println!("node 2 received snapshot: {:#?}", snapshot); +} diff --git a/openraft/Cargo.toml b/openraft/Cargo.toml index c96a0b4cc..b21fc62c8 100644 --- a/openraft/Cargo.toml +++ b/openraft/Cargo.toml @@ -92,6 +92,14 @@ singlethreaded = ["macros/singlethreaded"] # For instance, in an even number nodes cluster, erasing a node's data and then rebooting it(log reverts to empty) will not result in data loss. loosen-follower-log-revert = [] + +# Enable this feature flag to eliminate the `AsyncRead + AsyncWrite + AsyncSeek + Unpin` bound from `RaftTypeConfig::SnapshotData`. +# Enabling this feature allows applications to use a custom snapshot data format and transport fragmentation, diverging from the default implementation which typically relies on a single-file structure . +# By default it is off. 
+# This feature is introduced in 0.9.0 +general-snapshot-data = [] + + # Enables "log" feature in `tracing` crate, to let tracing events emit log # record. # See: https://docs.rs/tracing/latest/tracing/#emitting-log-records diff --git a/openraft/src/core/mod.rs b/openraft/src/core/mod.rs index c10d9ddc3..d401e4707 100644 --- a/openraft/src/core/mod.rs +++ b/openraft/src/core/mod.rs @@ -12,11 +12,8 @@ pub(crate) mod raft_msg; mod replication_state; mod server_state; pub(crate) mod sm; -pub(crate) mod streaming_state; mod tick; -pub(crate) mod snapshot_state; - pub(crate) use raft_core::ApplyResult; pub(crate) use raft_core::ApplyingEntry; pub use raft_core::RaftCore; diff --git a/openraft/src/core/snapshot_state.rs b/openraft/src/core/snapshot_state.rs deleted file mode 100644 index 10c437ca5..000000000 --- a/openraft/src/core/snapshot_state.rs +++ /dev/null @@ -1,22 +0,0 @@ -use crate::LeaderId; -use crate::NodeId; -use crate::SnapshotId; - -/// A global unique id of install-snapshot request. -#[derive(Debug, Clone)] -#[derive(PartialEq, Eq)] -pub(crate) struct SnapshotRequestId { - pub(crate) leader_id: LeaderId, - pub(crate) snapshot_id: SnapshotId, - pub(crate) offset: u64, -} - -impl SnapshotRequestId { - pub(crate) fn new(leader_id: LeaderId, snapshot_id: SnapshotId, offset: u64) -> Self { - Self { - leader_id, - snapshot_id, - offset, - } - } -} diff --git a/openraft/src/docs/feature_flags/feature-flags.md b/openraft/src/docs/feature_flags/feature-flags.md index 79ed37a9b..5d128af49 100644 --- a/openraft/src/docs/feature_flags/feature-flags.md +++ b/openraft/src/docs/feature_flags/feature-flags.md @@ -55,6 +55,12 @@ By default openraft enables no features. In order to use the feature, `AsyncRuntime::spawn` should invoke `tokio::task::spawn_local` or equivalents.

+- `general-snapshot-data`: Enable this feature flag to eliminate the `AsyncRead + AsyncWrite + AsyncSeek + Unpin` bound from [`RaftTypeConfig::SnapshotData`]. + Enabling this feature allows applications to use a custom snapshot data format and transport fragmentation, diverging from the default implementation which typically relies on a single-file structure. + By default, it is off. + This feature is introduced in 0.9.0. +

+ - `tracing-log`: enables "log" feature in `tracing` crate, to let tracing events emit log record. See: [tracing doc: emitting-log-records](https://docs.rs/tracing/latest/tracing/#emitting-log-records) diff --git a/openraft/src/network/mod.rs b/openraft/src/network/mod.rs index 1552c273d..d910b08a7 100644 --- a/openraft/src/network/mod.rs +++ b/openraft/src/network/mod.rs @@ -5,7 +5,8 @@ mod factory; #[allow(clippy::module_inception)] mod network; mod rpc_option; mod rpc_type; -pub(crate) mod stream_snapshot; +#[cfg(not(feature = "general-snapshot-data"))] pub(crate) mod stream_snapshot; +#[cfg(not(feature = "general-snapshot-data"))] pub(crate) mod streaming; pub use backoff::Backoff; pub use factory::RaftNetworkFactory; diff --git a/openraft/src/network/network.rs b/openraft/src/network/network.rs index 8ca51cfb7..9efa2adca 100644 --- a/openraft/src/network/network.rs +++ b/openraft/src/network/network.rs @@ -10,7 +10,9 @@ use crate::error::RaftError; use crate::error::ReplicationClosed; use crate::error::StreamingError; use crate::network::rpc_option::RPCOption; +#[cfg(not(feature = "general-snapshot-data"))] use crate::network::stream_snapshot; +#[cfg(not(feature = "general-snapshot-data"))] use crate::network::stream_snapshot::SnapshotTransport; use crate::network::Backoff; use crate::raft::AppendEntriesRequest; @@ -108,8 +110,18 @@ where C: RaftTypeConfig cancel: impl Future + OptionalSend, option: RPCOption, ) -> Result, StreamingError>> { - let resp = stream_snapshot::Chunked::send_snapshot(self, vote, snapshot, cancel, option).await?; - Ok(resp) + #[cfg(not(feature = "general-snapshot-data"))] + { + let resp = stream_snapshot::Chunked::send_snapshot(self, vote, snapshot, cancel, option).await?; + Ok(resp) + } + #[cfg(feature = "general-snapshot-data")] + { + let _ = (vote, snapshot, cancel, option); + unimplemented!( + "no default implementation for RaftNetwork::snapshot() if `general-snapshot-data` feature is enabled" + ) + } } /// Send an AppendEntries 
RPC to the target Raft node (§5). diff --git a/openraft/src/network/stream_snapshot.rs b/openraft/src/network/stream_snapshot.rs index 4ecfdae34..792583658 100644 --- a/openraft/src/network/stream_snapshot.rs +++ b/openraft/src/network/stream_snapshot.rs @@ -1,22 +1,17 @@ use std::future::Future; use std::io::SeekFrom; -use std::pin::Pin; use std::time::Duration; use futures::FutureExt; use macros::add_async_trait; -use tokio::io::AsyncRead; use tokio::io::AsyncReadExt; -use tokio::io::AsyncSeek; use tokio::io::AsyncSeekExt; -use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; -use crate::core::snapshot_state::SnapshotRequestId; -use crate::core::streaming_state::Streaming; use crate::error::Fatal; use crate::error::ReplicationClosed; use crate::error::StreamingError; +use crate::network::streaming::Streaming; use crate::network::RPCOption; use crate::raft::InstallSnapshotRequest; use crate::raft::SnapshotResponse; @@ -25,7 +20,6 @@ use crate::AsyncRuntime; use crate::ErrorSubject; use crate::ErrorVerb; use crate::OptionalSend; -use crate::OptionalSync; use crate::RaftNetwork; use crate::RaftTypeConfig; use crate::Snapshot; @@ -60,9 +54,7 @@ pub(crate) trait SnapshotTransport { /// Send and Receive snapshot by chunks. pub(crate) struct Chunked {} -impl SnapshotTransport for Chunked -where C::SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + OptionalSend + OptionalSync + Unpin + 'static -{ +impl SnapshotTransport for Chunked { /// Stream snapshot by chunks. /// /// This function is for backward compatibility and provides a default implement for @@ -89,12 +81,10 @@ where C::SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + OptionalSend + Optio let mut offset = 0; let end = snapshot.snapshot.seek(SeekFrom::End(0)).await.sto_res(subject_verb)?; + let mut c = std::pin::pin!(cancel); loop { - // Safety: `cancel` is a future that is polled only by this function. 
- let c = unsafe { Pin::new_unchecked(&mut cancel) }; - // If canceled, return at once - if let Some(err) = c.now_or_never() { + if let Some(err) = c.as_mut().now_or_never() { return Err(err.into()); } @@ -173,23 +163,15 @@ where C::SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + OptionalSend + Optio ) -> Result>, StorageError> { let snapshot_meta = req.meta.clone(); let done = req.done; - let offset = req.offset; - - let req_id = SnapshotRequestId::new(*req.vote.leader_id(), snapshot_meta.snapshot_id.clone(), offset); - tracing::info!( - req = display(&req), - snapshot_req_id = debug(&req_id), - "{}", - func_name!() - ); + tracing::info!(req = display(&req), "{}", func_name!()); { let s = streaming.as_mut().unwrap(); s.receive(req).await?; } - tracing::info!(snapshot_req_id = debug(&req_id), "received snapshot chunk"); + tracing::info!("Done received snapshot chunk"); if done { let streaming = streaming.take().unwrap(); diff --git a/openraft/src/core/streaming_state.rs b/openraft/src/network/streaming.rs similarity index 100% rename from openraft/src/core/streaming_state.rs rename to openraft/src/network/streaming.rs diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index aae4eea03..54d2a6756 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -42,7 +42,6 @@ use crate::core::raft_msg::external_command::ExternalCommand; use crate::core::raft_msg::RaftMsg; use crate::core::replication_lag; use crate::core::sm; -use crate::core::streaming_state::Streaming; use crate::core::RaftCore; use crate::core::Tick; use crate::engine::Engine; @@ -52,16 +51,16 @@ use crate::error::ClientWriteError; use crate::error::Fatal; use crate::error::HigherVote; use crate::error::InitializeError; -use crate::error::InstallSnapshotError; use crate::error::RaftError; -use crate::error::SnapshotMismatch; use crate::membership::IntoNodes; use crate::metrics::RaftDataMetrics; use crate::metrics::RaftMetrics; use crate::metrics::RaftServerMetrics; use 
crate::metrics::Wait; use crate::metrics::WaitError; +#[cfg(not(feature = "general-snapshot-data"))] use crate::network::stream_snapshot::Chunked; +#[cfg(not(feature = "general-snapshot-data"))] use crate::network::stream_snapshot::SnapshotTransport; use crate::network::RaftNetworkFactory; use crate::raft::raft_inner::RaftInner; @@ -78,7 +77,6 @@ use crate::MessageSummary; use crate::RaftState; pub use crate::RaftTypeConfig; use crate::Snapshot; -use crate::SnapshotSegmentId; use crate::StorageHelper; use crate::Vote; @@ -261,6 +259,8 @@ where C: RaftTypeConfig rx_server_metrics, tx_shutdown: Mutex::new(Some(tx_shutdown)), core_state: Mutex::new(CoreState::Running(core_handle)), + + #[cfg(not(feature = "general-snapshot-data"))] snapshot: Mutex::new(None), }; @@ -415,11 +415,15 @@ where C: RaftTypeConfig /// /// If receiving is finished `done == true`, it installs the snapshot to the state machine. /// Nothing will be done if the input snapshot is older than the state machine. + #[cfg(not(feature = "general-snapshot-data"))] #[tracing::instrument(level = "debug", skip_all)] pub async fn install_snapshot( &self, req: InstallSnapshotRequest, - ) -> Result, RaftError> { + ) -> Result, RaftError> + where + C::SnapshotData: tokio::io::AsyncRead + tokio::io::AsyncWrite + tokio::io::AsyncSeek + Unpin, + { tracing::debug!(req = display(&req), "Raft::install_snapshot()"); let req_vote = req.vote; @@ -431,12 +435,12 @@ where C: RaftTypeConfig if curr_id != Some(&req.meta.snapshot_id) { if req.offset != 0 { - let mismatch = InstallSnapshotError::SnapshotMismatch(SnapshotMismatch { - expect: SnapshotSegmentId { + let mismatch = crate::error::InstallSnapshotError::SnapshotMismatch(crate::error::SnapshotMismatch { + expect: crate::SnapshotSegmentId { id: snapshot_id.clone(), offset: 0, }, - got: SnapshotSegmentId { + got: crate::SnapshotSegmentId { id: snapshot_id.clone(), offset: req.offset, }, @@ -456,7 +460,10 @@ where C: RaftTypeConfig } } }; - *streaming = 
Some(Streaming::new(req.meta.snapshot_id.clone(), snapshot_data)); + *streaming = Some(crate::network::streaming::Streaming::new( + req.meta.snapshot_id.clone(), + snapshot_data, + )); } let snapshot = Chunked::receive_snapshot(&mut *streaming, req).await?; diff --git a/openraft/src/raft/raft_inner.rs b/openraft/src/raft/raft_inner.rs index 2141fa5a8..4c5c64f32 100644 --- a/openraft/src/raft/raft_inner.rs +++ b/openraft/src/raft/raft_inner.rs @@ -11,7 +11,6 @@ use tracing::Level; use crate::config::RuntimeConfig; use crate::core::raft_msg::external_command::ExternalCommand; use crate::core::raft_msg::RaftMsg; -use crate::core::streaming_state::Streaming; use crate::core::TickHandle; use crate::error::Fatal; use crate::error::RaftError; @@ -44,7 +43,8 @@ where C: RaftTypeConfig pub(in crate::raft) core_state: Mutex>, /// The ongoing snapshot transmission. - pub(in crate::raft) snapshot: Mutex>>, + #[cfg(not(feature = "general-snapshot-data"))] + pub(in crate::raft) snapshot: Mutex>>, } impl RaftInner diff --git a/openraft/src/storage/mod.rs b/openraft/src/storage/mod.rs index f17fcb1ef..742370157 100644 --- a/openraft/src/storage/mod.rs +++ b/openraft/src/storage/mod.rs @@ -113,6 +113,7 @@ where C: RaftTypeConfig impl Snapshot where C: RaftTypeConfig { + #[allow(dead_code)] pub(crate) fn new(meta: SnapshotMeta, snapshot: Box) -> Self { Self { meta, snapshot } } diff --git a/openraft/src/type_config.rs b/openraft/src/type_config.rs index 421f87539..1dc511e03 100644 --- a/openraft/src/type_config.rs +++ b/openraft/src/type_config.rs @@ -1,8 +1,8 @@ use std::fmt::Debug; -use tokio::io::AsyncRead; -use tokio::io::AsyncSeek; -use tokio::io::AsyncWrite; +#[cfg(not(feature = "general-snapshot-data"))] use tokio::io::AsyncRead; +#[cfg(not(feature = "general-snapshot-data"))] use tokio::io::AsyncSeek; +#[cfg(not(feature = "general-snapshot-data"))] use tokio::io::AsyncWrite; use crate::entry::FromAppData; use crate::entry::RaftEntry; @@ -58,11 +58,15 @@ pub trait 
RaftTypeConfig: /// Raft log entry, which can be built from an AppData. type Entry: RaftEntry + FromAppData; + // TODO: fix the doc address /// Snapshot data for exposing a snapshot for reading & writing. /// /// See the [storage chapter of the guide](https://datafuselabs.github.io/openraft/getting-started.html#implement-raftstorage) /// for details on where and how this is used. - type SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + OptionalSend + OptionalSync + Unpin + 'static; + #[cfg(not(feature = "general-snapshot-data"))] + type SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + OptionalSend + Unpin + 'static; + #[cfg(feature = "general-snapshot-data")] + type SnapshotData: OptionalSend + 'static; /// Asynchronous runtime type. type AsyncRuntime: AsyncRuntime;