Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry pick all 0.4.x #263

Merged
merged 9 commits into from
Jul 19, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions proto/proto/eraftpb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ message Message {
repeated Entry entries = 7;
uint64 commit = 8;
Snapshot snapshot = 9;
uint64 request_snapshot = 13;
bool reject = 10;
uint64 reject_hint = 11;
bytes context = 12;
Expand Down
2 changes: 2 additions & 0 deletions proto/src/prost/eraftpb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ pub struct Message {
pub commit: u64,
#[prost(message, optional, tag = "9")]
pub snapshot: ::std::option::Option<Snapshot>,
#[prost(uint64, tag = "13")]
pub request_snapshot: u64,
#[prost(bool, tag = "10")]
pub reject: bool,
#[prost(uint64, tag = "11")]
Expand Down
4 changes: 4 additions & 0 deletions proto/src/prost/wrapper_eraftpb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,10 @@ impl Message {
self.snapshot.take().unwrap_or_else(Snapshot::default)
}
#[inline]
pub fn clear_request_snapshot(&mut self) {
self.request_snapshot = 0
}
#[inline]
pub fn clear_reject(&mut self) {
self.reject = false
}
Expand Down
5 changes: 5 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ quick_error! {
ViolatesContract(contract: String) {
display("An argument violate a calling contract: {}", contract)
}
/// The request snapshot is dropped.
RequestSnapshotDropped {
description("raft: request snapshot dropped")
}
}
}

Expand All @@ -96,6 +100,7 @@ impl cmp::PartialEq for Error {
(&Error::Io(ref e1), &Error::Io(ref e2)) => e1.kind() == e2.kind(),
(&Error::StepLocalMsg, &Error::StepLocalMsg) => true,
(&Error::ConfigInvalid(ref e1), &Error::ConfigInvalid(ref e2)) => e1 == e2,
(&Error::RequestSnapshotDropped, &Error::RequestSnapshotDropped) => true,
_ => false,
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,12 +385,12 @@ extern crate getset;
mod config;
mod errors;
mod log_unstable;
mod progress;
#[cfg(test)]
pub mod raft;
#[cfg(not(test))]
mod raft;
mod raft_log;
mod progress;
pub mod raw_node;
mod read_only;
mod status;
Expand Down
41 changes: 32 additions & 9 deletions src/progress/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cmp;

use crate::raft::INVALID_INDEX;
use self::inflights::Inflights;
use std::cmp;
pub mod inflights;
pub mod progress_set;

Expand Down Expand Up @@ -73,6 +74,10 @@ pub struct Progress {
/// this Progress will be paused. raft will not resend snapshot until the pending one
/// is reported to be failed.
pub pending_snapshot: u64,
/// This field is used in request snapshot.
/// If there is a pending request snapshot, this will be set to the request
/// index of the snapshot.
pub pending_request_snapshot: u64,

/// This is true if the progress is recently active. Receiving any messages
/// from the corresponding follower indicates the progress is active.
Expand All @@ -98,6 +103,7 @@ impl Progress {
state: ProgressState::default(),
paused: false,
pending_snapshot: 0,
pending_request_snapshot: 0,
recent_active: false,
ins: Inflights::new(ins_size),
}
Expand All @@ -116,6 +122,7 @@ impl Progress {
self.state = ProgressState::default();
self.paused = false;
self.pending_snapshot = 0;
self.pending_request_snapshot = INVALID_INDEX;
self.recent_active = false;
debug_assert!(self.ins.cap() != 0);
self.ins.reset();
Expand Down Expand Up @@ -188,25 +195,41 @@ impl Progress {
/// Returns false if the given index comes from an out of order message.
/// Otherwise it decreases the progress next index to min(rejected, last)
/// and returns true.
pub fn maybe_decr_to(&mut self, rejected: u64, last: u64) -> bool {
pub fn maybe_decr_to(&mut self, rejected: u64, last: u64, request_snapshot: u64) -> bool {
if self.state == ProgressState::Replicate {
// the rejection must be stale if the progress has matched and "rejected"
// is smaller than "match".
if rejected <= self.matched {
// Or rejected equals to matched and request_snapshot is the INVALID_INDEX.
if rejected < self.matched
|| (rejected == self.matched && request_snapshot == INVALID_INDEX)
{
return false;
}
self.next_idx = self.matched + 1;
if request_snapshot == INVALID_INDEX {
self.next_idx = self.matched + 1;
} else {
self.pending_request_snapshot = request_snapshot;
}
return true;
}

// the rejection must be stale if "rejected" does not match next - 1
if self.next_idx == 0 || self.next_idx - 1 != rejected {
// The rejection must be stale if "rejected" does not match next - 1.
// Do not consider it stale if it is a request snapshot message.
if (self.next_idx == 0 || self.next_idx - 1 != rejected)
&& request_snapshot == INVALID_INDEX
{
return false;
}

self.next_idx = cmp::min(rejected, last + 1);
if self.next_idx < 1 {
self.next_idx = 1;
// Do not decrease next index if it's requesting snapshot.
if request_snapshot == INVALID_INDEX {
self.next_idx = cmp::min(rejected, last + 1);
if self.next_idx < 1 {
self.next_idx = 1;
}
} else if self.pending_request_snapshot == INVALID_INDEX {
// Allow requesting snapshot even if it's not Replicate.
self.pending_request_snapshot = request_snapshot;
}
self.resume();
true
Expand Down
109 changes: 86 additions & 23 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ pub struct Raft<T: Storage> {
/// The maximum length (in bytes) of all the entries.
pub max_msg_size: u64,

/// The peer is requesting snapshot, it is the index that the follower
/// needs it to be included in a snapshot.
pub pending_request_snapshot: u64,

prs: Option<ProgressSet>,

/// The current role of this node.
Expand Down Expand Up @@ -253,6 +257,7 @@ impl<T: Storage> Raft<T> {
max_inflight: c.max_inflight_msgs,
max_msg_size: c.max_size_per_msg,
prs: Some(ProgressSet::with_capacity(peers.len(), learners.len())),
pending_request_snapshot: INVALID_INDEX,
state: StateRole::Follower,
is_learner: false,
check_quorum: c.check_quorum,
Expand Down Expand Up @@ -524,7 +529,7 @@ impl<T: Storage> Raft<T> {
}

m.set_msg_type(MessageType::MsgSnapshot);
let snapshot_r = self.raft_log.snapshot();
let snapshot_r = self.raft_log.snapshot(pr.pending_request_snapshot);
if let Err(e) = snapshot_r {
if e == Error::Store(StorageError::SnapshotTemporarilyUnavailable) {
debug!(
Expand Down Expand Up @@ -619,34 +624,28 @@ impl<T: Storage> Raft<T> {
);
return;
}
let term = self.raft_log.term(pr.next_idx - 1);
let ents = self.raft_log.entries(pr.next_idx, self.max_msg_size);
let mut m = Message::default();
m.to = to;
if term.is_err() || ents.is_err() {
// send snapshot if we failed to get term or entries
trace!(
self.logger,
"Skipping sending to {to}",
to = to;
"index" => pr.next_idx,
"tag" => &self.tag,
"term" => ?term,
"ents" => ?ents,
);
if pr.pending_request_snapshot != INVALID_INDEX {
// Check pending request snapshot first to avoid unnecessary loading entries.
if !self.prepare_send_snapshot(&mut m, pr, to) {
return;
}
} else {
let mut ents = ents.unwrap();
if self.batch_append {
let batched = self.try_batching(to, pr, &mut ents);
if batched {
let term = self.raft_log.term(pr.next_idx - 1);
let ents = self.raft_log.entries(pr.next_idx, self.max_msg_size);
if term.is_err() || ents.is_err() {
// send snapshot if we failed to get term or entries.
if !self.prepare_send_snapshot(&mut m, pr, to) {
return;
}
} else {
let mut ents = ents.unwrap();
if self.batch_append && self.try_batching(to, pr, &mut ents) {
return;
}
self.prepare_send_entries(&mut m, pr, term.unwrap(), ents);
}
let term = term.unwrap();
self.prepare_send_entries(&mut m, pr, term, ents);
}
self.send(m);
}
Expand Down Expand Up @@ -768,6 +767,7 @@ impl<T: Storage> Raft<T> {

self.pending_conf_index = 0;
self.read_only = ReadOnly::new(self.read_only.option);
self.pending_request_snapshot = INVALID_INDEX;

let last_index = self.raft_log.last_index();
let self_id = self.id;
Expand Down Expand Up @@ -857,9 +857,11 @@ impl<T: Storage> Raft<T> {

/// Converts this node to a follower.
pub fn become_follower(&mut self, term: u64, leader_id: u64) {
let pending_request_snapshot = self.pending_request_snapshot;
self.reset(term);
self.leader_id = leader_id;
self.state = StateRole::Follower;
self.pending_request_snapshot = pending_request_snapshot;
info!(
self.logger,
"became follower at term {term}",
Expand Down Expand Up @@ -1459,7 +1461,7 @@ impl<T: Storage> Raft<T> {
"tag" => &self.tag,
);

if pr.maybe_decr_to(m.index, m.reject_hint) {
if pr.maybe_decr_to(m.index, m.reject_hint, m.request_snapshot) {
debug!(
self.logger,
"decreased progress of {}",
Expand Down Expand Up @@ -1531,7 +1533,8 @@ impl<T: Storage> Raft<T> {
if pr.state == ProgressState::Replicate && pr.ins.full() {
pr.ins.free_first_one();
}
if pr.matched < self.raft_log.last_index() {
// Does it request snapshot?
if pr.matched < self.raft_log.last_index() || pr.pending_request_snapshot != INVALID_INDEX {
*send_append = true;
}

Expand Down Expand Up @@ -1657,6 +1660,7 @@ impl<T: Storage> Raft<T> {
// out the next msgAppend.
// If snapshot failure, wait for a heartbeat interval before next try
pr.pause();
pr.pending_request_snapshot = INVALID_INDEX;
}

/// Check message's progress to decide which action should be taken.
Expand Down Expand Up @@ -2062,16 +2066,55 @@ impl<T: Storage> Raft<T> {
Ok(())
}

/// Request a snapshot from a leader.
pub fn request_snapshot(&mut self, request_index: u64) -> Result<()> {
if self.state == StateRole::Leader {
info!(
self.logger,
"can not request snapshot on leader; dropping request snapshot";
"tag" => &self.tag,
);
} else if self.leader_id == INVALID_ID {
info!(
self.logger,
"drop request snapshot because of no leader";
"tag" => &self.tag, "term" => self.term,
);
} else if self.get_snap().is_some() {
info!(
self.logger,
"there is a pending snapshot; dropping request snapshot";
"tag" => &self.tag,
);
} else if self.pending_request_snapshot != INVALID_INDEX {
info!(
self.logger,
"there is a pending snapshot; dropping request snapshot";
"tag" => &self.tag,
);
} else {
self.pending_request_snapshot = request_index;
self.send_request_snapshot();
return Ok(());
}
Err(Error::RequestSnapshotDropped)
}

// TODO: revoke pub when there is a better way to test.
/// For a given message, append the entries to the log.
pub fn handle_append_entries(&mut self, m: &Message) {
if self.pending_request_snapshot != INVALID_INDEX {
self.send_request_snapshot();
return;
}
if m.index < self.raft_log.committed {
debug!(
self.logger,
"got message with lower index than committed.";
"tag" => &self.tag,
);
let mut to_send = Message::default();
to_send.to = m.from;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dup with line 2119.

to_send.set_msg_type(MessageType::MsgAppendResponse);
to_send.to = m.from;
to_send.index = self.raft_log.committed;
Expand Down Expand Up @@ -2115,7 +2158,12 @@ impl<T: Storage> Raft<T> {
/// For a message, commit and send out heartbeat.
pub fn handle_heartbeat(&mut self, mut m: Message) {
self.raft_log.commit_to(m.commit);
if self.pending_request_snapshot != INVALID_INDEX {
self.send_request_snapshot();
return;
}
let mut to_send = Message::default();
to_send.to = m.from;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

to_send.set_msg_type(MessageType::MsgHeartbeatResponse);
to_send.to = m.from;
to_send.context = m.take_context();
Expand Down Expand Up @@ -2160,7 +2208,10 @@ impl<T: Storage> Raft<T> {

fn restore_raft(&mut self, snap: &Snapshot) -> Option<bool> {
let meta = snap.get_metadata();
if self.raft_log.match_term(meta.index, meta.term) {
// Do not fast-forward commit if we are requesting snapshot.
if self.pending_request_snapshot == INVALID_INDEX
&& self.raft_log.match_term(meta.index, meta.term)
{
info!(
self.logger,
"[commit: {commit}, lastindex: {last_index}, lastterm: {last_term}] fast-forwarded commit to \
Expand Down Expand Up @@ -2220,6 +2271,7 @@ impl<T: Storage> Raft<T> {
conf_change.start_index = meta.pending_membership_change_index;
self.pending_membership_change = Some(conf_change);
}
self.pending_request_snapshot = INVALID_INDEX;
None
}

Expand Down Expand Up @@ -2503,4 +2555,15 @@ impl<T: Storage> Raft<T> {
pub fn is_in_membership_change(&self) -> bool {
self.prs().is_in_membership_change()
}

fn send_request_snapshot(&mut self) {
let mut m = Message::default();
m.set_msg_type(MessageType::MsgAppendResponse);
m.index = self.raft_log.committed;
m.reject = true;
m.reject_hint = self.raft_log.last_index();
m.to = self.leader_id;
m.request_snapshot = self.pending_request_snapshot;
self.send(m);
}
}
12 changes: 7 additions & 5 deletions src/raft_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,11 +424,13 @@ impl<T: Storage> RaftLog<T> {
}

/// Returns the current snapshot
pub fn snapshot(&self) -> Result<Snapshot> {
self.unstable
.snapshot
.clone()
.map_or_else(|| self.store.snapshot(), Ok)
pub fn snapshot(&self, request_index: u64) -> Result<Snapshot> {
if let Some(snap) = self.unstable.snapshot.as_ref() {
if snap.get_metadata().index >= request_index {
return Ok(snap.clone());
}
}
self.store.snapshot(request_index)
}

fn must_check_outofbounds(&self, low: u64, high: u64) -> Option<Error> {
Expand Down
Loading