
Commit

This patch speeds up log replication when a node is far behind the leader and the logs have not been compacted yet.

Signed-off-by: Jay Lee <[email protected]>
BusyJay authored Mar 3, 2020
1 parent fe10576 commit 9782199
Showing 5 changed files with 224 additions and 118 deletions.
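The gain comes from how the leader reacts to a MsgAppendResponse from a follower that is far behind: before this patch it sent one size-limited MsgAppend and then waited for the next response, while with this patch it keeps streaming further batches in a loop (see maybe_send_append and loop_append in src/raft.rs below). Here is a minimal, self-contained model of that difference, assuming nothing from the crate itself: Follower, send_one_batch, drain_batches and MAX_BATCH are illustrative stand-ins for Progress, maybe_send_append and max_msg_size, and the Inflights flow control that can pause a real Progress is ignored.

// Toy model only: not raft-rs code. It contrasts "one batch per round trip"
// with "drain as many batches as possible after one response".

const MAX_BATCH: u64 = 256; // stand-in for max_msg_size, measured in entries

struct Follower {
    next_idx: u64, // first index the leader still has to replicate
}

// Old behaviour: send a single size-limited batch, then wait for the response.
fn send_one_batch(f: &mut Follower, last_index: u64) -> bool {
    if f.next_idx > last_index {
        return false; // follower is caught up, nothing to send
    }
    let end = (f.next_idx + MAX_BATCH - 1).min(last_index);
    f.next_idx = end + 1; // pretend the batch [next_idx, end] went out
    true
}

// New behaviour: keep sending batches until there is nothing left.
fn drain_batches(f: &mut Follower, last_index: u64) -> u32 {
    let mut batches = 0;
    while send_one_batch(f, last_index) {
        batches += 1;
    }
    batches
}

fn main() {
    let mut lagging = Follower { next_idx: 1 };
    let batches = drain_batches(&mut lagging, 10_000);
    println!(
        "caught up after {} batches inside one response handler \
         (previously each batch cost a round trip)",
        batches
    );
}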
src/progress.rs (10 changes: 5 additions & 5 deletions)
@@ -118,10 +118,10 @@ impl ProgressSet {
/// Adds a voter node
pub fn insert_voter(&mut self, id: u64, pr: Progress) -> Result<(), Error> {
if self.voters.contains_key(&id) {
Err(Error::Exists(id, "voters"))?
return Err(Error::Exists(id, "voters"));
}
if self.learners.contains_key(&id) {
Err(Error::Exists(id, "learners"))?;
return Err(Error::Exists(id, "learners"));
}
self.voters.insert(id, pr);
Ok(())
@@ -130,10 +130,10 @@ impl ProgressSet {
/// Adds a learner to the cluster
pub fn insert_learner(&mut self, id: u64, pr: Progress) -> Result<(), Error> {
if self.voters.contains_key(&id) {
Err(Error::Exists(id, "voters"))?
return Err(Error::Exists(id, "voters"));
}
if self.learners.contains_key(&id) {
Err(Error::Exists(id, "learners"))?
return Err(Error::Exists(id, "learners"));
}
self.learners.insert(id, pr);
Ok(())
@@ -150,7 +150,7 @@ impl ProgressSet {
/// Promote a learner to a peer.
pub fn promote_learner(&mut self, id: u64) -> Result<(), Error> {
if self.voters.contains_key(&id) {
Err(Error::Exists(id, "voters"))?;
return Err(Error::Exists(id, "voters"));
}
// We don't want to remove it unless it's there.
if self.learners.contains_key(&id) {
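The only change in this file is mechanical: `Err(e)?` does return early from the function, but it routes the error through From::from and reads like an ordinary expression, so the patch spells the early return out. A self-contained before/after sketch, with Exists standing in for raft's Error::Exists rather than the crate's real error type:

// Both forms behave the same here; the second states the intent directly.
#[derive(Debug)]
struct Exists(u64, &'static str);

fn insert_old_style(already_there: bool) -> Result<(), Exists> {
    if already_there {
        Err(Exists(1, "voters"))? // early return hidden behind the `?` operator
    }
    Ok(())
}

fn insert_new_style(already_there: bool) -> Result<(), Exists> {
    if already_there {
        return Err(Exists(1, "voters")); // explicit exit point
    }
    Ok(())
}

fn main() {
    println!("{:?}", insert_old_style(true)); // Err(Exists(1, "voters"))
    println!("{:?}", insert_new_style(true)); // Err(Exists(1, "voters"))
}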
src/raft.rs (170 changes: 95 additions & 75 deletions)
@@ -220,6 +220,16 @@ pub fn quorum(total: usize) -> usize {
total / 2 + 1
}

#[derive(Default)]
pub struct HandleResponseContext {
maybe_commit: bool,
send_append: bool,
loop_append: bool,
transfer_leader: bool,
old_paused: bool,
more_to_send: Vec<Message>,
}

impl<T: Storage> Raft<T> {
/// Creates a new raft for use on the node.
pub fn new(c: &Config, store: T) -> Raft<T> {
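HandleResponseContext collects the decisions that handle_append_response and handle_heartbeat_response previously reported through separate &mut bool and &mut Option<Message> out-parameters; each handler just fills in the context, and the caller acts on everything once (the last hunk in this file is the real consumer). Turning more_to_send into a Vec also lets a single response queue several follow-up messages. Below is a small self-contained sketch of that pattern, with made-up flag handling rather than the crate's logic:

// Standalone sketch only: handlers record what should happen in a plain
// struct, and the caller acts on it afterwards, instead of threading several
// `&mut bool` arguments through every handler.
#[derive(Default)]
struct Decisions {
    send_append: bool,
    loop_append: bool,
    more_to_send: Vec<String>, // stands in for Vec<Message>
}

fn handle_response(progress_advanced: bool, ctx: &mut Decisions) {
    if progress_advanced {
        ctx.send_append = true;
        ctx.loop_append = true;
        ctx.more_to_send.push("MsgReadIndexResp".to_string());
    }
}

fn main() {
    let mut ctx = Decisions::default(); // all flags false, queue empty
    handle_response(true, &mut ctx);

    if ctx.send_append {
        println!("would call send_append once");
    }
    if ctx.loop_append {
        println!("would keep calling maybe_send_append until it returns false");
    }
    for m in ctx.more_to_send.drain(..) {
        println!("would send {}", m);
    }
}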
@@ -521,31 +531,46 @@ impl<T: Storage> Raft<T> {
}
}

/// Sends RPC, with entries to the given peer.
/// Sends an append RPC with new entries (if any) and the
/// current commit index to the given peer.
pub fn send_append(&mut self, to: u64, pr: &mut Progress) {
self.maybe_send_append(to, pr, true);
}

/// Sends an append RPC with new entries to the given peer,
/// if necessary. Returns true if a message was sent. The allow_empty
/// argument controls whether messages with no entries will be sent
/// ("empty" messages are useful to convey updated Commit indexes, but
/// are undesirable when we're sending multiple messages in a batch).
fn maybe_send_append(&mut self, to: u64, pr: &mut Progress, allow_empty: bool) -> bool {
if pr.is_paused() {
return;
return false;
}
let mut m = Message::new();
m.set_to(to);
if pr.pending_request_snapshot != INVALID_INDEX {
// Check pending request snapshot first to avoid unnecessary loading entries.
if !self.prepare_send_snapshot(&mut m, pr, to) {
return;
return false;
}
} else {
let term = self.raft_log.term(pr.next_idx - 1);
let ents = self.raft_log.entries(pr.next_idx, self.max_msg_size);
if term.is_err() || ents.is_err() {
// send snapshot if we failed to get term or entries.
if !self.prepare_send_snapshot(&mut m, pr, to) {
return;
if !allow_empty && ents.as_ref().ok().map_or(true, |e| e.is_empty()) {
return false;
}
let term = self.raft_log.term(pr.next_idx - 1);
match (term, ents) {
(Ok(term), Ok(ents)) => self.prepare_send_entries(&mut m, pr, term, ents),
_ => {
// send snapshot if we failed to get term or entries.
if !self.prepare_send_snapshot(&mut m, pr, to) {
return false;
}
}
} else {
self.prepare_send_entries(&mut m, pr, term.unwrap(), ents.unwrap());
}
}
self.send(m);
true
}

// send_heartbeat sends an empty MsgAppend
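The allow_empty flag added above is what lets maybe_send_append serve both callers: send_append still emits a bare MsgAppend when there are no new entries (the empty message is how the follower learns the updated commit index), while the new batch loop passes allow_empty = false so it stops quietly instead of flooding the follower with empty messages. The gate expression below is copied from the hunk; everything around it (FetchErr, the u64 entries) is an illustrative stand-in, not the crate's types:

// Standalone check of the `allow_empty` gate.
enum FetchErr {
    Compacted, // entries already compacted away; a snapshot is needed instead
}

// Mirrors `!allow_empty && ents.as_ref().ok().map_or(true, |e| e.is_empty())`
// from the hunk above: when empty messages are not allowed, "no new entries"
// and "could not fetch entries" both make the call bail out quietly.
fn should_skip(allow_empty: bool, ents: &Result<Vec<u64>, FetchErr>) -> bool {
    !allow_empty && ents.as_ref().ok().map_or(true, |e| e.is_empty())
}

fn main() {
    let some: Result<Vec<u64>, FetchErr> = Ok(vec![7, 8, 9]);
    let none: Result<Vec<u64>, FetchErr> = Ok(vec![]);
    let gone: Result<Vec<u64>, FetchErr> = Err(FetchErr::Compacted);

    assert!(!should_skip(false, &some)); // batch loop, new entries: send them
    assert!(should_skip(false, &none)); // batch loop, nothing new: stay quiet
    assert!(should_skip(false, &gone)); // batch loop, fetch failed: stay quiet
    assert!(!should_skip(true, &none)); // plain send_append: empty msg still carries commit index
    println!("allow_empty gate behaves as in the patch");
}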
@@ -1155,9 +1180,7 @@ impl<T: Storage> Raft<T> {
&mut self,
m: &Message,
prs: &mut ProgressSet,
old_paused: &mut bool,
send_append: &mut bool,
maybe_commit: &mut bool,
ctx: &mut HandleResponseContext,
) {
let pr = prs.get_mut(m.get_from()).unwrap();
pr.recent_active = true;
@@ -1181,56 +1204,61 @@ impl<T: Storage> Raft<T> {
if pr.state == ProgressState::Replicate {
pr.become_probe();
}
*send_append = true;
ctx.send_append = true;
}
return;
}

*old_paused = pr.is_paused();
ctx.old_paused = pr.is_paused();
if !pr.maybe_update(m.get_index()) {
return;
}

match pr.state {
ProgressState::Probe => pr.become_replicate(),
ProgressState::Snapshot => {
if pr.maybe_snapshot_abort() {
debug!(
"{} snapshot aborted, resumed sending replication messages to {} \
[{:?}]",
self.tag,
m.get_from(),
pr
);
pr.become_probe();
}
}
ProgressState::Replicate => pr.ins.free_to(m.get_index()),
}
ctx.maybe_commit = true;
// We've updated flow control information above, which may
// allow us to send multiple (size-limited) in-flight messages
// at once (such as when transitioning from probe to
// replicate, or when freeTo() covers multiple messages). If
// we have more entries to send, send as many messages as we
// can (without sending empty messages for the commit index)
ctx.loop_append = true;

// Transfer leadership is in progress.
if let Some(lead_transferee) = self.lead_transferee {
if Some(m.get_from()) == self.lead_transferee {
let last_index = self.raft_log.last_index();
if m.get_from() == lead_transferee && pr.matched == last_index {
if pr.matched == last_index {
info!(
"{} sent MsgTimeoutNow to {} after received MsgAppResp",
self.tag,
m.get_from()
);
self.send_timeout_now(m.get_from());
}
}

match pr.state {
ProgressState::Probe => pr.become_replicate(),
ProgressState::Snapshot => {
if !pr.maybe_snapshot_abort() {
return;
}
debug!(
"{} snapshot aborted, resumed sending replication messages to {} \
[{:?}]",
self.tag,
m.get_from(),
pr
);
pr.become_probe();
ctx.transfer_leader = true;
}
ProgressState::Replicate => pr.ins.free_to(m.get_index()),
}
*maybe_commit = true;
}

fn handle_heartbeat_response(
&mut self,
m: &Message,
prs: &mut ProgressSet,
quorum: usize,
send_append: &mut bool,
more_to_send: &mut Option<Message>,
ctx: &mut HandleResponseContext,
) {
let pr = prs.get_mut(m.get_from()).unwrap();
pr.recent_active = true;
Expand All @@ -1242,7 +1270,7 @@ impl<T: Storage> Raft<T> {
}
// Does it request snapshot?
if pr.matched < self.raft_log.last_index() || pr.pending_request_snapshot != INVALID_INDEX {
*send_append = true;
ctx.send_append = true;
}

if self.read_only.option != ReadOnlyOption::Safe || m.get_context().is_empty() {
Expand All @@ -1269,7 +1297,7 @@ impl<T: Storage> Raft<T> {
to_send.set_msg_type(MessageType::MsgReadIndexResp);
to_send.set_index(rs.index);
to_send.set_entries(req.take_entries());
*more_to_send = Some(to_send);
ctx.more_to_send.push(to_send);
}
}
}
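The comment about flow control is the heart of the change: a successful MsgAppendResponse either moves a probing follower into the Replicate state or frees its Inflights window up to the acknowledged index, so the leader can usually push several more size-limited batches right away instead of one per round trip, which is what the new loop_append flag triggers. The toy window below shows the effect; Window is a simplified stand-in for raft's Inflights, not its real implementation:

// Toy flow-control window: capacity-limited list of unacknowledged batches.
struct Window {
    cap: usize,
    in_flight: Vec<u64>, // last index of each unacknowledged MsgAppend
}

impl Window {
    fn full(&self) -> bool {
        self.in_flight.len() >= self.cap
    }
    fn add(&mut self, last: u64) {
        self.in_flight.push(last);
    }
    // Frees every batch whose last index is <= the acknowledged index, in the
    // spirit of what `Inflights::free_to` does after an append response.
    fn free_to(&mut self, acked: u64) {
        self.in_flight.retain(|&last| last > acked);
    }
}

fn main() {
    let mut w = Window { cap: 3, in_flight: vec![100, 200, 300] };
    assert!(w.full()); // the progress was paused before the response arrived

    // One MsgAppendResponse acknowledging index 300 frees the whole window...
    w.free_to(300);
    assert!(!w.full());

    // ...so the leader can immediately queue more batches instead of waiting
    // for one more round trip per batch (this is what the loop does).
    let mut next_last = 300;
    while !w.full() {
        next_last += 100;
        w.add(next_last);
    }
    println!("queued up to index {} right away", next_last);
}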
@@ -1281,9 +1309,8 @@ impl<T: Storage> Raft<T> {
}

let lead_transferee = m.get_from();
let last_lead_transferee = self.lead_transferee;
if last_lead_transferee.is_some() {
if last_lead_transferee.unwrap() == lead_transferee {
if let Some(last_lead_transferee) = self.lead_transferee {
if last_lead_transferee == lead_transferee {
info!(
"{} [term {}] transfer leadership to {} is in progress, ignores request \
to same node {}",
@@ -1294,9 +1321,7 @@ impl<T: Storage> Raft<T> {
self.abort_leader_transfer();
info!(
"{} [term {}] abort previous transferring leadership to {}",
self.tag,
self.term,
last_lead_transferee.unwrap()
self.tag, self.term, last_lead_transferee
);
}
if lead_transferee == self.id {
@@ -1353,14 +1378,7 @@ impl<T: Storage> Raft<T> {
}

/// Check message's progress to decide which action should be taken.
fn check_message_with_progress(
&mut self,
m: &mut Message,
send_append: &mut bool,
old_paused: &mut bool,
maybe_commit: &mut bool,
more_to_send: &mut Option<Message>,
) {
fn check_message_with_progress(&mut self, m: &mut Message, ctx: &mut HandleResponseContext) {
if self.prs().get(m.get_from()).is_none() {
debug!("{} no progress available for {}", self.tag, m.get_from());
return;
@@ -1369,11 +1387,11 @@ impl<T: Storage> Raft<T> {
let mut prs = self.take_prs();
match m.get_msg_type() {
MessageType::MsgAppendResponse => {
self.handle_append_response(m, &mut prs, old_paused, send_append, maybe_commit);
self.handle_append_response(m, &mut prs, ctx);
}
MessageType::MsgHeartbeatResponse => {
let quorum = quorum(prs.voters().len());
self.handle_heartbeat_response(m, &mut prs, quorum, send_append, more_to_send);
self.handle_heartbeat_response(m, &mut prs, quorum, ctx);
}
MessageType::MsgSnapStatus => {
let pr = prs.get_mut(m.get_from()).unwrap();
@@ -1526,37 +1544,39 @@ impl<T: Storage> Raft<T> {
_ => {}
}

let mut send_append = false;
let mut maybe_commit = false;
let mut old_paused = false;
let mut more_to_send = None;
self.check_message_with_progress(
&mut m,
&mut send_append,
&mut old_paused,
&mut maybe_commit,
&mut more_to_send,
);
if maybe_commit {
let mut ctx = HandleResponseContext::default();
self.check_message_with_progress(&mut m, &mut ctx);
if ctx.maybe_commit {
if self.maybe_commit() {
if self.should_bcast_commit() {
self.bcast_append();
}
} else if old_paused {
} else if ctx.old_paused {
// update() reset the wait state on this node. If we had delayed sending
// an update before, send it now.
send_append = true;
ctx.send_append = true;
}
}

if send_append {
if ctx.send_append || ctx.loop_append {
let from = m.get_from();
let mut prs = self.take_prs();
self.send_append(from, prs.get_mut(from).unwrap());
let pr = prs.get_mut(from).unwrap();
if ctx.send_append {
self.send_append(from, pr);
}
if ctx.loop_append {
while self.maybe_send_append(from, pr, false) {}
}
self.set_prs(prs);
}
if let Some(to_send) = more_to_send {
self.send(to_send)
if ctx.transfer_leader {
self.send_timeout_now(m.get_from());
}
if !ctx.more_to_send.is_empty() {
for m in ctx.more_to_send.drain(..) {
self.send(m);
}
}

Ok(())
src/raft_log.rs (7 changes: 3 additions & 4 deletions)
@@ -430,8 +430,8 @@ impl<T: Storage> RaftLog<T> {
/// returned by value. The result is truncated to the max_size in bytes.
pub fn slice(&self, low: u64, high: u64, max_size: u64) -> Result<Vec<Entry>> {
let err = self.must_check_outofbounds(low, high);
if err.is_some() {
return Err(err.unwrap());
if let Some(err) = err {
return Err(err);
}

let mut ents = vec![];
@@ -443,8 +443,7 @@ impl<T: Storage> RaftLog<T> {
let stored_entries =
self.store
.entries(low, cmp::min(high, self.unstable.offset), max_size);
if stored_entries.is_err() {
let e = stored_entries.unwrap_err();
if let Err(e) = stored_entries {
match e {
Error::Store(StorageError::Compacted) => return Err(e),
Error::Store(StorageError::Unavailable) => panic!(
src/storage.rs (21 changes: 11 additions & 10 deletions)
@@ -31,6 +31,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};

use eraftpb::{ConfState, Entry, HardState, Snapshot};
@@ -211,19 +212,19 @@ impl MemStorageCore {
};

let offset = te[0].get_index() - self.entries[0].get_index();
if self.entries.len() as u64 > offset {
let mut new_entries: Vec<Entry> = vec![];
new_entries.extend_from_slice(&self.entries[..offset as usize]);
new_entries.extend_from_slice(te);
self.entries = new_entries;
} else if self.entries.len() as u64 == offset {
self.entries.extend_from_slice(te);
} else {
panic!(
match (self.entries.len() as u64).cmp(&offset) {
Ordering::Greater => {
let mut new_entries: Vec<Entry> = vec![];
new_entries.extend_from_slice(&self.entries[..offset as usize]);
new_entries.extend_from_slice(te);
self.entries = new_entries;
}
Ordering::Equal => self.entries.extend_from_slice(te),
_ => panic!(
"missing log entry [last: {}, append at: {}]",
self.inner_last_index(),
te[0].get_index()
)
),
}

Ok(())
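The rewrite in this hunk replaces a `>` / `==` / `<` chain with a single match on cmp, which forces all three cases to be handled explicitly (the shape clippy's comparison_chain lint suggests). A standalone version of the same pattern, with the branch bodies reduced to labels:

use std::cmp::Ordering;

// Same decision the storage code makes when appending entries: do the new
// entries overlap the existing ones, continue them exactly, or leave a gap?
fn describe(existing_len: u64, offset: u64) -> &'static str {
    match existing_len.cmp(&offset) {
        Ordering::Greater => "overlap: truncate and splice the new entries",
        Ordering::Equal => "contiguous: just extend",
        Ordering::Less => "gap: a log entry is missing (the real code panics)",
    }
}

fn main() {
    assert_eq!(describe(10, 4), "overlap: truncate and splice the new entries");
    assert_eq!(describe(4, 4), "contiguous: just extend");
    assert_eq!(describe(2, 4), "gap: a log entry is missing (the real code panics)");
    println!("all three cases handled explicitly");
}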
[diff for the fifth changed file not shown]
