From f999be4264920823994ccce13f1667ff1c800ef0 Mon Sep 17 00:00:00 2001 From: rob-maron <132852777+rob-maron@users.noreply.github.com> Date: Fri, 29 Nov 2024 14:01:01 -0500 Subject: [PATCH] Timeout L1 provider (#2340) * print derived PID * [Backport release-shibainu] Bump query service (#2303) Bump query service (#2301) (cherry picked from commit 239e3e27a91fe961f94eea0300f30291db7019ad) Co-authored-by: Jeb Bearer * L1 timeout * capitalization :see_no_evil: * fmt --------- Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: Jeb Bearer --- sequencer/src/lib.rs | 3 + types/src/v0/impls/l1.rs | 116 ++++++++++++++++++++++----------------- 2 files changed, 69 insertions(+), 50 deletions(-) diff --git a/sequencer/src/lib.rs b/sequencer/src/lib.rs index fe07de294..143e229b0 100644 --- a/sequencer/src/lib.rs +++ b/sequencer/src/lib.rs @@ -297,6 +297,9 @@ pub async fn init_node( ) .with_context(|| "Failed to derive Libp2p peer ID")?; + // Print the libp2p public key + info!("Starting Libp2p with PeerID: {}", libp2p_public_key); + let persistence = persistence_opt.clone().create().await?; let (mut network_config, wait_for_orchestrator) = match ( persistence.load_config().await?, diff --git a/types/src/v0/impls/l1.rs b/types/src/v0/impls/l1.rs index 51137ae05..958d4b3ad 100644 --- a/types/src/v0/impls/l1.rs +++ b/types/src/v0/impls/l1.rs @@ -394,59 +394,75 @@ impl L1Client { }; tracing::info!("established L1 block stream"); - while let Some(head) = block_stream.next().await { - let Some(head) = head.number else { - // This shouldn't happen, but if it does, it means the block stream has - // erroneously given us a pending block. We are only interested in committed - // blocks, so just skip this one. - tracing::warn!("got block from L1 block stream with no number"); - continue; - }; - let head = head.as_u64(); - tracing::debug!(head, "received L1 block"); - - // A new block has been produced. This happens fairly rarely, so it is now ok to - // poll to see if a new block has been finalized. - let finalized = loop { - match get_finalized_block(&rpc).await { - Ok(finalized) => break finalized, - Err(err) => { - tracing::warn!("error getting finalized block: {err:#}"); - sleep(retry_delay).await; + loop { + // Wait for a block, timing out if we don't get one within 60 seconds + let block_timeout = tokio::time::timeout(Duration::from_secs(60), block_stream.next()).await; + + match block_timeout { + // We got a block + Ok(Some(head)) => { + let Some(head_number) = head.number else { + // This shouldn't happen, but if it does, it means the block stream has + // erroneously given us a pending block. We are only interested in committed + // blocks, so just skip this one. + tracing::warn!("got block from L1 block stream with no number"); + continue; + }; + let head = head_number.as_u64(); + tracing::debug!(head, "received L1 block"); + + // A new block has been produced. This happens fairly rarely, so it is now ok to + // poll to see if a new block has been finalized. + let finalized = loop { + match get_finalized_block(&rpc).await { + Ok(finalized) => break finalized, + Err(err) => { + tracing::warn!("error getting finalized block: {err:#}"); + sleep(retry_delay).await; + } + } + }; + + // Update the state snapshot; + let mut state = state.lock().await; + if head > state.snapshot.head { + tracing::debug!(head, old_head = state.snapshot.head, "L1 head updated"); + state.snapshot.head = head; + // Emit an event about the new L1 head. Ignore send errors; it just means no + // one is listening to events right now. + sender + .broadcast_direct(L1Event::NewHead { head }) + .await + .ok(); } + if finalized > state.snapshot.finalized { + tracing::info!( + ?finalized, + old_finalized = ?state.snapshot.finalized, + "L1 finalized updated", + ); + state.snapshot.finalized = finalized; + if let Some(finalized) = finalized { + sender + .broadcast_direct(L1Event::NewFinalized { finalized }) + .await + .ok(); + } + } + tracing::debug!("updated L1 snapshot to {:?}", state.snapshot); } - }; - - // Update the state snapshot; - let mut state = state.lock().await; - if head > state.snapshot.head { - tracing::debug!(head, old_head = state.snapshot.head, "L1 head updated"); - state.snapshot.head = head; - // Emit an event about the new L1 head. Ignore send errors; it just means no - // one is listening to events right now. - sender - .broadcast_direct(L1Event::NewHead { head }) - .await - .ok(); - } - if finalized > state.snapshot.finalized { - tracing::info!( - ?finalized, - old_finalized = ?state.snapshot.finalized, - "L1 finalized updated", - ); - state.snapshot.finalized = finalized; - if let Some(finalized) = finalized { - sender - .broadcast_direct(L1Event::NewFinalized { finalized }) - .await - .ok(); + // The stream ended + Ok(None) => { + tracing::error!("L1 block stream ended unexpectedly, trying to re-establish block stream"); + break; + } + // We timed out waiting for a block + Err(_) => { + tracing::error!("No block received for 60 seconds, trying to re-establish block stream"); + break; } } - tracing::debug!("updated L1 snapshot to {:?}", state.snapshot); } - - tracing::error!("L1 block stream ended unexpectedly, trying to re-establish"); } }.instrument(span) } @@ -525,13 +541,13 @@ impl L1Client { continue; }; if finalized.number >= number { - tracing::info!(number, ?finalized, "got finalized block"); + tracing::info!(number, ?finalized, "got finalized L1 block"); return self .get_finalized_block(self.state.lock().await, number) .await .1; } - tracing::debug!(number, ?finalized, "waiting for L1 finalized block"); + tracing::debug!(number, ?finalized, "waiting for finalized L1 block"); } // This should not happen: the event stream ended. All we can do is try again.