Skip to content

Commit

Permalink
fix(sync): prevent synchronizer loop when very close to tip (#3854)
Browse files Browse the repository at this point in the history
* Refactor to split `ChainSync::sync` method in two

Replace the use of loop labels and `continue` for control flow, and use
early return from a separate method instead. This also allows removing
the `started_once` flag.

* Refactor to create `handle_block_response` helper

Reduce duplicate code and make the main synchronization methods a little
more concise to improve readability.

* Only cancel downloads in case of error

Leave active downloads running if the tips have been exhausted, because
it could have reached the chain tip.
  • Loading branch information
jvff authored Mar 18, 2022
1 parent 39dfca8 commit 78080d8
Showing 1 changed file with 97 additions and 78 deletions.
175 changes: 97 additions & 78 deletions zebrad/src/components/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,110 +346,107 @@ where
(new_syncer, sync_status)
}

/// Runs the syncer to synchronize the chain and keep it synchronized.
#[instrument(skip(self))]
pub async fn sync(mut self) -> Result<(), Report> {
// We can't download the genesis block using our normal algorithm,
// due to protocol limitations
self.request_genesis().await?;

// Distinguishes a restart from a start, so we don't sleep when starting
// the sync process, but we can keep restart logic in one place.
let mut started_once = false;

'sync: loop {
if started_once {
info!(
timeout = ?SYNC_RESTART_DELAY,
state_tip = ?self.latest_chain_tip.best_tip_height(),
"waiting to restart sync"
);
self.prospective_tips = HashSet::new();
loop {
if self.try_to_sync().await.is_err() {
self.downloads.cancel_all();
self.update_metrics();
sleep(SYNC_RESTART_DELAY).await;
} else {
started_once = true;
}

self.update_metrics();

info!(
timeout = ?SYNC_RESTART_DELAY,
state_tip = ?self.latest_chain_tip.best_tip_height(),
"starting sync, obtaining new tips"
"waiting to restart sync"
);
if let Err(e) = self.obtain_tips().await {
warn!(?e, "error obtaining tips");
continue 'sync;
}
self.update_metrics();
sleep(SYNC_RESTART_DELAY).await;
}
}

while !self.prospective_tips.is_empty() {
// Check whether any block tasks are currently ready:
while let Poll::Ready(Some(rsp)) = futures::poll!(self.downloads.next()) {
match rsp {
Ok(hash) => {
trace!(?hash, "verified and committed block to state");
}
Err(e) => {
if Self::should_restart_sync(e) {
continue 'sync;
}
}
}
}
self.update_metrics();
/// Tries to synchronize the chain as far as it can.
///
/// Obtains some prospective tips and iteratively tries to extend them and download the missing
/// blocks.
///
/// Returns `Ok` if it was able to synchronize as much of the chain as it could, and then ran
/// out of prospective tips. This happens when synchronization finishes or if Zebra ended up
/// following a fork. Either way, Zebra should attempt to obtain some more tips.
///
/// Returns `Err` if there was an unrecoverable error and restarting the synchronization is
/// necessary.
#[instrument(skip(self))]
async fn try_to_sync(&mut self) -> Result<(), ()> {
self.prospective_tips = HashSet::new();

// If we have too many pending tasks, wait for some to finish.
//
// Starting to wait is interesting, but logging each wait can be
// very verbose.
if self.downloads.in_flight() > self.lookahead_limit {
tracing::info!(
tips.len = self.prospective_tips.len(),
in_flight = self.downloads.in_flight(),
lookahead_limit = self.lookahead_limit,
"waiting for pending blocks",
);
}
while self.downloads.in_flight() > self.lookahead_limit {
trace!(
tips.len = self.prospective_tips.len(),
in_flight = self.downloads.in_flight(),
lookahead_limit = self.lookahead_limit,
state_tip = ?self.latest_chain_tip.best_tip_height(),
"waiting for pending blocks",
);

match self.downloads.next().await.expect("downloads is nonempty") {
Ok(hash) => {
trace!(?hash, "verified and committed block to state");
}
info!(
state_tip = ?self.latest_chain_tip.best_tip_height(),
"starting sync, obtaining new tips"
);
if let Err(e) = self.obtain_tips().await {
warn!(?e, "error obtaining tips");
return Err(());
}
self.update_metrics();

Err(e) => {
if Self::should_restart_sync(e) {
continue 'sync;
}
}
}
self.update_metrics();
}
while !self.prospective_tips.is_empty() {
// Check whether any block tasks are currently ready:
while let Poll::Ready(Some(rsp)) = futures::poll!(self.downloads.next()) {
self.handle_block_response(rsp).await?;
}
self.update_metrics();

// Once we're below the lookahead limit, we can keep extending the tips.
info!(
// If we have too many pending tasks, wait for some to finish.
//
// Starting to wait is interesting, but logging each wait can be
// very verbose.
if self.downloads.in_flight() > self.lookahead_limit {
tracing::info!(
tips.len = self.prospective_tips.len(),
in_flight = self.downloads.in_flight(),
lookahead_limit = self.lookahead_limit,
"waiting for pending blocks",
);
}
while self.downloads.in_flight() > self.lookahead_limit {
trace!(
tips.len = self.prospective_tips.len(),
in_flight = self.downloads.in_flight(),
lookahead_limit = self.lookahead_limit,
state_tip = ?self.latest_chain_tip.best_tip_height(),
"extending tips",
"waiting for pending blocks",
);

if let Err(e) = self.extend_tips().await {
warn!(?e, "error extending tips");
continue 'sync;
}
let response = self.downloads.next().await.expect("downloads is nonempty");

self.handle_block_response(response).await?;
self.update_metrics();
}

info!("exhausted prospective tip set");
// Once we're below the lookahead limit, we can keep extending the tips.
info!(
tips.len = self.prospective_tips.len(),
in_flight = self.downloads.in_flight(),
lookahead_limit = self.lookahead_limit,
state_tip = ?self.latest_chain_tip.best_tip_height(),
"extending tips",
);

if let Err(e) = self.extend_tips().await {
warn!(?e, "error extending tips");
return Err(());
}
self.update_metrics();
}

info!("exhausted prospective tip set");

Ok(())
}

/// Given a block_locator list fan out request for subsequent hashes to
Expand Down Expand Up @@ -777,6 +774,28 @@ where
Ok(())
}

/// Handles a response for a requested block.
///
/// Returns `Ok` if the block was successfully verified and commited to the state, or if an
/// expected error occurred, so that the synchronization can continue normally.
///
/// Returns `Err` if an unexpected error occurred, to force the synchronizer to restart.
async fn handle_block_response(
&mut self,
response: Result<block::Hash, BlockDownloadVerifyError>,
) -> Result<(), ()> {
match response {
Ok(hash) => trace!(?hash, "verified and committed block to state"),
Err(error) => {
if Self::should_restart_sync(error) {
return Err(());
}
}
}

Ok(())
}

/// Returns `true` if the hash is present in the state, and `false`
/// if the hash is not present in the state.
///
Expand Down

0 comments on commit 78080d8

Please sign in to comment.