Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
push cancel to BFTfuture rather than waiting for task
Browse files Browse the repository at this point in the history
  • Loading branch information
rphmeier committed Aug 28, 2018
1 parent 623aab4 commit 6c6cae1
Showing 1 changed file with 33 additions and 44 deletions.
77 changes: 33 additions & 44 deletions substrate/bft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ extern crate rhododendron;
#[macro_use]
extern crate log;

#[macro_use]
extern crate futures;

#[macro_use]
Expand All @@ -64,7 +63,7 @@ use runtime_primitives::traits::{Block, Header};
use runtime_primitives::bft::{Message as PrimitiveMessage, Action as PrimitiveAction, Justification as PrimitiveJustification};
use primitives::AuthorityId;

use futures::{task, Async, Stream, Sink, Future, IntoFuture};
use futures::{Async, Stream, Sink, Future, IntoFuture};
use futures::sync::oneshot;
use tokio::timer::Delay;
use parking_lot::Mutex;
Expand All @@ -75,9 +74,8 @@ pub use error::{Error, ErrorKind};
// statuses for an agreement
mod status {
pub const LIVE: usize = 0;
pub const CANCELED: usize = 1;
pub const BAD: usize = 2;
pub const GOOD: usize = 3;
pub const BAD: usize = 1;
pub const GOOD: usize = 2;
}

/// Messages over the proposal.
Expand Down Expand Up @@ -344,7 +342,7 @@ pub struct BftFuture<B, P, I, InStream, OutSink> where
{
inner: rhododendron::Agreement<BftInstance<B, P>, InStream, OutSink>,
status: Arc<AtomicUsize>,
send_task: Option<oneshot::Sender<Option<task::Task>>>,
cancel: oneshot::Receiver<()>,
import: Arc<I>,
}

Expand All @@ -361,18 +359,19 @@ impl<B, P, I, InStream, OutSink> Future for BftFuture<B, P, I, InStream, OutSink
type Error = ();

fn poll(&mut self) -> ::futures::Poll<(), ()> {
// send the task to the bft service so this can be cancelled.
if let Some(sender) = self.send_task.take() {
let _ = sender.send(Some(task::current()));
}

// service has canceled the future. bail
if self.status.load(Ordering::Acquire) == status::CANCELED {
return Ok(Async::Ready(()))
}
let cancel = match self.cancel.poll() {
Ok(Async::Ready(())) | Err(_) => true,
Ok(Async::NotReady) => false,
};

// TODO: handle and log this error in a way which isn't noisy on exit.
let committed = try_ready!(self.inner.poll().map_err(|_| ()));
let committed = match self.inner.poll().map_err(|_| ()) {
Ok(Async::Ready(x)) => x,
Ok(Async::NotReady) =>
return Ok(if cancel { Async::Ready(()) } else { Async::NotReady }),
Err(()) => return Err(()),
};

// if something was committed, the round leader must have proposed.
self.inner.context().proposer.on_round_end(committed.round_number, true);
Expand All @@ -397,6 +396,9 @@ impl<B, P, I, InStream, OutSink> Future for BftFuture<B, P, I, InStream, OutSink
} else {
self.status.store(status::GOOD, Ordering::Release);
}
} else {
// assume good unless we received the proposal.
self.status.store(status::GOOD, Ordering::Release);
}

