Skip to content

Commit

Permalink
change: trait RaftStore: remove get_membership_config(), add last_mem…
Browse files Browse the repository at this point in the history
…bership_in_log() and get_membership() with default impl

Goal: minimize the work for users to implement a correct raft application.

Now RaftStorage provides default implementations for `get_membership()`
and `last_membership_in_log()`.

These two methods just can be implemented with other basic user impl
methods.

- fix: #59
  • Loading branch information
drmingdrmer committed Jan 4, 2022
1 parent 9b00e7c commit c9c8d89
Show file tree
Hide file tree
Showing 10 changed files with 211 additions and 106 deletions.
71 changes: 6 additions & 65 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use std::sync::Mutex;
use openraft::async_trait::async_trait;
use openraft::raft::Entry;
use openraft::raft::EntryPayload;
use openraft::raft::Membership;
use openraft::storage::HardState;
use openraft::storage::InitialState;
use openraft::storage::Snapshot;
Expand Down Expand Up @@ -157,74 +156,13 @@ impl RaftStorageDebug<MemStoreStateMachine> for MemStore {
}
}

impl MemStore {
fn find_first_membership_log<'a, T, D>(mut it: T) -> Option<EffectiveMembership>
where
T: 'a + Iterator<Item = &'a Entry<D>>,
D: AppData,
{
it.find_map(|entry| match &entry.payload {
EntryPayload::Membership(cfg) => Some(EffectiveMembership {
log_id: entry.log_id,
membership: cfg.clone(),
}),
_ => None,
})
}

/// Go backwards through the log to find the most recent membership config <= `upto_index`.
#[tracing::instrument(level = "trace", skip(self))]
pub async fn get_membership_from_log(&self, upto_index: Option<u64>) -> Result<EffectiveMembership, StorageError> {
let membership_in_log = {
let log = self.log.read().await;

let reversed_logs = log.values().rev();
match upto_index {
Some(upto) => {
let skipped = reversed_logs.skip_while(|entry| entry.log_id.index > upto);
Self::find_first_membership_log(skipped)
}
None => Self::find_first_membership_log(reversed_logs),
}
};

// Find membership stored in state machine.

let (_, membership_in_sm) = self.last_applied_state().await?;

let membership =
if membership_in_log.as_ref().map(|x| x.log_id.index) > membership_in_sm.as_ref().map(|x| x.log_id.index) {
membership_in_log
} else {
membership_in_sm
};

// Create a default one if both are None.

Ok(match membership {
Some(x) => x,
None => EffectiveMembership {
log_id: LogId { term: 0, index: 0 },
membership: Membership::new_initial(self.id),
},
})
}
}

#[async_trait]
impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
type SnapshotData = Cursor<Vec<u8>>;

#[tracing::instrument(level = "trace", skip(self))]
async fn get_membership_config(&self) -> Result<EffectiveMembership, StorageError> {
self.get_membership_from_log(None).await
}

