diff --git a/harness/tests/integration_cases/test_raft.rs b/harness/tests/integration_cases/test_raft.rs
index c9b9ac63c..9d7b2d007 100644
--- a/harness/tests/integration_cases/test_raft.rs
+++ b/harness/tests/integration_cases/test_raft.rs
@@ -5075,3 +5075,95 @@ fn test_read_when_quorum_becomes_less() {
         .unwrap();
     assert!(!network.peers[&1].read_states.is_empty());
 }
+
+#[test]
+fn test_uncommitted_entries_size_limit() {
+    let l = default_logger();
+    let config = &Config {
+        id: 1,
+        max_uncommitted_size: 12,
+        ..Config::default()
+    };
+    let mut nt = Network::new_with_config(vec![None, None, None], config, &l);
+    let data = b"hello world!".to_vec();
+
+    nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);
+
+    // should return a ProposalDropped error
+    {
+        let mut entry = Entry::default();
+        entry.data = b"hello world and raft".to_vec();
+        let mut msg = Message::default();
+        msg.from = 1;
+        msg.to = 1;
+        msg.set_msg_type(MessageType::MsgPropose);
+        msg.set_entries(vec![entry].into());
+
+        let result = nt.dispatch(vec![msg].to_vec());
+        assert_eq!(result.unwrap_err(), raft::Error::ProposalDropped);
+    }
+
+    // should return Ok
+    {
+        let mut entry = Entry::default();
+        entry.data = data.clone();
+        let mut msg = Message::default();
+        msg.from = 1;
+        msg.to = 1;
+        msg.set_msg_type(MessageType::MsgPropose);
+        msg.set_entries(vec![entry].into());
+
+        let result = nt.dispatch(vec![msg].to_vec());
+        assert!(result.is_ok());
+    }
+
+    // then the next proposal should be dropped
+    {
+        let mut entry = Entry::default();
+        entry.data = b"!".to_vec();
+        let mut msg = Message::default();
+        msg.from = 1;
+        msg.to = 1;
+        msg.set_msg_type(MessageType::MsgPropose);
+        msg.set_entries(vec![entry].into());
+
+        let result = nt.dispatch(vec![msg].to_vec());
+        assert!(!result.is_ok());
+        assert_eq!(result.unwrap_err(), raft::Error::ProposalDropped);
+    }
+
+    // but an entry with empty data should be accepted
+    {
+        let entry = Entry::default();
+        let mut msg = Message::default();
+        msg.from = 1;
+        msg.to = 1;
+        msg.set_msg_type(MessageType::MsgPropose);
+        msg.set_entries(vec![entry].into());
+
+        let result = nt.dispatch(vec![msg].to_vec());
+        assert!(result.is_ok());
+    }
+
+    // after reducing the uncommitted size, a new proposal should be accepted
+    {
+        let mut entry = Entry::default();
+        entry.data = data.clone();
+        let mut msg = Message::default();
+        msg.from = 1;
+        msg.to = 1;
+        msg.set_msg_type(MessageType::MsgPropose);
+        msg.set_entries(vec![entry].into());
+
+        // consume entry
+        let mut entry = Entry::default();
+        entry.data = data.clone();
+        nt.peers
+            .get_mut(&1)
+            .unwrap()
+            .reduce_uncommitted_size(&vec![entry]);
+
+        let result = nt.dispatch(vec![msg].to_vec());
+        assert!(result.is_ok());
+    }
+}
diff --git a/harness/tests/integration_cases/test_raw_node.rs b/harness/tests/integration_cases/test_raw_node.rs
index 1c7e0b960..8a75afd30 100644
--- a/harness/tests/integration_cases/test_raw_node.rs
+++ b/harness/tests/integration_cases/test_raw_node.rs
@@ -70,6 +70,24 @@ fn new_raw_node(
     peers: Vec<u64>,
     config: &Config,
     storage: MemStorage,
     logger: &Logger,
 ) -> RawNode<MemStorage> {
     RawNode::new(&config, storage, logger).unwrap()
 }
+fn new_raw_node_with_config(
+    peers: Vec<u64>,
+    config: &Config,
+    storage: MemStorage,
+    logger: &Logger,
+) -> RawNode<MemStorage> {
+    if storage.initial_state().unwrap().initialized() && peers.is_empty() {
+        panic!("new_raw_node with empty peers on initialized store");
+    }
+    if !peers.is_empty() && !storage.initial_state().unwrap().initialized() {
+        storage
+            .wl()
+            .apply_snapshot(new_snapshot(1, 1, peers))
+            .unwrap();
+    }
+    RawNode::new(&config, storage, logger).unwrap()
+}
+
 /// Ensures that RawNode::step ignore local message.
 #[test]
 fn test_raw_node_step() {
@@ -728,3 +746,55 @@ fn test_set_priority() {
         assert_eq!(raw_node.raft.priority, p);
     }
 }
+
+#[test]
+fn test_bounded_uncommitted_entries_growth_with_partition() {
+    let l = default_logger();
+    let config = &Config {
+        id: 1,
+        max_uncommitted_size: 12,
+        ..Config::default()
+    };
+    let s = new_storage();
+    let mut raw_node = new_raw_node_with_config(vec![1], config, s.clone(), &l);
+
+    // wait for raw_node to become the leader
+    raw_node.campaign().unwrap();
+    loop {
+        let rd = raw_node.ready();
+        if rd
+            .ss()
+            .map_or(false, |ss| ss.leader_id == raw_node.raft.leader_id)
+        {
+            break;
+        }
+
+        raw_node.advance(rd);
+    }
+
+    // should be accepted
+    {
+        let data = b"hello world!".to_vec();
+        let result = raw_node.propose(vec![], data);
+        assert!(result.is_ok());
+    }
+
+    // should be dropped
+    {
+        let data = b"hello world!".to_vec();
+        let result = raw_node.propose(vec![], data);
+        assert!(!result.is_ok());
+        assert_eq!(result.unwrap_err(), Error::ProposalDropped)
+    }
+
+    // should be accepted when previous data has been committed
+    {
+        let rd = raw_node.ready();
+        s.wl().append(rd.entries()).unwrap();
+        raw_node.advance(rd);
+
+        let data = b"hello world!".to_vec();
+        let result = raw_node.propose(vec![], data);
+        assert!(result.is_ok());
+    }
+}
diff --git a/src/config.rs b/src/config.rs
index a6772723b..1f776dfd1 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -15,6 +15,7 @@
 // limitations under the License.
 
 pub use super::read_only::{ReadOnlyOption, ReadState};
+use super::util::NO_LIMIT;
 use super::{
     errors::{Error, Result},
     INVALID_ID,
 };
@@ -90,6 +91,10 @@ pub struct Config {
     /// The election priority of this node.
     pub priority: u64,
+
+    /// Specify the maximum size of uncommitted entries.
+    /// When this limit is reached, proposals to append new log entries will be dropped.
+    pub max_uncommitted_size: usize,
 }
 
 impl Default for Config {
@@ -110,6 +115,7 @@ impl Default for Config {
             skip_bcast_commit: false,
             batch_append: false,
             priority: 0,
+            max_uncommitted_size: NO_LIMIT as usize,
         }
     }
 }
@@ -189,6 +195,12 @@ impl Config {
             ));
         }
 
+        if self.max_uncommitted_size == 0 {
+            return Err(Error::ConfigInvalid(
+                "max uncommitted entries size must be greater than 0".to_owned(),
+            ));
+        }
+
         Ok(())
     }
 }
diff --git a/src/log_unstable.rs b/src/log_unstable.rs
index 96d423496..829a3882d 100644
--- a/src/log_unstable.rs
+++ b/src/log_unstable.rs
@@ -118,6 +118,10 @@ impl Unstable {
     }
 
     /// Append entries to unstable, truncate local block first if overlapped.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the entries would be truncated to before the snapshot index.
     pub fn truncate_and_append(&mut self, ents: &[Entry]) {
         let after = ents[0].index;
         if after == self.offset + self.entries.len() as u64 {
diff --git a/src/raft.rs b/src/raft.rs
index acebd4150..3fd9d512d 100644
--- a/src/raft.rs
+++ b/src/raft.rs
@@ -34,6 +34,7 @@ use super::Config;
 use crate::confchange::Changer;
 use crate::quorum::VoteResult;
 use crate::util;
+use crate::util::NO_LIMIT;
 use crate::{confchange, Progress, ProgressState, ProgressTracker};
 
 // CAMPAIGN_PRE_ELECTION represents the first phase of a normal election when
@@ -177,6 +178,11 @@ pub struct RaftCore<T: Storage> {
     /// The election priority of this node.
     pub priority: u64,
+
+    /// Specify the maximum size of uncommitted entries.
+    /// When this limit is reached, proposals to append new log entries will be dropped.
+    max_uncommitted_size: usize,
+    uncommitted_size: usize,
 }
 
 /// A struct that represents the raft consensus itself. Stores details concerning the current
@@ -269,13 +275,15 @@ impl<T: Storage> Raft<T> {
                 pending_conf_index: Default::default(),
                 vote: Default::default(),
                 heartbeat_elapsed: Default::default(),
-                randomized_election_timeout: 0,
+                randomized_election_timeout: Default::default(),
                 min_election_timeout: c.min_election_tick(),
                 max_election_timeout: c.max_election_tick(),
                 skip_bcast_commit: c.skip_bcast_commit,
                 batch_append: c.batch_append,
                 logger,
                 priority: c.priority,
+                max_uncommitted_size: c.max_uncommitted_size,
+                uncommitted_size: Default::default(),
             },
         };
         confchange::restore(&mut r.prs, r.r.raft_log.last_index(), conf_state)?;
@@ -861,6 +869,7 @@ impl<T: Storage> Raft<T> {
         let last_index = self.raft_log.last_index();
         let committed = self.raft_log.committed;
+        self.uncommitted_size = 0;
         let self_id = self.id;
         for (&id, mut pr) in self.mut_prs().iter_mut() {
             pr.reset(last_index + 1);
@@ -871,9 +880,13 @@ impl<T: Storage> Raft<T> {
         }
     }
 
-    /// Appends a slice of entries to the log. The entries are updated to match
-    /// the current index and term.
-    pub fn append_entry(&mut self, es: &mut [Entry]) {
+    /// Appends a slice of entries to the log.
+    /// The entries are updated to match the current index and term.
+    pub fn append_entry(&mut self, es: &mut [Entry]) -> bool {
+        if !self.maybe_increase_uncommitted_size(es) {
+            return false;
+        }
+
         let mut li = self.raft_log.last_index();
         for (i, e) in es.iter_mut().enumerate() {
             e.term = self.term;
@@ -887,6 +900,8 @@
         // Regardless of maybe_commit's return, our caller will call bcastAppend.
         self.maybe_commit();
+
+        true
     }
 
     /// Returns true to indicate that there will probably be some readiness need to be handled.
@@ -1043,6 +1058,8 @@
         // could be expensive.
         self.pending_conf_index = self.raft_log.last_index();
+        // No need to check the result because append_entry never refuses
+        // entries whose size is zero.
         self.append_entry(&mut [Entry::default()]);
 
         info!(
@@ -1761,7 +1778,10 @@
                         e.set_entry_type(EntryType::EntryNormal);
                     }
                 }
-                self.append_entry(&mut m.mut_entries());
+                if !self.append_entry(&mut m.mut_entries()) {
+                    // return ProposalDropped when the uncommitted size limit is reached
+                    return Err(Error::ProposalDropped);
+                }
                 self.bcast_append();
                 return Ok(());
             }
@@ -2457,4 +2477,47 @@ impl<T: Storage> Raft<T> {
         to_send.set_entries(req.take_entries());
         Some(to_send)
     }
+
+    /// Reduces the tracked uncommitted entry size by the total size of 'ents'.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the total size of 'ents' is greater than the tracked uncommitted size.
+    pub fn reduce_uncommitted_size(&mut self, ents: &[Entry]) {
+        // fast path for NO_LIMIT and non-leader endpoint
+        if self.max_uncommitted_size == NO_LIMIT as usize || self.state != StateRole::Leader {
+            return;
+        }
+
+        let mut size: usize = 0;
+        for entry in ents {
+            size += entry.get_data().len()
+        }
+
+        if size > self.uncommitted_size {
+            fatal!(
+                self.r.logger,
+                "try to reduce uncommitted size less than 0, first index of pending ents is {}",
+                ents[0].get_index()
+            );
+        } else {
+            self.uncommitted_size -= size;
+        }
+    }
+
+    /// Increases the tracked uncommitted entry size by the total size of 'ents'. Returns true
+    /// when the size limit is satisfied; otherwise returns false and the uncommitted size remains unchanged.
+    pub fn maybe_increase_uncommitted_size(&mut self, ents: &[Entry]) -> bool {
+        let mut size: usize = 0;
+        for entry in ents {
+            size += entry.get_data().len()
+        }
+
+        if size + self.uncommitted_size > self.max_uncommitted_size {
+            false
+        } else {
+            self.uncommitted_size += size;
+            true
+        }
+    }
 }
diff --git a/src/raw_node.rs b/src/raw_node.rs
index 61f078917..a2d966a5d 100644
--- a/src/raw_node.rs
+++ b/src/raw_node.rs
@@ -263,6 +263,11 @@ impl<T: Storage> RawNode<T> {
         if !rd.read_states.is_empty() {
             self.raft.read_states.clear();
         }
+        // update raft's uncommitted entries size
+        if rd.committed_entries.is_some() {
+            self.raft
+                .reduce_uncommitted_size(&rd.committed_entries.unwrap_or_default())
+        }
     }
 
     fn commit_apply(&mut self, applied: u64) {
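
Usage note (editor's sketch, not part of the patch): the snippet below shows how an application might opt into the new max_uncommitted_size option added in src/config.rs above and handle the ProposalDropped error it can surface. The single-voter MemStorage setup, the discard logger, and the 1024-byte limit are illustrative assumptions, not anything taken from this change.

    use raft::{prelude::*, storage::MemStorage, Config, Error};
    use slog::{o, Discard, Logger};

    fn main() -> Result<(), Error> {
        let logger = Logger::root(Discard, o!());
        // A single-voter group backed by MemStorage, purely for illustration.
        let storage = MemStorage::new_with_conf_state((vec![1], vec![]));
        let config = Config {
            id: 1,
            // Cap the total size of uncommitted entry data at 1 KiB; once the cap
            // is hit, further proposals fail until enough entries commit and
            // reduce_uncommitted_size frees the budget again.
            max_uncommitted_size: 1024,
            ..Default::default()
        };
        config.validate()?;
        let mut node = RawNode::new(&config, storage, &logger)?;
        node.campaign()?;

        match node.propose(vec![], b"payload".to_vec()) {
            Ok(()) => {}
            // The limit surfaces as ProposalDropped; a caller would typically
            // back off and retry after processing the next Ready.
            Err(Error::ProposalDropped) => {}
            Err(e) => return Err(e),
        }
        Ok(())
    }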