From 9782199fa6318f94bf0974129664d974b03f8f11 Mon Sep 17 00:00:00 2001 From: Jay Date: Tue, 3 Mar 2020 16:24:51 +0800 Subject: [PATCH] *: port coreos/etcd#9985 (#340) This patch speeds up log replication when a node is far behind the leader and logs are not compacted yet. Signed-off-by: Jay Lee --- src/progress.rs | 10 +- src/raft.rs | 170 +++++++++++++++------------ src/raft_log.rs | 7 +- src/storage.rs | 21 ++-- tests/integration_cases/test_raft.rs | 134 +++++++++++++++++---- 5 files changed, 224 insertions(+), 118 deletions(-) diff --git a/src/progress.rs b/src/progress.rs index accb59316..837355f7d 100644 --- a/src/progress.rs +++ b/src/progress.rs @@ -118,10 +118,10 @@ impl ProgressSet { /// Adds a voter node pub fn insert_voter(&mut self, id: u64, pr: Progress) -> Result<(), Error> { if self.voters.contains_key(&id) { - Err(Error::Exists(id, "voters"))? + return Err(Error::Exists(id, "voters")); } if self.learners.contains_key(&id) { - Err(Error::Exists(id, "learners"))?; + return Err(Error::Exists(id, "learners")); } self.voters.insert(id, pr); Ok(()) @@ -130,10 +130,10 @@ impl ProgressSet { /// Adds a learner to the cluster pub fn insert_learner(&mut self, id: u64, pr: Progress) -> Result<(), Error> { if self.voters.contains_key(&id) { - Err(Error::Exists(id, "voters"))? + return Err(Error::Exists(id, "voters")); } if self.learners.contains_key(&id) { - Err(Error::Exists(id, "learners"))? + return Err(Error::Exists(id, "learners")); } self.learners.insert(id, pr); Ok(()) @@ -150,7 +150,7 @@ impl ProgressSet { /// Promote a learner to a peer. pub fn promote_learner(&mut self, id: u64) -> Result<(), Error> { if self.voters.contains_key(&id) { - Err(Error::Exists(id, "voters"))?; + return Err(Error::Exists(id, "voters")); } // We don't want to remove it unless it's there. 
if self.learners.contains_key(&id) { diff --git a/src/raft.rs b/src/raft.rs index 87a8ef7d3..8ef7cb9cc 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -220,6 +220,16 @@ pub fn quorum(total: usize) -> usize { total / 2 + 1 } +#[derive(Default)] +pub struct HandleResponseContext { + maybe_commit: bool, + send_append: bool, + loop_append: bool, + transfer_leader: bool, + old_paused: bool, + more_to_send: Vec, +} + impl Raft { /// Creates a new raft for use on the node. pub fn new(c: &Config, store: T) -> Raft { @@ -521,31 +531,46 @@ impl Raft { } } - /// Sends RPC, with entries to the given peer. + /// Sends an append RPC with new entries (if any) and the + /// current commit index to the given peer. pub fn send_append(&mut self, to: u64, pr: &mut Progress) { + self.maybe_send_append(to, pr, true); + } + + /// Sends an append RPC with new entries to the given peer, + /// if necessary. Returns true if a message was sent. The allow_empty + /// argument controls whether messages with no entries will be sent + /// ("empty" messages are useful to convey updated Commit indexes, but + /// are undesirable when we're sending multiple messages in a batch). + fn maybe_send_append(&mut self, to: u64, pr: &mut Progress, allow_empty: bool) -> bool { if pr.is_paused() { - return; + return false; } let mut m = Message::new(); m.set_to(to); if pr.pending_request_snapshot != INVALID_INDEX { // Check pending request snapshot first to avoid unnecessary loading entries. if !self.prepare_send_snapshot(&mut m, pr, to) { - return; + return false; } } else { - let term = self.raft_log.term(pr.next_idx - 1); let ents = self.raft_log.entries(pr.next_idx, self.max_msg_size); - if term.is_err() || ents.is_err() { - // send snapshot if we failed to get term or entries. 
- if !self.prepare_send_snapshot(&mut m, pr, to) { - return; + if !allow_empty && ents.as_ref().ok().map_or(true, |e| e.is_empty()) { + return false; + } + let term = self.raft_log.term(pr.next_idx - 1); + match (term, ents) { + (Ok(term), Ok(ents)) => self.prepare_send_entries(&mut m, pr, term, ents), + _ => { + // send snapshot if we failed to get term or entries. + if !self.prepare_send_snapshot(&mut m, pr, to) { + return false; + } } - } else { - self.prepare_send_entries(&mut m, pr, term.unwrap(), ents.unwrap()); } } self.send(m); + true } // send_heartbeat sends an empty MsgAppend @@ -1155,9 +1180,7 @@ impl Raft { &mut self, m: &Message, prs: &mut ProgressSet, - old_paused: &mut bool, - send_append: &mut bool, - maybe_commit: &mut bool, + ctx: &mut HandleResponseContext, ) { let pr = prs.get_mut(m.get_from()).unwrap(); pr.recent_active = true; @@ -1181,47 +1204,53 @@ impl Raft { if pr.state == ProgressState::Replicate { pr.become_probe(); } - *send_append = true; + ctx.send_append = true; } return; } - *old_paused = pr.is_paused(); + ctx.old_paused = pr.is_paused(); if !pr.maybe_update(m.get_index()) { return; } + match pr.state { + ProgressState::Probe => pr.become_replicate(), + ProgressState::Snapshot => { + if pr.maybe_snapshot_abort() { + debug!( + "{} snapshot aborted, resumed sending replication messages to {} \ + [{:?}]", + self.tag, + m.get_from(), + pr + ); + pr.become_probe(); + } + } + ProgressState::Replicate => pr.ins.free_to(m.get_index()), + } + ctx.maybe_commit = true; + // We've updated flow control information above, which may + // allow us to send multiple (size-limited) in-flight messages + // at once (such as when transitioning from probe to + // replicate, or when freeTo() covers multiple messages). If + // we have more entries to send, send as many messages as we + // can (without sending empty messages for the commit index) + ctx.loop_append = true; + // Transfer leadership is in progress. 
- if let Some(lead_transferee) = self.lead_transferee { + if Some(m.get_from()) == self.lead_transferee { let last_index = self.raft_log.last_index(); - if m.get_from() == lead_transferee && pr.matched == last_index { + if pr.matched == last_index { info!( "{} sent MsgTimeoutNow to {} after received MsgAppResp", self.tag, m.get_from() ); - self.send_timeout_now(m.get_from()); - } - } - - match pr.state { - ProgressState::Probe => pr.become_replicate(), - ProgressState::Snapshot => { - if !pr.maybe_snapshot_abort() { - return; - } - debug!( - "{} snapshot aborted, resumed sending replication messages to {} \ - [{:?}]", - self.tag, - m.get_from(), - pr - ); - pr.become_probe(); + ctx.transfer_leader = true; } - ProgressState::Replicate => pr.ins.free_to(m.get_index()), } - *maybe_commit = true; } fn handle_heartbeat_response( @@ -1229,8 +1258,7 @@ impl Raft { m: &Message, prs: &mut ProgressSet, quorum: usize, - send_append: &mut bool, - more_to_send: &mut Option, + ctx: &mut HandleResponseContext, ) { let pr = prs.get_mut(m.get_from()).unwrap(); pr.recent_active = true; @@ -1242,7 +1270,7 @@ impl Raft { } // Does it request snapshot? 
if pr.matched < self.raft_log.last_index() || pr.pending_request_snapshot != INVALID_INDEX { - *send_append = true; + ctx.send_append = true; } if self.read_only.option != ReadOnlyOption::Safe || m.get_context().is_empty() { @@ -1269,7 +1297,7 @@ impl Raft { to_send.set_msg_type(MessageType::MsgReadIndexResp); to_send.set_index(rs.index); to_send.set_entries(req.take_entries()); - *more_to_send = Some(to_send); + ctx.more_to_send.push(to_send); } } } @@ -1281,9 +1309,8 @@ impl Raft { } let lead_transferee = m.get_from(); - let last_lead_transferee = self.lead_transferee; - if last_lead_transferee.is_some() { - if last_lead_transferee.unwrap() == lead_transferee { + if let Some(last_lead_transferee) = self.lead_transferee { + if last_lead_transferee == lead_transferee { info!( "{} [term {}] transfer leadership to {} is in progress, ignores request \ to same node {}", @@ -1294,9 +1321,7 @@ impl Raft { self.abort_leader_transfer(); info!( "{} [term {}] abort previous transferring leadership to {}", - self.tag, - self.term, - last_lead_transferee.unwrap() + self.tag, self.term, last_lead_transferee ); } if lead_transferee == self.id { @@ -1353,14 +1378,7 @@ impl Raft { } /// Check message's progress to decide which action should be taken. 
- fn check_message_with_progress( - &mut self, - m: &mut Message, - send_append: &mut bool, - old_paused: &mut bool, - maybe_commit: &mut bool, - more_to_send: &mut Option, - ) { + fn check_message_with_progress(&mut self, m: &mut Message, ctx: &mut HandleResponseContext) { if self.prs().get(m.get_from()).is_none() { debug!("{} no progress available for {}", self.tag, m.get_from()); return; @@ -1369,11 +1387,11 @@ impl Raft { let mut prs = self.take_prs(); match m.get_msg_type() { MessageType::MsgAppendResponse => { - self.handle_append_response(m, &mut prs, old_paused, send_append, maybe_commit); + self.handle_append_response(m, &mut prs, ctx); } MessageType::MsgHeartbeatResponse => { let quorum = quorum(prs.voters().len()); - self.handle_heartbeat_response(m, &mut prs, quorum, send_append, more_to_send); + self.handle_heartbeat_response(m, &mut prs, quorum, ctx); } MessageType::MsgSnapStatus => { let pr = prs.get_mut(m.get_from()).unwrap(); @@ -1526,37 +1544,39 @@ impl Raft { _ => {} } - let mut send_append = false; - let mut maybe_commit = false; - let mut old_paused = false; - let mut more_to_send = None; - self.check_message_with_progress( - &mut m, - &mut send_append, - &mut old_paused, - &mut maybe_commit, - &mut more_to_send, - ); - if maybe_commit { + let mut ctx = HandleResponseContext::default(); + self.check_message_with_progress(&mut m, &mut ctx); + if ctx.maybe_commit { if self.maybe_commit() { if self.should_bcast_commit() { self.bcast_append(); } - } else if old_paused { + } else if ctx.old_paused { // update() reset the wait state on this node. If we had delayed sending // an update before, send it now. 
- send_append = true; + ctx.send_append = true; } } - if send_append { + if ctx.send_append || ctx.loop_append { let from = m.get_from(); let mut prs = self.take_prs(); - self.send_append(from, prs.get_mut(from).unwrap()); + let pr = prs.get_mut(from).unwrap(); + if ctx.send_append { + self.send_append(from, pr); + } + if ctx.loop_append { + while self.maybe_send_append(from, pr, false) {} + } self.set_prs(prs); } - if let Some(to_send) = more_to_send { - self.send(to_send) + if ctx.transfer_leader { + self.send_timeout_now(m.get_from()); + } + if !ctx.more_to_send.is_empty() { + for m in ctx.more_to_send.drain(..) { + self.send(m); + } } Ok(()) diff --git a/src/raft_log.rs b/src/raft_log.rs index e3aa5b84b..ba5ba59ee 100644 --- a/src/raft_log.rs +++ b/src/raft_log.rs @@ -430,8 +430,8 @@ impl RaftLog { /// returned by value. The result is truncated to the max_size in bytes. pub fn slice(&self, low: u64, high: u64, max_size: u64) -> Result> { let err = self.must_check_outofbounds(low, high); - if err.is_some() { - return Err(err.unwrap()); + if let Some(err) = err { + return Err(err); } let mut ents = vec![]; @@ -443,8 +443,7 @@ impl RaftLog { let stored_entries = self.store .entries(low, cmp::min(high, self.unstable.offset), max_size); - if stored_entries.is_err() { - let e = stored_entries.unwrap_err(); + if let Err(e) = stored_entries { match e { Error::Store(StorageError::Compacted) => return Err(e), Error::Store(StorageError::Unavailable) => panic!( diff --git a/src/storage.rs b/src/storage.rs index 37987e08e..abb50b2d5 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -31,6 +31,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::cmp::Ordering; use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}; use eraftpb::{ConfState, Entry, HardState, Snapshot}; @@ -211,19 +212,19 @@ impl MemStorageCore { }; let offset = te[0].get_index() - self.entries[0].get_index(); - if self.entries.len() as u64 > offset { - let mut new_entries: Vec = vec![]; - new_entries.extend_from_slice(&self.entries[..offset as usize]); - new_entries.extend_from_slice(te); - self.entries = new_entries; - } else if self.entries.len() as u64 == offset { - self.entries.extend_from_slice(te); - } else { - panic!( + match (self.entries.len() as u64).cmp(&offset) { + Ordering::Greater => { + let mut new_entries: Vec = vec![]; + new_entries.extend_from_slice(&self.entries[..offset as usize]); + new_entries.extend_from_slice(te); + self.entries = new_entries; + } + Ordering::Equal => self.entries.extend_from_slice(te), + _ => panic!( "missing log entry [last: {}, append at: {}]", self.inner_last_index(), te[0].get_index() - ) + ), } Ok(()) diff --git a/tests/integration_cases/test_raft.rs b/tests/integration_cases/test_raft.rs index 71ab42d24..10fbac59a 100644 --- a/tests/integration_cases/test_raft.rs +++ b/tests/integration_cases/test_raft.rs @@ -332,6 +332,74 @@ fn test_progress_paused() { assert_eq!(ms.len(), 1); } +#[test] +fn test_progress_flow_control() { + setup_for_test(); + let mut cfg = new_test_config(1, vec![1, 2], 5, 1); + cfg.max_inflight_msgs = 3; + cfg.max_size_per_msg = 2048; + let mut r = Interface::new(Raft::new(&cfg, new_storage())); + r.become_candidate(); + r.become_leader(); + + // Throw away all the messages relating to the initial election. + r.read_messages(); + + // While node 2 is in probe state, propose a bunch of entries. 
+ r.mut_prs().get_mut(2).unwrap().become_probe(); + let data: String = std::iter::repeat('a').take(1000).collect(); + for _ in 0..10 { + let msg = new_message_with_entries( + 1, + 1, + MessageType::MsgPropose, + vec![new_entry(0, 0, Some(&data))], + ); + r.step(msg).unwrap(); + } + + let mut ms = r.read_messages(); + // First append has two entries: the empty entry to confirm the + // election, and the first proposal (only one proposal gets sent + // because we're in probe state). + assert_eq!(ms.len(), 1); + assert_eq!(ms[0].msg_type, MessageType::MsgAppend); + assert_eq!(ms[0].entries.len(), 2); + assert_eq!(ms[0].entries[0].data.len(), 0); + assert_eq!(ms[0].entries[1].data.len(), 1000); + + // When this append is acked, we change to replicate state and can + // send multiple messages at once. + let mut msg = new_message(2, 1, MessageType::MsgAppendResponse, 0); + msg.index = ms[0].entries[1].index; + r.step(msg).unwrap(); + ms = r.read_messages(); + assert_eq!(ms.len(), 3); + for (i, m) in ms.iter().enumerate() { + if m.msg_type != MessageType::MsgAppend { + panic!("{}: expected MsgAppend, got {:?}", i, m.msg_type); + } + if m.entries.len() != 2 { + panic!("{}: expected 2 entries, got {}", i, m.entries.len()); + } + } + + // Ack all three of those messages together and get the last two + // messages (containing three entries). 
+ let mut msg = new_message(2, 1, MessageType::MsgAppendResponse, 0); + msg.index = ms[2].entries[1].index; + r.step(msg).unwrap(); + ms = r.read_messages(); + assert_eq!(ms.len(), 2); + for (i, m) in ms.iter().enumerate() { + if m.msg_type != MessageType::MsgAppend { + panic!("{}: expected MsgAppend, got {:?}", i, m.msg_type); + } + } + assert_eq!(ms[0].entries.len(), 2); + assert_eq!(ms[1].entries.len(), 1); +} + #[test] fn test_leader_election() { setup_for_test(); @@ -2161,26 +2229,42 @@ fn test_read_only_option_safe() { assert_eq!(nt.peers[&1].state, StateRole::Leader); let mut tests = vec![ - (1, 10, 11, "ctx1"), - (2, 10, 21, "ctx2"), - (3, 10, 31, "ctx3"), - (1, 10, 41, "ctx4"), - (2, 10, 51, "ctx5"), - (3, 10, 61, "ctx6"), + (1, 10, 11, vec!["ctx1", "ctx11"], false), + (2, 10, 21, vec!["ctx2", "ctx22"], false), + (3, 10, 31, vec!["ctx3", "ctx33"], false), + (1, 10, 41, vec!["ctx4", "ctx44"], true), + (2, 10, 51, vec!["ctx5", "ctx55"], true), + (3, 10, 61, vec!["ctx6", "ctx66"], true), ]; - for (i, (id, proposals, wri, wctx)) in tests.drain(..).enumerate() { + for (i, (id, proposals, wri, wctx, pending)) in tests.drain(..).enumerate() { for _ in 0..proposals { nt.send(vec![new_message(1, 1, MessageType::MsgPropose, 1)]); } - let e = new_entry(0, 0, Some(wctx)); - nt.send(vec![new_message_with_entries( + let msg1 = new_message_with_entries( id, id, MessageType::MsgReadIndex, - vec![e], - )]); + vec![new_entry(0, 0, Some(wctx[0]))], + ); + let msg2 = new_message_with_entries( + id, + id, + MessageType::MsgReadIndex, + vec![new_entry(0, 0, Some(wctx[1]))], + ); + + if pending { + // drop MsgHeartbeatResponse here to prevent leader handling pending ReadIndex request per round + nt.ignore(MessageType::MsgHeartbeatResponse); + nt.send(vec![msg1.clone(), msg1.clone(), msg2.clone()]); + nt.recover(); + // send a ReadIndex request with the last ctx to notify leader to handle pending read requests + nt.send(vec![msg2.clone()]); + } else { + nt.send(vec![msg1.clone(), 
msg1.clone(), msg2.clone()]); + } let read_states: Vec = nt .peers @@ -2192,16 +2276,18 @@ fn test_read_only_option_safe() { if read_states.is_empty() { panic!("#{}: read_states is empty, want non-empty", i); } - let rs = &read_states[0]; - if rs.index != wri { - panic!("#{}: read_index = {}, want {}", i, rs.index, wri) - } - let vec_wctx = wctx.as_bytes().to_vec(); - if rs.request_ctx != vec_wctx { - panic!( - "#{}: request_ctx = {:?}, want {:?}", - i, rs.request_ctx, vec_wctx - ) + assert_eq!(read_states.len(), wctx.len()); + for (rs, wctx) in read_states.iter().zip(wctx) { + if rs.index != wri { + panic!("#{}: read_index = {}, want {}", i, rs.index, wri) + } + let ctx_bytes = wctx.as_bytes().to_vec(); + if rs.request_ctx != ctx_bytes { + panic!( + "#{}: request_ctx = {:?}, want {:?}", + i, rs.request_ctx, ctx_bytes + ) + } } } } @@ -3502,12 +3588,12 @@ fn test_transfer_non_member() { setup_for_test(); let mut raft = new_test_raft(1, vec![2, 3, 4], 5, 1, new_storage()); raft.step(new_message(2, 1, MessageType::MsgTimeoutNow, 0)) - .expect("");; + .expect(""); raft.step(new_message(2, 1, MessageType::MsgRequestVoteResponse, 0)) - .expect("");; + .expect(""); raft.step(new_message(3, 1, MessageType::MsgRequestVoteResponse, 0)) - .expect("");; + .expect(""); assert_eq!(raft.state, StateRole::Follower); }