Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add request snapshot #243

Merged
merged 27 commits into from
Jul 1, 2019
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
3c5e02c
request snapshot
overvenus May 26, 2019
b59e2e4
Export request snapshot method
overvenus May 27, 2019
0a30175
Fix CI
overvenus May 27, 2019
f1d5104
request_snapshot also affects flow control
overvenus Jun 2, 2019
e7a534b
Can not request snapshot if there is no leader or there is a snapshot…
overvenus Jun 3, 2019
0222aa7
Follower can not become leader during requesting snaphot
overvenus Jun 4, 2019
4c8a95d
send append if peer is requesting snapshot
overvenus Jun 4, 2019
fbf8981
cargo fmt
overvenus Jun 10, 2019
7048c19
remove requesting snapshot flag
overvenus Jun 10, 2019
89f32e1
Add request snapshot field in message
overvenus Jun 12, 2019
a2e038d
Allow starting leader election in requesting snapshot state
overvenus Jun 12, 2019
e087dd9
Remove MsgRequestSnapshot message type
overvenus Jun 12, 2019
70949a0
Try to fix CI
overvenus Jun 12, 2019
197494d
Pass a request index to the snapshot method
overvenus Jun 14, 2019
3897ed5
Address comments
overvenus Jun 17, 2019
0648140
update request_snpashot doc
overvenus Jun 18, 2019
e9880d4
Request snapshot on heartbeat
overvenus Jun 18, 2019
43b6c93
Alway consider decreased if the peer is requesting snapshot
overvenus Jun 19, 2019
2cdc800
Add two tests
overvenus Jun 19, 2019
fd4edad
Address comments
overvenus Jun 19, 2019
ef84d14
Allow request snapshot when it's state is not Replicate
overvenus Jun 20, 2019
3278364
Do not decrease next index if it's requesting snapshot
overvenus Jun 20, 2019
4336598
Do not consider a request snapshot message stale
overvenus Jun 24, 2019
c8d4854
Address comments
overvenus Jun 27, 2019
6191ccb
Address comments
overvenus Jun 27, 2019
1448df7
Check index for snapshot in raft log unstable
overvenus Jun 27, 2019
a2a5c53
Address comments
overvenus Jun 29, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions generate-proto.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ with open("src/eraftpb.rs") as reader:
src = reader.read()

res = re.sub('::protobuf::rt::read_proto3_enum_with_unknown_fields_into\(([^,]+), ([^,]+), &mut ([^,]+), [^\)]+\)\?', 'if \\\\1 == ::protobuf::wire_format::WireTypeVarint {\\\\3 = \\\\2.read_enum()?;} else { return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type)); }', src)
res = re.sub('#!\[allow\(clippy\)\]', '#![allow(clippy::all)]', res)
res = re.sub('#!\[allow\(unused_results\)\]', '#![allow(unused_results, bare_trait_objects)]', res)

with open("src/eraftpb.rs", "w") as writer:
writer.write(res)
Expand Down
1 change: 1 addition & 0 deletions proto/eraftpb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ message Message {
bool reject = 10;
uint64 reject_hint = 11;
bytes context = 12;
bool request_snapshot = 13;
overvenus marked this conversation as resolved.
Show resolved Hide resolved
}

message HardState {
Expand Down
412 changes: 232 additions & 180 deletions src/eraftpb.rs

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ quick_error! {
NotExists(id: u64, set: &'static str) {
display("The node {} is not in the {} set.", id, set)
}
/// The request snapshot if dropped.
overvenus marked this conversation as resolved.
Show resolved Hide resolved
RequestSnapshotDropped {
description("raft: request snapshot dropped")
}
}
}

Expand All @@ -76,6 +80,7 @@ impl cmp::PartialEq for Error {
(&Error::Io(ref e1), &Error::Io(ref e2)) => e1.kind() == e2.kind(),
(&Error::StepLocalMsg, &Error::StepLocalMsg) => true,
(&Error::ConfigInvalid(ref e1), &Error::ConfigInvalid(ref e2)) => e1 == e2,
(&Error::RequestSnapshotDropped, &Error::RequestSnapshotDropped) => true,
_ => false,
}
}
Expand Down
15 changes: 10 additions & 5 deletions src/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ impl Progress {
self.pending_snapshot = 0;
}

