Skip to content

Commit

Permalink
Feature: add 'singlethreaded' raft mode (#878)
Browse files Browse the repository at this point in the history
* Feature: add 'singlethreaded' raft mode

- 'singlethreaded' compile-time feature gate.

The new feature gate forces the raft instance to be used by a single thread by not implementing `Send` for certain data structures and substituting calls to `tokio::spawn` with `tokio::spawn_local` when using the `tokio` asynchronous runtime.

- Re-export `add_async_trait` for application to define a `!Send` async trait.

- Fix: #862
  • Loading branch information
wvwwvwwv authored Jun 27, 2023
1 parent 2196ccb commit 104983d
Show file tree
Hide file tree
Showing 21 changed files with 167 additions and 57 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
matrix:
include:
- toolchain: "nightly"
features: "bench,serde,bt"
features: "bench,serde,bt,singlethreaded"

steps:
- name: Setup | Checkout
Expand Down Expand Up @@ -271,7 +271,7 @@ jobs:
features: "single-term-leader"

- toolchain: "nightly"
features: "single-term-leader,serde"
features: "single-term-leader,serde,singlethreaded"


steps:
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ resolver = "2"
members = [
"openraft",
"memstore",
"macros",
"tests",
"rocksstore",
"rocksstore-compat07",
Expand Down
22 changes: 22 additions & 0 deletions macros/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "macros"

version = { workspace = true }
edition = { workspace = true }
authors = { workspace = true }
categories = { workspace = true }
description = { workspace = true }
documentation = { workspace = true }
homepage = { workspace = true }
keywords = { workspace = true }
license = { workspace = true }
repository = { workspace = true }

[lib]
proc-macro = true

[features]

# Passes `?Send` to `async_trait` to force affected tasks to be spawned in the current thread.
singlethreaded = []

21 changes: 21 additions & 0 deletions macros/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use proc_macro::TokenStream;

/// This macro either emits `#[async_trait::async_trait]` or `#[asnc_trait::async_trait(?Send)]`
/// based on the activated feature set.
///
/// This assumes that the `[async_trait](https://crates.io/crates/async-trait)` crate is imported
/// as `async_trait`. If the `singlethreaded` feature is enabled, `?Send` is passed to
/// `async_trait`, thereby forcing the affected asynchronous trait functions and methods to be run
/// in the same thread.
#[proc_macro_attribute]
pub fn add_async_trait(_attr: TokenStream, item: TokenStream) -> TokenStream {
if cfg!(feature = "singlethreaded") {
let mut output = "#[async_trait::async_trait(?Send)]".parse::<TokenStream>().unwrap();
output.extend(item);
output
} else {
let mut output = "#[async_trait::async_trait]".parse::<TokenStream>().unwrap();
output.extend(item);
output
}
}
4 changes: 4 additions & 0 deletions openraft/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ anyhow = { workspace = true, optional = true }
byte-unit = { workspace = true }
derive_more = { workspace = true }
futures = { workspace = true }
macros = { path = "../macros" }
maplit = { workspace = true }
pin-utils = { workspace = true }
rand = { workspace = true }
Expand Down Expand Up @@ -83,6 +84,9 @@ compat-07-testing = ["dep:tempfile", "anyhow", "dep:serde_json"]
# V2 API are unstable and may change in the future.
storage-v2 = []

# Disallows applications to share a raft instance with multiple threads.
singlethreaded = ["macros/singlethreaded"]

# default = ["single-term-leader"]

[package.metadata.docs.rs]
Expand Down
44 changes: 28 additions & 16 deletions openraft/src/async_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use std::future::Future;
use std::time::Duration;

use crate::Instant;
use crate::OptionalSend;
use crate::OptionalSync;
use crate::TokioInstant;

/// A trait defining interfaces with an asynchronous runtime.
Expand All @@ -19,10 +21,13 @@ pub trait AsyncRuntime: Debug + Default + Send + Sync + 'static {
type JoinError: Debug + Display + Send;

/// The return type of [`Self::spawn`].
type JoinHandle<T: Send + 'static>: Future<Output = Result<T, Self::JoinError>> + Send + Sync + Unpin;
type JoinHandle<T: OptionalSend + 'static>: Future<Output = Result<T, Self::JoinError>>
+ OptionalSend
+ OptionalSync
+ Unpin;

/// The type that enables the user to sleep in an asynchronous runtime.
type Sleep: Future<Output = ()> + Send + Sync;
type Sleep: Future<Output = ()> + OptionalSend + OptionalSync;

/// A measurement of a monotonically non-decreasing clock.
type Instant: Instant;
Expand All @@ -32,13 +37,13 @@ pub trait AsyncRuntime: Debug + Default + Send + Sync + 'static {

/// The timeout type used by [`Self::timeout`] and [`Self::timeout_at`] that enables the user
/// to await the outcome of a [`Future`].
type Timeout<R, T: Future<Output = R> + Send>: Future<Output = Result<R, Self::TimeoutError>> + Send;
type Timeout<R, T: Future<Output = R> + OptionalSend>: Future<Output = Result<R, Self::TimeoutError>> + OptionalSend;

/// Spawn a new task.
fn spawn<T>(future: T) -> Self::JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static;
T: Future + OptionalSend + 'static,
T::Output: OptionalSend + 'static;

/// Wait until `duration` has elapsed.
fn sleep(duration: Duration) -> Self::Sleep;
Expand All @@ -47,16 +52,16 @@ pub trait AsyncRuntime: Debug + Default + Send + Sync + 'static {
fn sleep_until(deadline: Self::Instant) -> Self::Sleep;

/// Require a [`Future`] to complete before the specified duration has elapsed.
fn timeout<R, F: Future<Output = R> + Send>(duration: Duration, future: F) -> Self::Timeout<R, F>;
fn timeout<R, F: Future<Output = R> + OptionalSend>(duration: Duration, future: F) -> Self::Timeout<R, F>;

/// Require a [`Future`] to complete before the specified instant in time.
fn timeout_at<R, F: Future<Output = R> + Send>(deadline: Self::Instant, future: F) -> Self::Timeout<R, F>;
fn timeout_at<R, F: Future<Output = R> + OptionalSend>(deadline: Self::Instant, future: F) -> Self::Timeout<R, F>;

/// Check if the [`Self::JoinError`] is `panic`.
fn is_panic(join_error: &Self::JoinError) -> bool;

/// Abort the task associated with the supplied join handle.
fn abort<T: Send + 'static>(join_handle: &Self::JoinHandle<T>);
fn abort<T: OptionalSend + 'static>(join_handle: &Self::JoinHandle<T>);
}

/// `Tokio` is the default asynchronous executor.
Expand All @@ -65,19 +70,26 @@ pub struct TokioRuntime;

impl AsyncRuntime for TokioRuntime {
type JoinError = tokio::task::JoinError;
type JoinHandle<T: Send + 'static> = tokio::task::JoinHandle<T>;
type JoinHandle<T: OptionalSend + 'static> = tokio::task::JoinHandle<T>;
type Sleep = tokio::time::Sleep;
type Instant = TokioInstant;
type TimeoutError = tokio::time::error::Elapsed;
type Timeout<R, T: Future<Output = R> + Send> = tokio::time::Timeout<T>;
type Timeout<R, T: Future<Output = R> + OptionalSend> = tokio::time::Timeout<T>;

#[inline]
fn spawn<T>(future: T) -> Self::JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
T: Future + OptionalSend + 'static,
T::Output: OptionalSend + 'static,
{
tokio::task::spawn(future)
#[cfg(feature = "singlethreaded")]
{
tokio::task::spawn_local(future)
}
#[cfg(not(feature = "singlethreaded"))]
{
tokio::task::spawn(future)
}
}

#[inline]
Expand All @@ -91,12 +103,12 @@ impl AsyncRuntime for TokioRuntime {
}

#[inline]
fn timeout<R, F: Future<Output = R> + Send>(duration: Duration, future: F) -> Self::Timeout<R, F> {
fn timeout<R, F: Future<Output = R> + OptionalSend>(duration: Duration, future: F) -> Self::Timeout<R, F> {
tokio::time::timeout(duration, future)
}

#[inline]
fn timeout_at<R, F: Future<Output = R> + Send>(deadline: Self::Instant, future: F) -> Self::Timeout<R, F> {
fn timeout_at<R, F: Future<Output = R> + OptionalSend>(deadline: Self::Instant, future: F) -> Self::Timeout<R, F> {
tokio::time::timeout_at(deadline, future)
}

Expand All @@ -106,7 +118,7 @@ impl AsyncRuntime for TokioRuntime {
}

#[inline]
fn abort<T: Send + 'static>(join_handle: &Self::JoinHandle<T>) {
fn abort<T: OptionalSend + 'static>(join_handle: &Self::JoinHandle<T>) {
join_handle.abort();
}
}
8 changes: 5 additions & 3 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use anyerror::AnyError;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use futures::TryFutureExt;
use macros::add_async_trait;
use maplit::btreeset;
use tokio::select;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -90,6 +91,7 @@ use crate::Membership;
use crate::MessageSummary;
use crate::Node;
use crate::NodeId;
use crate::OptionalSend;
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::StorageIOError;
Expand Down Expand Up @@ -644,8 +646,8 @@ where
last_log_id: LogId<C::NodeId>,
) -> Result<(), StorageError<C::NodeId>>
where
I: IntoIterator<Item = C::Entry> + Send,
I::IntoIter: Send,
I: IntoIterator<Item = C::Entry> + OptionalSend,
I::IntoIter: OptionalSend,
{
tracing::debug!("append_to_log");

Expand Down Expand Up @@ -1470,7 +1472,7 @@ where
}
}

