Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add request snapshot #243

Merged
merged 27 commits into from
Jul 1, 2019
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
3c5e02c
request snapshot
overvenus May 26, 2019
b59e2e4
Export request snapshot method
overvenus May 27, 2019
0a30175
Fix CI
overvenus May 27, 2019
f1d5104
request_snapshot also affects flow control
overvenus Jun 2, 2019
e7a534b
Can not request snapshot if there is no leader or there is a snapshot…
overvenus Jun 3, 2019
0222aa7
Follower can not become leader during requesting snaphot
overvenus Jun 4, 2019
4c8a95d
send append if peer is requesting snapshot
overvenus Jun 4, 2019
fbf8981
cargo fmt
overvenus Jun 10, 2019
7048c19
remove requesting snapshot flag
overvenus Jun 10, 2019
89f32e1
Add request snapshot field in message
overvenus Jun 12, 2019
a2e038d
Allow starting leader election in requesting snapshot state
overvenus Jun 12, 2019
e087dd9
Remove MsgRequestSnapshot message type
overvenus Jun 12, 2019
70949a0
Try to fix CI
overvenus Jun 12, 2019
197494d
Pass a request index to the snapshot method
overvenus Jun 14, 2019
3897ed5
Address comments
overvenus Jun 17, 2019
0648140
update request_snpashot doc
overvenus Jun 18, 2019
e9880d4
Request snapshot on heartbeat
overvenus Jun 18, 2019
43b6c93
Alway consider decreased if the peer is requesting snapshot
overvenus Jun 19, 2019
2cdc800
Add two tests
overvenus Jun 19, 2019
fd4edad
Address comments
overvenus Jun 19, 2019
ef84d14
Allow request snapshot when it's state is not Replicate
overvenus Jun 20, 2019
3278364
Do not decrease next index if it's requesting snapshot
overvenus Jun 20, 2019
4336598
Do not consider a request snapshot message stale
overvenus Jun 24, 2019
c8d4854
Address comments
overvenus Jun 27, 2019
6191ccb
Address comments
overvenus Jun 27, 2019
1448df7
Check index for snapshot in raft log unstable
overvenus Jun 27, 2019
a2a5c53
Address comments
overvenus Jun 29, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/single_mem_node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::time::{Duration, Instant};
use raft::prelude::*;
use raft::storage::MemStorage;

type ProposeCallback = Box<Fn() + Send>;
type ProposeCallback = Box<dyn Fn() + Send>;

