From 093dfa4ffffe2a5b8d6831a8103898ed5a21a72a Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Fri, 13 Apr 2018 21:30:09 +0200 Subject: [PATCH 1/3] ensure proposal evaluation occurs at most once per round --- substrate/bft/src/generic/mod.rs | 6 ++++-- substrate/bft/src/generic/tests.rs | 14 ++++++++++++-- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/substrate/bft/src/generic/mod.rs b/substrate/bft/src/generic/mod.rs index aa0fe2c549a3f..c430575bd3365 100644 --- a/substrate/bft/src/generic/mod.rs +++ b/substrate/bft/src/generic/mod.rs @@ -293,7 +293,7 @@ impl Locked { enum LocalState { Start, Proposed, - Prepared, + Prepared(bool), // whether we thought it valid. Committed, VoteAdvance, } @@ -582,6 +582,7 @@ impl Strategy { Some(_) => { // don't check validity if we are locked. // this is necessary to preserve the liveness property. + self.local_state = LocalState::Prepared(true); prepare_for = Some(digest); } None => { @@ -591,6 +592,8 @@ impl Strategy { if let Async::Ready(valid) = res { self.evaluating_proposal = None; + self.local_state = LocalState::Prepared(valid); + if valid { prepare_for = Some(digest); } @@ -606,7 +609,6 @@ impl Strategy { ).into(); self.import_and_send_message(message, context, sending); - self.local_state = LocalState::Prepared; } Ok(()) diff --git a/substrate/bft/src/generic/tests.rs b/substrate/bft/src/generic/tests.rs index 00d8ccf9a54d9..f07f1ff71dba3 100644 --- a/substrate/bft/src/generic/tests.rs +++ b/substrate/bft/src/generic/tests.rs @@ -18,6 +18,7 @@ use super::*; +use std::collections::BTreeSet; use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Duration; @@ -117,6 +118,7 @@ struct TestContext { node_count: usize, current_round: Arc, timer: Timer, + evaluated: Mutex>, } impl Context for TestContext { @@ -137,7 +139,7 @@ impl Context for TestContext { let proposal = { let mut p = self.proposal.lock().unwrap(); let x = *p; - *p = (*p * 2) + 1; + *p += self.node_count; x }; @@ -175,6 +177,10 @@ impl Context for TestContext { } fn proposal_valid(&self, proposal: &Candidate) -> FutureResult { + if !self.evaluated.lock().unwrap().insert(proposal.0) { + panic!("Evaluated proposal {:?} twice", proposal.0); + } + Ok(proposal.0 % 3 != 0).into_future() } @@ -230,6 +236,7 @@ fn consensus_completes_with_minimum_good() { proposal: Mutex::new(i), current_round: Arc::new(AtomicUsize::new(0)), timer: timer.clone(), + evaluated: Mutex::new(BTreeSet::new()), node_count, }; @@ -279,6 +286,7 @@ fn consensus_does_not_complete_without_enough_nodes() { proposal: Mutex::new(i), current_round: Arc::new(AtomicUsize::new(0)), timer: timer.clone(), + evaluated: Mutex::new(BTreeSet::new()), node_count, }; @@ -335,6 +343,7 @@ fn threshold_plus_one_locked_on_proposal_only_one_with_candidate() { proposal: Mutex::new(i), current_round: Arc::new(AtomicUsize::new(locked_round + 1)), timer: timer.clone(), + evaluated: Mutex::new(BTreeSet::new()), node_count, }; let mut agreement = agree( @@ -367,7 +376,7 @@ fn threshold_plus_one_locked_on_proposal_only_one_with_candidate() { }) .collect::>(); - let timeout = timeout_in(Duration::from_millis(500)).map_err(|_| Error); + let timeout = timeout_in(Duration::from_millis(1000)).map_err(|_| Error); let results = ::futures::future::join_all(nodes) .map(Some) .select(timeout.map(|_| None)) @@ -404,6 +413,7 @@ fn consensus_completes_even_when_nodes_start_with_a_delay() { proposal: Mutex::new(i), current_round: Arc::new(AtomicUsize::new(0)), timer: timer.clone(), + evaluated: Mutex::new(BTreeSet::new()), node_count, }; From bfff5c1aef31e235578b4a8e36658a98d3237234 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sat, 14 Apr 2018 13:53:19 +0200 Subject: [PATCH 2/3] add test --- substrate/bft/src/generic/mod.rs | 6 +++ substrate/bft/src/generic/tests.rs | 59 ++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+) diff --git a/substrate/bft/src/generic/mod.rs b/substrate/bft/src/generic/mod.rs index c430575bd3365..0d0b1058934a6 100644 --- a/substrate/bft/src/generic/mod.rs +++ b/substrate/bft/src/generic/mod.rs @@ -659,6 +659,12 @@ impl Strategy { // sent an AdvanceRound message yet, do so. let mut attempt_advance = self.current_accumulator.advance_votes() > self.max_faulty; + // if we evaluated the proposal and it was bad, vote to advance round. + if let LocalState::Prepared(false) = self.local_state { + attempt_advance = true; + } + + // if the timeout has fired, vote to advance round. if let Async::Ready(_) = self.round_timeout.poll()? { attempt_advance = true; } diff --git a/substrate/bft/src/generic/tests.rs b/substrate/bft/src/generic/tests.rs index f07f1ff71dba3..3a97398b8c83c 100644 --- a/substrate/bft/src/generic/tests.rs +++ b/substrate/bft/src/generic/tests.rs @@ -177,6 +177,8 @@ impl Context for TestContext { } fn proposal_valid(&self, proposal: &Candidate) -> FutureResult { + println!("Evaluating proposal {} on authority {:?}", proposal.0, self.local_id); + if !self.evaluated.lock().unwrap().insert(proposal.0) { panic!("Evaluated proposal {:?} twice", proposal.0); } @@ -265,6 +267,63 @@ fn consensus_completes_with_minimum_good() { } } +#[test] +fn consensus_completes_with_minimum_good_all_initial_proposals_bad() { + let node_count = 10; + let max_faulty = 3; + + let timer = tokio_timer::wheel().tick_duration(ROUND_DURATION).build(); + + let (network, net_send, net_recv) = Network::new(node_count); + network.route_on_thread(); + + let nodes = net_send + .into_iter() + .zip(net_recv) + .take(node_count - max_faulty) + .enumerate() + .map(|(i, (tx, rx))| { + // the first 5 proposals are going to be bad. + let proposal = if i < 5 { + i * 3 // proposals considered bad in the tests if they are % 3 + } else { + (i * 3) + 1 + }; + + let ctx = TestContext { + local_id: AuthorityId(i), + proposal: Mutex::new(proposal), + current_round: Arc::new(AtomicUsize::new(0)), + timer: timer.clone(), + evaluated: Mutex::new(BTreeSet::new()), + node_count, + }; + + agree( + ctx, + node_count, + max_faulty, + rx.map_err(|_| Error), + tx.sink_map_err(|_| Error).with(move |t| Ok((i, t))), + ) + }) + .collect::>(); + + let timeout = timeout_in(Duration::from_millis(500)).map_err(|_| Error); + let results = ::futures::future::join_all(nodes) + .map(Some) + .select(timeout.map(|_| None)) + .wait() + .map(|(i, _)| i) + .map_err(|(e, _)| e) + .expect("to complete") + .expect("to not time out"); + + for result in &results { + assert_eq!(&result.justification.digest, &results[0].justification.digest); + } +} + #[test] fn consensus_does_not_complete_without_enough_nodes() { let node_count = 10; From 699e21be31c5b7200d35c0cc6e311a315d92d354 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sat, 14 Apr 2018 13:54:24 +0200 Subject: [PATCH 3/3] remove println --- substrate/bft/src/generic/tests.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/substrate/bft/src/generic/tests.rs b/substrate/bft/src/generic/tests.rs index 3a97398b8c83c..b683d751e6ed5 100644 --- a/substrate/bft/src/generic/tests.rs +++ b/substrate/bft/src/generic/tests.rs @@ -177,8 +177,6 @@ impl Context for TestContext { } fn proposal_valid(&self, proposal: &Candidate) -> FutureResult { - println!("Evaluating proposal {} on authority {:?}", proposal.0, self.local_id); - if !self.evaluated.lock().unwrap().insert(proposal.0) { panic!("Evaluated proposal {:?} twice", proposal.0); }