Skip to content

Commit

Permalink
change: RaftStorage add 2 API: last_id_in_log() and last_applied_state(), remove get_last_log_id()
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Sep 15, 2021
1 parent 74283fd commit 112252b
Show file tree
Hide file tree
Showing 13 changed files with 131 additions and 194 deletions.
24 changes: 7 additions & 17 deletions async-raft/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,24 +161,14 @@ where
/// It does not return an error if in defensive mode and the log entry at `log_index` is not found.
async fn try_get_log_entry(&self, log_index: u64) -> Result<Option<Entry<D>>, StorageError>;

/// Returns the last known log id.
/// It could be the id of the last entry in log, or the last applied id that is saved in state machine.
/// Returns the last log id in log.
///
/// When there is no log or state machine, it returns (0,0)
///
/// Caveat: an impl must hold the log-state-machine consistency or must deal with the inconsistency when accessing
/// it:
///
/// I.e.: if `logs.last().log_id.index > last_applied.index`, `logs.last().log_id > last_applied` must hold. E.g.,
/// `logs.last() == {term:1, index:2}` and `last_applied == {term:2, index:1}` is inconsistent:
///
/// Log `{term:1, index:2}` can not be committed and should definitely be removed. The simplest way to achieve
/// consistency is to remove such inconsistent logs after a successful `append_entries` or `install_snapshot`
/// request.
///
/// TODO(xp) test it
/// TODO(xp) defensive test about consistency
async fn get_last_log_id(&self) -> Result<LogId, StorageError>;
/// The impl should not consider the applied log id in state machine.
async fn last_id_in_log(&self) -> Result<LogId, StorageError>;

/// Returns the last applied log id which is recorded in state machine, and the last applied membership log id and
/// membership config.
async fn last_applied_state(&self) -> Result<(LogId, Option<(LogId, MembershipConfig)>), StorageError>;

/// Delete all logs in a `range`.
///
Expand Down
2 changes: 1 addition & 1 deletion async-raft/tests/client_writes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ async fn client_writes() -> Result<()> {
members_after_consensus: None,
})),
)
.await;
.await?;

Ok(())
}
4 changes: 2 additions & 2 deletions async-raft/tests/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async fn compaction() -> Result<()> {
members_after_consensus: None,
})),
)
.await;
.await?;

// Add a new node and assert that it received the same snapshot.
let sto1 = router.new_store(1).await;
Expand Down Expand Up @@ -120,7 +120,7 @@ async fn compaction() -> Result<()> {
LogId { term: 1, index: n_logs },
expected_snap,
)
.await;
.await?;

Ok(())
}
10 changes: 6 additions & 4 deletions async-raft/tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ impl RaftRouter {
expect_voted_for: Option<u64>,
expect_sm_last_applied_log: LogId,
expect_snapshot: Option<(ValueTest<u64>, u64, MembershipConfig)>,
) {
) -> anyhow::Result<()> {
let rt = self.routing_table.read().await;
for (id, (_node, storage)) in rt.iter() {
let last_log_id = storage.last_log_id().await;
Expand Down Expand Up @@ -645,14 +645,16 @@ impl RaftRouter {
);
}

let sm = storage.get_state_machine().await;
let (last_applied, _) = storage.last_applied_state().await?;

assert_eq!(
&sm.last_applied_log, &expect_sm_last_applied_log,
&last_applied, &expect_sm_last_applied_log,
"expected node {} to have state machine last_applied_log {}, got {}",
id, expect_sm_last_applied_log, sm.last_applied_log
id, expect_sm_last_applied_log, last_applied
);
}

Ok(())
}
}

Expand Down
8 changes: 4 additions & 4 deletions async-raft/tests/initialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use anyhow::Result;
use async_raft::raft::EntryPayload;
use async_raft::raft::MembershipConfig;
use async_raft::Config;
use async_raft::LogId;
use async_raft::RaftStorage;
use async_raft::RaftStorageDebug;
use async_raft::State;
use fixtures::RaftRouter;
use maplit::btreeset;
Expand Down Expand Up @@ -65,12 +65,12 @@ async fn initialization() -> Result<()> {
};
assert_eq!(btreeset![0, 1, 2], mem.members);

