From 2d6e4d66dc5b61d3f833f20d3ab8031a437a9620 Mon Sep 17 00:00:00 2001
From: Michael Sproul
Date: Tue, 16 May 2023 16:28:15 +1000
Subject: [PATCH 1/6] Implement `el_offline` and use it in the VC

---
 .../tests/payload_invalidation.rs             |   3 +
 beacon_node/execution_layer/src/engines.rs    |   5 +
 beacon_node/execution_layer/src/lib.rs        |  51 +++---
 .../src/test_utils/handle_rpc.rs              |  25 ++-
 .../execution_layer/src/test_utils/mod.rs     |  27 +++-
 beacon_node/http_api/src/lib.rs               |   6 +
 beacon_node/http_api/tests/main.rs            |   1 +
 beacon_node/http_api/tests/status_tests.rs    | 145 ++++++++++++++++++
 beacon_node/http_api/tests/tests.rs           |   1 +
 common/eth2/src/types.rs                      |   1 +
 consensus/types/src/fork_name.rs              |  18 ++-
 validator_client/src/check_synced.rs          |   6 +-
 12 files changed, 250 insertions(+), 39 deletions(-)
 create mode 100644 beacon_node/http_api/tests/status_tests.rs

diff --git a/beacon_node/beacon_chain/tests/payload_invalidation.rs b/beacon_node/beacon_chain/tests/payload_invalidation.rs
index 54d7734471c..0566f593b20 100644
--- a/beacon_node/beacon_chain/tests/payload_invalidation.rs
+++ b/beacon_node/beacon_chain/tests/payload_invalidation.rs
@@ -910,6 +910,9 @@ async fn invalid_after_optimistic_sync() {
             .await,
     );
 
+    // EL status should still be online, no errors.
+    assert!(!rig.execution_layer().is_offline_or_erroring());
+
     // Running fork choice is necessary since a block has been invalidated.
     rig.recompute_head().await;
 
diff --git a/beacon_node/execution_layer/src/engines.rs b/beacon_node/execution_layer/src/engines.rs
index ce413cb1139..25fd21b5f37 100644
--- a/beacon_node/execution_layer/src/engines.rs
+++ b/beacon_node/execution_layer/src/engines.rs
@@ -238,6 +238,11 @@ impl Engine {
         **self.state.read().await == EngineStateInternal::Synced
     }
 
+    /// Returns `true` if the engine has a status other than synced or syncing.
+    pub fn is_offline_blocking(&self) -> bool {
+        EngineState::from(**self.state.blocking_read()) == EngineState::Offline
+    }
+
     /// Run the `EngineApi::upcheck` function if the node's last known state is not synced. This
     /// might be used to recover the node if offline.
     pub async fn upcheck(&self) {
diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs
index 16a7f3665f3..5c778d693ad 100644
--- a/beacon_node/execution_layer/src/lib.rs
+++ b/beacon_node/execution_layer/src/lib.rs
@@ -222,6 +222,11 @@ struct Inner {
     builder_profit_threshold: Uint256,
     log: Logger,
     always_prefer_builder_payload: bool,
+    /// Track whether the last `newPayload` call errored.
+    ///
+    /// This is used *only* in the informational sync status endpoint, so that a VC using this
+    /// node can prefer another node with a healthier EL.
+    last_new_payload_errored: RwLock<bool>,
 }
 
 #[derive(Debug, Default, Clone, Serialize, Deserialize)]
@@ -350,6 +355,7 @@ impl ExecutionLayer {
             builder_profit_threshold: Uint256::from(builder_profit_threshold),
             log,
             always_prefer_builder_payload,
+            last_new_payload_errored: RwLock::new(false),
         };
 
         Ok(Self {
@@ -542,6 +548,15 @@ impl ExecutionLayer {
         synced
     }
 
+    /// Return `true` if the execution layer is offline or returning errors on `newPayload`.
+    ///
+    /// This function should never be used to prevent any operation in the beacon node, but can
+    /// be used to give an indication on the HTTP API that the node's execution layer is struggling,
+    /// which can in turn be used by the VC.
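+    ///
+    /// For example, the HTTP API change later in this patch consumes the flag roughly
+    /// like this (sketch; a node with no EL configured is reported as offline):
+    ///
+    /// ```ignore
+    /// let el_offline = chain
+    ///     .execution_layer
+    ///     .as_ref()
+    ///     .map_or(true, |el| el.is_offline_or_erroring());
+    /// ```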
+    pub fn is_offline_or_erroring(&self) -> bool {
+        self.engine().is_offline_blocking() || *self.inner.last_new_payload_errored.blocking_read()
+    }
+
     /// Updates the proposer preparation data provided by validators
     pub async fn update_proposer_preparation(
         &self,
@@ -1116,18 +1131,6 @@ impl ExecutionLayer {
     }
 
     /// Maps to the `engine_newPayload` JSON-RPC call.
-    ///
-    /// ## Fallback Behaviour
-    ///
-    /// The request will be broadcast to all nodes, simultaneously. It will await a response (or
-    /// failure) from all nodes and then return based on the first of these conditions which
-    /// returns true:
-    ///
-    /// - Error::ConsensusFailure if some nodes return valid and some return invalid
-    /// - Valid, if any nodes return valid.
-    /// - Invalid, if any nodes return invalid.
-    /// - Syncing, if any nodes return syncing.
-    /// - An error, if all nodes return an error.
     pub async fn notify_new_payload(
         &self,
         execution_payload: &ExecutionPayload<T>,
@@ -1156,12 +1159,24 @@ impl ExecutionLayer {
                 &["new_payload", status.status.into()],
             );
         }
+        *self.inner.last_new_payload_errored.write().await = result.is_err();
 
         process_payload_status(execution_payload.block_hash(), result, self.log())
             .map_err(Box::new)
            .map_err(Error::EngineError)
     }
 
+    /// Update engine sync status.
+    ///
+    /// This will actually perform 2 upchecks, the 2nd one asynchronously.
+    pub async fn upcheck(&self) -> Result<(), Error> {
+        self.engine()
+            .request(|engine| async { Ok(engine.upcheck().await) })
+            .await
+            .map_err(Box::new)
+            .map_err(Error::EngineError)
+    }
+
     /// Register that the given `validator_index` is going to produce a block at `slot`.
     ///
     /// The block will be built atop `head_block_root` and the EL will need to prepare an
@@ -1221,18 +1236,6 @@ impl ExecutionLayer {
     }
 
     /// Maps to the `engine_consensusValidated` JSON-RPC call.
-    ///
-    /// ## Fallback Behaviour
-    ///
-    /// The request will be broadcast to all nodes, simultaneously. It will await a response (or
-    /// failure) from all nodes and then return based on the first of these conditions which
-    /// returns true:
-    ///
-    /// - Error::ConsensusFailure if some nodes return valid and some return invalid
-    /// - Valid, if any nodes return valid.
-    /// - Invalid, if any nodes return invalid.
-    /// - Syncing, if any nodes return syncing.
-    /// - An error, if all nodes return an error.
     pub async fn notify_forkchoice_updated(
         &self,
         head_block_hash: ExecutionBlockHash,
diff --git a/beacon_node/execution_layer/src/test_utils/handle_rpc.rs b/beacon_node/execution_layer/src/test_utils/handle_rpc.rs
index e3c58cfc270..79468b21169 100644
--- a/beacon_node/execution_layer/src/test_utils/handle_rpc.rs
+++ b/beacon_node/execution_layer/src/test_utils/handle_rpc.rs
@@ -30,7 +30,12 @@ pub async fn handle_rpc(
         .map_err(|s| (s, GENERIC_ERROR_CODE))?;
 
     match method {
-        ETH_SYNCING => Ok(JsonValue::Bool(false)),
+        ETH_SYNCING => ctx
+            .syncing_response
+            .lock()
+            .clone()
+            .map(JsonValue::Bool)
+            .map_err(|message| (message, GENERIC_ERROR_CODE)),
         ETH_GET_BLOCK_BY_NUMBER => {
             let tag = params
                 .get(0)
@@ -145,7 +150,9 @@ pub async fn handle_rpc(
 
             // Canned responses set by block hash take priority.
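             // A test can stage these canned responses per block hash, e.g. (sketch,
             // using the setter added to `MockServer` later in this patch):
             //
             //     server.set_new_payload_error(block_hash, "error".into());
             //
             // in which case the lookup below surfaces an RPC error for that hash.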
             if let Some(status) = ctx.get_new_payload_status(request.block_hash()) {
-                return Ok(serde_json::to_value(JsonPayloadStatusV1::from(status)).unwrap());
+                return status
+                    .map(|status| serde_json::to_value(JsonPayloadStatusV1::from(status)).unwrap())
+                    .map_err(|message| (message, GENERIC_ERROR_CODE));
             }
 
             let (static_response, should_import) =
@@ -320,11 +327,15 @@ pub async fn handle_rpc(
 
             // Canned responses set by block hash take priority.
             if let Some(status) = ctx.get_fcu_payload_status(&head_block_hash) {
-                let response = JsonForkchoiceUpdatedV1Response {
-                    payload_status: JsonPayloadStatusV1::from(status),
-                    payload_id: None,
-                };
-                return Ok(serde_json::to_value(response).unwrap());
+                return status
+                    .map(|status| {
+                        let response = JsonForkchoiceUpdatedV1Response {
+                            payload_status: JsonPayloadStatusV1::from(status),
+                            payload_id: None,
+                        };
+                        serde_json::to_value(response).unwrap()
+                    })
+                    .map_err(|message| (message, GENERIC_ERROR_CODE));
             }
 
             let mut response = ctx
diff --git a/beacon_node/execution_layer/src/test_utils/mod.rs b/beacon_node/execution_layer/src/test_utils/mod.rs
index 9379a3c2389..a8e7bab270a 100644
--- a/beacon_node/execution_layer/src/test_utils/mod.rs
+++ b/beacon_node/execution_layer/src/test_utils/mod.rs
@@ -126,6 +126,7 @@ impl MockServer {
             hook: <_>::default(),
             new_payload_statuses: <_>::default(),
             fcu_payload_statuses: <_>::default(),
+            syncing_response: Arc::new(Mutex::new(Ok(false))),
             engine_capabilities: Arc::new(RwLock::new(DEFAULT_ENGINE_CAPABILITIES)),
             _phantom: PhantomData,
         });
@@ -414,14 +415,25 @@ impl MockServer {
         self.ctx
             .new_payload_statuses
             .lock()
-            .insert(block_hash, status);
+            .insert(block_hash, Ok(status));
     }
 
     pub fn set_fcu_payload_status(&self, block_hash: ExecutionBlockHash, status: PayloadStatusV1) {
         self.ctx
             .fcu_payload_statuses
             .lock()
-            .insert(block_hash, status);
+            .insert(block_hash, Ok(status));
+    }
+
+    pub fn set_new_payload_error(&self, block_hash: ExecutionBlockHash, error: String) {
+        self.ctx
+            .new_payload_statuses
+            .lock()
+            .insert(block_hash, Err(error));
+    }
+
+    pub fn set_syncing_response(&self, res: Result<bool, String>) {
+        *self.ctx.syncing_response.lock() = res;
     }
 }
 
@@ -478,8 +490,11 @@ pub struct Context {
     //
     // This is a more flexible and less stateful alternative to `static_new_payload_response`
     // and `preloaded_responses`.
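     //
     // For example (sketch, using the setters defined above), a test can seed either
     // variant for a given block hash:
     //
     //     server.set_new_payload_status(hash, status);      // stored as Ok(status)
     //     server.set_new_payload_error(hash, "bad".into()); // stored as Err(message)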
-    pub new_payload_statuses: Arc<Mutex<HashMap<ExecutionBlockHash, PayloadStatusV1>>>,
-    pub fcu_payload_statuses: Arc<Mutex<HashMap<ExecutionBlockHash, PayloadStatusV1>>>,
+    pub new_payload_statuses:
+        Arc<Mutex<HashMap<ExecutionBlockHash, Result<PayloadStatusV1, String>>>>,
+    pub fcu_payload_statuses:
+        Arc<Mutex<HashMap<ExecutionBlockHash, Result<PayloadStatusV1, String>>>>,
+    pub syncing_response: Arc<Mutex<Result<bool, String>>>,
 
     pub engine_capabilities: Arc<RwLock<EngineCapabilities>>,
     pub _phantom: PhantomData<T>,
@@ -489,14 +504,14 @@ impl Context {
     pub fn get_new_payload_status(
         &self,
         block_hash: &ExecutionBlockHash,
-    ) -> Option<PayloadStatusV1> {
+    ) -> Option<Result<PayloadStatusV1, String>> {
         self.new_payload_statuses.lock().get(block_hash).cloned()
     }
 
     pub fn get_fcu_payload_status(
         &self,
         block_hash: &ExecutionBlockHash,
-    ) -> Option<PayloadStatusV1> {
+    ) -> Option<Result<PayloadStatusV1, String>> {
         self.fcu_payload_statuses.lock().get(block_hash).cloned()
     }
 }
diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs
index 096d99f3f1d..a88a6cebfca 100644
--- a/beacon_node/http_api/src/lib.rs
+++ b/beacon_node/http_api/src/lib.rs
@@ -2298,9 +2298,15 @@ pub fn serve(
                         .is_optimistic_or_invalid_head()
                         .map_err(warp_utils::reject::beacon_chain_error)?;
 
+                    let el_offline = chain
+                        .execution_layer
+                        .as_ref()
+                        .map_or(true, |el| el.is_offline_or_erroring());
+
                     let syncing_data = api_types::SyncingData {
                         is_syncing: network_globals.sync_state.read().is_syncing(),
                         is_optimistic: Some(is_optimistic),
+                        el_offline: Some(el_offline),
                         head_slot,
                         sync_distance,
                     };
diff --git a/beacon_node/http_api/tests/main.rs b/beacon_node/http_api/tests/main.rs
index 342b72cc7de..f5916d8506a 100644
--- a/beacon_node/http_api/tests/main.rs
+++ b/beacon_node/http_api/tests/main.rs
@@ -2,4 +2,5 @@
 
 pub mod fork_tests;
 pub mod interactive_tests;
+pub mod status_tests;
 pub mod tests;
diff --git a/beacon_node/http_api/tests/status_tests.rs b/beacon_node/http_api/tests/status_tests.rs
new file mode 100644
index 00000000000..2f96adc4c96
--- /dev/null
+++ b/beacon_node/http_api/tests/status_tests.rs
@@ -0,0 +1,145 @@
+//! Tests related to the beacon node's sync status
+use beacon_chain::{
+    test_utils::{AttestationStrategy, BlockStrategy, SyncCommitteeStrategy},
+    BlockError,
+};
+use execution_layer::{PayloadStatusV1, PayloadStatusV1Status};
+use http_api::test_utils::InteractiveTester;
+use types::{EthSpec, ExecPayload, ForkName, MinimalEthSpec, Slot};
+
+type E = MinimalEthSpec;
+
+/// Create a new test environment that is post-merge with `chain_depth` blocks.
+async fn post_merge_tester(chain_depth: u64, validator_count: u64) -> InteractiveTester<E> {
+    // Test using latest fork so that we simulate conditions as similar to mainnet as possible.
+    let mut spec = ForkName::latest().make_genesis_spec(E::default_spec());
+    spec.terminal_total_difficulty = 1.into();
+
+    let tester = InteractiveTester::<E>::new(Some(spec), validator_count as usize).await;
+    let harness = &tester.harness;
+    let mock_el = harness.mock_execution_layer.as_ref().unwrap();
+    let execution_ctx = mock_el.server.ctx.clone();
+
+    // Move to terminal block.
+    mock_el.server.all_payloads_valid();
+    execution_ctx
+        .execution_block_generator
+        .write()
+        .move_to_terminal_block()
+        .unwrap();
+
+    // Create some chain depth.
+    harness.advance_slot();
+    harness
+        .extend_chain_with_sync(
+            chain_depth as usize,
+            BlockStrategy::OnCanonicalHead,
+            AttestationStrategy::AllValidators,
+            SyncCommitteeStrategy::AllValidators,
+        )
+        .await;
+    tester
+}
+
+/// Check `syncing` endpoint when the EL is syncing.
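+///
+/// An EL that still answers `eth_syncing` is online even if it reports `true`:
+/// `el_offline` should flip only when the upcheck errors (`el_offline` below) or
+/// `newPayload` fails (`el_error_on_new_payload` below). Abridged expected
+/// response (sketch): `{"is_syncing": false, "is_optimistic": false,
+/// "el_offline": false, ...}`.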
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn el_syncing_then_synced() {
+    let num_blocks = E::slots_per_epoch() / 2;
+    let num_validators = E::slots_per_epoch();
+    let tester = post_merge_tester(num_blocks, num_validators).await;
+    let harness = &tester.harness;
+    let mock_el = harness.mock_execution_layer.as_ref().unwrap();
+
+    // EL syncing
+    mock_el.server.set_syncing_response(Ok(true));
+    mock_el.el.upcheck().await.unwrap();
+
+    let api_response = tester.client.get_node_syncing().await.unwrap().data;
+    assert_eq!(api_response.el_offline, Some(false));
+    assert_eq!(api_response.is_optimistic, Some(false));
+    assert_eq!(api_response.is_syncing, false);
+
+    // EL synced
+    mock_el.server.set_syncing_response(Ok(false));
+    mock_el.el.upcheck().await.unwrap();
+
+    let api_response = tester.client.get_node_syncing().await.unwrap().data;
+    assert_eq!(api_response.el_offline, Some(false));
+    assert_eq!(api_response.is_optimistic, Some(false));
+    assert_eq!(api_response.is_syncing, false);
+}
+
+/// Check `syncing` endpoint when the EL is offline (errors on upcheck).
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn el_offline() {
+    let num_blocks = E::slots_per_epoch() / 2;
+    let num_validators = E::slots_per_epoch();
+    let tester = post_merge_tester(num_blocks, num_validators).await;
+    let harness = &tester.harness;
+    let mock_el = harness.mock_execution_layer.as_ref().unwrap();
+
+    // EL offline
+    mock_el.server.set_syncing_response(Err("offline".into()));
+    mock_el.el.upcheck().await.unwrap();
+
+    let api_response = tester.client.get_node_syncing().await.unwrap().data;
+    assert_eq!(api_response.el_offline, Some(true));
+    assert_eq!(api_response.is_optimistic, Some(false));
+    assert_eq!(api_response.is_syncing, false);
+}
+
+/// Check `syncing` endpoint when the EL errors on `newPayload` but is not fully offline.
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn el_error_on_new_payload() {
+    let num_blocks = E::slots_per_epoch() / 2;
+    let num_validators = E::slots_per_epoch();
+    let tester = post_merge_tester(num_blocks, num_validators).await;
+    let harness = &tester.harness;
+    let mock_el = harness.mock_execution_layer.as_ref().unwrap();
+
+    // Make a block.
+    let pre_state = harness.get_current_state();
+    let (block, _) = harness
+        .make_block(pre_state, Slot::new(num_blocks + 1))
+        .await;
+    let block_hash = block
+        .message()
+        .body()
+        .execution_payload()
+        .unwrap()
+        .block_hash();
+
+    // Make sure `newPayload` errors for the new block.
+    mock_el
+        .server
+        .set_new_payload_error(block_hash, "error".into());
+
+    // Attempt to process the block, which should error.
+    harness.advance_slot();
+    assert!(matches!(
+        harness.process_block_result(block.clone()).await,
+        Err(BlockError::ExecutionPayloadError(_))
+    ));
+
+    // The EL should now be *offline* according to the API.
+    let api_response = tester.client.get_node_syncing().await.unwrap().data;
+    assert_eq!(api_response.el_offline, Some(true));
+    assert_eq!(api_response.is_optimistic, Some(false));
+    assert_eq!(api_response.is_syncing, false);
+
+    // Processing a block successfully should remove the status.
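+    // (`last_new_payload_errored` is overwritten with `result.is_err()` on every
+    // `newPayload` call, so a single successful payload is enough to clear the flag.)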
+    mock_el.server.set_new_payload_status(
+        block_hash,
+        PayloadStatusV1 {
+            status: PayloadStatusV1Status::Valid,
+            latest_valid_hash: Some(block_hash),
+            validation_error: None,
+        },
+    );
+    harness.process_block_result(block).await.unwrap();
+
+    let api_response = tester.client.get_node_syncing().await.unwrap().data;
+    assert_eq!(api_response.el_offline, Some(false));
+    assert_eq!(api_response.is_optimistic, Some(false));
+    assert_eq!(api_response.is_syncing, false);
+}
diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs
index fc78b2a9bf1..46bfd3b5ea8 100644
--- a/beacon_node/http_api/tests/tests.rs
+++ b/beacon_node/http_api/tests/tests.rs
@@ -1721,6 +1721,7 @@ impl ApiTester {
         let expected = SyncingData {
             is_syncing: false,
             is_optimistic: Some(false),
+            el_offline: Some(false),
             head_slot,
             sync_distance,
         };
diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs
index f58dc8e2a45..d7150bff71c 100644
--- a/common/eth2/src/types.rs
+++ b/common/eth2/src/types.rs
@@ -577,6 +577,7 @@ pub struct VersionData {
 pub struct SyncingData {
     pub is_syncing: bool,
     pub is_optimistic: Option<bool>,
+    pub el_offline: Option<bool>,
     pub head_slot: Slot,
     pub sync_distance: Slot,
 }
diff --git a/consensus/types/src/fork_name.rs b/consensus/types/src/fork_name.rs
index 007d4c4daa5..85144a61377 100644
--- a/consensus/types/src/fork_name.rs
+++ b/consensus/types/src/fork_name.rs
@@ -24,6 +24,11 @@ impl ForkName {
         ]
     }
 
+    pub fn latest() -> ForkName {
+        // This unwrap is safe as long as we have 1+ forks. It is tested below.
+        *ForkName::list_all().last().unwrap()
+    }
+
     /// Set the activation slots in the given `ChainSpec` so that the fork named by `self`
     /// is the only fork in effect from genesis.
     pub fn make_genesis_spec(&self, mut spec: ChainSpec) -> ChainSpec {
@@ -178,7 +183,7 @@ mod test {
 
     #[test]
     fn previous_and_next_fork_consistent() {
-        assert_eq!(ForkName::Capella.next_fork(), None);
+        assert_eq!(ForkName::latest().next_fork(), None);
         assert_eq!(ForkName::Base.previous_fork(), None);
 
         for (prev_fork, fork) in ForkName::list_all().into_iter().tuple_windows() {
@@ -211,4 +216,15 @@ mod test {
         assert_eq!(ForkName::from_str("merge"), Ok(ForkName::Merge));
         assert_eq!(ForkName::Merge.to_string(), "bellatrix");
     }
+
+    #[test]
+    fn fork_name_latest() {
+        assert_eq!(ForkName::latest(), *ForkName::list_all().last().unwrap());
+
+        let mut fork = ForkName::Base;
+        while let Some(next_fork) = fork.next_fork() {
+            fork = next_fork;
+        }
+        assert_eq!(ForkName::latest(), fork);
+    }
 }
diff --git a/validator_client/src/check_synced.rs b/validator_client/src/check_synced.rs
index c31457e2889..fb88d33dae3 100644
--- a/validator_client/src/check_synced.rs
+++ b/validator_client/src/check_synced.rs
@@ -36,7 +36,10 @@ pub async fn check_synced(
         }
     };
 
-    let is_synced = !resp.data.is_syncing || (resp.data.sync_distance.as_u64() < SYNC_TOLERANCE);
+    // Default EL status to "online" for backwards-compatibility with BNs that don't include it.
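+    //
+    // i.e. `None` (from a BN that predates `el_offline`) is treated like
+    // `Some(false)`; only an explicit `Some(true)` marks this BN's EL as offline.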
+ let el_offline = resp.data.el_offline.unwrap_or(false); + let bn_is_synced = !resp.data.is_syncing || (resp.data.sync_distance.as_u64() < SYNC_TOLERANCE); + let is_synced = bn_is_synced && !el_offline; if let Some(log) = log_opt { if !is_synced { @@ -52,6 +55,7 @@ pub async fn check_synced( "sync_distance" => resp.data.sync_distance.as_u64(), "head_slot" => resp.data.head_slot.as_u64(), "endpoint" => %beacon_node, + "el_offline" => el_offline, ); } From 976634752f1d08dd135f049ec2edb64c4ee665e4 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Tue, 16 May 2023 17:39:37 +1000 Subject: [PATCH 2/6] Overhaul RequireSynced usage --- lighthouse/tests/validator_client.rs | 6 ++---- validator_client/src/cli.rs | 5 +---- validator_client/src/config.rs | 8 +++++++- validator_client/src/duties_service.rs | 13 ++++--------- validator_client/src/lib.rs | 4 ++-- validator_client/src/preparation_service.rs | 4 ++-- validator_client/src/sync_committee_service.rs | 2 +- 7 files changed, 19 insertions(+), 23 deletions(-) diff --git a/lighthouse/tests/validator_client.rs b/lighthouse/tests/validator_client.rs index 45cd989a44d..8c1f0477c42 100644 --- a/lighthouse/tests/validator_client.rs +++ b/lighthouse/tests/validator_client.rs @@ -103,10 +103,8 @@ fn beacon_nodes_flag() { #[test] fn allow_unsynced_flag() { - CommandLineTest::new() - .flag("allow-unsynced", None) - .run() - .with_config(|config| assert!(config.allow_unsynced_beacon_node)); + // No-op, but doesn't crash. + CommandLineTest::new().flag("allow-unsynced", None).run(); } #[test] diff --git a/validator_client/src/cli.rs b/validator_client/src/cli.rs index 41ef85dfcd3..6e199cb1731 100644 --- a/validator_client/src/cli.rs +++ b/validator_client/src/cli.rs @@ -109,10 +109,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .arg( Arg::with_name("allow-unsynced") .long("allow-unsynced") - .help( - "If present, the validator client will still poll for duties if the beacon - node is not synced.", - ), + .help("DEPRECATED: this flag does nothing"), ) .arg( Arg::with_name("use-long-timeouts") diff --git a/validator_client/src/config.rs b/validator_client/src/config.rs index b6e808a86b5..fa297dcfedd 100644 --- a/validator_client/src/config.rs +++ b/validator_client/src/config.rs @@ -205,7 +205,13 @@ impl Config { ); } - config.allow_unsynced_beacon_node = cli_args.is_present("allow-unsynced"); + if cli_args.is_present("allow-unsynced") { + warn!( + log, + "The --allow-unsynced flag is deprecated"; + "msg" => "it no longer has any effect", + ); + } config.disable_run_on_all = cli_args.is_present("disable-run-on-all"); config.disable_auto_discover = cli_args.is_present("disable-auto-discover"); config.init_slashing_protection = cli_args.is_present("init-slashing-protection"); diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index 3cab6e7821c..83cdb936aa3 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -147,11 +147,6 @@ pub struct DutiesService { pub slot_clock: T, /// Provides HTTP access to remote beacon nodes. pub beacon_nodes: Arc>, - /// Controls whether or not this function will refuse to interact with non-synced beacon nodes. - /// - /// This functionality is a little redundant since most BNs will likely reject duties when they - /// aren't synced, but we keep it around for an emergency. 
- pub require_synced: RequireSynced, pub enable_high_validator_count_metrics: bool, pub context: RuntimeContext, pub spec: ChainSpec, @@ -421,7 +416,7 @@ async fn poll_validator_indices( let download_result = duties_service .beacon_nodes .first_success( - duties_service.require_synced, + RequireSynced::No, OfflineOnFailure::Yes, |beacon_node| async move { let _timer = metrics::start_timer_vec( @@ -618,7 +613,7 @@ async fn poll_beacon_attesters( if let Err(e) = duties_service .beacon_nodes .run( - duties_service.require_synced, + RequireSynced::No, OfflineOnFailure::Yes, |beacon_node| async move { let _timer = metrics::start_timer_vec( @@ -856,7 +851,7 @@ async fn post_validator_duties_attester( duties_service .beacon_nodes .first_success( - duties_service.require_synced, + RequireSynced::No, OfflineOnFailure::Yes, |beacon_node| async move { let _timer = metrics::start_timer_vec( @@ -1063,7 +1058,7 @@ async fn poll_beacon_proposers( let download_result = duties_service .beacon_nodes .first_success( - duties_service.require_synced, + RequireSynced::No, OfflineOnFailure::Yes, |beacon_node| async move { let _timer = metrics::start_timer_vec( diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index cfe355f54eb..d5538ef21c2 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -447,9 +447,9 @@ impl ProductionValidatorClient { beacon_nodes: beacon_nodes.clone(), validator_store: validator_store.clone(), require_synced: if config.allow_unsynced_beacon_node { - RequireSynced::Yes - } else { RequireSynced::No + } else { + RequireSynced::Yes }, spec: context.eth2_config.spec.clone(), context: duties_context, diff --git a/validator_client/src/preparation_service.rs b/validator_client/src/preparation_service.rs index fc80f2ded08..5bd93a50532 100644 --- a/validator_client/src/preparation_service.rs +++ b/validator_client/src/preparation_service.rs @@ -332,7 +332,7 @@ impl PreparationService { match self .beacon_nodes .run( - RequireSynced::Yes, + RequireSynced::No, OfflineOnFailure::Yes, |beacon_node| async move { beacon_node @@ -451,7 +451,7 @@ impl PreparationService { match self .beacon_nodes .first_success( - RequireSynced::Yes, + RequireSynced::No, OfflineOnFailure::No, |beacon_node| async move { beacon_node.post_validator_register_validator(batch).await diff --git a/validator_client/src/sync_committee_service.rs b/validator_client/src/sync_committee_service.rs index 3647396ed59..cc20cedfc6c 100644 --- a/validator_client/src/sync_committee_service.rs +++ b/validator_client/src/sync_committee_service.rs @@ -178,7 +178,7 @@ impl SyncCommitteeService { let response = self .beacon_nodes .first_success( - RequireSynced::Yes, + RequireSynced::No, OfflineOnFailure::Yes, |beacon_node| async move { match beacon_node.get_beacon_blocks_root(BlockId::Head).await { From cda5820135352c08cdc6572579e0bd633c2e27bf Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Tue, 16 May 2023 17:48:03 +1000 Subject: [PATCH 3/6] Clippy fixes --- beacon_node/execution_layer/src/lib.rs | 5 ++++- validator_client/src/duties_service/sync.rs | 4 ++-- validator_client/src/lib.rs | 5 ----- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 5c778d693ad..8e6c712c208 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -1171,7 +1171,10 @@ impl ExecutionLayer { /// This will actually perform 2 upchecks, the 2nd one asynchronously. 
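     ///
     /// Example call site (sketch, mirroring the status tests added in this series):
     ///
     /// ```ignore
     /// mock_el.el.upcheck().await.unwrap();
     /// ```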
pub async fn upcheck(&self) -> Result<(), Error> { self.engine() - .request(|engine| async { Ok(engine.upcheck().await) }) + .request(|engine| async { + engine.upcheck().await; + Ok(()) + }) .await .map_err(Box::new) .map_err(Error::EngineError) diff --git a/validator_client/src/duties_service/sync.rs b/validator_client/src/duties_service/sync.rs index b9d4d703065..7a852091aa3 100644 --- a/validator_client/src/duties_service/sync.rs +++ b/validator_client/src/duties_service/sync.rs @@ -1,4 +1,4 @@ -use crate::beacon_node_fallback::OfflineOnFailure; +use crate::beacon_node_fallback::{OfflineOnFailure, RequireSynced}; use crate::{ doppelganger_service::DoppelgangerStatus, duties_service::{DutiesService, Error}, @@ -422,7 +422,7 @@ pub async fn poll_sync_committee_duties_for_period ProductionValidatorClient { slot_clock: slot_clock.clone(), beacon_nodes: beacon_nodes.clone(), validator_store: validator_store.clone(), - require_synced: if config.allow_unsynced_beacon_node { - RequireSynced::No - } else { - RequireSynced::Yes - }, spec: context.eth2_config.spec.clone(), context: duties_context, enable_high_validator_count_metrics: config.enable_high_validator_count_metrics, From 1723c955a171f3d36118f3374857fb36b63f5add Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Tue, 16 May 2023 17:52:26 +1000 Subject: [PATCH 4/6] Update beacon_node/execution_layer/src/lib.rs --- beacon_node/execution_layer/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 8e6c712c208..0e6cb010370 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -1168,7 +1168,7 @@ impl ExecutionLayer { /// Update engine sync status. /// - /// This will actually perform 2 upchecks, the 2nd one asynchronously. + /// This will sometimes perform 2 upchecks, the 2nd one asynchronously. pub async fn upcheck(&self) -> Result<(), Error> { self.engine() .request(|engine| async { From 3248e7181e092eb004789a0fb2a303614983e0e9 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Wed, 17 May 2023 10:45:21 +1000 Subject: [PATCH 5/6] Address review comments, fix tests --- .../tests/payload_invalidation.rs | 2 +- beacon_node/execution_layer/src/engines.rs | 4 +- beacon_node/execution_layer/src/lib.rs | 17 ++---- beacon_node/http_api/src/lib.rs | 54 ++++++++++--------- beacon_node/http_api/tests/status_tests.rs | 6 +-- beacon_node/http_api/tests/tests.rs | 3 +- 6 files changed, 42 insertions(+), 44 deletions(-) diff --git a/beacon_node/beacon_chain/tests/payload_invalidation.rs b/beacon_node/beacon_chain/tests/payload_invalidation.rs index 0566f593b20..f88c2ee6fdc 100644 --- a/beacon_node/beacon_chain/tests/payload_invalidation.rs +++ b/beacon_node/beacon_chain/tests/payload_invalidation.rs @@ -911,7 +911,7 @@ async fn invalid_after_optimistic_sync() { ); // EL status should still be online, no errors. - assert!(!rig.execution_layer().is_offline_or_erroring()); + assert!(!rig.execution_layer().is_offline_or_erroring().await); // Running fork choice is necessary since a block has been invalidated. rig.recompute_head().await; diff --git a/beacon_node/execution_layer/src/engines.rs b/beacon_node/execution_layer/src/engines.rs index 25fd21b5f37..362f5b0b2b4 100644 --- a/beacon_node/execution_layer/src/engines.rs +++ b/beacon_node/execution_layer/src/engines.rs @@ -239,8 +239,8 @@ impl Engine { } /// Returns `true` if the engine has a status other than synced or syncing. 
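     ///
     /// (The `From` conversion collapses the internal engine state to online/offline:
     /// per the doc above, `Synced` and `Syncing` count as online and anything else is
     /// offline. This is a pure state read; no network upcheck is performed.)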
-    pub fn is_offline_blocking(&self) -> bool {
-        EngineState::from(**self.state.blocking_read()) == EngineState::Offline
+    pub async fn is_offline(&self) -> bool {
+        EngineState::from(**self.state.read().await) == EngineState::Offline
     }
 
     /// Run the `EngineApi::upcheck` function if the node's last known state is not synced. This
diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs
index 0e6cb010370..19fa91b1296 100644
--- a/beacon_node/execution_layer/src/lib.rs
+++ b/beacon_node/execution_layer/src/lib.rs
@@ -553,8 +553,8 @@ impl ExecutionLayer {
     /// This function should never be used to prevent any operation in the beacon node, but can
     /// be used to give an indication on the HTTP API that the node's execution layer is struggling,
     /// which can in turn be used by the VC.
-    pub fn is_offline_or_erroring(&self) -> bool {
-        self.engine().is_offline_blocking() || *self.inner.last_new_payload_errored.blocking_read()
+    pub async fn is_offline_or_erroring(&self) -> bool {
+        self.engine().is_offline().await || *self.inner.last_new_payload_errored.read().await
     }
 
     /// Updates the proposer preparation data provided by validators
@@ -1167,17 +1167,8 @@ impl ExecutionLayer {
     }
 
     /// Update engine sync status.
-    ///
-    /// This will sometimes perform 2 upchecks, the 2nd one asynchronously.
-    pub async fn upcheck(&self) -> Result<(), Error> {
-        self.engine()
-            .request(|engine| async {
-                engine.upcheck().await;
-                Ok(())
-            })
-            .await
-            .map_err(Box::new)
-            .map_err(Error::EngineError)
+    pub async fn upcheck(&self) {
+        self.engine().upcheck().await;
     }
 
     /// Register that the given `validator_index` is going to produce a block at `slot`.
diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs
index a88a6cebfca..be1463f0c31 100644
--- a/beacon_node/http_api/src/lib.rs
+++ b/beacon_node/http_api/src/lib.rs
@@ -2285,34 +2285,40 @@ pub fn serve(
         .and(chain_filter.clone())
         .and_then(
             |network_globals: Arc<NetworkGlobals<T::EthSpec>>, chain: Arc<BeaconChain<T>>| {
-                blocking_json_task(move || {
-                    let head_slot = chain.canonical_head.cached_head().head_slot();
-                    let current_slot = chain.slot_clock.now_or_genesis().ok_or_else(|| {
-                        warp_utils::reject::custom_server_error("Unable to read slot clock".into())
-                    })?;
+                async move {
+                    let el_offline = if let Some(el) = &chain.execution_layer {
+                        el.is_offline_or_erroring().await
+                    } else {
+                        true
+                    };
 
-                    // Taking advantage of saturating subtraction on slot.
-                    let sync_distance = current_slot - head_slot;
+                    blocking_json_task(move || {
+                        let head_slot = chain.canonical_head.cached_head().head_slot();
+                        let current_slot = chain.slot_clock.now_or_genesis().ok_or_else(|| {
+                            warp_utils::reject::custom_server_error(
+                                "Unable to read slot clock".into(),
+                            )
+                        })?;
 
-                    let is_optimistic = chain
-                        .is_optimistic_or_invalid_head()
-                        .map_err(warp_utils::reject::beacon_chain_error)?;
+                        // Taking advantage of saturating subtraction on slot.
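+                        // (Illustrative: if the head were ahead of the wall clock due
+                        // to clock disparity, this yields 0 rather than underflowing.)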
+ let sync_distance = current_slot - head_slot; - let el_offline = chain - .execution_layer - .as_ref() - .map_or(true, |el| el.is_offline_or_erroring()); - - let syncing_data = api_types::SyncingData { - is_syncing: network_globals.sync_state.read().is_syncing(), - is_optimistic: Some(is_optimistic), - el_offline: Some(el_offline), - head_slot, - sync_distance, - }; + let is_optimistic = chain + .is_optimistic_or_invalid_head() + .map_err(warp_utils::reject::beacon_chain_error)?; - Ok(api_types::GenericResponse::from(syncing_data)) - }) + let syncing_data = api_types::SyncingData { + is_syncing: network_globals.sync_state.read().is_syncing(), + is_optimistic: Some(is_optimistic), + el_offline: Some(el_offline), + head_slot, + sync_distance, + }; + + Ok(api_types::GenericResponse::from(syncing_data)) + }) + .await + } }, ); diff --git a/beacon_node/http_api/tests/status_tests.rs b/beacon_node/http_api/tests/status_tests.rs index 2f96adc4c96..ce725b75a9a 100644 --- a/beacon_node/http_api/tests/status_tests.rs +++ b/beacon_node/http_api/tests/status_tests.rs @@ -52,7 +52,7 @@ async fn el_syncing_then_synced() { // EL syncing mock_el.server.set_syncing_response(Ok(true)); - mock_el.el.upcheck().await.unwrap(); + mock_el.el.upcheck().await; let api_response = tester.client.get_node_syncing().await.unwrap().data; assert_eq!(api_response.el_offline, Some(false)); @@ -61,7 +61,7 @@ async fn el_syncing_then_synced() { // EL synced mock_el.server.set_syncing_response(Ok(false)); - mock_el.el.upcheck().await.unwrap(); + mock_el.el.upcheck().await; let api_response = tester.client.get_node_syncing().await.unwrap().data; assert_eq!(api_response.el_offline, Some(false)); @@ -80,7 +80,7 @@ async fn el_offline() { // EL offline mock_el.server.set_syncing_response(Err("offline".into())); - mock_el.el.upcheck().await.unwrap(); + mock_el.el.upcheck().await; let api_response = tester.client.get_node_syncing().await.unwrap().data; assert_eq!(api_response.el_offline, Some(true)); diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 46bfd3b5ea8..a6c49ddaeef 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1721,7 +1721,8 @@ impl ApiTester { let expected = SyncingData { is_syncing: false, is_optimistic: Some(false), - el_offline: Some(false), + // these tests run without the Bellatrix fork enabled + el_offline: Some(true), head_slot, sync_distance, }; From a54f96a40ddafc36bc43575f071e3d38528542af Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Wed, 17 May 2023 11:19:46 +1000 Subject: [PATCH 6/6] Poll all BNs every slot --- validator_client/src/beacon_node_fallback.rs | 42 +++++++------------- validator_client/src/lib.rs | 4 +- 2 files changed, 17 insertions(+), 29 deletions(-) diff --git a/validator_client/src/beacon_node_fallback.rs b/validator_client/src/beacon_node_fallback.rs index 3e667429b4e..2cbab3b218c 100644 --- a/validator_client/src/beacon_node_fallback.rs +++ b/validator_client/src/beacon_node_fallback.rs @@ -28,7 +28,7 @@ const UPDATE_REQUIRED_LOG_HINT: &str = "this VC or the remote BN may need updati /// too early, we risk switching nodes between the time of publishing an attestation and publishing /// an aggregate; this may result in a missed aggregation. If we set this time too late, we risk not /// having the correct nodes up and running prior to the start of the slot. 
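 ///
 /// (With the new 2 second value below and mainnet's 12 second slots, candidates are
 /// re-polled roughly 2 seconds before each slot boundary.)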
-const SLOT_LOOKAHEAD: Duration = Duration::from_secs(1);
+const SLOT_LOOKAHEAD: Duration = Duration::from_secs(2);
 
 /// Indicates a measurement of latency between the VC and a BN.
 pub struct LatencyMeasurement {
@@ -52,7 +52,7 @@ pub fn start_fallback_updater_service(
 
     let future = async move {
         loop {
-            beacon_nodes.update_unready_candidates().await;
+            beacon_nodes.update_all_candidates().await;
 
             let sleep_time = beacon_nodes
                 .slot_clock
@@ -385,33 +385,21 @@ impl BeaconNodeFallback {
         n
     }
 
-    /// Loop through any `self.candidates` that we don't think are online, compatible or synced and
-    /// poll them to see if their status has changed.
+    /// Loop through ALL candidates in `self.candidates` and update their sync status.
     ///
-    /// We do not poll nodes that are synced to avoid sending additional requests when everything is
-    /// going smoothly.
-    pub async fn update_unready_candidates(&self) {
-        let mut futures = Vec::new();
-        for candidate in &self.candidates {
-            // There is a potential race condition between having the read lock and the write
-            // lock. The worst case of this race is running `try_become_ready` twice, which is
-            // acceptable.
-            //
-            // Note: `RequireSynced` is always set to false here. This forces us to recheck the sync
-            // status of nodes that were previously not-synced.
-            if candidate.status(RequireSynced::Yes).await.is_err() {
-                // There exists a race-condition that could result in `refresh_status` being called
-                // when the status does not require refreshing anymore. This is deemed an
-                // acceptable inefficiency.
-                futures.push(candidate.refresh_status(
-                    self.slot_clock.as_ref(),
-                    &self.spec,
-                    &self.log,
-                ));
-            }
-        }
+    /// It is possible for a node to return an unsynced status while continuing to serve
+    /// low quality responses. To route around this it's best to poll all connected beacon nodes.
+    /// A previous implementation of this function polled only the unavailable BNs.
+    pub async fn update_all_candidates(&self) {
+        let futures = self
+            .candidates
+            .iter()
+            .map(|candidate| {
+                candidate.refresh_status(self.slot_clock.as_ref(), &self.spec, &self.log)
+            })
+            .collect::<Vec<_>>();
 
-        //run all updates concurrently and ignore results
+        // run all updates concurrently and ignore errors
         let _ = future::join_all(futures).await;
     }
 
diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs
index b59be561a1d..6563d2fea50 100644
--- a/validator_client/src/lib.rs
+++ b/validator_client/src/lib.rs
@@ -615,8 +615,8 @@ async fn init_from_beacon_node(
     context: &RuntimeContext<E>,
 ) -> Result<(u64, Hash256), String> {
     loop {
-        beacon_nodes.update_unready_candidates().await;
-        proposer_nodes.update_unready_candidates().await;
+        beacon_nodes.update_all_candidates().await;
+        proposer_nodes.update_all_candidates().await;
 
         let num_available = beacon_nodes.num_available().await;
         let num_total = beacon_nodes.num_total();