/// Unsets pendingSnapshot if Match is equal or higher than
/// the pendingSnapshot
/// Unsets pending_snapshot if match is equal or higher than
overvenus marked this conversation as resolved.
Show resolved Hide resolved
/// the pending_snapshot and the snapshot is not requested.
pub fn maybe_snapshot_abort(&self) -> bool {
self.state == ProgressState::Snapshot && self.matched >= self.pending_snapshot
}
Expand Down Expand Up @@ -278,14 +278,19 @@ impl Progress {
/// Returns false if the given index comes from an out of order message.
/// Otherwise it decreases the progress next index to min(rejected, last)
/// and returns true.
pub fn maybe_decr_to(&mut self, rejected: u64, last: u64) -> bool {
pub fn maybe_decr_to(&mut self, rejected: u64, last: u64, request_snapshot: bool) -> bool {
if self.state == ProgressState::Replicate {
// the rejection must be stale if the progress has matched and "rejected"
// is smaller than "match".
if rejected <= self.matched {
// Or rejected equals to matched and last is not the INVALID_INDEX.
if rejected < self.matched || (rejected == self.matched && !request_snapshot) {
return false;
}
self.next_idx = self.matched + 1;
if request_snapshot {
self.next_idx = last + 1;
} else {
self.next_idx = self.matched + 1;
}
return true;
}

Expand Down
73 changes: 67 additions & 6 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ pub struct Raft<T: Storage> {
/// The maximum length (in bytes) of all the entries.
pub max_msg_size: u64,

/// The peer is requesting snapshot, it can not handle any appedn messages
overvenus marked this conversation as resolved.
Show resolved Hide resolved
/// from leader.
requesting_snapshot: bool,

prs: Option<ProgressSet>,

/// The current role of this node.
Expand Down Expand Up @@ -244,6 +248,7 @@ impl<T: Storage> Raft<T> {
raft_log,
max_inflight: c.max_inflight_msgs,
max_msg_size: c.max_size_per_msg,
requesting_snapshot: false,
prs: Some(ProgressSet::new(peers.len(), learners.len())),
state: StateRole::Follower,
is_learner: false,
Expand Down Expand Up @@ -526,7 +531,6 @@ impl<T: Storage> Raft<T> {
let mut m = Message::new();
m.set_to(to);
if term.is_err() || ents.is_err() {
// send snapshot if we failed to get term or entries
if !self.prepare_send_snapshot(&mut m, pr, to) {
return;
}
Expand Down Expand Up @@ -1149,14 +1153,15 @@ impl<T: Storage> Raft<T> {

if m.get_reject() {
debug!(
"{} received msgAppend rejection(lastindex: {}) from {} for index {}",
"{} received msgAppend rejection(lastindex: {}) from {} for index {}, [{:?}]",
self.tag,
m.get_reject_hint(),
m.get_from(),
m.get_index()
m.get_index(),
pr
);

if pr.maybe_decr_to(m.get_index(), m.get_reject_hint()) {
if pr.maybe_decr_to(m.get_index(), m.get_reject_hint(), m.get_request_snapshot()) {
debug!(
"{} decreased progress of {} to [{:?}]",
self.tag,
Expand Down Expand Up @@ -1225,7 +1230,10 @@ impl<T: Storage> Raft<T> {
if pr.state == ProgressState::Replicate && pr.ins.full() {
pr.ins.free_first_one();
}
if pr.matched < self.raft_log.last_index() {
if pr.matched < self.raft_log.last_index()
|| self.raft_log.term(pr.next_idx - 1).is_err() // Does it request snapshot?
overvenus marked this conversation as resolved.
Show resolved Hide resolved
|| self.raft_log.entries(pr.next_idx, self.max_msg_size).is_err()
{
*send_append = true;
}

Expand Down Expand Up @@ -1710,9 +1718,48 @@ impl<T: Storage> Raft<T> {
Ok(())
}

/// Request a snapshot from a leader.
pub fn request_snapshot(&mut self) -> Result<()> {
if !self.is_learner && self.prs().voters().len() == 1 {
overvenus marked this conversation as resolved.
Show resolved Hide resolved
info!(
"{} can not request snapshot on single node group; dropping request snapshot",
self.tag
);
return Err(Error::RequestSnapshotDropped);
}
if self.state == StateRole::Leader {
info!(
"{} can not request snapshot on leader; dropping request snapshot",
self.tag
);
return Err(Error::RequestSnapshotDropped);
}
if self.leader_id == INVALID_ID {
info!(
"{} no leader at term {}; dropping request snapshot",
self.tag, self.term
);
return Err(Error::RequestSnapshotDropped);
}
if self.get_snap().is_some() {
info!(
"{} there is a pending snapshot; dropping request snapshot",
self.tag
);
return Err(Error::RequestSnapshotDropped);
}
self.send_request_snapshot();
self.requesting_snapshot = true;
Ok(())
}

// TODO: revoke pub when there is a better way to test.
/// For a given message, append the entries to the log.
pub fn handle_append_entries(&mut self, m: &Message) {
if self.requesting_snapshot {
self.send_request_snapshot();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the leader can tolerant follower send_request_snapshot multiple times with the same pending_request_snapshot?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, because request snapshot message may be dropped on the network, it needs to be able to send the same message multiple times.

return;
}
if m.get_index() < self.raft_log.committed {
let mut to_send = Message::new();
to_send.set_to(m.get_from());
Expand Down Expand Up @@ -1794,7 +1841,9 @@ impl<T: Storage> Raft<T> {

fn restore_raft(&mut self, snap: &Snapshot) -> Option<bool> {
let meta = snap.get_metadata();
if self.raft_log.match_term(meta.get_index(), meta.get_term()) {
// Do not fast-forward commit if we are requesting snapshot.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the snapshot may be dropped if there is no further proposal after requesting a snapshot.

if !self.requesting_snapshot && self.raft_log.match_term(meta.get_index(), meta.get_term())
{
info!(
"{} [commit: {}, lastindex: {}, lastterm: {}] fast-forwarded commit to \
snapshot [index: {}, term: {}]",
Expand Down Expand Up @@ -1837,6 +1886,7 @@ impl<T: Storage> Raft<T> {
meta.get_term()
);

self.requesting_snapshot = false;
let nodes = meta.get_conf_state().get_nodes();
let learners = meta.get_conf_state().get_learners();
self.prs = Some(ProgressSet::new(nodes.len(), learners.len()));
Expand Down Expand Up @@ -2058,4 +2108,15 @@ impl<T: Storage> Raft<T> {
pub fn abort_leader_transfer(&mut self) {
self.lead_transferee = None;
}

fn send_request_snapshot(&mut self) {
let mut m = Message::new();
m.set_msg_type(MessageType::MsgAppendResponse);
m.set_index(self.raft_log.committed);
m.set_reject(true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why set reject?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It needs to be rejected, so it can enter requesting snapshot state in leader's view. See more at handle_append_response.

m.set_reject_hint(INVALID_INDEX);
m.set_to(self.leader_id);
m.set_request_snapshot(true);
self.send(m);
}
}
5 changes: 5 additions & 0 deletions src/raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,11 @@ impl<T: Storage> RawNode<T> {
let _ = self.raft.step(m);
}

/// ReportSnapshot reports the status of the sent snapshot.
pub fn request_snapshot(&mut self) -> Result<()> {
self.raft.request_snapshot()
}

/// TransferLeader tries to transfer leadership to the given transferee.
pub fn transfer_leader(&mut self, transferee: u64) {
let mut m = Message::new();
Expand Down
92 changes: 90 additions & 2 deletions tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ fn test_progress_maybe_decr() {
];
for (i, &(state, m, n, rejected, last, w, wn)) in tests.iter().enumerate() {
let mut p = new_progress(state, m, n, 0, 0);
if p.maybe_decr_to(rejected, last) != w {
if p.maybe_decr_to(rejected, last, false) != w {
panic!("#{}: maybeDecrTo= {}, want {}", i, !w, w);
}
if p.matched != m {
Expand Down Expand Up @@ -284,7 +284,7 @@ fn test_progress_resume() {
paused: true,
..Default::default()
};
p.maybe_decr_to(1, 1);
p.maybe_decr_to(1, 1, false);
assert!(!p.paused, "paused= true, want false");
p.paused = true;
p.maybe_update(2);
Expand Down Expand Up @@ -4184,3 +4184,91 @@ fn test_conf_change_check_before_campaign() {
}
assert_eq!(nt.peers[&1].state, StateRole::Candidate);
}

fn prepare_request_snapshot() -> Network {
fn index_term_11(id: u64, ids: Vec<u64>) -> Interface {
let store = MemStorage::new();
store
.wl()
.apply_snapshot(new_snapshot(11, 11, ids.clone()))
.unwrap();
let mut raft = new_test_raft(id, vec![], 5, 1, store);
raft.reset(11);
raft
}
let mut nt = Network::new(vec![
Some(index_term_11(1, vec![1, 2, 3])),
Some(index_term_11(2, vec![1, 2, 3])),
Some(index_term_11(3, vec![1, 2, 3])),
]);

// elect r1 as leader
nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);

let mut test_entries = Entry::new();
test_entries.set_data(b"testdata".to_vec());
let msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![test_entries.clone()]);
nt.send(vec![msg.clone(), msg.clone()]);
assert_eq!(nt.peers[&1].raft_log.committed, 14);
assert_eq!(nt.peers[&2].raft_log.committed, 14);

let mut cs = ConfState::new();
cs.set_nodes(nt.peers[&1].prs().nodes());
let ents = nt
.peers
.get_mut(&1)
.unwrap()
.raft_log
.unstable_entries()
.unwrap_or(&[])
.to_vec();
nt.storage[&1].wl().append(&ents).unwrap();
nt.peers.get_mut(&1).unwrap().raft_log.applied = 14;
nt.storage[&1]
.wl()
.create_snapshot(14, Some(cs), vec![7; 7])
.unwrap();
nt
}

// Test if an up-to-date follower can request a snapshot from leader.
#[test]
fn test_follower_request_snapshot() {
setup_for_test();
let mut nt = prepare_request_snapshot();

let mut test_entries = Entry::new();
test_entries.set_data(b"testdata".to_vec());
let msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![test_entries.clone()]);

nt.peers.get_mut(&2).unwrap().request_snapshot().unwrap();

// Send the request snapshot message.
let req_snap = nt.peers.get_mut(&2).unwrap().msgs.pop().unwrap();
assert!(
req_snap.get_msg_type() == MessageType::MsgAppendResponse
&& req_snap.get_reject()
&& req_snap.get_reject_hint() == INVALID_INDEX,
"{:?}",
req_snap
);
nt.peers.get_mut(&1).unwrap().step(req_snap).unwrap();

// New proposes can not be replicated to peer 2.
nt.send(vec![msg.clone()]);
assert_eq!(nt.peers[&1].raft_log.committed, 15);
assert_eq!(
nt.peers[&1].prs().voters()[&2].state,
ProgressState::Snapshot
);
assert_eq!(nt.peers[&2].raft_log.committed, 14);

// Util snapshot success or fail.
let report_ok = new_message(2, 1, MessageType::MsgSnapStatus, 0);
overvenus marked this conversation as resolved.
Show resolved Hide resolved
nt.send(vec![report_ok]);
let hb_resp = new_message(2, 1, MessageType::MsgHeartbeatResponse, 0);
nt.send(vec![hb_resp]);
nt.send(vec![msg]);
assert_eq!(nt.peers[&1].raft_log.committed, 16);
assert_eq!(nt.peers[&2].raft_log.committed, 16);
}
Loading