Skip to content

Commit

Permalink
Change: Refactor storage APIs
Browse files Browse the repository at this point in the history
This is a larger refactoring of storage APIs to allow more clear ownership of data.

RaftStorage is now refactored to:
- RaftLogReader to read data from the log in parallel tasks independent of the main Raft loop
- RaftStorage to modify the log and the state machine (implements also RaftLogReader) intended to be used in the main Raft loop
- RaftSnapshotBuilder to build the snapshot in background independent of the main Raft loop

The RaftStorage API offers to create new RaftLogReader or RaftSnapshotBuilder on it.

RaftNetwork is also refactored to:
- RaftNetwork responsible for sending RPCs
- RaftNetworkFactory responsible for creating instances of RaftNetwork for sending data to a particular node

All these traits now take &mut self, so it's possible to heavily optimize a lot of stuff by removing any synchronization/lookups and in general to streamline the implementation of the "real" store and network.

All traits are built in such a way that they can be implemented for Arc<T> (or similar) to basically model the original behaviour with &self receiver.

DefensiveCheck was also split to DefensiveCheckBase and DefensiveCheck to allow creating separate checkers for checked LogReader. There is a single TODO in LogReaderExt for one missing check.

Tests run.

Further optimizations of the API to yet better streamline applying log entries to the state machine (if the state machine is complex and can process requests in the background) are still pending.

Further optimizations of the LogReader API to "invert" the semantics to send the functor to process the log entry to the log instead of materializing the log on heap are also pending.

async_trait optimization is pending

Configuration of types to use for optimizing various synchronization operations (like one-shot channel) is pending.
  • Loading branch information
schreter committed Feb 22, 2022
1 parent 2055a23 commit a76f41a
Show file tree
Hide file tree
Showing 60 changed files with 816 additions and 557 deletions.
191 changes: 107 additions & 84 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use openraft::async_trait::async_trait;
use openraft::raft::Entry;
use openraft::raft::EntryPayload;
use openraft::storage::LogState;
use openraft::storage::RaftLogReader;
use openraft::storage::RaftSnapshotBuilder;
use openraft::storage::Snapshot;
use openraft::AppData;
use openraft::AppDataResponse;
Expand Down Expand Up @@ -117,35 +119,24 @@ impl MemStore {
current_snapshot,
}
}

pub async fn new_arc() -> Arc<Self> {
Arc::new(Self::new().await)
}
}

