Skip to content

Commit

Permalink
Rework Validator Client fallback mechanism (#4393)
Browse files Browse the repository at this point in the history
* Rework Validator Client fallback mechanism

* Add CI workflow for fallback simulator

* Tie-break with sync distance for non-synced nodes

* Fix simulator

* Cleanup unused code

* More improvements

* Add IsOptimistic enum for readability

* Use configurable sync distance tiers

* Fix tests

* Combine status and health and improve logging

* Fix nodes not being marked as available

* Fix simulator

* Fix tests again

* Increase fallback simulator tolerance

* Add http api endpoint

* Fix todos and tests

* Update simulator

* Merge branch 'unstable' into vc-fallback

* Add suggestions

* Add id to ui endpoint

* Remove unnecessary clones

* Formatting

* Merge branch 'unstable' into vc-fallback

* Merge branch 'unstable' into vc-fallback

* Fix flag tests

* Merge branch 'unstable' into vc-fallback

* Merge branch 'unstable' into vc-fallback

* Fix conflicts

* Merge branch 'unstable' into vc-fallback

* Remove unnecessary pubs

* Simplify `compute_distance_tier` and reduce notifier awaits

* Use the more descriptive `user_index` instead of `id`

* Combine sync distance tolerance flags into one

* Merge branch 'unstable' into vc-fallback

* Merge branch 'unstable' into vc-fallback

* wip

* Use new simulator from unstable

* Fix cli text

* Remove leftover files

* Remove old commented code

* Merge branch 'unstable' into vc-fallback

* Update cli text

* Silence candidate errors when pre-genesis

* Merge branch 'unstable' into vc-fallback

* Merge branch 'unstable' into vc-fallback

* Retry on failure

* Merge branch 'unstable' into vc-fallback

* Merge branch 'unstable' into vc-fallback

* Remove disable_run_on_all

* Remove unused error variant

* Fix out of date comment

* Merge branch 'unstable' into vc-fallback

* Remove unnecessary as_u64

* Remove more out of date comments

* Use tokio RwLock and remove parking_lot

* Merge branch 'unstable' into vc-fallback

* Formatting

* Ensure nodes are still added to total when not available

* Allow VC to detect when BN comes online

* Fix ui endpoint

* Don't have block_service as an Option

* Merge branch 'unstable' into vc-fallback

* Clean up lifetimes and futures

* Revert "Don't have block_service as an Option"

This reverts commit b5445a0.

* Merge branch 'unstable' into vc-fallback

* Merge branch 'unstable' into vc-fallback

* Improve rwlock sanitation using clones

* Merge branch 'unstable' into vc-fallback

* Drop read lock immediately by cloning the vec.
  • Loading branch information
macladson authored Oct 3, 2024
1 parent 17849b5 commit f870b66
Show file tree
Hide file tree
Showing 24 changed files with 1,314 additions and 776 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions book/src/help_vc.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,22 @@ Options:
Default is unlimited.
Flags:
--beacon-nodes-sync-tolerances <SYNC_TOLERANCES>
A comma-separated list of 3 values which sets the size of each sync
distance range when determining the health of each connected beacon
node. The first value determines the `Synced` range. If a connected
beacon node is synced to within this number of slots it is considered
'Synced'. The second value determines the `Small` sync distance range.
This range starts immediately after the `Synced` range. The third
value determines the `Medium` sync distance range. This range starts
immediately after the `Small` range. Any sync distance value beyond
that is considered `Large`. For example, a value of `8,8,48` would
have ranges like the following: `Synced`: 0..=8 `Small`: 9..=16
`Medium`: 17..=64 `Large`: 65.. These values are used to determine
what ordering beacon node fallbacks are used in. Generally, `Synced`
nodes are preferred over `Small` and so on. Nodes in the `Synced`
range will tie-break based on their ordering in `--beacon-nodes`. This
ensures the primary beacon node is prioritised. [default: 8,8,48]
--builder-proposals
If this flag is set, Lighthouse will query the Beacon Node for only
block headers during proposals and will sign over headers. Useful for
Expand Down
1 change: 1 addition & 0 deletions common/eth2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ store = { workspace = true }
slashing_protection = { workspace = true }
mediatype = "0.19.13"
pretty_reqwest_error = { workspace = true }
derivative = { workspace = true }

[dev-dependencies]
tokio = { workspace = true }
Expand Down
9 changes: 7 additions & 2 deletions common/eth2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod types;

use self::mixin::{RequestAccept, ResponseOptional};
use self::types::{Error as ResponseError, *};
use derivative::Derivative;
use futures::Stream;
use futures_util::StreamExt;
use lighthouse_network::PeerId;
Expand Down Expand Up @@ -117,7 +118,7 @@ impl fmt::Display for Error {

/// A struct to define a variety of different timeouts for different validator tasks to ensure
/// proper fallback behaviour.
#[derive(Clone)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Timeouts {
pub attestation: Duration,
pub attester_duties: Duration,
Expand Down Expand Up @@ -154,13 +155,17 @@ impl Timeouts {

/// A wrapper around `reqwest::Client` which provides convenience methods for interfacing with a
/// Lighthouse Beacon Node HTTP server (`http_api`).
#[derive(Clone)]
#[derive(Clone, Debug, Derivative)]
#[derivative(PartialEq)]
pub struct BeaconNodeHttpClient {
#[derivative(PartialEq = "ignore")]
client: reqwest::Client,
server: SensitiveUrl,
timeouts: Timeouts,
}

impl Eq for BeaconNodeHttpClient {}

impl fmt::Display for BeaconNodeHttpClient {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.server.fmt(f)
Expand Down
34 changes: 31 additions & 3 deletions lighthouse/tests/validator_client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use validator_client::{config::DEFAULT_WEB3SIGNER_KEEP_ALIVE, ApiTopic, Config};
use validator_client::{
config::DEFAULT_WEB3SIGNER_KEEP_ALIVE, ApiTopic, BeaconNodeSyncDistanceTiers, Config,
};

use crate::exec::CommandLineTestExec;
use bls::{Keypair, PublicKeyBytes};
Expand All @@ -12,7 +14,7 @@ use std::str::FromStr;
use std::string::ToString;
use std::time::Duration;
use tempfile::TempDir;
use types::Address;
use types::{Address, Slot};

/// Returns the `lighthouse validator_client` command.
fn base_cmd() -> Command {
Expand Down Expand Up @@ -511,7 +513,6 @@ fn monitoring_endpoint() {
assert_eq!(api_conf.update_period_secs, Some(30));
});
}

#[test]
fn disable_run_on_all_flag() {
CommandLineTest::new()
Expand Down Expand Up @@ -572,6 +573,33 @@ fn broadcast_flag() {
});
}

/// Tests for validator fallback flags.
#[test]
fn beacon_nodes_sync_tolerances_flag_default() {
CommandLineTest::new().run().with_config(|config| {
assert_eq!(
config.beacon_node_fallback.sync_tolerances,
BeaconNodeSyncDistanceTiers::default()
)
});
}
#[test]
fn beacon_nodes_sync_tolerances_flag() {
CommandLineTest::new()
.flag("beacon-nodes-sync-tolerances", Some("4,4,4"))
.run()
.with_config(|config| {
assert_eq!(
config.beacon_node_fallback.sync_tolerances,
BeaconNodeSyncDistanceTiers {
synced: Slot::new(4),
small: Slot::new(8),
medium: Slot::new(12),
}
);
});
}

#[test]
#[should_panic(expected = "Unknown API topic")]
fn wrong_broadcast_flag() {
Expand Down
2 changes: 1 addition & 1 deletion testing/simulator/src/fallback_sim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ const DENEB_FORK_EPOCH: u64 = 2;
// This has potential to block CI so it should be set conservatively enough that spurious failures
// don't become very common, but not so conservatively that regressions to the fallback mechanism
// cannot be detected.
const ACCEPTABLE_FALLBACK_ATTESTATION_HIT_PERCENTAGE: f64 = 85.0;
const ACCEPTABLE_FALLBACK_ATTESTATION_HIT_PERCENTAGE: f64 = 95.0;

const SUGGESTED_FEE_RECIPIENT: [u8; 20] =
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1];
Expand Down
2 changes: 1 addition & 1 deletion validator_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ path = "src/lib.rs"

[dev-dependencies]
tokio = { workspace = true }
itertools = { workspace = true }

[dependencies]
tree_hash = { workspace = true }
Expand Down Expand Up @@ -60,4 +59,5 @@ sysinfo = { workspace = true }
system_health = { path = "../common/system_health" }
logging = { workspace = true }
strum = { workspace = true }
itertools = { workspace = true }
fdlimit = "0.3.0"
180 changes: 79 additions & 101 deletions validator_client/src/attestation_service.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback, RequireSynced};
use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
use crate::{
duties_service::{DutiesService, DutyAndProof},
http_metrics::metrics,
validator_store::{Error as ValidatorStoreError, ValidatorStore},
OfflineOnFailure,
};
use environment::RuntimeContext;
use futures::future::join_all;
Expand Down Expand Up @@ -339,21 +338,17 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {

let attestation_data = self
.beacon_nodes
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::ATTESTATIONS_HTTP_GET],
);
beacon_node
.get_validator_attestation_data(slot, committee_index)
.await
.map_err(|e| format!("Failed to produce attestation data: {:?}", e))
.map(|result| result.data)
},
)
.first_success(|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::ATTESTATIONS_HTTP_GET],
);
beacon_node
.get_validator_attestation_data(slot, committee_index)
.await
.map_err(|e| format!("Failed to produce attestation data: {:?}", e))
.map(|result| result.data)
})
.await
.map_err(|e| e.to_string())?;

