Skip to content

Commit

Permalink
Notifier updates and bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
AgeManning committed Sep 6, 2021
1 parent f5f3e56 commit f89b69b
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 12 deletions.
96 changes: 87 additions & 9 deletions beacon_node/client/src/notifier.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::metrics;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::NetworkGlobals;
use eth2_libp2p::{NetworkGlobals, types::SyncState};
use parking_lot::Mutex;
use slog::{debug, error, info, warn, Logger};
use slot_clock::SlotClock;
Expand Down Expand Up @@ -42,6 +42,13 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
let log = executor.log().clone();
let mut interval = tokio::time::interval_at(start_instant, interval_duration);

// Keep track of sync state and reset the speedo on specific sync state changes.
// Specifically, if we switch between a sync and a backfill sync, reset the speedo.
let mut current_sync_state = network.sync_state();

// Store info if we are required to do a backfill sync.
let original_anchor_slot = beacon_chain.store.get_anchor_info().map(|ai| ai.oldest_block_slot);

let interval_future = async move {
// Perform pre-genesis logging.
loop {
Expand All @@ -68,6 +75,25 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
let connected_peer_count = network.connected_peers();
let sync_state = network.sync_state();

// Determine if we have switched syncing chains
if sync_state != current_sync_state {
match (current_sync_state, &sync_state) {
(_, SyncState::BackFillSyncing {..} ) => {
// We have transitioned to a backfill sync. Reset the speedo.
let mut speedo = speedo.lock();
speedo.clear();
},
(SyncState::BackFillSyncing { .. }, _) => {
// We have transitioned from a backfill sync, reset the speedo
let mut speedo = speedo.lock();
speedo.clear();
}
(_, _) => {}
}
current_sync_state = sync_state;
}


let head_info = match beacon_chain.head_info() {
Ok(head_info) => head_info,
Err(e) => {
Expand Down Expand Up @@ -97,16 +123,35 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
let finalized_root = head_info.finalized_checkpoint.root;
let head_root = head_info.block_root;

// The default is for regular sync but this gets modified if backfill sync is in
// progress.
let mut sync_distance = current_slot - head_slot;

let mut speedo = speedo.lock();
speedo.observe(head_slot, Instant::now());
match current_sync_state {
SyncState::BackFillSyncing { .. } => {
// Observe backfilling sync info.
if let Some(oldest_slot) = original_anchor_slot {
if let Some(current_slot) = beacon_chain.store.get_anchor_info().map(|ai| ai.oldest_block_slot) {
sync_distance = current_slot;
speedo.observe(oldest_slot.saturating_sub(current_slot), Instant::now());
}
}
},
SyncState::SyncingFinalized { .. } | SyncState::SyncingHead { .. } | SyncState:: SyncTransition => {
speedo.observe(head_slot, Instant::now());
}
SyncState::Stalled | SyncState::Synced => {}
}

// NOTE: This is going to change based on which sync we are currently performing. A
// backfill sync should process slots significantly faster than the other sync
// processes.
metrics::set_gauge(
&metrics::SYNC_SLOTS_PER_SECOND,
speedo.slots_per_second().unwrap_or(0_f64) as i64,
);

// The next two lines take advantage of saturating subtraction on `Slot`.
let head_distance = current_slot - head_slot;

if connected_peer_count <= WARN_PEER_COUNT {
warn!(log, "Low peer count"; "peer_count" => peer_count_pretty(connected_peer_count));
Expand All @@ -121,16 +166,16 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
"head_block" => format!("{}", head_root),
"head_slot" => head_slot,
"current_slot" => current_slot,
"sync_state" =>format!("{}", sync_state)
"sync_state" =>format!("{}", current_sync_state)
);

// Log if we are syncing
if sync_state.is_syncing() {
if current_sync_state.is_syncing() {
metrics::set_gauge(&metrics::IS_SYNCED, 0);
let distance = format!(
"{} slots ({})",
head_distance.as_u64(),
slot_distance_pretty(head_distance, slot_duration)
sync_distance.as_u64(),
slot_distance_pretty(sync_distance, slot_duration)
);

let speed = speedo.slots_per_second();
Expand All @@ -154,7 +199,35 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
"est_time" => estimated_time_pretty(speedo.estimated_time_till_slot(current_slot)),
);
}
} else if sync_state.is_synced() {
} else if matches!(current_sync_state, SyncState::BackFillSyncing { .. } ) {
let distance = format!(
"{} slots ({})",
sync_distance.as_u64(),
slot_distance_pretty(sync_distance, slot_duration)
);

let speed = speedo.slots_per_second();
let display_speed = speed.map_or(false, |speed| speed != 0.0);

if display_speed {
info!(
log,
"Synced - Downloading historical blocks";
"peers" => peer_count_pretty(connected_peer_count),
"distance" => distance,
"speed" => sync_speed_pretty(speed),
"est_time" => estimated_time_pretty(speedo.estimated_time_till_slot(current_slot)),
);
} else {
info!(
log,
"Synced - Downloading historical blocks";
"peers" => peer_count_pretty(connected_peer_count),
"distance" => distance,
"est_time" => estimated_time_pretty(speedo.estimated_time_till_slot(current_slot)),
);
}
} else if current_sync_state.is_synced() {
metrics::set_gauge(&metrics::IS_SYNCED, 1);
let block_info = if current_slot > head_slot {
" … empty".to_string()
Expand Down Expand Up @@ -397,4 +470,9 @@ impl Speedo {
None
}
}

/// Clears all past observations to be used for an alternative sync (i.e backfill sync).
pub fn clear(&mut self) {
self.0.clear()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl<T: BeaconChainTypes> Worker<T> {
BatchProcessResult::Success(sent_blocks > 0)
}
(_, Err(e)) => {
debug!(self.log, "Backfill Batch processing failed";
debug!(self.log, "Backfill batch processing failed";
"batch_epoch" => epoch,
"first_block_slot" => start_slot,
"last_block_slot" => end_slot,
Expand Down
8 changes: 6 additions & 2 deletions beacon_node/network/src/sync/backfill_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
self.advance_chain(network, batch_id);
}

if batch_id == self.processing_target && !self.last_batch_downloaded {
if batch_id == self.processing_target {
self.processing_target = self
.processing_target
.saturating_sub(BACKFILL_EPOCHS_PER_BATCH);
Expand Down Expand Up @@ -888,7 +888,11 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
// validation
let mut redownload_queue = Vec::new();

for (id, batch) in self.batches.range_mut(..batch_id) {
for (id, batch) in self
.batches
.iter_mut()
.filter(|(&id, _batch)| id >= batch_id)
{
match batch
.validation_failed()
.map_err(|e| BackFillError::BatchInvalidState(batch_id, e.0))?
Expand Down

0 comments on commit f89b69b

Please sign in to comment.