Skip to content

Commit

Permalink
Feature: Add rocks based example
Browse files Browse the repository at this point in the history
Signed-off-by: Heinz N. Gies <[email protected]>
  • Loading branch information
Licenser committed Jul 6, 2022
1 parent 401d427 commit 4d0918f
Show file tree
Hide file tree
Showing 17 changed files with 1,752 additions and 7 deletions.
9 changes: 2 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
[workspace]
members = [
"openraft",
"memstore",
]
exclude = [
"examples/raft-kv-memstore",
]
members = ["openraft", "memstore"]
exclude = ["examples/raft-kv-memstore", "examples/raft-kv-rocksdb"]
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defensive_test:
test: lint fmt
cargo test
cargo test --manifest-path examples/raft-kv-memstore/Cargo.toml
cargo test --manifest-path examples/raft-kv-rocksdb/Cargo.toml

bench_cluster_of_1:
cargo test --package openraft --test benchmark --release bench_cluster_of_1 -- --ignored --nocapture
Expand Down
5 changes: 5 additions & 0 deletions examples/raft-kv-rocksdb/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
target
vendor
.idea
*.db
*.log
51 changes: 51 additions & 0 deletions examples/raft-kv-rocksdb/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
[package]
name = "raft-key-value-rocks"
version = "0.1.0"
edition = "2021"
authors = [
"drdr xp <[email protected]>",
"Pedro Paulo de Amorim <[email protected]>",
"The Tremor Team",
]
categories = ["algorithms", "asynchronous", "data-structures"]
description = "An example distributed key-value store built upon `openraft`."
homepage = "https://github.com/datafuselabs/openraft"
keywords = ["raft", "consensus"]
license = "MIT/Apache-2.0"
repository = "https://github.com/datafuselabs/openraft"
readme = "README.md"

[[bin]]
name = "raft-key-value-rocks"
path = "src/bin/main.rs"

[dependencies]
rocksdb = "0.18.0"
tide = { version = "0.16" }
async-std = { version = "1.12.0", features = ["attributes", "tokio1"] }
async-trait = "0.1.36"
clap = { version = "3.0.13", features = ["derive", "env"] }
tracing-subscriber = "0.3.0"
openraft = { version = "0.6", path = "../../openraft", features = ["serde"] }
reqwest = { version = "0.11.9", features = ["json"] }
serde = { version = "1.0.114", features = ["derive"] }
serde_json = "1.0.57"
tracing = "0.1.29"
tracing-futures = "0.2.4"
toy-rpc = { version = "*", features = [
"ws_async_std",
"server",
"client",
"async_std_runtime",
] }
byteorder = "1.4.3"

[dev-dependencies]
maplit = "1.0.2"
tempdir = "0.3.0"

[features]
docinclude = [] # Used only for activating `doc(include="...")` on nightly.

[package.metadata.docs.rs]
features = ["docinclude"] # Activate `docinclude` during docs.rs build.
18 changes: 18 additions & 0 deletions examples/raft-kv-rocksdb/src/app.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use std::sync::Arc;

use openraft::Config;

use crate::ExampleNodeId;
use crate::ExampleRaft;
use crate::ExampleStore;

// Representation of an application state. This struct can be shared around to share
// instances of raft, store and more.
pub struct ExampleApp {
pub id: ExampleNodeId,
pub api_addr: String,
pub rcp_addr: String,
pub raft: ExampleRaft,
pub store: Arc<ExampleStore>,
pub config: Arc<Config>,
}
38 changes: 38 additions & 0 deletions examples/raft-kv-rocksdb/src/bin/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use clap::Parser;
use openraft::Raft;
use raft_key_value_rocks::network::raft_network_impl::ExampleNetwork;
use raft_key_value_rocks::start_example_raft_node;
use raft_key_value_rocks::store::ExampleStore;
use raft_key_value_rocks::ExampleTypeConfig;

pub type ExampleRaft = Raft<ExampleTypeConfig, ExampleNetwork, ExampleStore>;

