Skip to content

Commit

Permalink
Change: add AsyncRuntime type parameter to RaftTypeConfig (#869)
Browse files Browse the repository at this point in the history
-   Add AsyncRuntime type parameter to RaftTypeConfig

    This commit introduces the AsyncRuntime type parameter to
    RaftTypeConfig, allowing applications to choose their preferred
    asynchronous runtime, such as tokio or async-std.

-   Parameterize Instant type for flexibility with async runtimes

    To create a more flexible interface between the crate and
    asynchronous runtimes, the Instant type is now associated with the
    async runtime. This is because Instant::now can have different
    implementations. This commit adds the trait Instant and TokioInstant
    for representing system states.

- Fix: #741
  • Loading branch information
wvwwvwwv authored Jun 21, 2023
1 parent 98b2606 commit 6098f5c
Show file tree
Hide file tree
Showing 59 changed files with 556 additions and 265 deletions.
3 changes: 2 additions & 1 deletion cluster_benchmark/tests/benchmark/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use openraft::SnapshotMeta;
use openraft::StorageError;
use openraft::StorageIOError;
use openraft::StoredMembership;
use openraft::TokioRuntime;
use openraft::Vote;
use serde::Deserialize;
use serde::Serialize;
Expand All @@ -40,7 +41,7 @@ pub type NodeId = u64;

openraft::declare_raft_types!(
pub TypeConfig: D = ClientRequest, R = ClientResponse, NodeId = NodeId, Node = (),
Entry = Entry<TypeConfig>, SnapshotData = Cursor<Vec<u8>>
Entry = Entry<TypeConfig>, SnapshotData = Cursor<Vec<u8>>, AsyncRuntime = TokioRuntime
);

#[derive(Debug)]
Expand Down
3 changes: 2 additions & 1 deletion examples/raft-kv-memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use actix_web::HttpServer;
use openraft::storage::Adaptor;
use openraft::BasicNode;
use openraft::Config;
use openraft::TokioRuntime;

use crate::app::App;
use crate::network::api;
Expand All @@ -31,7 +32,7 @@ pub type NodeId = u64;
openraft::declare_raft_types!(
/// Declare the type configuration for example K/V store.
pub TypeConfig: D = Request, R = Response, NodeId = NodeId, Node = BasicNode,
Entry = openraft::Entry<TypeConfig>, SnapshotData = Cursor<Vec<u8>>
Entry = openraft::Entry<TypeConfig>, SnapshotData = Cursor<Vec<u8>>, AsyncRuntime = TokioRuntime
);

pub type LogStore = Adaptor<TypeConfig, Arc<Store>>;
Expand Down
3 changes: 2 additions & 1 deletion examples/raft-kv-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use async_std::net::TcpListener;
use async_std::task;
use openraft::storage::Adaptor;
use openraft::Config;
use openraft::TokioRuntime;

use crate::app::App;
use crate::network::api;
Expand Down Expand Up @@ -41,7 +42,7 @@ impl Display for Node {
openraft::declare_raft_types!(
/// Declare the type configuration for example K/V store.
pub TypeConfig: D = Request, R = Response, NodeId = NodeId, Node = Node,
Entry = openraft::Entry<TypeConfig>, SnapshotData = Cursor<Vec<u8>>
Entry = openraft::Entry<TypeConfig>, SnapshotData = Cursor<Vec<u8>>, AsyncRuntime = TokioRuntime
);

pub type LogStore = Adaptor<TypeConfig, Arc<Store>>;
Expand Down
3 changes: 2 additions & 1 deletion memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use openraft::SnapshotMeta;
use openraft::StorageError;
use openraft::StorageIOError;
use openraft::StoredMembership;
use openraft::TokioRuntime;
use openraft::Vote;
use serde::Deserialize;
use serde::Serialize;
Expand Down Expand Up @@ -74,7 +75,7 @@ pub type MemNodeId = u64;
openraft::declare_raft_types!(
/// Declare the type configuration for `MemStore`.
pub TypeConfig: D = ClientRequest, R = ClientResponse, NodeId = MemNodeId, Node = (),
Entry = Entry<TypeConfig>, SnapshotData = Cursor<Vec<u8>>
Entry = Entry<TypeConfig>, SnapshotData = Cursor<Vec<u8>>, AsyncRuntime = TokioRuntime
);

/// The application snapshot type which the `MemStore` works with.
Expand Down
112 changes: 112 additions & 0 deletions openraft/src/async_runtime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use std::fmt::Debug;
use std::fmt::Display;
use std::future::Future;
use std::time::Duration;

use crate::Instant;
use crate::TokioInstant;

/// A trait defining interfaces with an asynchronous runtime.
///
/// The intention of this trait is to allow an application using this crate to bind an asynchronous
/// runtime that suits it the best.
///
/// ## Note
///
/// The default asynchronous runtime is `tokio`.
pub trait AsyncRuntime: Debug + Default + Send + Sync + 'static {
/// The error type of [`Self::JoinHandle`].
type JoinError: Debug + Display + Send;

/// The return type of [`Self::spawn`].
type JoinHandle<T: Send + 'static>: Future<Output = Result<T, Self::JoinError>> + Send + Sync + Unpin;

/// The type that enables the user to sleep in an asynchronous runtime.
type Sleep: Future<Output = ()> + Send + Sync;

/// A measurement of a monotonically non-decreasing clock.
type Instant: Instant;

/// The timeout error type.
type TimeoutError: Debug + Display + Send;

/// The timeout type used by [`Self::timeout`] and [`Self::timeout_at`] that enables the user
/// to await the outcome of a [`Future`].
type Timeout<R, T: Future<Output = R> + Send>: Future<Output = Result<R, Self::TimeoutError>> + Send;

/// Spawn a new task.
fn spawn<T>(future: T) -> Self::JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static;

/// Wait until `duration` has elapsed.
fn sleep(duration: Duration) -> Self::Sleep;

/// Wait until `deadline` is reached.
fn sleep_until(deadline: Self::Instant) -> Self::Sleep;

/// Require a [`Future`] to complete before the specified duration has elapsed.
fn timeout<R, F: Future<Output = R> + Send>(duration: Duration, future: F) -> Self::Timeout<R, F>;

/// Require a [`Future`] to complete before the specified instant in time.
fn timeout_at<R, F: Future<Output = R> + Send>(deadline: Self::Instant, future: F) -> Self::Timeout<R, F>;

/// Check if the [`Self::JoinError`] is `panic`.
fn is_panic(join_error: &Self::JoinError) -> bool;

/// Abort the task associated with the supplied join handle.
fn abort<T: Send + 'static>(join_handle: &Self::JoinHandle<T>);
}

/// `Tokio` is the default asynchronous executor.
#[derive(Debug, Default)]
pub struct TokioRuntime;

impl AsyncRuntime for TokioRuntime {
type JoinError = tokio::task::JoinError;
type JoinHandle<T: Send + 'static> = tokio::task::JoinHandle<T>;
type Sleep = tokio::time::Sleep;
type Instant = TokioInstant;
type TimeoutError = tokio::time::error::Elapsed;
type Timeout<R, T: Future<Output = R> + Send> = tokio::time::Timeout<T>;

#[inline]
fn spawn<T>(future: T) -> Self::JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
tokio::task::spawn(future)
}

#[inline]
fn sleep(duration: Duration) -> Self::Sleep {
tokio::time::sleep(duration)
}

#[inline]
fn sleep_until(deadline: Self::Instant) -> Self::Sleep {
tokio::time::sleep_until(deadline)
}

#[inline]
fn timeout<R, F: Future<Output = R> + Send>(duration: Duration, future: F) -> Self::Timeout<R, F> {
tokio::time::timeout(duration, future)
}

#[inline]
fn timeout_at<R, F: Future<Output = R> + Send>(deadline: Self::Instant, future: F) -> Self::Timeout<R, F> {
tokio::time::timeout_at(deadline, future)
}

#[inline]
fn is_panic(join_error: &Self::JoinError) -> bool {
join_error.is_panic()
}

#[inline]
fn abort<T: Send + 'static>(join_handle: &Self::JoinHandle<T>) {
join_handle.abort();
}
}
4 changes: 3 additions & 1 deletion openraft/src/compat/compat07.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ mod tests {

use crate::compat::Upgrade;
use crate::CommittedLeaderId;
use crate::TokioRuntime;

#[test]
fn test_serde_log_id() -> anyhow::Result<()> {
Expand Down Expand Up @@ -511,7 +512,8 @@ mod tests {
crate::declare_raft_types!(
pub TestingConfig:
D = u64, R = u64, NodeId = u64, Node = crate::EmptyNode,
Entry = crate::Entry<TestingConfig>, SnapshotData = Cursor<Vec<u8>>
Entry = crate::Entry<TestingConfig>, SnapshotData = Cursor<Vec<u8>>,
AsyncRuntime = TokioRuntime
);

#[test]
Expand Down
44 changes: 26 additions & 18 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::fmt::Formatter;
use std::marker::PhantomData;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use anyerror::AnyError;
use futures::stream::FuturesUnordered;
Expand All @@ -16,9 +17,6 @@ use tokio::select;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::watch;
use tokio::time::timeout;
use tokio::time::Duration;
use tokio::time::Instant;
use tracing::Instrument;
use tracing::Level;
use tracing::Span;
Expand Down Expand Up @@ -84,7 +82,9 @@ use crate::storage::RaftLogReaderExt;
use crate::storage::RaftLogStorage;
use crate::storage::RaftStateMachine;
use crate::utime::UTime;
use crate::AsyncRuntime;
use crate::ChangeMembers;
use crate::Instant;
use crate::LogId;
use crate::Membership;
use crate::MessageSummary;
Expand Down Expand Up @@ -140,15 +140,15 @@ pub(crate) struct LeaderData<C: RaftTypeConfig> {
pub(super) replications: BTreeMap<C::NodeId, ReplicationHandle<C>>,

/// The time to send next heartbeat.
pub(crate) next_heartbeat: Instant,
pub(crate) next_heartbeat: <C::AsyncRuntime as AsyncRuntime>::Instant,
}

impl<C: RaftTypeConfig> LeaderData<C> {
pub(crate) fn new() -> Self {
Self {
client_resp_channels: Default::default(),
replications: BTreeMap::new(),
next_heartbeat: Instant::now(),
next_heartbeat: <C::AsyncRuntime as AsyncRuntime>::Instant::now(),
}
}
}
Expand Down Expand Up @@ -308,7 +308,7 @@ where
let option = RPCOption::new(ttl);

let fu = async move {
let outer_res = timeout(ttl, client.append_entries(rpc, option)).await;
let outer_res = C::AsyncRuntime::timeout(ttl, client.append_entries(rpc, option)).await;
match outer_res {
Ok(append_res) => match append_res {
Ok(x) => Ok((target, x)),
Expand All @@ -328,7 +328,7 @@ where
};

let fu = fu.instrument(tracing::debug_span!("spawn_is_leader", target = target.to_string()));
let task = tokio::spawn(fu).map_err(move |err| (target, err));
let task = C::AsyncRuntime::spawn(fu).map_err(move |err| (target, err));

pending.push(task);
}
Expand Down Expand Up @@ -392,7 +392,7 @@ where
.into()));
};

tokio::spawn(waiting_fu.instrument(tracing::debug_span!("spawn_is_leader_waiting")));
C::AsyncRuntime::spawn(waiting_fu.instrument(tracing::debug_span!("spawn_is_leader_waiting")));
}

