Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Validator registration request failures do not cause us to mark BNs offline #3488

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 63 additions & 44 deletions validator_client/src/attestation_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::{
duties_service::{DutiesService, DutyAndProof},
http_metrics::metrics,
validator_store::ValidatorStore,
OfflineOnFailure,
};
use environment::RuntimeContext;
use futures::future::join_all;
Expand Down Expand Up @@ -337,17 +338,21 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {

let attestation_data = self
.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::ATTESTATIONS_HTTP_GET],
);
beacon_node
.get_validator_attestation_data(slot, committee_index)
.await
.map_err(|e| format!("Failed to produce attestation data: {:?}", e))
.map(|result| result.data)
})
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::ATTESTATIONS_HTTP_GET],
);
beacon_node
.get_validator_attestation_data(slot, committee_index)
.await
.map_err(|e| format!("Failed to produce attestation data: {:?}", e))
.map(|result| result.data)
},
)
.await
.map_err(|e| e.to_string())?;

Expand Down Expand Up @@ -414,15 +419,19 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
// Post the attestations to the BN.
match self
.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::ATTESTATIONS_HTTP_POST],
);
beacon_node
.post_beacon_pool_attestations(attestations)
.await
})
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::ATTESTATIONS_HTTP_POST],
);
beacon_node
.post_beacon_pool_attestations(attestations)
.await
},
)
.await
{
Ok(()) => info!(
Expand Down Expand Up @@ -470,21 +479,27 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {

let aggregated_attestation = &self
.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::AGGREGATES_HTTP_GET],
);
beacon_node
.get_validator_aggregate_attestation(
attestation_data.slot,
attestation_data.tree_hash_root(),
)
.await
.map_err(|e| format!("Failed to produce an aggregate attestation: {:?}", e))?
.ok_or_else(|| format!("No aggregate available for {:?}", attestation_data))
.map(|result| result.data)
})
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::AGGREGATES_HTTP_GET],
);
beacon_node
.get_validator_aggregate_attestation(
attestation_data.slot,
attestation_data.tree_hash_root(),
)
.await
.map_err(|e| {
format!("Failed to produce an aggregate attestation: {:?}", e)
})?
.ok_or_else(|| format!("No aggregate available for {:?}", attestation_data))
.map(|result| result.data)
},
)
.await
.map_err(|e| e.to_string())?;

Expand Down Expand Up @@ -535,15 +550,19 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
let signed_aggregate_and_proofs_slice = signed_aggregate_and_proofs.as_slice();
match self
.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::AGGREGATES_HTTP_POST],
);
beacon_node
.post_validator_aggregate_and_proof(signed_aggregate_and_proofs_slice)
.await
})
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::AGGREGATES_HTTP_POST],
);
beacon_node
.post_validator_aggregate_and_proof(signed_aggregate_and_proofs_slice)
.await
},
)
.await
{
Ok(()) => {
Expand Down
12 changes: 11 additions & 1 deletion validator_client/src/beacon_node_fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ pub enum RequireSynced {
No,
}

/// Indicates if a beacon node should be set to `Offline` if a request fails.
#[derive(PartialEq, Clone, Copy)]
pub enum OfflineOnFailure {
Yes,
No,
}

impl PartialEq<bool> for RequireSynced {
fn eq(&self, other: &bool) -> bool {
if *other {
Expand Down Expand Up @@ -387,6 +394,7 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
pub async fn first_success<'a, F, O, Err, R>(
&'a self,
require_synced: RequireSynced,
offline_on_failure: OfflineOnFailure,
func: F,
) -> Result<O, AllErrored<Err>>
where
Expand Down Expand Up @@ -415,7 +423,9 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
// There exists a race condition where the candidate may have been marked
// as ready between the `func` call and now. We deem this an acceptable
// inefficiency.
$candidate.set_offline().await;
if matches!(offline_on_failure, OfflineOnFailure::Yes) {
$candidate.set_offline().await;
}
errors.push(($candidate.beacon_node.to_string(), Error::RequestFailed(e)));
inc_counter_vec(&ENDPOINT_ERRORS, &[$candidate.beacon_node.as_ref()]);
}
Expand Down
197 changes: 103 additions & 94 deletions validator_client/src/block_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::beacon_node_fallback::{AllErrored, Error as FallbackError};
use crate::{
beacon_node_fallback::{BeaconNodeFallback, RequireSynced},
graffiti_file::GraffitiFile,
OfflineOnFailure,
};
use crate::{http_metrics::metrics, validator_store::ValidatorStore};
use environment::RuntimeContext;
Expand Down Expand Up @@ -329,70 +330,74 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
// Request block from first responsive beacon node.
let block = self
.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
let block = match Payload::block_type() {
BlockType::Full => {
let _get_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BEACON_BLOCK_HTTP_GET],
);
beacon_node
.get_validator_blocks::<E, Payload>(
slot,
randao_reveal_ref,
graffiti.as_ref(),
)
.await
.map_err(|e| {
BlockError::Recoverable(format!(
"Error from beacon node when producing block: {:?}",
e
))
})?
.data
}
BlockType::Blinded => {
let _get_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BLINDED_BEACON_BLOCK_HTTP_GET],
);
beacon_node
.get_validator_blinded_blocks::<E, Payload>(
slot,
randao_reveal_ref,
graffiti.as_ref(),
)
.await
.map_err(|e| {
BlockError::Recoverable(format!(
"Error from beacon node when producing block: {:?}",
e
))
})?
.data
}
};

