From 2e9b52e6d65e4d1a9a345df9274dd10e18dbb8b9 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 9 Jul 2018 10:26:04 +0100 Subject: [PATCH] PoC-1 consensus extracted to "rhododendron" (#284) --- Cargo.lock | 18 +- polkadot/consensus/Cargo.toml | 1 + polkadot/consensus/src/lib.rs | 3 +- polkadot/network/Cargo.toml | 1 + polkadot/network/src/consensus.rs | 35 +- polkadot/network/src/lib.rs | 1 + substrate/bft/Cargo.toml | 1 + substrate/bft/src/error.rs | 4 +- substrate/bft/src/generic/accumulator.rs | 883 ----------------------- substrate/bft/src/generic/mod.rs | 832 --------------------- substrate/bft/src/generic/tests.rs | 469 ------------ substrate/bft/src/lib.rs | 129 ++-- substrate/client/src/client.rs | 3 +- substrate/misbehavior-check/Cargo.toml | 1 + substrate/misbehavior-check/src/lib.rs | 11 +- substrate/test-client/Cargo.toml | 1 + substrate/test-client/src/client_ext.rs | 14 +- substrate/test-client/src/lib.rs | 1 + 18 files changed, 135 insertions(+), 2273 deletions(-) delete mode 100644 substrate/bft/src/generic/accumulator.rs delete mode 100644 substrate/bft/src/generic/mod.rs delete mode 100644 substrate/bft/src/generic/tests.rs diff --git a/Cargo.lock b/Cargo.lock index 52c3e1b875e95..f683075362d30 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1470,6 +1470,7 @@ dependencies = [ "polkadot-runtime 0.1.0", "polkadot-statement-table 0.1.0", "polkadot-transaction-pool 0.1.0", + "rhododendron 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "substrate-bft 0.1.0", "substrate-client 0.1.0", "substrate-codec 0.1.0", @@ -1499,6 +1500,7 @@ dependencies = [ "polkadot-api 0.1.0", "polkadot-consensus 0.1.0", "polkadot-primitives 0.1.0", + "rhododendron 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.64 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.64 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.19 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1802,6 +1804,16 @@ dependencies = [ "winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "rhododendron" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "ring" version = "0.12.1" @@ -2121,6 +2133,7 @@ dependencies = [ "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "rhododendron 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "substrate-codec 0.1.0", "substrate-executor 0.1.0", "substrate-keyring 0.1.0", @@ -2194,7 +2207,7 @@ dependencies = [ "hex-literal 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", - "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.64 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.64 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2250,6 +2263,7 @@ dependencies = [ name = "substrate-misbehavior-check" version = "0.1.0" dependencies = [ + "rhododendron 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "substrate-bft 0.1.0", "substrate-codec 0.1.0", "substrate-keyring 0.1.0", @@ -2645,6 +2659,7 @@ dependencies = [ name = "substrate-test-client" version = "0.1.0" dependencies = [ + "rhododendron 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "substrate-bft 0.1.0", "substrate-client 0.1.0", "substrate-codec 0.1.0", @@ -3422,6 +3437,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum regex-syntax 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)" = "7d707a4fa2637f2dca2ef9fd02225ec7661fe01a53623c1e6515b6916511f7a7" "checksum relay 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1576e382688d7e9deecea24417e350d3062d97e32e45d70b1cde65994ff1489a" "checksum remove_dir_all 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3488ba1b9a2084d38645c4c08276a1752dcbf2c7130d74f1569681ad5d2799c5" +"checksum rhododendron 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9e38401cc1b63e71ec9119115c7e1354fcf54c8006ad59a22409dd8bd93737b2" "checksum ring 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)" = "6f7d28b30a72c01b458428e0ae988d4149c20d902346902be881e3edc4bb325c" "checksum rlp 0.2.1 (git+https://github.com/paritytech/parity.git)" = "" "checksum rlp 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "89db7f8dfdd5eb7ab3ac3ece7a07fd273a680b4b224cb231181280e8996f9f0b" diff --git a/polkadot/consensus/Cargo.toml b/polkadot/consensus/Cargo.toml index 5af40035e2e50..68ad8265518e4 100644 --- a/polkadot/consensus/Cargo.toml +++ b/polkadot/consensus/Cargo.toml @@ -11,6 +11,7 @@ ed25519 = { path = "../../substrate/ed25519" } error-chain = "0.12" log = "0.3" exit-future = "0.1" +rhododendron = "0.2" polkadot-api = { path = "../api" } polkadot-parachain = { path = "../parachain" } polkadot-primitives = { path = "../primitives" } diff --git a/polkadot/consensus/src/lib.rs b/polkadot/consensus/src/lib.rs index 9db60b3797b56..6c39dc66e7d3f 100644 --- a/polkadot/consensus/src/lib.rs +++ b/polkadot/consensus/src/lib.rs @@ -47,6 +47,7 @@ extern crate substrate_client as client; extern crate exit_future; extern crate tokio; +extern crate rhododendron; #[macro_use] extern crate error_chain; @@ -530,7 +531,7 @@ impl bft::Proposer for Proposer } fn import_misbehavior(&self, misbehavior: Vec<(AuthorityId, bft::Misbehavior)>) { - use bft::generic::Misbehavior as GenericMisbehavior; + use rhododendron::Misbehavior as GenericMisbehavior; use runtime_primitives::bft::{MisbehaviorKind, MisbehaviorReport}; use runtime_primitives::MaybeUnsigned; use polkadot_runtime::{Call, Extrinsic, BareExtrinsic, UncheckedExtrinsic, ConsensusCall}; diff --git a/polkadot/network/Cargo.toml b/polkadot/network/Cargo.toml index ad07af5c12367..71ab17d2affb9 100644 --- a/polkadot/network/Cargo.toml +++ b/polkadot/network/Cargo.toml @@ -20,3 +20,4 @@ ed25519 = { path = "../../substrate/ed25519" } futures = "0.1" tokio = "0.1.7" log = "0.4" +rhododendron = "0.2" diff --git a/polkadot/network/src/consensus.rs b/polkadot/network/src/consensus.rs index a2a4950258140..0eb14d9381aaa 100644 --- a/polkadot/network/src/consensus.rs +++ b/polkadot/network/src/consensus.rs @@ -53,8 +53,8 @@ impl Sink for BftSink { fn start_send(&mut self, message: bft::Communication) -> ::futures::StartSend, E> { let network_message = net::LocalizedBftMessage { message: match message { - bft::generic::Communication::Consensus(c) => msg::BftMessage::Consensus(match c { - bft::generic::LocalizedMessage::Propose(proposal) => msg::SignedConsensusMessage::Propose(msg::SignedConsensusProposal { + ::rhododendron::Communication::Consensus(c) => msg::BftMessage::Consensus(match c { + ::rhododendron::LocalizedMessage::Propose(proposal) => msg::SignedConsensusMessage::Propose(msg::SignedConsensusProposal { round_number: proposal.round_number as u32, proposal: proposal.proposal, digest: proposal.digest, @@ -62,17 +62,20 @@ impl Sink for BftSink { digest_signature: proposal.digest_signature.signature, full_signature: proposal.full_signature.signature, }), - bft::generic::LocalizedMessage::Vote(vote) => msg::SignedConsensusMessage::Vote(msg::SignedConsensusVote { + ::rhododendron::LocalizedMessage::Vote(vote) => msg::SignedConsensusMessage::Vote(msg::SignedConsensusVote { sender: vote.sender, signature: vote.signature.signature, vote: match vote.vote { - bft::generic::Vote::Prepare(r, h) => msg::ConsensusVote::Prepare(r as u32, h), - bft::generic::Vote::Commit(r, h) => msg::ConsensusVote::Commit(r as u32, h), - bft::generic::Vote::AdvanceRound(r) => msg::ConsensusVote::AdvanceRound(r as u32), + ::rhododendron::Vote::Prepare(r, h) => msg::ConsensusVote::Prepare(r as u32, h), + ::rhododendron::Vote::Commit(r, h) => msg::ConsensusVote::Commit(r as u32, h), + ::rhododendron::Vote::AdvanceRound(r) => msg::ConsensusVote::AdvanceRound(r as u32), } }), }), - bft::generic::Communication::Auxiliary(justification) => msg::BftMessage::Auxiliary(justification.uncheck().into()), + ::rhododendron::Communication::Auxiliary(justification) => { + let unchecked: bft::UncheckedJustification<_> = justification.uncheck().into(); + msg::BftMessage::Auxiliary(unchecked.into()) + } }, parent_hash: self.parent_hash, }; @@ -90,10 +93,10 @@ impl Sink for BftSink { // check signature and authority validity of message. fn process_bft_message(msg: msg::LocalizedBftMessage, local_id: &SessionKey, authorities: &[SessionKey]) -> Result>, bft::Error> { Ok(Some(match msg.message { - msg::BftMessage::Consensus(c) => bft::generic::Communication::Consensus(match c { - msg::SignedConsensusMessage::Propose(proposal) => bft::generic::LocalizedMessage::Propose({ + msg::BftMessage::Consensus(c) => ::rhododendron::Communication::Consensus(match c { + msg::SignedConsensusMessage::Propose(proposal) => ::rhododendron::LocalizedMessage::Propose({ if &proposal.sender == local_id { return Ok(None) } - let proposal = bft::generic::LocalizedProposal { + let proposal = ::rhododendron::LocalizedProposal { round_number: proposal.round_number as usize, proposal: proposal.proposal, digest: proposal.digest, @@ -112,18 +115,18 @@ fn process_bft_message(msg: msg::LocalizedBftMessage, local_id: &Se trace!(target: "bft", "importing proposal message for round {} from {}", proposal.round_number, Hash::from(proposal.sender.0)); proposal }), - msg::SignedConsensusMessage::Vote(vote) => bft::generic::LocalizedMessage::Vote({ + msg::SignedConsensusMessage::Vote(vote) => ::rhododendron::LocalizedMessage::Vote({ if &vote.sender == local_id { return Ok(None) } - let vote = bft::generic::LocalizedVote { + let vote = ::rhododendron::LocalizedVote { sender: vote.sender, signature: ed25519::LocalizedSignature { signature: vote.signature, signer: ed25519::Public(vote.sender.0), }, vote: match vote.vote { - msg::ConsensusVote::Prepare(r, h) => bft::generic::Vote::Prepare(r as usize, h), - msg::ConsensusVote::Commit(r, h) => bft::generic::Vote::Commit(r as usize, h), - msg::ConsensusVote::AdvanceRound(r) => bft::generic::Vote::AdvanceRound(r as usize), + msg::ConsensusVote::Prepare(r, h) => ::rhododendron::Vote::Prepare(r as usize, h), + msg::ConsensusVote::Commit(r, h) => ::rhododendron::Vote::Commit(r as usize, h), + msg::ConsensusVote::AdvanceRound(r) => ::rhododendron::Vote::AdvanceRound(r as usize), } }; bft::check_vote::(authorities, &msg.parent_hash, &vote)?; @@ -137,7 +140,7 @@ fn process_bft_message(msg: msg::LocalizedBftMessage, local_id: &Se // TODO: get proper error let justification: Result<_, bft::Error> = bft::check_prepare_justification::(authorities, msg.parent_hash, justification) .map_err(|_| bft::ErrorKind::InvalidJustification.into()); - bft::generic::Communication::Auxiliary(justification?) + ::rhododendron::Communication::Auxiliary(justification?) }, })) } diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index f909d0581b329..91c53338e6571 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -38,6 +38,7 @@ extern crate ed25519; extern crate futures; extern crate parking_lot; extern crate tokio; +extern crate rhododendron; #[macro_use] extern crate log; diff --git a/substrate/bft/Cargo.toml b/substrate/bft/Cargo.toml index 8e9ae8ba93c52..31230f3ff1128 100644 --- a/substrate/bft/Cargo.toml +++ b/substrate/bft/Cargo.toml @@ -14,6 +14,7 @@ tokio = "0.1.7" parking_lot = "0.4" error-chain = "0.12" log = "0.3" +rhododendron = "0.2" [dev-dependencies] substrate-keyring = { path = "../keyring" } diff --git a/substrate/bft/src/error.rs b/substrate/bft/src/error.rs index aa076cb0a64f9..b79b96ff31f2e 100644 --- a/substrate/bft/src/error.rs +++ b/substrate/bft/src/error.rs @@ -74,8 +74,8 @@ error_chain! { } } -impl From<::generic::InputStreamConcluded> for Error { - fn from(_: ::generic::InputStreamConcluded) -> Error { +impl From<::rhododendron::InputStreamConcluded> for Error { + fn from(_: ::rhododendron::InputStreamConcluded) -> Error { ErrorKind::IoTerminated.into() } } diff --git a/substrate/bft/src/generic/accumulator.rs b/substrate/bft/src/generic/accumulator.rs deleted file mode 100644 index 811826b7d68a4..0000000000000 --- a/substrate/bft/src/generic/accumulator.rs +++ /dev/null @@ -1,883 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -//! Vote accumulator for each round of BFT consensus. - -use std::collections::{HashMap, HashSet}; -use std::collections::hash_map::Entry; -use std::hash::Hash; - -use generic::{Vote, LocalizedMessage, LocalizedProposal}; - -/// Justification for some state at a given round. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct UncheckedJustification { - /// The round. - pub round_number: usize, - /// The digest prepared for. - pub digest: D, - /// Signatures for the prepare messages. - pub signatures: Vec, -} - -impl UncheckedJustification { - /// Fails if there are duplicate signatures or invalid. - /// - /// Provide a closure for checking whether the signature is valid on a - /// digest. - /// - /// The closure should returns a checked justification iff the round number, digest, and signature - /// represent a valid message and the signer was authorized to issue - /// it. - /// - /// The `check_message` closure may vary based on context. - pub fn check(self, threshold: usize, mut check_message: F) - -> Result, Self> - where - F: FnMut(usize, &D, &S) -> Option, - V: Hash + Eq, - { - let checks_out = { - let mut checks_out = || { - let mut voted = HashSet::new(); - - for signature in &self.signatures { - match check_message(self.round_number, &self.digest, signature) { - None => return false, - Some(v) => { - if !voted.insert(v) { - return false; - } - } - } - } - - voted.len() >= threshold - }; - - checks_out() - }; - - if checks_out { - Ok(Justification(self)) - } else { - Err(self) - } - } -} - -/// A checked justification. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct Justification(UncheckedJustification); - -impl Justification { - /// Convert this justification back to unchecked. - pub fn uncheck(self) -> UncheckedJustification { - self.0 - } -} - -impl ::std::ops::Deref for Justification { - type Target = UncheckedJustification; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -/// Type alias to represent a justification specifically for a prepare. -pub type PrepareJustification = Justification; - -/// The round's state, based on imported messages. -#[derive(PartialEq, Eq, Debug)] -pub enum State { - /// No proposal yet. - Begin, - /// Proposal received. - Proposed(Candidate), - /// Seen n - f prepares for this digest. - Prepared(PrepareJustification), - /// Seen n - f commits for a digest. - Committed(Justification), - /// Seen n - f round-advancement messages. - Advanced(Option>), -} - -#[derive(Debug, Default)] -struct VoteCounts { - prepared: usize, - committed: usize, -} - -#[derive(Debug)] -struct Proposal { - proposal: Candidate, - digest: Digest, - digest_signature: Signature, -} - -/// Misbehavior which can occur. -#[derive(Debug, Clone)] -pub enum Misbehavior { - /// Proposed out-of-turn. - ProposeOutOfTurn(usize, Digest, Signature), - /// Issued two conflicting proposals. - DoublePropose(usize, (Digest, Signature), (Digest, Signature)), - /// Issued two conflicting prepare messages. - DoublePrepare(usize, (Digest, Signature), (Digest, Signature)), - /// Issued two conflicting commit messages. - DoubleCommit(usize, (Digest, Signature), (Digest, Signature)), -} - -/// Accumulates messages for a given round of BFT consensus. -/// -/// This isn't tied to the "view" of a single authority. It -/// keeps accurate track of the state of the BFT consensus based -/// on all messages imported. -#[derive(Debug)] -pub struct Accumulator - where - Candidate: Eq + Clone, - Digest: Hash + Eq + Clone, - AuthorityId: Hash + Eq + Clone, - Signature: Eq + Clone, -{ - round_number: usize, - threshold: usize, - round_proposer: AuthorityId, - proposal: Option>, - prepares: HashMap, - commits: HashMap, - vote_counts: HashMap, - advance_round: HashSet, - state: State, -} - -impl Accumulator - where - Candidate: Eq + Clone, - Digest: Hash + Eq + Clone, - AuthorityId: Hash + Eq + Clone, - Signature: Eq + Clone, -{ - /// Create a new state accumulator. - pub fn new(round_number: usize, threshold: usize, round_proposer: AuthorityId) -> Self { - Accumulator { - round_number, - threshold, - round_proposer, - proposal: None, - prepares: HashMap::new(), - commits: HashMap::new(), - vote_counts: HashMap::new(), - advance_round: HashSet::new(), - state: State::Begin, - } - } - - /// How advance votes we have seen. - pub fn advance_votes(&self) -> usize { - self.advance_round.len() - } - - /// Get the round number. - pub fn round_number(&self) -> usize { - self.round_number.clone() - } - - pub fn proposal(&self) -> Option<&Candidate> { - self.proposal.as_ref().map(|p| &p.proposal) - } - - /// Inspect the current consensus state. - pub fn state(&self) -> &State { - &self.state - } - - /// Import a message. Importing duplicates is fine, but the signature - /// and authorization should have already been checked. - pub fn import_message( - &mut self, - message: LocalizedMessage, - ) -> Result<(), Misbehavior> { - // message from different round. - if message.round_number() != self.round_number { - return Ok(()); - } - - match message { - LocalizedMessage::Propose(proposal) => self.import_proposal(proposal), - LocalizedMessage::Vote(vote) => { - let (sender, signature) = (vote.sender, vote.signature); - match vote.vote { - Vote::Prepare(_, d) => self.import_prepare(d, sender, signature), - Vote::Commit(_, d) => self.import_commit(d, sender, signature), - Vote::AdvanceRound(_) => self.import_advance_round(sender), - } - } - } - } - - fn import_proposal( - &mut self, - proposal: LocalizedProposal, - ) -> Result<(), Misbehavior> { - let sender = proposal.sender; - - if sender != self.round_proposer { - return Err(Misbehavior::ProposeOutOfTurn( - self.round_number, - proposal.digest, - proposal.digest_signature) - ); - } - - match self.proposal { - Some(ref p) if &p.digest != &proposal.digest => { - return Err(Misbehavior::DoublePropose( - self.round_number, - { - let old = self.proposal.as_ref().expect("just checked to be Some; qed"); - (old.digest.clone(), old.digest_signature.clone()) - }, - (proposal.digest.clone(), proposal.digest_signature.clone()) - )) - } - _ => {}, - } - - debug!(target: "bft", "Importing proposal for round {}", self.round_number); - - self.proposal = Some(Proposal { - proposal: proposal.proposal.clone(), - digest: proposal.digest, - digest_signature: proposal.digest_signature, - }); - - if let State::Begin = self.state { - self.state = State::Proposed(proposal.proposal); - } - - Ok(()) - } - - fn import_prepare( - &mut self, - digest: Digest, - sender: AuthorityId, - signature: Signature, - ) -> Result<(), Misbehavior> { - // ignore any subsequent prepares by the same sender. - let threshold_prepared = match self.prepares.entry(sender.clone()) { - Entry::Vacant(vacant) => { - vacant.insert((digest.clone(), signature)); - let count = self.vote_counts.entry(digest.clone()).or_insert_with(Default::default); - count.prepared += 1; - - if count.prepared >= self.threshold { - Some(digest) - } else { - None - } - } - Entry::Occupied(occupied) => { - // if digest is different, that's misbehavior. - if occupied.get().0 != digest { - return Err(Misbehavior::DoublePrepare( - self.round_number, - occupied.get().clone(), - (digest, signature) - )); - } - - None - } - }; - - // only allow transition to prepare from begin or proposed state. - let valid_transition = match self.state { - State::Begin | State::Proposed(_) => true, - _ => false, - }; - - if let (true, Some(threshold_prepared)) = (valid_transition, threshold_prepared) { - let signatures = self.prepares - .values() - .filter(|&&(ref d, _)| d == &threshold_prepared) - .map(|&(_, ref s)| s.clone()) - .collect(); - - trace!(target: "bft", "observed threshold-prepare for round {}", self.round_number); - self.state = State::Prepared(Justification(UncheckedJustification { - round_number: self.round_number, - digest: threshold_prepared, - signatures: signatures, - })); - } - - Ok(()) - } - - fn import_commit( - &mut self, - digest: Digest, - sender: AuthorityId, - signature: Signature, - ) -> Result<(), Misbehavior> { - // ignore any subsequent commits by the same sender. - let threshold_committed = match self.commits.entry(sender.clone()) { - Entry::Vacant(vacant) => { - vacant.insert((digest.clone(), signature)); - let count = self.vote_counts.entry(digest.clone()).or_insert_with(Default::default); - count.committed += 1; - - if count.committed >= self.threshold { - Some(digest) - } else { - None - } - } - Entry::Occupied(occupied) => { - // if digest is different, that's misbehavior. - if occupied.get().0 != digest { - return Err(Misbehavior::DoubleCommit( - self.round_number, - occupied.get().clone(), - (digest, signature) - )); - } - - None - } - }; - - // transition to concluded state always valid. - // only weird case is if the prior state was "advanced", - // but technically it's the same behavior as if the order of receiving - // the last "advance round" and "commit" messages were reversed. - if let Some(threshold_committed) = threshold_committed { - let signatures = self.commits - .values() - .filter(|&&(ref d, _)| d == &threshold_committed) - .map(|&(_, ref s)| s.clone()) - .collect(); - - trace!(target: "bft", "observed threshold-commit for round {}", self.round_number); - self.state = State::Committed(Justification(UncheckedJustification { - round_number: self.round_number, - digest: threshold_committed, - signatures: signatures, - })); - } - - Ok(()) - } - - fn import_advance_round( - &mut self, - sender: AuthorityId, - ) -> Result<(), Misbehavior> { - self.advance_round.insert(sender); - - if self.advance_round.len() < self.threshold { return Ok(()) } - trace!(target: "bft", "Witnessed threshold advance-round messages for round {}", self.round_number); - - // allow transition to new round only if we haven't produced a justification - // yet. - self.state = match ::std::mem::replace(&mut self.state, State::Begin) { - State::Committed(j) => State::Committed(j), - State::Prepared(j) => State::Advanced(Some(j)), - State::Advanced(j) => State::Advanced(j), - State::Begin | State::Proposed(_) => State::Advanced(None), - }; - - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use generic::{LocalizedMessage, LocalizedProposal, LocalizedVote}; - - #[derive(Clone, PartialEq, Eq, Debug)] - pub struct Candidate(usize); - - #[derive(Hash, PartialEq, Eq, Clone, Debug)] - pub struct Digest(usize); - - #[derive(Hash, PartialEq, Eq, Debug, Clone)] - pub struct AuthorityId(usize); - - #[derive(PartialEq, Eq, Clone, Debug)] - pub struct Signature(usize, usize); - - #[test] - fn justification_checks_out() { - let mut justification = UncheckedJustification { - round_number: 2, - digest: Digest(600), - signatures: (0..10).map(|i| Signature(600, i)).collect(), - }; - - let check_message = |r, d: &Digest, s: &Signature| { - if r == 2 && d.0 == 600 && s.0 == 600 { - Some(AuthorityId(s.1)) - } else { - None - } - }; - - assert!(justification.clone().check(7, &check_message).is_ok()); - assert!(justification.clone().check(11, &check_message).is_err()); - - { - // one bad signature is enough to spoil it. - justification.signatures.push(Signature(1001, 255)); - assert!(justification.clone().check(7, &check_message).is_err()); - - justification.signatures.pop(); - } - // duplicates not allowed. - justification.signatures.extend((0..10).map(|i| Signature(600, i))); - assert!(justification.clone().check(11, &check_message).is_err()); - } - - #[test] - fn accepts_proposal_from_proposer_only() { - let mut accumulator = Accumulator::<_, Digest, _, _>::new(1, 7, AuthorityId(8)); - assert_eq!(accumulator.state(), &State::Begin); - - let res = accumulator.import_message(LocalizedMessage::Propose(LocalizedProposal { - sender: AuthorityId(5), - full_signature: Signature(999, 5), - digest_signature: Signature(999, 5), - proposal: Candidate(999), - digest: Digest(999), - round_number: 1, - })); - - assert!(res.is_err()); - - assert_eq!(accumulator.state(), &State::Begin); - - accumulator.import_message(LocalizedMessage::Propose(LocalizedProposal { - sender: AuthorityId(8), - full_signature: Signature(999, 8), - digest_signature: Signature(999, 8), - proposal: Candidate(999), - digest: Digest(999), - round_number: 1, - })).unwrap(); - - assert_eq!(accumulator.state(), &State::Proposed(Candidate(999))); - } - - #[test] - fn reaches_prepare_phase() { - let mut accumulator = Accumulator::new(1, 7, AuthorityId(8)); - assert_eq!(accumulator.state(), &State::Begin); - - accumulator.import_message(LocalizedMessage::Propose(LocalizedProposal { - sender: AuthorityId(8), - full_signature: Signature(999, 8), - digest_signature: Signature(999, 8), - round_number: 1, - proposal: Candidate(999), - digest: Digest(999), - })).unwrap(); - - assert_eq!(accumulator.state(), &State::Proposed(Candidate(999))); - - for i in 0..6 { - accumulator.import_message(LocalizedVote { - sender: AuthorityId(i), - signature: Signature(999, i), - vote: Vote::Prepare(1, Digest(999)), - }.into()).unwrap(); - - assert_eq!(accumulator.state(), &State::Proposed(Candidate(999))); - } - - accumulator.import_message(LocalizedVote { - sender: AuthorityId(7), - signature: Signature(999, 7), - vote: Vote::Prepare(1, Digest(999)), - }.into()).unwrap(); - - match accumulator.state() { - &State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)), - s => panic!("wrong state: {:?}", s), - } - } - - #[test] - fn prepare_to_commit() { - let mut accumulator = Accumulator::new(1, 7, AuthorityId(8)); - assert_eq!(accumulator.state(), &State::Begin); - - accumulator.import_message(LocalizedMessage::Propose(LocalizedProposal { - sender: AuthorityId(8), - full_signature: Signature(999, 8), - digest_signature: Signature(999, 8), - round_number: 1, - proposal: Candidate(999), - digest: Digest(999), - })).unwrap(); - - assert_eq!(accumulator.state(), &State::Proposed(Candidate(999))); - - for i in 0..6 { - accumulator.import_message(LocalizedVote { - sender: AuthorityId(i), - signature: Signature(999, i), - vote: Vote::Prepare(1, Digest(999)), - }.into()).unwrap(); - - assert_eq!(accumulator.state(), &State::Proposed(Candidate(999))); - } - - accumulator.import_message(LocalizedVote { - sender: AuthorityId(7), - signature: Signature(999, 7), - vote: Vote::Prepare(1, Digest(999)), - }.into()).unwrap(); - - match accumulator.state() { - &State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)), - s => panic!("wrong state: {:?}", s), - } - - for i in 0..6 { - accumulator.import_message(LocalizedVote { - sender: AuthorityId(i), - signature: Signature(999, i), - vote: Vote::Commit(1, Digest(999)), - }.into()).unwrap(); - - match accumulator.state() { - &State::Prepared(_) => {}, - s => panic!("wrong state: {:?}", s), - } - } - - accumulator.import_message(LocalizedVote { - sender: AuthorityId(7), - signature: Signature(999, 7), - vote: Vote::Commit(1, Digest(999)), - }.into()).unwrap(); - - match accumulator.state() { - &State::Committed(ref j) => assert_eq!(j.digest, Digest(999)), - s => panic!("wrong state: {:?}", s), - } - } - - #[test] - fn prepare_to_advance() { - let mut accumulator = Accumulator::new(1, 7, AuthorityId(8)); - assert_eq!(accumulator.state(), &State::Begin); - - accumulator.import_message(LocalizedMessage::Propose(LocalizedProposal { - sender: AuthorityId(8), - full_signature: Signature(999, 8), - digest_signature: Signature(999, 8), - round_number: 1, - proposal: Candidate(999), - digest: Digest(999), - })).unwrap(); - - assert_eq!(accumulator.state(), &State::Proposed(Candidate(999))); - - for i in 0..7 { - accumulator.import_message(LocalizedVote { - sender: AuthorityId(i), - signature: Signature(999, i), - vote: Vote::Prepare(1, Digest(999)), - }.into()).unwrap(); - } - - match accumulator.state() { - &State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)), - s => panic!("wrong state: {:?}", s), - } - - for i in 0..6 { - accumulator.import_message(LocalizedVote { - sender: AuthorityId(i), - signature: Signature(999, i), - vote: Vote::AdvanceRound(1), - }.into()).unwrap(); - - match accumulator.state() { - &State::Prepared(_) => {}, - s => panic!("wrong state: {:?}", s), - } - } - - accumulator.import_message(LocalizedVote { - sender: AuthorityId(7), - signature: Signature(999, 7), - vote: Vote::AdvanceRound(1), - }.into()).unwrap(); - - match accumulator.state() { - &State::Advanced(Some(_)) => {}, - s => panic!("wrong state: {:?}", s), - } - } - - #[test] - fn conclude_different_than_proposed() { - let mut accumulator = Accumulator::::new(1, 7, AuthorityId(8)); - assert_eq!(accumulator.state(), &State::Begin); - - for i in 0..7 { - accumulator.import_message(LocalizedVote { - sender: AuthorityId(i), - signature: Signature(999, i), - vote: Vote::Prepare(1, Digest(999)), - }.into()).unwrap(); - } - - match accumulator.state() { - &State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)), - s => panic!("wrong state: {:?}", s), - } - - for i in 0..7 { - accumulator.import_message(LocalizedVote { - sender: AuthorityId(i), - signature: Signature(999, i), - vote: Vote::Commit(1, Digest(999)), - }.into()).unwrap(); - } - - match accumulator.state() { - &State::Committed(ref j) => assert_eq!(j.digest, Digest(999)), - s => panic!("wrong state: {:?}", s), - } - } - - #[test] - fn propose_after_prepared_does_not_clobber_state() { - let mut accumulator = Accumulator::::new(1, 7, AuthorityId(8)); - assert_eq!(accumulator.state(), &State::Begin); - - for i in 0..7 { - accumulator.import_message(LocalizedVote { - sender: AuthorityId(i), - signature: Signature(999, i), - vote: Vote::Prepare(1, Digest(999)), - }.into()).unwrap(); - } - - match accumulator.state() { - &State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)), - s => panic!("wrong state: {:?}", s), - } - - accumulator.import_message(LocalizedMessage::Propose(LocalizedProposal { - sender: AuthorityId(8), - full_signature: Signature(999, 8), - digest_signature: Signature(999, 8), - round_number: 1, - proposal: Candidate(999), - digest: Digest(999), - })).unwrap(); - - match accumulator.state() { - &State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)), - s => panic!("wrong state: {:?}", s), - } - } - - #[test] - fn propose_after_committed_does_not_clobber_state() { - let mut accumulator = Accumulator::::new(1, 7, AuthorityId(8)); - assert_eq!(accumulator.state(), &State::Begin); - - for i in 0..7 { - accumulator.import_message(LocalizedVote { - sender: AuthorityId(i), - signature: Signature(999, i), - vote: Vote::Commit(1, Digest(999)), - }.into()).unwrap(); - } - - match accumulator.state() { - &State::Committed(ref j) => assert_eq!(j.digest, Digest(999)), - s => panic!("wrong state: {:?}", s), - } - - accumulator.import_message(LocalizedMessage::Propose(LocalizedProposal { - sender: AuthorityId(8), - full_signature: Signature(999, 8), - digest_signature: Signature(999, 8), - round_number: 1, - proposal: Candidate(999), - digest: Digest(999), - })).unwrap(); - - match accumulator.state() { - &State::Committed(ref j) => assert_eq!(j.digest, Digest(999)), - s => panic!("wrong state: {:?}", s), - } - } - - #[test] - fn propose_after_advance_does_not_clobber_state() { - let mut accumulator = Accumulator::::new(1, 7, AuthorityId(8)); - assert_eq!(accumulator.state(), &State::Begin); - - for i in 0..7 { - accumulator.import_message(LocalizedVote { - sender: AuthorityId(i), - signature: Signature(1, i), - vote: Vote::AdvanceRound(1), - }.into()).unwrap(); - } - - match accumulator.state() { - &State::Advanced(_) => {} - s => panic!("wrong state: {:?}", s), - } - - accumulator.import_message(LocalizedMessage::Propose(LocalizedProposal { - sender: AuthorityId(8), - full_signature: Signature(999, 8), - digest_signature: Signature(999, 8), - round_number: 1, - proposal: Candidate(999), - digest: Digest(999), - })).unwrap(); - - match accumulator.state() { - &State::Advanced(_) => {} - s => panic!("wrong state: {:?}", s), - } - } - - #[test] - fn begin_to_advance() { - let mut accumulator = Accumulator::::new(1, 7, AuthorityId(8)); - assert_eq!(accumulator.state(), &State::Begin); - - for i in 0..7 { - accumulator.import_message(LocalizedVote { - sender: AuthorityId(i), - signature: Signature(1, i), - vote: Vote::AdvanceRound(1), - }.into()).unwrap(); - } - - match accumulator.state() { - &State::Advanced(ref j) => assert!(j.is_none()), - s => panic!("wrong state: {:?}", s), - } - } - - #[test] - fn conclude_without_prepare() { - let mut accumulator = Accumulator::::new(1, 7, AuthorityId(8)); - assert_eq!(accumulator.state(), &State::Begin); - - for i in 0..7 { - accumulator.import_message(LocalizedVote { - sender: AuthorityId(i), - signature: Signature(999, i), - vote: Vote::Commit(1, Digest(999)), - }.into()).unwrap(); - } - - match accumulator.state() { - &State::Committed(ref j) => assert_eq!(j.digest, Digest(999)), - s => panic!("wrong state: {:?}", s), - } - } - - #[test] - fn double_prepare_is_misbehavior() { - let mut accumulator = Accumulator::::new(1, 7, AuthorityId(8)); - assert_eq!(accumulator.state(), &State::Begin); - - for i in 0..7 { - accumulator.import_message(LocalizedVote { - sender: AuthorityId(i), - signature: Signature(999, i), - vote: Vote::Prepare(1, Digest(999)), - }.into()).unwrap(); - - let res = accumulator.import_message(LocalizedVote { - sender: AuthorityId(i), - signature: Signature(123, i), - vote: Vote::Prepare(1, Digest(123)), - }.into()); - - assert!(res.is_err()); - - } - } - - #[test] - fn double_commit_is_misbehavior() { - let mut accumulator = Accumulator::::new(1, 7, AuthorityId(8)); - assert_eq!(accumulator.state(), &State::Begin); - - for i in 0..7 { - accumulator.import_message(LocalizedVote { - sender: AuthorityId(i), - signature: Signature(999, i), - vote: Vote::Commit(1, Digest(999)), - }.into()).unwrap(); - - let res = accumulator.import_message(LocalizedVote { - sender: AuthorityId(i), - signature: Signature(123, i), - vote: Vote::Commit(1, Digest(123)), - }.into()); - - assert!(res.is_err()); - - } - } - - #[test] - fn double_propose_is_misbehavior() { - let mut accumulator = Accumulator::::new(1, 7, AuthorityId(8)); - assert_eq!(accumulator.state(), &State::Begin); - - accumulator.import_message(LocalizedMessage::Propose(LocalizedProposal { - sender: AuthorityId(8), - full_signature: Signature(999, 8), - digest_signature: Signature(999, 8), - round_number: 1, - proposal: Candidate(999), - digest: Digest(999), - })).unwrap(); - - let res = accumulator.import_message(LocalizedMessage::Propose(LocalizedProposal { - sender: AuthorityId(8), - full_signature: Signature(500, 8), - digest_signature: Signature(500, 8), - round_number: 1, - proposal: Candidate(500), - digest: Digest(500), - })); - - assert!(res.is_err()); - } -} diff --git a/substrate/bft/src/generic/mod.rs b/substrate/bft/src/generic/mod.rs deleted file mode 100644 index f2cefd63cd310..0000000000000 --- a/substrate/bft/src/generic/mod.rs +++ /dev/null @@ -1,832 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -//! BFT Agreement based on a rotating proposer in different rounds. -//! Very general implementation. - -use std::collections::{HashMap, BTreeMap, VecDeque}; -use std::collections::hash_map; -use std::fmt::Debug; -use std::hash::Hash; - -use futures::{future, Future, Stream, Sink, Poll, Async, AsyncSink}; - -use self::accumulator::State; - -pub use self::accumulator::{Accumulator, Justification, PrepareJustification, UncheckedJustification, Misbehavior}; - -mod accumulator; - -#[cfg(test)] -mod tests; - -/// Votes during a round. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum Vote { - /// Prepare to vote for proposal with digest D. - Prepare(usize, D), - /// Commit to proposal with digest D.. - Commit(usize, D), - /// Propose advancement to a new round. - AdvanceRound(usize), -} - -impl Vote { - /// Extract the round number. - pub fn round_number(&self) -> usize { - match *self { - Vote::Prepare(round, _) => round, - Vote::Commit(round, _) => round, - Vote::AdvanceRound(round) => round, - } - } -} - -/// Messages over the proposal. -/// Each message carries an associated round number. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum Message { - /// A proposal itself. - Propose(usize, C), - /// A vote of some kind, localized to a round number. - Vote(Vote), -} - -impl From> for Message { - fn from(vote: Vote) -> Self { - Message::Vote(vote) - } -} - -/// A localized proposal message. Contains two signed pieces of data. -#[derive(Debug, Clone)] -pub struct LocalizedProposal { - /// The round number. - pub round_number: usize, - /// The proposal sent. - pub proposal: C, - /// The digest of the proposal. - pub digest: D, - /// The sender of the proposal - pub sender: V, - /// The signature on the message (propose, round number, digest) - pub digest_signature: S, - /// The signature on the message (propose, round number, proposal) - pub full_signature: S, -} - -/// A localized vote message, including the sender. -#[derive(Debug, Clone)] -pub struct LocalizedVote { - /// The message sent. - pub vote: Vote, - /// The sender of the message - pub sender: V, - /// The signature of the message. - pub signature: S, -} - -/// A localized message. -#[derive(Debug, Clone)] -pub enum LocalizedMessage { - /// A proposal. - Propose(LocalizedProposal), - /// A vote. - Vote(LocalizedVote), -} - -impl LocalizedMessage { - /// Extract the sender. - pub fn sender(&self) -> &V { - match *self { - LocalizedMessage::Propose(ref proposal) => &proposal.sender, - LocalizedMessage::Vote(ref vote) => &vote.sender, - } - } - - /// Extract the round number. - pub fn round_number(&self) -> usize { - match *self { - LocalizedMessage::Propose(ref proposal) => proposal.round_number, - LocalizedMessage::Vote(ref vote) => vote.vote.round_number(), - } - } -} - -impl From> for LocalizedMessage { - fn from(vote: LocalizedVote) -> Self { - LocalizedMessage::Vote(vote) - } -} - -/// Context necessary for agreement. -/// -/// Provides necessary types for protocol messages, and functions necessary for a -/// participant to evaluate and create those messages. -pub trait Context { - /// Errors which can occur from the futures in this context. - type Error: From; - /// Candidate proposed. - type Candidate: Debug + Eq + Clone; - /// Candidate digest. - type Digest: Debug + Hash + Eq + Clone; - /// Authority ID. - type AuthorityId: Debug + Hash + Eq + Clone; - /// Signature. - type Signature: Debug + Eq + Clone; - /// A future that resolves when a round timeout is concluded. - type RoundTimeout: Future; - /// A future that resolves when a proposal is ready. - type CreateProposal: Future; - /// A future that resolves when a proposal has been evaluated. - type EvaluateProposal: Future; - - /// Get the local authority ID. - fn local_id(&self) -> Self::AuthorityId; - - /// Get the best proposal. - fn proposal(&self) -> Self::CreateProposal; - - /// Get the digest of a candidate. - fn candidate_digest(&self, candidate: &Self::Candidate) -> Self::Digest; - - /// Sign a message using the local authority ID. - /// In the case of a proposal message, it should sign on the hash and - /// the bytes of the proposal. - fn sign_local(&self, message: Message) - -> LocalizedMessage; - - /// Get the proposer for a given round of consensus. - fn round_proposer(&self, round: usize) -> Self::AuthorityId; - - /// Whether the proposal is valid. - fn proposal_valid(&self, proposal: &Self::Candidate) -> Self::EvaluateProposal; - - /// Create a round timeout. The context will determine the correct timeout - /// length, and create a future that will resolve when the timeout is - /// concluded. - fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout; -} - -/// Communication that can occur between participants in consensus. -#[derive(Debug, Clone)] -pub enum Communication { - /// A consensus message (proposal or vote) - Consensus(LocalizedMessage), - /// Auxiliary communication (just proof-of-lock for now). - Auxiliary(PrepareJustification), -} - -/// Hack to get around type alias warning. -pub trait TypeResolve { - /// Communication type. - type Communication; -} - -impl TypeResolve for C { - type Communication = Communication; -} - -#[derive(Debug)] -struct Sending { - items: VecDeque, - flushing: bool, -} - -impl Sending { - fn with_capacity(n: usize) -> Self { - Sending { - items: VecDeque::with_capacity(n), - flushing: false, - } - } - - fn push(&mut self, item: T) { - self.items.push_back(item); - self.flushing = false; - } - - // process all the sends into the sink. - fn process_all>(&mut self, sink: &mut S) -> Poll<(), S::SinkError> { - while let Some(item) = self.items.pop_front() { - match sink.start_send(item) { - Err(e) => return Err(e), - Ok(AsyncSink::NotReady(item)) => { - self.items.push_front(item); - return Ok(Async::NotReady); - } - Ok(AsyncSink::Ready) => { self.flushing = true; } - } - } - - if self.flushing { - match sink.poll_complete() { - Err(e) => return Err(e), - Ok(Async::NotReady) => return Ok(Async::NotReady), - Ok(Async::Ready(())) => { self.flushing = false; } - } - } - - Ok(Async::Ready(())) - } -} - -/// Error returned when the input stream concludes. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct InputStreamConcluded; - -impl ::std::fmt::Display for InputStreamConcluded { - fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { - write!(f, "{}", ::std::error::Error::description(self)) - } -} - -impl ::std::error::Error for InputStreamConcluded { - fn description(&self) -> &str { - "input stream of messages concluded prematurely" - } -} - -// get the "full BFT" threshold based on an amount of nodes and -// a maximum faulty. if nodes == 3f + 1, then threshold == 2f + 1. -fn bft_threshold(nodes: usize, max_faulty: usize) -> usize { - nodes - max_faulty -} - -/// Committed successfully. -#[derive(Debug, Clone)] -pub struct Committed { - /// The candidate committed for. This will be unknown if - /// we never witnessed the proposal of the last round. - pub candidate: Option, - /// A justification for the candidate. - pub justification: Justification, -} - -struct Locked { - justification: PrepareJustification, -} - -impl Locked { - fn digest(&self) -> &D { - &self.justification.digest - } -} - -// the state of the local node during the current state of consensus. -// -// behavior is different when locked on a proposal. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum LocalState { - Start, - Proposed, - Prepared(bool), // whether we thought it valid. - Committed, - VoteAdvance, -} - -// This structure manages a single "view" of consensus. -// -// We maintain two message accumulators: one for the round we are currently in, -// and one for a future round. -// -// We advance the round accumulators when one of two conditions is met: -// - we witness consensus of advancement in the current round. in this case we -// advance by one. -// - a higher threshold-prepare is broadcast to us. in this case we can -// advance to the round of the threshold-prepare. this is an indication -// that we have experienced severe asynchrony/clock drift with the remainder -// of the other authorities, and it is unlikely that we can assist in -// consensus meaningfully. nevertheless we make an attempt. -struct Strategy { - nodes: usize, - max_faulty: usize, - fetching_proposal: Option, - evaluating_proposal: Option, - round_timeout: future::Fuse, - local_state: LocalState, - locked: Option>, - notable_candidates: HashMap, - current_accumulator: Accumulator, - future_accumulators: BTreeMap>, - local_id: C::AuthorityId, - misbehavior: HashMap>, -} - -impl Strategy { - fn create(context: &C, nodes: usize, max_faulty: usize) -> Self { - let timeout = context.begin_round_timeout(0); - let threshold = bft_threshold(nodes, max_faulty); - - let current_accumulator = Accumulator::new( - 0, - threshold, - context.round_proposer(0), - ); - - Strategy { - nodes, - max_faulty, - current_accumulator, - future_accumulators: BTreeMap::new(), - fetching_proposal: None, - evaluating_proposal: None, - local_state: LocalState::Start, - locked: None, - notable_candidates: HashMap::new(), - round_timeout: timeout.fuse(), - local_id: context.local_id(), - misbehavior: HashMap::new(), - } - } - - fn current_round(&self) -> usize { - self.current_accumulator.round_number() - } - - fn import_message( - &mut self, - context: &C, - msg: LocalizedMessage - ) { - let round_number = msg.round_number(); - - let sender = msg.sender().clone(); - let current_round = self.current_round(); - let misbehavior = if round_number == current_round { - self.current_accumulator.import_message(msg) - } else if round_number > current_round { - let threshold = bft_threshold(self.nodes, self.max_faulty); - - let mut future_acc = self.future_accumulators.entry(round_number).or_insert_with(|| { - Accumulator::new( - round_number, - threshold, - context.round_proposer(round_number), - ) - }); - - future_acc.import_message(msg) - } else { - Ok(()) - }; - - if let Err(misbehavior) = misbehavior { - self.misbehavior.insert(sender, misbehavior); - } - } - - fn import_lock_proof( - &mut self, - context: &C, - justification: PrepareJustification, - ) { - // TODO: find a way to avoid processing of the signatures if the sender is - // not the primary or the round number is low. - if justification.round_number > self.current_round() { - // jump ahead to the prior round as this is an indication of a supermajority - // good nodes being at least on that round. - self.advance_to_round(context, justification.round_number); - } - - let lock_to_new = self.locked.as_ref() - .map_or(true, |l| l.justification.round_number < justification.round_number); - - if lock_to_new { - self.locked = Some(Locked { justification }) - } - } - - // poll the strategy: this will queue messages to be sent and advance - // rounds if necessary. - // - // only call within the context of a `Task`. - fn poll( - &mut self, - context: &C, - sending: &mut Sending<::Communication> - ) - -> Poll, C::Error> - { - let mut last_watermark = (self.current_round(), self.local_state); - - // poll until either completion or state doesn't change. - loop { - trace!(target: "bft", "Polling BFT logic. State={:?}", last_watermark); - match self.poll_once(context, sending)? { - Async::Ready(x) => return Ok(Async::Ready(x)), - Async::NotReady => { - let new_watermark = (self.current_round(), self.local_state); - - if new_watermark == last_watermark { - return Ok(Async::NotReady) - } else { - last_watermark = new_watermark; - } - } - } - } - } - - // perform one round of polling: attempt to broadcast messages and change the state. - // if the round or internal round-state changes, this should be called again. - fn poll_once( - &mut self, - context: &C, - sending: &mut Sending<::Communication> - ) - -> Poll, C::Error> - { - self.propose(context, sending)?; - self.prepare(context, sending)?; - self.commit(context, sending); - self.vote_advance(context, sending)?; - - let advance = match self.current_accumulator.state() { - &State::Advanced(ref p_just) => { - // lock to any witnessed prepare justification. - if let Some(p_just) = p_just.as_ref() { - self.locked = Some(Locked { justification: p_just.clone() }); - } - - let round_number = self.current_round(); - Some(round_number + 1) - } - &State::Committed(ref just) => { - // fetch the agreed-upon candidate: - // - we may not have received the proposal in the first place - // - there is no guarantee that the proposal we got was agreed upon - // (can happen if faulty primary) - // - look in the candidates of prior rounds just in case. - let candidate = self.current_accumulator - .proposal() - .and_then(|c| if context.candidate_digest(c) == just.digest { - Some(c.clone()) - } else { - None - }) - .or_else(|| self.notable_candidates.get(&just.digest).cloned()); - - let committed = Committed { - candidate, - justification: just.clone() - }; - - return Ok(Async::Ready(committed)) - } - _ => None, - }; - - if let Some(new_round) = advance { - self.advance_to_round(context, new_round); - } - - Ok(Async::NotReady) - } - - fn propose( - &mut self, - context: &C, - sending: &mut Sending<::Communication> - ) - -> Result<(), C::Error> - { - if let LocalState::Start = self.local_state { - let mut propose = false; - if let &State::Begin = self.current_accumulator.state() { - let round_number = self.current_round(); - let primary = context.round_proposer(round_number); - propose = self.local_id == primary; - }; - - if !propose { return Ok(()) } - - // obtain the proposal to broadcast. - let proposal = match self.locked { - Some(ref locked) => { - // TODO: it's possible but very unlikely that we don't have the - // corresponding proposal for what we are locked to. - // - // since this is an edge case on an edge case, it is fine - // to eat the round timeout for now, but it can be optimized by - // broadcasting an advance vote. - self.notable_candidates.get(locked.digest()).cloned() - } - None => { - let res = self.fetching_proposal - .get_or_insert_with(|| context.proposal()) - .poll()?; - - match res { - Async::Ready(p) => Some(p), - Async::NotReady => None, - } - } - }; - - if let Some(proposal) = proposal { - self.fetching_proposal = None; - - let message = Message::Propose( - self.current_round(), - proposal - ); - - self.import_and_send_message(message, context, sending); - - // broadcast the justification along with the proposal if we are locked. - if let Some(ref locked) = self.locked { - sending.push( - Communication::Auxiliary(locked.justification.clone()) - ); - } - - self.local_state = LocalState::Proposed; - } - } - - Ok(()) - } - - fn prepare( - &mut self, - context: &C, - sending: &mut Sending<::Communication> - ) - -> Result<(), C::Error> - { - // prepare only upon start or having proposed. - match self.local_state { - LocalState::Start | LocalState::Proposed => {}, - _ => return Ok(()) - }; - - let mut prepare_for = None; - - // we can't prepare until something was proposed. - if let &State::Proposed(ref candidate) = self.current_accumulator.state() { - let digest = context.candidate_digest(candidate); - - // vote to prepare only if we believe the candidate to be valid and - // we are not locked on some other candidate. - match self.locked { - Some(ref locked) if locked.digest() != &digest => {} - Some(_) => { - // don't check validity if we are locked. - // this is necessary to preserve the liveness property. - self.local_state = LocalState::Prepared(true); - prepare_for = Some(digest); - } - None => { - let res = self.evaluating_proposal - .get_or_insert_with(|| context.proposal_valid(candidate)) - .poll()?; - - if let Async::Ready(valid) = res { - self.evaluating_proposal = None; - self.local_state = LocalState::Prepared(valid); - - if valid { - prepare_for = Some(digest); - } - } - } - } - } - - if let Some(digest) = prepare_for { - let message = Vote::Prepare( - self.current_round(), - digest - ).into(); - - self.import_and_send_message(message, context, sending); - } - - Ok(()) - } - - fn commit( - &mut self, - context: &C, - sending: &mut Sending<::Communication> - ) { - // commit only if we haven't voted to advance or committed already - match self.local_state { - LocalState::Committed | LocalState::VoteAdvance => return, - _ => {} - } - - let mut commit_for = None; - - if let &State::Prepared(ref p_just) = self.current_accumulator.state() { - // we are now locked to this prepare justification. - let digest = p_just.digest.clone(); - self.locked = Some(Locked { justification: p_just.clone() }); - commit_for = Some(digest); - } - - if let Some(digest) = commit_for { - let message = Vote::Commit( - self.current_round(), - digest - ).into(); - - self.import_and_send_message(message, context, sending); - self.local_state = LocalState::Committed; - } - } - - fn vote_advance( - &mut self, - context: &C, - sending: &mut Sending<::Communication> - ) - -> Result<(), C::Error> - { - // we can vote for advancement under all circumstances unless we have already. - if let LocalState::VoteAdvance = self.local_state { return Ok(()) } - - // if we got f + 1 advance votes, or the timeout has fired, and we haven't - // sent an AdvanceRound message yet, do so. - let mut attempt_advance = self.current_accumulator.advance_votes() > self.max_faulty; - - // if we evaluated the proposal and it was bad, vote to advance round. - if let LocalState::Prepared(false) = self.local_state { - attempt_advance = true; - } - - // if the timeout has fired, vote to advance round. - if let Async::Ready(_) = self.round_timeout.poll()? { - attempt_advance = true; - } - - if attempt_advance { - let message = Vote::AdvanceRound( - self.current_round(), - ).into(); - - self.import_and_send_message(message, context, sending); - self.local_state = LocalState::VoteAdvance; - } - - Ok(()) - } - - fn advance_to_round(&mut self, context: &C, round: usize) { - assert!(round > self.current_round()); - trace!(target: "bft", "advancing to round {}", round); - - self.fetching_proposal = None; - self.evaluating_proposal = None; - self.round_timeout = context.begin_round_timeout(round).fuse(); - self.local_state = LocalState::Start; - - // when advancing from a round, store away the witnessed proposal. - // - // if we or other participants end up locked on that candidate, - // we will have it. - if let Some(proposal) = self.current_accumulator.proposal() { - let digest = context.candidate_digest(proposal); - self.notable_candidates.entry(digest).or_insert_with(|| proposal.clone()); - } - - // if we jump ahead more than one round, get rid of the ones in between. - for irrelevant in (self.current_round() + 1)..round { - self.future_accumulators.remove(&irrelevant); - } - - // use stored future accumulator for given round or create if it doesn't exist. - self.current_accumulator = match self.future_accumulators.remove(&round) { - Some(x) => x, - None => Accumulator::new( - round, - bft_threshold(self.nodes, self.max_faulty), - context.round_proposer(round), - ), - }; - } - - fn import_and_send_message( - &mut self, - message: Message, - context: &C, - sending: &mut Sending<::Communication> - ) { - let signed_message = context.sign_local(message); - self.import_message(context, signed_message.clone()); - sending.push(Communication::Consensus(signed_message)); - } -} - -/// Future that resolves upon BFT agreement for a candidate. -#[must_use = "futures do nothing unless polled"] -pub struct Agreement { - context: C, - input: I, - output: O, - concluded: Option>, - sending: Sending<::Communication>, - strategy: Strategy, -} - -impl Future for Agreement - where - C: Context, - I: Stream::Communication,Error=C::Error>, - O: Sink::Communication,SinkError=C::Error>, -{ - type Item = Committed; - type Error = C::Error; - - fn poll(&mut self) -> Poll { - // even if we've observed the conclusion, wait until all - // pending outgoing messages are flushed. - if let Some(just) = self.concluded.take() { - return Ok(match self.sending.process_all(&mut self.output)? { - Async::Ready(()) => Async::Ready(just), - Async::NotReady => { - self.concluded = Some(just); - Async::NotReady - } - }) - } - - // drive state machine as long as there are new messages. - let mut driving = true; - while driving { - driving = match self.input.poll()? { - Async::Ready(msg) => { - match msg.ok_or(InputStreamConcluded)? { - Communication::Consensus(message) => self.strategy.import_message(&self.context, message), - Communication::Auxiliary(lock_proof) - => self.strategy.import_lock_proof(&self.context, lock_proof), - } - - true - } - Async::NotReady => false, - }; - - // drive state machine after handling new input. - if let Async::Ready(just) = self.strategy.poll(&self.context, &mut self.sending)? { - self.concluded = Some(just); - return self.poll(); - } - } - - // make progress on flushing all pending messages. - let _ = self.sending.process_all(&mut self.output)?; - Ok(Async::NotReady) - } -} - -impl Agreement { - /// Get a reference to the underlying context. - pub fn context(&self) -> &C { - &self.context - } - - /// Drain the misbehavior vector. - pub fn drain_misbehavior(&mut self) -> hash_map::Drain> { - self.strategy.misbehavior.drain() - } -} - -/// Attempt to reach BFT agreement on a candidate. -/// -/// `nodes` is the number of nodes in the system. -/// `max_faulty` is the maximum number of faulty nodes. Should be less than -/// 1/3 of `nodes`, otherwise agreement may never be reached. -/// -/// The input and output streams should follow the constraints documented in the crate root. -pub fn agree(context: C, nodes: usize, max_faulty: usize, input: I, output: O) - -> Agreement - where - C: Context, - I: Stream::Communication,Error=C::Error>, - O: Sink::Communication,SinkError=C::Error>, -{ - let strategy = Strategy::create(&context, nodes, max_faulty); - Agreement { - context, - input, - output, - concluded: None, - sending: Sending::with_capacity(4), - strategy: strategy, - } -} diff --git a/substrate/bft/src/generic/tests.rs b/substrate/bft/src/generic/tests.rs deleted file mode 100644 index fc88f1e24610d..0000000000000 --- a/substrate/bft/src/generic/tests.rs +++ /dev/null @@ -1,469 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -//! Tests for the candidate agreement strategy. - -use super::*; - -use std::collections::BTreeSet; -use std::sync::{Arc, Mutex}; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::time::{Instant, Duration}; - -use futures::sync::mpsc; -use futures::future::{self, FutureResult}; - -use tokio::timer::Delay; -use tokio::runtime; -use tokio::prelude::*; - -const ROUND_DURATION: Duration = Duration::from_millis(50); - -struct Network { - endpoints: Vec>, - input: mpsc::UnboundedReceiver<(usize, T)>, -} - -impl Network { - fn new(nodes: usize) - -> (Self, Vec>, Vec>) - { - let mut inputs = Vec::with_capacity(nodes); - let mut outputs = Vec::with_capacity(nodes); - let mut endpoints = Vec::with_capacity(nodes); - - let (in_tx, in_rx) = mpsc::unbounded(); - for _ in 0..nodes { - let (out_tx, out_rx) = mpsc::unbounded(); - inputs.push(in_tx.clone()); - outputs.push(out_rx); - endpoints.push(out_tx); - } - - let network = Network { - endpoints, - input: in_rx, - }; - - (network, inputs, outputs) - } - - fn route_in_background(self) { - ::tokio::executor::spawn(self); - } -} - -impl Future for Network { - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll<(), Self::Error> { - match try_ready!(self.input.poll()) { - None => Ok(Async::Ready(())), - Some((sender, item)) => { - { - let receiving_endpoints = self.endpoints - .iter() - .enumerate() - .filter(|&(i, _)| i != sender) - .map(|(_, x)| x); - - for endpoint in receiving_endpoints { - let _ = endpoint.unbounded_send(item.clone()); - } - } - - self.poll() - } - } - } -} - -#[derive(Debug, PartialEq, Eq, Clone, Hash)] -struct Candidate(usize); - -#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)] -struct Digest(usize); - -#[derive(Debug, PartialEq, Eq, Clone, Hash)] -struct AuthorityId(usize); - -#[derive(Debug, PartialEq, Eq, Clone)] -struct Signature(Message, AuthorityId); - -#[derive(Debug)] -struct Error; - -impl From for Error { - fn from(_: InputStreamConcluded) -> Error { - Error - } -} - -struct TestContext { - local_id: AuthorityId, - proposal: Mutex, - node_count: usize, - current_round: Arc, - evaluated: Mutex>, -} - -impl Context for TestContext { - type Error = Error; - type Candidate = Candidate; - type Digest = Digest; - type AuthorityId = AuthorityId; - type Signature = Signature; - type RoundTimeout = Box>; - type CreateProposal = FutureResult; - type EvaluateProposal = FutureResult; - - fn local_id(&self) -> AuthorityId { - self.local_id.clone() - } - - fn proposal(&self) -> Self::CreateProposal { - let proposal = { - let mut p = self.proposal.lock().unwrap(); - let x = *p; - *p += self.node_count; - x - }; - - Ok(Candidate(proposal)).into_future() - } - - fn candidate_digest(&self, candidate: &Candidate) -> Digest { - Digest(candidate.0) - } - - fn sign_local(&self, message: Message) - -> LocalizedMessage - { - let signature = Signature(message.clone(), self.local_id.clone()); - - match message { - Message::Propose(r, proposal) => LocalizedMessage::Propose(LocalizedProposal { - round_number: r, - digest: Digest(proposal.0), - proposal, - digest_signature: signature.clone(), - full_signature: signature, - sender: self.local_id.clone(), - }), - Message::Vote(vote) => LocalizedMessage::Vote(LocalizedVote { - vote, - signature, - sender: self.local_id.clone(), - }), - } - } - - fn round_proposer(&self, round: usize) -> AuthorityId { - AuthorityId(round % self.node_count) - } - - fn proposal_valid(&self, proposal: &Candidate) -> FutureResult { - if !self.evaluated.lock().unwrap().insert(proposal.0) { - panic!("Evaluated proposal {:?} twice", proposal.0); - } - - Ok(proposal.0 % 3 != 0).into_future() - } - - fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout { - if round < self.current_round.load(Ordering::SeqCst) { - Box::new(Ok(()).into_future()) - } else { - let mut round_duration = ROUND_DURATION; - for _ in 0..round { - round_duration *= 2; - } - - let current_round = self.current_round.clone(); - let timeout = Delay::new(Instant::now() + round_duration) - .map(move |_| { - current_round.compare_and_swap(round, round + 1, Ordering::SeqCst); - }) - .map_err(|_| Error); - - Box::new(timeout) - } - } -} - -fn test_harness(f: F) -> Result where - F: FnOnce() -> X, - X: IntoFuture, -{ - let mut runtime = runtime::current_thread::Runtime::new().unwrap(); - runtime.block_on(future::lazy(f)) -} - -#[test] -fn consensus_completes_with_minimum_good() { - let results = test_harness(|| { - let node_count = 10; - let max_faulty = 3; - - let (network, net_send, net_recv) = Network::new(node_count); - network.route_in_background(); - - let nodes = net_send - .into_iter() - .zip(net_recv) - .take(node_count - max_faulty) - .enumerate() - .map(|(i, (tx, rx))| { - let ctx = TestContext { - local_id: AuthorityId(i), - proposal: Mutex::new(i), - current_round: Arc::new(AtomicUsize::new(0)), - evaluated: Mutex::new(BTreeSet::new()), - node_count, - }; - - agree( - ctx, - node_count, - max_faulty, - rx.map_err(|_| Error), - tx.sink_map_err(|_| Error).with(move |t| Ok((i, t))), - ) - }) - .collect::>(); - - ::futures::future::join_all(nodes) - .deadline(Instant::now() + Duration::from_millis(500)) - }).unwrap(); - - for result in &results { - assert_eq!(&result.justification.digest, &results[0].justification.digest); - } -} - -#[test] -fn consensus_completes_with_minimum_good_all_initial_proposals_bad() { - let results = test_harness(|| { - let node_count = 10; - let max_faulty = 3; - - let (network, net_send, net_recv) = Network::new(node_count); - network.route_in_background(); - - let nodes = net_send - .into_iter() - .zip(net_recv) - .take(node_count - max_faulty) - .enumerate() - .map(|(i, (tx, rx))| { - // the first 5 proposals are going to be bad. - let proposal = if i < 5 { - i * 3 // proposals considered bad in the tests if they are % 3 - } else { - (i * 3) + 1 - }; - - let ctx = TestContext { - local_id: AuthorityId(i), - proposal: Mutex::new(proposal), - current_round: Arc::new(AtomicUsize::new(0)), - evaluated: Mutex::new(BTreeSet::new()), - node_count, - }; - - agree( - ctx, - node_count, - max_faulty, - rx.map_err(|_| Error), - tx.sink_map_err(|_| Error).with(move |t| Ok((i, t))), - ) - }) - .collect::>(); - - ::futures::future::join_all(nodes) - .deadline(Instant::now() + Duration::from_millis(500)) - }).unwrap(); - - for result in &results { - assert_eq!(&result.justification.digest, &results[0].justification.digest); - } -} - -#[test] -fn consensus_does_not_complete_without_enough_nodes() { - let result = test_harness(|| { - let node_count = 10; - let max_faulty = 3; - - let (network, net_send, net_recv) = Network::new(node_count); - network.route_in_background(); - - let nodes = net_send - .into_iter() - .zip(net_recv) - .take(node_count - max_faulty - 1) - .enumerate() - .map(|(i, (tx, rx))| { - let ctx = TestContext { - local_id: AuthorityId(i), - proposal: Mutex::new(i), - current_round: Arc::new(AtomicUsize::new(0)), - evaluated: Mutex::new(BTreeSet::new()), - node_count, - }; - - agree( - ctx, - node_count, - max_faulty, - rx.map_err(|_| Error), - tx.sink_map_err(|_| Error).with(move |t| Ok((i, t))), - ) - }) - .collect::>(); - - ::futures::future::join_all(nodes) - .deadline(Instant::now() + Duration::from_millis(500)) - }); - - match result { - Ok(_) => panic!("completed wrongly"), - Err(ref e) if e.is_inner() => panic!("failed for wrong reason"), - Err(_) => {}, - } -} - -#[test] -fn threshold_plus_one_locked_on_proposal_only_one_with_candidate() { - let locked_digest = Digest(999_999_999); - - let results = test_harness(move || { - let node_count = 10; - let max_faulty = 3; - - let locked_proposal = Candidate(999_999_999); - let locked_round = 1; - let justification = UncheckedJustification { - round_number: locked_round, - digest: locked_digest.clone(), - signatures: (0..7) - .map(|i| Signature(Message::Vote(Vote::Prepare(locked_round, locked_digest.clone())), AuthorityId(i))) - .collect() - }.check(7, |_, _, s| Some(s.1.clone())).unwrap(); - - let (network, net_send, net_recv) = Network::new(node_count); - network.route_in_background(); - - let nodes = net_send - .into_iter() - .zip(net_recv) - .enumerate() - .map(|(i, (tx, rx))| { - let ctx = TestContext { - local_id: AuthorityId(i), - proposal: Mutex::new(i), - current_round: Arc::new(AtomicUsize::new(locked_round + 1)), - evaluated: Mutex::new(BTreeSet::new()), - node_count, - }; - let mut agreement = agree( - ctx, - node_count, - max_faulty, - rx.map_err(|_| Error), - tx.sink_map_err(|_| Error).with(move |t| Ok((i, t))), - ); - - agreement.strategy.advance_to_round( - &agreement.context, - locked_round + 1 - ); - - if i <= max_faulty { - agreement.strategy.locked = Some(Locked { - justification: justification.clone(), - }) - } - - if i == max_faulty { - agreement.strategy.notable_candidates.insert( - locked_digest.clone(), - locked_proposal.clone(), - ); - } - - agreement - }) - .collect::>(); - - ::futures::future::join_all(nodes) - .deadline(Instant::now() + Duration::from_millis(1000)) - }).unwrap(); - - for result in &results { - assert_eq!(&result.justification.digest, &locked_digest); - } -} - -#[test] -fn consensus_completes_even_when_nodes_start_with_a_delay() { - let results = test_harness(|| { - let node_count = 10; - let max_faulty = 3; - let base_sleep = Duration::from_millis(75); - - let now = Instant::now(); - let (network, net_send, net_recv) = Network::new(node_count); - network.route_in_background(); - - let nodes = net_send - .into_iter() - .zip(net_recv) - .take(node_count - max_faulty) - .enumerate() - .map(move |(i, (tx, rx))| { - let ctx = TestContext { - local_id: AuthorityId(i), - proposal: Mutex::new(i), - current_round: Arc::new(AtomicUsize::new(0)), - evaluated: Mutex::new(BTreeSet::new()), - node_count, - }; - - let sleep_duration = base_sleep * i as u32; - - Delay::new(now + sleep_duration).map_err(|_| Error).and_then(move |_| { - agree( - ctx, - node_count, - max_faulty, - rx.map_err(|_| Error), - tx.sink_map_err(|_| Error).with(move |t| Ok((i, t))), - ) - }) - }) - .collect::>(); - - ::futures::future::join_all(nodes) - .deadline(now + Duration::from_millis(750)) - }).unwrap(); - - for result in &results { - assert_eq!(&result.justification.digest, &results[0].justification.digest); - } -} diff --git a/substrate/bft/src/lib.rs b/substrate/bft/src/lib.rs index 4a71e79dbe79e..2da5a79726fc3 100644 --- a/substrate/bft/src/lib.rs +++ b/substrate/bft/src/lib.rs @@ -31,7 +31,6 @@ //! set for this block height. pub mod error; -pub mod generic; extern crate substrate_codec as codec; extern crate substrate_primitives as primitives; @@ -40,6 +39,7 @@ extern crate substrate_runtime_primitives as runtime_primitives; extern crate ed25519; extern crate tokio; extern crate parking_lot; +extern crate rhododendron; #[macro_use] extern crate log; @@ -66,15 +66,15 @@ use futures::sync::oneshot; use tokio::timer::Delay; use parking_lot::Mutex; -pub use generic::InputStreamConcluded; +pub use rhododendron::InputStreamConcluded; pub use error::{Error, ErrorKind}; /// Messages over the proposal. /// Each message carries an associated round number. -pub type Message = generic::Message::Hash>; +pub type Message = rhododendron::Message::Hash>; /// Localized message type. -pub type LocalizedMessage = generic::LocalizedMessage< +pub type LocalizedMessage = rhododendron::LocalizedMessage< B, ::Hash, AuthorityId, @@ -82,45 +82,62 @@ pub type LocalizedMessage = generic::LocalizedMessage< >; /// Justification of some hash. -pub type Justification = generic::Justification; +pub type Justification = rhododendron::Justification; /// Justification of a prepare message. -pub type PrepareJustification = generic::PrepareJustification; +pub type PrepareJustification = rhododendron::PrepareJustification; /// Unchecked justification. -pub type UncheckedJustification = generic::UncheckedJustification; +pub struct UncheckedJustification(rhododendron::UncheckedJustification); + +impl UncheckedJustification { + /// Create a new, unchecked justification. + pub fn new(digest: H, signatures: Vec, round_number: usize) -> Self { + UncheckedJustification(rhododendron::UncheckedJustification { + digest, + signatures, + round_number, + }) + } +} + +impl From> for UncheckedJustification { + fn from(inner: rhododendron::UncheckedJustification) -> Self { + UncheckedJustification(inner) + } +} impl From> for UncheckedJustification { fn from(just: PrimitiveJustification) -> Self { - UncheckedJustification { + UncheckedJustification(rhododendron::UncheckedJustification { round_number: just.round_number as usize, digest: just.hash, signatures: just.signatures.into_iter().map(|(from, sig)| LocalizedSignature { signer: from.into(), signature: sig, }).collect(), - } + }) } } impl Into> for UncheckedJustification { fn into(self) -> PrimitiveJustification { PrimitiveJustification { - round_number: self.round_number as u32, - hash: self.digest, - signatures: self.signatures.into_iter().map(|s| (s.signer.into(), s.signature)).collect(), + round_number: self.0.round_number as u32, + hash: self.0.digest, + signatures: self.0.signatures.into_iter().map(|s| (s.signer.into(), s.signature)).collect(), } } } /// Result of a committed round of BFT -pub type Committed = generic::Committed::Hash, LocalizedSignature>; +pub type Committed = rhododendron::Committed::Hash, LocalizedSignature>; /// Communication between BFT participants. -pub type Communication = generic::Communication::Hash, AuthorityId, LocalizedSignature>; +pub type Communication = rhododendron::Communication::Hash, AuthorityId, LocalizedSignature>; /// Misbehavior observed from BFT participants. -pub type Misbehavior = generic::Misbehavior; +pub type Misbehavior = rhododendron::Misbehavior; /// Environment producer for a BFT instance. Creates proposer instance and communication streams. pub trait Environment { @@ -187,7 +204,7 @@ struct BftInstance { proposer: P, } -impl> generic::Context for BftInstance +impl> rhododendron::Context for BftInstance where B: Clone + Eq, B::Hash: ::std::hash::Hash, @@ -250,7 +267,7 @@ pub struct BftFuture where InStream: Stream, Error=P::Error>, OutSink: Sink, SinkError=P::Error>, { - inner: generic::Agreement, InStream, OutSink>, + inner: rhododendron::Agreement, InStream, OutSink>, cancel: Arc, send_task: Option>, import: Arc, @@ -411,7 +428,7 @@ impl BftService authorities: authorities, }; - let agreement = generic::agree( + let agreement = rhododendron::agree( bft_instance, n, max_faulty, @@ -467,7 +484,7 @@ fn check_justification_signed_message(authorities: &[AuthorityId], message: & -> Result, UncheckedJustification> { // TODO: return additional error information. - just.check(authorities.len() - max_faulty_of(authorities.len()), |_, _, sig| { + just.0.check(authorities.len() - max_faulty_of(authorities.len()), |_, _, sig| { let auth_id = sig.signer.clone().into(); if !authorities.contains(&auth_id) { return None } @@ -476,7 +493,7 @@ fn check_justification_signed_message(authorities: &[AuthorityId], message: & } else { None } - }) + }).map_err(UncheckedJustification) } /// Check a full justification for a header hash. @@ -488,7 +505,7 @@ pub fn check_justification(authorities: &[AuthorityId], parent: B::Has { let message = Slicable::encode(&PrimitiveMessage:: { parent, - action: PrimitiveAction::Commit(just.round_number as u32, just.digest.clone()), + action: PrimitiveAction::Commit(just.0.round_number as u32, just.0.digest.clone()), }); check_justification_signed_message(authorities, &message[..], just) @@ -503,7 +520,7 @@ pub fn check_prepare_justification(authorities: &[AuthorityId], parent { let message = Slicable::encode(&PrimitiveMessage:: { parent, - action: PrimitiveAction::Prepare(just.round_number as u32, just.digest.clone()), + action: PrimitiveAction::Prepare(just.0.round_number as u32, just.0.digest.clone()), }); check_justification_signed_message(authorities, &message[..], just) @@ -514,7 +531,7 @@ pub fn check_prepare_justification(authorities: &[AuthorityId], parent pub fn check_proposal( authorities: &[AuthorityId], parent_hash: &B::Hash, - propose: &::generic::LocalizedProposal) + propose: &::rhododendron::LocalizedProposal) -> Result<(), Error> { if !authorities.contains(&propose.sender) { @@ -532,7 +549,7 @@ pub fn check_proposal( pub fn check_vote( authorities: &[AuthorityId], parent_hash: &B::Hash, - vote: &::generic::LocalizedVote) + vote: &::rhododendron::LocalizedVote) -> Result<(), Error> { if !authorities.contains(&vote.sender) { @@ -540,9 +557,9 @@ pub fn check_vote( } let action = match vote.vote { - ::generic::Vote::Prepare(r, ref h) => PrimitiveAction::Prepare(r as u32, h.clone()), - ::generic::Vote::Commit(r, ref h) => PrimitiveAction::Commit(r as u32, h.clone()), - ::generic::Vote::AdvanceRound(r) => PrimitiveAction::AdvanceRound(r as u32), + ::rhododendron::Vote::Prepare(r, ref h) => PrimitiveAction::Prepare(r as u32, h.clone()), + ::rhododendron::Vote::Commit(r, ref h) => PrimitiveAction::Commit(r as u32, h.clone()), + ::rhododendron::Vote::AdvanceRound(r) => PrimitiveAction::AdvanceRound(r as u32), }; check_action::(action, parent_hash, &vote.signature) } @@ -579,12 +596,12 @@ pub fn sign_message(message: Message, key: &ed25519::Pair, }; match message { - ::generic::Message::Propose(r, proposal) => { + ::rhododendron::Message::Propose(r, proposal) => { let header_hash = proposal.hash(); let action_header = PrimitiveAction::ProposeHeader(r as u32, header_hash.clone()); let action_propose = PrimitiveAction::Propose(r as u32, proposal.clone()); - ::generic::LocalizedMessage::Propose(::generic::LocalizedProposal { + ::rhododendron::LocalizedMessage::Propose(::rhododendron::LocalizedProposal { round_number: r, proposal, digest: header_hash, @@ -593,14 +610,14 @@ pub fn sign_message(message: Message, key: &ed25519::Pair, full_signature: sign_action(action_propose), }) } - ::generic::Message::Vote(vote) => { + ::rhododendron::Message::Vote(vote) => { let action = match vote { - ::generic::Vote::Prepare(r, ref h) => PrimitiveAction::Prepare(r as u32, h.clone()), - ::generic::Vote::Commit(r, ref h) => PrimitiveAction::Commit(r as u32, h.clone()), - ::generic::Vote::AdvanceRound(r) => PrimitiveAction::AdvanceRound(r as u32), + ::rhododendron::Vote::Prepare(r, ref h) => PrimitiveAction::Prepare(r as u32, h.clone()), + ::rhododendron::Vote::Commit(r, ref h) => PrimitiveAction::Commit(r as u32, h.clone()), + ::rhododendron::Vote::AdvanceRound(r) => PrimitiveAction::AdvanceRound(r as u32), }; - ::generic::LocalizedMessage::Vote(::generic::LocalizedVote { + ::rhododendron::LocalizedMessage::Vote(::rhododendron::LocalizedVote { vote: vote, sender: signer.clone().into(), signature: sign_action(action), @@ -708,9 +725,9 @@ mod tests { } } - fn sign_vote(vote: ::generic::Vote, key: &ed25519::Pair, parent_hash: H256) -> LocalizedSignature { + fn sign_vote(vote: ::rhododendron::Vote, key: &ed25519::Pair, parent_hash: H256) -> LocalizedSignature { match sign_message::(vote.into(), key, parent_hash) { - ::generic::LocalizedMessage::Vote(vote) => vote.signature, + ::rhododendron::LocalizedMessage::Vote(vote) => vote.signature, _ => panic!("signing vote leads to signed vote"), } } @@ -792,45 +809,45 @@ mod tests { Keyring::Eve.into(), ]; - let unchecked = UncheckedJustification { + let unchecked = UncheckedJustification(rhododendron::UncheckedJustification { digest: hash, round_number: 1, signatures: authorities_keys.iter().take(3).map(|key| { - sign_vote(generic::Vote::Commit(1, hash).into(), key, parent_hash) + sign_vote(rhododendron::Vote::Commit(1, hash).into(), key, parent_hash) }).collect(), - }; + }); assert!(check_justification::(&authorities, parent_hash, unchecked).is_ok()); - let unchecked = UncheckedJustification { + let unchecked = UncheckedJustification(rhododendron::UncheckedJustification { digest: hash, round_number: 0, // wrong round number (vs. the signatures) signatures: authorities_keys.iter().take(3).map(|key| { - sign_vote(generic::Vote::Commit(1, hash).into(), key, parent_hash) + sign_vote(rhododendron::Vote::Commit(1, hash).into(), key, parent_hash) }).collect(), - }; + }); assert!(check_justification::(&authorities, parent_hash, unchecked).is_err()); // not enough signatures. - let unchecked = UncheckedJustification { + let unchecked = UncheckedJustification(rhododendron::UncheckedJustification { digest: hash, round_number: 1, signatures: authorities_keys.iter().take(2).map(|key| { - sign_vote(generic::Vote::Commit(1, hash).into(), key, parent_hash) + sign_vote(rhododendron::Vote::Commit(1, hash).into(), key, parent_hash) }).collect(), - }; + }); assert!(check_justification::(&authorities, parent_hash, unchecked).is_err()); // wrong hash. - let unchecked = UncheckedJustification { + let unchecked = UncheckedJustification(rhododendron::UncheckedJustification { digest: [0xfe; 32].into(), round_number: 1, signatures: authorities_keys.iter().take(3).map(|key| { - sign_vote(generic::Vote::Commit(1, hash).into(), key, parent_hash) + sign_vote(rhododendron::Vote::Commit(1, hash).into(), key, parent_hash) }).collect(), - }; + }); assert!(check_justification::(&authorities, parent_hash, unchecked).is_err()); } @@ -849,8 +866,8 @@ mod tests { extrinsics: Default::default() }; - let proposal = sign_message(::generic::Message::Propose(1, block.clone()), &Keyring::Alice.pair(), parent_hash);; - if let ::generic::LocalizedMessage::Propose(proposal) = proposal { + let proposal = sign_message(::rhododendron::Message::Propose(1, block.clone()), &Keyring::Alice.pair(), parent_hash);; + if let ::rhododendron::LocalizedMessage::Propose(proposal) = proposal { assert!(check_proposal(&authorities, &parent_hash, &proposal).is_ok()); let mut invalid_round = proposal.clone(); invalid_round.round_number = 0; @@ -863,8 +880,8 @@ mod tests { } // Not an authority - let proposal = sign_message::(::generic::Message::Propose(1, block), &Keyring::Bob.pair(), parent_hash);; - if let ::generic::LocalizedMessage::Propose(proposal) = proposal { + let proposal = sign_message::(::rhododendron::Message::Propose(1, block), &Keyring::Bob.pair(), parent_hash);; + if let ::rhododendron::LocalizedMessage::Propose(proposal) = proposal { assert!(check_proposal(&authorities, &parent_hash, &proposal).is_err()); } else { assert!(false); @@ -881,8 +898,8 @@ mod tests { Keyring::Eve.to_raw_public().into(), ]; - let vote = sign_message::(::generic::Message::Vote(::generic::Vote::Prepare(1, hash)), &Keyring::Alice.pair(), parent_hash);; - if let ::generic::LocalizedMessage::Vote(vote) = vote { + let vote = sign_message::(::rhododendron::Message::Vote(::rhododendron::Vote::Prepare(1, hash)), &Keyring::Alice.pair(), parent_hash);; + if let ::rhododendron::LocalizedMessage::Vote(vote) = vote { assert!(check_vote::(&authorities, &parent_hash, &vote).is_ok()); let mut invalid_sender = vote.clone(); invalid_sender.signature.signer = Keyring::Eve.into(); @@ -892,8 +909,8 @@ mod tests { } // Not an authority - let vote = sign_message::(::generic::Message::Vote(::generic::Vote::Prepare(1, hash)), &Keyring::Bob.pair(), parent_hash);; - if let ::generic::LocalizedMessage::Vote(vote) = vote { + let vote = sign_message::(::rhododendron::Message::Vote(::rhododendron::Vote::Prepare(1, hash)), &Keyring::Bob.pair(), parent_hash);; + if let ::rhododendron::LocalizedMessage::Vote(vote) = vote { assert!(check_vote::(&authorities, &parent_hash, &vote).is_err()); } else { assert!(false); diff --git a/substrate/client/src/client.rs b/substrate/client/src/client.rs index 1438e66ead415..b4c28e634724e 100644 --- a/substrate/client/src/client.rs +++ b/substrate/client/src/client.rs @@ -335,7 +335,8 @@ impl Client where let is_new_best = header.number() == &(self.backend.blockchain().info()?.best_number + One::one()); trace!("Imported {}, (#{}), best={}, origin={:?}", hash, header.number(), is_new_best, origin); - transaction.set_block_data(header.clone(), body, Some(justification.uncheck().into()), is_new_best)?; + let unchecked: bft::UncheckedJustification<_> = justification.uncheck().into(); + transaction.set_block_data(header.clone(), body, Some(unchecked.into()), is_new_best)?; if let Some(storage_update) = storage_update { transaction.update_storage(storage_update)?; } diff --git a/substrate/misbehavior-check/Cargo.toml b/substrate/misbehavior-check/Cargo.toml index 3ab615b2d411a..c51e705369564 100644 --- a/substrate/misbehavior-check/Cargo.toml +++ b/substrate/misbehavior-check/Cargo.toml @@ -11,6 +11,7 @@ substrate-runtime-io = { path = "../runtime-io", default-features = false } [dev-dependencies] substrate-bft = { path = "../bft" } +rhododendron = "0.2" substrate-keyring = { path = "../keyring" } [features] diff --git a/substrate/misbehavior-check/src/lib.rs b/substrate/misbehavior-check/src/lib.rs index 634fd5f2b63c9..fc1e62aa40ba4 100644 --- a/substrate/misbehavior-check/src/lib.rs +++ b/substrate/misbehavior-check/src/lib.rs @@ -27,6 +27,8 @@ extern crate substrate_runtime_primitives as runtime_primitives; extern crate substrate_bft; #[cfg(test)] extern crate substrate_keyring as keyring; +#[cfg(test)] +extern crate rhododendron; use codec::Slicable; use primitives::{AuthorityId, Signature}; @@ -85,7 +87,6 @@ pub fn evaluate_misbehavior( mod tests { use super::*; - use substrate_bft::generic; use keyring::ed25519; use keyring::Keyring; @@ -95,26 +96,26 @@ mod tests { fn sign_prepare(key: &ed25519::Pair, round: u32, hash: H256, parent_hash: H256) -> (H256, Signature) { let msg = substrate_bft::sign_message::( - generic::Message::Vote(generic::Vote::Prepare(round as _, hash)), + rhododendron::Message::Vote(rhododendron::Vote::Prepare(round as _, hash)), key, parent_hash ); match msg { - generic::LocalizedMessage::Vote(vote) => (hash, vote.signature.signature), + rhododendron::LocalizedMessage::Vote(vote) => (hash, vote.signature.signature), _ => panic!("signing vote leads to signed vote"), } } fn sign_commit(key: &ed25519::Pair, round: u32, hash: H256, parent_hash: H256) -> (H256, Signature) { let msg = substrate_bft::sign_message::( - generic::Message::Vote(generic::Vote::Commit(round as _, hash)), + rhododendron::Message::Vote(rhododendron::Vote::Commit(round as _, hash)), key, parent_hash ); match msg { - generic::LocalizedMessage::Vote(vote) => (hash, vote.signature.signature), + rhododendron::LocalizedMessage::Vote(vote) => (hash, vote.signature.signature), _ => panic!("signing vote leads to signed vote"), } } diff --git a/substrate/test-client/Cargo.toml b/substrate/test-client/Cargo.toml index bec520419f48e..4897dae974704 100644 --- a/substrate/test-client/Cargo.toml +++ b/substrate/test-client/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" authors = ["Parity Technologies "] [dependencies] +rhododendron = "0.2" substrate-bft = { path = "../bft" } substrate-client = { path = "../client" } substrate-codec = { path = "../codec" } diff --git a/substrate/test-client/src/client_ext.rs b/substrate/test-client/src/client_ext.rs index 8d1fa946ac978..2fd26f5e67a43 100644 --- a/substrate/test-client/src/client_ext.rs +++ b/substrate/test-client/src/client_ext.rs @@ -68,22 +68,22 @@ fn fake_justify(header: &runtime::Header) -> bft::UncheckedJustification( - bft::generic::Vote::Commit(1, hash).into(), + ::rhododendron::Vote::Commit(1, hash).into(), key, header.parent_hash ); match msg { - bft::generic::LocalizedMessage::Vote(vote) => vote.signature, + ::rhododendron::LocalizedMessage::Vote(vote) => vote.signature, _ => panic!("signing vote leads to signed vote"), } }).collect(), - round_number: 1, - } + 1, + ) } fn genesis_config() -> GenesisConfig { diff --git a/substrate/test-client/src/lib.rs b/substrate/test-client/src/lib.rs index fed17e46cda72..267581bc8f92a 100644 --- a/substrate/test-client/src/lib.rs +++ b/substrate/test-client/src/lib.rs @@ -18,6 +18,7 @@ #![warn(missing_docs)] +extern crate rhododendron; extern crate substrate_bft as bft; extern crate substrate_codec as codec; extern crate substrate_keyring as keyring;