Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

ensure proposal evaluation occurs at most once per round #125

Merged
merged 3 commits into from
Apr 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions substrate/bft/src/generic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ impl<D, S> Locked<D, S> {
enum LocalState {
Start,
Proposed,
Prepared,
Prepared(bool), // whether we thought it valid.
Committed,
VoteAdvance,
}
Expand Down Expand Up @@ -582,6 +582,7 @@ impl<C: Context> Strategy<C> {
Some(_) => {
// don't check validity if we are locked.
// this is necessary to preserve the liveness property.
self.local_state = LocalState::Prepared(true);
prepare_for = Some(digest);
}
None => {
Expand All @@ -591,6 +592,8 @@ impl<C: Context> Strategy<C> {

if let Async::Ready(valid) = res {
self.evaluating_proposal = None;
self.local_state = LocalState::Prepared(valid);

if valid {
prepare_for = Some(digest);
}
Expand All @@ -606,7 +609,6 @@ impl<C: Context> Strategy<C> {
).into();

self.import_and_send_message(message, context, sending);
self.local_state = LocalState::Prepared;
}

Ok(())
Expand Down Expand Up @@ -657,6 +659,12 @@ impl<C: Context> Strategy<C> {
// sent an AdvanceRound message yet, do so.
let mut attempt_advance = self.current_accumulator.advance_votes() > self.max_faulty;

// if we evaluated the proposal and it was bad, vote to advance round.
if let LocalState::Prepared(false) = self.local_state {
attempt_advance = true;
}

// if the timeout has fired, vote to advance round.
if let Async::Ready(_) = self.round_timeout.poll()? {
attempt_advance = true;
}
Expand Down
71 changes: 69 additions & 2 deletions substrate/bft/src/generic/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

use super::*;

use std::collections::BTreeSet;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
Expand Down Expand Up @@ -117,6 +118,7 @@ struct TestContext {
node_count: usize,
current_round: Arc<AtomicUsize>,
timer: Timer,
evaluated: Mutex<BTreeSet<usize>>,
}

impl Context for TestContext {
Expand All @@ -137,7 +139,7 @@ impl Context for TestContext {
let proposal = {
let mut p = self.proposal.lock().unwrap();
let x = *p;
*p = (*p * 2) + 1;
*p += self.node_count;
x
};

Expand Down Expand Up @@ -175,6 +177,10 @@ impl Context for TestContext {
}

fn proposal_valid(&self, proposal: &Candidate) -> FutureResult<bool, Error> {
if !self.evaluated.lock().unwrap().insert(proposal.0) {
panic!("Evaluated proposal {:?} twice", proposal.0);
}

Ok(proposal.0 % 3 != 0).into_future()
}

Expand Down Expand Up @@ -230,6 +236,64 @@ fn consensus_completes_with_minimum_good() {
proposal: Mutex::new(i),
current_round: Arc::new(AtomicUsize::new(0)),
timer: timer.clone(),
evaluated: Mutex::new(BTreeSet::new()),
node_count,
};

agree(
ctx,
node_count,
max_faulty,
rx.map_err(|_| Error),
tx.sink_map_err(|_| Error).with(move |t| Ok((i, t))),
)
})
.collect::<Vec<_>>();

let timeout = timeout_in(Duration::from_millis(500)).map_err(|_| Error);
let results = ::futures::future::join_all(nodes)
.map(Some)
.select(timeout.map(|_| None))
.wait()
.map(|(i, _)| i)
.map_err(|(e, _)| e)
.expect("to complete")
.expect("to not time out");

for result in &results {
assert_eq!(&result.justification.digest, &results[0].justification.digest);
}
}

#[test]
fn consensus_completes_with_minimum_good_all_initial_proposals_bad() {
let node_count = 10;
let max_faulty = 3;

let timer = tokio_timer::wheel().tick_duration(ROUND_DURATION).build();

let (network, net_send, net_recv) = Network::new(node_count);
network.route_on_thread();

let nodes = net_send
.into_iter()
.zip(net_recv)
.take(node_count - max_faulty)
.enumerate()
.map(|(i, (tx, rx))| {
// the first 5 proposals are going to be bad.
let proposal = if i < 5 {
i * 3 // proposals considered bad in the tests if they are % 3
} else {
(i * 3) + 1
};

let ctx = TestContext {
local_id: AuthorityId(i),
proposal: Mutex::new(proposal),
current_round: Arc::new(AtomicUsize::new(0)),
timer: timer.clone(),
evaluated: Mutex::new(BTreeSet::new()),
node_count,
};

Expand Down Expand Up @@ -279,6 +343,7 @@ fn consensus_does_not_complete_without_enough_nodes() {
proposal: Mutex::new(i),
current_round: Arc::new(AtomicUsize::new(0)),
timer: timer.clone(),
evaluated: Mutex::new(BTreeSet::new()),
node_count,
};

Expand Down Expand Up @@ -335,6 +400,7 @@ fn threshold_plus_one_locked_on_proposal_only_one_with_candidate() {
proposal: Mutex::new(i),
current_round: Arc::new(AtomicUsize::new(locked_round + 1)),
timer: timer.clone(),
evaluated: Mutex::new(BTreeSet::new()),
node_count,
};
let mut agreement = agree(
Expand Down Expand Up @@ -367,7 +433,7 @@ fn threshold_plus_one_locked_on_proposal_only_one_with_candidate() {
})
.collect::<Vec<_>>();

let timeout = timeout_in(Duration::from_millis(500)).map_err(|_| Error);
let timeout = timeout_in(Duration::from_millis(1000)).map_err(|_| Error);
let results = ::futures::future::join_all(nodes)
.map(Some)
.select(timeout.map(|_| None))
Expand Down Expand Up @@ -404,6 +470,7 @@ fn consensus_completes_even_when_nodes_start_with_a_delay() {
proposal: Mutex::new(i),
current_round: Arc::new(AtomicUsize::new(0)),
timer: timer.clone(),
evaluated: Mutex::new(BTreeSet::new()),
node_count,
};

Expand Down