Skip to content

Commit

Permalink
Feature: add Raft::purge_log()
Browse files Browse the repository at this point in the history
This method allows users to purge logs when required.
It initiates the log purge up to and including the given `upto` log index.

Logs that are not included in a snapshot will **NOT** be purged.
In such scenario it will delete as many log as possible.
The `max_in_snapshot_log_to_keep` config is not taken into account when
purging logs.

Openraft won't purge logs at once, e.g. it may be delayed by several
seconds, because if it is a leader and a replication task has been
replicating the logs to a follower, the logs can't be purged until the
replication task is finished.

- Fix: #852
  • Loading branch information
drmingdrmer committed May 25, 2023
1 parent 0223477 commit 9e7195a
Show file tree
Hide file tree
Showing 8 changed files with 343 additions and 10 deletions.
3 changes: 3 additions & 0 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1129,6 +1129,9 @@ where
self.send_heartbeat("ExternalCommand");
}
ExternalCommand::Snapshot => self.trigger_snapshot(),
ExternalCommand::PurgeLog { upto } => {
self.engine.trigger_purge_log(upto);
}
}
}
};
Expand Down
65 changes: 63 additions & 2 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::time::Duration;
use tokio::time::Instant;

use crate::core::ServerState;
use crate::display_ext::DisplayOptionExt;
use crate::display_ext::DisplaySlice;
use crate::engine::engine_config::EngineConfig;
use crate::engine::handler::following_handler::FollowingHandler;
Expand Down Expand Up @@ -39,6 +40,7 @@ use crate::raft_state::RaftState;
use crate::summary::MessageSummary;
use crate::validate::Valid;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::Membership;
use crate::RaftLogId;
use crate::RaftTypeConfig;
Expand Down Expand Up @@ -526,7 +528,7 @@ where C: RaftTypeConfig
/// - and a snapshot smaller than last-committed is not allowed to be installed.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn finish_building_snapshot(&mut self, meta: SnapshotMeta<C::NodeId, C::Node>) {
tracing::info!("finish_building_snapshot: {:?}", meta);
tracing::info!(snapshot_meta = display(&meta), "{}", func_name!());

self.state.io_state_mut().set_building_snapshot(false);

Expand All @@ -537,7 +539,24 @@ where C: RaftTypeConfig
return;
}

self.log_handler().update_purge_upto();
self.log_handler().schedule_policy_based_purge();
self.try_purge_log();
}

/// Try to purge logs up to the expected position.
///
/// If the node is a leader, it will only purge logs when no replication tasks are using them.
/// Otherwise, it will retry purging the logs the next time replication has made progress.
///
/// If the node is a follower or learner, it will always purge the logs immediately since no
/// other tasks are using the logs.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn try_purge_log(&mut self) {
tracing::debug!(
purge_upto = display(self.state.purge_upto().display()),
"{}",
func_name!()
);

if self.internal_server_state.is_leading() {
// If it is leading, it must not delete a log that is in use by a replication task.
Expand All @@ -547,6 +566,48 @@ where C: RaftTypeConfig
self.log_handler().purge_log();
}
}

/// This is a to user API that triggers log purging upto `index`, inclusive.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn trigger_purge_log(&mut self, mut index: u64) {
tracing::info!(index = display(index), "{}", func_name!());

let snapshot_last_log_id = self.state.snapshot_last_log_id();
let snapshot_last_log_id = if let Some(x) = snapshot_last_log_id {
*x
} else {
tracing::info!("no snapshot, can not purge");
return;
};

let scheduled = self.state.purge_upto();

if index < scheduled.next_index() {
tracing::info!(
"no update, already scheduled: {}; index: {}",
scheduled.display(),
index,
);
return;
}

if index > snapshot_last_log_id.index {
tracing::info!(
"can not purge logs not in a snapshot; index: {}, last in snapshot log id: {}",
index,
snapshot_last_log_id
);
index = snapshot_last_log_id.index;
}

// Safe unwrap: `index` is ensured to be present in the above code.
let log_id = self.state.get_log_id(index).unwrap();

tracing::info!(purge_upto = display(log_id), "{}", func_name!());

self.log_handler().update_purge_upto(log_id);
self.try_purge_log();
}
}

/// Supporting util
Expand Down
13 changes: 9 additions & 4 deletions openraft/src/engine/handler/log_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,19 @@ where C: RaftTypeConfig
/// This method is called after building a snapshot, because openraft only purge logs that are
/// already included in snapshot.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn update_purge_upto(&mut self) {
pub(crate) fn schedule_policy_based_purge(&mut self) {
if let Some(purge_upto) = self.calc_purge_upto() {
debug_assert!(self.state.purge_upto() <= Some(&purge_upto));

self.state.purge_upto = Some(purge_upto);
self.update_purge_upto(purge_upto);
}
}

