Skip to content

Commit

Permalink
change: to get last_log and membership, Storage should search for bot…
Browse files Browse the repository at this point in the history
…h logs and state machines.

Why:

depending on the impl, a RaftStore may have logs that are included in
the state machine still present.
This may be caused by a non-transactional impl of the store, e.g.
installing snapshot and removing logs are not atomic.

Thus when searching for last_log or last membership, a RaftStore should
search for both logs and state machine, and returns the greater one
that is found.

- Test: add test to prove these behaviors,
  which includes: `get_initial_state()` and `get_membership()`.

- Refactor: Make store tests a suite that could be applied to other
  impl.
  • Loading branch information
drmingdrmer committed Aug 26, 2021
1 parent 6f3cc68 commit 79a3997
Show file tree
Hide file tree
Showing 4 changed files with 648 additions and 336 deletions.
2 changes: 1 addition & 1 deletion async-raft/src/raft_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use serde::Serialize;

/// The identity of a raft log.
/// A term and an index identifies an log globally.
#[derive(Debug, Default, Copy, Clone, PartialOrd, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Default, Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Serialize, Deserialize)]
pub struct LogId {
pub term: u64,
pub index: u64,
Expand Down
3 changes: 3 additions & 0 deletions memstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ tokio = { version="1.0", default-features=false, features=["sync"] }
tracing = "0.1.17"
tracing-futures = "0.2.4"

[dev-dependencies]
maplit = "1.0.2"

[features]
docinclude = [] # Used only for activating `doc(include="...")` on nightly.

Expand Down
37 changes: 29 additions & 8 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#[cfg(test)]
mod test;

use std::cmp::max;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::io::Cursor;
Expand Down Expand Up @@ -153,13 +154,13 @@ impl RaftStorageDebug<MemStoreStateMachine> for MemStore {
}

impl MemStore {
fn find_first_membership_log<'a, T, D>(mut it: T) -> Option<MembershipConfig>
fn find_first_membership_log<'a, T, D>(mut it: T) -> Option<(LogId, MembershipConfig)>
where
T: 'a + Iterator<Item = &'a Entry<D>>,
D: AppData,
{
it.find_map(|entry| match &entry.payload {
EntryPayload::ConfigChange(cfg) => Some(cfg.membership.clone()),
EntryPayload::ConfigChange(cfg) => Some((entry.log_id,cfg.membership.clone())),
_ => None,
})
}
Expand All @@ -182,9 +183,20 @@ impl MemStore {

// Find membership stored in state machine.

let (sm_mem, last_applied) = {
let sm = self.sm.read().await;
(sm.last_membership.clone(), sm.last_applied_log)
};

let membership = match membership {
None => self.sm.read().await.last_membership.clone(),
Some(x) => Some(x),
None => sm_mem,
Some((id, log_mem)) => {
if id < last_applied {
sm_mem
} else {
Some(log_mem)
}
}
};

// Otherwise, create a default one.
Expand Down Expand Up @@ -214,11 +226,20 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
let sm = self.sm.read().await;
match &mut *hs {
Some(inner) => {
let last_log_id = match log.values().rev().next() {
Some(log) => log.log_id,
None => (0, 0).into(),
};
// Search for two place and use the max one,
// because when a state machine is installed there could be logs
// included in the state machine that are not cleaned:
// - the last log id
// - the last_applied log id in state machine.
// TODO(xp): add test for RaftStore to ensure it looks for two places.

let last = log.values().rev().next();
let last = last.map(|x| x.log_id);
let last_in_log = last.unwrap_or(LogId::default());
let last_applied_log = sm.last_applied_log;

let last_log_id = max(last_in_log, last_applied_log);

Ok(InitialState {
last_log_id,
last_applied_log,
Expand Down
Loading

0 comments on commit 79a3997

Please sign in to comment.