From e09e112bfc1c53dda9fa6286acd4bf825d0b3bff Mon Sep 17 00:00:00 2001 From: realbigsean Date: Fri, 19 Aug 2022 15:18:01 -0400 Subject: [PATCH] add `OfflineOnFailure` enum to pass into bn query fallback function. So we can separate requests that do/don't indicate the beacon node is offline --- validator_client/src/attestation_service.rs | 107 ++++++---- validator_client/src/beacon_node_fallback.rs | 12 +- validator_client/src/block_service.rs | 197 +++++++++--------- validator_client/src/doppelganger_service.rs | 37 ++-- validator_client/src/duties_service.rs | 96 +++++---- validator_client/src/duties_service/sync.rs | 15 +- validator_client/src/lib.rs | 19 +- validator_client/src/preparation_service.rs | 29 ++- .../src/sync_committee_service.rs | 70 ++++--- 9 files changed, 340 insertions(+), 242 deletions(-) diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs index cdc9b88f688..a7118aa945c 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -3,6 +3,7 @@ use crate::{ duties_service::{DutiesService, DutyAndProof}, http_metrics::metrics, validator_store::ValidatorStore, + OfflineOnFailure, }; use environment::RuntimeContext; use futures::future::join_all; @@ -337,17 +338,21 @@ impl AttestationService { let attestation_data = self .beacon_nodes - .first_success(RequireSynced::No, |beacon_node| async move { - let _timer = metrics::start_timer_vec( - &metrics::ATTESTATION_SERVICE_TIMES, - &[metrics::ATTESTATIONS_HTTP_GET], - ); - beacon_node - .get_validator_attestation_data(slot, committee_index) - .await - .map_err(|e| format!("Failed to produce attestation data: {:?}", e)) - .map(|result| result.data) - }) + .first_success( + RequireSynced::No, + OfflineOnFailure::Yes, + |beacon_node| async move { + let _timer = metrics::start_timer_vec( + &metrics::ATTESTATION_SERVICE_TIMES, + &[metrics::ATTESTATIONS_HTTP_GET], + ); + beacon_node + .get_validator_attestation_data(slot, committee_index) + .await + .map_err(|e| format!("Failed to produce attestation data: {:?}", e)) + .map(|result| result.data) + }, + ) .await .map_err(|e| e.to_string())?; @@ -414,15 +419,19 @@ impl AttestationService { // Post the attestations to the BN. match self .beacon_nodes - .first_success(RequireSynced::No, |beacon_node| async move { - let _timer = metrics::start_timer_vec( - &metrics::ATTESTATION_SERVICE_TIMES, - &[metrics::ATTESTATIONS_HTTP_POST], - ); - beacon_node - .post_beacon_pool_attestations(attestations) - .await - }) + .first_success( + RequireSynced::No, + OfflineOnFailure::Yes, + |beacon_node| async move { + let _timer = metrics::start_timer_vec( + &metrics::ATTESTATION_SERVICE_TIMES, + &[metrics::ATTESTATIONS_HTTP_POST], + ); + beacon_node + .post_beacon_pool_attestations(attestations) + .await + }, + ) .await { Ok(()) => info!( @@ -470,21 +479,27 @@ impl AttestationService { let aggregated_attestation = &self .beacon_nodes - .first_success(RequireSynced::No, |beacon_node| async move { - let _timer = metrics::start_timer_vec( - &metrics::ATTESTATION_SERVICE_TIMES, - &[metrics::AGGREGATES_HTTP_GET], - ); - beacon_node - .get_validator_aggregate_attestation( - attestation_data.slot, - attestation_data.tree_hash_root(), - ) - .await - .map_err(|e| format!("Failed to produce an aggregate attestation: {:?}", e))? - .ok_or_else(|| format!("No aggregate available for {:?}", attestation_data)) - .map(|result| result.data) - }) + .first_success( + RequireSynced::No, + OfflineOnFailure::Yes, + |beacon_node| async move { + let _timer = metrics::start_timer_vec( + &metrics::ATTESTATION_SERVICE_TIMES, + &[metrics::AGGREGATES_HTTP_GET], + ); + beacon_node + .get_validator_aggregate_attestation( + attestation_data.slot, + attestation_data.tree_hash_root(), + ) + .await + .map_err(|e| { + format!("Failed to produce an aggregate attestation: {:?}", e) + })? + .ok_or_else(|| format!("No aggregate available for {:?}", attestation_data)) + .map(|result| result.data) + }, + ) .await .map_err(|e| e.to_string())?; @@ -535,15 +550,19 @@ impl AttestationService { let signed_aggregate_and_proofs_slice = signed_aggregate_and_proofs.as_slice(); match self .beacon_nodes - .first_success(RequireSynced::No, |beacon_node| async move { - let _timer = metrics::start_timer_vec( - &metrics::ATTESTATION_SERVICE_TIMES, - &[metrics::AGGREGATES_HTTP_POST], - ); - beacon_node - .post_validator_aggregate_and_proof(signed_aggregate_and_proofs_slice) - .await - }) + .first_success( + RequireSynced::No, + OfflineOnFailure::Yes, + |beacon_node| async move { + let _timer = metrics::start_timer_vec( + &metrics::ATTESTATION_SERVICE_TIMES, + &[metrics::AGGREGATES_HTTP_POST], + ); + beacon_node + .post_validator_aggregate_and_proof(signed_aggregate_and_proofs_slice) + .await + }, + ) .await { Ok(()) => { diff --git a/validator_client/src/beacon_node_fallback.rs b/validator_client/src/beacon_node_fallback.rs index 0b808e71bb6..df6c949aef0 100644 --- a/validator_client/src/beacon_node_fallback.rs +++ b/validator_client/src/beacon_node_fallback.rs @@ -70,6 +70,13 @@ pub enum RequireSynced { No, } +/// Indicates if a beacon node should be set to `Offline` if a request fails. +#[derive(PartialEq, Clone, Copy)] +pub enum OfflineOnFailure { + Yes, + No, +} + impl PartialEq for RequireSynced { fn eq(&self, other: &bool) -> bool { if *other { @@ -387,6 +394,7 @@ impl BeaconNodeFallback { pub async fn first_success<'a, F, O, Err, R>( &'a self, require_synced: RequireSynced, + offline_on_failure: OfflineOnFailure, func: F, ) -> Result> where @@ -415,7 +423,9 @@ impl BeaconNodeFallback { // There exists a race condition where the candidate may have been marked // as ready between the `func` call and now. We deem this an acceptable // inefficiency. - $candidate.set_offline().await; + if matches!(offline_on_failure, OfflineOnFailure::Yes) { + $candidate.set_offline().await; + } errors.push(($candidate.beacon_node.to_string(), Error::RequestFailed(e))); inc_counter_vec(&ENDPOINT_ERRORS, &[$candidate.beacon_node.as_ref()]); } diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs index d47546eb0d4..2a8d1642258 100644 --- a/validator_client/src/block_service.rs +++ b/validator_client/src/block_service.rs @@ -2,6 +2,7 @@ use crate::beacon_node_fallback::{AllErrored, Error as FallbackError}; use crate::{ beacon_node_fallback::{BeaconNodeFallback, RequireSynced}, graffiti_file::GraffitiFile, + OfflineOnFailure, }; use crate::{http_metrics::metrics, validator_store::ValidatorStore}; use environment::RuntimeContext; @@ -329,70 +330,74 @@ impl BlockService { // Request block from first responsive beacon node. let block = self .beacon_nodes - .first_success(RequireSynced::No, |beacon_node| async move { - let block = match Payload::block_type() { - BlockType::Full => { - let _get_timer = metrics::start_timer_vec( - &metrics::BLOCK_SERVICE_TIMES, - &[metrics::BEACON_BLOCK_HTTP_GET], - ); - beacon_node - .get_validator_blocks::( - slot, - randao_reveal_ref, - graffiti.as_ref(), - ) - .await - .map_err(|e| { - BlockError::Recoverable(format!( - "Error from beacon node when producing block: {:?}", - e - )) - })? - .data - } - BlockType::Blinded => { - let _get_timer = metrics::start_timer_vec( - &metrics::BLOCK_SERVICE_TIMES, - &[metrics::BLINDED_BEACON_BLOCK_HTTP_GET], - ); - beacon_node - .get_validator_blinded_blocks::( - slot, - randao_reveal_ref, - graffiti.as_ref(), - ) - .await - .map_err(|e| { - BlockError::Recoverable(format!( - "Error from beacon node when producing block: {:?}", - e - )) - })? - .data - } - }; - - // Ensure the correctness of the execution payload's fee recipient. - if strict_fee_recipient { - if let Ok(execution_payload) = block.body().execution_payload() { - if Some(execution_payload.fee_recipient()) != fee_recipient { - return Err(BlockError::Recoverable( - "Incorrect fee recipient used by builder".to_string(), - )); + .first_success( + RequireSynced::No, + OfflineOnFailure::Yes, + |beacon_node| async move { + let block = match Payload::block_type() { + BlockType::Full => { + let _get_timer = metrics::start_timer_vec( + &metrics::BLOCK_SERVICE_TIMES, + &[metrics::BEACON_BLOCK_HTTP_GET], + ); + beacon_node + .get_validator_blocks::( + slot, + randao_reveal_ref, + graffiti.as_ref(), + ) + .await + .map_err(|e| { + BlockError::Recoverable(format!( + "Error from beacon node when producing block: {:?}", + e + )) + })? + .data + } + BlockType::Blinded => { + let _get_timer = metrics::start_timer_vec( + &metrics::BLOCK_SERVICE_TIMES, + &[metrics::BLINDED_BEACON_BLOCK_HTTP_GET], + ); + beacon_node + .get_validator_blinded_blocks::( + slot, + randao_reveal_ref, + graffiti.as_ref(), + ) + .await + .map_err(|e| { + BlockError::Recoverable(format!( + "Error from beacon node when producing block: {:?}", + e + )) + })? + .data + } + }; + + // Ensure the correctness of the execution payload's fee recipient. + if strict_fee_recipient { + if let Ok(execution_payload) = block.body().execution_payload() { + if Some(execution_payload.fee_recipient()) != fee_recipient { + return Err(BlockError::Recoverable( + "Incorrect fee recipient used by builder".to_string(), + )); + } } } - } - if proposer_index != Some(block.proposer_index()) { - return Err(BlockError::Recoverable( - "Proposer index does not match block proposer. Beacon chain re-orged" - .to_string(), - )); - } + if proposer_index != Some(block.proposer_index()) { + return Err(BlockError::Recoverable( + "Proposer index does not match block proposer. Beacon chain re-orged" + .to_string(), + )); + } - Ok::<_, BlockError>(block) - }) + Ok::<_, BlockError>(block) + }, + ) .await?; let signed_block = self_ref @@ -403,41 +408,45 @@ impl BlockService { // Publish block with first available beacon node. self.beacon_nodes - .first_success(RequireSynced::No, |beacon_node| async { - match Payload::block_type() { - BlockType::Full => { - let _post_timer = metrics::start_timer_vec( - &metrics::BLOCK_SERVICE_TIMES, - &[metrics::BEACON_BLOCK_HTTP_POST], - ); - beacon_node - .post_beacon_blocks(&signed_block) - .await - .map_err(|e| { - BlockError::Irrecoverable(format!( - "Error from beacon node when publishing block: {:?}", - e - )) - })? - } - BlockType::Blinded => { - let _post_timer = metrics::start_timer_vec( - &metrics::BLOCK_SERVICE_TIMES, - &[metrics::BLINDED_BEACON_BLOCK_HTTP_POST], - ); - beacon_node - .post_beacon_blinded_blocks(&signed_block) - .await - .map_err(|e| { - BlockError::Irrecoverable(format!( - "Error from beacon node when publishing block: {:?}", - e - )) - })? + .first_success( + RequireSynced::No, + OfflineOnFailure::Yes, + |beacon_node| async { + match Payload::block_type() { + BlockType::Full => { + let _post_timer = metrics::start_timer_vec( + &metrics::BLOCK_SERVICE_TIMES, + &[metrics::BEACON_BLOCK_HTTP_POST], + ); + beacon_node + .post_beacon_blocks(&signed_block) + .await + .map_err(|e| { + BlockError::Irrecoverable(format!( + "Error from beacon node when publishing block: {:?}", + e + )) + })? + } + BlockType::Blinded => { + let _post_timer = metrics::start_timer_vec( + &metrics::BLOCK_SERVICE_TIMES, + &[metrics::BLINDED_BEACON_BLOCK_HTTP_POST], + ); + beacon_node + .post_beacon_blinded_blocks(&signed_block) + .await + .map_err(|e| { + BlockError::Irrecoverable(format!( + "Error from beacon node when publishing block: {:?}", + e + )) + })? + } } - } - Ok::<_, BlockError>(()) - }) + Ok::<_, BlockError>(()) + }, + ) .await?; info!( diff --git a/validator_client/src/doppelganger_service.rs b/validator_client/src/doppelganger_service.rs index 9e134f94da0..e6934ed48b0 100644 --- a/validator_client/src/doppelganger_service.rs +++ b/validator_client/src/doppelganger_service.rs @@ -31,6 +31,7 @@ use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced}; use crate::validator_store::ValidatorStore; +use crate::OfflineOnFailure; use environment::RuntimeContext; use eth2::types::LivenessResponseData; use parking_lot::RwLock; @@ -176,13 +177,17 @@ async fn beacon_node_liveness<'a, T: 'static + SlotClock, E: EthSpec>( } else { // Request the previous epoch liveness state from the beacon node. beacon_nodes - .first_success(RequireSynced::Yes, |beacon_node| async move { - beacon_node - .post_lighthouse_liveness(validator_indices, previous_epoch) - .await - .map_err(|e| format!("Failed query for validator liveness: {:?}", e)) - .map(|result| result.data) - }) + .first_success( + RequireSynced::Yes, + OfflineOnFailure::Yes, + |beacon_node| async move { + beacon_node + .post_lighthouse_liveness(validator_indices, previous_epoch) + .await + .map_err(|e| format!("Failed query for validator liveness: {:?}", e)) + .map(|result| result.data) + }, + ) .await .unwrap_or_else(|e| { crit!( @@ -199,13 +204,17 @@ async fn beacon_node_liveness<'a, T: 'static + SlotClock, E: EthSpec>( // Request the current epoch liveness state from the beacon node. let current_epoch_responses = beacon_nodes - .first_success(RequireSynced::Yes, |beacon_node| async move { - beacon_node - .post_lighthouse_liveness(validator_indices, current_epoch) - .await - .map_err(|e| format!("Failed query for validator liveness: {:?}", e)) - .map(|result| result.data) - }) + .first_success( + RequireSynced::Yes, + OfflineOnFailure::Yes, + |beacon_node| async move { + beacon_node + .post_lighthouse_liveness(validator_indices, current_epoch) + .await + .map_err(|e| format!("Failed query for validator liveness: {:?}", e)) + .map(|result| result.data) + }, + ) .await .unwrap_or_else(|e| { crit!( diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index f8ca5a3d44a..3e15b39ab61 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -8,7 +8,7 @@ mod sync; -use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced}; +use crate::beacon_node_fallback::{BeaconNodeFallback, OfflineOnFailure, RequireSynced}; use crate::{ block_service::BlockServiceNotification, http_metrics::metrics, @@ -382,18 +382,22 @@ async fn poll_validator_indices( // Query the remote BN to resolve a pubkey to a validator index. let download_result = duties_service .beacon_nodes - .first_success(duties_service.require_synced, |beacon_node| async move { - let _timer = metrics::start_timer_vec( - &metrics::DUTIES_SERVICE_TIMES, - &[metrics::VALIDATOR_ID_HTTP_GET], - ); - beacon_node - .get_beacon_states_validator_id( - StateId::Head, - &ValidatorId::PublicKey(pubkey), - ) - .await - }) + .first_success( + duties_service.require_synced, + OfflineOnFailure::Yes, + |beacon_node| async move { + let _timer = metrics::start_timer_vec( + &metrics::DUTIES_SERVICE_TIMES, + &[metrics::VALIDATOR_ID_HTTP_GET], + ); + beacon_node + .get_beacon_states_validator_id( + StateId::Head, + &ValidatorId::PublicKey(pubkey), + ) + .await + }, + ) .await; match download_result { @@ -559,15 +563,19 @@ async fn poll_beacon_attesters( let subscriptions_ref = &subscriptions; if let Err(e) = duties_service .beacon_nodes - .first_success(duties_service.require_synced, |beacon_node| async move { - let _timer = metrics::start_timer_vec( - &metrics::DUTIES_SERVICE_TIMES, - &[metrics::SUBSCRIPTIONS_HTTP_POST], - ); - beacon_node - .post_validator_beacon_committee_subscriptions(subscriptions_ref) - .await - }) + .first_success( + duties_service.require_synced, + OfflineOnFailure::Yes, + |beacon_node| async move { + let _timer = metrics::start_timer_vec( + &metrics::DUTIES_SERVICE_TIMES, + &[metrics::SUBSCRIPTIONS_HTTP_POST], + ); + beacon_node + .post_validator_beacon_committee_subscriptions(subscriptions_ref) + .await + }, + ) .await { error!( @@ -619,15 +627,19 @@ async fn poll_beacon_attesters_for_epoch( let response = duties_service .beacon_nodes - .first_success(duties_service.require_synced, |beacon_node| async move { - let _timer = metrics::start_timer_vec( - &metrics::DUTIES_SERVICE_TIMES, - &[metrics::ATTESTER_DUTIES_HTTP_POST], - ); - beacon_node - .post_validator_duties_attester(epoch, local_indices) - .await - }) + .first_success( + duties_service.require_synced, + OfflineOnFailure::Yes, + |beacon_node| async move { + let _timer = metrics::start_timer_vec( + &metrics::DUTIES_SERVICE_TIMES, + &[metrics::ATTESTER_DUTIES_HTTP_POST], + ); + beacon_node + .post_validator_duties_attester(epoch, local_indices) + .await + }, + ) .await .map_err(|e| Error::FailedToDownloadAttesters(e.to_string()))?; @@ -779,15 +791,19 @@ async fn poll_beacon_proposers( if !local_pubkeys.is_empty() { let download_result = duties_service .beacon_nodes - .first_success(duties_service.require_synced, |beacon_node| async move { - let _timer = metrics::start_timer_vec( - &metrics::DUTIES_SERVICE_TIMES, - &[metrics::PROPOSER_DUTIES_HTTP_GET], - ); - beacon_node - .get_validator_duties_proposer(current_epoch) - .await - }) + .first_success( + duties_service.require_synced, + OfflineOnFailure::Yes, + |beacon_node| async move { + let _timer = metrics::start_timer_vec( + &metrics::DUTIES_SERVICE_TIMES, + &[metrics::PROPOSER_DUTIES_HTTP_GET], + ); + beacon_node + .get_validator_duties_proposer(current_epoch) + .await + }, + ) .await; match download_result { diff --git a/validator_client/src/duties_service/sync.rs b/validator_client/src/duties_service/sync.rs index 9857561c48f..b9d4d703065 100644 --- a/validator_client/src/duties_service/sync.rs +++ b/validator_client/src/duties_service/sync.rs @@ -1,3 +1,4 @@ +use crate::beacon_node_fallback::OfflineOnFailure; use crate::{ doppelganger_service::DoppelgangerStatus, duties_service::{DutiesService, Error}, @@ -420,11 +421,15 @@ pub async fn poll_sync_committee_duties_for_period( let genesis = loop { match beacon_nodes - .first_success(RequireSynced::No, |node| async move { - node.get_beacon_genesis().await - }) + .first_success( + RequireSynced::No, + OfflineOnFailure::Yes, + |node| async move { node.get_beacon_genesis().await }, + ) .await { Ok(genesis) => break genesis.data, @@ -659,9 +662,11 @@ async fn poll_whilst_waiting_for_genesis( ) -> Result<(), String> { loop { match beacon_nodes - .first_success(RequireSynced::No, |beacon_node| async move { - beacon_node.get_lighthouse_staking().await - }) + .first_success( + RequireSynced::No, + OfflineOnFailure::Yes, + |beacon_node| async move { beacon_node.get_lighthouse_staking().await }, + ) .await { Ok(is_staking) => { diff --git a/validator_client/src/preparation_service.rs b/validator_client/src/preparation_service.rs index 6dc8e7d56ef..d4178f2c48e 100644 --- a/validator_client/src/preparation_service.rs +++ b/validator_client/src/preparation_service.rs @@ -1,9 +1,10 @@ use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced}; use crate::validator_store::{DoppelgangerStatus, ValidatorStore}; +use crate::OfflineOnFailure; use bls::PublicKeyBytes; use environment::RuntimeContext; use parking_lot::RwLock; -use slog::{debug, error, info}; +use slog::{debug, error, info, warn}; use slot_clock::SlotClock; use std::collections::HashMap; use std::hash::Hash; @@ -330,11 +331,15 @@ impl PreparationService { let preparation_entries = preparation_data.as_slice(); match self .beacon_nodes - .first_success(RequireSynced::Yes, |beacon_node| async move { - beacon_node - .post_validator_prepare_beacon_proposer(preparation_entries) - .await - }) + .first_success( + RequireSynced::Yes, + OfflineOnFailure::Yes, + |beacon_node| async move { + beacon_node + .post_validator_prepare_beacon_proposer(preparation_entries) + .await + }, + ) .await { Ok(()) => debug!( @@ -445,9 +450,13 @@ impl PreparationService { for batch in signed.chunks(VALIDATOR_REGISTRATION_BATCH_SIZE) { match self .beacon_nodes - .first_success(RequireSynced::Yes, |beacon_node| async move { - beacon_node.post_validator_register_validator(batch).await - }) + .first_success( + RequireSynced::Yes, + OfflineOnFailure::No, + |beacon_node| async move { + beacon_node.post_validator_register_validator(batch).await + }, + ) .await { Ok(()) => info!( @@ -455,7 +464,7 @@ impl PreparationService { "Published validator registrations to the builder network"; "count" => registration_data_len, ), - Err(e) => error!( + Err(e) => warn!( log, "Unable to publish validator registrations to the builder network"; "error" => %e, diff --git a/validator_client/src/sync_committee_service.rs b/validator_client/src/sync_committee_service.rs index 73d0066f20a..1e6ff7a5b5f 100644 --- a/validator_client/src/sync_committee_service.rs +++ b/validator_client/src/sync_committee_service.rs @@ -1,5 +1,5 @@ use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced}; -use crate::{duties_service::DutiesService, validator_store::ValidatorStore}; +use crate::{duties_service::DutiesService, validator_store::ValidatorStore, OfflineOnFailure}; use environment::RuntimeContext; use eth2::types::BlockId; use futures::future::join_all; @@ -177,7 +177,7 @@ impl SyncCommitteeService { // Fetch `block_root` and `execution_optimistic` for `SyncCommitteeContribution`. let response = self .beacon_nodes - .first_success(RequireSynced::Yes, |beacon_node| async move { + .first_success(RequireSynced::Yes, OfflineOnFailure::Yes,|beacon_node| async move { beacon_node.get_beacon_blocks_root(BlockId::Head).await }) .await @@ -284,11 +284,15 @@ impl SyncCommitteeService { .collect::>(); self.beacon_nodes - .first_success(RequireSynced::No, |beacon_node| async move { - beacon_node - .post_beacon_pool_sync_committee_signatures(committee_signatures) - .await - }) + .first_success( + RequireSynced::No, + OfflineOnFailure::Yes, + |beacon_node| async move { + beacon_node + .post_beacon_pool_sync_committee_signatures(committee_signatures) + .await + }, + ) .await .map_err(|e| { error!( @@ -351,17 +355,21 @@ impl SyncCommitteeService { let contribution = &self .beacon_nodes - .first_success(RequireSynced::No, |beacon_node| async move { - let sync_contribution_data = SyncContributionData { - slot, - beacon_block_root, - subcommittee_index: subnet_id.into(), - }; + .first_success( + RequireSynced::No, + OfflineOnFailure::Yes, + |beacon_node| async move { + let sync_contribution_data = SyncContributionData { + slot, + beacon_block_root, + subcommittee_index: subnet_id.into(), + }; - beacon_node - .get_validator_sync_committee_contribution::(&sync_contribution_data) - .await - }) + beacon_node + .get_validator_sync_committee_contribution::(&sync_contribution_data) + .await + }, + ) .await .map_err(|e| { crit!( @@ -418,11 +426,15 @@ impl SyncCommitteeService { // Publish to the beacon node. self.beacon_nodes - .first_success(RequireSynced::No, |beacon_node| async move { - beacon_node - .post_validator_contribution_and_proofs(signed_contributions) - .await - }) + .first_success( + RequireSynced::No, + OfflineOnFailure::Yes, + |beacon_node| async move { + beacon_node + .post_validator_contribution_and_proofs(signed_contributions) + .await + }, + ) .await .map_err(|e| { error!( @@ -556,11 +568,15 @@ impl SyncCommitteeService { if let Err(e) = self .beacon_nodes - .first_success(RequireSynced::No, |beacon_node| async move { - beacon_node - .post_validator_sync_committee_subscriptions(subscriptions_slice) - .await - }) + .first_success( + RequireSynced::No, + OfflineOnFailure::Yes, + |beacon_node| async move { + beacon_node + .post_validator_sync_committee_subscriptions(subscriptions_slice) + .await + }, + ) .await { error!(