diff --git a/core/node/consensus/src/storage/mod.rs b/core/node/consensus/src/storage/mod.rs index 58238f4b601b..6660f75332bc 100644 --- a/core/node/consensus/src/storage/mod.rs +++ b/core/node/consensus/src/storage/mod.rs @@ -41,7 +41,7 @@ impl PayloadQueue { /// Advances the cursor by converting the block into actions and pushing them /// to the actions queue. - /// Does nothing and returns Ok() if the block has been already processed. + /// Does nothing and returns `Ok(())` if the block has been already processed. /// Returns an error if a block with an earlier block number was expected. pub(crate) async fn send(&mut self, block: FetchedBlock) -> anyhow::Result<()> { let want = self.inner.next_l2_block; @@ -53,7 +53,7 @@ impl PayloadQueue { if block.number < want { return Ok(()); } - self.actions.push_actions(self.inner.advance(block)).await; + self.actions.push_actions(self.inner.advance(block)).await?; Ok(()) } } diff --git a/core/node/consensus/src/testonly.rs b/core/node/consensus/src/testonly.rs index f2c51521b3f4..81084b8f599a 100644 --- a/core/node/consensus/src/testonly.rs +++ b/core/node/consensus/src/testonly.rs @@ -260,7 +260,7 @@ impl StateKeeper { actions.push(FetchedTransaction::new(tx).into()); } actions.push(SyncAction::SealL2Block); - self.actions_sender.push_actions(actions).await; + self.actions_sender.push_actions(actions).await.unwrap(); } /// Pushes `SealBatch` command to the `StateKeeper`. @@ -268,7 +268,7 @@ impl StateKeeper { // Each batch ends with an empty block (aka fictive block). let mut actions = vec![self.open_block()]; actions.push(SyncAction::SealBatch); - self.actions_sender.push_actions(actions).await; + self.actions_sender.push_actions(actions).await.unwrap(); self.batch_sealed = true; } diff --git a/core/node/metadata_calculator/src/updater.rs b/core/node/metadata_calculator/src/updater.rs index 4878ab381a07..b5eb46ac7869 100644 --- a/core/node/metadata_calculator/src/updater.rs +++ b/core/node/metadata_calculator/src/updater.rs @@ -88,26 +88,30 @@ impl TreeUpdater { /// is slow for whatever reason. async fn process_multiple_batches( &mut self, - storage: &mut Connection<'_, Core>, + pool: &ConnectionPool, l1_batch_numbers: ops::RangeInclusive, ) -> anyhow::Result { let tree_mode = self.tree.mode(); let start = Instant::now(); tracing::info!("Processing L1 batches #{l1_batch_numbers:?} in {tree_mode:?} mode"); + let mut storage = pool.connection_tagged("metadata_calculator").await?; let first_l1_batch_number = L1BatchNumber(*l1_batch_numbers.start()); let last_l1_batch_number = L1BatchNumber(*l1_batch_numbers.end()); - let mut l1_batch_data = L1BatchWithLogs::new(storage, first_l1_batch_number, tree_mode) - .await - .with_context(|| { - format!("failed fetching tree input for L1 batch #{first_l1_batch_number}") - })?; + let mut l1_batch_data = + L1BatchWithLogs::new(&mut storage, first_l1_batch_number, tree_mode) + .await + .with_context(|| { + format!("failed fetching tree input for L1 batch #{first_l1_batch_number}") + })?; + drop(storage); let mut total_logs = 0; let mut updated_headers = vec![]; for l1_batch_number in l1_batch_numbers { + let mut storage = pool.connection_tagged("metadata_calculator").await?; let l1_batch_number = L1BatchNumber(l1_batch_number); let Some(current_l1_batch_data) = l1_batch_data else { - Self::ensure_not_pruned(storage, l1_batch_number).await?; + Self::ensure_not_pruned(&mut storage, l1_batch_number).await?; return Ok(l1_batch_number); }; total_logs += current_l1_batch_data.storage_logs.len(); @@ -116,13 +120,14 @@ impl TreeUpdater { let load_next_l1_batch_task = async { if l1_batch_number < last_l1_batch_number { let next_l1_batch_number = l1_batch_number + 1; - L1BatchWithLogs::new(storage, next_l1_batch_number, tree_mode) - .await - .with_context(|| { - format!( - "failed fetching tree input for L1 batch #{next_l1_batch_number}" - ) - }) + let batch_result = + L1BatchWithLogs::new(&mut storage, next_l1_batch_number, tree_mode).await; + // Drop storage at the earliest possible moment so that it doesn't block logic running concurrently, + // such as tree pruning. + drop(storage); + batch_result.with_context(|| { + format!("failed fetching tree input for L1 batch #{next_l1_batch_number}") + }) } else { Ok(None) // Don't need to load the next L1 batch after the last one we're processing. } @@ -135,11 +140,12 @@ impl TreeUpdater { hash: metadata.root_hash, rollup_last_leaf_index: metadata.rollup_last_leaf_index, }; + + let mut storage = pool.connection_tagged("metadata_calculator").await?; storage .blocks_dal() .save_l1_batch_tree_data(l1_batch_number, &tree_data) - .await - .context("failed saving tree data")?; + .await?; // ^ Note that `save_l1_batch_tree_data()` will not blindly overwrite changes if L1 batch // metadata already exists; instead, it'll check that the old and new metadata match. // That is, if we run multiple tree instances, we'll get metadata correspondence @@ -156,6 +162,7 @@ impl TreeUpdater { .insert_proof_generation_details(l1_batch_number, object_key) .await?; } + drop(storage); save_postgres_latency.observe(); tracing::info!("Updated metadata for L1 batch #{l1_batch_number} in Postgres"); @@ -187,9 +194,10 @@ impl TreeUpdater { async fn step( &mut self, - mut storage: Connection<'_, Core>, + pool: &ConnectionPool, next_l1_batch_to_process: &mut L1BatchNumber, ) -> anyhow::Result<()> { + let mut storage = pool.connection_tagged("metadata_calculator").await?; let last_l1_batch_with_protective_reads = if self.tree.mode() == MerkleTreeMode::Lightweight || self.sealed_batches_have_protective_reads { @@ -210,6 +218,8 @@ impl TreeUpdater { .await .context("failed loading latest L1 batch number with protective reads")? }; + drop(storage); + let last_requested_l1_batch = next_l1_batch_to_process.0 + self.max_l1_batches_per_iter as u32 - 1; let last_requested_l1_batch = @@ -222,7 +232,7 @@ impl TreeUpdater { } else { tracing::info!("Updating Merkle tree with L1 batches #{l1_batch_numbers:?}"); *next_l1_batch_to_process = self - .process_multiple_batches(&mut storage, l1_batch_numbers) + .process_multiple_batches(pool, l1_batch_numbers) .await?; } Ok(()) @@ -248,10 +258,9 @@ impl TreeUpdater { tracing::info!("Stop signal received, metadata_calculator is shutting down"); break; } - let storage = pool.connection_tagged("metadata_calculator").await?; let snapshot = *next_l1_batch_to_process; - self.step(storage, &mut next_l1_batch_to_process).await?; + self.step(pool, &mut next_l1_batch_to_process).await?; let delay = if snapshot == *next_l1_batch_to_process { tracing::trace!( "Metadata calculator (next L1 batch: #{next_l1_batch_to_process}) \ diff --git a/core/node/node_sync/src/sync_action.rs b/core/node/node_sync/src/sync_action.rs index 09d49943a454..8cb90d24fe84 100644 --- a/core/node/node_sync/src/sync_action.rs +++ b/core/node/node_sync/src/sync_action.rs @@ -13,20 +13,30 @@ impl ActionQueueSender { /// Requires that the actions are in the correct order: starts with a new open L1 batch / L2 block, /// followed by 0 or more transactions, have mandatory `SealL2Block` and optional `SealBatch` at the end. /// Would panic if the order is incorrect. - pub async fn push_actions(&self, actions: Vec) { - Self::check_action_sequence(&actions).unwrap(); + /// + /// # Errors + /// + /// Errors correspond to incorrect action order, or to `ExternalIO` instance that the queue is connected to shutting down. + /// Hence, returned errors must be treated as unrecoverable by the caller; it is unsound to continue + /// operating a node if some of the `actions` may be lost. + pub async fn push_actions(&self, actions: Vec) -> anyhow::Result<()> { + Self::check_action_sequence(&actions)?; for action in actions { - self.0.send(action).await.expect("EN sync logic panicked"); + self.0 + .send(action) + .await + .map_err(|_| anyhow::anyhow!("node action processor stopped"))?; QUEUE_METRICS .action_queue_size .set(self.0.max_capacity() - self.0.capacity()); } + Ok(()) } /// Checks whether the action sequence is valid. /// Returned error is meant to be used as a panic message, since an invalid sequence represents an unrecoverable /// error. This function itself does not panic for the ease of testing. - fn check_action_sequence(actions: &[SyncAction]) -> Result<(), String> { + fn check_action_sequence(actions: &[SyncAction]) -> anyhow::Result<()> { // Rules for the sequence: // 1. Must start with either `OpenBatch` or `L2Block`, both of which may be met only once. // 2. Followed by a sequence of `Tx` actions which consists of 0 or more elements. @@ -38,27 +48,22 @@ impl ActionQueueSender { for action in actions { match action { SyncAction::OpenBatch { .. } | SyncAction::L2Block { .. } => { - if opened { - return Err(format!("Unexpected OpenBatch / L2Block: {actions:?}")); - } + anyhow::ensure!(!opened, "Unexpected OpenBatch / L2Block: {actions:?}"); opened = true; } SyncAction::Tx(_) => { - if !opened || l2_block_sealed { - return Err(format!("Unexpected Tx: {actions:?}")); - } + anyhow::ensure!(opened && !l2_block_sealed, "Unexpected Tx: {actions:?}"); } SyncAction::SealL2Block | SyncAction::SealBatch => { - if !opened || l2_block_sealed { - return Err(format!("Unexpected SealL2Block / SealBatch: {actions:?}")); - } + anyhow::ensure!( + opened && !l2_block_sealed, + "Unexpected SealL2Block / SealBatch: {actions:?}" + ); l2_block_sealed = true; } } } - if !l2_block_sealed { - return Err(format!("Incomplete sequence: {actions:?}")); - } + anyhow::ensure!(l2_block_sealed, "Incomplete sequence: {actions:?}"); Ok(()) } } @@ -287,7 +292,7 @@ mod tests { panic!("Invalid sequence passed the test. Sequence #{idx}, expected error: {expected_err}"); }; assert!( - err.starts_with(expected_err), + err.to_string().contains(expected_err), "Sequence #{idx} failed. Expected error: {expected_err}, got: {err}" ); } diff --git a/core/node/node_sync/src/tests.rs b/core/node/node_sync/src/tests.rs index 9830641a9fa1..7c57e04a3404 100644 --- a/core/node/node_sync/src/tests.rs +++ b/core/node/node_sync/src/tests.rs @@ -230,7 +230,7 @@ async fn external_io_basics(snapshot_recovery: bool) { &[&extract_tx_hashes(&actions)], ) .await; - actions_sender.push_actions(actions).await; + actions_sender.push_actions(actions).await.unwrap(); // Wait until the L2 block is sealed. state_keeper .wait_for_local_block(snapshot.l2_block_number + 1) @@ -316,7 +316,7 @@ async fn external_io_works_without_local_protocol_version(snapshot_recovery: boo &[&extract_tx_hashes(&actions)], ) .await; - actions_sender.push_actions(actions).await; + actions_sender.push_actions(actions).await.unwrap(); // Wait until the L2 block is sealed. state_keeper .wait_for_local_block(snapshot.l2_block_number + 1) @@ -407,8 +407,14 @@ pub(super) async fn run_state_keeper_with_multiple_l2_blocks( let (actions_sender, action_queue) = ActionQueue::new(); let client = MockMainNodeClient::default(); let state_keeper = StateKeeperHandles::new(pool, client, action_queue, &[&tx_hashes]).await; - actions_sender.push_actions(first_l2_block_actions).await; - actions_sender.push_actions(second_l2_block_actions).await; + actions_sender + .push_actions(first_l2_block_actions) + .await + .unwrap(); + actions_sender + .push_actions(second_l2_block_actions) + .await + .unwrap(); // Wait until both L2 blocks are sealed. state_keeper .wait_for_local_block(snapshot.l2_block_number + 2) @@ -490,7 +496,7 @@ async fn test_external_io_recovery( number: snapshot.l2_block_number + 3, }; let actions = vec![open_l2_block, new_tx.into(), SyncAction::SealL2Block]; - actions_sender.push_actions(actions).await; + actions_sender.push_actions(actions).await.unwrap(); state_keeper .wait_for_local_block(snapshot.l2_block_number + 3) .await; @@ -580,9 +586,18 @@ pub(super) async fn run_state_keeper_with_multiple_l1_batches( &[&[first_tx_hash], &[second_tx_hash]], ) .await; - actions_sender.push_actions(first_l1_batch_actions).await; - actions_sender.push_actions(fictive_l2_block_actions).await; - actions_sender.push_actions(second_l1_batch_actions).await; + actions_sender + .push_actions(first_l1_batch_actions) + .await + .unwrap(); + actions_sender + .push_actions(fictive_l2_block_actions) + .await + .unwrap(); + actions_sender + .push_actions(second_l1_batch_actions) + .await + .unwrap(); let hash_task = tokio::spawn(mock_l1_batch_hash_computation( pool.clone(),