Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry pick all 0.4.x #263

Merged
merged 9 commits into from
Jul 19, 2019
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions harness/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,12 @@ impl Network {
npeers.insert(*id, r);
}
Some(r) => {
if r.raft.as_ref().map_or(false, |r| r.id != *id) {
panic!("peer {} in peers has a wrong position", r.id);
if let Some(raft) = r.raft.as_ref() {
if raft.id != *id {
panic!("peer {} in peers has a wrong position", r.id);
}
let store = raft.raft_log.store.clone();
nstorage.insert(*id, store);
}
npeers.insert(*id, r);
}
Expand Down
1 change: 1 addition & 0 deletions proto/proto/eraftpb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ message Message {
repeated Entry entries = 7;
uint64 commit = 8;
Snapshot snapshot = 9;
uint64 request_snapshot = 13;
bool reject = 10;
uint64 reject_hint = 11;
bytes context = 12;
Expand Down
2 changes: 2 additions & 0 deletions proto/src/prost/eraftpb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ pub struct Message {
pub commit: u64,
#[prost(message, optional, tag = "9")]
pub snapshot: ::std::option::Option<Snapshot>,
#[prost(uint64, tag = "13")]
pub request_snapshot: u64,
#[prost(bool, tag = "10")]
pub reject: bool,
#[prost(uint64, tag = "11")]
Expand Down
4 changes: 4 additions & 0 deletions proto/src/prost/wrapper_eraftpb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,10 @@ impl Message {
self.snapshot.take().unwrap_or_else(Snapshot::default)
}
#[inline]
pub fn clear_request_snapshot(&mut self) {
self.request_snapshot = 0
}
#[inline]
pub fn clear_reject(&mut self) {
self.reject = false
}
Expand Down
5 changes: 5 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ quick_error! {
ViolatesContract(contract: String) {
display("An argument violate a calling contract: {}", contract)
}
/// The request snapshot is dropped.
RequestSnapshotDropped {
description("raft: request snapshot dropped")
}
}
}

Expand All @@ -96,6 +100,7 @@ impl cmp::PartialEq for Error {
(&Error::Io(ref e1), &Error::Io(ref e2)) => e1.kind() == e2.kind(),
(&Error::StepLocalMsg, &Error::StepLocalMsg) => true,
(&Error::ConfigInvalid(ref e1), &Error::ConfigInvalid(ref e2)) => e1 == e2,
(&Error::RequestSnapshotDropped, &Error::RequestSnapshotDropped) => true,
_ => false,
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ before taking old, removed peers offline.

*/

#![cfg_attr(not(feature = "cargo-clippy"), allow(unknown_lints))]
#![deny(clippy::all)]
#![deny(missing_docs)]
#![recursion_limit = "128"]
Expand Down Expand Up @@ -406,7 +407,7 @@ pub use self::raft::{vote_resp_msg_type, Raft, SoftState, StateRole, INVALID_ID,
pub use self::raft_log::{RaftLog, NO_LIMIT};
pub use self::raw_node::{is_empty_snap, Peer, RawNode, Ready, SnapshotStatus};
pub use self::read_only::{ReadOnlyOption, ReadState};
pub use self::status::Status;
pub use self::status::{Status, StatusRef};
pub use self::storage::{RaftState, Storage};
pub use raft_proto::eraftpb;
use slog::{Drain, Logger};
Expand Down Expand Up @@ -435,7 +436,7 @@ pub mod prelude {

pub use crate::progress::Progress;

pub use crate::status::Status;
pub use crate::status::{Status, StatusRef};

pub use crate::read_only::{ReadOnlyOption, ReadState};
}
Expand Down
41 changes: 32 additions & 9 deletions src/progress/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cmp;

use self::inflights::Inflights;
use std::cmp;
use crate::raft::INVALID_INDEX;
pub mod inflights;
pub mod progress_set;

Expand Down Expand Up @@ -73,6 +74,10 @@ pub struct Progress {
/// this Progress will be paused. raft will not resend snapshot until the pending one
/// is reported to be failed.
pub pending_snapshot: u64,
/// This field is used in request snapshot.
/// If there is a pending request snapshot, this will be set to the request
/// index of the snapshot.
pub pending_request_snapshot: u64,

/// This is true if the progress is recently active. Receiving any messages
/// from the corresponding follower indicates the progress is active.
Expand All @@ -98,6 +103,7 @@ impl Progress {
state: ProgressState::default(),
paused: false,
pending_snapshot: 0,
pending_request_snapshot: 0,
recent_active: false,
ins: Inflights::new(ins_size),
}
Expand All @@ -116,6 +122,7 @@ impl Progress {
self.state = ProgressState::default();
self.paused = false;
self.pending_snapshot = 0;
self.pending_request_snapshot = INVALID_INDEX;
self.recent_active = false;
debug_assert!(self.ins.cap() != 0);
self.ins.reset();
Expand Down Expand Up @@ -188,25 +195,41 @@ impl Progress {
/// Returns false if the given index comes from an out of order message.
/// Otherwise it decreases the progress next index to min(rejected, last)
/// and returns true.
pub fn maybe_decr_to(&mut self, rejected: u64, last: u64) -> bool {
pub fn maybe_decr_to(&mut self, rejected: u64, last: u64, request_snapshot: u64) -> bool {
if self.state == ProgressState::Replicate {
// the rejection must be stale if the progress has matched and "rejected"
// is smaller than "match".
if rejected <= self.matched {
// Or rejected equals to matched and request_snapshot is the INVALID_INDEX.
if rejected < self.matched
|| (rejected == self.matched && request_snapshot == INVALID_INDEX)
{
return false;
}
self.next_idx = self.matched + 1;
if request_snapshot == INVALID_INDEX {
self.next_idx = self.matched + 1;
} else {
self.pending_request_snapshot = request_snapshot;
}
return true;
}

// the rejection must be stale if "rejected" does not match next - 1
if self.next_idx == 0 || self.next_idx - 1 != rejected {
// The rejection must be stale if "rejected" does not match next - 1.
// Do not consider it stale if it is a request snapshot message.
if (self.next_idx == 0 || self.next_idx - 1 != rejected)
&& request_snapshot == INVALID_INDEX
{
return false;
}

self.next_idx = cmp::min(rejected, last + 1);
if self.next_idx < 1 {
self.next_idx = 1;
// Do not decrease next index if it's requesting snapshot.
if request_snapshot == INVALID_INDEX {
self.next_idx = cmp::min(rejected, last + 1);
if self.next_idx < 1 {
self.next_idx = 1;
}
} else if self.pending_request_snapshot == INVALID_INDEX {
// Allow requesting snapshot even if it's not Replicate.
self.pending_request_snapshot = request_snapshot;
}
self.resume();
true
Expand Down
Loading