let sm_mem = sto.get_state_machine().await.last_membership.clone();
let sm_mem = sto.last_applied_state().await?.1;
assert_eq!(
Some(MembershipConfig {
Some((LogId { term: 1, index: 1 }, MembershipConfig {
members: btreeset![0, 1, 2],
members_after_consensus: None,
}),
})),
sm_mem
);
}
Expand Down
2 changes: 1 addition & 1 deletion async-raft/tests/singlenode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async fn singlenode() -> Result<()> {
// Write some data to the single node cluster.
router.client_request_many(0, "0", 1000).await;
router.assert_stable_cluster(Some(1), Some(1001)).await;
router.assert_storage_state(1, 1001, Some(0), LogId { term: 1, index: 1001 }, None).await;
router.assert_storage_state(1, 1001, Some(0), LogId { term: 1, index: 1001 }, None).await?;

// Read some data from the single node cluster.
router.client_read(0).await?;
Expand Down
4 changes: 2 additions & 2 deletions async-raft/tests/snapshot_chunk_size.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async fn snapshot_chunk_size() -> Result<()> {

router.wait_for_log(&btreeset![0], want, None, "send log to trigger snapshot").await?;
router.wait_for_snapshot(&btreeset![0], LogId { term: 1, index: want }, None, "snapshot").await?;
router.assert_storage_state(1, want, Some(0), LogId { term: 1, index: want }, want_snap).await;
router.assert_storage_state(1, want, Some(0), LogId { term: 1, index: want }, want_snap).await?;
}

tracing::info!("--- add non-voter to receive snapshot and logs");
Expand All @@ -89,7 +89,7 @@ async fn snapshot_chunk_size() -> Result<()> {
LogId { term: 1, index: want },
want_snap,
)
.await;
.await?;
}

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions async-raft/tests/snapshot_ge_half_threshold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ async fn snapshot_ge_half_threshold() -> Result<()> {
members_after_consensus: None,
})),
)
.await;
.await?;
}

tracing::info!("--- send logs to make distance between snapshot index and last_log_index");
Expand Down Expand Up @@ -106,7 +106,7 @@ async fn snapshot_ge_half_threshold() -> Result<()> {
LogId { term: 1, index: want },
expected_snap,
)
.await;
.await?;
}

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions async-raft/tests/snapshot_overrides_membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ async fn snapshot_overrides_membership() -> Result<()> {
members_after_consensus: None,
})),
)
.await;
.await?;
}

tracing::info!("--- create non-voter");
Expand Down Expand Up @@ -146,7 +146,7 @@ async fn snapshot_overrides_membership() -> Result<()> {
LogId { term: 1, index: want },
expected_snap,
)
.await;
.await?;

