Skip to content

Commit

Permalink
Port #243 to the Prost branch (#259)
Browse files Browse the repository at this point in the history
Signed-off-by: Nick Cameron <[email protected]>
  • Loading branch information
nrc authored Jul 12, 2019
1 parent 07cde2d commit eeaee89
Show file tree
Hide file tree
Showing 12 changed files with 553 additions and 34 deletions.
1 change: 1 addition & 0 deletions proto/proto/eraftpb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ message Message {
repeated Entry entries = 7;
uint64 commit = 8;
Snapshot snapshot = 9;
uint64 request_snapshot = 13;
bool reject = 10;
uint64 reject_hint = 11;
bytes context = 12;
Expand Down
2 changes: 2 additions & 0 deletions proto/src/prost/eraftpb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ pub struct Message {
pub commit: u64,
#[prost(message, optional, tag = "9")]
pub snapshot: ::std::option::Option<Snapshot>,
#[prost(uint64, tag = "13")]
pub request_snapshot: u64,
#[prost(bool, tag = "10")]
pub reject: bool,
#[prost(uint64, tag = "11")]
Expand Down
12 changes: 12 additions & 0 deletions proto/src/prost/wrapper_eraftpb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,18 @@ impl Message {
self.snapshot.take().unwrap_or_else(Snapshot::default)
}
#[inline]
pub fn clear_request_snapshot(&mut self) {
self.request_snapshot = 0
}
#[inline]
pub fn set_request_snapshot(&mut self, v: u64) {
self.request_snapshot = v;
}
#[inline]
pub fn get_request_snapshot(&self) -> u64 {
self.request_snapshot
}
#[inline]
pub fn clear_reject(&mut self) {
self.reject = false
}
Expand Down
5 changes: 5 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ quick_error! {
NotExists(id: u64, set: &'static str) {
display("The node {} is not in the {} set.", id, set)
}
/// The request snapshot is dropped.
RequestSnapshotDropped {
description("raft: request snapshot dropped")
}
}
}

Expand All @@ -82,6 +86,7 @@ impl cmp::PartialEq for Error {
(&Error::Io(ref e1), &Error::Io(ref e2)) => e1.kind() == e2.kind(),
(&Error::StepLocalMsg, &Error::StepLocalMsg) => true,
(&Error::ConfigInvalid(ref e1), &Error::ConfigInvalid(ref e2)) => e1 == e2,
(&Error::RequestSnapshotDropped, &Error::RequestSnapshotDropped) => true,
_ => false,
}
}
Expand Down
41 changes: 31 additions & 10 deletions src/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
// limitations under the License.

