diff --git a/Cargo.lock b/Cargo.lock index de30f767e51..3d7f9395383 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -579,7 +579,7 @@ dependencies = [ [[package]] name = "beacon_node" -version = "1.3.0" +version = "1.4.0" dependencies = [ "beacon_chain", "clap", @@ -785,7 +785,7 @@ dependencies = [ [[package]] name = "boot_node" -version = "1.3.0" +version = "1.4.0" dependencies = [ "beacon_node", "clap", @@ -3315,7 +3315,7 @@ dependencies = [ [[package]] name = "lcli" -version = "1.3.0" +version = "1.4.0" dependencies = [ "account_utils", "bls", @@ -3692,7 +3692,7 @@ dependencies = [ [[package]] name = "lighthouse" -version = "1.3.0" +version = "1.4.0" dependencies = [ "account_manager", "account_utils", diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index 4cfede51001..f170ee86f51 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "beacon_node" -version = "1.3.0" +version = "1.4.0" authors = ["Paul Hauner ", "Age Manning { /// Processing this chain segment finished successfully. @@ -442,18 +455,6 @@ impl BeaconChain { .map(|result| result.map_err(|e| e.into()))) } - /// Traverse backwards from `block_root` to find the root of the ancestor block at `slot`. - pub fn get_ancestor_block_root( - &self, - block_root: Hash256, - slot: Slot, - ) -> Result, Error> { - process_results(self.rev_iter_block_roots_from(block_root)?, |mut iter| { - iter.find(|(_, ancestor_slot)| *ancestor_slot == slot) - .map(|(ancestor_block_root, _)| ancestor_block_root) - }) - } - /// Iterates across all `(state_root, slot)` pairs from the head of the chain (inclusive) to /// the earliest reachable ancestor (may or may not be genesis). /// @@ -489,17 +490,17 @@ impl BeaconChain { /// Returns the block at the given slot, if any. Only returns blocks in the canonical chain. /// + /// Use the `skips` parameter to define the behaviour when `request_slot` is a skipped slot. + /// /// ## Errors /// /// May return a database error. pub fn block_at_slot( &self, - slot: Slot, + request_slot: Slot, + skips: WhenSlotSkipped, ) -> Result>, Error> { - let root = process_results(self.rev_iter_block_roots()?, |mut iter| { - iter.find(|(_, this_slot)| *this_slot == slot) - .map(|(root, _)| root) - })?; + let root = self.block_root_at_slot(request_slot, skips)?; if let Some(block_root) = root { Ok(self.store.get_item(&block_root)?) @@ -521,21 +522,132 @@ impl BeaconChain { } /// Returns the block root at the given slot, if any. Only returns roots in the canonical chain. - /// Returns `Ok(None)` if the given `Slot` was skipped. + /// + /// ## Notes + /// + /// - Use the `skips` parameter to define the behaviour when `request_slot` is a skipped slot. + /// - Returns `Ok(None)` for any slot higher than the current wall-clock slot. + pub fn block_root_at_slot( + &self, + request_slot: Slot, + skips: WhenSlotSkipped, + ) -> Result, Error> { + match skips { + WhenSlotSkipped::None => self.block_root_at_slot_skips_none(request_slot), + WhenSlotSkipped::Prev => self.block_root_at_slot_skips_prev(request_slot), + } + } + + /// Returns the block root at the given slot, if any. Only returns roots in the canonical chain. + /// + /// ## Notes + /// + /// - Returns `Ok(None)` if the given `Slot` was skipped. + /// - Returns `Ok(None)` for any slot higher than the current wall-clock slot. /// /// ## Errors /// /// May return a database error. - pub fn block_root_at_slot(&self, slot: Slot) -> Result, Error> { - process_results(self.rev_iter_block_roots()?, |mut iter| { - let root_opt = iter - .find(|(_, this_slot)| *this_slot == slot) - .map(|(root, _)| root); - if let (Some(root), Some((prev_root, _))) = (root_opt, iter.next()) { - return (prev_root != root).then(|| root); + fn block_root_at_slot_skips_none(&self, request_slot: Slot) -> Result, Error> { + if request_slot > self.slot()? { + return Ok(None); + } else if request_slot == self.spec.genesis_slot { + return Ok(Some(self.genesis_block_root)); + } + + let prev_slot = request_slot.saturating_sub(1_u64); + + // Try an optimized path of reading the root directly from the head state. + let fast_lookup: Option> = self.with_head(|head| { + let state = &head.beacon_state; + + // Try find the root for the `request_slot`. + let request_root_opt = match state.slot.cmp(&request_slot) { + // It's always a skip slot if the head is less than the request slot, return early. + Ordering::Less => return Ok(Some(None)), + // The request slot is the head slot. + Ordering::Equal => Some(head.beacon_block_root), + // Try find the request slot in the state. + Ordering::Greater => state.get_block_root(request_slot).ok().copied(), + }; + + if let Some(request_root) = request_root_opt { + if let Ok(prev_root) = state.get_block_root(prev_slot) { + return Ok(Some((*prev_root != request_root).then(|| request_root))); + } } - root_opt - }) + + // Fast lookup is not possible. + Ok::<_, Error>(None) + })?; + if let Some(root_opt) = fast_lookup { + return Ok(root_opt); + } + + if let Some(((prev_root, _), (curr_root, curr_slot))) = + process_results(self.forwards_iter_block_roots(prev_slot)?, |iter| { + iter.tuple_windows().next() + })? + { + // Sanity check. + if curr_slot != request_slot { + return Err(Error::InconsistentForwardsIter { + request_slot, + slot: curr_slot, + }); + } + Ok((curr_root != prev_root).then(|| curr_root)) + } else { + Ok(None) + } + } + + /// Returns the block root at the given slot, if any. Only returns roots in the canonical chain. + /// + /// ## Notes + /// + /// - Returns the root at the previous non-skipped slot if the given `Slot` was skipped. + /// - Returns `Ok(None)` for any slot higher than the current wall-clock slot. + /// + /// ## Errors + /// + /// May return a database error. + fn block_root_at_slot_skips_prev(&self, request_slot: Slot) -> Result, Error> { + if request_slot > self.slot()? { + return Ok(None); + } else if request_slot == self.spec.genesis_slot { + return Ok(Some(self.genesis_block_root)); + } + + // Try an optimized path of reading the root directly from the head state. + let fast_lookup: Option = self.with_head(|head| { + if head.beacon_block.slot() <= request_slot { + // Return the head root if all slots between the request and the head are skipped. + Ok(Some(head.beacon_block_root)) + } else if let Ok(root) = head.beacon_state.get_block_root(request_slot) { + // Return the root if it's easily accessible from the head state. + Ok(Some(*root)) + } else { + // Fast lookup is not possible. + Ok::<_, Error>(None) + } + })?; + if let Some(root) = fast_lookup { + return Ok(Some(root)); + } + + process_results(self.forwards_iter_block_roots(request_slot)?, |mut iter| { + if let Some((root, slot)) = iter.next() { + if slot == request_slot { + Ok(Some(root)) + } else { + // Sanity check. + Err(Error::InconsistentForwardsIter { request_slot, slot }) + } + } else { + Ok(None) + } + })? } /// Returns the block at the given root, if any. @@ -825,16 +937,6 @@ impl BeaconChain { Ok(map) } - /// Returns the block canonical root of the current canonical chain at a given slot. - /// - /// Returns `None` if the given slot doesn't exist in the chain. - pub fn root_at_slot(&self, target_slot: Slot) -> Result, Error> { - process_results(self.rev_iter_block_roots()?, |mut iter| { - iter.find(|(_, slot)| *slot == target_slot) - .map(|(root, _)| root) - }) - } - /// Returns the block canonical root of the current canonical chain at a given slot, starting from the given state. /// /// Returns `None` if the given slot doesn't exist in the chain. @@ -2324,10 +2426,10 @@ impl BeaconChain { if let Some(event_handler) = self.event_handler.as_ref() { if event_handler.has_head_subscribers() { if let Ok(Some(current_duty_dependent_root)) = - self.root_at_slot(target_epoch_start_slot - 1) + self.block_root_at_slot(target_epoch_start_slot - 1, WhenSlotSkipped::Prev) { - if let Ok(Some(previous_duty_dependent_root)) = - self.root_at_slot(prev_target_epoch_start_slot - 1) + if let Ok(Some(previous_duty_dependent_root)) = self + .block_root_at_slot(prev_target_epoch_start_slot - 1, WhenSlotSkipped::Prev) { event_handler.register(EventKind::Head(SseHead { slot: head_slot, diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 9c47904dda1..dabe96c8afd 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -113,6 +113,10 @@ pub enum BeaconChainError { state_epoch: Epoch, shuffling_epoch: Epoch, }, + InconsistentForwardsIter { + request_slot: Slot, + slot: Slot, + }, } easy_from_to!(SlotProcessingError, BeaconChainError); diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 27f999d305f..eb15a699a92 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -31,7 +31,7 @@ mod validator_pubkey_cache; pub use self::beacon_chain::{ AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BeaconStore, ChainSegmentResult, - ForkChoiceError, StateSkipConfig, MAXIMUM_GOSSIP_CLOCK_DISPARITY, + ForkChoiceError, StateSkipConfig, WhenSlotSkipped, MAXIMUM_GOSSIP_CLOCK_DISPARITY, }; pub use self::beacon_snapshot::BeaconSnapshot; pub use self::chain_config::ChainConfig; diff --git a/beacon_node/beacon_chain/tests/attestation_production.rs b/beacon_node/beacon_chain/tests/attestation_production.rs index 4e4e062d404..e00a04395dc 100644 --- a/beacon_node/beacon_chain/tests/attestation_production.rs +++ b/beacon_node/beacon_chain/tests/attestation_production.rs @@ -5,7 +5,7 @@ extern crate lazy_static; use beacon_chain::{ test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy}, - StateSkipConfig, + StateSkipConfig, WhenSlotSkipped, }; use store::config::StoreConfig; use tree_hash::TreeHash; @@ -60,7 +60,7 @@ fn produces_attestations() { }; let block = chain - .block_at_slot(block_slot) + .block_at_slot(block_slot, WhenSlotSkipped::Prev) .expect("should get block") .expect("block should not be skipped"); let block_root = block.message.tree_hash_root(); diff --git a/beacon_node/beacon_chain/tests/attestation_verification.rs b/beacon_node/beacon_chain/tests/attestation_verification.rs index 0b7e7ef8d1f..9ee351faa6e 100644 --- a/beacon_node/beacon_chain/tests/attestation_verification.rs +++ b/beacon_node/beacon_chain/tests/attestation_verification.rs @@ -6,7 +6,7 @@ extern crate lazy_static; use beacon_chain::{ attestation_verification::Error as AttnError, test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType}, - BeaconChain, BeaconChainTypes, + BeaconChain, BeaconChainTypes, WhenSlotSkipped, }; use int_to_bytes::int_to_bytes32; use state_processing::{ @@ -912,7 +912,7 @@ fn attestation_that_skips_epochs() { let earlier_slot = (current_epoch - 2).start_slot(MainnetEthSpec::slots_per_epoch()); let earlier_block = harness .chain - .block_at_slot(earlier_slot) + .block_at_slot(earlier_slot, WhenSlotSkipped::Prev) .expect("should not error getting block at slot") .expect("should find block at slot"); diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index 616f2b544ff..daa306659de 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -9,6 +9,7 @@ use beacon_chain::{ AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, OP_POOL_DB_KEY, }, + WhenSlotSkipped, }; use operation_pool::PersistedOperationPool; use state_processing::{ @@ -609,3 +610,147 @@ fn produces_and_processes_with_genesis_skip_slots() { run_skip_slot_test(i) } } + +#[test] +fn block_roots_skip_slot_behaviour() { + let harness = get_harness(VALIDATOR_COUNT); + + // Test should be longer than the block roots to ensure a DB lookup is triggered. + let chain_length = harness.chain.head().unwrap().beacon_state.block_roots.len() as u64 * 3; + + let skipped_slots = [1, 6, 7, 10, chain_length]; + + // Build a chain with some skip slots. + for i in 1..=chain_length { + if i > 1 { + harness.advance_slot(); + } + + let slot = harness.chain.slot().unwrap().as_u64(); + + if !skipped_slots.contains(&slot) { + harness.extend_chain( + 1, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ); + } + } + + let mut prev_unskipped_root = None; + + for target_slot in 0..=chain_length { + if skipped_slots.contains(&target_slot) { + /* + * A skip slot + */ + assert!( + harness + .chain + .block_root_at_slot(target_slot.into(), WhenSlotSkipped::None) + .unwrap() + .is_none(), + "WhenSlotSkipped::None should return None on a skip slot" + ); + + let skipped_root = harness + .chain + .block_root_at_slot(target_slot.into(), WhenSlotSkipped::Prev) + .unwrap() + .expect("WhenSlotSkipped::Prev should always return Some"); + + assert_eq!( + skipped_root, + prev_unskipped_root.expect("test is badly formed"), + "WhenSlotSkipped::Prev should accurately return the prior skipped block" + ); + + let expected_block = harness.chain.get_block(&skipped_root).unwrap().unwrap(); + + assert_eq!( + harness + .chain + .block_at_slot(target_slot.into(), WhenSlotSkipped::Prev) + .unwrap() + .unwrap(), + expected_block, + ); + + assert!( + harness + .chain + .block_at_slot(target_slot.into(), WhenSlotSkipped::None) + .unwrap() + .is_none(), + "WhenSlotSkipped::None should return None on a skip slot" + ); + } else { + /* + * Not a skip slot + */ + let skips_none = harness + .chain + .block_root_at_slot(target_slot.into(), WhenSlotSkipped::None) + .unwrap() + .expect("WhenSlotSkipped::None should return Some for non-skipped block"); + let skips_prev = harness + .chain + .block_root_at_slot(target_slot.into(), WhenSlotSkipped::Prev) + .unwrap() + .expect("WhenSlotSkipped::Prev should always return Some"); + assert_eq!( + skips_none, skips_prev, + "WhenSlotSkipped::None and WhenSlotSkipped::Prev should be equal on non-skipped slot" + ); + + let expected_block = harness.chain.get_block(&skips_prev).unwrap().unwrap(); + + assert_eq!( + harness + .chain + .block_at_slot(target_slot.into(), WhenSlotSkipped::Prev) + .unwrap() + .unwrap(), + expected_block + ); + + assert_eq!( + harness + .chain + .block_at_slot(target_slot.into(), WhenSlotSkipped::None) + .unwrap() + .unwrap(), + expected_block + ); + + prev_unskipped_root = Some(skips_prev); + } + } + + /* + * A future, non-existent slot. + */ + + let future_slot = harness.chain.slot().unwrap() + 1; + assert_eq!( + harness.chain.head().unwrap().beacon_block.slot(), + future_slot - 2, + "test precondition" + ); + assert!( + harness + .chain + .block_root_at_slot(future_slot, WhenSlotSkipped::None) + .unwrap() + .is_none(), + "WhenSlotSkipped::None should return None on a future slot" + ); + assert!( + harness + .chain + .block_root_at_slot(future_slot, WhenSlotSkipped::Prev) + .unwrap() + .is_none(), + "WhenSlotSkipped::Prev should return None on a future slot" + ); +} diff --git a/beacon_node/eth1/src/inner.rs b/beacon_node/eth1/src/inner.rs index f6c5e85efb7..2dc39a1de92 100644 --- a/beacon_node/eth1/src/inner.rs +++ b/beacon_node/eth1/src/inner.rs @@ -2,10 +2,12 @@ use crate::Config; use crate::{ block_cache::{BlockCache, Eth1Block}, deposit_cache::{DepositCache, SszDepositCache}, + service::EndpointsCache, }; use parking_lot::RwLock; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; +use std::sync::Arc; use types::ChainSpec; #[derive(Default)] @@ -28,6 +30,7 @@ impl DepositUpdater { pub struct Inner { pub block_cache: RwLock, pub deposit_cache: RwLock, + pub endpoints_cache: RwLock>>, pub config: RwLock, pub remote_head_block: RwLock>, pub spec: ChainSpec, @@ -87,6 +90,7 @@ impl SszEth1Cache { cache: self.deposit_cache.to_deposit_cache()?, last_processed_block: self.last_processed_block, }), + endpoints_cache: RwLock::new(None), // Set the remote head_block zero when creating a new instance. We only care about // present and future eth1 nodes. remote_head_block: RwLock::new(None), diff --git a/beacon_node/eth1/src/service.rs b/beacon_node/eth1/src/service.rs index 8a28881b120..ba5060989ea 100644 --- a/beacon_node/eth1/src/service.rs +++ b/beacon_node/eth1/src/service.rs @@ -54,7 +54,27 @@ pub enum EndpointError { type EndpointState = Result<(), EndpointError>; -type EndpointWithState = (SensitiveUrl, TRwLock>); +pub struct EndpointWithState { + endpoint: SensitiveUrl, + state: TRwLock>, +} + +impl EndpointWithState { + pub fn new(endpoint: SensitiveUrl) -> Self { + Self { + endpoint, + state: TRwLock::new(None), + } + } +} + +async fn reset_endpoint_state(endpoint: &EndpointWithState) { + *endpoint.state.write().await = None; +} + +async fn get_state(endpoint: &EndpointWithState) -> Option { + *endpoint.state.read().await +} /// A cache structure to lazily check usability of endpoints. An endpoint is usable if it is /// reachable and has the correct network id and chain id. Emits a `WARN` log if a checked endpoint @@ -70,19 +90,19 @@ impl EndpointsCache { /// Checks the usability of an endpoint. Results get cached and therefore only the first call /// for each endpoint does the real check. async fn state(&self, endpoint: &EndpointWithState) -> EndpointState { - if let Some(result) = *endpoint.1.read().await { + if let Some(result) = *endpoint.state.read().await { return result; } - let mut value = endpoint.1.write().await; + let mut value = endpoint.state.write().await; if let Some(result) = *value { return result; } crate::metrics::inc_counter_vec( &crate::metrics::ENDPOINT_REQUESTS, - &[&endpoint.0.to_string()], + &[&endpoint.endpoint.to_string()], ); let state = endpoint_state( - &endpoint.0, + &endpoint.endpoint, &self.config_network_id, &self.config_chain_id, &self.log, @@ -92,7 +112,7 @@ impl EndpointsCache { if state.is_err() { crate::metrics::inc_counter_vec( &crate::metrics::ENDPOINT_ERRORS, - &[&endpoint.0.to_string()], + &[&endpoint.endpoint.to_string()], ); crate::metrics::set_gauge(&metrics::ETH1_CONNECTED, 0); } else { @@ -114,12 +134,12 @@ impl EndpointsCache { .first_success(|endpoint| async move { match self.state(endpoint).await { Ok(()) => { - let endpoint_str = &endpoint.0.to_string(); + let endpoint_str = &endpoint.endpoint.to_string(); crate::metrics::inc_counter_vec( &crate::metrics::ENDPOINT_REQUESTS, &[endpoint_str], ); - match func(&endpoint.0).await { + match func(&endpoint.endpoint).await { Ok(t) => Ok(t), Err(t) => { crate::metrics::inc_counter_vec( @@ -127,7 +147,10 @@ impl EndpointsCache { &[endpoint_str], ); if let SingleEndpointError::EndpointError(e) = &t { - *endpoint.1.write().await = Some(Err(*e)); + *endpoint.state.write().await = Some(Err(*e)); + } else { + // A non-`EndpointError` error occurred, so reset the state. + reset_endpoint_state(endpoint).await; } Err(t) } @@ -138,6 +161,16 @@ impl EndpointsCache { }) .await } + + pub async fn reset_errorred_endpoints(&self) { + for endpoint in &self.fallback.servers { + if let Some(state) = get_state(endpoint).await { + if state.is_err() { + reset_endpoint_state(endpoint).await; + } + } + } + } } /// Returns `Ok` if the endpoint is usable, i.e. is reachable and has a correct network id and @@ -405,9 +438,9 @@ impl Default for Config { follow_distance: 128, node_far_behind_seconds: 128 * 14, block_cache_truncation: Some(4_096), - auto_update_interval_millis: 7_000, + auto_update_interval_millis: 60_000, blocks_per_log_query: 1_000, - max_log_requests_per_update: Some(100), + max_log_requests_per_update: Some(5_000), max_blocks_per_update: Some(8_192), purge_cache: false, } @@ -435,6 +468,7 @@ impl Service { deposit_cache: RwLock::new(DepositUpdater::new( config.deposit_contract_deploy_block, )), + endpoints_cache: RwLock::new(None), remote_head_block: RwLock::new(None), config: RwLock::new(config), spec, @@ -605,20 +639,31 @@ impl Service { self.inner.config.write().lowest_cached_block_number = block_number; } - pub fn init_endpoints(&self) -> EndpointsCache { + /// Builds a new `EndpointsCache` with empty states. + pub fn init_endpoints(&self) -> Arc { let endpoints = self.config().endpoints.clone(); let config_network_id = self.config().network_id.clone(); let config_chain_id = self.config().chain_id.clone(); - EndpointsCache { - fallback: Fallback::new( - endpoints - .into_iter() - .map(|s| (s, TRwLock::new(None))) - .collect(), - ), + let new_cache = Arc::new(EndpointsCache { + fallback: Fallback::new(endpoints.into_iter().map(EndpointWithState::new).collect()), config_network_id, config_chain_id, log: self.log.clone(), + }); + + let mut endpoints_cache = self.inner.endpoints_cache.write(); + *endpoints_cache = Some(new_cache.clone()); + new_cache + } + + /// Returns the cached `EndpointsCache` if it exists or builds a new one. + pub fn get_endpoints(&self) -> Arc { + let endpoints_cache = self.inner.endpoints_cache.read(); + if let Some(cache) = endpoints_cache.clone() { + cache + } else { + drop(endpoints_cache); + self.init_endpoints() } } @@ -633,7 +678,11 @@ impl Service { pub async fn update( &self, ) -> Result<(DepositCacheUpdateOutcome, BlockCacheUpdateOutcome), String> { - let endpoints = self.init_endpoints(); + let endpoints = self.get_endpoints(); + + // Reset the state of any endpoints which have errored so their state can be redetermined. + endpoints.reset_errorred_endpoints().await; + let node_far_behind_seconds = self.inner.config.read().node_far_behind_seconds; let process_single_err = |e: &FallbackError| { @@ -656,7 +705,7 @@ impl Service { } } } - endpoints.fallback.map_format_error(|s| &s.0, &e) + endpoints.fallback.map_format_error(|s| &s.endpoint, &e) }; let process_err = |e: Error| match &e { diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index 659c8d9a392..d2763669e71 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -60,6 +60,9 @@ const PEER_EXCESS_FACTOR: f32 = 0.1; /// them in lighthouse. const ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR: f32 = 0.1; +/// A fraction of `PeerManager::target_peers` that need to be outbound-only connections. +const MIN_OUTBOUND_ONLY_FACTOR: f32 = 0.1; + /// The main struct that handles peer's reputation and connection status. pub struct PeerManager { /// Storage of network globals to access the `PeerDB`. @@ -835,7 +838,6 @@ impl PeerManager { /// NOTE: This is experimental and will likely be adjusted fn update_peer_scores(&mut self) { /* Check how long have peers been in this state and update their reputations if needed */ - let mut to_ban_peers = Vec::new(); let mut to_unban_peers = Vec::new(); @@ -910,12 +912,16 @@ impl PeerManager { /// The Peer manager's heartbeat maintains the peer count and maintains peer reputations. /// /// It will request discovery queries if the peer count has not reached the desired number of - /// peers. + /// overall peers, as well as the desired number of outbound-only peers. /// /// NOTE: Discovery will only add a new query if one isn't already queued. fn heartbeat(&mut self) { let peer_count = self.network_globals.connected_or_dialing_peers(); - if peer_count < self.target_peers { + let mut outbound_only_peer_count = self.network_globals.connected_outbound_only_peers(); + let min_outbound_only_target = + (self.target_peers as f32 * MIN_OUTBOUND_ONLY_FACTOR).ceil() as usize; + + if peer_count < self.target_peers || outbound_only_peer_count < min_outbound_only_target { // If we need more peers, queue a discovery lookup. if self.discovery.started { debug!(self.log, "Starting a new peer discovery query"; "connected_peers" => peer_count, "target_peers" => self.target_peers); @@ -931,19 +937,28 @@ impl PeerManager { let connected_peer_count = self.network_globals.connected_peers(); if connected_peer_count > self.target_peers { - //remove excess peers with the worst scores, but keep subnet peers - for (peer_id, _) in self + // Remove excess peers with the worst scores, but keep subnet peers. + // Must also ensure that the outbound-only peer count does not go below the minimum threshold. + outbound_only_peer_count = self.network_globals.connected_outbound_only_peers(); + let mut n_outbound_removed = 0; + for (peer_id, info) in self .network_globals .peers .read() .worst_connected_peers() .iter() .filter(|(_, info)| !info.has_future_duty()) - .take(connected_peer_count - self.target_peers) - //we only need to disconnect peers with healthy scores, since the others got already - //disconnected in update_peer_scores - .filter(|(_, info)| info.score_state() == ScoreState::Healthy) { + if disconnecting_peers.len() == connected_peer_count - self.target_peers { + break; + } + if info.is_outbound_only() { + if min_outbound_only_target < outbound_only_peer_count - n_outbound_removed { + n_outbound_removed += 1; + } else { + continue; + } + } disconnecting_peers.push(**peer_id); } } @@ -1045,3 +1060,296 @@ enum ConnectingType { multiaddr: Multiaddr, }, } + +#[cfg(test)] +mod tests { + use super::*; + use crate::discovery::enr::build_enr; + use crate::discovery::enr_ext::CombinedKeyExt; + use crate::rpc::methods::MetaData; + use crate::Enr; + use discv5::enr::CombinedKey; + use slog::{o, Drain}; + use std::net::UdpSocket; + use types::{EnrForkId, MinimalEthSpec}; + + type E = MinimalEthSpec; + + pub fn unused_port() -> u16 { + let socket = UdpSocket::bind("127.0.0.1:0").expect("should create udp socket"); + let local_addr = socket.local_addr().expect("should read udp socket"); + local_addr.port() + } + + pub fn build_log(level: slog::Level, enabled: bool) -> slog::Logger { + let decorator = slog_term::TermDecorator::new().build(); + let drain = slog_term::FullFormat::new(decorator).build().fuse(); + let drain = slog_async::Async::new(drain).build().fuse(); + + if enabled { + slog::Logger::root(drain.filter_level(level).fuse(), o!()) + } else { + slog::Logger::root(drain.filter(|_| false).fuse(), o!()) + } + } + + async fn build_peer_manager(target: usize) -> PeerManager { + let keypair = libp2p::identity::Keypair::generate_secp256k1(); + let config = NetworkConfig { + discovery_port: unused_port(), + target_peers: target, + ..Default::default() + }; + let enr_key: CombinedKey = CombinedKey::from_libp2p(&keypair).unwrap(); + let enr: Enr = build_enr::(&enr_key, &config, EnrForkId::default()).unwrap(); + let log = build_log(slog::Level::Debug, false); + let globals = NetworkGlobals::new( + enr, + 9000, + 9000, + MetaData { + seq_number: 0, + attnets: Default::default(), + }, + vec![], + &log, + ); + PeerManager::new(&keypair, &config, Arc::new(globals), &log) + .await + .unwrap() + } + + #[tokio::test] + async fn test_peer_manager_disconnects_correctly_during_heartbeat() { + let mut peer_manager = build_peer_manager(3).await; + + // Create 5 peers to connect to. + // 2 will be outbound-only, and have the lowest score. + let peer0 = PeerId::random(); + let peer1 = PeerId::random(); + let peer2 = PeerId::random(); + let outbound_only_peer1 = PeerId::random(); + let outbound_only_peer2 = PeerId::random(); + + peer_manager.connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_ingoing(&peer2, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_outgoing(&outbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_outgoing(&outbound_only_peer2, "/ip4/0.0.0.0".parse().unwrap()); + + // Set the outbound-only peers to have the lowest score. + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&outbound_only_peer1) + .unwrap() + .add_to_score(-1.0); + + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&outbound_only_peer2) + .unwrap() + .add_to_score(-2.0); + + // Check initial connected peers. + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 5); + + peer_manager.heartbeat(); + + // Check that we disconnected from two peers. + // Check that one outbound-only peer was removed because it had the worst score + // and that we did not disconnect the other outbound peer due to the minimum outbound quota. + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3); + assert!(peer_manager + .network_globals + .peers + .read() + .is_connected(&outbound_only_peer1)); + assert!(!peer_manager + .network_globals + .peers + .read() + .is_connected(&outbound_only_peer2)); + + peer_manager.heartbeat(); + + // Check that if we are at target number of peers, we do not disconnect any. + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3); + } + + #[tokio::test] + async fn test_peer_manager_not_enough_outbound_peers_no_panic_during_heartbeat() { + let mut peer_manager = build_peer_manager(20).await; + + // Connect to 20 ingoing-only peers. + for _i in 0..19 { + let peer = PeerId::random(); + peer_manager.connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap()); + } + + // Connect an outbound-only peer. + // Give it the lowest score so that it is evaluated first in the disconnect list iterator. + let outbound_only_peer = PeerId::random(); + peer_manager.connect_ingoing(&outbound_only_peer, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(outbound_only_peer)) + .unwrap() + .add_to_score(-1.0); + // After heartbeat, we will have removed one peer. + // Having less outbound-only peers than minimum won't cause panic when the outbound-only peer is being considered for disconnection. + peer_manager.heartbeat(); + assert_eq!( + peer_manager.network_globals.connected_or_dialing_peers(), + 20 + ); + } + + #[tokio::test] + async fn test_peer_manager_removes_unhealthy_peers_during_heartbeat() { + let mut peer_manager = build_peer_manager(3).await; + + // Create 3 peers to connect to. + let peer0 = PeerId::random(); + let inbound_only_peer1 = PeerId::random(); + let outbound_only_peer1 = PeerId::random(); + + peer_manager.connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_outgoing(&peer0, "/ip4/0.0.0.0".parse().unwrap()); + + // Connect to two peers that are on the threshold of being disconnected. + peer_manager.connect_ingoing(&inbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_outgoing(&outbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(inbound_only_peer1)) + .unwrap() + .add_to_score(-19.9); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(outbound_only_peer1)) + .unwrap() + .add_to_score(-19.9); + // Update the gossipsub scores to induce connection downgrade + // during the heartbeat, update_peer_scores will downgrade the score from -19.9 to at least -20, this will then trigger a disconnection. + // If we changed the peer scores to -20 before the heartbeat, update_peer_scores will mark the previous score status as disconnected, + // then handle_state_transitions will not change the connection status to disconnected because the score state has not changed. + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(inbound_only_peer1)) + .unwrap() + .set_gossipsub_score(-85.0); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(outbound_only_peer1)) + .unwrap() + .set_gossipsub_score(-85.0); + + peer_manager.heartbeat(); + + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 1); + } + + #[tokio::test] + async fn test_peer_manager_remove_unhealthy_peers_brings_peers_below_target() { + let mut peer_manager = build_peer_manager(3).await; + + // Create 4 peers to connect to. + // One pair will be unhealthy inbound only and outbound only peers. + let peer0 = PeerId::random(); + let peer1 = PeerId::random(); + let inbound_only_peer1 = PeerId::random(); + let outbound_only_peer1 = PeerId::random(); + + peer_manager.connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap()); + + // Connect to two peers that are on the threshold of being disconnected. + peer_manager.connect_ingoing(&inbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_outgoing(&outbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(inbound_only_peer1)) + .unwrap() + .add_to_score(-19.9); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(outbound_only_peer1)) + .unwrap() + .add_to_score(-19.9); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(inbound_only_peer1)) + .unwrap() + .set_gossipsub_score(-85.0); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(outbound_only_peer1)) + .unwrap() + .set_gossipsub_score(-85.0); + peer_manager.heartbeat(); + // Tests that when we are over the target peer limit, after disconnecting two unhealthy peers, + // the loop to check for disconnecting peers will stop because we have removed enough peers (only needed to remove 1 to reach target). + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 2); + } + + #[tokio::test] + async fn test_peer_manager_removes_enough_peers_when_one_is_unhealthy() { + let mut peer_manager = build_peer_manager(3).await; + + // Create 5 peers to connect to. + // One will be unhealthy inbound only and outbound only peers. + let peer0 = PeerId::random(); + let peer1 = PeerId::random(); + let peer2 = PeerId::random(); + let inbound_only_peer1 = PeerId::random(); + let outbound_only_peer1 = PeerId::random(); + + peer_manager.connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_ingoing(&peer2, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_outgoing(&outbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); + // Have one peer be on the verge of disconnection. + peer_manager.connect_ingoing(&inbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(inbound_only_peer1)) + .unwrap() + .add_to_score(-19.9); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(inbound_only_peer1)) + .unwrap() + .set_gossipsub_score(-85.0); + + peer_manager.heartbeat(); + // Tests that when we are over the target peer limit, after disconnecting an unhealthy peer, + // the number of connected peers updates and we will not remove too many peers. + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3); + } +} diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs index 49546776a8f..43570c5aeee 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs @@ -182,6 +182,11 @@ impl PeerInfo { matches!(self.connection_status, Disconnected { .. }) } + /// Checks if the peer is outbound-only + pub fn is_outbound_only(&self) -> bool { + matches!(self.connection_status, Connected {n_in, n_out} if n_in == 0 && n_out > 0) + } + /// Returns the number of connections with this peer. pub fn connections(&self) -> (u8, u8) { match self.connection_status { @@ -306,6 +311,11 @@ impl PeerInfo { self.score.test_add(score) } } + + #[cfg(test)] + pub fn set_gossipsub_score(&mut self, score: f64) { + self.score.set_gossipsub_score(score); + } } #[derive(Clone, Debug, Serialize)] diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs index f3785e92540..e96ca3a81d0 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs @@ -232,6 +232,14 @@ impl PeerDB { .map(|(peer_id, _)| peer_id) } + /// Connected outbound-only peers + pub fn connected_outbound_only_peers(&self) -> impl Iterator { + self.peers + .iter() + .filter(|(_, info)| info.is_outbound_only()) + .map(|(peer_id, _)| peer_id) + } + /// Gives the `peer_id` of all known connected and synced peers. pub fn synced_peers(&self) -> impl Iterator { self.peers @@ -688,6 +696,25 @@ mod tests { assert_eq!(peer_info.unwrap().connections(), (n_in, n_out)); } + #[test] + fn test_outbound_only_peers_counted_correctly() { + let mut pdb = get_db(); + let p0 = PeerId::random(); + let p1 = PeerId::random(); + let p2 = PeerId::random(); + // Create peer with no connections. + let _p3 = PeerId::random(); + + pdb.connect_ingoing(&p0, "/ip4/0.0.0.0".parse().unwrap(), None); + pdb.connect_ingoing(&p1, "/ip4/0.0.0.0".parse().unwrap(), None); + pdb.connect_outgoing(&p1, "/ip4/0.0.0.0".parse().unwrap(), None); + pdb.connect_outgoing(&p2, "/ip4/0.0.0.0".parse().unwrap(), None); + + // We should only have one outbound-only peer (p2). + // Peers that are inbound-only, have both types of connections, or no connections should not be counted. + assert_eq!(pdb.connected_outbound_only_peers().count(), 1); + } + #[test] fn test_disconnected_are_bounded() { let mut pdb = get_db(); diff --git a/beacon_node/eth2_libp2p/src/peer_manager/score.rs b/beacon_node/eth2_libp2p/src/peer_manager/score.rs index 38fecad3af9..02479bef067 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/score.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/score.rs @@ -216,6 +216,13 @@ impl RealScore { self.set_lighthouse_score(0f64); } + // Set the gossipsub_score to a specific f64. + // Used in testing to induce score status changes during a heartbeat. + #[cfg(test)] + pub fn set_gossipsub_score(&mut self, score: f64) { + self.gossipsub_score = score; + } + /// Applies time-based logic such as decay rates to the score. /// This function should be called periodically. pub fn update(&mut self) { @@ -291,6 +298,8 @@ apply!(update_gossipsub_score, new_score: f64, ignore: bool); apply!(test_add, score: f64); #[cfg(test)] apply!(test_reset); +#[cfg(test)] +apply!(set_gossipsub_score, score: f64); impl Score { pub fn score(&self) -> f64 { diff --git a/beacon_node/eth2_libp2p/src/types/globals.rs b/beacon_node/eth2_libp2p/src/types/globals.rs index 67abcf77242..4055e53b205 100644 --- a/beacon_node/eth2_libp2p/src/types/globals.rs +++ b/beacon_node/eth2_libp2p/src/types/globals.rs @@ -84,6 +84,11 @@ impl NetworkGlobals { self.peers.read().connected_peer_ids().count() } + /// Returns the number of libp2p connected peers with outbound-only connections. + pub fn connected_outbound_only_peers(&self) -> usize { + self.peers.read().connected_outbound_only_peers().count() + } + /// Returns the number of libp2p peers that are either connected or being dialed. pub fn connected_or_dialing_peers(&self) -> usize { self.peers.read().connected_or_dialing_peers().count() diff --git a/beacon_node/http_api/src/block_id.rs b/beacon_node/http_api/src/block_id.rs index 793a10a5e8c..c21701f3a37 100644 --- a/beacon_node/http_api/src/block_id.rs +++ b/beacon_node/http_api/src/block_id.rs @@ -1,4 +1,4 @@ -use beacon_chain::{BeaconChain, BeaconChainTypes}; +use beacon_chain::{BeaconChain, BeaconChainTypes, WhenSlotSkipped}; use eth2::types::BlockId as CoreBlockId; use std::str::FromStr; use types::{Hash256, SignedBeaconBlock, Slot}; @@ -37,7 +37,7 @@ impl BlockId { .map(|head| head.current_justified_checkpoint.root) .map_err(warp_utils::reject::beacon_chain_error), CoreBlockId::Slot(slot) => chain - .block_root_at_slot(*slot) + .block_root_at_slot(*slot, WhenSlotSkipped::None) .map_err(warp_utils::reject::beacon_chain_error) .and_then(|root_opt| { root_opt.ok_or_else(|| { diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 325b2898865..30b8c8d90bd 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -17,6 +17,7 @@ use beacon_chain::{ observed_operations::ObservationOutcome, validator_monitor::{get_block_delay_ms, timestamp_now}, AttestationError as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes, + WhenSlotSkipped, }; use block_id::BlockId; use eth2::types::{self as api_types, ValidatorId}; @@ -751,7 +752,7 @@ pub fn serve( let block = BlockId::from_root(root).block(&chain)?; let canonical = chain - .block_root_at_slot(block.slot()) + .block_root_at_slot(block.slot(), WhenSlotSkipped::None) .map_err(warp_utils::reject::beacon_chain_error)? .map_or(false, |canonical| root == canonical); diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index fd284a7c1ce..4fb7dc39a83 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -2,7 +2,7 @@ use beacon_chain::{ test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType}, - BeaconChain, StateSkipConfig, MAXIMUM_GOSSIP_CLOCK_DISPARITY, + BeaconChain, StateSkipConfig, WhenSlotSkipped, MAXIMUM_GOSSIP_CLOCK_DISPARITY, }; use environment::null_logger; use eth2::Error; @@ -791,7 +791,10 @@ impl ApiTester { .current_justified_checkpoint .root, ), - BlockId::Slot(slot) => self.chain.block_root_at_slot(slot).unwrap(), + BlockId::Slot(slot) => self + .chain + .block_root_at_slot(slot, WhenSlotSkipped::None) + .unwrap(), BlockId::Root(root) => Some(root), } } @@ -812,14 +815,21 @@ impl ApiTester { .unwrap() .map(|res| res.data); - let root = self.chain.block_root_at_slot(slot).unwrap(); + let root = self + .chain + .block_root_at_slot(slot, WhenSlotSkipped::None) + .unwrap(); if root.is_none() && result.is_none() { continue; } let root = root.unwrap(); - let block = self.chain.block_at_slot(slot).unwrap().unwrap(); + let block = self + .chain + .block_at_slot(slot, WhenSlotSkipped::Prev) + .unwrap() + .unwrap(); let header = BlockHeaderData { root, canonical: true, @@ -900,7 +910,7 @@ impl ApiTester { let block_root = block_root_opt.unwrap(); let canonical = self .chain - .block_root_at_slot(block.slot()) + .block_root_at_slot(block.slot(), WhenSlotSkipped::None) .unwrap() .map_or(false, |canonical| block_root == canonical); @@ -1532,7 +1542,10 @@ impl ApiTester { let dependent_root = self .chain - .root_at_slot((epoch - 1).start_slot(E::slots_per_epoch()) - 1) + .block_root_at_slot( + (epoch - 1).start_slot(E::slots_per_epoch()) - 1, + WhenSlotSkipped::Prev, + ) .unwrap() .unwrap_or(self.chain.head_beacon_block_root().unwrap()); @@ -1604,7 +1617,10 @@ impl ApiTester { let dependent_root = self .chain - .root_at_slot(epoch.start_slot(E::slots_per_epoch()) - 1) + .block_root_at_slot( + epoch.start_slot(E::slots_per_epoch()) - 1, + WhenSlotSkipped::Prev, + ) .unwrap() .unwrap_or(self.chain.head_beacon_block_root().unwrap()); @@ -2186,7 +2202,7 @@ impl ApiTester { current_duty_dependent_root, previous_duty_dependent_root: self .chain - .root_at_slot(current_slot - E::slots_per_epoch()) + .block_root_at_slot(current_slot - E::slots_per_epoch(), WhenSlotSkipped::Prev) .unwrap() .unwrap(), epoch_transition: true, @@ -2195,7 +2211,7 @@ impl ApiTester { let expected_finalized = EventKind::FinalizedCheckpoint(SseFinalizedCheckpoint { block: self .chain - .root_at_slot(next_slot - finalization_distance) + .block_root_at_slot(next_slot - finalization_distance, WhenSlotSkipped::Prev) .unwrap() .unwrap(), state: self diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index d25590f7c74..e1e51ef2c6b 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -2,7 +2,7 @@ use crate::beacon_processor::worker::FUTURE_SLOT_TOLERANCE; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::SyncMessage; -use beacon_chain::{BeaconChainError, BeaconChainTypes}; +use beacon_chain::{BeaconChainError, BeaconChainTypes, WhenSlotSkipped}; use eth2_libp2p::rpc::StatusMessage; use eth2_libp2p::rpc::*; use eth2_libp2p::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo}; @@ -72,7 +72,7 @@ impl Worker { && local.finalized_root != Hash256::zero() && self .chain - .root_at_slot(start_slot(remote.finalized_epoch)) + .block_root_at_slot(start_slot(remote.finalized_epoch), WhenSlotSkipped::Prev) .map(|root_opt| root_opt != Some(remote.finalized_root))? { // The remote's finalized epoch is less than or equal to ours, but the block root is diff --git a/beacon_node/store/src/chunked_vector.rs b/beacon_node/store/src/chunked_vector.rs index 7dda6278e57..9a26304a0b5 100644 --- a/beacon_node/store/src/chunked_vector.rs +++ b/beacon_node/store/src/chunked_vector.rs @@ -431,9 +431,15 @@ fn range_query, E: EthSpec, T: Decode + Encode>( start_index: usize, end_index: usize, ) -> Result>, Error> { - let mut result = vec![]; + let range = start_index..=end_index; + let len = range + .end() + // Add one to account for inclusive range. + .saturating_add(1) + .saturating_sub(*range.start()); + let mut result = Vec::with_capacity(len); - for chunk_index in start_index..=end_index { + for chunk_index in range { let key = &chunk_key(chunk_index as u64)[..]; let chunk = Chunk::load(store, column, key)?.ok_or(ChunkError::Missing { chunk_index })?; result.push(chunk); diff --git a/boot_node/Cargo.toml b/boot_node/Cargo.toml index 063687584f7..897fa2f3bc0 100644 --- a/boot_node/Cargo.toml +++ b/boot_node/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "boot_node" -version = "1.3.0" +version = "1.4.0" authors = ["Sigma Prime "] edition = "2018" diff --git a/common/fallback/src/lib.rs b/common/fallback/src/lib.rs index 1e3cb9cf044..94b177de35c 100644 --- a/common/fallback/src/lib.rs +++ b/common/fallback/src/lib.rs @@ -2,8 +2,9 @@ use itertools::{join, zip}; use std::fmt::{Debug, Display}; use std::future::Future; +#[derive(Clone)] pub struct Fallback { - servers: Vec, + pub servers: Vec, } #[derive(Debug, PartialEq)] diff --git a/common/lighthouse_version/src/lib.rs b/common/lighthouse_version/src/lib.rs index 5fa8fbe5a8f..06dec1c0dee 100644 --- a/common/lighthouse_version/src/lib.rs +++ b/common/lighthouse_version/src/lib.rs @@ -16,7 +16,7 @@ pub const VERSION: &str = git_version!( // NOTE: using --match instead of --exclude for compatibility with old Git "--match=thiswillnevermatchlol" ], - prefix = "Lighthouse/v1.3.0-", + prefix = "Lighthouse/v1.4.0-", fallback = "unknown" ); diff --git a/consensus/fork_choice/tests/tests.rs b/consensus/fork_choice/tests/tests.rs index 5c0db4ebc92..ec0b1277432 100644 --- a/consensus/fork_choice/tests/tests.rs +++ b/consensus/fork_choice/tests/tests.rs @@ -3,7 +3,7 @@ use beacon_chain::{ test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType}, BeaconChain, BeaconChainError, BeaconForkChoiceStore, ChainConfig, ForkChoiceError, - StateSkipConfig, + StateSkipConfig, WhenSlotSkipped, }; use fork_choice::{ ForkChoiceStore, InvalidAttestation, InvalidBlock, QueuedAttestation, @@ -872,7 +872,7 @@ fn invalid_attestation_future_block() { MutationDelay::Blocks(1), |attestation, chain| { attestation.data.beacon_block_root = chain - .block_at_slot(chain.slot().unwrap()) + .block_at_slot(chain.slot().unwrap(), WhenSlotSkipped::Prev) .unwrap() .unwrap() .canonical_root(); @@ -901,7 +901,7 @@ fn invalid_attestation_inconsistent_ffg_vote() { MutationDelay::NoDelay, |attestation, chain| { attestation.data.target.root = chain - .block_at_slot(Slot::new(1)) + .block_at_slot(Slot::new(1), WhenSlotSkipped::Prev) .unwrap() .unwrap() .canonical_root(); @@ -909,7 +909,7 @@ fn invalid_attestation_inconsistent_ffg_vote() { *attestation_opt.lock().unwrap() = Some(attestation.data.target.root); *local_opt.lock().unwrap() = Some( chain - .block_at_slot(Slot::new(0)) + .block_at_slot(Slot::new(0), WhenSlotSkipped::Prev) .unwrap() .unwrap() .canonical_root(), diff --git a/lcli/Cargo.toml b/lcli/Cargo.toml index bc9c69c8b7d..56d8b615610 100644 --- a/lcli/Cargo.toml +++ b/lcli/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "lcli" description = "Lighthouse CLI (modeled after zcli)" -version = "1.3.0" +version = "1.4.0" authors = ["Paul Hauner "] edition = "2018" diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml index 60009400aa3..7746d58956c 100644 --- a/lighthouse/Cargo.toml +++ b/lighthouse/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lighthouse" -version = "1.3.0" +version = "1.4.0" authors = ["Sigma Prime "] edition = "2018" autotests = false