Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
The patch is to speed up log replication when a node is way behind than
leader and logs are not compacted yet.

Signed-off-by: Jay Lee <[email protected]>
  • Loading branch information
BusyJay authored Mar 27, 2020
1 parent e2a1841 commit b9891b6
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 76 deletions.
69 changes: 69 additions & 0 deletions harness/tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,75 @@ fn test_progress_paused() {
assert_eq!(ms.len(), 1);
}

#[test]
fn test_progress_flow_control() {
let l = default_logger();
let mut cfg = new_test_config(1, 5, 1);
cfg.max_inflight_msgs = 3;
cfg.max_size_per_msg = 2048;
let s = MemStorage::new_with_conf_state((vec![1, 2], vec![]));
let mut r = new_test_raft_with_config(&cfg, s, &l);
r.become_candidate();
r.become_leader();

// Throw away all the messages relating to the initial election.
r.read_messages();

// While node 2 is in probe state, propose a bunch of entries.
r.mut_prs().get_mut(2).unwrap().become_probe();
let data: String = std::iter::repeat('a').take(1000).collect();
for _ in 0..10 {
let msg = new_message_with_entries(
1,
1,
MessageType::MsgPropose,
vec![new_entry(0, 0, Some(&data))],
);
r.step(msg).unwrap();
}

let mut ms = r.read_messages();
// First append has two entries: the empty entry to confirm the
// election, and the first proposal (only one proposal gets sent
// because we're in probe state).
assert_eq!(ms.len(), 1);
assert_eq!(ms[0].msg_type, MessageType::MsgAppend);
assert_eq!(ms[0].entries.len(), 2);
assert_eq!(ms[0].entries[0].data.len(), 0);
assert_eq!(ms[0].entries[1].data.len(), 1000);

// When this append is acked, we change to replicate state and can
// send multiple messages at once.
let mut msg = new_message(2, 1, MessageType::MsgAppendResponse, 0);
msg.index = ms[0].entries[1].index;
r.step(msg).unwrap();
ms = r.read_messages();
assert_eq!(ms.len(), 3);
for (i, m) in ms.iter().enumerate() {
if m.msg_type != MessageType::MsgAppend {
panic!("{}: expected MsgAppend, got {:?}", i, m.msg_type);
}
if m.entries.len() != 2 {
panic!("{}: expected 2 entries, got {}", i, m.entries.len());
}
}

// Ack all three of those messages together and get the last two
// messages (containing three entries).
let mut msg = new_message(2, 1, MessageType::MsgAppendResponse, 0);
msg.index = ms[2].entries[1].index;
r.step(msg).unwrap();
ms = r.read_messages();
assert_eq!(ms.len(), 2);
for (i, m) in ms.iter().enumerate() {
if m.msg_type != MessageType::MsgAppend {
panic!("{}: expected MsgAppend, got {:?}", i, m.msg_type);
}
}
assert_eq!(ms[0].entries.len(), 2);
assert_eq!(ms[1].entries.len(), 1);
}

