This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

handle exit and avoid threads hanging #137

Merged · 6 commits · Apr 18, 2018
Changes from 2 commits
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

14 changes: 9 additions & 5 deletions polkadot/cli/src/lib.rs
@@ -146,11 +146,15 @@ pub fn run<I, T>(args: I) -> error::Result<()> where

     informant::start(&service, core.handle());

-    let (exit_send, exit) = mpsc::channel(1);
-    ctrlc::CtrlC::set_handler(move || {
-        exit_send.clone().send(()).wait().expect("Error sending exit notification");
-    });
-    core.run(exit.into_future()).expect("Error running informant event loop");
+    {
+        // can't use signal directly here because CtrlC takes only `Fn`.
+        let (exit_send, exit) = mpsc::channel(1);
+        ctrlc::CtrlC::set_handler(move || {
+            exit_send.clone().send(()).wait().expect("Error sending exit notification");
+        });
+        core.run(exit.into_future()).expect("Error running informant event loop");
+    }

     Ok(())
 }
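The shape of the new block is worth noting: the Ctrl-C handler must be `Fn` (callable repeatedly), while `Sink::send` in futures 0.1 consumes the sender, which is why each invocation clones `exit_send`. Below is a minimal, self-contained sketch of the same pattern; a plain closure stands in for the real `ctrlc` handler, everything else is the futures 0.1 API.

extern crate futures;

use futures::{Future, Sink, Stream};
use futures::sync::mpsc;

fn main() {
    let (exit_send, exit) = mpsc::channel::<()>(1);

    // The handler must be `Fn`, so it may run more than once; `send`
    // consumes the sender, hence the `clone()` on every call.
    let on_ctrl_c = move || {
        exit_send.clone().send(()).wait().expect("Error sending exit notification");
    };

    on_ctrl_c(); // in the real code, ctrlc invokes this on SIGINT

    // `into_future()` resolves on the first item, i.e. the first Ctrl-C.
    exit.into_future().wait().ok();
}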

1 change: 1 addition & 0 deletions polkadot/consensus/Cargo.toml
@@ -10,6 +10,7 @@ tokio-core = "0.1.12"
 ed25519 = { path = "../../substrate/ed25519" }
 error-chain = "0.11"
 log = "0.3"
+exit-future = "0.1"
 polkadot-api = { path = "../api" }
 polkadot-collator = { path = "../collator" }
 polkadot-primitives = { path = "../primitives" }
1 change: 1 addition & 0 deletions polkadot/consensus/src/lib.rs
@@ -44,6 +44,7 @@ extern crate substrate_primitives as primitives;
 extern crate substrate_runtime_support as runtime_support;
 extern crate substrate_network;

+extern crate exit_future;
 extern crate tokio_core;
 extern crate substrate_keyring;
 extern crate substrate_client as client;
73 changes: 46 additions & 27 deletions polkadot/consensus/src/service.rs
@@ -265,9 +265,11 @@ impl Service {
         client: Arc<C>,
         network: Arc<net::ConsensusService>,
         transaction_pool: Arc<Mutex<TransactionPool>>,
-        key: ed25519::Pair
+        key: ed25519::Pair,
+        exit: ::exit_future::Exit,

Review comment (Member): As I've mentioned in personal discussion, I can't say I like this. It just introduces additional complexity to the service interface and leaks implementation details. What happens if the service is dropped before exit is signalled? This is something the user of this interface should not have to worry about.

     ) -> Service
-        where C: BlockchainEvents + ChainHead + bft::BlockImport + bft::Authorities + PolkadotApi + Send + Sync + 'static
+        where
+            C: BlockchainEvents + ChainHead + bft::BlockImport + bft::Authorities + PolkadotApi + Send + Sync + 'static,
     {
         let thread = thread::spawn(move || {
             let mut core = reactor::Core::new().expect("tokio::Core could not be created");
@@ -281,41 +283,58 @@ impl Service {
             let messages = SharedMessageCollection::new();
             let bft_service = Arc::new(BftService::new(client.clone(), key, factory));

-            let handle = core.handle();
-            let notifications = client.import_notification_stream().for_each(|notification| {
-                if notification.is_new_best {
-                    start_bft(&notification.header, handle.clone(), &*client, network.clone(), &*bft_service, messages.clone());
-                }
-                Ok(())
-            });
+            let notifications = {
+                let handle = core.handle();
+                let network = network.clone();
+                let client = client.clone();
+                let bft_service = bft_service.clone();
+                let messages = messages.clone();

-            let interval = reactor::Interval::new_at(Instant::now() + Duration::from_millis(TIMER_DELAY_MS), Duration::from_millis(TIMER_INTERVAL_MS), &handle).unwrap();
+                client.import_notification_stream().for_each(move |notification| {
+                    if notification.is_new_best {
+                        start_bft(&notification.header, handle.clone(), &*client, network.clone(), &*bft_service, messages.clone());
+                    }
+                    Ok(())
+                })
+            };
+
+            let interval = reactor::Interval::new_at(
+                Instant::now() + Duration::from_millis(TIMER_DELAY_MS),
+                Duration::from_millis(TIMER_INTERVAL_MS),
+                &core.handle(),
+            ).expect("it is always possible to create an interval with valid params");
             let mut prev_best = match client.best_block_header() {
                 Ok(header) => header.blake2_256(),
                 Err(e) => {
                     warn!("Cant's start consensus service. Error reading best block header: {:?}", e);
                     return;
                 }
             };
-            let c = client.clone();
-            let s = bft_service.clone();
-            let n = network.clone();
-            let m = messages.clone();
-            let handle = core.handle();
-            let timed = interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| {
-                if let Ok(best_block) = c.best_block_header() {
-                    let hash = best_block.blake2_256();
-                    m.collect_garbage();
-                    if hash == prev_best {
-                        debug!("Starting consensus round after a timeout");
-                        start_bft(&best_block, handle.clone(), &*c, n.clone(), &*s, m.clone());
-                    }
-                    prev_best = hash;
-                }
-                Ok(())
-            });

+            let timed = {
+                let c = client.clone();
+                let s = bft_service.clone();
+                let n = network.clone();
+                let m = messages.clone();
+                let handle = core.handle();
+
+                interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| {
+                    if let Ok(best_block) = c.best_block_header() {
+                        let hash = best_block.blake2_256();
+                        m.collect_garbage();
+                        if hash == prev_best {
+                            debug!("Starting consensus round after a timeout");
+                            start_bft(&best_block, handle.clone(), &*c, n.clone(), &*s, m.clone());
+                        }
+                        prev_best = hash;
+                    }
+                    Ok(())
+                })
+            };
+
+            core.handle().spawn(notifications);
             core.handle().spawn(timed);
-            if let Err(e) = core.run(notifications) {
+            if let Err(e) = core.run(exit) {
                 debug!("BFT event loop error {:?}", e);
             }
         });
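The structural change in this file: both streams are now spawned onto the reactor as background tasks, and `core.run` blocks on the `exit` future rather than on the notification stream, so firing the signal lets the thread finish instead of hanging. Here is a minimal sketch of that shape; it assumes only the exit-future 0.1 surface visible in this diff (`signal()` returning a `(Signal, Exit)` pair whose `Exit` is a unit future) plus tokio-core 0.1.

extern crate exit_future;
extern crate futures;
extern crate tokio_core;

use std::thread;
use std::time::Duration;
use futures::{Future, Stream};
use tokio_core::reactor::{Core, Interval};

fn main() {
    let (signal, exit) = exit_future::signal();

    let worker = thread::spawn(move || {
        let mut core = Core::new().expect("tokio::Core could not be created");

        // Long-running background work, spawned rather than run directly so
        // it no longer pins the event loop open forever.
        let timed = Interval::new(Duration::from_millis(10), &core.handle())
            .expect("it is always possible to create an interval with valid params")
            .map_err(|e| println!("Timer error: {:?}", e))
            .for_each(|_| Ok(()));
        core.handle().spawn(timed);

        // Returns as soon as `exit` resolves; previously the equivalent of
        // `core.run(notifications)` never returned.
        if let Err(e) = core.run(exit) {
            println!("event loop error {:?}", e);
        }
    });

    signal.fire(); // what Service::drop does
    worker.join().expect("worker thread panicked");
}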
1 change: 1 addition & 0 deletions polkadot/service/Cargo.toml
@@ -11,6 +11,7 @@ tokio-timer = "0.1.2"
 error-chain = "0.11"
 log = "0.3"
 tokio-core = "0.1.12"
+exit-future = "0.1"
 ed25519 = { path = "../../substrate/ed25519" }
 polkadot-primitives = { path = "../primitives" }
 polkadot-runtime = { path = "../runtime" }
68 changes: 49 additions & 19 deletions polkadot/service/src/lib.rs
@@ -28,14 +28,15 @@ extern crate polkadot_api;
 extern crate polkadot_consensus as consensus;
 extern crate polkadot_transaction_pool as transaction_pool;
 extern crate polkadot_keystore as keystore;
-extern crate substrate_client as client;
 extern crate substrate_runtime_io as runtime_io;
 extern crate substrate_primitives as primitives;
 extern crate substrate_network as network;
 extern crate substrate_codec as codec;
 extern crate substrate_executor;

+extern crate exit_future;
 extern crate tokio_core;
+extern crate substrate_client as client;

 #[macro_use]
 extern crate error_chain;
@@ -65,6 +66,7 @@ use polkadot_runtime::{GenesisConfig, ConsensusConfig, CouncilConfig, DemocracyC
 use client::{genesis, BlockchainEvents};
 use client::in_mem::Backend as InMemory;
 use network::ManageNetwork;
+use exit_future::Signal;

 pub use self::error::{ErrorKind, Error};
 pub use config::{Configuration, Role, ChainSpec};
@@ -77,6 +79,7 @@ pub struct Service {
     client: Arc<Client>,
     network: Arc<network::Service>,
     transaction_pool: Arc<Mutex<TransactionPool>>,
+    signal: Option<Signal>,
     _consensus: Option<consensus::Service>,
 }

@@ -242,6 +245,10 @@ fn local_testnet_config() -> ChainConfig {
 impl Service {
     /// Creates and register protocol with the network service
     pub fn new(mut config: Configuration) -> Result<Service, error::Error> {
+        use std::sync::Barrier;
+
+        let (signal, exit) = ::exit_future::signal();
+
         // Create client
         let executor = polkadot_executor::Executor::new();
         let mut storage = Default::default();
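Everything this PR needs from the exit-future crate is the pair returned by `signal()`. The following sketch of that surface is inferred from how the crate is used in this diff, not from its documentation, so treat the details as assumptions: `Exit` is a cloneable future that resolves once the `Signal` fires.

extern crate exit_future;
extern crate futures;

use futures::Future;

fn main() {
    let (signal, exit) = exit_future::signal();
    let exit_for_consensus = exit.clone(); // a clone is handed to consensus::Service::new

    signal.fire(); // Service::drop fires this exactly once

    // Every clone resolves after the signal has fired.
    exit.wait().ok();
    exit_for_consensus.wait().ok();
}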
@@ -284,40 +291,58 @@ impl Service {
             chain: client.clone(),
             transaction_pool: transaction_pool_adapter,
         };
+
         let network = network::Service::new(network_params)?;
+        let barrier = ::std::sync::Arc::new(Barrier::new(2));
+
+        let thread = {
+            let client = client.clone();
+            let network = network.clone();
+            let txpool = transaction_pool.clone();
+            let exit = exit.clone();
+
+            let thread_barrier = barrier.clone();
+            thread::spawn(move || {
+                network.start_network();
+
+                thread_barrier.wait();
+                let mut core = Core::new().expect("tokio::Core could not be created");
+                let events = client.import_notification_stream().for_each(move |notification| {
+                    network.on_block_imported(notification.hash, &notification.header);
+                    prune_imported(&*client, &*txpool, notification.hash);
+
+                    Ok(())
+                });
+
+                core.handle().spawn(events);
+                if let Err(e) = core.run(exit) {
+                    debug!("Polkadot service event loop shutdown with {:?}", e);

Review comment (Member): Not needed for this PR, but in general I'd like to get away from anonymous messages at anything more verbose than the info! level. Enabling debug and trace is currently a mess, so it doesn't really make sense to enable them without specifying a target filter.

Reply (Member): You can always filter by module. I usually use an explicit target to logically group logs coming from several modules under a single filter. If there is just a single module, there's no need for that.

+                }
+                debug!("Polkadot service shutdown");
+            })
+        };
+
+        // before returning, make sure the network is started. avoids a race
+        // between the drop killing notification listeners and the new notification
+        // stream being started.
+        barrier.wait();

Review comment (Member): Is this really necessary? There's no event loop blocked on the network anymore.

Reply (Contributor, author): It's also nice to make sure the network is started before we begin the consensus process.

         // Spin consensus service if configured
         let consensus_service = if config.roles & Role::VALIDATOR == Role::VALIDATOR {
             // Load the first available key. Code above makes sure it exisis.
             let key = keystore.load(&keystore.contents()?[0], "")?;
             info!("Using authority key {:?}", key.public());
-            Some(consensus::Service::new(client.clone(), network.clone(), transaction_pool.clone(), key))
+            Some(consensus::Service::new(client.clone(), network.clone(), transaction_pool.clone(), key, exit))
         } else {
             None
         };

-        let thread_client = client.clone();
-        let thread_network = network.clone();
-        let thread_txpool = transaction_pool.clone();
-        let thread = thread::spawn(move || {
-            thread_network.start_network();
-            let mut core = Core::new().expect("tokio::Core could not be created");
-            let events = thread_client.import_notification_stream().for_each(|notification| {
-                thread_network.on_block_imported(notification.hash, &notification.header);
-                prune_imported(&*thread_client, &*thread_txpool, notification.hash);
-
-                Ok(())
-            });
-            if let Err(e) = core.run(events) {
-                debug!("Polkadot service event loop shutdown with {:?}", e);
-            }
-            debug!("Polkadot service shutdown");
-        });
         Ok(Service {
             thread: Some(thread),
             client: client,
             network: network,
             transaction_pool: transaction_pool,
+            signal: Some(signal),
             _consensus: consensus_service,
         })
     }
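The Barrier choreography above is easy to misread, so here it is in isolation: a minimal sketch using only standard library types. The point is that `new` cannot return before the worker has passed `start_network`, which is the race the comment in the diff describes.

use std::sync::{Arc, Barrier};
use std::thread;

fn main() {
    let barrier = Arc::new(Barrier::new(2));

    let thread_barrier = barrier.clone();
    let worker = thread::spawn(move || {
        println!("network started"); // stands in for network.start_network()

        thread_barrier.wait();
        // ... the event loop runs here until the exit future resolves ...
    });

    // Blocks until the worker has started the network, so notification
    // listeners registered after this point cannot race with startup.
    barrier.wait();
    println!("safe to spin up consensus");

    worker.join().expect("The service thread has panicked");
}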
@@ -359,6 +384,11 @@ impl Drop for Service {
     fn drop(&mut self) {
         self.client.stop_notifications();
         self.network.stop_network();
+
+        if let Some(signal) = self.signal.take() {
+            signal.fire();
+        }
+
         if let Some(thread) = self.thread.take() {
             thread.join().expect("The service thread has panicked");
         }
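Note the ordering in this destructor: the signal fires before the join, so the worker's `core.run(exit)` returns and `thread.join()` cannot block forever, which is exactly the hanging-thread problem this PR sets out to fix.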