From f653bc36be5a1362d8007d3ec44b91a2c53d95a9 Mon Sep 17 00:00:00 2001 From: Mac L Date: Wed, 12 May 2021 09:50:47 +1000 Subject: [PATCH 1/6] Reduce outbound requests to eth1 endpoints --- beacon_node/eth1/src/inner.rs | 3 ++ beacon_node/eth1/src/service.rs | 89 ++++++++++++++++++++++++++------- common/fallback/src/lib.rs | 3 +- 3 files changed, 75 insertions(+), 20 deletions(-) diff --git a/beacon_node/eth1/src/inner.rs b/beacon_node/eth1/src/inner.rs index f6c5e85efb7..a728b25afb5 100644 --- a/beacon_node/eth1/src/inner.rs +++ b/beacon_node/eth1/src/inner.rs @@ -2,6 +2,7 @@ use crate::Config; use crate::{ block_cache::{BlockCache, Eth1Block}, deposit_cache::{DepositCache, SszDepositCache}, + service::EndpointsCache, }; use parking_lot::RwLock; use ssz::{Decode, Encode}; @@ -28,6 +29,7 @@ impl DepositUpdater { pub struct Inner { pub block_cache: RwLock, pub deposit_cache: RwLock, + pub endpoints_cache: RwLock>, pub config: RwLock, pub remote_head_block: RwLock>, pub spec: ChainSpec, @@ -87,6 +89,7 @@ impl SszEth1Cache { cache: self.deposit_cache.to_deposit_cache()?, last_processed_block: self.last_processed_block, }), + endpoints_cache: RwLock::new(None), // Set the remote head_block zero when creating a new instance. We only care about // present and future eth1 nodes. remote_head_block: RwLock::new(None), diff --git a/beacon_node/eth1/src/service.rs b/beacon_node/eth1/src/service.rs index 0584a4b71be..dc6eedbb243 100644 --- a/beacon_node/eth1/src/service.rs +++ b/beacon_node/eth1/src/service.rs @@ -54,11 +54,25 @@ pub enum EndpointError { type EndpointState = Result<(), EndpointError>; -type EndpointWithState = (SensitiveUrl, TRwLock>); +#[derive(Clone)] +pub struct EndpointWithState { + endpoint: SensitiveUrl, + state: Arc>>, +} + +impl EndpointWithState { + pub fn new(endpoint: SensitiveUrl) -> Self { + Self { + endpoint, + state: Arc::new(TRwLock::new(None)), + } + } +} /// A cache structure to lazily check usability of endpoints. An endpoint is usable if it is /// reachable and has the correct network id and chain id. Emits a `WARN` log if a checked endpoint /// is not usable. +#[derive(Clone)] pub struct EndpointsCache { pub fallback: Fallback, pub config_network_id: Eth1Id, @@ -70,19 +84,19 @@ impl EndpointsCache { /// Checks the usability of an endpoint. Results get cached and therefore only the first call /// for each endpoint does the real check. async fn state(&self, endpoint: &EndpointWithState) -> EndpointState { - if let Some(result) = *endpoint.1.read().await { + if let Some(result) = *endpoint.state.read().await { return result; } - let mut value = endpoint.1.write().await; + let mut value = endpoint.state.write().await; if let Some(result) = *value { return result; } crate::metrics::inc_counter_vec( &crate::metrics::ENDPOINT_REQUESTS, - &[&endpoint.0.to_string()], + &[&endpoint.endpoint.to_string()], ); let state = endpoint_state( - &endpoint.0, + &endpoint.endpoint, &self.config_network_id, &self.config_chain_id, &self.log, @@ -92,7 +106,7 @@ impl EndpointsCache { if state.is_err() { crate::metrics::inc_counter_vec( &crate::metrics::ENDPOINT_ERRORS, - &[&endpoint.0.to_string()], + &[&endpoint.endpoint.to_string()], ); } state @@ -111,12 +125,12 @@ impl EndpointsCache { .first_success(|endpoint| async move { match self.state(endpoint).await { Ok(()) => { - let endpoint_str = &endpoint.0.to_string(); + let endpoint_str = &endpoint.endpoint.to_string(); crate::metrics::inc_counter_vec( &crate::metrics::ENDPOINT_REQUESTS, &[endpoint_str], ); - match func(&endpoint.0).await { + match func(&endpoint.endpoint).await { Ok(t) => Ok(t), Err(t) => { crate::metrics::inc_counter_vec( @@ -124,8 +138,10 @@ impl EndpointsCache { &[endpoint_str], ); if let SingleEndpointError::EndpointError(e) = &t { - *endpoint.1.write().await = Some(Err(*e)); + *endpoint.state.write().await = Some(Err(*e)); } + // Endpoint had a non-`EndpointError` error so reset the state. + self.reset_endpoint_state(endpoint).await; Err(t) } } @@ -135,6 +151,24 @@ impl EndpointsCache { }) .await } + + async fn reset_endpoint_state(&self, endpoint: &EndpointWithState) { + *endpoint.state.write().await = None; + } + + async fn get_state(&self, endpoint: &EndpointWithState) -> Option { + *endpoint.state.read().await + } + + pub async fn reset_errorred_endpoints(&self) { + for endpoint in &self.fallback.servers { + if let Some(state) = self.get_state(endpoint).await { + if state.is_err() { + self.reset_endpoint_state(endpoint).await; + } + } + } + } } /// Returns `Ok` if the endpoint is usable, i.e. is reachable and has a correct network id and @@ -402,7 +436,7 @@ impl Default for Config { follow_distance: 128, node_far_behind_seconds: 128 * 14, block_cache_truncation: Some(4_096), - auto_update_interval_millis: 7_000, + auto_update_interval_millis: 60_000, blocks_per_log_query: 1_000, max_log_requests_per_update: Some(100), max_blocks_per_update: Some(8_192), @@ -432,6 +466,7 @@ impl Service { deposit_cache: RwLock::new(DepositUpdater::new( config.deposit_contract_deploy_block, )), + endpoints_cache: RwLock::new(None), remote_head_block: RwLock::new(None), config: RwLock::new(config), spec, @@ -602,20 +637,31 @@ impl Service { self.inner.config.write().lowest_cached_block_number = block_number; } + /// Builds a new `EndpointsCache` with empty states. pub fn init_endpoints(&self) -> EndpointsCache { let endpoints = self.config().endpoints.clone(); let config_network_id = self.config().network_id.clone(); let config_chain_id = self.config().chain_id.clone(); - EndpointsCache { - fallback: Fallback::new( - endpoints - .into_iter() - .map(|s| (s, TRwLock::new(None))) - .collect(), - ), + let new_cache = EndpointsCache { + fallback: Fallback::new(endpoints.into_iter().map(EndpointWithState::new).collect()), config_network_id, config_chain_id, log: self.log.clone(), + }; + + let mut endpoints_cache = self.inner.endpoints_cache.write(); + *endpoints_cache = Some(new_cache.clone()); + new_cache + } + + /// Returns the cached `EndpointsCache` if it exists or builds a new one. + pub fn get_endpoints(&self) -> EndpointsCache { + let endpoints_cache = self.inner.endpoints_cache.read(); + if let Some(cache) = endpoints_cache.clone() { + cache + } else { + drop(endpoints_cache); + self.init_endpoints() } } @@ -630,7 +676,12 @@ impl Service { pub async fn update( &self, ) -> Result<(DepositCacheUpdateOutcome, BlockCacheUpdateOutcome), String> { - let endpoints = self.init_endpoints(); + let endpoints = self.get_endpoints(); + + // Reset the state of any endpoints which have errored. + // This will ensure the endpoint state will be updated + endpoints.reset_errorred_endpoints().await; + let node_far_behind_seconds = self.inner.config.read().node_far_behind_seconds; let process_single_err = |e: &FallbackError| { @@ -653,7 +704,7 @@ impl Service { } } } - endpoints.fallback.map_format_error(|s| &s.0, &e) + endpoints.fallback.map_format_error(|s| &s.endpoint, &e) }; let process_err = |e: Error| match &e { diff --git a/common/fallback/src/lib.rs b/common/fallback/src/lib.rs index 1e3cb9cf044..94b177de35c 100644 --- a/common/fallback/src/lib.rs +++ b/common/fallback/src/lib.rs @@ -2,8 +2,9 @@ use itertools::{join, zip}; use std::fmt::{Debug, Display}; use std::future::Future; +#[derive(Clone)] pub struct Fallback { - servers: Vec, + pub servers: Vec, } #[derive(Debug, PartialEq)] From 3d84c705135ecafbf0b00b58a288c332683f6acc Mon Sep 17 00:00:00 2001 From: Mac L Date: Mon, 17 May 2021 09:34:02 +1000 Subject: [PATCH 2/6] Adjust comment styling --- beacon_node/eth1/src/service.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/beacon_node/eth1/src/service.rs b/beacon_node/eth1/src/service.rs index dc6eedbb243..16a897151fb 100644 --- a/beacon_node/eth1/src/service.rs +++ b/beacon_node/eth1/src/service.rs @@ -678,8 +678,7 @@ impl Service { ) -> Result<(DepositCacheUpdateOutcome, BlockCacheUpdateOutcome), String> { let endpoints = self.get_endpoints(); - // Reset the state of any endpoints which have errored. - // This will ensure the endpoint state will be updated + // Reset the state of any endpoints which have errored so their state can be redetermined. endpoints.reset_errorred_endpoints().await; let node_far_behind_seconds = self.inner.config.read().node_far_behind_seconds; From bd08fe90fdb8a989cce0a3132c6a37ce542fa712 Mon Sep 17 00:00:00 2001 From: Mac L Date: Tue, 18 May 2021 16:36:20 +1000 Subject: [PATCH 3/6] Add missing `else` arm --- beacon_node/eth1/src/service.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/beacon_node/eth1/src/service.rs b/beacon_node/eth1/src/service.rs index 16a897151fb..8d4f1117531 100644 --- a/beacon_node/eth1/src/service.rs +++ b/beacon_node/eth1/src/service.rs @@ -139,9 +139,10 @@ impl EndpointsCache { ); if let SingleEndpointError::EndpointError(e) = &t { *endpoint.state.write().await = Some(Err(*e)); + } else { + // A non-`EndpointError` error occurred, so reset the state. + self.reset_endpoint_state(endpoint).await; } - // Endpoint had a non-`EndpointError` error so reset the state. - self.reset_endpoint_state(endpoint).await; Err(t) } } From 2fd1632f41c2d7b031b8c386300464afd38436bd Mon Sep 17 00:00:00 2001 From: Mac L Date: Thu, 20 May 2021 09:36:53 +1000 Subject: [PATCH 4/6] Change methods to functions --- beacon_node/eth1/src/service.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/beacon_node/eth1/src/service.rs b/beacon_node/eth1/src/service.rs index 8d4f1117531..dbad06c04af 100644 --- a/beacon_node/eth1/src/service.rs +++ b/beacon_node/eth1/src/service.rs @@ -69,6 +69,14 @@ impl EndpointWithState { } } +async fn reset_endpoint_state(endpoint: &EndpointWithState) { + *endpoint.state.write().await = None; +} + +async fn get_state(endpoint: &EndpointWithState) -> Option { + *endpoint.state.read().await +} + /// A cache structure to lazily check usability of endpoints. An endpoint is usable if it is /// reachable and has the correct network id and chain id. Emits a `WARN` log if a checked endpoint /// is not usable. @@ -141,7 +149,7 @@ impl EndpointsCache { *endpoint.state.write().await = Some(Err(*e)); } else { // A non-`EndpointError` error occurred, so reset the state. - self.reset_endpoint_state(endpoint).await; + reset_endpoint_state(endpoint).await; } Err(t) } @@ -153,19 +161,11 @@ impl EndpointsCache { .await } - async fn reset_endpoint_state(&self, endpoint: &EndpointWithState) { - *endpoint.state.write().await = None; - } - - async fn get_state(&self, endpoint: &EndpointWithState) -> Option { - *endpoint.state.read().await - } - pub async fn reset_errorred_endpoints(&self) { for endpoint in &self.fallback.servers { - if let Some(state) = self.get_state(endpoint).await { + if let Some(state) = get_state(endpoint).await { if state.is_err() { - self.reset_endpoint_state(endpoint).await; + reset_endpoint_state(endpoint).await; } } } From b1939fd0004dfefccd2dbe158d280abed54ad6f0 Mon Sep 17 00:00:00 2001 From: Mac L Date: Mon, 24 May 2021 17:16:37 +1000 Subject: [PATCH 5/6] Increase `max_log_requests_per_update` --- beacon_node/eth1/src/service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/eth1/src/service.rs b/beacon_node/eth1/src/service.rs index dbad06c04af..22bb4cd8d42 100644 --- a/beacon_node/eth1/src/service.rs +++ b/beacon_node/eth1/src/service.rs @@ -439,7 +439,7 @@ impl Default for Config { block_cache_truncation: Some(4_096), auto_update_interval_millis: 60_000, blocks_per_log_query: 1_000, - max_log_requests_per_update: Some(100), + max_log_requests_per_update: Some(5_000), max_blocks_per_update: Some(8_192), purge_cache: false, } From 93e4e2e26aa089dfc6f12ad5cb49231b13cb6d88 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 31 May 2021 11:55:53 +1000 Subject: [PATCH 6/6] Lift Arc in EndpointsCache --- beacon_node/eth1/src/inner.rs | 3 ++- beacon_node/eth1/src/service.rs | 14 ++++++-------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/beacon_node/eth1/src/inner.rs b/beacon_node/eth1/src/inner.rs index a728b25afb5..2dc39a1de92 100644 --- a/beacon_node/eth1/src/inner.rs +++ b/beacon_node/eth1/src/inner.rs @@ -7,6 +7,7 @@ use crate::{ use parking_lot::RwLock; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; +use std::sync::Arc; use types::ChainSpec; #[derive(Default)] @@ -29,7 +30,7 @@ impl DepositUpdater { pub struct Inner { pub block_cache: RwLock, pub deposit_cache: RwLock, - pub endpoints_cache: RwLock>, + pub endpoints_cache: RwLock>>, pub config: RwLock, pub remote_head_block: RwLock>, pub spec: ChainSpec, diff --git a/beacon_node/eth1/src/service.rs b/beacon_node/eth1/src/service.rs index 22bb4cd8d42..3c00a3a689e 100644 --- a/beacon_node/eth1/src/service.rs +++ b/beacon_node/eth1/src/service.rs @@ -54,17 +54,16 @@ pub enum EndpointError { type EndpointState = Result<(), EndpointError>; -#[derive(Clone)] pub struct EndpointWithState { endpoint: SensitiveUrl, - state: Arc>>, + state: TRwLock>, } impl EndpointWithState { pub fn new(endpoint: SensitiveUrl) -> Self { Self { endpoint, - state: Arc::new(TRwLock::new(None)), + state: TRwLock::new(None), } } } @@ -80,7 +79,6 @@ async fn get_state(endpoint: &EndpointWithState) -> Option { /// A cache structure to lazily check usability of endpoints. An endpoint is usable if it is /// reachable and has the correct network id and chain id. Emits a `WARN` log if a checked endpoint /// is not usable. -#[derive(Clone)] pub struct EndpointsCache { pub fallback: Fallback, pub config_network_id: Eth1Id, @@ -639,16 +637,16 @@ impl Service { } /// Builds a new `EndpointsCache` with empty states. - pub fn init_endpoints(&self) -> EndpointsCache { + pub fn init_endpoints(&self) -> Arc { let endpoints = self.config().endpoints.clone(); let config_network_id = self.config().network_id.clone(); let config_chain_id = self.config().chain_id.clone(); - let new_cache = EndpointsCache { + let new_cache = Arc::new(EndpointsCache { fallback: Fallback::new(endpoints.into_iter().map(EndpointWithState::new).collect()), config_network_id, config_chain_id, log: self.log.clone(), - }; + }); let mut endpoints_cache = self.inner.endpoints_cache.write(); *endpoints_cache = Some(new_cache.clone()); @@ -656,7 +654,7 @@ impl Service { } /// Returns the cached `EndpointsCache` if it exists or builds a new one. - pub fn get_endpoints(&self) -> EndpointsCache { + pub fn get_endpoints(&self) -> Arc { let endpoints_cache = self.inner.endpoints_cache.read(); if let Some(cache) = endpoints_cache.clone() { cache