Skip to content

Commit

Permalink
Change: SnapshotMeta.last_log_id from LogId to Option of LogId
Browse files Browse the repository at this point in the history
`SnapshotMeta.last_log_id` should be the same type as
`StateMachine.last_applied`.

By making `SnapshotMeta.last_log_id` an Option of LogId, a snapshot can
be build on an empty state-machine(in which `last_applied` is None).
  • Loading branch information
drmingdrmer committed Aug 17, 2022
1 parent 56ded06 commit 70e3318
Show file tree
Hide file tree
Showing 13 changed files with 78 additions and 91 deletions.
16 changes: 5 additions & 11 deletions examples/raft-kv-memstore/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,23 +147,17 @@ impl RaftSnapshotBuilder<ExampleTypeConfig, Cursor<Vec<u8>>> for Arc<ExampleStor
last_membership = state_machine.last_membership.clone();
}

let last_applied_log = match last_applied_log {
None => {
panic!("can not compact empty state machine");
}
Some(x) => x,
};

let snapshot_idx = {
let mut l = self.snapshot_idx.lock().unwrap();
*l += 1;
*l
};

let snapshot_id = format!(
"{}-{}-{}",
last_applied_log.leader_id, last_applied_log.index, snapshot_idx
);
let snapshot_id = if let Some(last) = last_applied_log {
format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx)
} else {
format!("--{}", snapshot_idx)
};

let meta = SnapshotMeta {
last_log_id: last_applied_log,
Expand Down
16 changes: 5 additions & 11 deletions examples/raft-kv-rocksdb/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,21 +380,15 @@ impl RaftSnapshotBuilder<ExampleTypeConfig, Cursor<Vec<u8>>> for Arc<ExampleStor
last_membership = state_machine.last_membership.clone();
}

let last_applied_log = match last_applied_log {
None => {
panic!("can not compact empty state machine");
}
Some(x) => x,
};

// TODO: we probably want thius to be atomic.
let snapshot_idx: u64 = self.get_snapshot_index_()? + 1;
self.set_snapshot_indesx_(snapshot_idx)?;

let snapshot_id = format!(
"{}-{}-{}",
last_applied_log.leader_id, last_applied_log.index, snapshot_idx
);
let snapshot_id = if let Some(last) = last_applied_log {
format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx)
} else {
format!("--{}", snapshot_idx)
};