#[async_trait]
impl RaftStorageDebug<MemStoreStateMachine> for MemStore {
impl RaftStorageDebug<MemStoreStateMachine> for Arc<MemStore> {
/// Get a handle to the state machine for testing purposes.
async fn get_state_machine(&self) -> MemStoreStateMachine {
async fn get_state_machine(&mut self) -> MemStoreStateMachine {
self.sm.write().await.clone()
}
}

#[async_trait]
impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
type SnapshotData = Cursor<Vec<u8>>;

#[tracing::instrument(level = "trace", skip(self))]
async fn save_vote(&self, vote: &Vote) -> Result<(), StorageError> {
tracing::debug!(?vote, "save_vote");
let mut h = self.vote.write().await;

*h = Some(*vote);
Ok(())
}

async fn read_vote(&self) -> Result<Option<Vote>, StorageError> {
Ok(*self.vote.read().await)
}

impl RaftLogReader<ClientRequest, ClientResponse> for Arc<MemStore> {
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
&self,
&mut self,
range: RB,
) -> Result<Vec<Entry<ClientRequest>>, StorageError> {
let res = {
Expand All @@ -156,7 +147,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
Ok(res)
}

async fn get_log_state(&self) -> Result<LogState, StorageError> {
async fn get_log_state(&mut self) -> Result<LogState, StorageError> {
let log = self.log.read().await;
let last = log.iter().rev().next().map(|(_, ent)| ent.log_id);

Expand All @@ -172,14 +163,91 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
last_log_id: last,
})
}
}

#[async_trait]
impl RaftSnapshotBuilder<ClientRequest, ClientResponse, Cursor<Vec<u8>>> for Arc<MemStore> {
#[tracing::instrument(level = "trace", skip(self))]
async fn build_snapshot(&mut self) -> Result<Snapshot<Cursor<Vec<u8>>>, StorageError> {
let (data, last_applied_log);

{
// Serialize the data of the state machine.
let sm = self.sm.read().await;
data = serde_json::to_vec(&*sm)
.map_err(|e| StorageIOError::new(ErrorSubject::StateMachine, ErrorVerb::Read, AnyError::new(&e)))?;

last_applied_log = sm.last_applied_log;
}

let last_applied_log = match last_applied_log {
None => {
panic!("can not compact empty state machine");
}
Some(x) => x,
};

let snapshot_size = data.len();

let snapshot_idx = {
let mut l = self.snapshot_idx.lock().unwrap();
*l += 1;
*l
};

let snapshot_id = format!(
"{}-{}-{}",
last_applied_log.leader_id, last_applied_log.index, snapshot_idx
);

let meta = SnapshotMeta {
last_log_id: last_applied_log,
snapshot_id,
};

let snapshot = MemStoreSnapshot {
meta: meta.clone(),
data: data.clone(),
};

{
let mut current_snapshot = self.current_snapshot.write().await;
*current_snapshot = Some(snapshot);
}

tracing::info!(snapshot_size, "log compaction complete");

Ok(Snapshot {
meta,
snapshot: Box::new(Cursor::new(data)),
})
}
}

async fn last_applied_state(&self) -> Result<(Option<LogId>, Option<EffectiveMembership>), StorageError> {
#[async_trait]
impl RaftStorage<ClientRequest, ClientResponse> for Arc<MemStore> {
type SnapshotData = Cursor<Vec<u8>>;

#[tracing::instrument(level = "trace", skip(self))]
async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> {
tracing::debug!(?vote, "save_vote");
let mut h = self.vote.write().await;

*h = Some(*vote);
Ok(())
}

async fn read_vote(&mut self) -> Result<Option<Vote>, StorageError> {
Ok(*self.vote.read().await)
}

async fn last_applied_state(&mut self) -> Result<(Option<LogId>, Option<EffectiveMembership>), StorageError> {
let sm = self.sm.read().await;
Ok((sm.last_applied_log, sm.last_membership.clone()))
}

#[tracing::instrument(level = "debug", skip(self))]
async fn delete_conflict_logs_since(&self, log_id: LogId) -> Result<(), StorageError> {
async fn delete_conflict_logs_since(&mut self, log_id: LogId) -> Result<(), StorageError> {
tracing::debug!("delete_log: [{:?}, +oo)", log_id);

{
Expand All @@ -195,7 +263,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
}

#[tracing::instrument(level = "debug", skip(self))]
async fn purge_logs_upto(&self, log_id: LogId) -> Result<(), StorageError> {
async fn purge_logs_upto(&mut self, log_id: LogId) -> Result<(), StorageError> {
tracing::debug!("delete_log: [{:?}, +oo)", log_id);

{
Expand All @@ -217,7 +285,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
}

#[tracing::instrument(level = "trace", skip(self, entries))]
async fn append_to_log(&self, entries: &[&Entry<ClientRequest>]) -> Result<(), StorageError> {
async fn append_to_log(&mut self, entries: &[&Entry<ClientRequest>]) -> Result<(), StorageError> {
let mut log = self.log.write().await;
for entry in entries {
log.insert(entry.log_id.index, (*entry).clone());
Expand All @@ -227,7 +295,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {

#[tracing::instrument(level = "trace", skip(self, entries))]
async fn apply_to_state_machine(
&self,
&mut self,
entries: &[&Entry<ClientRequest>],
) -> Result<Vec<ClientResponse>, StorageError> {
let mut res = Vec::with_capacity(entries.len());
Expand Down Expand Up @@ -262,69 +330,13 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn build_snapshot(&self) -> Result<Snapshot<Self::SnapshotData>, StorageError> {
let (data, last_applied_log);

{
// Serialize the data of the state machine.
let sm = self.sm.read().await;
data = serde_json::to_vec(&*sm)
.map_err(|e| StorageIOError::new(ErrorSubject::StateMachine, ErrorVerb::Read, AnyError::new(&e)))?;

last_applied_log = sm.last_applied_log;
}

let last_applied_log = match last_applied_log {
None => {
panic!("can not compact empty state machine");
}
Some(x) => x,
};

let snapshot_size = data.len();

let snapshot_idx = {
let mut l = self.snapshot_idx.lock().unwrap();
*l += 1;
*l
};

let snapshot_id = format!(
"{}-{}-{}",
last_applied_log.leader_id, last_applied_log.index, snapshot_idx
);

let meta = SnapshotMeta {
last_log_id: last_applied_log,
snapshot_id,
};

let snapshot = MemStoreSnapshot {
meta: meta.clone(),
data: data.clone(),
};

{
let mut current_snapshot = self.current_snapshot.write().await;
*current_snapshot = Some(snapshot);
}

tracing::info!(snapshot_size, "log compaction complete");

Ok(Snapshot {
meta,
snapshot: Box::new(Cursor::new(data)),
})
}

#[tracing::instrument(level = "trace", skip(self))]
async fn begin_receiving_snapshot(&self) -> Result<Box<Self::SnapshotData>, StorageError> {
async fn begin_receiving_snapshot(&mut self) -> Result<Box<Self::SnapshotData>, StorageError> {
Ok(Box::new(Cursor::new(Vec::new())))
}

#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn install_snapshot(
&self,
&mut self,
meta: &SnapshotMeta,
snapshot: Box<Self::SnapshotData>,
) -> Result<StateMachineChanges, StorageError> {
Expand Down Expand Up @@ -368,7 +380,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn get_current_snapshot(&self) -> Result<Option<Snapshot<Self::SnapshotData>>, StorageError> {
async fn get_current_snapshot(&mut self) -> Result<Option<Snapshot<Self::SnapshotData>>, StorageError> {
match &*self.current_snapshot.read().await {
Some(snapshot) => {
let data = snapshot.data.clone();
Expand All @@ -380,4 +392,15 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
None => Ok(None),
}
}

type LogReader = Self;
type SnapshotBuilder = Self;

async fn get_log_reader(&mut self) -> Self::LogReader {
self.clone()
}

async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder {
self.clone()
}
}
2 changes: 1 addition & 1 deletion memstore/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ use crate::MemStore;
/// ```
#[test]
pub fn test_mem_store() -> Result<(), StorageError> {
Suite::test_all(MemStore::new)?;
Suite::test_all(MemStore::new_arc)?;
Ok(())
}
10 changes: 5 additions & 5 deletions openraft/src/core/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ use crate::LogId;
use crate::Membership;
use crate::Node;
use crate::NodeId;
use crate::RaftNetwork;
use crate::RaftNetworkFactory;
use crate::RaftStorage;
use crate::StorageError;

impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> LearnerState<'a, D, R, N, S> {
impl<'a, D: AppData, R: AppDataResponse, N: RaftNetworkFactory<D>, S: RaftStorage<D, R>> LearnerState<'a, D, R, N, S> {
/// Handle the admin `init_with_config` command.
#[tracing::instrument(level = "debug", skip(self))]
pub(super) async fn handle_init_with_config(&mut self, members: EitherNodesOrIds) -> Result<(), InitializeError> {
Expand Down Expand Up @@ -67,7 +67,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
}

impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> LeaderState<'a, D, R, N, S> {
impl<'a, D: AppData, R: AppDataResponse, N: RaftNetworkFactory<D>, S: RaftStorage<D, R>> LeaderState<'a, D, R, N, S> {
// add node into learner,return true if the node is already a member or learner
#[tracing::instrument(level = "debug", skip(self))]
async fn add_learner_into_membership(
Expand Down Expand Up @@ -139,10 +139,10 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}

if blocking {
let state = self.spawn_replication_stream(target, Some(tx));
let state = self.spawn_replication_stream(target, Some(tx)).await;
self.nodes.insert(target, state);
} else {
let state = self.spawn_replication_stream(target, None);
let state = self.spawn_replication_stream(target, None).await;
self.nodes.insert(target, state);

// non-blocking mode, do not know about the replication stat.
Expand Down
12 changes: 6 additions & 6 deletions openraft/src/core/append_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ use crate::AppDataResponse;
use crate::EffectiveMembership;
use crate::LogId;
use crate::MessageSummary;
use crate::RaftNetwork;
use crate::RaftNetworkFactory;
use crate::RaftStorage;
use crate::StorageError;
use crate::Update;

impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> RaftCore<D, R, N, S> {
impl<D: AppData, R: AppDataResponse, N: RaftNetworkFactory<D>, S: RaftStorage<D, R>> RaftCore<D, R, N, S> {
/// An RPC invoked by the leader to replicate log entries (§5.3); also used as heartbeat (§5.2).
///
/// See `receiver implementation: AppendEntries RPC` in raft-essentials.md in this repo.
Expand Down Expand Up @@ -238,7 +238,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
/// The entries in request that are matches local ones does not need to be append again.
/// Filter them out.
pub async fn skip_matching_entries<'s, 'e>(
&'s self,
&'s mut self,
entries: &'e [Entry<D>],
) -> Result<(usize, &'e [Entry<D>]), StorageError> {
let l = entries.len();
Expand Down Expand Up @@ -271,7 +271,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
///
/// This way to check if the entries in append-entries request is consecutive with local logs.
/// Raft only accept consecutive logs to be appended.
pub async fn does_log_id_match(&self, remote_log_id: Option<LogId>) -> Result<Option<LogId>, StorageError> {
pub async fn does_log_id_match(&mut self, remote_log_id: Option<LogId>) -> Result<Option<LogId>, StorageError> {
let log_id = match remote_log_id {
None => {
return Ok(None);
Expand Down Expand Up @@ -368,12 +368,12 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

let entries_refs: Vec<_> = entries.iter().collect();

apply_to_state_machine(self.storage.clone(), &entries_refs, self.config.max_applied_log_to_keep).await?;
apply_to_state_machine(&mut self.storage, &entries_refs, self.config.max_applied_log_to_keep).await?;

self.last_applied = Some(last_log_id);

self.report_metrics(Update::AsIs);
self.trigger_log_compaction_if_needed(false);
self.trigger_log_compaction_if_needed(false).await;

Ok(())
}
Expand Down
Loading

0 comments on commit a76f41a

Please sign in to comment.