Skip to content

Commit

Permalink
Cleanup and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
AgeManning committed Sep 6, 2021
1 parent ba48b9d commit f5f3e56
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 82 deletions.
12 changes: 9 additions & 3 deletions beacon_node/eth2_libp2p/src/types/sync_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ impl PartialEq for SyncState {
| (SyncState::Synced, SyncState::Synced)
| (SyncState::Stalled, SyncState::Stalled)
| (SyncState::SyncTransition, SyncState::SyncTransition)
| (
SyncState::BackFillSyncing { .. },
SyncState::BackFillSyncing { .. }
)
)
}
}
Expand All @@ -76,8 +80,10 @@ impl SyncState {
}

/// Returns true if the node is synced.
///
/// NOTE: We consider the node synced if it is fetching old historical blocks.
pub fn is_synced(&self) -> bool {
matches!(self, SyncState::Synced)
matches!(self, SyncState::Synced | SyncState::BackFillSyncing { .. })
}
}

Expand All @@ -88,8 +94,8 @@ impl std::fmt::Display for SyncState {
SyncState::SyncingHead { .. } => write!(f, "Syncing Head Chain"),
SyncState::Synced { .. } => write!(f, "Synced"),
SyncState::Stalled { .. } => write!(f, "Stalled"),
SyncState::SyncTransition => write!(f, "Searching syncing peers"),
SyncState::BackFillSyncing { .. } => write!(f, "Syncing old blocks."),
SyncState::SyncTransition => write!(f, "Evaluating known peers"),
SyncState::BackFillSyncing { .. } => write!(f, "Syncing Historical Blocks"),
}
}
}
160 changes: 90 additions & 70 deletions beacon_node/network/src/sync/backfill_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::collections::{
};
use std::sync::Arc;
use tokio::sync::mpsc;
use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock};
use types::{Epoch, EthSpec, SignedBeaconBlock};

/// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of
/// blocks per batch are requested _at most_. A batch may request less blocks to account for
Expand Down Expand Up @@ -128,6 +128,10 @@ pub struct BackFillSync<T: BeaconChainTypes> {
/// have participated and only penalize these peers if backfill sync fails.
participating_peers: HashSet<PeerId>,

/// When a backfill sync fails, we keep track of whether a new fully synced peer has joined.
/// This signifies that we are able to attempt to restart a failed chain.
restart_failed_sync: bool,

/// Reference to the beacon chain to obtain initial starting points for the backfill sync.
beacon_chain: Arc<BeaconChain<T>>,

Expand Down Expand Up @@ -157,7 +161,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {

let (state, current_start) = if let Some(anchor_info) = beacon_chain.store.get_anchor_info()
{
if anchor_info.oldest_block_parent == Hash256::zero() {
if anchor_info.block_backfill_complete() {
(BackFillState::Completed, Epoch::new(0))
} else {
(
Expand All @@ -183,6 +187,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
current_processing_batch: None,
validated_batches: 0,
participating_peers: HashSet::new(),
restart_failed_sync: false,
beacon_chain,
beacon_processor_send,
log,
Expand All @@ -193,11 +198,6 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
bfs
}

/// Returns the current status of the backfill sync.
pub fn state(&self) -> &BackFillState {
&self.state
}

/// Pauses the backfill sync if it's currently syncing.
pub fn pause(&mut self) {
if let BackFillState::Syncing = self.state {
Expand All @@ -209,6 +209,8 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
/// Starts or resumes syncing.
///
/// If resuming is successful, reports back the current syncing metrics.
/// The `new_peer` parameter indicates a new synced peer has been added and we should attempt
/// to revive any previously failed backfill sync.
#[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"]
pub fn start(
&mut self,
Expand All @@ -217,24 +219,73 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
match self.state {
BackFillState::Syncing => {} // already syncing ignore.
BackFillState::Paused => {
debug!(self.log, "Resuming backfill sync"; "start_epoch" => self.current_start, "awaiting_batches" => self.batches.len());
self.state = BackFillState::Syncing;
// begin requesting blocks from the peer pool, until all peers are exhausted.
self.request_batches(network)?;
if self
.network_globals
.peers
.read()
.synced_peers()
.next()
.is_some()
{
// If there are peers to resume with, begin the resume.
debug!(self.log, "Resuming backfill sync"; "start_epoch" => self.current_start, "awaiting_batches" => self.batches.len(), "processing_target" => self.processing_target);
self.state = BackFillState::Syncing;
// begin requesting blocks from the peer pool, until all peers are exhausted.
self.request_batches(network)?;

// start processing batches if needed
self.process_completed_batches(network)?;
// start processing batches if needed
self.process_completed_batches(network)?;
} else {
return Ok(SyncStart::NotSyncing);
}
}
BackFillState::Failed => {
// We don't attempt to restart sync here. We use the restart() function
// explicitly to prevent accidentally starting a failed sync.
return Ok(SyncStart::NotSyncing);
// Attempt to recover from a failed sync. All local variables should be reset and
// cleared already for a fresh start.
// We only attempt to restart a failed backfill sync if a new synced peer has been
// added.
if !self.restart_failed_sync {
return Ok(SyncStart::NotSyncing);
}

self.state = BackFillState::Syncing;

// Obtain a new start slot, from the beacon chain and handle possible errors.
match self.reset_start_epoch() {
Err(ResetEpochError::SyncCompleted) => {
error!(self.log, "Backfill sync completed whilst in failed status");
self.state = BackFillState::Completed;
self.update_global_state();
return Err(BackFillError::InvalidSyncState(String::from(
"chain completed",
)));
}
Err(ResetEpochError::NotRequired) => {
error!(
self.log,
"Backfill sync not required whilst in failed status"
);
self.state = BackFillState::NotRequired;
self.update_global_state();
return Err(BackFillError::InvalidSyncState(String::from(
"backfill not required",
)));
}
Ok(_) => {}
}

debug!(self.log, "Resuming a failed backfill sync"; "start_epoch" => self.current_start);

// begin requesting blocks from the peer pool, until all peers are exhausted.
self.request_batches(network)?;
}
BackFillState::Completed | BackFillState::NotRequired => {
return Ok(SyncStart::NotSyncing)
}
}

self.update_global_state();

Ok(SyncStart::Syncing {
completed: (self.validated_batches
* BACKFILL_EPOCHS_PER_BATCH
Expand All @@ -246,55 +297,13 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
})
}

/// Attempt to restart sync from a failed status.
pub fn restart(
&mut self,
network: &mut SyncNetworkContext<T::EthSpec>,
) -> Result<(), BackFillError> {
if !matches!(self.state, BackFillState::Failed) {
crit!(
self.log,
"Attempted to restart a backfill sync that isn't failed"
);
return Err(BackFillError::InvalidSyncState(String::from(
"backfill isn't failed",
)));
}

// Attempt to recover from a failed sync. All local variables should be reset and
// cleared already for a fresh start.

self.state = BackFillState::Syncing;

// Obtain a new start slot, from the beacon chain and handle possible errors.
match self.reset_start_epoch() {
Err(ResetEpochError::SyncCompleted) => {
error!(self.log, "Backfill sync completed whilst in failed status");
self.state = BackFillState::Completed;
self.update_global_state();
return Err(BackFillError::InvalidSyncState(String::from(
"chain completed",
)));
}
Err(ResetEpochError::NotRequired) => {
error!(
self.log,
"Backfill sync not required whilst in failed status"
);
self.state = BackFillState::NotRequired;
self.update_global_state();
return Err(BackFillError::InvalidSyncState(String::from(
"backfill not required",
)));
}
Ok(_) => {}
/// A fully synced peer has joined us.
/// If we are in a failed state, update a local variable to indicate we are able to restart
/// the failed sync on the next attempt.
pub fn fully_synced_peer_joined(&mut self) {
if matches!(self.state, BackFillState::Failed) {
self.restart_failed_sync = true;
}
self.update_global_state();

debug!(self.log, "Resuming a failed backfill sync"; "start_epoch" => self.current_start);

// begin requesting blocks from the peer pool, until all peers are exhausted.
self.request_batches(network)
}

/// A peer has disconnected.
Expand Down Expand Up @@ -347,7 +356,10 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
.next()
.is_none()
{
debug!(self.log, "Backfill sync paused."; "reason" => "insufficient synced peers" );
debug!(self.log, "Backfill sync paused."; "reason" => "insufficient_synced_peers" );
// Remove any batch that is awaiting download
self.batches
.retain(|_key, batch| !matches!(batch.state(), BatchState::AwaitingDownload));
self.state = BackFillState::Paused;
self.update_global_state();
}
Expand All @@ -372,7 +384,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
if !batch.is_expecting_block(peer_id, &request_id) {
return Ok(());
}
debug!(self.log, "Batch failed. RPC Error"; "batch_epoch" => batch_id);
debug!(self.log, "Batch failed"; "batch_epoch" => batch_id, "error" => "rpc_error");
if let Some(active_requests) = self.active_requests.get_mut(peer_id) {
active_requests.remove(&batch_id);
}
Expand Down Expand Up @@ -487,6 +499,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
self.batches.clear();
self.active_requests.clear();
self.participating_peers.clear();
self.restart_failed_sync = false;

// Reset all downloading and processing targets
self.processing_target = self.current_start;
Expand Down Expand Up @@ -573,6 +586,9 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
batch_id: BatchId,
result: &BatchProcessResult,
) -> Result<ProcessResult, BackFillError> {
// On each batch process, we update the global state.
self.update_global_state();

// The first two cases are possible in regular sync, should not occur in backfill, but we
// keep this logic for handling potential processing race conditions.
// result
Expand Down Expand Up @@ -617,8 +633,10 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
self.advance_chain(network, batch_id);
}

if batch_id == self.processing_target {
self.processing_target -= BACKFILL_EPOCHS_PER_BATCH;
if batch_id == self.processing_target && !self.last_batch_downloaded {
self.processing_target = self
.processing_target
.saturating_sub(BACKFILL_EPOCHS_PER_BATCH);
}

// check if the chain has completed syncing
Expand Down Expand Up @@ -934,6 +952,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
self.send_batch(network, batch_id, peer)
} else {
// If we are here the chain has no more synced peers
info!(self.log, "Backfill sync paused"; "reason" => "insufficient_synced_peers");
self.state = BackFillState::Paused;
Err(BackFillError::Paused)
}
Expand Down Expand Up @@ -1091,7 +1110,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
/// not required.
fn reset_start_epoch(&mut self) -> Result<(), ResetEpochError> {
if let Some(anchor_info) = self.beacon_chain.store.get_anchor_info() {
if anchor_info.oldest_block_parent == Hash256::zero() {
if anchor_info.block_backfill_complete() {
Err(ResetEpochError::SyncCompleted)
} else {
self.current_start = anchor_info
Expand All @@ -1110,7 +1129,8 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
// Check that the beacon chain agrees

if let Some(anchor_info) = self.beacon_chain.store.get_anchor_info() {
if anchor_info.oldest_block_parent == Hash256::zero() {
// Conditions that we have completed a backfill sync
if anchor_info.block_backfill_complete() {
return true;
} else {
error!(self.log, "Backfill out of sync with beacon chain");
Expand Down
19 changes: 11 additions & 8 deletions beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::service::NetworkMessage;
use crate::status::ToStatusMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError};
use eth2_libp2p::rpc::{methods::MAX_REQUEST_BLOCKS, BlocksByRootRequest, GoodbyeReason};
use eth2_libp2p::types::{BackFillState, NetworkGlobals, SyncState};
use eth2_libp2p::types::{NetworkGlobals, SyncState};
use eth2_libp2p::SyncInfo;
use eth2_libp2p::{PeerAction, PeerId};
use fnv::FnvHashMap;
Expand Down Expand Up @@ -652,14 +652,11 @@ impl<T: BeaconChainTypes> SyncManager<T> {
"their_head_slot" => remote_sync_info.head_slot, "their_finalized_epoch" => remote_sync_info.finalized_epoch,
"is_connected" => peer_info.is_connected());

// A peer has transitioned its sync state. If the new state is "synced" and we have
// a failed backfill sync, we attempt to restart it as we have a new potential peer
// to progress the sync.
// A peer has transitioned its sync state. If the new state is "synced" we
// inform the backfill sync that a new synced peer has joined us.
if new_state.is_synced()
&& matches!(self.backfill_sync.state(), BackFillState::Failed)
{
debug!(self.log, "Attempting to restart a failed backfill sync");
let _ = self.backfill_sync.restart(&mut self.network);
self.backfill_sync.fully_synced_peer_joined();
}
}
peer_info.is_connected()
Expand Down Expand Up @@ -717,6 +714,8 @@ impl<T: BeaconChainTypes> SyncManager<T> {
// If we would otherwise be synced, first check if we need to perform or
// complete a backfill sync.
if matches!(sync_state, SyncState::Synced) {

// Determine if we need to start/resume/restart a backfill sync.
match self.backfill_sync.start(&mut self.network) {
Ok(SyncStart::Syncing {
completed,
Expand All @@ -733,6 +732,8 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
}


// Return the sync state if backfilling is not required.
sync_state
}
Expand Down Expand Up @@ -762,7 +763,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
if !new_state.eq(&old_state) {
info!(self.log, "Sync state updated"; "old_state" => %old_state, "new_state" => %new_state);
// If we have become synced - Subscribe to all the core subnet topics
if new_state.is_synced() {
// We don't need to subscribe if the old state is a state that would have already
// invoked this call.
if new_state.is_synced() && !matches!(old_state, SyncState::Synced { .. } | SyncState::BackFillSyncing { .. }) {
self.network.subscribe_core_topics();
}
}
Expand Down
7 changes: 7 additions & 0 deletions beacon_node/store/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ pub struct AnchorInfo {
pub state_lower_limit: Slot,
}

impl AnchorInfo {
    /// Returns true if the block backfill has completed.
    ///
    /// Completion is defined as the oldest stored block having slot 0 —
    /// i.e. blocks have been backfilled all the way to the start of the
    /// chain, so there are no earlier blocks left to download.
    /// (NOTE: relies on `Slot`'s `PartialEq` against an integer literal.)
    pub fn block_backfill_complete(&self) -> bool {
        self.oldest_block_slot == 0
    }
}

impl StoreItem for AnchorInfo {
fn db_column() -> DBColumn {
DBColumn::BeaconMeta
Expand Down
2 changes: 1 addition & 1 deletion scripts/local_testnet/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Start a local eth1 ganache server
./ganache_test_node.sh
```

Assuming you are happy with the configuration in `var.env`, deploy the deposit contract, make deposits,
Assuming you are happy with the configuration in `vars.env`, deploy the deposit contract, make deposits,
create the testnet directory, genesis state and validator keys with:

```bash
Expand Down

0 comments on commit f5f3e56

Please sign in to comment.