Expand Down Expand Up @@ -458,26 +453,21 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
// Post the attestations to the BN.
match self
.beacon_nodes
.request(
RequireSynced::No,
OfflineOnFailure::Yes,
ApiTopic::Attestations,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::ATTESTATIONS_HTTP_POST],
);
if fork_name.electra_enabled() {
beacon_node
.post_beacon_pool_attestations_v2(attestations, fork_name)
.await
} else {
beacon_node
.post_beacon_pool_attestations_v1(attestations)
.await
}
},
)
.request(ApiTopic::Attestations, |beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::ATTESTATIONS_HTTP_POST],
);
if fork_name.electra_enabled() {
beacon_node
.post_beacon_pool_attestations_v2(attestations, fork_name)
.await
} else {
beacon_node
.post_beacon_pool_attestations_v1(attestations)
.await
}
})
.await
{
Ok(()) => info!(
Expand Down Expand Up @@ -540,46 +530,38 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {

let aggregated_attestation = &self
.beacon_nodes
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::AGGREGATES_HTTP_GET],
);
if fork_name.electra_enabled() {
beacon_node
.get_validator_aggregate_attestation_v2(
attestation_data.slot,
attestation_data.tree_hash_root(),
committee_index,
)
.await
.map_err(|e| {
format!("Failed to produce an aggregate attestation: {:?}", e)
})?
.ok_or_else(|| {
format!("No aggregate available for {:?}", attestation_data)
})
.map(|result| result.data)
} else {
beacon_node
.get_validator_aggregate_attestation_v1(
attestation_data.slot,
attestation_data.tree_hash_root(),
)
.await
.map_err(|e| {
format!("Failed to produce an aggregate attestation: {:?}", e)
})?
.ok_or_else(|| {
format!("No aggregate available for {:?}", attestation_data)
})
.map(|result| result.data)
}
},
)
.first_success(|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::AGGREGATES_HTTP_GET],
);
if fork_name.electra_enabled() {
beacon_node
.get_validator_aggregate_attestation_v2(
attestation_data.slot,
attestation_data.tree_hash_root(),
committee_index,
)
.await
.map_err(|e| {
format!("Failed to produce an aggregate attestation: {:?}", e)
})?
.ok_or_else(|| format!("No aggregate available for {:?}", attestation_data))
.map(|result| result.data)
} else {
beacon_node
.get_validator_aggregate_attestation_v1(
attestation_data.slot,
attestation_data.tree_hash_root(),
)
.await
.map_err(|e| {
format!("Failed to produce an aggregate attestation: {:?}", e)
})?
.ok_or_else(|| format!("No aggregate available for {:?}", attestation_data))
.map(|result| result.data)
}
})
.await
.map_err(|e| e.to_string())?;

Expand Down Expand Up @@ -637,30 +619,26 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
let signed_aggregate_and_proofs_slice = signed_aggregate_and_proofs.as_slice();
match self
.beacon_nodes
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::AGGREGATES_HTTP_POST],
);
if fork_name.electra_enabled() {
beacon_node
.post_validator_aggregate_and_proof_v2(
signed_aggregate_and_proofs_slice,
fork_name,
)
.await
} else {
beacon_node
.post_validator_aggregate_and_proof_v1(
signed_aggregate_and_proofs_slice,
)
.await
}
},
)
.first_success(|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::AGGREGATES_HTTP_POST],
);
if fork_name.electra_enabled() {
beacon_node
.post_validator_aggregate_and_proof_v2(
signed_aggregate_and_proofs_slice,
fork_name,
)
.await
} else {
beacon_node
.post_validator_aggregate_and_proof_v1(
signed_aggregate_and_proofs_slice,
)
.await
}
})
.await
{
Ok(()) => {
Expand Down
Loading

0 comments on commit f870b66

Please sign in to comment.