/// Update the log id it expect to purge up to. It won't trigger purge immediately.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn update_purge_upto(&mut self, purge_upto: LogId<C::NodeId>) {
debug_assert!(self.state.purge_upto() <= Some(&purge_upto));
self.state.purge_upto = Some(purge_upto);
}

/// Calculate the log id up to which to purge, inclusive.
///
/// Only log included in snapshot will be purged.
Expand Down
1 change: 1 addition & 0 deletions openraft/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ mod tests {
mod initialize_test;
mod log_id_list_test;
mod startup_test;
mod trigger_purge_log_test;
}
#[cfg(test)] mod testing;

Expand Down
125 changes: 125 additions & 0 deletions openraft/src/engine/tests/trigger_purge_log_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
use std::ops::Deref;

use maplit::btreeset;
use pretty_assertions::assert_eq;

use crate::engine::testing::UTConfig;
use crate::engine::Command;
use crate::engine::Engine;
use crate::engine::LogIdList;
use crate::progress::Progress;
use crate::testing::log_id;
use crate::CommittedLeaderId;
use crate::EffectiveMembership;
use crate::LogId;
use crate::Membership;
use crate::MembershipState;
use crate::SnapshotMeta;
use crate::StoredMembership;

fn m12() -> Membership<u64, ()> {
Membership::<u64, ()>::new(vec![btreeset! {1,2}], None)
}

fn eng() -> Engine<UTConfig> {
let mut eng = Engine::default();
eng.state.enable_validate = false; // Disable validation for incomplete state
eng.state.membership_state = MembershipState::new(
EffectiveMembership::new_arc(Some(log_id(1, 0, 1)), m12()),
EffectiveMembership::new_arc(Some(log_id(1, 0, 1)), m12()),
);

eng.state.log_ids = LogIdList::new([LogId::new(CommittedLeaderId::new(0, 0), 0)]);
eng
}

#[test]
fn test_trigger_purge_log_no_snapshot() -> anyhow::Result<()> {
let mut eng = eng();

eng.trigger_purge_log(1);

assert_eq!(None, eng.state.purge_upto, "no snapshot, can not purge");

assert_eq!(0, eng.output.take_commands().len());

Ok(())
}

#[test]
fn test_trigger_purge_log_already_scheduled() -> anyhow::Result<()> {
let mut eng = eng();
eng.state.snapshot_meta = SnapshotMeta {
last_log_id: Some(log_id(1, 0, 3)),
last_membership: StoredMembership::new(Some(log_id(1, 0, 1)), m12()),
snapshot_id: "1".to_string(),
};
eng.state.purge_upto = Some(log_id(1, 0, 2));
eng.state.io_state.purged = Some(log_id(1, 0, 2));

eng.trigger_purge_log(2);

assert_eq!(Some(log_id(1, 0, 2)), eng.state.purge_upto, "already purged, no update");

assert_eq!(0, eng.output.take_commands().len());

Ok(())
}

#[test]
fn test_trigger_purge_log_delete_only_in_snapshot_logs() -> anyhow::Result<()> {
let mut eng = eng();
eng.state.snapshot_meta = SnapshotMeta {
last_log_id: Some(log_id(1, 0, 3)),
last_membership: StoredMembership::new(Some(log_id(1, 0, 1)), m12()),
snapshot_id: "1".to_string(),
};
eng.state.purge_upto = Some(log_id(1, 0, 2));
eng.state.io_state.purged = Some(log_id(1, 0, 2));
eng.state.log_ids = LogIdList::new([log_id(1, 0, 2), log_id(1, 0, 10)]);

eng.trigger_purge_log(5);

assert_eq!(
Some(log_id(1, 0, 3)),
eng.state.purge_upto,
"delete only in snapshot logs"
);

assert_eq!(
vec![Command::PurgeLog { upto: log_id(1, 0, 3) },],
eng.output.take_commands()
);

Ok(())
}