#[async_trait::async_trait]
#[add_async_trait]
impl<C, N, LS, SM> RaftRuntime<C> for RaftCore<C, N, LS, SM>
where
C: RaftTypeConfig,
Expand Down
4 changes: 4 additions & 0 deletions openraft/src/docs/feature_flags/feature-flags.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,7 @@ By default openraft enables no features.
This feature disables `Adapter`, which is for v1 storage to be used as v2.
V2 storage separates log store and state machine store so that log IO and state machine IO can be parallelized naturally.

- `singlethreaded`: removes `Send` bounds from `AppData`, `AppDataResponse`, `RaftEntry`, and `SnapshotData` to force the
asynchronous runtime to spawn any tasks in the current thread.
This is for any single-threaded application that never allows a raft instance to be shared among multiple threads.
In order to use the feature, `AsyncRuntime::spawn` should invoke `tokio::task::spawn_local` or equivalents.
3 changes: 2 additions & 1 deletion openraft/src/entry/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::LogId;
use crate::Membership;
use crate::Node;
use crate::NodeId;
use crate::OptionalSend;
use crate::OptionalSerde;

/// Defines operations on an entry payload.
Expand All @@ -26,7 +27,7 @@ pub trait RaftEntry<NID, N>: RaftPayload<NID, N> + RaftLogId<NID>
where
N: Node,
NID: NodeId,
Self: OptionalSerde + Debug + Display + Send + Sync,
Self: OptionalSerde + Debug + Display + OptionalSend + Sync,
{
/// Create a new blank log entry.
///
Expand Down
7 changes: 7 additions & 0 deletions openraft/src/instant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,10 @@ impl Instant for tokio::time::Instant {
tokio::time::Instant::now()
}
}