use crate::errors::Error;
use crate::raft::INVALID_INDEX;
use fxhash::FxHashMap;
use std::cmp;
use std::collections::hash_map::HashMap;
Expand Down Expand Up @@ -189,6 +190,10 @@ pub struct Progress {
/// this Progress will be paused. raft will not resend snapshot until the pending one
/// is reported to be failed.
pub pending_snapshot: u64,
/// This field is used in request snapshot.
/// If there is a pending request snapshot, this will be set to the request
/// index of the snapshot.
pub pending_request_snapshot: u64,

/// This is true if the progress is recently active. Receiving any messages
/// from the corresponding follower indicates the progress is active.
Expand Down Expand Up @@ -248,8 +253,8 @@ impl Progress {
self.pending_snapshot = 0;
}

/// Unsets pendingSnapshot if Match is equal or higher than
/// the pendingSnapshot
/// Unsets pending_snapshot if matched is equal or higher than
/// the pending_snapshot and the snapshot is not requested.
pub fn maybe_snapshot_abort(&self) -> bool {
self.state == ProgressState::Snapshot && self.matched >= self.pending_snapshot
}
Expand Down Expand Up @@ -278,25 +283,41 @@ impl Progress {
/// Returns false if the given index comes from an out of order message.
/// Otherwise it decreases the progress next index to min(rejected, last)
/// and returns true.
pub fn maybe_decr_to(&mut self, rejected: u64, last: u64) -> bool {
pub fn maybe_decr_to(&mut self, rejected: u64, last: u64, request_snapshot: u64) -> bool {
if self.state == ProgressState::Replicate {
// the rejection must be stale if the progress has matched and "rejected"
// is smaller than "match".
if rejected <= self.matched {
// Or rejected equals to matched and request_snapshot is the INVALID_INDEX.
if rejected < self.matched
|| (rejected == self.matched && request_snapshot == INVALID_INDEX)
{
return false;
}
self.next_idx = self.matched + 1;
if request_snapshot == INVALID_INDEX {
self.next_idx = self.matched + 1;
} else {
self.pending_request_snapshot = request_snapshot;
}
return true;
}

// the rejection must be stale if "rejected" does not match next - 1
if self.next_idx == 0 || self.next_idx - 1 != rejected {
// The rejection must be stale if "rejected" does not match next - 1.
// Do not consider it stale if it is a request snapshot message.
if (self.next_idx == 0 || self.next_idx - 1 != rejected)
&& request_snapshot == INVALID_INDEX
{
return false;
}

self.next_idx = cmp::min(rejected, last + 1);
if self.next_idx < 1 {
self.next_idx = 1;
// Do not decrease next index if it's requesting snapshot.
if request_snapshot == INVALID_INDEX {
self.next_idx = cmp::min(rejected, last + 1);
if self.next_idx < 1 {
self.next_idx = 1;
}
} else if self.pending_request_snapshot == INVALID_INDEX {
// Allow requesting snapshot even if it's not Replicate.
self.pending_request_snapshot = request_snapshot;
}
self.resume();
true
Expand Down
88 changes: 79 additions & 9 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ pub struct Raft<T: Storage> {
/// The maximum length (in bytes) of all the entries.
pub max_msg_size: u64,

/// The peer is requesting snapshot, it is the index that the follower
/// needs it to be included in a snapshot.
pub pending_request_snapshot: u64,

prs: Option<ProgressSet>,

/// The current role of this node.
Expand Down Expand Up @@ -243,6 +247,7 @@ impl<T: Storage> Raft<T> {
raft_log,
max_inflight: c.max_inflight_msgs,
max_msg_size: c.max_size_per_msg,
pending_request_snapshot: INVALID_INDEX,
prs: Some(ProgressSet::new(peers.len(), learners.len())),
state: StateRole::Follower,
is_learner: false,
Expand Down Expand Up @@ -447,7 +452,7 @@ impl<T: Storage> Raft<T> {
}

m.set_msg_type(MessageType::MsgSnapshot);
let snapshot_r = self.raft_log.snapshot();
let snapshot_r = self.raft_log.snapshot(pr.pending_request_snapshot);
if let Err(e) = snapshot_r {
if e == Error::Store(StorageError::SnapshotTemporarilyUnavailable) {
debug!(
Expand Down Expand Up @@ -520,17 +525,24 @@ impl<T: Storage> Raft<T> {
if pr.is_paused() {
return;
}
let term = self.raft_log.term(pr.next_idx - 1);
let ents = self.raft_log.entries(pr.next_idx, self.max_msg_size);
let mut m = Message::default();
m.set_to(to);
if term.is_err() || ents.is_err() {
// send snapshot if we failed to get term or entries
if pr.pending_request_snapshot != INVALID_INDEX {
// Check pending request snapshot first to avoid unnecessary loading entries.
if !self.prepare_send_snapshot(&mut m, pr, to) {
return;
}
} else {
self.prepare_send_entries(&mut m, pr, term.unwrap(), ents.unwrap());
let term = self.raft_log.term(pr.next_idx - 1);
let ents = self.raft_log.entries(pr.next_idx, self.max_msg_size);
if term.is_err() || ents.is_err() {
// send snapshot if we failed to get term or entries.
if !self.prepare_send_snapshot(&mut m, pr, to) {
return;
}
} else {
self.prepare_send_entries(&mut m, pr, term.unwrap(), ents.unwrap());
}
}
self.send(m);
}
Expand Down Expand Up @@ -625,6 +637,7 @@ impl<T: Storage> Raft<T> {

self.pending_conf_index = 0;
self.read_only = ReadOnly::new(self.read_only.option);
self.pending_request_snapshot = INVALID_INDEX;

let (last_index, max_inflight) = (self.raft_log.last_index(), self.max_inflight);
let self_id = self.id;
Expand Down Expand Up @@ -716,9 +729,11 @@ impl<T: Storage> Raft<T> {

/// Converts this node to a follower.
pub fn become_follower(&mut self, term: u64, leader_id: u64) {
let pending_request_snapshot = self.pending_request_snapshot;
self.reset(term);
self.leader_id = leader_id;
self.state = StateRole::Follower;
self.pending_request_snapshot = pending_request_snapshot;
info!("{} became follower at term {}", self.tag, self.term);
}

Expand Down Expand Up @@ -1155,7 +1170,7 @@ impl<T: Storage> Raft<T> {
m.get_index()
);

if pr.maybe_decr_to(m.get_index(), m.get_reject_hint()) {
if pr.maybe_decr_to(m.get_index(), m.get_reject_hint(), m.get_request_snapshot()) {
debug!(
"{} decreased progress of {} to [{:?}]",
self.tag,
Expand Down Expand Up @@ -1224,7 +1239,8 @@ impl<T: Storage> Raft<T> {
if pr.state == ProgressState::Replicate && pr.ins.full() {
pr.ins.free_first_one();
}
if pr.matched < self.raft_log.last_index() {
// Does it request snapshot?
if pr.matched < self.raft_log.last_index() || pr.pending_request_snapshot != INVALID_INDEX {
*send_append = true;
}

Expand Down Expand Up @@ -1332,6 +1348,7 @@ impl<T: Storage> Raft<T> {
// out the next msgAppend.
// If snapshot failure, wait for a heartbeat interval before next try
pr.pause();
pr.pending_request_snapshot = INVALID_INDEX;
}

/// Check message's progress to decide which action should be taken.
Expand Down Expand Up @@ -1706,9 +1723,43 @@ impl<T: Storage> Raft<T> {
Ok(())
}

/// Request a snapshot from a leader.
pub fn request_snapshot(&mut self, request_index: u64) -> Result<()> {
if self.state == StateRole::Leader {
info!(
"{} can not request snapshot on leader; dropping request snapshot",
self.tag
);
} else if self.leader_id == INVALID_ID {
info!(
"{} no leader at term {}; dropping request snapshot",
self.tag, self.term
);
} else if self.get_snap().is_some() {
info!(
"{} there is a pending snapshot; dropping request snapshot",
self.tag
);
} else if self.pending_request_snapshot != INVALID_INDEX {
info!(
"{} there is a pending snapshot; dropping request snapshot",
self.tag
);
} else {
self.pending_request_snapshot = request_index;
self.send_request_snapshot();
return Ok(());
}
Err(Error::RequestSnapshotDropped)
}

// TODO: revoke pub when there is a better way to test.
/// For a given message, append the entries to the log.
pub fn handle_append_entries(&mut self, m: &Message) {
if self.pending_request_snapshot != INVALID_INDEX {
self.send_request_snapshot();
return;
}
if m.get_index() < self.raft_log.committed {
let mut to_send = Message::default();
to_send.set_to(m.get_from());
Expand Down Expand Up @@ -1753,6 +1804,10 @@ impl<T: Storage> Raft<T> {
/// For a message, commit and send out heartbeat.
pub fn handle_heartbeat(&mut self, mut m: Message) {
self.raft_log.commit_to(m.get_commit());
if self.pending_request_snapshot != INVALID_INDEX {
self.send_request_snapshot();
return;
}
let mut to_send = Message::default();
to_send.set_to(m.get_from());
to_send.set_msg_type(MessageType::MsgHeartbeatResponse);
Expand Down Expand Up @@ -1790,7 +1845,10 @@ impl<T: Storage> Raft<T> {

fn restore_raft(&mut self, snap: &Snapshot) -> Option<bool> {
let meta = snap.get_metadata();
if self.raft_log.match_term(meta.get_index(), meta.get_term()) {
// Do not fast-forward commit if we are requesting snapshot.
if self.pending_request_snapshot == INVALID_INDEX
&& self.raft_log.match_term(meta.get_index(), meta.get_term())
{
info!(
"{} [commit: {}, lastindex: {}, lastterm: {}] fast-forwarded commit to \
snapshot [index: {}, term: {}]",
Expand Down Expand Up @@ -1833,6 +1891,7 @@ impl<T: Storage> Raft<T> {
meta.get_term()
);

self.pending_request_snapshot = INVALID_INDEX;
let nodes = meta.get_conf_state().get_nodes();
let learners = meta.get_conf_state().get_learners();
self.prs = Some(ProgressSet::new(nodes.len(), learners.len()));
Expand Down Expand Up @@ -2054,4 +2113,15 @@ impl<T: Storage> Raft<T> {
pub fn abort_leader_transfer(&mut self) {
self.lead_transferee = None;
}

fn send_request_snapshot(&mut self) {
let mut m = Message::default();
m.set_msg_type(MessageType::MsgAppendResponse);
m.set_index(self.raft_log.committed);
m.set_reject(true);
m.set_reject_hint(self.raft_log.last_index());
m.set_to(self.leader_id);
m.set_request_snapshot(self.pending_request_snapshot);
self.send(m);
}
}
12 changes: 7 additions & 5 deletions src/raft_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,11 +384,13 @@ impl<T: Storage> RaftLog<T> {
}

/// Returns the current snapshot
pub fn snapshot(&self) -> Result<Snapshot> {
self.unstable
.snapshot
.clone()
.map_or_else(|| self.store.snapshot(), Ok)
pub fn snapshot(&self, request_index: u64) -> Result<Snapshot> {
if let Some(snap) = self.unstable.snapshot.as_ref() {
if snap.get_metadata().get_index() >= request_index {
return Ok(snap.clone());
}
}
self.store.snapshot(request_index)
}

fn must_check_outofbounds(&self, low: u64, high: u64) -> Option<Error> {
Expand Down
6 changes: 6 additions & 0 deletions src/raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,12 @@ impl<T: Storage> RawNode<T> {
let _ = self.raft.step(m);
}

/// Request a snapshot from a leader.
/// The snapshot's index must be greater or equal to the request_index.
pub fn request_snapshot(&mut self, request_index: u64) -> Result<()> {
self.raft.request_snapshot(request_index)
}

/// TransferLeader tries to transfer leadership to the given transferee.
pub fn transfer_leader(&mut self, transferee: u64) {
let mut m = Message::default();
Expand Down
Loading

0 comments on commit eeaee89

Please sign in to comment.