#[test]
fn test_trigger_purge_log_in_used_wont_be_delete() -> anyhow::Result<()> {
let mut eng = eng();
eng.state.snapshot_meta = SnapshotMeta {
last_log_id: Some(log_id(1, 0, 3)),
last_membership: StoredMembership::new(Some(log_id(1, 0, 1)), m12()),
snapshot_id: "1".to_string(),
};
eng.state.purge_upto = Some(log_id(1, 0, 2));
eng.state.io_state.purged = Some(log_id(1, 0, 2));
eng.state.log_ids = LogIdList::new([log_id(1, 0, 2), log_id(1, 0, 10)]);

// Make it a leader and mark the logs are in flight.
eng.vote_handler().become_leading();
let l = eng.internal_server_state.leading_mut().unwrap();
let _ = l.progress.get_mut(&2).unwrap().next_send(eng.state.deref(), 10).unwrap();

eng.trigger_purge_log(5);

assert_eq!(
Some(log_id(1, 0, 3)),
eng.state.purge_upto,
"delete only in snapshot logs"
);

assert_eq!(0, eng.output.take_commands().len(), "in used log wont be deleted");

Ok(())
}
53 changes: 50 additions & 3 deletions openraft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,27 +334,46 @@ where

/// Trigger election at once and return at once.
///
/// Returns error when RaftCore has Fatal error, e.g. shut down or having storage error.
/// Returns error when RaftCore has [`Fatal`] error, e.g. shut down or having storage error.
/// It is not affected by `Raft::enable_elect(false)`.
pub async fn trigger_elect(&self) -> Result<(), Fatal<C::NodeId>> {
self.send_external_command(ExternalCommand::Elect, "trigger_elect").await
}

/// Trigger a heartbeat at once and return at once.
///
/// Returns error when RaftCore has Fatal error, e.g. shut down or having storage error.
/// Returns error when RaftCore has [`Fatal`] error, e.g. shut down or having storage error.
/// It is not affected by `Raft::enable_heartbeat(false)`.
pub async fn trigger_heartbeat(&self) -> Result<(), Fatal<C::NodeId>> {
self.send_external_command(ExternalCommand::Heartbeat, "trigger_heartbeat").await
}

/// Trigger to build a snapshot at once and return at once.
///
/// Returns error when RaftCore has Fatal error, e.g. shut down or having storage error.
/// Returns error when RaftCore has [`Fatal`] error, e.g. shut down or having storage error.
pub async fn trigger_snapshot(&self) -> Result<(), Fatal<C::NodeId>> {
self.send_external_command(ExternalCommand::Snapshot, "trigger_snapshot").await
}

/// Initiate the log purge up to and including the given `upto` log index.
///
/// Logs that are not included in a snapshot will **NOT** be purged.
/// In such scenario it will delete as many log as possible.
/// The [`max_in_snapshot_log_to_keep`] config is not taken into account
/// when purging logs.
///
/// It returns error only when RaftCore has [`Fatal`] error, e.g. shut down or having storage
/// error.
///
/// Openraft won't purge logs at once, e.g. it may be delayed by several seconds, because if it
/// is a leader and a replication task has been replicating the logs to a follower, the logs
/// can't be purged until the replication task is finished.
///
/// [`max_in_snapshot_log_to_keep`]: `crate::Config::max_in_snapshot_log_to_keep`
pub async fn purge_log(&self, upto: u64) -> Result<(), Fatal<C::NodeId>> {
self.send_external_command(ExternalCommand::PurgeLog { upto }, "purge_log").await
}

async fn send_external_command(
&self,
cmd: ExternalCommand,
Expand Down Expand Up @@ -977,10 +996,38 @@ where
pub(crate) enum ExternalCommand {
/// Trigger an election at once.
Elect,

/// Emit a heartbeat message, only if the node is leader.
Heartbeat,

/// Trigger to build a snapshot on this node.
Snapshot,

/// Purge logs that are already in a snapshot.
///
/// Openraft will respect the [`max_in_snapshot_log_to_keep`] when purging.
///
/// [`max_in_snapshot_log_to_keep`]: `crate::Config::max_in_snapshot_log_to_keep`
PurgeLog { upto: u64 },
}

impl fmt::Display for ExternalCommand {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ExternalCommand::Elect => {
write!(f, "{:?}", self)
}
ExternalCommand::Heartbeat => {
write!(f, "{:?}", self)
}
ExternalCommand::Snapshot => {
write!(f, "{:?}", self)
}
ExternalCommand::PurgeLog { upto } => {
write!(f, "PurgeLog[..={}]", upto)
}
}
}
}

/// An RPC sent by a cluster leader to replicate log entries (§5.3), and as a heartbeat (§5.2).
Expand Down
3 changes: 2 additions & 1 deletion tests/tests/client_api/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
mod fixtures;

// The number indicate the preferred running order for these case.
// The later tests may depend on the earlier ones.
// See ./README.md

mod t10_client_writes;
mod t11_client_reads;
mod t12_trigger_purge_log;
mod t50_lagging_network_write;
Loading

0 comments on commit 9e7195a

Please sign in to comment.