Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Reduce outbound requests to eth1 endpoints #2340

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions beacon_node/eth1/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::Config;
use crate::{
block_cache::{BlockCache, Eth1Block},
deposit_cache::{DepositCache, SszDepositCache},
service::EndpointsCache,
};
use parking_lot::RwLock;
use ssz::{Decode, Encode};
Expand All @@ -28,6 +29,7 @@ impl DepositUpdater {
pub struct Inner {
pub block_cache: RwLock<BlockCache>,
pub deposit_cache: RwLock<DepositUpdater>,
pub endpoints_cache: RwLock<Option<EndpointsCache>>,
pub config: RwLock<Config>,
pub remote_head_block: RwLock<Option<Eth1Block>>,
pub spec: ChainSpec,
Expand Down Expand Up @@ -87,6 +89,7 @@ impl SszEth1Cache {
cache: self.deposit_cache.to_deposit_cache()?,
last_processed_block: self.last_processed_block,
}),
endpoints_cache: RwLock::new(None),
// Set the remote head_block zero when creating a new instance. We only care about
// present and future eth1 nodes.
remote_head_block: RwLock::new(None),
Expand Down
91 changes: 71 additions & 20 deletions beacon_node/eth1/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,33 @@ pub enum EndpointError {

type EndpointState = Result<(), EndpointError>;

type EndpointWithState = (SensitiveUrl, TRwLock<Option<EndpointState>>);
#[derive(Clone)]
pub struct EndpointWithState {
paulhauner marked this conversation as resolved.
Show resolved Hide resolved
endpoint: SensitiveUrl,
state: Arc<TRwLock<Option<EndpointState>>>,
}

impl EndpointWithState {
pub fn new(endpoint: SensitiveUrl) -> Self {
Self {
endpoint,
state: Arc::new(TRwLock::new(None)),
}
}
}

async fn reset_endpoint_state(endpoint: &EndpointWithState) {
*endpoint.state.write().await = None;
}

async fn get_state(endpoint: &EndpointWithState) -> Option<EndpointState> {
*endpoint.state.read().await
}

/// A cache structure to lazily check usability of endpoints. An endpoint is usable if it is
/// reachable and has the correct network id and chain id. Emits a `WARN` log if a checked endpoint
/// is not usable.
#[derive(Clone)]
pub struct EndpointsCache {
pub fallback: Fallback<EndpointWithState>,
pub config_network_id: Eth1Id,
Expand All @@ -70,19 +92,19 @@ impl EndpointsCache {
/// Checks the usability of an endpoint. Results get cached and therefore only the first call
/// for each endpoint does the real check.
async fn state(&self, endpoint: &EndpointWithState) -> EndpointState {
if let Some(result) = *endpoint.1.read().await {
if let Some(result) = *endpoint.state.read().await {
return result;
}
let mut value = endpoint.1.write().await;
let mut value = endpoint.state.write().await;
if let Some(result) = *value {
return result;
}
crate::metrics::inc_counter_vec(
&crate::metrics::ENDPOINT_REQUESTS,
&[&endpoint.0.to_string()],
&[&endpoint.endpoint.to_string()],
);
let state = endpoint_state(
&endpoint.0,
&endpoint.endpoint,
&self.config_network_id,
&self.config_chain_id,
&self.log,
Expand All @@ -92,7 +114,7 @@ impl EndpointsCache {
if state.is_err() {
crate::metrics::inc_counter_vec(
&crate::metrics::ENDPOINT_ERRORS,
&[&endpoint.0.to_string()],
&[&endpoint.endpoint.to_string()],
);
}
state
Expand All @@ -111,20 +133,23 @@ impl EndpointsCache {
.first_success(|endpoint| async move {
match self.state(endpoint).await {
Ok(()) => {
let endpoint_str = &endpoint.0.to_string();
let endpoint_str = &endpoint.endpoint.to_string();
crate::metrics::inc_counter_vec(
&crate::metrics::ENDPOINT_REQUESTS,
&[endpoint_str],
);
match func(&endpoint.0).await {
match func(&endpoint.endpoint).await {
Ok(t) => Ok(t),
Err(t) => {
crate::metrics::inc_counter_vec(
&crate::metrics::ENDPOINT_ERRORS,
&[endpoint_str],
);
if let SingleEndpointError::EndpointError(e) = &t {
*endpoint.1.write().await = Some(Err(*e));
*endpoint.state.write().await = Some(Err(*e));
} else {
// A non-`EndpointError` error occurred, so reset the state.
reset_endpoint_state(endpoint).await;
}
Err(t)
}
Expand All @@ -135,6 +160,16 @@ impl EndpointsCache {
})
.await
}

pub async fn reset_errorred_endpoints(&self) {
for endpoint in &self.fallback.servers {
if let Some(state) = get_state(endpoint).await {
if state.is_err() {
reset_endpoint_state(endpoint).await;
}
}
}
}
}

/// Returns `Ok` if the endpoint is usable, i.e. is reachable and has a correct network id and
Expand Down Expand Up @@ -402,9 +437,9 @@ impl Default for Config {
follow_distance: 128,
node_far_behind_seconds: 128 * 14,
block_cache_truncation: Some(4_096),
auto_update_interval_millis: 7_000,
auto_update_interval_millis: 60_000,
blocks_per_log_query: 1_000,
max_log_requests_per_update: Some(100),
max_log_requests_per_update: Some(5_000),
max_blocks_per_update: Some(8_192),
purge_cache: false,
}
Expand Down Expand Up @@ -432,6 +467,7 @@ impl Service {
deposit_cache: RwLock::new(DepositUpdater::new(
config.deposit_contract_deploy_block,
)),
endpoints_cache: RwLock::new(None),
remote_head_block: RwLock::new(None),
config: RwLock::new(config),
spec,
Expand Down Expand Up @@ -602,20 +638,31 @@ impl Service {
self.inner.config.write().lowest_cached_block_number = block_number;
}

/// Builds a new `EndpointsCache` with empty states.
pub fn init_endpoints(&self) -> EndpointsCache {
let endpoints = self.config().endpoints.clone();
let config_network_id = self.config().network_id.clone();
let config_chain_id = self.config().chain_id.clone();
EndpointsCache {
fallback: Fallback::new(
endpoints
.into_iter()
.map(|s| (s, TRwLock::new(None)))
.collect(),
),
let new_cache = EndpointsCache {
fallback: Fallback::new(endpoints.into_iter().map(EndpointWithState::new).collect()),
config_network_id,
config_chain_id,
log: self.log.clone(),
};

let mut endpoints_cache = self.inner.endpoints_cache.write();
*endpoints_cache = Some(new_cache.clone());
new_cache
}

/// Returns the cached `EndpointsCache` if it exists or builds a new one.
pub fn get_endpoints(&self) -> EndpointsCache {
let endpoints_cache = self.inner.endpoints_cache.read();
if let Some(cache) = endpoints_cache.clone() {
cache
} else {
drop(endpoints_cache);
macladson marked this conversation as resolved.
Show resolved Hide resolved
self.init_endpoints()
}
}

Expand All @@ -630,7 +677,11 @@ impl Service {
pub async fn update(
&self,
) -> Result<(DepositCacheUpdateOutcome, BlockCacheUpdateOutcome), String> {
let endpoints = self.init_endpoints();
let endpoints = self.get_endpoints();

// Reset the state of any endpoints which have errored so their state can be redetermined.
endpoints.reset_errorred_endpoints().await;

let node_far_behind_seconds = self.inner.config.read().node_far_behind_seconds;

let process_single_err = |e: &FallbackError<SingleEndpointError>| {
Expand All @@ -653,7 +704,7 @@ impl Service {
}
}
}
endpoints.fallback.map_format_error(|s| &s.0, &e)
endpoints.fallback.map_format_error(|s| &s.endpoint, &e)
};

let process_err = |e: Error| match &e {
Expand Down
3 changes: 2 additions & 1 deletion common/fallback/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use itertools::{join, zip};
use std::fmt::{Debug, Display};
use std::future::Future;

#[derive(Clone)]
pub struct Fallback<T> {
servers: Vec<T>,
pub servers: Vec<T>,
}

#[derive(Debug, PartialEq)]
Expand Down