enum Msg {
Propose {
Expand Down
2 changes: 2 additions & 0 deletions generate-proto.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ with open("src/eraftpb.rs") as reader:
src = reader.read()

res = re.sub('::protobuf::rt::read_proto3_enum_with_unknown_fields_into\(([^,]+), ([^,]+), &mut ([^,]+), [^\)]+\)\?', 'if \\\\1 == ::protobuf::wire_format::WireTypeVarint {\\\\3 = \\\\2.read_enum()?;} else { return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type)); }', src)
res = re.sub('#!\[allow\(clippy\)\]', '#![allow(clippy::all)]', res)
res = re.sub('#!\[allow\(unused_results\)\]', '#![allow(unused_results, bare_trait_objects)]', res)

with open("src/eraftpb.rs", "w") as writer:
writer.write(res)
Expand Down
1 change: 1 addition & 0 deletions proto/eraftpb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ message Message {
repeated Entry entries = 7;
uint64 commit = 8;
Snapshot snapshot = 9;
uint64 request_snapshot = 13;
bool reject = 10;
uint64 reject_hint = 11;
bytes context = 12;
Expand Down
418 changes: 235 additions & 183 deletions src/eraftpb.rs

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ quick_error! {
NotExists(id: u64, set: &'static str) {
display("The node {} is not in the {} set.", id, set)
}
/// The request snapshot is dropped.
RequestSnapshotDropped {
description("raft: request snapshot dropped")
}
}
}

Expand All @@ -76,6 +80,7 @@ impl cmp::PartialEq for Error {
(&Error::Io(ref e1), &Error::Io(ref e2)) => e1.kind() == e2.kind(),
(&Error::StepLocalMsg, &Error::StepLocalMsg) => true,
(&Error::ConfigInvalid(ref e1), &Error::ConfigInvalid(ref e2)) => e1 == e2,
(&Error::RequestSnapshotDropped, &Error::RequestSnapshotDropped) => true,
_ => false,
}
}
Expand All @@ -102,7 +107,7 @@ quick_error! {
description("snapshot is temporarily unavailable")
}
/// Some other error occurred.
Other(err: Box<error::Error + Sync + Send>) {
Other(err: Box<dyn error::Error + Sync + Send>) {
from()
cause(err.as_ref())
description(err.description())
Expand Down
38 changes: 29 additions & 9 deletions src/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

use errors::Error;
use fxhash::FxHashMap;
use raft::INVALID_INDEX;
use std::cmp;
use std::collections::hash_map::HashMap;

Expand Down Expand Up @@ -189,6 +190,10 @@ pub struct Progress {
/// this Progress will be paused. raft will not resend snapshot until the pending one
/// is reported to be failed.
pub pending_snapshot: u64,
/// This field is used in request snapshot.
/// If there is a pending request snapshot, this will be set to the request
/// index of the snapshot.
pub pending_request_snapshot: u64,

/// This is true if the progress is recently active. Receiving any messages
/// from the corresponding follower indicates the progress is active.
Expand Down Expand Up @@ -248,8 +253,8 @@ impl Progress {
self.pending_snapshot = 0;
}

/// Unsets pendingSnapshot if Match is equal or higher than
/// the pendingSnapshot
/// Unsets pending_snapshot if matched is equal or higher than
/// the pending_snapshot and the snapshot is not requested.
pub fn maybe_snapshot_abort(&self) -> bool {
self.state == ProgressState::Snapshot && self.matched >= self.pending_snapshot
}
Expand Down Expand Up @@ -278,25 +283,40 @@ impl Progress {
/// Returns false if the given index comes from an out of order message.
/// Otherwise it decreases the progress next index to min(rejected, last)
/// and returns true.
pub fn maybe_decr_to(&mut self, rejected: u64, last: u64) -> bool {
pub fn maybe_decr_to(&mut self, rejected: u64, last: u64, request_snapshot: u64) -> bool {
if self.state == ProgressState::Replicate {
// the rejection must be stale if the progress has matched and "rejected"
// is smaller than "match".
if rejected <= self.matched {
// Or rejected equals to matched and request_snapshot is the INVALID_INDEX.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you give more explanation for this condition?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it.

if rejected < self.matched
|| (rejected == self.matched && request_snapshot == INVALID_INDEX)
{
return false;
}
self.next_idx = self.matched + 1;
if request_snapshot == INVALID_INDEX {
self.next_idx = self.matched + 1;
} else {
self.pending_request_snapshot = request_snapshot;
}
return true;
}

// the rejection must be stale if "rejected" does not match next - 1
// Allow requesting snapshot even if it's not Replicate.
if request_snapshot != INVALID_INDEX && self.pending_request_snapshot == INVALID_INDEX {
self.pending_request_snapshot = request_snapshot;
}

// The rejection must be stale if "rejected" does not match next - 1.
if self.next_idx == 0 || self.next_idx - 1 != rejected {
return false;
}

self.next_idx = cmp::min(rejected, last + 1);
if self.next_idx < 1 {
self.next_idx = 1;
// Do not decrease next index if it's requesting snapshot.
if request_snapshot == INVALID_INDEX {
self.next_idx = cmp::min(rejected, last + 1);
if self.next_idx < 1 {
self.next_idx = 1;
}
}
self.resume();
true
Expand Down
84 changes: 75 additions & 9 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ pub struct Raft<T: Storage> {
/// The maximum length (in bytes) of all the entries.
pub max_msg_size: u64,

/// The peer is requesting snapshot, it is the index that the follower
/// needs to be included in a snapshot.
pending_request_snapshot: u64,

prs: Option<ProgressSet>,

/// The current role of this node.
Expand Down Expand Up @@ -244,6 +248,7 @@ impl<T: Storage> Raft<T> {
raft_log,
max_inflight: c.max_inflight_msgs,
max_msg_size: c.max_size_per_msg,
pending_request_snapshot: INVALID_INDEX,
prs: Some(ProgressSet::new(peers.len(), learners.len())),
state: StateRole::Follower,
is_learner: false,
Expand Down Expand Up @@ -448,7 +453,7 @@ impl<T: Storage> Raft<T> {
}

m.set_msg_type(MessageType::MsgSnapshot);
let snapshot_r = self.raft_log.snapshot();
let snapshot_r = self.raft_log.snapshot(pr.pending_request_snapshot);
if let Err(e) = snapshot_r {
if e == Error::Store(StorageError::SnapshotTemporarilyUnavailable) {
debug!(
Expand Down Expand Up @@ -521,17 +526,23 @@ impl<T: Storage> Raft<T> {
if pr.is_paused() {
return;
}
let term = self.raft_log.term(pr.next_idx - 1);
let ents = self.raft_log.entries(pr.next_idx, self.max_msg_size);
let mut m = Message::new();
m.set_to(to);
if term.is_err() || ents.is_err() {
// send snapshot if we failed to get term or entries
if pr.pending_request_snapshot != INVALID_INDEX {
// Check pending request snapshot first to avoid unnecessary loading entries.
if !self.prepare_send_snapshot(&mut m, pr, to) {
return;
}
} else {
self.prepare_send_entries(&mut m, pr, term.unwrap(), ents.unwrap());
let term = self.raft_log.term(pr.next_idx - 1);
let ents = self.raft_log.entries(pr.next_idx, self.max_msg_size);
if pr.pending_request_snapshot != INVALID_INDEX || term.is_err() || ents.is_err() {
overvenus marked this conversation as resolved.
Show resolved Hide resolved
if !self.prepare_send_snapshot(&mut m, pr, to) {
return;
}
} else {
self.prepare_send_entries(&mut m, pr, term.unwrap(), ents.unwrap());
}
}
self.send(m);
}
Expand Down Expand Up @@ -1156,7 +1167,7 @@ impl<T: Storage> Raft<T> {
m.get_index()
);

if pr.maybe_decr_to(m.get_index(), m.get_reject_hint()) {
if pr.maybe_decr_to(m.get_index(), m.get_reject_hint(), m.get_request_snapshot()) {
debug!(
"{} decreased progress of {} to [{:?}]",
self.tag,
Expand Down Expand Up @@ -1225,7 +1236,8 @@ impl<T: Storage> Raft<T> {
if pr.state == ProgressState::Replicate && pr.ins.full() {
pr.ins.free_first_one();
}
if pr.matched < self.raft_log.last_index() {
// Does it request snapshot?
if pr.matched < self.raft_log.last_index() || pr.pending_request_snapshot != INVALID_INDEX {
*send_append = true;
}

Expand Down Expand Up @@ -1333,6 +1345,7 @@ impl<T: Storage> Raft<T> {
// out the next msgAppend.
// If snapshot failure, wait for a heartbeat interval before next try
pr.pause();
pr.pending_request_snapshot = 0;
overvenus marked this conversation as resolved.
Show resolved Hide resolved
}

/// Check message's progress to decide which action should be taken.
Expand Down Expand Up @@ -1710,9 +1723,43 @@ impl<T: Storage> Raft<T> {
Ok(())
}

/// Request a snapshot from a leader.
pub fn request_snapshot(&mut self, request_index: u64) -> Result<()> {
if self.state == StateRole::Leader {
info!(
"{} can not request snapshot on leader; dropping request snapshot",
self.tag
);
} else if self.leader_id == INVALID_ID {
info!(
"{} no leader at term {}; dropping request snapshot",
self.tag, self.term
);
} else if self.get_snap().is_some() {
info!(
"{} there is a pending snapshot; dropping request snapshot",
self.tag
);
} else if self.pending_request_snapshot != INVALID_INDEX {
info!(
"{} there is a pending snapshot; dropping request snapshot",
self.tag
);
} else {
self.pending_request_snapshot = request_index;
self.send_request_snapshot();
return Ok(());
}
Err(Error::RequestSnapshotDropped)
}

// TODO: revoke pub when there is a better way to test.
/// For a given message, append the entries to the log.
pub fn handle_append_entries(&mut self, m: &Message) {
if self.pending_request_snapshot != INVALID_INDEX {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if the entries is more new than pending_request_snapshot?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be dropped if a follower is requesting a snapshot. After applying a snapshot, the follower will receive raft log entries again.

self.send_request_snapshot();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the leader can tolerant follower send_request_snapshot multiple times with the same pending_request_snapshot?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, because request snapshot message may be dropped on the network, it needs to be able to send the same message multiple times.

return;
}
if m.get_index() < self.raft_log.committed {
let mut to_send = Message::new();
to_send.set_to(m.get_from());
Expand Down Expand Up @@ -1757,6 +1804,10 @@ impl<T: Storage> Raft<T> {
/// For a message, commit and send out heartbeat.
pub fn handle_heartbeat(&mut self, mut m: Message) {
self.raft_log.commit_to(m.get_commit());
if self.pending_request_snapshot != INVALID_INDEX {
overvenus marked this conversation as resolved.
Show resolved Hide resolved
self.send_request_snapshot();
return;
}
let mut to_send = Message::new();
to_send.set_to(m.get_from());
to_send.set_msg_type(MessageType::MsgHeartbeatResponse);
Expand Down Expand Up @@ -1794,7 +1845,10 @@ impl<T: Storage> Raft<T> {

fn restore_raft(&mut self, snap: &Snapshot) -> Option<bool> {
let meta = snap.get_metadata();
if self.raft_log.match_term(meta.get_index(), meta.get_term()) {
// Do not fast-forward commit if we are requesting snapshot.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the snapshot may be dropped if there is no further proposal after requesting a snapshot.

if self.pending_request_snapshot == INVALID_INDEX
&& self.raft_log.match_term(meta.get_index(), meta.get_term())
{
info!(
"{} [commit: {}, lastindex: {}, lastterm: {}] fast-forwarded commit to \
snapshot [index: {}, term: {}]",
Expand Down Expand Up @@ -1837,6 +1891,7 @@ impl<T: Storage> Raft<T> {
meta.get_term()
);

self.pending_request_snapshot = INVALID_INDEX;
overvenus marked this conversation as resolved.
Show resolved Hide resolved
let nodes = meta.get_conf_state().get_nodes();
let learners = meta.get_conf_state().get_learners();
self.prs = Some(ProgressSet::new(nodes.len(), learners.len()));
Expand Down Expand Up @@ -2058,4 +2113,15 @@ impl<T: Storage> Raft<T> {
pub fn abort_leader_transfer(&mut self) {
self.lead_transferee = None;
}

fn send_request_snapshot(&mut self) {
let mut m = Message::new();
m.set_msg_type(MessageType::MsgAppendResponse);
m.set_index(self.raft_log.committed);
m.set_reject(true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why set reject?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It needs to be rejected, so it can enter requesting snapshot state in leader's view. See more at handle_append_response.

m.set_reject_hint(self.raft_log.last_index());
m.set_to(self.leader_id);
m.set_request_snapshot(self.pending_request_snapshot);
self.send(m);
}
}
4 changes: 2 additions & 2 deletions src/raft_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,11 +384,11 @@ impl<T: Storage> RaftLog<T> {
}

/// Returns the current snapshot
pub fn snapshot(&self) -> Result<Snapshot> {
pub fn snapshot(&self, request_index: u64) -> Result<Snapshot> {
self.unstable
.snapshot
.clone()
.map_or_else(|| self.store.snapshot(), Ok)
.map_or_else(|| self.store.snapshot(request_index), Ok)
}

fn must_check_outofbounds(&self, low: u64, high: u64) -> Option<Error> {
Expand Down
6 changes: 6 additions & 0 deletions src/raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,12 @@ impl<T: Storage> RawNode<T> {
let _ = self.raft.step(m);
}

/// Request a snapshot from a leader.
/// The snapshot's index must be greater or equal to the request_index.
pub fn request_snapshot(&mut self, request_index: u64) -> Result<()> {
self.raft.request_snapshot(request_index)
}

/// TransferLeader tries to transfer leadership to the given transferee.
pub fn transfer_leader(&mut self, transferee: u64) {
let mut m = Message::new();
Expand Down
Loading