Skip to content

Commit

Permalink
Request snapshot (#243)
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <[email protected]>
  • Loading branch information
overvenus authored Jul 1, 2019
1 parent e501361 commit fe10576
Show file tree
Hide file tree
Showing 13 changed files with 778 additions and 219 deletions.
2 changes: 1 addition & 1 deletion examples/single_mem_node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::time::{Duration, Instant};
use raft::prelude::*;
use raft::storage::MemStorage;

type ProposeCallback = Box<Fn() + Send>;
type ProposeCallback = Box<dyn Fn() + Send>;

enum Msg {
Propose {
Expand Down
2 changes: 2 additions & 0 deletions generate-proto.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ with open("src/eraftpb.rs") as reader:
src = reader.read()
res = re.sub('::protobuf::rt::read_proto3_enum_with_unknown_fields_into\(([^,]+), ([^,]+), &mut ([^,]+), [^\)]+\)\?', 'if \\\\1 == ::protobuf::wire_format::WireTypeVarint {\\\\3 = \\\\2.read_enum()?;} else { return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type)); }', src)
res = re.sub('#!\[allow\(clippy\)\]', '#![allow(clippy::all)]', res)
res = re.sub('#!\[allow\(unused_results\)\]', '#![allow(unused_results, bare_trait_objects)]', res)
with open("src/eraftpb.rs", "w") as writer:
writer.write(res)
Expand Down
1 change: 1 addition & 0 deletions proto/eraftpb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ message Message {
repeated Entry entries = 7;
uint64 commit = 8;
Snapshot snapshot = 9;
uint64 request_snapshot = 13;
bool reject = 10;
uint64 reject_hint = 11;
bytes context = 12;
Expand Down
418 changes: 235 additions & 183 deletions src/eraftpb.rs

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ quick_error! {
NotExists(id: u64, set: &'static str) {
display("The node {} is not in the {} set.", id, set)
}
/// The request snapshot is dropped.
RequestSnapshotDropped {
description("raft: request snapshot dropped")
}
}
}

Expand All @@ -76,6 +80,7 @@ impl cmp::PartialEq for Error {
(&Error::Io(ref e1), &Error::Io(ref e2)) => e1.kind() == e2.kind(),
(&Error::StepLocalMsg, &Error::StepLocalMsg) => true,
(&Error::ConfigInvalid(ref e1), &Error::ConfigInvalid(ref e2)) => e1 == e2,
(&Error::RequestSnapshotDropped, &Error::RequestSnapshotDropped) => true,
_ => false,
}
}
Expand All @@ -102,7 +107,7 @@ quick_error! {
description("snapshot is temporarily unavailable")
}
/// Some other error occurred.
Other(err: Box<error::Error + Sync + Send>) {
Other(err: Box<dyn error::Error + Sync + Send>) {
from()
cause(err.as_ref())
description(err.description())
Expand Down
41 changes: 31 additions & 10 deletions src/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

use errors::Error;
use fxhash::FxHashMap;
use raft::INVALID_INDEX;
use std::cmp;
use std::collections::hash_map::HashMap;

Expand Down Expand Up @@ -189,6 +190,10 @@ pub struct Progress {
/// this Progress will be paused. raft will not resend snapshot until the pending one
/// is reported to be failed.
pub pending_snapshot: u64,
/// This field is used in request snapshot.
/// If there is a pending request snapshot, this will be set to the request
/// index of the snapshot.
pub pending_request_snapshot: u64,

/// This is true if the progress is recently active. Receiving any messages
/// from the corresponding follower indicates the progress is active.
Expand Down Expand Up @@ -248,8 +253,8 @@ impl Progress {
self.pending_snapshot = 0;
}

/// Unsets pendingSnapshot if Match is equal or higher than
/// the pendingSnapshot
/// Returns true if the progress is in the Snapshot state and matched is equal to or
/// higher than pending_snapshot. This method only reports the condition; it does not
/// modify pending_snapshot.
pub fn maybe_snapshot_abort(&self) -> bool {
self.state == ProgressState::Snapshot && self.matched >= self.pending_snapshot
}
Expand Down Expand Up @@ -278,25 +283,41 @@ impl Progress {
/// Returns false if the given index comes from an out of order message.
/// Otherwise it decreases the progress next index to min(rejected, last)
/// and returns true.
pub fn maybe_decr_to(&mut self, rejected: u64, last: u64) -> bool {
pub fn maybe_decr_to(&mut self, rejected: u64, last: u64, request_snapshot: u64) -> bool {
if self.state == ProgressState::Replicate {
// the rejection must be stale if the progress has matched and "rejected"
// is smaller than "match".
if rejected <= self.matched {
// It is also stale if rejected equals matched and request_snapshot is INVALID_INDEX.
if rejected < self.matched
|| (rejected == self.matched && request_snapshot == INVALID_INDEX)
{
return false;
}
self.next_idx = self.matched + 1;
if request_snapshot == INVALID_INDEX {
self.next_idx = self.matched + 1;
} else {
self.pending_request_snapshot = request_snapshot;
}
return true;
}

// the rejection must be stale if "rejected" does not match next - 1
if self.next_idx == 0 || self.next_idx - 1 != rejected {
// The rejection must be stale if "rejected" does not match next - 1.
// Do not consider it stale if it is a request snapshot message.
if (self.next_idx == 0 || self.next_idx - 1 != rejected)
&& request_snapshot == INVALID_INDEX
{
return false;
}

self.next_idx = cmp::min(rejected, last + 1);
if self.next_idx < 1 {
self.next_idx = 1;
// Do not decrease next index if it's requesting snapshot.
if request_snapshot == INVALID_INDEX {
self.next_idx = cmp::min(rejected, last + 1);
if self.next_idx < 1 {
self.next_idx = 1;
}
} else if self.pending_request_snapshot == INVALID_INDEX {
// Allow requesting snapshot even if it's not Replicate.
self.pending_request_snapshot = request_snapshot;
}
self.resume();
true
Expand Down
88 changes: 79 additions & 9 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ pub struct Raft<T: Storage> {
/// The maximum length (in bytes) of all the entries.
pub max_msg_size: u64,

/// The index that this peer, while requesting a snapshot, needs the snapshot
/// to include; `INVALID_INDEX` when no snapshot request is pending.
pub pending_request_snapshot: u64,

prs: Option<ProgressSet>,

/// The current role of this node.
Expand Down Expand Up @@ -244,6 +248,7 @@ impl<T: Storage> Raft<T> {
raft_log,
max_inflight: c.max_inflight_msgs,
max_msg_size: c.max_size_per_msg,
pending_request_snapshot: INVALID_INDEX,
prs: Some(ProgressSet::new(peers.len(), learners.len())),
state: StateRole::Follower,
is_learner: false,
Expand Down Expand Up @@ -448,7 +453,7 @@ impl<T: Storage> Raft<T> {
}

m.set_msg_type(MessageType::MsgSnapshot);
let snapshot_r = self.raft_log.snapshot();
let snapshot_r = self.raft_log.snapshot(pr.pending_request_snapshot);
if let Err(e) = snapshot_r {
if e == Error::Store(StorageError::SnapshotTemporarilyUnavailable) {
debug!(
Expand Down Expand Up @@ -521,17 +526,24 @@ impl<T: Storage> Raft<T> {
if pr.is_paused() {
return;
}
let term = self.raft_log.term(pr.next_idx - 1);
let ents = self.raft_log.entries(pr.next_idx, self.max_msg_size);
let mut m = Message::new();
m.set_to(to);
if term.is_err() || ents.is_err() {
// send snapshot if we failed to get term or entries
if pr.pending_request_snapshot != INVALID_INDEX {
// Check pending request snapshot first to avoid unnecessary loading entries.
if !self.prepare_send_snapshot(&mut m, pr, to) {
return;
}
} else {
self.prepare_send_entries(&mut m, pr, term.unwrap(), ents.unwrap());
let term = self.raft_log.term(pr.next_idx - 1);
let ents = self.raft_log.entries(pr.next_idx, self.max_msg_size);
if term.is_err() || ents.is_err() {
// send snapshot if we failed to get term or entries.
if !self.prepare_send_snapshot(&mut m, pr, to) {
return;
}
} else {
self.prepare_send_entries(&mut m, pr, term.unwrap(), ents.unwrap());
}
}
self.send(m);
}
Expand Down Expand Up @@ -626,6 +638,7 @@ impl<T: Storage> Raft<T> {

self.pending_conf_index = 0;
self.read_only = ReadOnly::new(self.read_only.option);
self.pending_request_snapshot = INVALID_INDEX;

let (last_index, max_inflight) = (self.raft_log.last_index(), self.max_inflight);
let self_id = self.id;
Expand Down Expand Up @@ -717,9 +730,11 @@ impl<T: Storage> Raft<T> {

/// Turns this node into a follower of `leader_id` at the given `term`.
///
/// Any pending snapshot request is carried over the role change: the value of
/// `pending_request_snapshot` is captured before `reset` runs and written back afterwards.
pub fn become_follower(&mut self, term: u64, leader_id: u64) {
    // Saved across `reset`, which is expected to clear it (verify against `reset`).
    let saved_request_index = self.pending_request_snapshot;
    self.reset(term);
    self.pending_request_snapshot = saved_request_index;

    self.state = StateRole::Follower;
    self.leader_id = leader_id;
    info!("{} became follower at term {}", self.tag, self.term);
}

Expand Down Expand Up @@ -1156,7 +1171,7 @@ impl<T: Storage> Raft<T> {
m.get_index()
);

if pr.maybe_decr_to(m.get_index(), m.get_reject_hint()) {
if pr.maybe_decr_to(m.get_index(), m.get_reject_hint(), m.get_request_snapshot()) {
debug!(
"{} decreased progress of {} to [{:?}]",
self.tag,
Expand Down Expand Up @@ -1225,7 +1240,8 @@ impl<T: Storage> Raft<T> {
if pr.state == ProgressState::Replicate && pr.ins.full() {
pr.ins.free_first_one();
}
if pr.matched < self.raft_log.last_index() {
// Does it request snapshot?
if pr.matched < self.raft_log.last_index() || pr.pending_request_snapshot != INVALID_INDEX {
*send_append = true;
}

Expand Down Expand Up @@ -1333,6 +1349,7 @@ impl<T: Storage> Raft<T> {
// out the next msgAppend.
// If snapshot failure, wait for a heartbeat interval before next try
pr.pause();
pr.pending_request_snapshot = INVALID_INDEX;
}

/// Check message's progress to decide which action should be taken.
Expand Down Expand Up @@ -1710,9 +1727,43 @@ impl<T: Storage> Raft<T> {
Ok(())
}

/// Requests a snapshot from the leader.
///
/// On success `request_index` is recorded in `pending_request_snapshot` and the request is
/// sent to the leader via `send_request_snapshot`.
///
/// # Errors
///
/// Returns `Error::RequestSnapshotDropped` (after logging the reason) when:
/// - this node is the leader,
/// - no leader is known (`leader_id == INVALID_ID`),
/// - `get_snap()` reports a pending snapshot, or
/// - a previous snapshot request is still pending.
pub fn request_snapshot(&mut self, request_index: u64) -> Result<()> {
    if self.state == StateRole::Leader {
        info!(
            "{} can not request snapshot on leader; dropping request snapshot",
            self.tag
        );
    } else if self.leader_id == INVALID_ID {
        info!(
            "{} no leader at term {}; dropping request snapshot",
            self.tag, self.term
        );
    } else if self.get_snap().is_some() {
        info!(
            "{} there is a pending snapshot; dropping request snapshot",
            self.tag
        );
    } else if self.pending_request_snapshot != INVALID_INDEX {
        // Worded differently from the branch above so the two drop reasons can be told
        // apart in logs.
        info!(
            "{} there is a pending request snapshot; dropping request snapshot",
            self.tag
        );
    } else {
        self.pending_request_snapshot = request_index;
        self.send_request_snapshot();
        return Ok(());
    }
    // Every branch that falls through to here has logged why the request was dropped.
    Err(Error::RequestSnapshotDropped)
}

// TODO: revoke pub when there is a better way to test.
/// For a given message, append the entries to the log.
pub fn handle_append_entries(&mut self, m: &Message) {
if self.pending_request_snapshot != INVALID_INDEX {
self.send_request_snapshot();
return;
}
if m.get_index() < self.raft_log.committed {
let mut to_send = Message::new();
to_send.set_to(m.get_from());
Expand Down Expand Up @@ -1757,6 +1808,10 @@ impl<T: Storage> Raft<T> {
/// For a message, commit and send out heartbeat.
pub fn handle_heartbeat(&mut self, mut m: Message) {
self.raft_log.commit_to(m.get_commit());
if self.pending_request_snapshot != INVALID_INDEX {
self.send_request_snapshot();
return;
}
let mut to_send = Message::new();
to_send.set_to(m.get_from());
to_send.set_msg_type(MessageType::MsgHeartbeatResponse);
Expand Down Expand Up @@ -1794,7 +1849,10 @@ impl<T: Storage> Raft<T> {

fn restore_raft(&mut self, snap: &Snapshot) -> Option<bool> {
let meta = snap.get_metadata();
if self.raft_log.match_term(meta.get_index(), meta.get_term()) {
// Do not fast-forward commit if we are requesting snapshot.
if self.pending_request_snapshot == INVALID_INDEX
&& self.raft_log.match_term(meta.get_index(), meta.get_term())
{
info!(
"{} [commit: {}, lastindex: {}, lastterm: {}] fast-forwarded commit to \
snapshot [index: {}, term: {}]",
Expand Down Expand Up @@ -1837,6 +1895,7 @@ impl<T: Storage> Raft<T> {
meta.get_term()
);

self.pending_request_snapshot = INVALID_INDEX;
let nodes = meta.get_conf_state().get_nodes();
let learners = meta.get_conf_state().get_learners();
self.prs = Some(ProgressSet::new(nodes.len(), learners.len()));
Expand Down Expand Up @@ -2058,4 +2117,15 @@ impl<T: Storage> Raft<T> {
pub fn abort_leader_transfer(&mut self) {
self.lead_transferee = None;
}

/// Sends the leader a rejecting `MsgAppendResponse` that carries the pending snapshot
/// request index.
fn send_request_snapshot(&mut self) {
    let committed = self.raft_log.committed;
    let last_index = self.raft_log.last_index();

    let mut request = Message::new();
    request.set_to(self.leader_id);
    request.set_msg_type(MessageType::MsgAppendResponse);
    // The request is sent as a rejection: index is the committed index and the
    // reject hint is the last log index.
    request.set_reject(true);
    request.set_index(committed);
    request.set_reject_hint(last_index);
    request.set_request_snapshot(self.pending_request_snapshot);
    self.send(request);
}
}
12 changes: 7 additions & 5 deletions src/raft_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,11 +384,13 @@ impl<T: Storage> RaftLog<T> {
}

/// Returns the current snapshot
pub fn snapshot(&self) -> Result<Snapshot> {
self.unstable
.snapshot
.clone()
.map_or_else(|| self.store.snapshot(), Ok)
pub fn snapshot(&self, request_index: u64) -> Result<Snapshot> {
    match self.unstable.snapshot.as_ref() {
        // The unstable snapshot is returned only if it reaches the requested index.
        Some(unstable) if unstable.get_metadata().get_index() >= request_index => {
            Ok(unstable.clone())
        }
        // No unstable snapshot, or it is older than requested: ask the store.
        _ => self.store.snapshot(request_index),
    }
}

fn must_check_outofbounds(&self, low: u64, high: u64) -> Option<Error> {
Expand Down
6 changes: 6 additions & 0 deletions src/raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,12 @@ impl<T: Storage> RawNode<T> {
let _ = self.raft.step(m);
}

/// Requests a snapshot from the leader by delegating to `self.raft.request_snapshot`.
///
/// The snapshot's index must be greater than or equal to `request_index`.
///
/// # Errors
///
/// Propagates whatever error the underlying `request_snapshot` call returns.
pub fn request_snapshot(&mut self, request_index: u64) -> Result<()> {
self.raft.request_snapshot(request_index)
}

/// TransferLeader tries to transfer leadership to the given transferee.
pub fn transfer_leader(&mut self, transferee: u64) {
let mut m = Message::new();
Expand Down
Loading

0 comments on commit fe10576

Please sign in to comment.