#[derive(Parser, Clone, Debug)]
#[clap(author, version, about, long_about = None)]
pub struct Opt {
#[clap(long)]
pub id: u64,

#[clap(long)]
pub http_addr: String,

#[clap(long)]
pub rpc_addr: String,
}

#[async_std::main]
async fn main() -> std::io::Result<()> {
// Setup the logger
tracing_subscriber::fmt().init();

// Parse the parameters passed by arguments.
let options = Opt::parse();

start_example_raft_node(
options.id,
format!("{}.db", options.rpc_addr),
options.http_addr,
options.rpc_addr,
)
.await
}
221 changes: 221 additions & 0 deletions examples/raft-kv-rocksdb/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
use std::collections::BTreeSet;
use std::sync::Arc;
use std::sync::Mutex;

use openraft::error::AddLearnerError;
use openraft::error::CheckIsLeaderError;
use openraft::error::ClientWriteError;
use openraft::error::ForwardToLeader;
use openraft::error::Infallible;
use openraft::error::InitializeError;
use openraft::error::NetworkError;
use openraft::error::RPCError;
use openraft::error::RemoteError;
use openraft::raft::AddLearnerResponse;
use openraft::raft::ClientWriteResponse;
use openraft::RaftMetrics;
use reqwest::Client;
use serde::de::DeserializeOwned;
use serde::Deserialize;
use serde::Serialize;

use crate::ExampleNodeId;
use crate::ExampleRequest;
use crate::ExampleTypeConfig;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Empty {}

pub struct ExampleClient {
/// The leader node to send request to.
///
/// All traffic should be sent to the leader in a cluster.
pub leader: Arc<Mutex<(ExampleNodeId, String)>>,

pub inner: Client,
}