impl Instant for std::time::Instant {
#[inline]
fn now() -> Self {
Self::now()
}
}
35 changes: 31 additions & 4 deletions openraft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
//!
//! - `serde`: Add serde::Serialize and serde:Deserialize bound to data types. If you'd like to use
//! `serde` to serialize messages.
//!
//! - `singlethreaded`: Prevent `Raft` instances from being shared among multiple threads.

macro_rules! func_name {
() => {{
Expand Down Expand Up @@ -83,6 +85,7 @@ mod try_as_ref;
pub use anyerror;
pub use anyerror::AnyError;
pub use async_trait;
pub use macros::add_async_trait;
pub use network::RPCTypes;
pub use network::RaftNetwork;
pub use network::RaftNetworkFactory;
Expand Down Expand Up @@ -150,6 +153,30 @@ pub trait OptionalSerde {}
#[cfg(not(feature = "serde"))]
impl<T> OptionalSerde for T {}

#[cfg(feature = "singlethreaded")]
pub trait OptionalSend {}

#[cfg(feature = "singlethreaded")]
pub trait OptionalSync {}

#[cfg(feature = "singlethreaded")]
impl<T: ?Sized> OptionalSend for T {}

#[cfg(feature = "singlethreaded")]
impl<T: ?Sized> OptionalSync for T {}

#[cfg(not(feature = "singlethreaded"))]
pub trait OptionalSend: Send {}

#[cfg(not(feature = "singlethreaded"))]
pub trait OptionalSync: Sync {}

#[cfg(not(feature = "singlethreaded"))]
impl<T: Send + ?Sized> OptionalSend for T {}

#[cfg(not(feature = "singlethreaded"))]
impl<T: Sync + ?Sized> OptionalSync for T {}

/// A trait defining application specific data.
///
/// The intention of this trait is that applications which are using this crate will be able to
Expand All @@ -162,9 +189,9 @@ impl<T> OptionalSerde for T {}
/// ## Note
///
/// The trait is automatically implemented for all types which satisfy its supertraits.
pub trait AppData: Send + Sync + 'static + OptionalSerde {}
pub trait AppData: OptionalSend + Sync + 'static + OptionalSerde {}

impl<T> AppData for T where T: Send + Sync + 'static + OptionalSerde {}
impl<T> AppData for T where T: OptionalSend + Sync + 'static + OptionalSerde {}

/// A trait defining application specific response data.
///
Expand All @@ -183,6 +210,6 @@ impl<T> AppData for T where T: Send + Sync + 'static + OptionalSerde {}
/// ## Note
///
/// The trait is automatically implemented for all types which satisfy its supertraits.
pub trait AppDataResponse: Send + Sync + 'static + OptionalSerde {}
pub trait AppDataResponse: OptionalSend + Sync + 'static + OptionalSerde {}

impl<T> AppDataResponse for T where T: Send + Sync + 'static + OptionalSerde {}
impl<T> AppDataResponse for T where T: OptionalSend + Sync + 'static + OptionalSerde {}
3 changes: 2 additions & 1 deletion openraft/src/metrics/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::LogId;
use crate::LogIdOptionExt;
use crate::MessageSummary;
use crate::NodeId;
use crate::OptionalSend;
use crate::Vote;

// Error variants related to metrics.
Expand Down Expand Up @@ -47,7 +48,7 @@ where
/// Wait for metrics to satisfy some condition or timeout.
#[tracing::instrument(level = "trace", skip(self, func), fields(msg=%msg.to_string()))]
pub async fn metrics<T>(&self, func: T, msg: impl ToString) -> Result<RaftMetrics<NID, N>, WaitError>
where T: Fn(&RaftMetrics<NID, N>) -> bool + Send {
where T: Fn(&RaftMetrics<NID, N>) -> bool + OptionalSend {
let timeout_at = A::Instant::now() + self.timeout;

let mut rx = self.rx.clone();
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/network/factory.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use async_trait::async_trait;
use macros::add_async_trait;

use crate::network::RaftNetwork;
use crate::RaftTypeConfig;
Expand All @@ -11,7 +11,7 @@ use crate::RaftTypeConfig;
///
/// Typically, the network implementation as such will be hidden behind a `Box<T>` or `Arc<T>` and
/// this interface implemented on the `Box<T>` or `Arc<T>`.
#[async_trait]
#[add_async_trait]
pub trait RaftNetworkFactory<C>: Send + Sync + 'static
where C: RaftTypeConfig
{
Expand Down
Loading

0 comments on commit 104983d

Please sign in to comment.