Skip to content

Commit

Permalink
Timeout L1 provider (#2340)
Browse files Browse the repository at this point in the history
* print derived PID

* [Backport release-shibainu] Bump query service (#2303)

Bump query service (#2301)

(cherry picked from commit 239e3e2)

Co-authored-by: Jeb Bearer <[email protected]>

* L1 timeout

* capitalization 🙈

* fmt

---------

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: Jeb Bearer <[email protected]>
  • Loading branch information
3 people authored Nov 29, 2024
1 parent e03304d commit f999be4
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 50 deletions.
3 changes: 3 additions & 0 deletions sequencer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,9 @@ pub async fn init_node<P: PersistenceOptions, V: Versions>(
)
.with_context(|| "Failed to derive Libp2p peer ID")?;

// Print the libp2p public key
info!("Starting Libp2p with PeerID: {}", libp2p_public_key);

let persistence = persistence_opt.clone().create().await?;
let (mut network_config, wait_for_orchestrator) = match (
persistence.load_config().await?,
Expand Down
116 changes: 66 additions & 50 deletions types/src/v0/impls/l1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,59 +394,75 @@ impl L1Client {
};

tracing::info!("established L1 block stream");
while let Some(head) = block_stream.next().await {
let Some(head) = head.number else {
// This shouldn't happen, but if it does, it means the block stream has
// erroneously given us a pending block. We are only interested in committed
// blocks, so just skip this one.
tracing::warn!("got block from L1 block stream with no number");
continue;
};
let head = head.as_u64();
tracing::debug!(head, "received L1 block");

// A new block has been produced. This happens fairly rarely, so it is now ok to
// poll to see if a new block has been finalized.
let finalized = loop {
match get_finalized_block(&rpc).await {
Ok(finalized) => break finalized,
Err(err) => {
tracing::warn!("error getting finalized block: {err:#}");
sleep(retry_delay).await;
loop {
// Wait for a block, timing out if we don't get one within 60 seconds
let block_timeout = tokio::time::timeout(Duration::from_secs(60), block_stream.next()).await;

match block_timeout {
// We got a block
Ok(Some(head)) => {
let Some(head_number) = head.number else {
// This shouldn't happen, but if it does, it means the block stream has
// erroneously given us a pending block. We are only interested in committed
// blocks, so just skip this one.
tracing::warn!("got block from L1 block stream with no number");
continue;
};
let head = head_number.as_u64();
tracing::debug!(head, "received L1 block");

// A new block has been produced. This happens fairly rarely, so it is now ok to
// poll to see if a new block has been finalized.
let finalized = loop {
match get_finalized_block(&rpc).await {
Ok(finalized) => break finalized,
Err(err) => {
tracing::warn!("error getting finalized block: {err:#}");
sleep(retry_delay).await;
}
}
};

// Update the state snapshot;
let mut state = state.lock().await;
if head > state.snapshot.head {
tracing::debug!(head, old_head = state.snapshot.head, "L1 head updated");
state.snapshot.head = head;
// Emit an event about the new L1 head. Ignore send errors; it just means no
// one is listening to events right now.
sender
.broadcast_direct(L1Event::NewHead { head })
.await
.ok();
}
if finalized > state.snapshot.finalized {
tracing::info!(
?finalized,
old_finalized = ?state.snapshot.finalized,
"L1 finalized updated",
);
state.snapshot.finalized = finalized;
if let Some(finalized) = finalized {
sender
.broadcast_direct(L1Event::NewFinalized { finalized })
.await
.ok();
}
}
tracing::debug!("updated L1 snapshot to {:?}", state.snapshot);
}
};

// Update the state snapshot;
let mut state = state.lock().await;
if head > state.snapshot.head {
tracing::debug!(head, old_head = state.snapshot.head, "L1 head updated");
state.snapshot.head = head;
// Emit an event about the new L1 head. Ignore send errors; it just means no
// one is listening to events right now.
sender
.broadcast_direct(L1Event::NewHead { head })
.await
.ok();
}
if finalized > state.snapshot.finalized {
tracing::info!(
?finalized,
old_finalized = ?state.snapshot.finalized,
"L1 finalized updated",
);
state.snapshot.finalized = finalized;
if let Some(finalized) = finalized {
sender
.broadcast_direct(L1Event::NewFinalized { finalized })
.await
.ok();
// The stream ended
Ok(None) => {
tracing::error!("L1 block stream ended unexpectedly, trying to re-establish block stream");
break;
}
// We timed out waiting for a block
Err(_) => {
tracing::error!("No block received for 60 seconds, trying to re-establish block stream");
break;
}
}
tracing::debug!("updated L1 snapshot to {:?}", state.snapshot);
}

tracing::error!("L1 block stream ended unexpectedly, trying to re-establish");
}
}.instrument(span)
}
Expand Down Expand Up @@ -525,13 +541,13 @@ impl L1Client {
continue;
};
if finalized.number >= number {
tracing::info!(number, ?finalized, "got finalized block");
tracing::info!(number, ?finalized, "got finalized L1 block");
return self
.get_finalized_block(self.state.lock().await, number)
.await
.1;
}
tracing::debug!(number, ?finalized, "waiting for L1 finalized block");
tracing::debug!(number, ?finalized, "waiting for finalized L1 block");
}

// This should not happen: the event stream ended. All we can do is try again.
Expand Down

0 comments on commit f999be4

Please sign in to comment.