diff --git a/harness/tests/integration_cases/test_raft.rs b/harness/tests/integration_cases/test_raft.rs index 8ee3edb43..a120a97e1 100644 --- a/harness/tests/integration_cases/test_raft.rs +++ b/harness/tests/integration_cases/test_raft.rs @@ -347,6 +347,75 @@ fn test_progress_paused() { assert_eq!(ms.len(), 1); } +#[test] +fn test_progress_flow_control() { + let l = default_logger(); + let mut cfg = new_test_config(1, 5, 1); + cfg.max_inflight_msgs = 3; + cfg.max_size_per_msg = 2048; + let s = MemStorage::new_with_conf_state((vec![1, 2], vec![])); + let mut r = new_test_raft_with_config(&cfg, s, &l); + r.become_candidate(); + r.become_leader(); + + // Throw away all the messages relating to the initial election. + r.read_messages(); + + // While node 2 is in probe state, propose a bunch of entries. + r.mut_prs().get_mut(2).unwrap().become_probe(); + let data: String = std::iter::repeat('a').take(1000).collect(); + for _ in 0..10 { + let msg = new_message_with_entries( + 1, + 1, + MessageType::MsgPropose, + vec![new_entry(0, 0, Some(&data))], + ); + r.step(msg).unwrap(); + } + + let mut ms = r.read_messages(); + // First append has two entries: the empty entry to confirm the + // election, and the first proposal (only one proposal gets sent + // because we're in probe state). + assert_eq!(ms.len(), 1); + assert_eq!(ms[0].msg_type, MessageType::MsgAppend); + assert_eq!(ms[0].entries.len(), 2); + assert_eq!(ms[0].entries[0].data.len(), 0); + assert_eq!(ms[0].entries[1].data.len(), 1000); + + // When this append is acked, we change to replicate state and can + // send multiple messages at once. + let mut msg = new_message(2, 1, MessageType::MsgAppendResponse, 0); + msg.index = ms[0].entries[1].index; + r.step(msg).unwrap(); + ms = r.read_messages(); + assert_eq!(ms.len(), 3); + for (i, m) in ms.iter().enumerate() { + if m.msg_type != MessageType::MsgAppend { + panic!("{}: expected MsgAppend, got {:?}", i, m.msg_type); + } + if m.entries.len() != 2 { + panic!("{}: expected 2 entries, got {}", i, m.entries.len()); + } + } + + // Ack all three of those messages together and get the last two + // messages (containing three entries). + let mut msg = new_message(2, 1, MessageType::MsgAppendResponse, 0); + msg.index = ms[2].entries[1].index; + r.step(msg).unwrap(); + ms = r.read_messages(); + assert_eq!(ms.len(), 2); + for (i, m) in ms.iter().enumerate() { + if m.msg_type != MessageType::MsgAppend { + panic!("{}: expected MsgAppend, got {:?}", i, m.msg_type); + } + } + assert_eq!(ms[0].entries.len(), 2); + assert_eq!(ms[1].entries.len(), 1); +} + #[test] fn test_leader_election() { let l = default_logger(); diff --git a/src/raft.rs b/src/raft.rs index 36aed5269..345c51138 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -72,6 +72,16 @@ pub struct SoftState { pub raft_state: StateRole, } +#[derive(Default)] +pub struct HandleResponseContext { + maybe_commit: bool, + send_append: bool, + loop_append: bool, + transfer_leader: bool, + old_paused: bool, + more_to_send: Vec, +} + /// A struct that represents the raft consensus itself. Stores details concerning the current /// and possible state the system can take. #[derive(Getters)] @@ -558,8 +568,18 @@ impl Raft { is_batched } - /// Sends RPC, with entries to the given peer. + /// Sends an append RPC with new entries (if any) and the current commit index to the given + /// peer. pub fn send_append(&mut self, to: u64, pr: &mut Progress) { + self.maybe_send_append(to, pr, true); + } + + /// Sends an append RPC with new entries to the given peer, + /// if necessary. Returns true if a message was sent. The allow_empty + /// argument controls whether messages with no entries will be sent + /// ("empty" messages are useful to convey updated Commit indexes, but + /// are undesirable when we're sending multiple messages in a batch). + fn maybe_send_append(&mut self, to: u64, pr: &mut Progress, allow_empty: bool) -> bool { if pr.is_paused() { trace!( self.logger, @@ -567,32 +587,38 @@ impl Raft { to = to; "progress" => ?pr, ); - return; + return false; } let mut m = Message::default(); m.to = to; if pr.pending_request_snapshot != INVALID_INDEX { // Check pending request snapshot first to avoid unnecessary loading entries. if !self.prepare_send_snapshot(&mut m, pr, to) { - return; + return false; } } else { - let term = self.raft_log.term(pr.next_idx - 1); let ents = self.raft_log.entries(pr.next_idx, self.max_msg_size); - if term.is_err() || ents.is_err() { - // send snapshot if we failed to get term or entries. - if !self.prepare_send_snapshot(&mut m, pr, to) { - return; + if !allow_empty && ents.as_ref().ok().map_or(true, |e| e.is_empty()) { + return false; + } + let term = self.raft_log.term(pr.next_idx - 1); + match (term, ents) { + (Ok(term), Ok(mut ents)) => { + if self.batch_append && self.try_batching(to, pr, &mut ents) { + return true; + } + self.prepare_send_entries(&mut m, pr, term, ents) } - } else { - let mut ents = ents.unwrap(); - if self.batch_append && self.try_batching(to, pr, &mut ents) { - return; + _ => { + // send snapshot if we failed to get term or entries. + if !self.prepare_send_snapshot(&mut m, pr, to) { + return false; + } } - self.prepare_send_entries(&mut m, pr, term.unwrap(), ents); } } self.send(m); + true } // send_heartbeat sends an empty MsgAppend @@ -1224,9 +1250,7 @@ impl Raft { &mut self, m: &Message, prs: &mut ProgressSet, - old_paused: &mut bool, - send_append: &mut bool, - maybe_commit: &mut bool, + ctx: &mut HandleResponseContext, ) { let pr = prs.get_mut(m.from).unwrap(); pr.recent_active = true; @@ -1250,54 +1274,59 @@ impl Raft { if pr.state == ProgressState::Replicate { pr.become_probe(); } - *send_append = true; + ctx.send_append = true; } return; } - *old_paused = pr.is_paused(); + ctx.old_paused = pr.is_paused(); if !pr.maybe_update(m.index) { return; } - // Transfer leadership is in progress. - if let Some(lead_transferee) = self.lead_transferee { - let last_index = self.raft_log.last_index(); - if m.from == lead_transferee && pr.matched == last_index { - info!( - self.logger, - "sent MsgTimeoutNow to {from} after received MsgAppResp", - from = m.from; - ); - self.send_timeout_now(m.from); - } - } - match pr.state { ProgressState::Probe => pr.become_replicate(), ProgressState::Snapshot => { - if !pr.maybe_snapshot_abort() { - return; + if pr.maybe_snapshot_abort() { + debug!( + self.logger, + "snapshot aborted, resumed sending replication messages to {from}", + from = m.from; + "progress" => ?pr, + ); + pr.become_probe(); } - debug!( + } + ProgressState::Replicate => pr.ins.free_to(m.get_index()), + } + ctx.maybe_commit = true; + // We've updated flow control information above, which may + // allow us to send multiple (size-limited) in-flight messages + // at once (such as when transitioning from probe to + // replicate, or when freeTo() covers multiple messages). If + // we have more entries to send, send as many messages as we + // can (without sending empty messages for the commit index) + ctx.loop_append = true; + + // Transfer leadership is in progress. + if Some(m.from) == self.lead_transferee { + let last_index = self.raft_log.last_index(); + if pr.matched == last_index { + info!( self.logger, - "snapshot aborted, resumed sending replication messages to {from}", + "sent MsgTimeoutNow to {from} after received MsgAppResp", from = m.from; - "progress" => ?pr, ); - pr.become_probe(); + ctx.transfer_leader = true; } - ProgressState::Replicate => pr.ins.free_to(m.index), } - *maybe_commit = true; } fn handle_heartbeat_response( &mut self, m: &Message, prs: &mut ProgressSet, - send_append: &mut bool, - more_to_send: &mut Vec, + ctx: &mut HandleResponseContext, ) { // Update the node. Drop the value explicitly since we'll check the qourum after. { @@ -1313,7 +1342,7 @@ impl Raft { if pr.matched < self.raft_log.last_index() || pr.pending_request_snapshot != INVALID_INDEX { - *send_append = true; + ctx.send_append = true; } if self.read_only.option != ReadOnlyOption::Safe || m.context.is_empty() { @@ -1341,7 +1370,7 @@ impl Raft { to_send.to = req.from; to_send.index = rs.index; to_send.set_entries(req.take_entries()); - more_to_send.push(to_send); + ctx.more_to_send.push(to_send); } } } @@ -1356,9 +1385,8 @@ impl Raft { return; } let lead_transferee = from; - let last_lead_transferee = self.lead_transferee; - if last_lead_transferee.is_some() { - if last_lead_transferee.unwrap() == lead_transferee { + if let Some(last_lead_transferee) = self.lead_transferee { + if last_lead_transferee == lead_transferee { info!( self.logger, "[term {term}] transfer leadership to {lead_transferee} is in progress, ignores request \ @@ -1373,7 +1401,7 @@ impl Raft { self.logger, "[term {term}] abort previous transferring leadership to {last_lead_transferee}", term = self.term, - last_lead_transferee = last_lead_transferee.unwrap(); + last_lead_transferee = last_lead_transferee; ); } if lead_transferee == self.id { @@ -1434,14 +1462,7 @@ impl Raft { } /// Check message's progress to decide which action should be taken. - fn check_message_with_progress( - &mut self, - m: &mut Message, - send_append: &mut bool, - old_paused: &mut bool, - maybe_commit: &mut bool, - more_to_send: &mut Vec, - ) { + fn check_message_with_progress(&mut self, m: &mut Message, ctx: &mut HandleResponseContext) { if self.prs().get(m.from).is_none() { debug!( self.logger, @@ -1454,10 +1475,10 @@ impl Raft { let mut prs = self.take_prs(); match m.get_msg_type() { MessageType::MsgAppendResponse => { - self.handle_append_response(m, &mut prs, old_paused, send_append, maybe_commit); + self.handle_append_response(m, &mut prs, ctx); } MessageType::MsgHeartbeatResponse => { - self.handle_heartbeat_response(m, &mut prs, send_append, more_to_send); + self.handle_heartbeat_response(m, &mut prs, ctx); } MessageType::MsgSnapStatus => { let pr = prs.get_mut(m.from).unwrap(); @@ -1610,38 +1631,38 @@ impl Raft { _ => {} } - let mut send_append = false; - let mut maybe_commit = false; - let mut old_paused = false; - let mut more_to_send = vec![]; - self.check_message_with_progress( - &mut m, - &mut send_append, - &mut old_paused, - &mut maybe_commit, - &mut more_to_send, - ); - if maybe_commit { + let mut ctx = HandleResponseContext::default(); + self.check_message_with_progress(&mut m, &mut ctx); + if ctx.maybe_commit { if self.maybe_commit() { if self.should_bcast_commit() { self.bcast_append(); } - } else if old_paused { + } else if ctx.old_paused { // update() reset the wait state on this node. If we had delayed sending // an update before, send it now. - send_append = true; + ctx.send_append = true; } } - if send_append { + if ctx.send_append || ctx.loop_append { let from = m.from; let mut prs = self.take_prs(); - self.send_append(from, prs.get_mut(from).unwrap()); + let pr = prs.get_mut(from).unwrap(); + if ctx.send_append { + self.send_append(from, pr); + } + if ctx.loop_append { + while self.maybe_send_append(from, pr, false) {} + } self.set_prs(prs); } - if !more_to_send.is_empty() { - for to_send in more_to_send.drain(..) { - self.send(to_send); + if ctx.transfer_leader { + self.send_timeout_now(m.get_from()); + } + if !ctx.more_to_send.is_empty() { + for m in ctx.more_to_send.drain(..) { + self.send(m); } }