impl ExampleClient {
/// Create a client with a leader node id and a node manager to get node address by node id.
pub fn new(leader_id: ExampleNodeId, leader_addr: String) -> Self {
Self {
leader: Arc::new(Mutex::new((leader_id, leader_addr))),
inner: reqwest::Client::new(),
}
}

// --- Application API

/// Submit a write request to the raft cluster.
///
/// The request will be processed by raft protocol: it will be replicated to a quorum and then will be applied to
/// state machine.
///
/// The result of applying the request will be returned.
pub async fn write(
&self,
req: &ExampleRequest,
) -> Result<ClientWriteResponse<ExampleTypeConfig>, RPCError<ExampleNodeId, ClientWriteError<ExampleNodeId>>> {
self.send_rpc_to_leader("api/write", Some(req)).await
}

/// Read value by key, in an inconsistent mode.
///
/// This method may return stale value because it does not force to read on a legal leader.
pub async fn read(&self, req: &String) -> Result<String, RPCError<ExampleNodeId, Infallible>> {
self.do_send_rpc_to_leader("api/read", Some(req)).await
}

/// Consistent Read value by key, in an inconsistent mode.
///
/// This method MUST return consitent value or CheckIsLeaderError.
pub async fn consistent_read(
&self,
req: &String,
) -> Result<String, RPCError<ExampleNodeId, CheckIsLeaderError<ExampleNodeId>>> {
self.do_send_rpc_to_leader("api/consistent_read", Some(req)).await
}

// --- Cluster management API

/// Initialize a cluster of only the node that receives this request.
///
/// This is the first step to initialize a cluster.
/// With a initialized cluster, new node can be added with [`write`].
/// Then setup replication with [`add_learner`].
/// Then make the new node a member with [`change_membership`].
pub async fn init(&self) -> Result<(), RPCError<ExampleNodeId, InitializeError<ExampleNodeId>>> {
self.do_send_rpc_to_leader("cluster/init", Some(&Empty {})).await
}

/// Add a node as learner.
///
/// The node to add has to exist, i.e., being added with `write(ExampleRequest::AddNode{})`
pub async fn add_learner(
&self,
req: (ExampleNodeId, String, String),
) -> Result<AddLearnerResponse<ExampleNodeId>, RPCError<ExampleNodeId, AddLearnerError<ExampleNodeId>>> {
self.send_rpc_to_leader("cluster/add-learner", Some(&req)).await
}

/// Change membership to the specified set of nodes.
///
/// All nodes in `req` have to be already added as learner with [`add_learner`],
/// or an error [`LearnerNotFound`] will be returned.
pub async fn change_membership(
&self,
req: &BTreeSet<ExampleNodeId>,
) -> Result<ClientWriteResponse<ExampleTypeConfig>, RPCError<ExampleNodeId, ClientWriteError<ExampleNodeId>>> {
self.send_rpc_to_leader("cluster/change-membership", Some(req)).await
}

/// Get the metrics about the cluster.
///
/// Metrics contains various information about the cluster, such as current leader,
/// membership config, replication status etc.
/// See [`RaftMetrics`].
pub async fn metrics(&self) -> Result<RaftMetrics<ExampleNodeId>, RPCError<ExampleNodeId, Infallible>> {
self.do_send_rpc_to_leader("cluster/metrics", None::<&()>).await
}

// --- Internal methods

/// Send RPC to specified node.
///
/// It sends out a POST request if `req` is Some. Otherwise a GET request.
/// The remote endpoint must respond a reply in form of `Result<T, E>`.
/// An `Err` happened on remote will be wrapped in an [`RPCError::RemoteError`].
async fn do_send_rpc_to_leader<Req, Resp, Err>(
&self,
uri: &str,
req: Option<&Req>,
) -> Result<Resp, RPCError<ExampleNodeId, Err>>
where
Req: Serialize + 'static,
Resp: Serialize + DeserializeOwned,
Err: std::error::Error + Serialize + DeserializeOwned,
{
let (leader_id, url) = {
let t = self.leader.lock().unwrap();
let target_addr = &t.1;
(t.0, format!("http://{}/{}", target_addr, uri))
};

let resp = if let Some(r) = req {
println!(
">>> client send request to {}: {}",
url,
serde_json::to_string_pretty(&r).unwrap()
);
self.inner.post(url.clone()).json(r)
} else {
println!(">>> client send request to {}", url,);
self.inner.get(url.clone())
}
.send()
.await
.map_err(|e| RPCError::Network(NetworkError::new(&e)))?;

let res: Result<Resp, Err> = resp.json().await.map_err(|e| RPCError::Network(NetworkError::new(&e)))?;
println!(
"<<< client recv reply from {}: {}",
url,
serde_json::to_string_pretty(&res).unwrap()
);

res.map_err(|e| RPCError::RemoteError(RemoteError::new(leader_id, e)))
}

/// Try the best to send a request to the leader.
///
/// If the target node is not a leader, a `ForwardToLeader` error will be
/// returned and this client will retry at most 3 times to contact the updated leader.
async fn send_rpc_to_leader<Req, Resp, Err>(
&self,
uri: &str,
req: Option<&Req>,
) -> Result<Resp, RPCError<ExampleNodeId, Err>>
where
Req: Serialize + 'static,
Resp: Serialize + DeserializeOwned,
Err: std::error::Error + Serialize + DeserializeOwned + TryInto<ForwardToLeader<ExampleNodeId>> + Clone,
{
// Retry at most 3 times to find a valid leader.
let mut n_retry = 3;

loop {
let res: Result<Resp, RPCError<ExampleNodeId, Err>> = self.do_send_rpc_to_leader(uri, req).await;

let rpc_err = match res {
Ok(x) => return Ok(x),
Err(rpc_err) => rpc_err,
};

if let RPCError::RemoteError(remote_err) = &rpc_err {
let forward_err_res =
<Err as TryInto<ForwardToLeader<ExampleNodeId>>>::try_into(remote_err.source.clone());

if let Ok(ForwardToLeader {
leader_id: Some(leader_id),
leader_node: Some(leader_node),
..
}) = forward_err_res
{
// Update target to the new leader.
{
let mut t = self.leader.lock().unwrap();
let api_addr = leader_node.data.get("api_addr").unwrap().clone();
*t = (leader_id, api_addr);
}

n_retry -= 1;
if n_retry > 0 {
continue;
}
}
}

return Err(rpc_err);
}
}
}
Loading

0 comments on commit 4d0918f

Please sign in to comment.