Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Cleaner binary shutdown system (#8284)
Browse files Browse the repository at this point in the history
* Cleaner shutdown system when executing

* Simplify set_exit_handler for Client

* Minor change

* Fix submodule
  • Loading branch information
tomaka authored and 5chdn committed Apr 4, 2018
1 parent 0455aa9 commit e12a515
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 77 deletions.
11 changes: 7 additions & 4 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ pub struct Client {
registrar_address: Option<Address>,

/// A closure to call when we want to restart the client
exit_handler: Mutex<Option<Box<Fn(bool, Option<String>) + 'static + Send>>>,
exit_handler: Mutex<Option<Box<Fn(String) + 'static + Send>>>,

importer: Importer,
}
Expand Down Expand Up @@ -825,8 +825,11 @@ impl Client {
self.notify.write().push(Arc::downgrade(&target));
}

/// Set a closure to call when we want to restart the client
pub fn set_exit_handler<F>(&self, f: F) where F: Fn(bool, Option<String>) + 'static + Send {
/// Set a closure to call when the client wants to be restarted.
///
/// The parameter passed to the callback is the name of the new chain spec to use after
/// the restart.
pub fn set_exit_handler<F>(&self, f: F) where F: Fn(String) + 'static + Send {
*self.exit_handler.lock() = Some(Box::new(f));
}

