From 1f0e035c3b2e0589fad8bb90611c1e832b52bbb5 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sat, 5 Sep 2020 09:50:32 +1000 Subject: [PATCH 1/4] Use async for main.rs --- beacon_node/src/lib.rs | 5 +-- lighthouse/src/main.rs | 86 +++++++++++++++++++----------------------- 2 files changed, 40 insertions(+), 51 deletions(-) diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index 19931916013..a09f8c6cd32 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -7,7 +7,7 @@ mod config; pub use beacon_chain; pub use cli::cli_app; pub use client::{Client, ClientBuilder, ClientConfig, ClientGenesis}; -pub use config::{get_data_dir, get_eth2_testnet_config, set_network_config}; +pub use config::{get_config, get_data_dir, get_eth2_testnet_config, set_network_config}; pub use eth2_config::Eth2Config; use beacon_chain::events::TeeEventHandler; @@ -17,7 +17,6 @@ use beacon_chain::{ builder::Witness, eth1_chain::CachingEth1Backend, slot_clock::SystemTimeSlotClock, }; use clap::ArgMatches; -use config::get_config; use environment::RuntimeContext; use slog::{info, warn}; use std::ops::{Deref, DerefMut}; @@ -54,7 +53,7 @@ impl ProductionBeaconNode { /// configurations hosted remotely. 
pub async fn new_from_cli( context: RuntimeContext, - matches: &ArgMatches<'_>, + matches: ArgMatches<'static>, ) -> Result { let client_config = get_config::( &matches, diff --git a/lighthouse/src/main.rs b/lighthouse/src/main.rs index 52e90f17980..d9006cce0fd 100644 --- a/lighthouse/src/main.rs +++ b/lighthouse/src/main.rs @@ -255,61 +255,51 @@ fn run( "name" => testnet_name ); - let beacon_node = if let Some(sub_matches) = matches.subcommand_matches("beacon_node") { - let runtime_context = environment.core_context(); - - let beacon = environment - .runtime() - .block_on(ProductionBeaconNode::new_from_cli( - runtime_context, - sub_matches, - )) - .map_err(|e| format!("Failed to start beacon node: {}", e))?; - - Some(beacon) - } else { - None - }; - - let validator_client = if let Some(sub_matches) = matches.subcommand_matches("validator_client") - { - let runtime_context = environment.core_context(); - - let mut validator = environment - .runtime() - .block_on(ProductionValidatorClient::new_from_cli( - runtime_context, - sub_matches, - )) - .map_err(|e| format!("Failed to init validator client: {}", e))?; - - environment - .core_context() - .executor - .runtime_handle() - .enter(|| { - validator - .start_service() - .map_err(|e| format!("Failed to start validator client service: {}", e)) - })?; - - Some(validator) - } else { - None + match matches.subcommand() { + ("beacon_node", Some(matches)) => { + let context = environment.core_context(); + let log = context.log().clone(); + let config = beacon_node::get_config::( + matches, + &context.eth2_config.spec_constants, + &context.eth2_config().spec, + context.log().clone(), + )?; + environment.runtime().spawn(async move { + if let Err(e) = ProductionBeaconNode::new(context, config).await { + crit!(log, "Failed to start beacon node"; "error" => e); + } + }) + } + ("validator_client", Some(matches)) => { + let context = environment.core_context(); + let log = context.log().clone(); + let config = 
validator_client::Config::from_cli(&matches) + .map_err(|e| format!("Unable to initialize validator config: {}", e))?; + environment.runtime().spawn(async move { + let run = async { + ProductionValidatorClient::new(context, config) + .await? + .start_service()?; + + Ok::<(), String>(()) + }; + if let Err(e) = run.await { + crit!(log, "Failed to start validator client"; "error" => e); + } + }) + } + _ => { + crit!(log, "No subcommand supplied. See --help ."); + return Err("No subcommand supplied.".into()); + } }; - if beacon_node.is_none() && validator_client.is_none() { - crit!(log, "No subcommand supplied. See --help ."); - return Err("No subcommand supplied.".into()); - } - // Block this thread until we get a ctrl-c or a task sends a shutdown signal. environment.block_until_shutdown_requested()?; info!(log, "Shutting down.."); environment.fire_signal(); - drop(beacon_node); - drop(validator_client); // Shutdown the environment once all tasks have completed. environment.shutdown_on_idle(); From be4e7cf1921e47fb3a7656c7dfc6a4185a837c3b Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sat, 5 Sep 2020 10:29:38 +1000 Subject: [PATCH 2/4] Use spawn_blocking for validator keystores --- .../src/initialized_validators.rs | 34 +++++++++++++------ validator_client/src/lib.rs | 1 + 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/validator_client/src/initialized_validators.rs b/validator_client/src/initialized_validators.rs index 436dcb4bae3..64912e09bcb 100644 --- a/validator_client/src/initialized_validators.rs +++ b/validator_client/src/initialized_validators.rs @@ -54,6 +54,8 @@ pub enum Error { PasswordUnknown(PathBuf), /// There was an error reading from stdin. UnableToReadPasswordFromUser(String), + /// There was an error running a tokio async task. + TokioJoin(tokio::task::JoinError), } /// A method used by a validator to sign messages. 
@@ -279,7 +281,7 @@ pub struct InitializedValidators { impl InitializedValidators { /// Instantiates `Self`, initializing all validators in `definitions`. - pub fn from_definitions( + pub async fn from_definitions( definitions: ValidatorDefinitions, validators_dir: PathBuf, strict_lockfiles: bool, @@ -292,7 +294,7 @@ impl InitializedValidators { validators: HashMap::default(), log, }; - this.update_validators()?; + this.update_validators().await?; Ok(this) } @@ -328,7 +330,7 @@ impl InitializedValidators { /// validator will be removed from `self.validators`. /// /// Saves the `ValidatorDefinitions` to file, even if no definitions were changed. - pub fn set_validator_status( + pub async fn set_validator_status( &mut self, voting_public_key: &PublicKey, enabled: bool, @@ -342,7 +344,7 @@ impl InitializedValidators { def.enabled = enabled; } - self.update_validators()?; + self.update_validators().await?; self.definitions .save(&self.validators_dir) @@ -362,7 +364,7 @@ impl InitializedValidators { /// A validator is considered "already known" and skipped if the public key is already known. /// I.e., if there are two different definitions with the same public key then the second will /// be ignored. - fn update_validators(&mut self) -> Result<(), Error> { + async fn update_validators(&mut self) -> Result<(), Error> { for def in self.definitions.as_slice() { if def.enabled { match &def.signing_definition { @@ -371,11 +373,23 @@ impl InitializedValidators { continue; } - match InitializedValidator::from_definition( - def.clone(), - self.strict_lockfiles, - &self.log, - ) { + // Decoding a local keystore can take several seconds, therefore it's best + // to keep it off the core executor. This also has the fortunate effect of + // interrupting the potentially long-running task during shut down. 
+ let inner_def = def.clone(); + let strict_lockfiles = self.strict_lockfiles; + let inner_log = self.log.clone(); + let result = tokio::task::spawn_blocking(move || { + InitializedValidator::from_definition( + inner_def, + strict_lockfiles, + &inner_log, + ) + }) + .await + .map_err(Error::TokioJoin)?; + + match result { Ok(init) => { self.validators .insert(init.voting_public_key().clone(), init); diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 220d82a66ae..c9b3094fb9e 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -93,6 +93,7 @@ impl ProductionValidatorClient { config.strict_lockfiles, log.clone(), ) + .await .map_err(|e| format!("Unable to initialize validators: {:?}", e))?; info!( From 59f4d02e8f6a8e30175a5fc910e655b963a0e0d9 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sat, 5 Sep 2020 11:58:39 +1000 Subject: [PATCH 3/4] Update VC flags, shutdown VC more gracefully --- lighthouse/src/main.rs | 18 ++++- validator_client/src/cli.rs | 12 ++- validator_client/src/config.rs | 8 +- .../src/initialized_validators.rs | 17 ++-- validator_client/src/lib.rs | 77 +++++++++++-------- 5 files changed, 81 insertions(+), 51 deletions(-) diff --git a/lighthouse/src/main.rs b/lighthouse/src/main.rs index d9006cce0fd..f4d7cc8cae9 100644 --- a/lighthouse/src/main.rs +++ b/lighthouse/src/main.rs @@ -259,6 +259,7 @@ fn run( ("beacon_node", Some(matches)) => { let context = environment.core_context(); let log = context.log().clone(); + let executor = context.executor.clone(); let config = beacon_node::get_config::( matches, &context.eth2_config.spec_constants, @@ -266,14 +267,20 @@ fn run( context.log().clone(), )?; environment.runtime().spawn(async move { - if let Err(e) = ProductionBeaconNode::new(context, config).await { - crit!(log, "Failed to start beacon node"; "error" => e); + if let Err(e) = ProductionBeaconNode::new(context.clone(), config).await { + crit!(log, "Failed to start beacon node"; "reason" => e); + 
// Ignore the error since it always occurs during normal operation when + // shutting down. + let _ = executor + .shutdown_sender() + .try_send("Failed to start beacon node"); } }) } ("validator_client", Some(matches)) => { let context = environment.core_context(); let log = context.log().clone(); + let executor = context.executor.clone(); let config = validator_client::Config::from_cli(&matches) .map_err(|e| format!("Unable to initialize validator config: {}", e))?; environment.runtime().spawn(async move { @@ -285,7 +292,12 @@ fn run( Ok::<(), String>(()) }; if let Err(e) = run.await { - crit!(log, "Failed to start validator client"; "error" => e); + crit!(log, "Failed to start validator client"; "reason" => e); + // Ignore the error since it always occurs during normal operation when + // shutting down. + let _ = executor + .shutdown_sender() + .try_send("Failed to start validator client"); } }) } diff --git a/validator_client/src/cli.rs b/validator_client/src/cli.rs index ed320c24cde..7ac483439ce 100644 --- a/validator_client/src/cli.rs +++ b/validator_client/src/cli.rs @@ -37,11 +37,15 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { nodes using the same key. Automatically enabled unless `--strict` is specified", )) .arg( - Arg::with_name("strict-lockfiles") - .long("strict-lockfiles") + Arg::with_name("delete-lockfiles") + .long("delete-lockfiles") .help( - "If present, do not load validators that are guarded by a lockfile. Note: for \ - Eth2 mainnet, this flag will likely be removed and its behaviour will become default." + "If present, ignore and delete any keystore lockfiles encountered during start up. \ + This is useful if the validator client did not exit gracefully on the last run. \ + WARNING: lockfiles help prevent users from accidentally running the same validator \ + using two different validator clients, an action that likely leads to slashing. 
\ + Ensure you are certain that there are no other validator client instances running \ + that might also be using the same keystores." ) ) .arg( diff --git a/validator_client/src/config.rs b/validator_client/src/config.rs index 482c4ed7007..4a11c5aecdc 100644 --- a/validator_client/src/config.rs +++ b/validator_client/src/config.rs @@ -24,8 +24,8 @@ pub struct Config { /// If true, the validator client will still poll for duties and produce blocks even if the /// beacon node is not synced at startup. pub allow_unsynced_beacon_node: bool, - /// If true, refuse to unlock a keypair that is guarded by a lockfile. - pub strict_lockfiles: bool, + /// If true, delete any validator keystore lockfiles that would prevent starting. + pub delete_lockfiles: bool, /// If true, don't scan the validators dir for new keystores. pub disable_auto_discover: bool, /// Graffiti to be inserted everytime we create a block. @@ -46,7 +46,7 @@ impl Default for Config { secrets_dir, http_server: DEFAULT_HTTP_SERVER.to_string(), allow_unsynced_beacon_node: false, - strict_lockfiles: false, + delete_lockfiles: false, disable_auto_discover: false, graffiti: None, } @@ -77,7 +77,7 @@ impl Config { } config.allow_unsynced_beacon_node = cli_args.is_present("allow-unsynced"); - config.strict_lockfiles = cli_args.is_present("strict-lockfiles"); + config.delete_lockfiles = cli_args.is_present("delete-lockfiles"); config.disable_auto_discover = cli_args.is_present("disable-auto-discover"); if let Some(secrets_dir) = parse_optional(cli_args, "secrets-dir")? { diff --git a/validator_client/src/initialized_validators.rs b/validator_client/src/initialized_validators.rs index 64912e09bcb..400768f5cb4 100644 --- a/validator_client/src/initialized_validators.rs +++ b/validator_client/src/initialized_validators.rs @@ -56,6 +56,8 @@ pub enum Error { UnableToReadPasswordFromUser(String), /// There was an error running a tokio async task. 
TokioJoin(tokio::task::JoinError), + /// There was a filesystem error when deleting a lockfile. + UnableToDeleteLockfile(io::Error), } /// A method used by a validator to sign messages. @@ -88,7 +90,7 @@ impl InitializedValidator { /// If the validator is unable to be initialized for whatever reason. pub fn from_definition( def: ValidatorDefinition, - strict_lockfiles: bool, + delete_lockfiles: bool, log: &Logger, ) -> Result { if !def.enabled { @@ -152,16 +154,17 @@ impl InitializedValidator { })?; if voting_keystore_lockfile_path.exists() { - if strict_lockfiles { - return Err(Error::LockfileExists(voting_keystore_lockfile_path)); - } else { - // If **not** respecting lockfiles, just raise a warning if the voting - // keypair cannot be unlocked. + if delete_lockfiles { warn!( log, - "Ignoring validator lockfile"; + "Deleting validator lockfile"; "file" => format!("{:?}", voting_keystore_lockfile_path) ); + + fs::remove_file(&voting_keystore_lockfile_path) + .map_err(Error::UnableToDeleteLockfile)?; + } else { + return Err(Error::LockfileExists(voting_keystore_lockfile_path)); } } else { // Create a new lockfile. 
diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index c9b3094fb9e..3e0188ecce4 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -90,7 +90,7 @@ impl ProductionValidatorClient { let validators = InitializedValidators::from_definitions( validator_defs, config.data_dir.clone(), - config.strict_lockfiles, + config.delete_lockfiles, log.clone(), ) .await @@ -107,8 +107,8 @@ impl ProductionValidatorClient { RemoteBeaconNode::new_with_timeout(config.http_server.clone(), HTTP_TIMEOUT) .map_err(|e| format!("Unable to init beacon node http client: {}", e))?; - // TODO: check if all logs in wait_for_node are produed while awaiting - let beacon_node = wait_for_node(beacon_node, &log).await?; + // TODO: check if all logs in wait_for_node are produced while awaiting + let beacon_node = wait_for_node(&context, beacon_node, log.clone()).await?; let eth2_config = beacon_node .http .spec() @@ -138,7 +138,10 @@ impl ProductionValidatorClient { "seconds_to_wait" => (genesis - now).as_secs() ); - delay_for(genesis - now).await + tokio::select! { + () = delay_for(genesis - now) => (), + () = context.executor.exit() => return Err("Shutting down".to_string()) + } } else { info!( log, @@ -274,38 +277,46 @@ impl ProductionValidatorClient { /// Request the version from the node, looping back and trying again on failure. Exit once the node /// has been contacted. async fn wait_for_node( + context: &RuntimeContext, beacon_node: RemoteBeaconNode, - log: &Logger, + log: Logger, ) -> Result, String> { - // Try to get the version string from the node, looping until success is returned. 
- loop { - let log = log.clone(); - let result = beacon_node - .clone() - .http - .node() - .get_version() - .await - .map_err(|e| format!("{:?}", e)); - - match result { - Ok(version) => { - info!( - log, - "Connected to beacon node"; - "version" => version, - ); - - return Ok(beacon_node); - } - Err(e) => { - error!( - log, - "Unable to connect to beacon node"; - "error" => format!("{:?}", e), - ); - delay_for(RETRY_DELAY).await; + let future = Box::pin(async move { + // Try to get the version string from the node, looping until success is returned. + loop { + let log = log.clone(); + let result = beacon_node + .clone() + .http + .node() + .get_version() + .await + .map_err(|e| format!("{:?}", e)); + + match result { + Ok(version) => { + info!( + log, + "Connected to beacon node"; + "version" => version, + ); + + return Ok(beacon_node); + } + Err(e) => { + error!( + log, + "Unable to connect to beacon node"; + "error" => format!("{:?}", e), + ); + delay_for(RETRY_DELAY).await; + } } } + }); + + tokio::select! 
{ + result = future => result, + () = context.executor.exit() => Err("Shutting down".to_string()) } } From cb031b55318dd1302e9dc24112a1f04134203456 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sat, 5 Sep 2020 12:13:54 +1000 Subject: [PATCH 4/4] Move all init tasks into fn --- validator_client/src/lib.rs | 190 ++++++++++++++++++------------------ 1 file changed, 97 insertions(+), 93 deletions(-) diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 3e0188ecce4..6b709023faf 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -18,6 +18,7 @@ use block_service::{BlockService, BlockServiceBuilder}; use clap::ArgMatches; use duties_service::{DutiesService, DutiesServiceBuilder}; use environment::RuntimeContext; +use eth2_config::Eth2Config; use fork_service::{ForkService, ForkServiceBuilder}; use futures::channel::mpsc; use initialized_validators::InitializedValidators; @@ -28,7 +29,7 @@ use slot_clock::SlotClock; use slot_clock::SystemTimeSlotClock; use std::time::{SystemTime, UNIX_EPOCH}; use tokio::time::{delay_for, Duration}; -use types::EthSpec; +use types::{EthSpec, Hash256}; use validator_store::ValidatorStore; /// The interval between attempts to contact the beacon node during startup. 
@@ -107,59 +108,11 @@ impl ProductionValidatorClient { RemoteBeaconNode::new_with_timeout(config.http_server.clone(), HTTP_TIMEOUT) .map_err(|e| format!("Unable to init beacon node http client: {}", e))?; - // TODO: check if all logs in wait_for_node are produced while awaiting - let beacon_node = wait_for_node(&context, beacon_node, log.clone()).await?; - let eth2_config = beacon_node - .http - .spec() - .get_eth2_config() - .await - .map_err(|e| format!("Unable to read eth2 config from beacon node: {:?}", e))?; - let genesis_time = beacon_node - .http - .beacon() - .get_genesis_time() - .await - .map_err(|e| format!("Unable to read genesis time from beacon node: {:?}", e))?; - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .map_err(|e| format!("Unable to read system time: {:?}", e))?; - let genesis = Duration::from_secs(genesis_time); - - // If the time now is less than (prior to) genesis, then delay until the - // genesis instant. - // - // If the validator client starts before genesis, it will get errors from - // the slot clock. - if now < genesis { - info!( - log, - "Starting node prior to genesis"; - "seconds_to_wait" => (genesis - now).as_secs() - ); - - tokio::select! { - () = delay_for(genesis - now) => (), - () = context.executor.exit() => return Err("Shutting down".to_string()) - } - } else { - info!( - log, - "Genesis has already occurred"; - "seconds_ago" => (now - genesis).as_secs() - ); - } - let genesis_validators_root = beacon_node - .http - .beacon() - .get_genesis_validators_root() - .await - .map_err(|e| { - format!( - "Unable to read genesis validators root from beacon node: {:?}", - e - ) - })?; + // Perform some potentially long-running initialization tasks. + let (eth2_config, genesis_time, genesis_validators_root) = tokio::select! 
{ + tuple = init_from_beacon_node(&beacon_node, &context) => tuple?, + () = context.executor.exit() => return Err("Shutting down".to_string()) + }; // Do not permit a connection to a beacon node using different spec constants. if context.eth2_config.spec_constants != eth2_config.spec_constants { @@ -274,49 +227,100 @@ impl ProductionValidatorClient { } } +async fn init_from_beacon_node( + beacon_node: &RemoteBeaconNode, + context: &RuntimeContext, +) -> Result<(Eth2Config, u64, Hash256), String> { + // Wait for the beacon node to come online. + wait_for_node(beacon_node, context.log()).await?; + + let eth2_config = beacon_node + .http + .spec() + .get_eth2_config() + .await + .map_err(|e| format!("Unable to read eth2 config from beacon node: {:?}", e))?; + let genesis_time = beacon_node + .http + .beacon() + .get_genesis_time() + .await + .map_err(|e| format!("Unable to read genesis time from beacon node: {:?}", e))?; + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_err(|e| format!("Unable to read system time: {:?}", e))?; + let genesis = Duration::from_secs(genesis_time); + + // If the time now is less than (prior to) genesis, then delay until the + // genesis instant. + // + // If the validator client starts before genesis, it will get errors from + // the slot clock. + if now < genesis { + info!( + context.log(), + "Starting node prior to genesis"; + "seconds_to_wait" => (genesis - now).as_secs() + ); + + delay_for(genesis - now).await; + } else { + info!( + context.log(), + "Genesis has already occurred"; + "seconds_ago" => (now - genesis).as_secs() + ); + } + let genesis_validators_root = beacon_node + .http + .beacon() + .get_genesis_validators_root() + .await + .map_err(|e| { + format!( + "Unable to read genesis validators root from beacon node: {:?}", + e + ) + })?; + + Ok((eth2_config, genesis_time, genesis_validators_root)) +} + /// Request the version from the node, looping back and trying again on failure. 
Exit once the node /// has been contacted. async fn wait_for_node( - context: &RuntimeContext, - beacon_node: RemoteBeaconNode, - log: Logger, -) -> Result, String> { - let future = Box::pin(async move { - // Try to get the version string from the node, looping until success is returned. - loop { - let log = log.clone(); - let result = beacon_node - .clone() - .http - .node() - .get_version() - .await - .map_err(|e| format!("{:?}", e)); - - match result { - Ok(version) => { - info!( - log, - "Connected to beacon node"; - "version" => version, - ); - - return Ok(beacon_node); - } - Err(e) => { - error!( - log, - "Unable to connect to beacon node"; - "error" => format!("{:?}", e), - ); - delay_for(RETRY_DELAY).await; - } + beacon_node: &RemoteBeaconNode, + log: &Logger, +) -> Result<(), String> { + // Try to get the version string from the node, looping until success is returned. + loop { + let log = log.clone(); + let result = beacon_node + .clone() + .http + .node() + .get_version() + .await + .map_err(|e| format!("{:?}", e)); + + match result { + Ok(version) => { + info!( + log, + "Connected to beacon node"; + "version" => version, + ); + + return Ok(()); + } + Err(e) => { + error!( + log, + "Unable to connect to beacon node"; + "error" => format!("{:?}", e), + ); + delay_for(RETRY_DELAY).await; } } - }); - - tokio::select! { - result = future => result, - () = context.executor.exit() => Err("Shutting down".to_string()) } }