self.inner.context().update_round_cache(committed.round_number);
Expand All @@ -413,10 +415,6 @@ impl<B, P, I, InStream, OutSink> Drop for BftFuture<B, P, I, InStream, OutSink>
OutSink: Sink<SinkItem=Communication<B>, SinkError=P::Error>,
{
fn drop(&mut self) {
if let Some(sender) = self.send_task.take() {
let _ = sender.send(None);
}

// TODO: have a trait member to pass misbehavior reports into.
let misbehavior = self.inner.drain_misbehavior().collect::<Vec<_>>();
self.inner.context().proposer.import_misbehavior(misbehavior);
Expand All @@ -425,7 +423,7 @@ impl<B, P, I, InStream, OutSink> Drop for BftFuture<B, P, I, InStream, OutSink>

struct AgreementHandle {
status: Arc<AtomicUsize>,
task: Option<oneshot::Receiver<Option<task::Task>>>,
send_cancel: Option<oneshot::Sender<()>>,
}

impl AgreementHandle {
Expand All @@ -436,15 +434,8 @@ impl AgreementHandle {

impl Drop for AgreementHandle {
fn drop(&mut self) {
let task = match self.task.take() {
Some(t) => t,
None => return,
};

// if this fails, the task is definitely not live anyway.
if let Ok(Some(task)) = task.wait() {
self.status.compare_and_swap(status::LIVE, status::CANCELED, Ordering::SeqCst);
task.notify();
if let Some(sender) = self.send_cancel.take() {
let _ = sender.send(());
}
}
}
Expand Down Expand Up @@ -557,8 +548,11 @@ impl<B, P, I> BftService<B, P, I>
trace!(target: "bft", "Round cache: {:?}", &*cache);
if cache.hash.as_ref() == Some(&hash) {
trace!(target: "bft", "Fast-forwarding to round {}", cache.start_round);
agreement.fast_forward(cache.start_round);
let start_round = cache.start_round;
cache.start_round += 1;

drop(cache);
agreement.fast_forward(start_round);
} else {
*cache = RoundCache {
hash: Some(hash.clone()),
Expand All @@ -572,14 +566,14 @@ impl<B, P, I> BftService<B, P, I>

// cancel current agreement.
*live_agreement = Some((hash, AgreementHandle {
task: Some(rx),
send_cancel: Some(tx),
status: status.clone(),
}));

Ok(Some(BftFuture {
inner: agreement,
status: status,
send_task: Some(tx),
cancel: rx,
import: self.client.clone(),
}))
}
Expand Down Expand Up @@ -774,7 +768,6 @@ mod tests {
use runtime_primitives::testing::{Block as GenericTestBlock, Header as TestHeader};
use primitives::H256;
use self::keyring::Keyring;
use tokio::executor::current_thread;

extern crate substrate_keyring as keyring;

Expand Down Expand Up @@ -917,21 +910,18 @@ mod tests {
second.parent_hash = first_hash;
let second_hash = second.hash();

let bft = service.build_upon(&first).unwrap();
let mut first_bft = service.build_upon(&first).unwrap().unwrap();
assert!(service.live_agreement.lock().as_ref().unwrap().0 == first_hash);

let mut core = current_thread::CurrentThread::new();

// turn the core so the future gets polled and sends its task to the
// service. otherwise it deadlocks.
core.spawn(bft.unwrap());
core.run_timeout(::std::time::Duration::from_millis(100)).unwrap();
let bft = service.build_upon(&second).unwrap();
let _second_bft = service.build_upon(&second).unwrap();
assert!(service.live_agreement.lock().as_ref().unwrap().0 != first_hash);
assert!(service.live_agreement.lock().as_ref().unwrap().0 == second_hash);

core.spawn(bft.unwrap());
core.run_timeout(::std::time::Duration::from_millis(100)).unwrap();
// first_bft has been cancelled. need to swap out so we can check it.
let (_tx, mut rx) = oneshot::channel();
::std::mem::swap(&mut rx, &mut first_bft.cancel);

assert!(rx.wait().is_ok());
}

#[test]
Expand Down Expand Up @@ -1090,7 +1080,6 @@ mod tests {

let mut second = from_block_number(3);
second.parent_hash = first_hash;
let second_hash = second.hash();

let _ = service.build_upon(&first).unwrap();
assert!(service.live_agreement.lock().as_ref().unwrap().0 == first_hash);
Expand Down

0 comments on commit 6c6cae1

Please sign in to comment.