From f5f3e569ba7719bc60d7f068cc01e49a9da1c9b7 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Mon, 6 Sep 2021 15:08:13 +1000 Subject: [PATCH] Cleanup and fixes --- .../eth2_libp2p/src/types/sync_state.rs | 12 +- .../network/src/sync/backfill_sync/mod.rs | 160 ++++++++++-------- beacon_node/network/src/sync/manager.rs | 19 ++- beacon_node/store/src/metadata.rs | 7 + scripts/local_testnet/README.md | 2 +- 5 files changed, 118 insertions(+), 82 deletions(-) diff --git a/beacon_node/eth2_libp2p/src/types/sync_state.rs b/beacon_node/eth2_libp2p/src/types/sync_state.rs index 7d7519fcdf7..82f4a6e963a 100644 --- a/beacon_node/eth2_libp2p/src/types/sync_state.rs +++ b/beacon_node/eth2_libp2p/src/types/sync_state.rs @@ -57,6 +57,10 @@ impl PartialEq for SyncState { | (SyncState::Synced, SyncState::Synced) | (SyncState::Stalled, SyncState::Stalled) | (SyncState::SyncTransition, SyncState::SyncTransition) + | ( + SyncState::BackFillSyncing { .. }, + SyncState::BackFillSyncing { .. } + ) ) } } @@ -76,8 +80,10 @@ impl SyncState { } /// Returns true if the node is synced. + /// + /// NOTE: We consider the node synced if it is fetching old historical blocks. pub fn is_synced(&self) -> bool { - matches!(self, SyncState::Synced) + matches!(self, SyncState::Synced | SyncState::BackFillSyncing { .. }) } } @@ -88,8 +94,8 @@ impl std::fmt::Display for SyncState { SyncState::SyncingHead { .. } => write!(f, "Syncing Head Chain"), SyncState::Synced { .. } => write!(f, "Synced"), SyncState::Stalled { .. } => write!(f, "Stalled"), - SyncState::SyncTransition => write!(f, "Searching syncing peers"), - SyncState::BackFillSyncing { .. } => write!(f, "Syncing old blocks."), + SyncState::SyncTransition => write!(f, "Evaluating known peers"), + SyncState::BackFillSyncing { .. } => write!(f, "Syncing Historical Blocks"), } } } diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 9f2b65f4fd1..a60636c9f89 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -24,7 +24,7 @@ use std::collections::{ }; use std::sync::Arc; use tokio::sync::mpsc; -use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock}; +use types::{Epoch, EthSpec, SignedBeaconBlock}; /// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of /// blocks per batch are requested _at most_. A batch may request less blocks to account for @@ -128,6 +128,10 @@ pub struct BackFillSync { /// have participated and only penalize these peers if backfill sync fails. participating_peers: HashSet, + /// When a backfill sync fails, we keep track of whether a new fully synced peer has joined. + /// This signifies that we are able to attempt to restart a failed chain. + restart_failed_sync: bool, + /// Reference to the beacon chain to obtain initial starting points for the backfill sync. beacon_chain: Arc>, @@ -157,7 +161,7 @@ impl BackFillSync { let (state, current_start) = if let Some(anchor_info) = beacon_chain.store.get_anchor_info() { - if anchor_info.oldest_block_parent == Hash256::zero() { + if anchor_info.block_backfill_complete() { (BackFillState::Completed, Epoch::new(0)) } else { ( @@ -183,6 +187,7 @@ impl BackFillSync { current_processing_batch: None, validated_batches: 0, participating_peers: HashSet::new(), + restart_failed_sync: false, beacon_chain, beacon_processor_send, log, @@ -193,11 +198,6 @@ impl BackFillSync { bfs } - /// Returns the current status of the backfill sync. - pub fn state(&self) -> &BackFillState { - &self.state - } - /// Pauses the backfill sync if it's currently syncing. pub fn pause(&mut self) { if let BackFillState::Syncing = self.state { @@ -209,6 +209,8 @@ impl BackFillSync { /// Starts or resumes syncing. /// /// If resuming is successful, reports back the current syncing metrics. + /// The `new_peer` parameter indicates a new synced peer has been added and we should attempt + /// to revive any previously failed backfill sync. #[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"] pub fn start( &mut self, @@ -217,24 +219,73 @@ impl BackFillSync { match self.state { BackFillState::Syncing => {} // already syncing ignore. BackFillState::Paused => { - debug!(self.log, "Resuming backfill sync"; "start_epoch" => self.current_start, "awaiting_batches" => self.batches.len()); - self.state = BackFillState::Syncing; - // begin requesting blocks from the peer pool, until all peers are exhausted. - self.request_batches(network)?; + if self + .network_globals + .peers + .read() + .synced_peers() + .next() + .is_some() + { + // If there are peers to resume with, begin the resume. + debug!(self.log, "Resuming backfill sync"; "start_epoch" => self.current_start, "awaiting_batches" => self.batches.len(), "processing_target" => self.processing_target); + self.state = BackFillState::Syncing; + // begin requesting blocks from the peer pool, until all peers are exhausted. + self.request_batches(network)?; - // start processing batches if needed - self.process_completed_batches(network)?; + // start processing batches if needed + self.process_completed_batches(network)?; + } else { + return Ok(SyncStart::NotSyncing); + } } BackFillState::Failed => { - // We don't attempt to restart sync here. We use the restart() function - // explicitly to prevent accidentally starting a failed sync. - return Ok(SyncStart::NotSyncing); + // Attempt to recover from a failed sync. All local variables should be reset and + // cleared already for a fresh start. + // We only attempt to restart a failed backfill sync if a new synced peer has been + // added. + if !self.restart_failed_sync { + return Ok(SyncStart::NotSyncing); + } + + self.state = BackFillState::Syncing; + + // Obtain a new start slot, from the beacon chain and handle possible errors. + match self.reset_start_epoch() { + Err(ResetEpochError::SyncCompleted) => { + error!(self.log, "Backfill sync completed whilst in failed status"); + self.state = BackFillState::Completed; + self.update_global_state(); + return Err(BackFillError::InvalidSyncState(String::from( + "chain completed", + ))); + } + Err(ResetEpochError::NotRequired) => { + error!( + self.log, + "Backfill sync not required whilst in failed status" + ); + self.state = BackFillState::NotRequired; + self.update_global_state(); + return Err(BackFillError::InvalidSyncState(String::from( + "backfill not required", + ))); + } + Ok(_) => {} + } + + debug!(self.log, "Resuming a failed backfill sync"; "start_epoch" => self.current_start); + + // begin requesting blocks from the peer pool, until all peers are exhausted. + self.request_batches(network)?; } BackFillState::Completed | BackFillState::NotRequired => { return Ok(SyncStart::NotSyncing) } } + self.update_global_state(); + Ok(SyncStart::Syncing { completed: (self.validated_batches * BACKFILL_EPOCHS_PER_BATCH @@ -246,55 +297,13 @@ impl BackFillSync { }) } - /// Attempt to restart sync from a failed status. - pub fn restart( - &mut self, - network: &mut SyncNetworkContext, - ) -> Result<(), BackFillError> { - if !matches!(self.state, BackFillState::Failed) { - crit!( - self.log, - "Attempted to restart a backfill sync that isn't failed" - ); - return Err(BackFillError::InvalidSyncState(String::from( - "backfill isn't failed", - ))); - } - - // Attempt to recover from a failed sync. All local variables should be reset and - // cleared already for a fresh start. - - self.state = BackFillState::Syncing; - - // Obtain a new start slot, from the beacon chain and handle possible errors. - match self.reset_start_epoch() { - Err(ResetEpochError::SyncCompleted) => { - error!(self.log, "Backfill sync completed whilst in failed status"); - self.state = BackFillState::Completed; - self.update_global_state(); - return Err(BackFillError::InvalidSyncState(String::from( - "chain completed", - ))); - } - Err(ResetEpochError::NotRequired) => { - error!( - self.log, - "Backfill sync not required whilst in failed status" - ); - self.state = BackFillState::NotRequired; - self.update_global_state(); - return Err(BackFillError::InvalidSyncState(String::from( - "backfill not required", - ))); - } - Ok(_) => {} + /// A fully synced peer has joined us. + /// If we are in a failed state, update a local variable to indicate we are able to restart + /// the failed sync on the next attempt. + pub fn fully_synced_peer_joined(&mut self) { + if matches!(self.state, BackFillState::Failed) { + self.restart_failed_sync = true; } - self.update_global_state(); - - debug!(self.log, "Resuming a failed backfill sync"; "start_epoch" => self.current_start); - - // begin requesting blocks from the peer pool, until all peers are exhausted. - self.request_batches(network) } /// A peer has disconnected. @@ -347,7 +356,10 @@ impl BackFillSync { .next() .is_none() { - debug!(self.log, "Backfill sync paused."; "reason" => "insufficient synced peers" ); + debug!(self.log, "Backfill sync paused."; "reason" => "insufficient_synced_peers" ); + // Remove any batch that is awaiting download + self.batches + .retain(|_key, batch| !matches!(batch.state(), BatchState::AwaitingDownload)); self.state = BackFillState::Paused; self.update_global_state(); } @@ -372,7 +384,7 @@ impl BackFillSync { if !batch.is_expecting_block(peer_id, &request_id) { return Ok(()); } - debug!(self.log, "Batch failed. RPC Error"; "batch_epoch" => batch_id); + debug!(self.log, "Batch failed"; "batch_epoch" => batch_id, "error" => "rpc_error"); if let Some(active_requests) = self.active_requests.get_mut(peer_id) { active_requests.remove(&batch_id); } @@ -487,6 +499,7 @@ impl BackFillSync { self.batches.clear(); self.active_requests.clear(); self.participating_peers.clear(); + self.restart_failed_sync = false; // Reset all downloading and processing targets self.processing_target = self.current_start; @@ -573,6 +586,9 @@ impl BackFillSync { batch_id: BatchId, result: &BatchProcessResult, ) -> Result { + // On each batch process, we update the global state. + self.update_global_state(); + // The first two cases are possible in regular sync, should not occur in backfill, but we // keep this logic for handling potential processing race conditions. // result @@ -617,8 +633,10 @@ impl BackFillSync { self.advance_chain(network, batch_id); } - if batch_id == self.processing_target { - self.processing_target -= BACKFILL_EPOCHS_PER_BATCH; + if batch_id == self.processing_target && !self.last_batch_downloaded { + self.processing_target = self + .processing_target + .saturating_sub(BACKFILL_EPOCHS_PER_BATCH); } // check if the chain has completed syncing @@ -934,6 +952,7 @@ impl BackFillSync { self.send_batch(network, batch_id, peer) } else { // If we are here the chain has no more synced peers + info!(self.log, "Backfill sync paused"; "reason" => "insufficient_synced_peers"); self.state = BackFillState::Paused; Err(BackFillError::Paused) } @@ -1091,7 +1110,7 @@ impl BackFillSync { /// not required. fn reset_start_epoch(&mut self) -> Result<(), ResetEpochError> { if let Some(anchor_info) = self.beacon_chain.store.get_anchor_info() { - if anchor_info.oldest_block_parent == Hash256::zero() { + if anchor_info.block_backfill_complete() { Err(ResetEpochError::SyncCompleted) } else { self.current_start = anchor_info @@ -1110,7 +1129,8 @@ impl BackFillSync { // Check that the beacon chain agrees if let Some(anchor_info) = self.beacon_chain.store.get_anchor_info() { - if anchor_info.oldest_block_parent == Hash256::zero() { + // Conditions that we have completed a backfill sync + if anchor_info.block_backfill_complete() { return true; } else { error!(self.log, "Backfill out of sync with beacon chain"); diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index c67bd97a98e..f1c9bcb7a8e 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -43,7 +43,7 @@ use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError}; use eth2_libp2p::rpc::{methods::MAX_REQUEST_BLOCKS, BlocksByRootRequest, GoodbyeReason}; -use eth2_libp2p::types::{BackFillState, NetworkGlobals, SyncState}; +use eth2_libp2p::types::{NetworkGlobals, SyncState}; use eth2_libp2p::SyncInfo; use eth2_libp2p::{PeerAction, PeerId}; use fnv::FnvHashMap; @@ -652,14 +652,11 @@ impl SyncManager { "their_head_slot" => remote_sync_info.head_slot, "their_finalized_epoch" => remote_sync_info.finalized_epoch, "is_connected" => peer_info.is_connected()); - // A peer has transitioned its sync state. If the new state is "synced" and we have - // a failed backfill sync, we attempt to restart it as we have a new potential peer - // to progress the sync. + // A peer has transitioned its sync state. If the new state is "synced" we + // inform the backfill sync that a new synced peer has joined us. if new_state.is_synced() - && matches!(self.backfill_sync.state(), BackFillState::Failed) { - debug!(self.log, "Attempting to restart a failed backfill sync"); - let _ = self.backfill_sync.restart(&mut self.network); + self.backfill_sync.fully_synced_peer_joined(); } } peer_info.is_connected() @@ -717,6 +714,8 @@ impl SyncManager { // If we would otherwise be synced, first check if we need to perform or // complete a backfill sync. if matches!(sync_state, SyncState::Synced) { + + // Determine if we need to start/resume/restart a backfill sync. match self.backfill_sync.start(&mut self.network) { Ok(SyncStart::Syncing { completed, @@ -733,6 +732,8 @@ impl SyncManager { } } } + + // Return the sync state if backfilling is not required. sync_state } @@ -762,7 +763,9 @@ impl SyncManager { if !new_state.eq(&old_state) { info!(self.log, "Sync state updated"; "old_state" => %old_state, "new_state" => %new_state); // If we have become synced - Subscribe to all the core subnet topics - if new_state.is_synced() { + // We don't need to subscribe if the old state is a state that would have already + // invoked this call. + if new_state.is_synced() && !matches!(old_state, SyncState::Synced { .. } | SyncState::BackFillSyncing { .. }) { self.network.subscribe_core_topics(); } } diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index bca4770c6c5..fd20a588010 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -97,6 +97,13 @@ pub struct AnchorInfo { pub state_lower_limit: Slot, } +impl AnchorInfo { + /// Returns true if the block backfill has completed. + pub fn block_backfill_complete(&self) -> bool { + self.oldest_block_slot == 0 + } +} + impl StoreItem for AnchorInfo { fn db_column() -> DBColumn { DBColumn::BeaconMeta diff --git a/scripts/local_testnet/README.md b/scripts/local_testnet/README.md index 5cb8407a98f..921fd1cf214 100644 --- a/scripts/local_testnet/README.md +++ b/scripts/local_testnet/README.md @@ -20,7 +20,7 @@ Start a local eth1 ganache server ./ganache_test_node.sh ``` -Assuming you are happy with the configuration in `var.env`, deploy the deposit contract, make deposits, +Assuming you are happy with the configuration in `vars.env`, deploy the deposit contract, make deposits, create the testnet directory, genesis state and validator keys with: ```bash