Skip to content

Commit

Permalink
Feature: define custom Entry type for raft log
Browse files Browse the repository at this point in the history
This commit introduces a new feature that allows applications to define
a custom type for Raft log entries in `RaftTypeConfig`. By setting `Entry =
MyEntry`, where `MyEntry` implements the `RaftEntry` trait, an application
can now define its own log entry type that reflects its architecture.
However, the default implementation, the `Entry` type is still available.

This change provides more flexibility for applications to tailor the
Raft log entry to their specific needs.

- Fix: #705

- Changes: `RaftStorage::append_to_log()` and `RaftStorage::apply_to_state_machine()` accepts slice of entries instead of slice of entry references.
  • Loading branch information
drmingdrmer committed Mar 20, 2023
1 parent 04a157d commit e39da9f
Show file tree
Hide file tree
Showing 43 changed files with 436 additions and 417 deletions.
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub type ExampleNodeId = u64;

openraft::declare_raft_types!(
/// Declare the type configuration for example K/V store.
pub ExampleTypeConfig: D = ExampleRequest, R = ExampleResponse, NodeId = ExampleNodeId, Node = BasicNode
pub ExampleTypeConfig: D = ExampleRequest, R = ExampleResponse, NodeId = ExampleNodeId, Node = BasicNode, Entry = openraft::Entry<ExampleTypeConfig>
);

pub type ExampleRaft = Raft<ExampleTypeConfig, ExampleNetwork, Arc<ExampleStore>>;
Expand Down
7 changes: 2 additions & 5 deletions examples/raft-kv-memstore/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,7 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
}