#[tracing::instrument(level = "trace", skip(self))]
async fn get_initial_state(&self) -> Result<InitialState, StorageError> {
let membership = self.get_membership_config().await?;
let mut hs = self.hs.write().await;
match &mut *hs {
let hs = self.read_hard_state().await?;
match hs {
Some(inner) => {
// Search for two place and use the max one,
// because when a state machine is installed there could be logs
Expand All @@ -237,6 +175,9 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {

let last_log_id = max(last_in_log, last_applied);

let membership = self.get_membership().await?;
let membership = membership.unwrap_or_else(|| EffectiveMembership::new_initial(self.id));

Ok(InitialState {
last_log_id,
last_applied,
Expand All @@ -246,7 +187,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
}
None => {
let new = InitialState::new_initial(self.id);
*hs = Some(new.hard_state.clone());
self.save_hard_state(&new.hard_state).await?;
Ok(new)
}
}
Expand Down
116 changes: 104 additions & 12 deletions memstore/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::marker::PhantomData;

use async_trait::async_trait;
use maplit::btreeset;
use openraft::raft::Membership;
use openraft::DefensiveCheck;
use openraft::DefensiveError;
use openraft::StoreExt;
Expand Down Expand Up @@ -84,8 +85,10 @@ where
B: StoreBuilder<ClientRequest, ClientResponse, S>,
{
fn test_store(builder: &B) -> anyhow::Result<()> {
run_fut(Suite::get_membership_config_default(builder))?;
run_fut(Suite::get_membership_config_from_log_and_sm(builder))?;
run_fut(Suite::last_membership_in_log_initial(builder))?;
run_fut(Suite::last_membership_in_log(builder))?;
run_fut(Suite::get_membership_initial(builder))?;
run_fut(Suite::get_membership_from_log_and_sm(builder))?;
run_fut(Suite::get_initial_state_default(builder))?;
run_fut(Suite::get_initial_state_membership_from_log_and_sm(builder))?;
run_fut(Suite::get_initial_state_with_state(builder))?;
Expand All @@ -109,17 +112,101 @@ where
Ok(())
}

pub async fn get_membership_config_default(builder: &B) -> anyhow::Result<()> {
pub async fn last_membership_in_log_initial(builder: &B) -> anyhow::Result<()> {
let store = builder.build(NODE_ID).await;

let membership = store.get_membership_config().await?;
let membership = store.last_membership_in_log(0).await?;

assert_eq!(Membership::new_single(btreeset! {NODE_ID}), membership.membership,);
assert!(membership.is_none());

Ok(())
}

pub async fn get_membership_config_from_log_and_sm(builder: &B) -> anyhow::Result<()> {
pub async fn last_membership_in_log(builder: &B) -> anyhow::Result<()> {
let store = builder.build(NODE_ID).await;

tracing::info!("--- no log, do not read membership from state machine");
{
store
.apply_to_state_machine(&[
&Entry {
log_id: LogId { term: 1, index: 1 },
payload: EntryPayload::Blank,
},
&Entry {
log_id: LogId { term: 1, index: 2 },
payload: EntryPayload::Membership(Membership::new_single(btreeset! {3,4,5})),
},
])
.await?;

let mem = store.last_membership_in_log(0).await?;

assert!(mem.is_none());
}

tracing::info!("--- membership presents in log, smaller than last_applied, read from log");
{
store
.append_to_log(&[&Entry {
log_id: (1, 1).into(),
payload: EntryPayload::Membership(Membership::new_single(btreeset! {1,2,3})),
}])
.await?;

let mem = store.last_membership_in_log(0).await?;
let mem = mem.unwrap();
assert_eq!(Membership::new_single(btreeset! {1, 2, 3}), mem.membership,);

let mem = store.last_membership_in_log(1).await?;
let mem = mem.unwrap();
assert_eq!(Membership::new_single(btreeset! {1, 2, 3}), mem.membership,);

let mem = store.last_membership_in_log(2).await?;
assert!(mem.is_none());
}

tracing::info!("--- membership presents in log and > sm.last_applied, read from log");
{
store
.append_to_log(&[
&Entry {
log_id: LogId { term: 1, index: 3 },
payload: EntryPayload::Membership(Membership::new_single(btreeset! {7,8,9})),
},
&Entry {
log_id: LogId { term: 1, index: 4 },
payload: EntryPayload::Blank,
},
])
.await?;

let mem = store.last_membership_in_log(0).await?;
let mem = mem.unwrap();

assert_eq!(Membership::new_single(btreeset! {7,8,9},), mem.membership,);
}

tracing::info!("--- membership presents in log and > sm.last_applied, read from log but since_index is greater than the last");
{
let mem = store.last_membership_in_log(4).await?;
assert!(mem.is_none());
}

Ok(())
}

pub async fn get_membership_initial(builder: &B) -> anyhow::Result<()> {
let store = builder.build(NODE_ID).await;

let membership = store.get_membership().await?;

assert!(membership.is_none());

Ok(())
}

pub async fn get_membership_from_log_and_sm(builder: &B) -> anyhow::Result<()> {
let store = builder.build(NODE_ID).await;

tracing::info!("--- no log, read membership from state machine");
Expand All @@ -137,7 +224,8 @@ where
])
.await?;

let mem = store.get_membership_config().await?;
let mem = store.get_membership().await?;
let mem = mem.unwrap();

assert_eq!(Membership::new_single(btreeset! {3,4,5}), mem.membership,);
}
Expand All @@ -151,7 +239,9 @@ where
}])
.await?;

let mem = store.get_membership_config().await?;
let mem = store.get_membership().await?;

let mem = mem.unwrap();

assert_eq!(Membership::new_single(btreeset! {3, 4, 5}), mem.membership,);
}
Expand All @@ -161,13 +251,15 @@ where
store
.append_to_log(&[&Entry {
log_id: LogId { term: 1, index: 3 },
payload: EntryPayload::Membership(Membership::new_single(btreeset! {1,2,3})),
payload: EntryPayload::Membership(Membership::new_single(btreeset! {7,8,9})),
}])
.await?;

let mem = store.get_membership_config().await?;
let mem = store.get_membership().await?;

let mem = mem.unwrap();

assert_eq!(Membership::new_single(btreeset! {1,2,3},), mem.membership,);
assert_eq!(Membership::new_single(btreeset! {7,8,9},), mem.membership,);
}

Ok(())
Expand Down Expand Up @@ -933,7 +1025,7 @@ where
])
.await?;

