This repository has been archived by the owner on Nov 15, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2.7k
handle exit and avoid threads hanging #137
Merged
Merged
Changes from 4 commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
2fc9ee6
barrier on starting network
rphmeier 6ff852f
handle exit better
rphmeier f20987b
give consensus service its own internal exit signal
rphmeier 1e6af1c
update comment
rphmeier 8dda4b2
Merge branch 'master' into rh-hanging
tomusdrw 30e9d3f
remove stop_notifications and fix build
rphmeier File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,14 +28,15 @@ extern crate polkadot_api; | |
extern crate polkadot_consensus as consensus; | ||
extern crate polkadot_transaction_pool as transaction_pool; | ||
extern crate polkadot_keystore as keystore; | ||
extern crate substrate_client as client; | ||
extern crate substrate_runtime_io as runtime_io; | ||
extern crate substrate_primitives as primitives; | ||
extern crate substrate_network as network; | ||
extern crate substrate_codec as codec; | ||
extern crate substrate_executor; | ||
|
||
extern crate exit_future; | ||
extern crate tokio_core; | ||
extern crate substrate_client as client; | ||
|
||
#[macro_use] | ||
extern crate error_chain; | ||
|
@@ -65,6 +66,7 @@ use polkadot_runtime::{GenesisConfig, ConsensusConfig, CouncilConfig, DemocracyC | |
use client::{genesis, BlockchainEvents}; | ||
use client::in_mem::Backend as InMemory; | ||
use network::ManageNetwork; | ||
use exit_future::Signal; | ||
|
||
pub use self::error::{ErrorKind, Error}; | ||
pub use config::{Configuration, Role, ChainSpec}; | ||
|
@@ -77,6 +79,7 @@ pub struct Service { | |
client: Arc<Client>, | ||
network: Arc<network::Service>, | ||
transaction_pool: Arc<Mutex<TransactionPool>>, | ||
signal: Option<Signal>, | ||
_consensus: Option<consensus::Service>, | ||
} | ||
|
||
|
@@ -242,6 +245,10 @@ fn local_testnet_config() -> ChainConfig { | |
impl Service { | ||
/// Creates and register protocol with the network service | ||
pub fn new(mut config: Configuration) -> Result<Service, error::Error> { | ||
use std::sync::Barrier; | ||
|
||
let (signal, exit) = ::exit_future::signal(); | ||
|
||
// Create client | ||
let executor = polkadot_executor::Executor::new(); | ||
let mut storage = Default::default(); | ||
|
@@ -284,7 +291,39 @@ impl Service { | |
chain: client.clone(), | ||
transaction_pool: transaction_pool_adapter, | ||
}; | ||
|
||
let network = network::Service::new(network_params)?; | ||
let barrier = ::std::sync::Arc::new(Barrier::new(2)); | ||
|
||
let thread = { | ||
let client = client.clone(); | ||
let network = network.clone(); | ||
let txpool = transaction_pool.clone(); | ||
|
||
let thread_barrier = barrier.clone(); | ||
thread::spawn(move || { | ||
network.start_network(); | ||
|
||
thread_barrier.wait(); | ||
let mut core = Core::new().expect("tokio::Core could not be created"); | ||
let events = client.import_notification_stream().for_each(move |notification| { | ||
network.on_block_imported(notification.hash, ¬ification.header); | ||
prune_imported(&*client, &*txpool, notification.hash); | ||
|
||
Ok(()) | ||
}); | ||
|
||
core.handle().spawn(events); | ||
if let Err(e) = core.run(exit) { | ||
debug!("Polkadot service event loop shutdown with {:?}", e); | ||
} | ||
debug!("Polkadot service shutdown"); | ||
}) | ||
}; | ||
|
||
// wait for the network to start up before starting the consensus | ||
// service. | ||
barrier.wait(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this really necessary? There's no event loop blocked on the network anymore. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's also nice to make sure the network is started before we begin the consensus process. |
||
|
||
// Spin consensus service if configured | ||
let consensus_service = if config.roles & Role::VALIDATOR == Role::VALIDATOR { | ||
|
@@ -296,28 +335,12 @@ impl Service { | |
None | ||
}; | ||
|
||
let thread_client = client.clone(); | ||
let thread_network = network.clone(); | ||
let thread_txpool = transaction_pool.clone(); | ||
let thread = thread::spawn(move || { | ||
thread_network.start_network(); | ||
let mut core = Core::new().expect("tokio::Core could not be created"); | ||
let events = thread_client.import_notification_stream().for_each(|notification| { | ||
thread_network.on_block_imported(notification.hash, ¬ification.header); | ||
prune_imported(&*thread_client, &*thread_txpool, notification.hash); | ||
|
||
Ok(()) | ||
}); | ||
if let Err(e) = core.run(events) { | ||
debug!("Polkadot service event loop shutdown with {:?}", e); | ||
} | ||
debug!("Polkadot service shutdown"); | ||
}); | ||
Ok(Service { | ||
thread: Some(thread), | ||
client: client, | ||
network: network, | ||
transaction_pool: transaction_pool, | ||
signal: Some(signal), | ||
_consensus: consensus_service, | ||
}) | ||
} | ||
|
@@ -359,6 +382,11 @@ impl Drop for Service { | |
fn drop(&mut self) { | ||
self.client.stop_notifications(); | ||
self.network.stop_network(); | ||
|
||
if let Some(signal) = self.signal.take() { | ||
signal.fire(); | ||
} | ||
|
||
if let Some(thread) = self.thread.take() { | ||
thread.join().expect("The service thread has panicked"); | ||
} | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not needed fro this PR, but in general i'd like to get away from anonymous messages at anything more verbose than the
info!
level. enablingdebug
andtrace
is a currently a mess, so doesn't really make sense to enable them without specifying a target filter.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can always filter by module. I usually use an explicit target to logically group logs coming from several modules under a single filter. If there is just a single module there's no need for that.