let m = sto.get_membership_config().await?;
assert_eq!(
Expand Down
18 changes: 9 additions & 9 deletions async-raft/tests/state_machien_apply_membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use std::sync::Arc;
use anyhow::Result;
use async_raft::raft::MembershipConfig;
use async_raft::Config;
use async_raft::RaftStorageDebug;
use async_raft::LogId;
use async_raft::RaftStorage;
use async_raft::State;
use fixtures::RaftRouter;
use futures::stream::StreamExt;
Expand Down Expand Up @@ -48,13 +49,12 @@ async fn state_machine_apply_membership() -> Result<()> {

for i in 0..=0 {
let sto = router.get_storage_handle(&i).await?;
let sm = sto.get_state_machine().await;
assert_eq!(
Some(MembershipConfig {
Some((LogId { term: 1, index: 1 }, MembershipConfig {
members: btreeset![0],
members_after_consensus: None,
}),
sm.last_membership
})),
sto.last_applied_state().await?.1
);
}

Expand Down Expand Up @@ -83,13 +83,13 @@ async fn state_machine_apply_membership() -> Result<()> {
tracing::info!("--- check applied membership config");
for i in 0..5 {
let sto = router.get_storage_handle(&i).await?;
let sm = sto.get_state_machine().await;
let (_, last_membership) = sto.last_applied_state().await?;
assert_eq!(
Some(MembershipConfig {
Some((LogId { term: 1, index: 3 }, MembershipConfig {
members: btreeset![0, 1, 2],
members_after_consensus: None,
}),
sm.last_membership
})),
last_membership
);
}

Expand Down
2 changes: 1 addition & 1 deletion async-raft/tests/stepdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ async fn stepdown() -> Result<()> {
router.assert_stable_cluster(Some(metrics.current_term), Some(want)).await;
router
.assert_storage_state(metrics.current_term, want, None, LogId { term: 2, index: 4 }, None)
.await;
.await?;
// ----------------------------------- ^^^ this is `0` instead of `4` because blank payloads from new leaders
// and config change entries are never applied to the state machine.

Expand Down
89 changes: 29 additions & 60 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub struct MemStoreSnapshot {
pub struct MemStoreStateMachine {
pub last_applied_log: LogId,

pub last_membership: Option<MembershipConfig>,
pub last_membership: Option<(LogId, MembershipConfig)>,

/// A mapping of client IDs to their state info.
pub client_serial_responses: HashMap<String, (u64, Option<String>)>,
Expand Down Expand Up @@ -302,29 +302,6 @@ impl MemStore {
std::cmp::max(log_last_id, sm_last_id)
}

pub async fn defensive_consistent_log_sm(&self) -> Result<(), DefensiveError> {
let log_last_id = {
let log_last = self.log.read().await;
log_last.iter().last().map(|(_k, v)| v.log_id).unwrap_or_default()
};

let sm_last_id = self.sm.read().await.last_applied_log;

if (log_last_id.index == sm_last_id.index && log_last_id != sm_last_id)
|| (log_last_id.index > sm_last_id.index && log_last_id < sm_last_id)
{
return Err(DefensiveError::new(
ErrorSubject::Log(log_last_id),
Violation::DirtyLog {
higher_index_log_id: log_last_id,
lower_index_log_id: sm_last_id,
},
));
}

Ok(())
}

pub async fn defensive_apply_index_is_last_applied_plus_one<D: AppData>(
&self,
entries: &[&Entry<D>],
Expand Down Expand Up @@ -512,7 +489,7 @@ impl MemStore {
pub async fn get_membership_from_log(&self, upto_index: Option<u64>) -> Result<MembershipConfig, StorageError> {
self.defensive_no_dirty_log().await?;

let membership = {
let membership_in_log = {
let log = self.log.read().await;

let reversed_logs = log.values().rev();
Expand All @@ -527,26 +504,19 @@ impl MemStore {

// Find membership stored in state machine.

let (sm_mem, last_applied) = {
let sm = self.sm.read().await;
(sm.last_membership.clone(), sm.last_applied_log)
};
let (_, membership_in_sm) = self.last_applied_state().await?;

let membership = match membership {
None => sm_mem,
Some((id, log_mem)) => {
if id < last_applied {
sm_mem
} else {
Some(log_mem)
}
}
};
let membership =
if membership_in_log.as_ref().map(|(id, _)| id.index) > membership_in_sm.as_ref().map(|(id, _)| id.index) {
membership_in_log
} else {
membership_in_sm
};

// Otherwise, create a default one.
// Create a default one if both are None.

Ok(match membership {
Some(cfg) => cfg,
Some((_id, cfg)) => cfg,
None => MembershipConfig::new_initial(self.id),
})
}
Expand All @@ -573,27 +543,22 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {

let membership = self.get_membership_config().await?;
let mut hs = self.hs.write().await;
let log = self.log.read().await;
let sm = self.sm.read().await;
match &mut *hs {
Some(inner) => {
// Search for two place and use the max one,
// because when a state machine is installed there could be logs
// included in the state machine that are not cleaned:
// - the last log id
// - the last_applied log id in state machine.
// TODO(xp): add test for RaftStore to ensure it looks for two places.

let last = log.values().rev().next();
let last = last.map(|x| x.log_id);
let last_in_log = last.unwrap_or_default();
let last_applied_log = sm.last_applied_log;
let last_in_log = self.last_id_in_log().await?;
let (last_applied, _) = self.last_applied_state().await?;

let last_log_id = max(last_in_log, last_applied_log);
let last_log_id = max(last_in_log, last_applied);

Ok(InitialState {
last_log_id,
last_applied: last_applied_log,
last_applied,
hard_state: inner.clone(),
membership,
})
Expand Down Expand Up @@ -639,15 +604,15 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
Ok(log.get(&log_index).cloned())
}

#[tracing::instrument(level = "trace", skip(self))]
async fn get_last_log_id(&self) -> Result<LogId, StorageError> {
self.defensive_consistent_log_sm().await?;

// TODO: log id must consistent:
let log_last_id = self.log.read().await.iter().last().map(|(_k, v)| v.log_id).unwrap_or_default();
let last_applied_id = self.sm.read().await.last_applied_log;
async fn last_id_in_log(&self) -> Result<LogId, StorageError> {
let log = self.log.read().await;
let last = log.iter().last().map(|(_, ent)| ent.log_id).unwrap_or_default();
Ok(last)
}

Ok(max(log_last_id, last_applied_id))
async fn last_applied_state(&self) -> Result<(LogId, Option<(LogId, MembershipConfig)>), StorageError> {
let sm = self.sm.read().await;
Ok((sm.last_applied_log, sm.last_membership.clone()))
}

#[tracing::instrument(level = "trace", skip(self, range), fields(range=?range))]
Expand Down Expand Up @@ -714,7 +679,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
res.push(ClientResponse(previous));
}
EntryPayload::ConfigChange(ref mem) => {
sm.last_membership = Some(mem.membership.clone());
sm.last_membership = Some((entry.log_id, mem.membership.clone()));
res.push(ClientResponse(None))
}
};
Expand All @@ -734,7 +699,11 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
.map_err(|e| StorageIOError::new(ErrorSubject::StateMachine, ErrorVerb::Read, e.into()))?;

last_applied_log = sm.last_applied_log;
membership_config = sm.last_membership.clone().unwrap_or_else(|| MembershipConfig::new_initial(self.id));
membership_config = sm
.last_membership
.clone()
.map(|(_id, mem)| mem)
.unwrap_or_else(|| MembershipConfig::new_initial(self.id));
}

let snapshot_size = data.len();
Expand Down
Loading

0 comments on commit 112252b

Please sign in to comment.