let res = store.get_membership_config().await;
let res = store.get_membership().await;

let e = res.unwrap_err().into_defensive().unwrap();
assert!(matches!(e, DefensiveError {
Expand Down
2 changes: 2 additions & 0 deletions openraft/src/core/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
if self.core.effective_membership.membership.all_nodes().len() == 1 {
self.core.current_term += 1;
self.core.voted_for = Some(self.core.id);

// TODO(xp): it should always commit a initial log entry
self.core.set_target_state(State::Leader);
self.core.save_hard_state().await?;
} else {
Expand Down
10 changes: 9 additions & 1 deletion openraft/src/core/append_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,15 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

self.last_log_id = self.get_log_id(start - 1).await?;

let membership = self.storage.get_membership_config().await.map_err(|err| self.map_storage_error(err))?;
// TODO(xp): get_membership() should have a defensive check to ensure it always returns Some() if node is
// initialized. Because a node always commit a membership log as the first log entry.
let membership = self.storage.get_membership().await.map_err(|err| self.map_storage_error(err))?;

// TODO(xp): This is a dirty patch:
// When a node starts in a single-node mode, it does not append an initial log
// but instead depends on storage.get_membership() to return a default one.
// It would be better a node always append an initial log entry.
let membership = membership.unwrap_or_else(|| EffectiveMembership::new_initial(self.id));

self.update_membership(membership)?;

Expand Down
6 changes: 5 additions & 1 deletion openraft/src/core/install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,13 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
}

// There could be unknown membership in the snapshot.
let membership = self.storage.get_membership_config().await.map_err(|err| self.map_storage_error(err))?;
let membership = self.storage.get_membership().await.map_err(|err| self.map_storage_error(err))?;
tracing::debug!("storage membership: {:?}", membership);

assert!(membership.is_some());

let membership = membership.unwrap();

self.update_membership(membership)?;

self.snapshot_last_log_id = self.last_applied;
Expand Down
11 changes: 10 additions & 1 deletion openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ pub struct EffectiveMembership {
pub membership: Membership,
}

impl EffectiveMembership {
pub fn new_initial(node_id: u64) -> Self {
EffectiveMembership {
log_id: LogId::new(0, 0),
membership: Membership::new_initial(node_id),
}
}
}

impl MessageSummary for EffectiveMembership {
fn summary(&self) -> String {
format!("{{log_id:{} membership:{}}}", self.log_id, self.membership.summary())
Expand Down Expand Up @@ -1018,7 +1027,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}

/// Run the learner loop.
#[tracing::instrument(level="debug", skip(self), fields(id=self.core.id, raft_state="non-voter"))]
#[tracing::instrument(level="debug", skip(self), fields(id=self.core.id, raft_state="learner"))]
pub(self) async fn run(mut self) -> RaftResult<()> {
self.core.report_metrics(Update::Update(None));
loop {
Expand Down
Loading

0 comments on commit c9c8d89

Please sign in to comment.