Skip to content

Commit

Permalink
Network Update for Weak Subjectivity Sync (#2561)
Browse files Browse the repository at this point in the history
* Close to first draft

* Further progress to first draft

* Further progress, before rebase

* First draft

* Cleanup and fixes

* Notifier updates and bug fixes

* Fix off-by-one errors

* Remove todo

* Increase backfill buffer

* Gracefully handle requests during backfill

* Improve comments

* fmt

* Update error handling

* Reviewers suggestions

* Take historic blocks by ref, avoid clone

* Clear batch on pause

* Further reviewers suggestions

Co-authored-by: Michael Sproul <[email protected]>
  • Loading branch information
AgeManning and michaelsproul authored Sep 14, 2021
1 parent 8f4bf6e commit f06bf2a
Show file tree
Hide file tree
Showing 25 changed files with 1,818 additions and 164 deletions.
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/historical_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Return the number of blocks successfully imported.
pub fn import_historical_block_batch(
&self,
blocks: Vec<SignedBeaconBlock<T::EthSpec>>,
blocks: &[SignedBeaconBlock<T::EthSpec>],
) -> Result<usize, Error> {
let anchor_info = self
.store
Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub use self::beacon_chain::{
pub use self::beacon_snapshot::BeaconSnapshot;
pub use self::chain_config::ChainConfig;
pub use self::errors::{BeaconChainError, BlockProductionError};
pub use self::historical_blocks::HistoricalBlockError;
pub use attestation_verification::Error as AttestationError;
pub use beacon_fork_choice_store::{BeaconForkChoiceStore, Error as ForkChoiceStoreError};
pub use block_verification::{BlockError, GossipVerifiedBlock};
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/tests/store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1860,13 +1860,13 @@ fn weak_subjectivity_sync() {
.map(|s| s.beacon_block.clone())
.collect::<Vec<_>>();
beacon_chain
.import_historical_block_batch(historical_blocks.clone())
.import_historical_block_batch(&historical_blocks)
.unwrap();
assert_eq!(beacon_chain.store.get_oldest_block_slot(), 0);

// Resupplying the blocks should not fail, they can be safely ignored.
beacon_chain
.import_historical_block_batch(historical_blocks)
.import_historical_block_batch(&historical_blocks)
.unwrap();

// The forwards iterator should now match the original chain
Expand Down
110 changes: 100 additions & 10 deletions beacon_node/client/src/notifier.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::metrics;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::NetworkGlobals;
use eth2_libp2p::{types::SyncState, NetworkGlobals};
use parking_lot::Mutex;
use slog::{debug, error, info, warn, Logger};
use slot_clock::SlotClock;
Expand Down Expand Up @@ -42,6 +42,16 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
let log = executor.log().clone();
let mut interval = tokio::time::interval_at(start_instant, interval_duration);

// Keep track of sync state and reset the speedo on specific sync state changes.
// Specifically, if we switch between a sync and a backfill sync, reset the speedo.
let mut current_sync_state = network.sync_state();

// Store info if we are required to do a backfill sync.
let original_anchor_slot = beacon_chain
.store
.get_anchor_info()
.map(|ai| ai.oldest_block_slot);

let interval_future = async move {
// Perform pre-genesis logging.
loop {
Expand All @@ -68,6 +78,24 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
let connected_peer_count = network.connected_peers();
let sync_state = network.sync_state();

// Determine if we have switched syncing chains
if sync_state != current_sync_state {
match (current_sync_state, &sync_state) {
(_, SyncState::BackFillSyncing { .. }) => {
// We have transitioned to a backfill sync. Reset the speedo.
let mut speedo = speedo.lock();
speedo.clear();
}
(SyncState::BackFillSyncing { .. }, _) => {
// We have transitioned from a backfill sync, reset the speedo
let mut speedo = speedo.lock();
speedo.clear();
}
(_, _) => {}
}
current_sync_state = sync_state;
}

let head_info = match beacon_chain.head_info() {
Ok(head_info) => head_info,
Err(e) => {
Expand Down Expand Up @@ -97,17 +125,46 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
let finalized_root = head_info.finalized_checkpoint.root;
let head_root = head_info.block_root;

// The default is for regular sync but this gets modified if backfill sync is in
// progress.
let mut sync_distance = current_slot - head_slot;

let mut speedo = speedo.lock();
speedo.observe(head_slot, Instant::now());
match current_sync_state {
SyncState::BackFillSyncing { .. } => {
// Observe backfilling sync info.
if let Some(oldest_slot) = original_anchor_slot {
if let Some(current_anchor_slot) = beacon_chain
.store
.get_anchor_info()
.map(|ai| ai.oldest_block_slot)
{
sync_distance = current_anchor_slot;
speedo
// For backfill sync use a fake slot which is the distance we've progressed from the starting `oldest_block_slot`.
.observe(
oldest_slot.saturating_sub(current_anchor_slot),
Instant::now(),
);
}
}
}
SyncState::SyncingFinalized { .. }
| SyncState::SyncingHead { .. }
| SyncState::SyncTransition => {
speedo.observe(head_slot, Instant::now());
}
SyncState::Stalled | SyncState::Synced => {}
}

// NOTE: This is going to change based on which sync we are currently performing. A
// backfill sync should process slots significantly faster than the other sync
// processes.
metrics::set_gauge(
&metrics::SYNC_SLOTS_PER_SECOND,
speedo.slots_per_second().unwrap_or(0_f64) as i64,
);

// The next two lines take advantage of saturating subtraction on `Slot`.
let head_distance = current_slot - head_slot;

if connected_peer_count <= WARN_PEER_COUNT {
warn!(log, "Low peer count"; "peer_count" => peer_count_pretty(connected_peer_count));
}
Expand All @@ -121,16 +178,16 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
"head_block" => format!("{}", head_root),
"head_slot" => head_slot,
"current_slot" => current_slot,
"sync_state" =>format!("{}", sync_state)
"sync_state" =>format!("{}", current_sync_state)
);

// Log if we are syncing
if sync_state.is_syncing() {
if current_sync_state.is_syncing() {
metrics::set_gauge(&metrics::IS_SYNCED, 0);
let distance = format!(
"{} slots ({})",
head_distance.as_u64(),
slot_distance_pretty(head_distance, slot_duration)
sync_distance.as_u64(),
slot_distance_pretty(sync_distance, slot_duration)
);

let speed = speedo.slots_per_second();
Expand All @@ -154,7 +211,35 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
"est_time" => estimated_time_pretty(speedo.estimated_time_till_slot(current_slot)),
);
}
} else if sync_state.is_synced() {
} else if matches!(current_sync_state, SyncState::BackFillSyncing { .. }) {
let distance = format!(
"{} slots ({})",
sync_distance.as_u64(),
slot_distance_pretty(sync_distance, slot_duration)
);

let speed = speedo.slots_per_second();
let display_speed = speed.map_or(false, |speed| speed != 0.0);

if display_speed {
info!(
log,
"Synced - Downloading historical blocks";
"peers" => peer_count_pretty(connected_peer_count),
"distance" => distance,
"speed" => sync_speed_pretty(speed),
"est_time" => estimated_time_pretty(speedo.estimated_time_till_slot(original_anchor_slot.unwrap_or(current_slot))),
);
} else {
info!(
log,
"Synced - Downloading historical blocks";
"peers" => peer_count_pretty(connected_peer_count),
"distance" => distance,
"est_time" => estimated_time_pretty(speedo.estimated_time_till_slot(original_anchor_slot.unwrap_or(current_slot))),
);
}
} else if current_sync_state.is_synced() {
metrics::set_gauge(&metrics::IS_SYNCED, 1);
let block_info = if current_slot > head_slot {
" … empty".to_string()
Expand Down Expand Up @@ -397,4 +482,9 @@ impl Speedo {
None
}
}

/// Clears all past observations to be used for an alternative sync (i.e backfill sync).
pub fn clear(&mut self) {
self.0.clear()
}
}
2 changes: 1 addition & 1 deletion beacon_node/eth2_libp2p/src/behaviour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
}

/// Inform the peer that their request produced an error.
pub fn _send_error_reponse(
pub fn send_error_reponse(
&mut self,
peer_id: PeerId,
id: PeerRequestId,
Expand Down
13 changes: 11 additions & 2 deletions beacon_node/eth2_libp2p/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,8 +551,17 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
RPCResponseErrorCode::Unknown => PeerAction::HighToleranceError,
RPCResponseErrorCode::ResourceUnavailable => {
// NOTE: This error only makes sense for the `BlocksByRange` and `BlocksByRoot`
// protocols. For the time being, there is no reason why a peer should send
// this error.
// protocols.
//
// If we are syncing, there is no point keeping these peers around and
// continually failing to request blocks. We instantly ban them and hope that
// by the time the ban lifts, the peers will have completed their backfill
// sync.
//
// TODO: Potentially a more graceful way of handling such peers, would be to
// implement a new sync type which tracks these peers and prevents the sync
// algorithms from requesting blocks from them (at least for a set period of
// time, multiple failures would then lead to a ban).
PeerAction::Fatal
}
RPCResponseErrorCode::ServerError => PeerAction::MidToleranceError,
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/eth2_libp2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ impl<TSpec: EthSpec> Service<TSpec> {
) {
self.swarm
.behaviour_mut()
._send_error_reponse(peer_id, id, error, reason);
.send_error_reponse(peer_id, id, error, reason);
}

/// Report a peer's action.
Expand Down
10 changes: 9 additions & 1 deletion beacon_node/eth2_libp2p/src/types/globals.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! A collection of variables that are accessible outside of the network thread itself.
use crate::peer_manager::PeerDB;
use crate::rpc::MetaData;
use crate::types::SyncState;
use crate::types::{BackFillState, SyncState};
use crate::Client;
use crate::EnrExt;
use crate::{Enr, GossipTopic, Multiaddr, PeerId};
Expand Down Expand Up @@ -29,6 +29,8 @@ pub struct NetworkGlobals<TSpec: EthSpec> {
pub gossipsub_subscriptions: RwLock<HashSet<GossipTopic>>,
/// The current sync status of the node.
pub sync_state: RwLock<SyncState>,
/// The current state of the backfill sync.
pub backfill_state: RwLock<BackFillState>,
}

impl<TSpec: EthSpec> NetworkGlobals<TSpec> {
Expand All @@ -50,6 +52,7 @@ impl<TSpec: EthSpec> NetworkGlobals<TSpec> {
peers: RwLock::new(PeerDB::new(trusted_peers, log)),
gossipsub_subscriptions: RwLock::new(HashSet::new()),
sync_state: RwLock::new(SyncState::Stalled),
backfill_state: RwLock::new(BackFillState::NotRequired),
}
}

Expand Down Expand Up @@ -104,6 +107,11 @@ impl<TSpec: EthSpec> NetworkGlobals<TSpec> {
self.sync_state.read().clone()
}

/// Returns the current backfill state.
pub fn backfill_state(&self) -> BackFillState {
self.backfill_state.read().clone()
}

/// Returns a `Client` type if one is known for the `PeerId`.
pub fn client(&self, peer_id: &PeerId) -> Client {
self.peers
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/eth2_libp2p/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ pub type Enr = discv5::enr::Enr<discv5::enr::CombinedKey>;
pub use globals::NetworkGlobals;
pub use pubsub::{PubsubMessage, SnappyTransform};
pub use subnet::{Subnet, SubnetDiscovery};
pub use sync_state::SyncState;
pub use sync_state::{BackFillState, SyncState};
pub use topics::{subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, CORE_TOPICS};
37 changes: 33 additions & 4 deletions beacon_node/eth2_libp2p/src/types/sync_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,13 @@ pub enum SyncState {
/// The node is performing a long-range (batch) sync over one or many head chains.
/// In this state parent lookups are disabled.
SyncingHead { start_slot: Slot, target_slot: Slot },
/// The node has identified the need for is sync operations and is transitioning to a syncing
/// state.
/// The node is undertaking a backfill sync. This occurs when a user has specified a trusted
/// state. The node first syncs "forward" by downloading blocks up to the current head as
/// specified by its peers. Once completed, the node enters this sync state and attempts to
/// download all required historical blocks to complete its chain.
BackFillSyncing { completed: usize, remaining: usize },
/// The node has completed syncing a finalized chain and is in the process of re-evaluating
/// which sync state to progress to.
SyncTransition,
/// The node is up to date with all known peers and is connected to at least one
/// fully synced peer. In this state, parent lookups are enabled.
Expand All @@ -21,6 +26,21 @@ pub enum SyncState {
Stalled,
}

#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
/// The state of the backfill sync.
pub enum BackFillState {
/// The sync is partially completed and currently paused.
Paused,
/// We are currently backfilling.
Syncing,
/// A backfill sync has completed.
Completed,
/// A backfill sync is not required.
NotRequired,
/// Too many failed attempts at backfilling. Consider it failed.
Failed,
}

impl PartialEq for SyncState {
fn eq(&self, other: &Self) -> bool {
matches!(
Expand All @@ -32,6 +52,10 @@ impl PartialEq for SyncState {
| (SyncState::Synced, SyncState::Synced)
| (SyncState::Stalled, SyncState::Stalled)
| (SyncState::SyncTransition, SyncState::SyncTransition)
| (
SyncState::BackFillSyncing { .. },
SyncState::BackFillSyncing { .. }
)
)
}
}
Expand All @@ -43,14 +67,18 @@ impl SyncState {
SyncState::SyncingFinalized { .. } => true,
SyncState::SyncingHead { .. } => true,
SyncState::SyncTransition => true,
// Backfill doesn't effect any logic, we consider this state, not syncing.
SyncState::BackFillSyncing { .. } => false,
SyncState::Synced => false,
SyncState::Stalled => false,
}
}

/// Returns true if the node is synced.
///
/// NOTE: We consider the node synced if it is fetching old historical blocks.
pub fn is_synced(&self) -> bool {
matches!(self, SyncState::Synced)
matches!(self, SyncState::Synced | SyncState::BackFillSyncing { .. })
}
}

Expand All @@ -61,7 +89,8 @@ impl std::fmt::Display for SyncState {
SyncState::SyncingHead { .. } => write!(f, "Syncing Head Chain"),
SyncState::Synced { .. } => write!(f, "Synced"),
SyncState::Stalled { .. } => write!(f, "Stalled"),
SyncState::SyncTransition => write!(f, "Searching syncing peers"),
SyncState::SyncTransition => write!(f, "Evaluating known peers"),
SyncState::BackFillSyncing { .. } => write!(f, "Syncing Historical Blocks"),
}
}
}
2 changes: 1 addition & 1 deletion beacon_node/http_api/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub fn historical_blocks<T: BeaconChainTypes>(
blocks: Vec<SignedBeaconBlock<T::EthSpec>>,
) -> Result<AnchorInfo, warp::Rejection> {
chain
.import_historical_block_batch(blocks)
.import_historical_block_batch(&blocks)
.map_err(warp_utils::reject::beacon_chain_error)?;

let anchor = chain.store.get_anchor_info().ok_or_else(|| {
Expand Down
Loading

0 comments on commit f06bf2a

Please sign in to comment.