Skip to content

Commit

Permalink
change: CurrentSnapshotData: merge term and index into included.
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Jul 9, 2021
1 parent 933e0b3 commit 85859d0
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 21 deletions.
6 changes: 3 additions & 3 deletions async-raft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
if let Some(snapshot) =
self.storage.get_current_snapshot().await.map_err(|err| self.map_fatal_storage_error(err))?
{
self.snapshot_index = snapshot.index;
self.snapshot_index = snapshot.included.index;
}

let has_log = self.last_log_index != u64::min_value();
Expand Down Expand Up @@ -452,8 +452,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
match res {
Ok(res) => match res {
Ok(snapshot) => {
let _ = tx_compaction.try_send(SnapshotUpdate::SnapshotComplete(snapshot.index));
let _ = chan_tx.send(snapshot.index); // This will always succeed.
let _ = tx_compaction.try_send(SnapshotUpdate::SnapshotComplete(snapshot.included.index));
let _ = chan_tx.send(snapshot.included.index); // This will always succeed.
}
Err(err) => {
tracing::error!({error=%err}, "error while generating snapshot");
Expand Down
2 changes: 1 addition & 1 deletion async-raft/src/core/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
if let Some(snapshot) = current_snapshot_opt {
// If snapshot exists, ensure its distance from the leader's last log index is <= half
// of the configured snapshot threshold, else create a new snapshot.
if snapshot_is_within_half_of_threshold(&snapshot.index, &self.core.last_log_index, &threshold) {
if snapshot_is_within_half_of_threshold(&snapshot.included.index, &self.core.last_log_index, &threshold) {
let _ = tx.send(snapshot);
return Ok(());
}
Expand Down
6 changes: 3 additions & 3 deletions async-raft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -832,8 +832,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
async fn stream_snapshot(&mut self, mut snapshot: CurrentSnapshotData<S::Snapshot>) -> RaftResult<()> {
let snapshot_id = snapshot.snapshot_id.clone();
let mut offset = 0;
self.core.next_index = snapshot.index + 1;
self.core.matched = (snapshot.term, snapshot.index).into();
self.core.next_index = snapshot.included.index + 1;
self.core.matched = snapshot.included;
let mut buf = Vec::with_capacity(self.core.config.snapshot_max_chunk_size as usize);
loop {
// Build the RPC.
Expand All @@ -844,7 +844,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
term: self.core.term,
leader_id: self.core.id,
snapshot_id: snapshot_id.clone(),
last_included: (snapshot.term, snapshot.index).into(),
last_included: snapshot.included,
offset,
data: Vec::from(&buf[..nread]),
done,
Expand Down
8 changes: 4 additions & 4 deletions async-raft/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ use crate::raft::MembershipConfig;
use crate::raft_types::SnapshotId;
use crate::AppData;
use crate::AppDataResponse;
use crate::LogId;
use crate::NodeId;

/// The data associated with the current snapshot.
pub struct CurrentSnapshotData<S>
where S: AsyncRead + AsyncSeek + Send + Unpin + 'static
{
/// The snapshot entry's term.
pub term: u64,
/// The snapshot entry's index.
pub index: u64,
// Log entries upto which this snapshot includes, inclusive.
pub included: LogId,

/// The latest membership configuration covered by the snapshot.
pub membership: MembershipConfig,

Expand Down
12 changes: 6 additions & 6 deletions async-raft/tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,22 +485,22 @@ impl RaftRouter {
.unwrap_or_else(|| panic!("no snapshot present for node {}", id));
match index_test {
ValueTest::Exact(index) => assert_eq!(
&snap.index, index,
&snap.included.index, index,
"expected node {} to have snapshot with index {}, got {}",
id, index, snap.index
id, index, snap.included.index
),
ValueTest::Range(range) => assert!(
range.contains(&snap.index),
range.contains(&snap.included.index),
"expected node {} to have snapshot within range {:?}, got {}",
id,
range,
snap.index
snap.included.index
),
}
assert_eq!(
&snap.term, term,
&snap.included.term, term,
"expected node {} to have snapshot with term {}, got {}",
id, term, snap.term
id, term, snap.included.term
);
assert_eq!(
&snap.membership, cfg,
Expand Down
6 changes: 2 additions & 4 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {

tracing::trace!({ snapshot_size = snapshot_bytes.len() }, "log compaction complete");
Ok(CurrentSnapshotData {
term,
index: last_applied_log,
included: (term, last_applied_log).into(),
membership: membership_config.clone(),
snapshot_id,
snapshot: Box::new(Cursor::new(snapshot_bytes)),
Expand Down Expand Up @@ -423,8 +422,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
Some(snapshot) => {
let reader = serde_json::to_vec(&snapshot)?;
Ok(Some(CurrentSnapshotData {
index: snapshot.index,
term: snapshot.term,
included: (snapshot.term, snapshot.index).into(),
membership: snapshot.membership.clone(),
snapshot_id: snapshot.snapshot_id.clone(),
snapshot: Box::new(Cursor::new(reader)),
Expand Down

0 comments on commit 85859d0

Please sign in to comment.