let meta = SnapshotMeta {
last_log_id: last_applied_log,
Expand Down
16 changes: 5 additions & 11 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,6 @@ impl RaftSnapshotBuilder<Config, Cursor<Vec<u8>>> for Arc<MemStore> {
last_membership = sm.last_membership.clone();
}

let last_applied_log = match last_applied_log {
None => {
panic!("can not compact empty state machine");
}
Some(x) => x,
};

let snapshot_size = data.len();

let snapshot_idx = {
Expand All @@ -217,10 +210,11 @@ impl RaftSnapshotBuilder<Config, Cursor<Vec<u8>>> for Arc<MemStore> {
*l
};

let snapshot_id = format!(
"{}-{}-{}",
last_applied_log.leader_id, last_applied_log.index, snapshot_idx
);
let snapshot_id = if let Some(last) = last_applied_log {
format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx)
} else {
format!("--{}", snapshot_idx)
};

let meta = SnapshotMeta {
last_log_id: last_applied_log,
Expand Down
40 changes: 22 additions & 18 deletions openraft/src/core/install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,19 +228,21 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
// Unlike normal append-entries RPC, if conflicting logs are found, it is not **necessary** to delete them.
// See: [Snapshot-replication](https://datafuselabs.github.io/openraft/replication.html#snapshot-replication)
{
let local = self.storage.try_get_log_entry(snap_last_log_id.index).await?;

if let Some(local_log) = local {
if local_log.log_id != snap_last_log_id {
tracing::info!(
local_log_id = display(&local_log.log_id),
snap_last_log_id = display(&snap_last_log_id),
"found conflict log id, when installing snapshot"
);
if let Some(last) = snap_last_log_id {
let local = self.storage.try_get_log_entry(last.index).await?;

if let Some(local_log) = local {
if local_log.log_id != last {
tracing::info!(
local_log_id = display(&local_log.log_id),
snap_last_log_id = display(&last),
"found conflict log id, when installing snapshot"
);
}

self.engine.truncate_logs(last.index);
self.run_engine_commands::<Entry<C>>(&[]).await?;
}

self.engine.truncate_logs(snap_last_log_id.index);
self.run_engine_commands::<Entry<C>>(&[]).await?;
}
}

Expand All @@ -251,17 +253,19 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,

let last_applied = changes.last_applied;

if st.committed < Some(last_applied) {
st.committed = Some(last_applied);
if st.committed < last_applied {
st.committed = last_applied;
}

debug_assert!(st.last_purged_log_id() <= Some(last_applied));
debug_assert!(st.last_purged_log_id() <= last_applied);

// A local log that is <= last_applied may be inconsistent with the leader.
// It has to purge all of them to prevent these log form being replicated, when this node becomes leader.
self.engine.snapshot_last_log_id = Some(last_applied); // update and make last applied log removable
self.engine.purge_log(last_applied);
self.run_engine_commands::<Entry<C>>(&[]).await?;
self.engine.snapshot_last_log_id = last_applied; // update and make last applied log removable
if let Some(last) = last_applied {
self.engine.purge_log(last);
self.run_engine_commands::<Entry<C>>(&[]).await?;
}

self.engine.update_committed_membership(req.meta.last_membership);
self.run_engine_commands::<Entry<C>>(&[]).await?;
Expand Down
14 changes: 7 additions & 7 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ pub struct RaftCore<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<
pub(crate) leader_data: Option<LeaderData<C>>,

/// The node's current snapshot state.
pub(crate) snapshot_state: Option<SnapshotState<S::SnapshotData>>,
pub(crate) snapshot_state: Option<SnapshotState<C, S::SnapshotData>>,

/// The time to elect if a follower does not receive any append-entry message.
pub(crate) next_election_time: VoteWiseTime<C::NodeId>,
Expand Down Expand Up @@ -252,7 +252,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,

// Fetch the most recent snapshot in the system.
if let Some(snapshot) = self.storage.get_current_snapshot().await? {
self.engine.snapshot_last_log_id = Some(snapshot.meta.last_log_id);
self.engine.snapshot_last_log_id = snapshot.meta.last_log_id;
self.engine.metrics_flags.set_data_changed();
}

Expand Down Expand Up @@ -836,7 +836,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
#[tracing::instrument(level = "trace", skip(self))]
pub(crate) fn update_snapshot_state(&mut self, update: SnapshotUpdate<C::NodeId>) {
if let SnapshotUpdate::SnapshotComplete(log_id) = update {
self.engine.snapshot_last_log_id = Some(log_id);
self.engine.snapshot_last_log_id = log_id;
self.engine.metrics_flags.set_data_changed();
}
// If snapshot state is anything other than streaming, then drop it.
Expand Down Expand Up @@ -894,7 +894,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
update: SnapshotUpdate::SnapshotComplete(snapshot.meta.last_log_id),
});
// This will always succeed.
let _ = chan_tx.send(snapshot.meta.last_log_id.index);
let _ = chan_tx.send(snapshot.meta.last_log_id);
}
Err(err) => {
tracing::error!({error=%err}, "error while generating snapshot");
Expand Down Expand Up @@ -1514,16 +1514,16 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
let current_snapshot_opt = self.storage.get_current_snapshot().await?;

if let Some(snapshot) = current_snapshot_opt {
if let Some(must_inc) = must_include {
if snapshot.meta.last_log_id >= must_inc {
if must_include.is_some() {
if snapshot.meta.last_log_id >= must_include {
let _ = tx.send(snapshot);
return Ok(());
}
} else {
// If snapshot exists, ensure its distance from the leader's last log index is <= half
// of the configured snapshot threshold, else create a new snapshot.
if snapshot_is_within_half_of_threshold(
&snapshot.meta.last_log_id.index,
&snapshot.meta.last_log_id.unwrap_or_default().index,
&self.engine.state.last_log_id().unwrap_or_default().index,
&threshold,
) {
Expand Down
9 changes: 5 additions & 4 deletions openraft/src/core/snapshot_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ use tokio::sync::broadcast;

use crate::LogId;
use crate::NodeId;
use crate::RaftTypeConfig;

/// The current snapshot state of the Raft node.
pub(crate) enum SnapshotState<S> {
pub(crate) enum SnapshotState<C: RaftTypeConfig, SD> {
/// The Raft node is compacting itself.
Snapshotting {
/// A handle to abort the compaction process early if needed.
handle: AbortHandle,
/// A sender for notifying any other tasks of the completion of this compaction.
sender: broadcast::Sender<u64>,
sender: broadcast::Sender<Option<LogId<C::NodeId>>>,
},
/// The Raft node is streaming in a snapshot from the leader.
Streaming {
Expand All @@ -20,15 +21,15 @@ pub(crate) enum SnapshotState<S> {
/// The ID of the snapshot being written.
id: String,
/// A handle to the snapshot writer.
snapshot: Box<S>,
snapshot: Box<SD>,
},
}

/// An update on a snapshot creation process.
#[derive(Debug, Clone)]
pub(crate) enum SnapshotUpdate<NID: NodeId> {
/// Snapshot creation has finished successfully and covers the given index.
SnapshotComplete(LogId<NID>),
SnapshotComplete(Option<LogId<NID>>),

/// Snapshot creation failed.
SnapshotFailed,
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/raft_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,6 @@ impl MetricsChangeFlags {
/// E.g. when applying a log to state machine, or installing a state machine from snapshot.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StateMachineChanges<C: RaftTypeConfig> {
pub last_applied: LogId<C::NodeId>,
pub last_applied: Option<LogId<C::NodeId>>,
pub is_snapshot: bool,
}
4 changes: 2 additions & 2 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -779,12 +779,12 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
// If we just sent the final chunk of the snapshot, then transition to lagging state.
if done {
tracing::debug!(
"done install snapshot: snapshot last_log_id: {}, matched: {:?}",
"done install snapshot: snapshot last_log_id: {:?}, matched: {:?}",
snapshot.meta.last_log_id,
self.matched,
);

self.update_matched(Some(snapshot.meta.last_log_id));
self.update_matched(snapshot.meta.last_log_id);

return Ok(());
}
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ where
N: Node,
{
/// Log entries upto which this snapshot includes, inclusive.
pub last_log_id: LogId<NID>,
pub last_log_id: Option<LogId<NID>>,

/// The last applied membership config.
pub last_membership: EffectiveMembership<NID, N>,
Expand All @@ -49,7 +49,7 @@ where
{
pub fn signature(&self) -> SnapshotSignature<NID> {
SnapshotSignature {
last_log_id: Some(self.last_log_id),
last_log_id: self.last_log_id,
last_membership_log_id: self.last_membership.log_id,
snapshot_id: self.snapshot_id.clone(),
}
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/testing/suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1054,7 +1054,7 @@ where
let mut b = store.get_snapshot_builder().await;
let snap = b.build_snapshot().await?;
let meta = snap.meta;
assert_eq!(log_id(0, 0), meta.last_log_id);
assert_eq!(Some(log_id(0, 0)), meta.last_log_id);
assert_eq!(Some(log_id(0, 0)), meta.last_membership.log_id);
assert_eq!(
Membership::new(vec![btreeset! {1,2}], None),
Expand All @@ -1078,7 +1078,7 @@ where
let mut b = store.get_snapshot_builder().await;
let snap = b.build_snapshot().await?;
let meta = snap.meta;
assert_eq!(log_id(2, 2), meta.last_log_id);
assert_eq!(Some(log_id(2, 2)), meta.last_log_id);
assert_eq!(Some(log_id(2, 2)), meta.last_membership.log_id);
assert_eq!(
Membership::new(vec![btreeset! {3,4}], None),
Expand Down
24 changes: 15 additions & 9 deletions openraft/tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -853,23 +853,29 @@ where

match index_test {
ValueTest::Exact(index) => assert_eq!(
&snap.meta.last_log_id.index, index,
"expected node {} to have snapshot with index {}, got {}",
id, index, snap.meta.last_log_id.index
snap.meta.last_log_id.index(),
Some(*index),
"expected node {} to have snapshot with index {}, got {:?}",
id,
index,
snap.meta.last_log_id
),
ValueTest::Range(range) => assert!(
range.contains(&snap.meta.last_log_id.index),
"expected node {} to have snapshot within range {:?}, got {}",
range.contains(&snap.meta.last_log_id.index().unwrap_or_default()),
"expected node {} to have snapshot within range {:?}, got {:?}",
id,
range,
snap.meta.last_log_id.index
snap.meta.last_log_id
),
}

assert_eq!(
&snap.meta.last_log_id.leader_id.term, term,
"expected node {} to have snapshot with term {}, got {}",
id, term, snap.meta.last_log_id.leader_id.term
&snap.meta.last_log_id.unwrap_or_default().leader_id.term,
term,
"expected node {} to have snapshot with term {}, got {:?}",
id,
term,
snap.meta.last_log_id
);
}

Expand Down
4 changes: 2 additions & 2 deletions openraft/tests/snapshot/t20_api_install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ async fn snapshot_arguments() -> Result<()> {
vote: Vote::new_committed(1, 0),
meta: SnapshotMeta {
snapshot_id: "ss1".into(),
last_log_id: LogId {
last_log_id: Some(LogId {
leader_id: LeaderId::new(1, 0),
index: 0,
},
}),
last_membership: Default::default(),
},
offset: 0,
Expand Down
16 changes: 5 additions & 11 deletions rocksstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,21 +378,15 @@ impl RaftSnapshotBuilder<Config, Cursor<Vec<u8>>> for Arc<RocksStore> {
last_membership = state_machine.last_membership;
}

let last_applied_log = match last_applied_log {
None => {
panic!("can not compact empty state machine");
}
Some(x) => x,
};

// TODO: we probably want thius to be atomic.
let snapshot_idx: u64 = self.get_snapshot_index_()? + 1;
self.set_snapshot_indesx_(snapshot_idx)?;

let snapshot_id = format!(
"{}-{}-{}",
last_applied_log.leader_id, last_applied_log.index, snapshot_idx
);
let snapshot_id = if let Some(last) = last_applied_log {
format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx)
} else {
format!("--{}", snapshot_idx)
};

let meta = SnapshotMeta {
last_log_id: last_applied_log,
Expand Down

0 comments on commit 70e3318

Please sign in to comment.