// Ensure the correctness of the execution payload's fee recipient.
if strict_fee_recipient {
if let Ok(execution_payload) = block.body().execution_payload() {
if Some(execution_payload.fee_recipient()) != fee_recipient {
return Err(BlockError::Recoverable(
"Incorrect fee recipient used by builder".to_string(),
));
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
let block = match Payload::block_type() {
BlockType::Full => {
let _get_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BEACON_BLOCK_HTTP_GET],
);
beacon_node
.get_validator_blocks::<E, Payload>(
slot,
randao_reveal_ref,
graffiti.as_ref(),
)
.await
.map_err(|e| {
BlockError::Recoverable(format!(
"Error from beacon node when producing block: {:?}",
e
))
})?
.data
}
BlockType::Blinded => {
let _get_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BLINDED_BEACON_BLOCK_HTTP_GET],
);
beacon_node
.get_validator_blinded_blocks::<E, Payload>(
slot,
randao_reveal_ref,
graffiti.as_ref(),
)
.await
.map_err(|e| {
BlockError::Recoverable(format!(
"Error from beacon node when producing block: {:?}",
e
))
})?
.data
}
};

// Ensure the correctness of the execution payload's fee recipient.
if strict_fee_recipient {
if let Ok(execution_payload) = block.body().execution_payload() {
if Some(execution_payload.fee_recipient()) != fee_recipient {
return Err(BlockError::Recoverable(
"Incorrect fee recipient used by builder".to_string(),
));
}
}
}
}

if proposer_index != Some(block.proposer_index()) {
return Err(BlockError::Recoverable(
"Proposer index does not match block proposer. Beacon chain re-orged"
.to_string(),
));
}
if proposer_index != Some(block.proposer_index()) {
return Err(BlockError::Recoverable(
"Proposer index does not match block proposer. Beacon chain re-orged"
.to_string(),
));
}

Ok::<_, BlockError>(block)
})
Ok::<_, BlockError>(block)
},
)
.await?;

let signed_block = self_ref
Expand All @@ -403,41 +408,45 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {

// Publish block with first available beacon node.
self.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async {
match Payload::block_type() {
BlockType::Full => {
let _post_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BEACON_BLOCK_HTTP_POST],
);
beacon_node
.post_beacon_blocks(&signed_block)
.await
.map_err(|e| {
BlockError::Irrecoverable(format!(
"Error from beacon node when publishing block: {:?}",
e
))
})?
}
BlockType::Blinded => {
let _post_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BLINDED_BEACON_BLOCK_HTTP_POST],
);
beacon_node
.post_beacon_blinded_blocks(&signed_block)
.await
.map_err(|e| {
BlockError::Irrecoverable(format!(
"Error from beacon node when publishing block: {:?}",
e
))
})?
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async {
match Payload::block_type() {
BlockType::Full => {
let _post_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BEACON_BLOCK_HTTP_POST],
);
beacon_node
.post_beacon_blocks(&signed_block)
.await
.map_err(|e| {
BlockError::Irrecoverable(format!(
"Error from beacon node when publishing block: {:?}",
e
))
})?
}
BlockType::Blinded => {
let _post_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BLINDED_BEACON_BLOCK_HTTP_POST],
);
beacon_node
.post_beacon_blinded_blocks(&signed_block)
.await
.map_err(|e| {
BlockError::Irrecoverable(format!(
"Error from beacon node when publishing block: {:?}",
e
))
})?
}
}
}
Ok::<_, BlockError>(())
})
Ok::<_, BlockError>(())
},
)
.await?;

info!(
Expand Down
Loading