Skip to content

Commit

Permalink
Feature: add RaftMetrics::purged to report the last purged log id
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed May 1, 2023
1 parent 1235ee4 commit f0dc0eb
Show file tree
Hide file tree
Showing 11 changed files with 120 additions and 7 deletions.
8 changes: 7 additions & 1 deletion memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ pub struct MemStoreStateMachine {
#[derive(PartialOrd, Ord)]
pub enum BlockOperation {
BuildSnapshot,
PurgeLog,
}

/// An in-memory storage system implementing the `RaftStorage` trait.
Expand Down Expand Up @@ -331,7 +332,12 @@ impl RaftStorage<TypeConfig> for Arc<MemStore> {

#[tracing::instrument(level = "debug", skip_all)]
async fn purge_logs_upto(&mut self, log_id: LogId<MemNodeId>) -> Result<(), StorageError<MemNodeId>> {
tracing::debug!("purge_log_upto: [{:?}, +oo)", log_id);
tracing::debug!("purge_log_upto: {:?}", log_id);

if let Some(d) = self.get_blocking(&BlockOperation::PurgeLog) {
tracing::info!(?d, "block purging log");
tokio::time::sleep(d).await;
}

{
let mut ld = self.last_purged_log_id.write().await;
Expand Down
6 changes: 5 additions & 1 deletion openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ where
last_log_index: self.engine.state.last_log_id().index(),
last_applied: self.engine.state.io_applied().copied(),
snapshot: self.engine.state.snapshot_meta.last_log_id,
purged: self.engine.state.io_purged().copied(),

// --- cluster ---
state: self.engine.state.server_state,
Expand Down Expand Up @@ -1520,7 +1521,10 @@ where
Command::SaveVote { vote } => {
self.log_store.save_vote(&vote).await?;
}
Command::PurgeLog { upto } => self.log_store.purge(upto).await?,
Command::PurgeLog { upto } => {
self.log_store.purge(upto).await?;
self.engine.state.io_state_mut().update_purged(Some(upto));
}
Command::DeleteConflictLog { since } => {
self.log_store.truncate(since).await?;
}
Expand Down
16 changes: 13 additions & 3 deletions openraft/src/metrics/raft_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ where
/// If there is no snapshot, it is (0,0).
pub snapshot: Option<LogId<NID>>,

/// The last log id that has purged from storage, inclusive.
///
/// `purged` is also the first log id Openraft knows, although the corresponding log entry has
/// already been deleted.
pub purged: Option<LogId<NID>>,

// ---
// --- cluster ---
// ---
Expand All @@ -64,7 +70,7 @@ where
{
// TODO: make this more readable
fn summary(&self) -> String {
format!("Metrics{{id:{},{:?}, term:{}, last_log:{:?}, last_applied:{:?}, leader:{:?}, membership:{}, snapshot:{:?}, replication:{{{}}}",
format!("Metrics{{id:{},{:?}, term:{}, last_log:{:?}, last_applied:{:?}, leader:{:?}, membership:{}, snapshot:{:?}, purged:{}, replication:{{{}}}",
self.id,
self.state,
self.current_term,
Expand All @@ -73,6 +79,7 @@ where
self.current_leader,
self.membership_config.summary(),
self.snapshot,
self.purged.summary(),
self.replication.as_ref().map(|x| {
x.iter().map(|(k, v)| format!("{}:{}", k, v.summary())).collect::<Vec<_>>().join(",")
}).unwrap_or_default(),
Expand All @@ -89,13 +96,16 @@ where
Self {
running_state: Ok(()),
id,
state: ServerState::Follower,

current_term: 0,
last_log_index: None,
last_applied: None,
snapshot: None,
purged: None,

state: ServerState::Follower,
current_leader: None,
membership_config: Arc::new(StoredMembership::default()),
snapshot: None,
replication: None,
}
}
Expand Down
2 changes: 2 additions & 0 deletions openraft/src/metrics/wait_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ where
current_term: 0,
last_log_index: None,
last_applied: None,
purged: None,

current_leader: None,
membership_config: Arc::new(StoredMembership::new(None, Membership::new(vec![btreeset! {}], None))),

Expand Down
4 changes: 4 additions & 0 deletions openraft/src/progress/entry/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ impl LogStateReader<u64> for LogState {
todo!()
}

fn io_purged(&self) -> Option<&LogId<u64>> {
todo!()
}

fn snapshot_last_log_id(&self) -> Option<&LogId<u64>> {
self.snap_last.as_ref()
}
Expand Down
19 changes: 18 additions & 1 deletion openraft/src/raft_state/io_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,25 @@ pub(crate) struct IOState<NID: NodeId> {

/// The last log id that has been flushed to storage.
pub(crate) flushed: LogIOId<NID>,

/// The last log id that has been applied to state machine.
pub(crate) applied: Option<LogId<NID>>,

/// The last log id that has been purged from storage.
///
/// [`RaftState::last_purged_log_id`](`crate::raft_state::RaftState::last_purged_log_id`)
/// is just the log id that is going to be purged, i.e., there is a `PurgeLog` command queued to
/// be executed, and it may not be the actually purged log id.
pub(crate) purged: Option<LogId<NID>>,
}

impl<NID: NodeId> IOState<NID> {
pub(crate) fn new(flushed: LogIOId<NID>, applied: Option<LogId<NID>>) -> Self {
pub(crate) fn new(flushed: LogIOId<NID>, applied: Option<LogId<NID>>, purged: Option<LogId<NID>>) -> Self {
Self {
building_snapshot: false,
flushed,
applied,
purged,
}
}
pub(crate) fn update_applied(&mut self, log_id: Option<LogId<NID>>) {
Expand Down Expand Up @@ -62,4 +71,12 @@ impl<NID: NodeId> IOState<NID> {
pub(crate) fn building_snapshot(&self) -> bool {
self.building_snapshot
}

pub(crate) fn update_purged(&mut self, log_id: Option<LogId<NID>>) {
self.purged = log_id;
}

pub(crate) fn purged(&self) -> Option<&LogId<NID>> {
self.purged.as_ref()
}
}
5 changes: 5 additions & 0 deletions openraft/src/raft_state/log_state_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ pub(crate) trait LogStateReader<NID: NodeId> {
/// This is actually happened io-state which might fall behind committed log id.
fn io_applied(&self) -> Option<&LogId<NID>>;

/// The last known purged log id, inclusive.
///
/// This is actually purged log id from storage.
fn io_purged(&self) -> Option<&LogId<NID>>;

/// Return the last log id the snapshot includes.
fn snapshot_last_log_id(&self) -> Option<&LogId<NID>>;

Expand Down
4 changes: 4 additions & 0 deletions openraft/src/raft_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ where
self.io_state.applied()
}

fn io_purged(&self) -> Option<&LogId<NID>> {
self.io_state.purged()
}

fn snapshot_last_log_id(&self) -> Option<&LogId<NID>> {
self.snapshot_meta.last_log_id.as_ref()
}
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/storage/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ where
let log_ids = LogIdList::load_log_ids(last_purged_log_id, last_log_id, self.log_store).await?;

// TODO: `flushed` is not set.
let io_state = IOState::new(LogIOId::default(), last_applied);
let io_state = IOState::new(LogIOId::default(), last_applied, last_purged_log_id);

let snapshot = self.state_machine.get_current_snapshot().await?;

Expand Down
1 change: 1 addition & 0 deletions tests/tests/metrics/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod fixtures;
// The later tests may depend on the earlier ones.

mod t10_current_leader;
mod t10_purged;
mod t20_metrics_state_machine_consistency;
mod t30_leader_metrics;
mod t40_metrics_wait;
60 changes: 60 additions & 0 deletions tests/tests/metrics/t10_purged.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use maplit::btreeset;
use openraft::testing::log_id;
use openraft::Config;
use openraft::RaftLogReader;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;

/// Metric `purged` should be the last purged log id.
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn metrics_purged() -> Result<()> {
let config = Arc::new(
Config {
enable_heartbeat: false,
max_in_snapshot_log_to_keep: 0,
purge_batch_size: 1,
..Default::default()
}
.validate()?,
);

let mut router = RaftRouter::new(config.clone());

tracing::info!("--- initialize cluster");
let mut log_index = router.new_cluster(btreeset! {0,1,2}, btreeset! {}).await?;

let n = 10;
tracing::info!(log_index, "--- write {} logs", n);
log_index += router.client_request_many(0, "foo", n).await?;

tracing::info!(log_index, "--- trigger snapshot");
{
let n0 = router.get_raft_handle(&0)?;
n0.trigger_snapshot().await?;
n0.wait(timeout()).snapshot(log_id(1, 0, log_index), "build snapshot").await?;

tracing::info!(log_index, "--- metrics reports purged log id");
n0.wait(timeout())
.metrics(
|m| m.purged == Some(log_id(1, 0, log_index)),
"purged is reported to metrics",
)
.await?;

tracing::info!(log_index, "--- check storage at once to ensure purged log is removed");
let (mut sto0, _sm0) = router.get_storage_handle(&0)?;
let state = sto0.get_log_state().await?;
assert_eq!(state.last_purged_log_id, Some(log_id(1, 0, log_index)));
}

Ok(())
}

fn timeout() -> Option<Duration> {
Some(Duration::from_millis(1_000))
}

0 comments on commit f0dc0eb

Please sign in to comment.