#[test]
fn test_leader_election() {
let l = default_logger();
Expand Down
173 changes: 97 additions & 76 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ pub struct SoftState {
pub raft_state: StateRole,
}

#[derive(Default)]
pub struct HandleResponseContext {
maybe_commit: bool,
send_append: bool,
loop_append: bool,
transfer_leader: bool,
old_paused: bool,
more_to_send: Vec<Message>,
}

/// A struct that represents the raft consensus itself. Stores details concerning the current
/// and possible state the system can take.
#[derive(Getters)]
Expand Down Expand Up @@ -558,41 +568,57 @@ impl<T: Storage> Raft<T> {
is_batched
}

/// Sends RPC, with entries to the given peer.
/// Sends an append RPC with new entries (if any) and the current commit index to the given
/// peer.
pub fn send_append(&mut self, to: u64, pr: &mut Progress) {
self.maybe_send_append(to, pr, true);
}

/// Sends an append RPC with new entries to the given peer,
/// if necessary. Returns true if a message was sent. The allow_empty
/// argument controls whether messages with no entries will be sent
/// ("empty" messages are useful to convey updated Commit indexes, but
/// are undesirable when we're sending multiple messages in a batch).
fn maybe_send_append(&mut self, to: u64, pr: &mut Progress, allow_empty: bool) -> bool {
if pr.is_paused() {
trace!(
self.logger,
"Skipping sending to {to}, it's paused",
to = to;
"progress" => ?pr,
);
return;
return false;
}
let mut m = Message::default();
m.to = to;
if pr.pending_request_snapshot != INVALID_INDEX {
// Check pending request snapshot first to avoid unnecessary loading entries.
if !self.prepare_send_snapshot(&mut m, pr, to) {
return;
return false;
}
} else {
let term = self.raft_log.term(pr.next_idx - 1);
let ents = self.raft_log.entries(pr.next_idx, self.max_msg_size);
if term.is_err() || ents.is_err() {
// send snapshot if we failed to get term or entries.
if !self.prepare_send_snapshot(&mut m, pr, to) {
return;
if !allow_empty && ents.as_ref().ok().map_or(true, |e| e.is_empty()) {
return false;
}
let term = self.raft_log.term(pr.next_idx - 1);
match (term, ents) {
(Ok(term), Ok(mut ents)) => {
if self.batch_append && self.try_batching(to, pr, &mut ents) {
return true;
}
self.prepare_send_entries(&mut m, pr, term, ents)
}
} else {
let mut ents = ents.unwrap();
if self.batch_append && self.try_batching(to, pr, &mut ents) {
return;
_ => {
// send snapshot if we failed to get term or entries.
if !self.prepare_send_snapshot(&mut m, pr, to) {
return false;
}
}
self.prepare_send_entries(&mut m, pr, term.unwrap(), ents);
}
}
self.send(m);
true
}

// send_heartbeat sends an empty MsgAppend
Expand Down Expand Up @@ -1224,9 +1250,7 @@ impl<T: Storage> Raft<T> {
&mut self,
m: &Message,
prs: &mut ProgressSet,
old_paused: &mut bool,
send_append: &mut bool,
maybe_commit: &mut bool,
ctx: &mut HandleResponseContext,
) {
let pr = prs.get_mut(m.from).unwrap();
pr.recent_active = true;
Expand All @@ -1250,54 +1274,59 @@ impl<T: Storage> Raft<T> {
if pr.state == ProgressState::Replicate {
pr.become_probe();
}
*send_append = true;
ctx.send_append = true;
}
return;
}

*old_paused = pr.is_paused();
ctx.old_paused = pr.is_paused();
if !pr.maybe_update(m.index) {
return;
}

// Transfer leadership is in progress.
if let Some(lead_transferee) = self.lead_transferee {
let last_index = self.raft_log.last_index();
if m.from == lead_transferee && pr.matched == last_index {
info!(
self.logger,
"sent MsgTimeoutNow to {from} after received MsgAppResp",
from = m.from;
);
self.send_timeout_now(m.from);
}
}

match pr.state {
ProgressState::Probe => pr.become_replicate(),
ProgressState::Snapshot => {
if !pr.maybe_snapshot_abort() {
return;
if pr.maybe_snapshot_abort() {
debug!(
self.logger,
"snapshot aborted, resumed sending replication messages to {from}",
from = m.from;
"progress" => ?pr,
);
pr.become_probe();
}
debug!(
}
ProgressState::Replicate => pr.ins.free_to(m.get_index()),
}
ctx.maybe_commit = true;
// We've updated flow control information above, which may
// allow us to send multiple (size-limited) in-flight messages
// at once (such as when transitioning from probe to
// replicate, or when freeTo() covers multiple messages). If
// we have more entries to send, send as many messages as we
// can (without sending empty messages for the commit index)
ctx.loop_append = true;

// Transfer leadership is in progress.
if Some(m.from) == self.lead_transferee {
let last_index = self.raft_log.last_index();
if pr.matched == last_index {
info!(
self.logger,
"snapshot aborted, resumed sending replication messages to {from}",
"sent MsgTimeoutNow to {from} after received MsgAppResp",
from = m.from;
"progress" => ?pr,
);
pr.become_probe();
ctx.transfer_leader = true;
}
ProgressState::Replicate => pr.ins.free_to(m.index),
}
*maybe_commit = true;
}

fn handle_heartbeat_response(
&mut self,
m: &Message,
prs: &mut ProgressSet,
send_append: &mut bool,
more_to_send: &mut Vec<Message>,
ctx: &mut HandleResponseContext,
) {
// Update the node. Drop the value explicitly since we'll check the qourum after.
{
Expand All @@ -1313,7 +1342,7 @@ impl<T: Storage> Raft<T> {
if pr.matched < self.raft_log.last_index()
|| pr.pending_request_snapshot != INVALID_INDEX
{
*send_append = true;
ctx.send_append = true;
}

if self.read_only.option != ReadOnlyOption::Safe || m.context.is_empty() {
Expand Down Expand Up @@ -1341,7 +1370,7 @@ impl<T: Storage> Raft<T> {
to_send.to = req.from;
to_send.index = rs.index;
to_send.set_entries(req.take_entries());
more_to_send.push(to_send);
ctx.more_to_send.push(to_send);
}
}
}
Expand All @@ -1356,9 +1385,8 @@ impl<T: Storage> Raft<T> {
return;
}
let lead_transferee = from;
let last_lead_transferee = self.lead_transferee;
if last_lead_transferee.is_some() {
if last_lead_transferee.unwrap() == lead_transferee {
if let Some(last_lead_transferee) = self.lead_transferee {
if last_lead_transferee == lead_transferee {
info!(
self.logger,
"[term {term}] transfer leadership to {lead_transferee} is in progress, ignores request \
Expand All @@ -1373,7 +1401,7 @@ impl<T: Storage> Raft<T> {
self.logger,
"[term {term}] abort previous transferring leadership to {last_lead_transferee}",
term = self.term,
last_lead_transferee = last_lead_transferee.unwrap();
last_lead_transferee = last_lead_transferee;
);
}
if lead_transferee == self.id {
Expand Down Expand Up @@ -1434,14 +1462,7 @@ impl<T: Storage> Raft<T> {
}

/// Check message's progress to decide which action should be taken.
fn check_message_with_progress(
&mut self,
m: &mut Message,
send_append: &mut bool,
old_paused: &mut bool,
maybe_commit: &mut bool,
more_to_send: &mut Vec<Message>,
) {
fn check_message_with_progress(&mut self, m: &mut Message, ctx: &mut HandleResponseContext) {
if self.prs().get(m.from).is_none() {
debug!(
self.logger,
Expand All @@ -1454,10 +1475,10 @@ impl<T: Storage> Raft<T> {
let mut prs = self.take_prs();
match m.get_msg_type() {
MessageType::MsgAppendResponse => {
self.handle_append_response(m, &mut prs, old_paused, send_append, maybe_commit);
self.handle_append_response(m, &mut prs, ctx);
}
MessageType::MsgHeartbeatResponse => {
self.handle_heartbeat_response(m, &mut prs, send_append, more_to_send);
self.handle_heartbeat_response(m, &mut prs, ctx);
}
MessageType::MsgSnapStatus => {
let pr = prs.get_mut(m.from).unwrap();
Expand Down Expand Up @@ -1610,38 +1631,38 @@ impl<T: Storage> Raft<T> {
_ => {}
}

let mut send_append = false;
let mut maybe_commit = false;
let mut old_paused = false;
let mut more_to_send = vec![];
self.check_message_with_progress(
&mut m,
&mut send_append,
&mut old_paused,
&mut maybe_commit,
&mut more_to_send,
);
if maybe_commit {
let mut ctx = HandleResponseContext::default();
self.check_message_with_progress(&mut m, &mut ctx);
if ctx.maybe_commit {
if self.maybe_commit() {
if self.should_bcast_commit() {
self.bcast_append();
}
} else if old_paused {
} else if ctx.old_paused {
// update() reset the wait state on this node. If we had delayed sending
// an update before, send it now.
send_append = true;
ctx.send_append = true;
}
}

if send_append {
if ctx.send_append || ctx.loop_append {
let from = m.from;
let mut prs = self.take_prs();
self.send_append(from, prs.get_mut(from).unwrap());
let pr = prs.get_mut(from).unwrap();
if ctx.send_append {
self.send_append(from, pr);
}
if ctx.loop_append {
while self.maybe_send_append(from, pr, false) {}
}
self.set_prs(prs);
}
if !more_to_send.is_empty() {
for to_send in more_to_send.drain(..) {
self.send(to_send);
if ctx.transfer_leader {
self.send_timeout_now(m.get_from());
}
if !ctx.more_to_send.is_empty() {
for m in ctx.more_to_send.drain(..) {
self.send(m);
}
}

Expand Down

0 comments on commit b9891b6

Please sign in to comment.