diff --git a/Cargo.lock b/Cargo.lock index 6fafdb2b69d3c..8b04d38ccd992 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1918,7 +1918,7 @@ dependencies = [ "lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.41 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", - "rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1992,7 +1992,7 @@ dependencies = [ [[package]] name = "rhododendron" -version = "0.3.2" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2345,7 +2345,7 @@ dependencies = [ "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", - "rhododendron 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "rhododendron 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "substrate-codec 0.1.0", "substrate-executor 0.1.0", "substrate-keyring 0.1.0", @@ -2529,7 +2529,7 @@ dependencies = [ name = "substrate-misbehavior-check" version = "0.1.0" dependencies = [ - "rhododendron 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "rhododendron 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "substrate-bft 0.1.0", "substrate-codec 0.1.0", "substrate-keyring 0.1.0", @@ -3004,7 +3004,7 @@ name = "substrate-test-client" version = "0.1.0" dependencies = [ "hashdb 0.2.1 (git+https://github.com/paritytech/parity-common)", - "rhododendron 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "rhododendron 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "substrate-bft 0.1.0", "substrate-client 0.1.0", "substrate-codec 0.1.0", @@ -3899,7 +3899,7 @@ dependencies = [ "checksum regex-syntax 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "747ba3b235651f6e2f67dfa8bcdcd073ddb7c243cb21c442fc12395dfcac212d" "checksum relay 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1576e382688d7e9deecea24417e350d3062d97e32e45d70b1cde65994ff1489a" "checksum remove_dir_all 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3488ba1b9a2084d38645c4c08276a1752dcbf2c7130d74f1569681ad5d2799c5" -"checksum rhododendron 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "289a6395497f70b8076bf5b9c223e1dc5c0a77619d0a75124f7d4c728d09d2d8" +"checksum rhododendron 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "e20523445e693f394c0e487113ae656071311c9ee4c1e914441bece8c929b21d" "checksum ring 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)" = "6f7d28b30a72c01b458428e0ae988d4149c20d902346902be881e3edc4bb325c" "checksum rlp 0.2.1 (git+https://github.com/paritytech/parity-common)" = "" "checksum rlp 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "89db7f8dfdd5eb7ab3ac3ece7a07fd273a680b4b224cb231181280e8996f9f0b" diff --git a/substrate/bft/src/lib.rs b/substrate/bft/src/lib.rs index 882ccdffee405..5103cedc3dd3d 100644 --- a/substrate/bft/src/lib.rs +++ b/substrate/bft/src/lib.rs @@ -47,15 +47,13 @@ extern crate rhododendron; #[macro_use] extern crate log; -#[macro_use] extern crate futures; #[macro_use] extern crate error_chain; -use std::mem; use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::{Instant, Duration}; use codec::Encode; @@ -65,7 +63,7 @@ use runtime_primitives::traits::{Block, Header}; use runtime_primitives::bft::{Message as PrimitiveMessage, Action as PrimitiveAction, Justification as PrimitiveJustification}; use primitives::AuthorityId; -use futures::{task, Async, Stream, Sink, Future, IntoFuture}; +use futures::{Async, Stream, Sink, Future, IntoFuture}; use futures::sync::oneshot; use tokio::timer::Delay; use parking_lot::Mutex; @@ -73,6 +71,13 @@ use parking_lot::Mutex; pub use rhododendron::{InputStreamConcluded, AdvanceRoundReason}; pub use error::{Error, ErrorKind}; +// statuses for an agreement +mod status { + pub const LIVE: usize = 0; + pub const BAD: usize = 1; + pub const GOOD: usize = 2; +} + /// Messages over the proposal. /// Each message carries an associated round number. pub type Message = rhododendron::Message::Hash>; @@ -193,7 +198,7 @@ pub trait Proposer { /// Block import trait. pub trait BlockImport { /// Import a block alongside its corresponding justification. - fn import_block(&self, block: B, justification: Justification, authorities: &[AuthorityId]); + fn import_block(&self, block: B, justification: Justification, authorities: &[AuthorityId]) -> bool; } /// Trait for getting the authorities at a given block. @@ -336,8 +341,8 @@ pub struct BftFuture where OutSink: Sink, SinkError=P::Error>, { inner: rhododendron::Agreement, InStream, OutSink>, - cancel: Arc, - send_task: Option>, + status: Arc, + cancel: oneshot::Receiver<()>, import: Arc, } @@ -354,18 +359,19 @@ impl Future for BftFuture ::futures::Poll<(), ()> { - // send the task to the bft service so this can be cancelled. - if let Some(sender) = self.send_task.take() { - let _ = sender.send(task::current()); - } - // service has canceled the future. bail - if self.cancel.load(Ordering::Acquire) { - return Ok(Async::Ready(())) - } + let cancel = match self.cancel.poll() { + Ok(Async::Ready(())) | Err(_) => true, + Ok(Async::NotReady) => false, + }; // TODO: handle and log this error in a way which isn't noisy on exit. - let committed = try_ready!(self.inner.poll().map_err(|_| ())); + let committed = match self.inner.poll().map_err(|_| ()) { + Ok(Async::Ready(x)) => x, + Ok(Async::NotReady) => + return Ok(if cancel { Async::Ready(()) } else { Async::NotReady }), + Err(()) => return Err(()), + }; // if something was committed, the round leader must have proposed. self.inner.context().proposer.on_round_end(committed.round_number, true); @@ -373,11 +379,26 @@ impl Future for BftFuture Drop for BftFuture // TODO: have a trait member to pass misbehavior reports into. let misbehavior = self.inner.drain_misbehavior().collect::>(); self.inner.context().proposer.import_misbehavior(misbehavior); - self.cancel.store(true, Ordering::Release); } } struct AgreementHandle { - cancel: Arc, - task: Option>, + status: Arc, + send_cancel: Option>, } impl AgreementHandle { - fn is_live(&self) -> bool { - !self.cancel.load(Ordering::Acquire) + fn status(&self) -> usize { + self.status.load(Ordering::Acquire) } } impl Drop for AgreementHandle { fn drop(&mut self) { - let task = match self.task.take() { - Some(t) => t, - None => return, - }; - - // if this fails, the task is definitely not live anyway. - if let Ok(task) = task.wait() { - self.cancel.store(true, Ordering::Release); - task.notify(); + if let Some(sender) = self.send_cancel.take() { + let _ = sender.send(()); } } } @@ -486,7 +499,12 @@ impl BftService where { let hash = header.hash(); - if self.last_agreement().map_or(false, |last| last.parent_hash == hash) { + + let mut live_agreement = self.live_agreement.lock(); + let can_build = live_agreement.as_ref() + .map_or(true, |x| self.can_build_on_inner(header, x)); + + if !can_build { return Ok(None) } @@ -494,14 +512,15 @@ impl BftService let n = authorities.len(); let max_faulty = max_faulty_of(n); + trace!(target: "bft", "Initiating agreement on top of #{}, {:?}", header.number(), hash); trace!(target: "bft", "max_faulty_of({})={}", n, max_faulty); let local_id = self.local_id(); if !authorities.contains(&local_id) { // cancel current agreement - self.live_agreement.lock().take(); - Err(From::from(ErrorKind::InvalidAuthority(local_id)))?; + live_agreement.take(); + Err(ErrorKind::InvalidAuthority(local_id).into())?; } let (proposer, input, output) = self.factory.init(header, &authorities, self.key.clone())?; @@ -529,8 +548,11 @@ impl BftService trace!(target: "bft", "Round cache: {:?}", &*cache); if cache.hash.as_ref() == Some(&hash) { trace!(target: "bft", "Fast-forwarding to round {}", cache.start_round); - agreement.fast_forward(cache.start_round); + let start_round = cache.start_round; cache.start_round += 1; + + drop(cache); + agreement.fast_forward(start_round); } else { *cache = RoundCache { hash: Some(hash.clone()), @@ -539,22 +561,19 @@ impl BftService } } - let cancel = Arc::new(AtomicBool::new(false)); + let status = Arc::new(AtomicUsize::new(status::LIVE)); let (tx, rx) = oneshot::channel(); // cancel current agreement. - // defers drop of live to the end. - let _preempted_consensus = { - mem::replace(&mut *self.live_agreement.lock(), Some((hash, AgreementHandle { - task: Some(rx), - cancel: cancel.clone(), - }))) - }; + *live_agreement = Some((hash, AgreementHandle { + send_cancel: Some(tx), + status: status.clone(), + })); Ok(Some(BftFuture { inner: agreement, - cancel: cancel, - send_task: Some(tx), + status: status, + cancel: rx, import: self.client.clone(), })) } @@ -564,21 +583,24 @@ impl BftService self.live_agreement.lock().take(); } - /// Get current agreement parent hash if any. - pub fn last_agreement(&self) -> Option> { - self.live_agreement.lock() - .as_ref() - .map(|&(ref h, ref handle)| LastAgreement { parent_hash: h.clone(), live: handle.is_live() }) + /// Whether we can build using the given header. + pub fn can_build_on(&self, header: &B::Header) -> bool { + self.live_agreement.lock().as_ref() + .map_or(true, |x| self.can_build_on_inner(header, x)) } -} + /// Get a reference to the underyling client. + pub fn client(&self) -> &I { &*self.client } -/// Struct representing the last agreement the service has processed. -pub struct LastAgreement { - /// The parent hash that agreement was building on. - pub parent_hash: H, - /// Whether that agreement was live. - pub live: bool, + fn can_build_on_inner(&self, header: &B::Header, live: &(B::Hash, AgreementHandle)) -> bool { + let hash = header.hash(); + let &(ref live_hash, ref handle) = live; + match handle.status() { + _ if *header.parent_hash() == *live_hash => true, // can always follow with next block. + status::BAD => hash == *live_hash, // bad block can be re-agreed on. + _ => false, // canceled won't appear since we overwrite the handle before returning. + } + } } /// Given a total number of authorities, yield the maximum faulty that would be allowed. @@ -746,7 +768,6 @@ mod tests { use runtime_primitives::testing::{Block as GenericTestBlock, Header as TestHeader}; use primitives::H256; use self::keyring::Keyring; - use tokio::executor::current_thread; extern crate substrate_keyring as keyring; @@ -758,8 +779,9 @@ mod tests { } impl BlockImport for FakeClient { - fn import_block(&self, block: TestBlock, _justification: Justification, _authorities: &[AuthorityId]) { - assert!(self.imported_heights.lock().insert(block.header.number)) + fn import_block(&self, block: TestBlock, _justification: Justification, _authorities: &[AuthorityId]) -> bool { + assert!(self.imported_heights.lock().insert(block.header.number)); + true } } @@ -888,21 +910,18 @@ mod tests { second.parent_hash = first_hash; let second_hash = second.hash(); - let bft = service.build_upon(&first).unwrap(); + let mut first_bft = service.build_upon(&first).unwrap().unwrap(); assert!(service.live_agreement.lock().as_ref().unwrap().0 == first_hash); - let mut core = current_thread::CurrentThread::new(); - - // turn the core so the future gets polled and sends its task to the - // service. otherwise it deadlocks. - core.spawn(bft.unwrap()); - core.run_timeout(::std::time::Duration::from_millis(100)).unwrap(); - let bft = service.build_upon(&second).unwrap(); + let _second_bft = service.build_upon(&second).unwrap(); assert!(service.live_agreement.lock().as_ref().unwrap().0 != first_hash); assert!(service.live_agreement.lock().as_ref().unwrap().0 == second_hash); - core.spawn(bft.unwrap()); - core.run_timeout(::std::time::Duration::from_millis(100)).unwrap(); + // first_bft has been cancelled. need to swap out so we can check it. + let (_tx, mut rx) = oneshot::channel(); + ::std::mem::swap(&mut rx, &mut first_bft.cancel); + + assert!(rx.wait().is_ok()); } #[test] @@ -1041,4 +1060,29 @@ mod tests { assert!(false); } } + + #[test] + fn drop_bft_future_does_not_deadlock() { + let client = FakeClient { + authorities: vec![ + Keyring::One.to_raw_public().into(), + Keyring::Two.to_raw_public().into(), + Keyring::Alice.to_raw_public().into(), + Keyring::Eve.to_raw_public().into(), + ], + imported_heights: Mutex::new(HashSet::new()), + }; + + let service = make_service(client); + + let first = from_block_number(2); + let first_hash = first.hash(); + + let mut second = from_block_number(3); + second.parent_hash = first_hash; + + let _ = service.build_upon(&first).unwrap(); + assert!(service.live_agreement.lock().as_ref().unwrap().0 == first_hash); + service.live_agreement.lock().take(); + } } diff --git a/substrate/client/src/client.rs b/substrate/client/src/client.rs index 2bd8e9f8a6863..2bd53d1a881ff 100644 --- a/substrate/client/src/client.rs +++ b/substrate/client/src/client.rs @@ -518,7 +518,7 @@ impl bft::BlockImport for Client block: Block, justification: ::bft::Justification, authorities: &[AuthorityId] - ) { + ) -> bool { let (header, extrinsics) = block.deconstruct(); let justified_header = JustifiedHeader { header: header, @@ -526,7 +526,7 @@ impl bft::BlockImport for Client authorities: authorities.to_vec(), }; - let _ = self.import_block(BlockOrigin::ConsensusBroadcast, justified_header, Some(extrinsics)); + self.import_block(BlockOrigin::ConsensusBroadcast, justified_header, Some(extrinsics)).is_ok() } }