#[tracing::instrument(level = "trace", skip(self, entries))]
async fn append_to_log(
&mut self,
entries: &[&Entry<ExampleTypeConfig>],
) -> Result<(), StorageError<ExampleNodeId>> {
async fn append_to_log(&mut self, entries: &[Entry<ExampleTypeConfig>]) -> Result<(), StorageError<ExampleNodeId>> {
let mut log = self.log.write().await;
for entry in entries {
log.insert(entry.log_id.index, (*entry).clone());
Expand Down Expand Up @@ -258,7 +255,7 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
#[tracing::instrument(level = "trace", skip(self, entries))]
async fn apply_to_state_machine(
&mut self,
entries: &[&Entry<ExampleTypeConfig>],
entries: &[Entry<ExampleTypeConfig>],
) -> Result<Vec<ExampleResponse>, StorageError<ExampleNodeId>> {
let mut res = Vec::with_capacity(entries.len());

Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl Display for ExampleNode {

openraft::declare_raft_types!(
/// Declare the type configuration for example K/V store.
pub ExampleTypeConfig: D = ExampleRequest, R = ExampleResponse, NodeId = ExampleNodeId, Node = ExampleNode
pub ExampleTypeConfig: D = ExampleRequest, R = ExampleResponse, NodeId = ExampleNodeId, Node = ExampleNode, Entry = openraft::Entry<ExampleTypeConfig>
);

pub type ExampleRaft = Raft<ExampleTypeConfig, ExampleNetwork, Arc<ExampleStore>>;
Expand Down
4 changes: 2 additions & 2 deletions examples/raft-kv-rocksdb/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
}

#[tracing::instrument(level = "trace", skip(self, entries))]
async fn append_to_log(&mut self, entries: &[&Entry<ExampleTypeConfig>]) -> StorageResult<()> {
async fn append_to_log(&mut self, entries: &[Entry<ExampleTypeConfig>]) -> StorageResult<()> {
for entry in entries {
let id = id_to_bin(entry.log_id.index);
assert_eq!(bin_to_id(&id), entry.log_id.index);
Expand Down Expand Up @@ -495,7 +495,7 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
#[tracing::instrument(level = "trace", skip(self, entries))]
async fn apply_to_state_machine(
&mut self,
entries: &[&Entry<ExampleTypeConfig>],
entries: &[Entry<ExampleTypeConfig>],
) -> Result<Vec<ExampleResponse>, StorageError<ExampleNodeId>> {
let mut res = Vec::with_capacity(entries.len());

Expand Down
6 changes: 3 additions & 3 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub type MemNodeId = u64;

openraft::declare_raft_types!(
/// Declare the type configuration for `MemStore`.
pub Config: D = ClientRequest, R = ClientResponse, NodeId = MemNodeId, Node = ()
pub Config: D = ClientRequest, R = ClientResponse, NodeId = MemNodeId, Node = (), Entry = openraft::Entry<Config>
);

/// The application snapshot type which the `MemStore` works with.
Expand Down Expand Up @@ -306,7 +306,7 @@ impl RaftStorage<Config> for Arc<MemStore> {
}

#[tracing::instrument(level = "trace", skip(self, entries))]
async fn append_to_log(&mut self, entries: &[&Entry<Config>]) -> Result<(), StorageError<MemNodeId>> {
async fn append_to_log(&mut self, entries: &[Entry<Config>]) -> Result<(), StorageError<MemNodeId>> {
let mut log = self.log.write().await;
for entry in entries {
log.insert(entry.log_id.index, (*entry).clone());
Expand All @@ -317,7 +317,7 @@ impl RaftStorage<Config> for Arc<MemStore> {
#[tracing::instrument(level = "trace", skip(self, entries))]
async fn apply_to_state_machine(
&mut self,
entries: &[&Entry<Config>],
entries: &[Entry<Config>],
) -> Result<Vec<ClientResponse>, StorageError<MemNodeId>> {
let mut res = Vec::with_capacity(entries.len());

Expand Down
31 changes: 17 additions & 14 deletions openraft/src/compat/compat07.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ pub mod testing {

use crate::compat;
use crate::compat::compat07;
use crate::entry::RaftPayload;
use crate::log_id::RaftLogId;

/// Build a v0.7 `RaftStorage` implementation for compatibility test.
#[async_trait::async_trait]
Expand Down Expand Up @@ -401,14 +403,14 @@ pub mod testing {
];

assert_eq!(3, got.len());
assert_eq!(want[0].log_id, got[0].log_id);
assert_eq!(want[1].log_id, got[1].log_id);
assert_eq!(want[2].log_id, got[2].log_id);
assert_eq!(want[0].log_id, *got[0].get_log_id());
assert_eq!(want[1].log_id, *got[1].get_log_id());
assert_eq!(want[2].log_id, *got[2].get_log_id());

assert!(matches!(got[0].payload, crate::EntryPayload::Blank));
if let crate::EntryPayload::Membership(m) = &got[1].payload {
assert!(got[0].is_blank());
if let Some(m) = got[1].get_membership() {
assert_eq!(
&crate::Membership::new(
&crate::Membership::<u64, crate::EmptyNode>::new(
vec![btreeset! {1,2}],
btreemap! {1=> crate::EmptyNode{}, 2=>crate::EmptyNode{}},
),
Expand All @@ -418,10 +420,11 @@ pub mod testing {
unreachable!("expect Membership");
}

let s = serde_json::to_string(&got[2].payload)?;
let want = serde_json::to_string(&crate::EntryPayload::<BLatest::C>::Normal(
self.builder_latest.sample_app_data(),
))?;
let s = serde_json::to_string(&got[2])?;
let want = serde_json::to_string(&crate::Entry::<BLatest::C> {
log_id: crate::LogId::new(crate::CommittedLeaderId::new(1, 0), 7),
payload: crate::EntryPayload::Normal(self.builder_latest.sample_app_data()),
})?;
assert_eq!(want, s);
}

Expand Down Expand Up @@ -464,11 +467,11 @@ pub mod testing {
}];

assert_eq!(1, got.len());
assert_eq!(want[0].log_id, got[0].log_id);
assert_eq!(want[0].log_id, *got[0].get_log_id());

if let crate::EntryPayload::Membership(m) = &got[0].payload {
if let Some(m) = got[0].get_membership() {
assert_eq!(
&crate::Membership::new(
&crate::Membership::<u64, crate::EmptyNode>::new(
vec![btreeset! {1,2}],
btreemap! {1=> crate::EmptyNode{}, 2=>crate::EmptyNode{}},
),
Expand Down Expand Up @@ -672,7 +675,7 @@ mod tests {
}

crate::declare_raft_types!(
pub TestingConfig: D = u64, R = u64, NodeId = u64, Node = crate::EmptyNode
pub TestingConfig: D = u64, R = u64, NodeId = u64, Node = crate::EmptyNode, Entry = crate::Entry<TestingConfig>
);

#[test]
Expand Down
5 changes: 2 additions & 3 deletions openraft/src/core/install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::error::SnapshotMismatch;
use crate::raft::InstallSnapshotRequest;
use crate::raft::InstallSnapshotResponse;
use crate::raft::InstallSnapshotTx;
use crate::Entry;
use crate::ErrorSubject;
use crate::ErrorVerb;
use crate::MessageSummary;
Expand Down Expand Up @@ -36,7 +35,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
tracing::debug!(req = display(req.summary()));

let res = self.engine.vote_handler().handle_message_vote(&req.vote);
self.run_engine_commands::<Entry<C>>(&[]).await?;
self.run_engine_commands(&[]).await?;
if res.is_err() {
tracing::info!(
my_vote = display(self.engine.state.vote_ref()),
Expand Down Expand Up @@ -178,7 +177,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.received_snapshot.insert(meta.snapshot_id.clone(), snapshot_data);

self.engine.following_handler().install_snapshot(meta);
self.run_engine_commands::<Entry<C>>(&[]).await?;
self.run_engine_commands(&[]).await?;

Ok(())
}
Expand Down
Loading

0 comments on commit e39da9f

Please sign in to comment.