/// Submit change-membership by writing a Membership log entry.
Expand Down Expand Up @@ -470,14 +470,21 @@ where
/// Currently heartbeat is a blank log
#[tracing::instrument(level = "debug", skip_all, fields(id = display(self.id)))]
pub fn send_heartbeat(&mut self, emitter: impl Display) -> bool {
tracing::debug!(now = debug(Instant::now()), "send_heartbeat");
tracing::debug!(
now = debug(<C::AsyncRuntime as AsyncRuntime>::Instant::now()),
"send_heartbeat"
);

let mut lh = if let Some((lh, _)) =
self.engine.get_leader_handler_or_reject::<(), ClientWriteError<C::NodeId, C::Node>>(None)
{
lh
} else {
tracing::debug!(now = debug(Instant::now()), "{} failed to send heartbeat", emitter);
tracing::debug!(
now = debug(<C::AsyncRuntime as AsyncRuntime>::Instant::now()),
"{} failed to send heartbeat",
emitter
);
return false;
};

Expand Down Expand Up @@ -988,9 +995,9 @@ where
let id = self.id;
let option = RPCOption::new(ttl);

tokio::spawn(
C::AsyncRuntime::spawn(
async move {
let tm_res = timeout(ttl, client.vote(req, option)).await;
let tm_res = C::AsyncRuntime::timeout(ttl, client.vote(req, option)).await;
let res = match tm_res {
Ok(res) => res,

Expand Down Expand Up @@ -1062,7 +1069,7 @@ where
self.handle_append_entries_request(rpc, tx);
}
RaftMsg::RequestVote { rpc, tx } => {
let now = Instant::now();
let now = <C::AsyncRuntime as AsyncRuntime>::Instant::now();
tracing::info!(
now = debug(now),
vote_request = display(rpc.summary()),
Expand Down Expand Up @@ -1149,7 +1156,7 @@ where
resp,
sender_vote: vote,
} => {
let now = Instant::now();
let now = <C::AsyncRuntime as AsyncRuntime>::Instant::now();

tracing::info!(
now = debug(now),
Expand Down Expand Up @@ -1185,7 +1192,7 @@ where
Notify::Tick { i } => {
// check every timer

let now = Instant::now();
let now = <C::AsyncRuntime as AsyncRuntime>::Instant::now();
tracing::debug!("received tick: {}, now: {:?}", i, now);

self.handle_tick_election();
Expand All @@ -1202,7 +1209,8 @@ where

// Install next heartbeat
if let Some(l) = &mut self.leader_data {
l.next_heartbeat = Instant::now() + Duration::from_millis(self.config.heartbeat_interval);
l.next_heartbeat = <C::AsyncRuntime as AsyncRuntime>::Instant::now()
+ Duration::from_millis(self.config.heartbeat_interval);
}
}
}
Expand Down Expand Up @@ -1330,7 +1338,7 @@ where

#[tracing::instrument(level = "debug", skip_all)]
fn handle_tick_election(&mut self) {
let now = Instant::now();
let now = <C::AsyncRuntime as AsyncRuntime>::Instant::now();

tracing::debug!("try to trigger election by tick, now: {:?}", now);

Expand Down Expand Up @@ -1399,7 +1407,7 @@ where
&mut self,
target: C::NodeId,
id: u64,
result: Result<UTime<ReplicationResult<C::NodeId>>, String>,
result: Result<UTime<ReplicationResult<C::NodeId>, <C::AsyncRuntime as AsyncRuntime>::Instant>, String>,
) {
tracing::debug!(
target = display(target),
Expand Down
9 changes: 5 additions & 4 deletions openraft/src/core/sm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::entry::RaftPayload;
use crate::raft::InstallSnapshotRequest;
use crate::storage::RaftStateMachine;
use crate::summary::MessageSummary;
use crate::AsyncRuntime;
use crate::RaftLogId;
use crate::RaftSnapshotBuilder;
use crate::RaftTypeConfig;
Expand All @@ -41,7 +42,7 @@ where C: RaftTypeConfig
{
cmd_tx: mpsc::UnboundedSender<Command<C>>,
#[allow(dead_code)]
join_handle: tokio::task::JoinHandle<()>,
join_handle: <C::AsyncRuntime as AsyncRuntime>::JoinHandle<()>,
}

impl<C> Handle<C>
Expand Down Expand Up @@ -91,8 +92,8 @@ where
Handle { cmd_tx, join_handle }
}

fn do_spawn(mut self) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
fn do_spawn(mut self) -> <C::AsyncRuntime as AsyncRuntime>::JoinHandle<()> {
C::AsyncRuntime::spawn(async move {
let res = self.worker_loop().await;

if let Err(err) = res {
Expand Down Expand Up @@ -221,7 +222,7 @@ where

let mut builder = self.state_machine.get_snapshot_builder().await;

let _handle = tokio::spawn(async move {
let _handle = C::AsyncRuntime::spawn(async move {
let res = builder.build_snapshot().await;
let res = res.map(|snap| Response::BuildSnapshot(snap.meta));
let cmd_res = CommandResult::new(seq, res);
Expand Down
Loading

0 comments on commit 6098f5c

Please sign in to comment.