Expand Down Expand Up @@ -1625,7 +1628,7 @@ impl BlockChainClient for Client {
return;
}
if let Some(ref h) = *self.exit_handler.lock() {
(*h)(true, Some(new_spec_name));
(*h)(new_spec_name);
} else {
warn!("Not hypervised; cannot change chain.");
}
Expand Down
166 changes: 93 additions & 73 deletions parity/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use std::any::Any;
use std::fmt;
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -182,7 +183,7 @@ impl ::local_store::NodeInfo for FullNodeInfo {
type LightClient = ::light::client::Client<::light_helpers::EpochFetch>;

// helper for light execution.
fn execute_light_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> Result<((bool, Option<String>), Weak<LightClient>), String> {
fn execute_light_impl(cmd: RunCmd, logger: Arc<RotatingLogger>) -> Result<RunningClient, String> {
use light::client as light_client;
use ethsync::{LightSyncParams, LightSync, ManageNetwork};
use parking_lot::{Mutex, RwLock};
Expand Down Expand Up @@ -260,7 +261,7 @@ fn execute_light_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger

let service = light_client::Service::start(config, &spec, fetch, db, cache.clone())
.map_err(|e| format!("Error starting light client: {}", e))?;
let client = service.client();
let client = service.client().clone();
let txq = Arc::new(RwLock::new(::light::transaction_queue::TransactionQueue::default()));
let provider = ::light::provider::LightProvider::new(client.clone(), txq.clone());

Expand Down Expand Up @@ -402,10 +403,10 @@ fn execute_light_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger
};

// start rpc servers
let _ws_server = rpc::new_ws(cmd.ws_conf, &dependencies)?;
let _http_server = rpc::new_http("HTTP JSON-RPC", "jsonrpc", cmd.http_conf.clone(), &dependencies, dapps_middleware)?;
let _ipc_server = rpc::new_ipc(cmd.ipc_conf, &dependencies)?;
let _ui_server = rpc::new_http("Parity Wallet (UI)", "ui", cmd.ui_conf.clone().into(), &dependencies, ui_middleware)?;
let ws_server = rpc::new_ws(cmd.ws_conf, &dependencies)?;
let http_server = rpc::new_http("HTTP JSON-RPC", "jsonrpc", cmd.http_conf.clone(), &dependencies, dapps_middleware)?;
let ipc_server = rpc::new_ipc(cmd.ipc_conf, &dependencies)?;
let ui_server = rpc::new_http("Parity Wallet (UI)", "ui", cmd.ui_conf.clone().into(), &dependencies, ui_middleware)?;

// the informant
let informant = Arc::new(Informant::new(
Expand All @@ -421,17 +422,18 @@ fn execute_light_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger

service.register_handler(informant.clone()).map_err(|_| "Unable to register informant handler".to_owned())?;

// wait for ctrl-c and then shut down the informant.
let res = wait_for_exit(None, None, can_restart);
informant.shutdown();

// Create a weak reference to the client so that we can wait on shutdown until it is dropped
let weak_client = Arc::downgrade(&client);

Ok((res, weak_client))
Ok(RunningClient::Light {
informant,
client,
keep_alive: Box::new((service, ws_server, http_server, ipc_server, ui_server)),
})
}

pub fn execute_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> Result<((bool, Option<String>), Weak<Client>), String> {
fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq: Cr,
on_updater_rq: Rr) -> Result<RunningClient, String>
where Cr: Fn(String) + 'static + Send,
Rr: Fn() + 'static + Send
{
// load spec
let spec = cmd.spec.spec(&cmd.dirs.cache)?;

Expand Down Expand Up @@ -854,7 +856,7 @@ pub fn execute_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>)
});

// the watcher must be kept alive.
let _watcher = match cmd.no_periodic_snapshot {
let watcher = match cmd.no_periodic_snapshot {
true => None,
false => {
let sync = sync_provider.clone();
Expand All @@ -881,23 +883,58 @@ pub fn execute_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>)
open_dapp(&cmd.dapps_conf, &cmd.http_conf, &dapp)?;
}

// Create a weak reference to the client so that we can wait on shutdown until it is dropped
let weak_client = Arc::downgrade(&client);

// Handle exit
let restart = wait_for_exit(Some(updater), Some(client), can_restart);
client.set_exit_handler(on_client_rq);
updater.set_exit_handler(on_updater_rq);

info!("Finishing work, please wait...");

// drop this stuff as soon as exit detected.
drop((ws_server, http_server, ipc_server, ui_server, secretstore_key_server, ipfs_server, event_loop));
Ok(RunningClient::Full {
informant,
client,
keep_alive: Box::new((watcher, service, updater, ws_server, http_server, ipc_server, ui_server, secretstore_key_server, ipfs_server, event_loop)),
})
}

// to make sure timer does not spawn requests while shutdown is in progress
informant.shutdown();
// just Arc is dropping here, to allow other reference release in its default time
drop(informant);
enum RunningClient {
Light {
informant: Arc<Informant<LightNodeInformantData>>,
client: Arc<LightClient>,
keep_alive: Box<Any>,
},
Full {
informant: Arc<Informant<FullNodeInformantData>>,
client: Arc<Client>,
keep_alive: Box<Any>,
},
}

Ok((restart, weak_client))
impl RunningClient {
fn shutdown(self) {
match self {
RunningClient::Light { informant, client, keep_alive } => {
// Create a weak reference to the client so that we can wait on shutdown
// until it is dropped
let weak_client = Arc::downgrade(&client);
drop(keep_alive);
informant.shutdown();
drop(informant);
drop(client);
wait_for_drop(weak_client);
},
RunningClient::Full { informant, client, keep_alive } => {
info!("Finishing work, please wait...");
// Create a weak reference to the client so that we can wait on shutdown
// until it is dropped
let weak_client = Arc::downgrade(&client);
// drop this stuff as soon as exit detected.
drop(keep_alive);
// to make sure timer does not spawn requests while shutdown is in progress
informant.shutdown();
// just Arc is dropping here, to allow other reference release in its default time
drop(informant);
drop(client);
wait_for_drop(weak_client);
}
}
}
}

pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> Result<(bool, Option<String>), String> {
Expand All @@ -917,18 +954,34 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
// increase max number of open files
raise_fd_limit();

fn wait<T>(res: Result<((bool, Option<String>), Weak<T>), String>) -> Result<(bool, Option<String>), String> {
res.map(|(restart, weak_client)| {
wait_for_drop(weak_client);
restart
})
}
let exit = Arc::new((Mutex::new((false, None)), Condvar::new()));

if cmd.light {
wait(execute_light_impl(cmd, can_restart, logger))
let running_client = if cmd.light {
execute_light_impl(cmd, logger)?
} else if can_restart {
let e1 = exit.clone();
let e2 = exit.clone();
execute_impl(cmd, logger,
move |new_chain: String| { *e1.0.lock() = (true, Some(new_chain)); e1.1.notify_all(); },
move || { *e2.0.lock() = (true, None); e2.1.notify_all(); })?
} else {
wait(execute_impl(cmd, can_restart, logger))
}
trace!(target: "mode", "Not hypervised: not setting exit handlers.");
execute_impl(cmd, logger, move |_| {}, move || {})?
};

// Handle possible exits
CtrlC::set_handler({
let e = exit.clone();
move || { e.1.notify_all(); }
});

// Wait for signal
let mut l = exit.0.lock();
let _ = exit.1.wait(&mut l);

running_client.shutdown();

Ok(l.clone())
}

#[cfg(not(windows))]
Expand Down Expand Up @@ -1029,39 +1082,6 @@ fn build_create_account_hint(spec: &SpecType, keys: &str) -> String {
format!("You can create an account via RPC, UI or `parity account new --chain {} --keys-path {}`.", spec, keys)
}

fn wait_for_exit(
updater: Option<Arc<Updater>>,
client: Option<Arc<Client>>,
can_restart: bool
) -> (bool, Option<String>) {
let exit = Arc::new((Mutex::new((false, None)), Condvar::new()));

// Handle possible exits
let e = exit.clone();
CtrlC::set_handler(move || { e.1.notify_all(); });

if can_restart {
if let Some(updater) = updater {
// Handle updater wanting to restart us
let e = exit.clone();
updater.set_exit_handler(move || { *e.0.lock() = (true, None); e.1.notify_all(); });
}

if let Some(client) = client {
// Handle updater wanting to restart us
let e = exit.clone();
client.set_exit_handler(move |restart, new_chain: Option<String>| { *e.0.lock() = (restart, new_chain); e.1.notify_all(); });
}
} else {
trace!(target: "mode", "Not hypervised: not setting exit handlers.");
}

// Wait for signal
let mut l = exit.0.lock();
let _ = exit.1.wait(&mut l);
l.clone()
}

fn wait_for_drop<T>(w: Weak<T>) {
let sleep_duration = Duration::from_secs(1);
let warn_timeout = Duration::from_secs(60);
Expand Down

0 comments on commit e12a515

Please sign in to comment.