diff --git a/core/node/metadata_calculator/src/updater.rs b/core/node/metadata_calculator/src/updater.rs index 4878ab381a07..b5eb46ac7869 100644 --- a/core/node/metadata_calculator/src/updater.rs +++ b/core/node/metadata_calculator/src/updater.rs @@ -88,26 +88,30 @@ impl TreeUpdater { /// is slow for whatever reason. async fn process_multiple_batches( &mut self, - storage: &mut Connection<'_, Core>, + pool: &ConnectionPool, l1_batch_numbers: ops::RangeInclusive, ) -> anyhow::Result { let tree_mode = self.tree.mode(); let start = Instant::now(); tracing::info!("Processing L1 batches #{l1_batch_numbers:?} in {tree_mode:?} mode"); + let mut storage = pool.connection_tagged("metadata_calculator").await?; let first_l1_batch_number = L1BatchNumber(*l1_batch_numbers.start()); let last_l1_batch_number = L1BatchNumber(*l1_batch_numbers.end()); - let mut l1_batch_data = L1BatchWithLogs::new(storage, first_l1_batch_number, tree_mode) - .await - .with_context(|| { - format!("failed fetching tree input for L1 batch #{first_l1_batch_number}") - })?; + let mut l1_batch_data = + L1BatchWithLogs::new(&mut storage, first_l1_batch_number, tree_mode) + .await + .with_context(|| { + format!("failed fetching tree input for L1 batch #{first_l1_batch_number}") + })?; + drop(storage); let mut total_logs = 0; let mut updated_headers = vec![]; for l1_batch_number in l1_batch_numbers { + let mut storage = pool.connection_tagged("metadata_calculator").await?; let l1_batch_number = L1BatchNumber(l1_batch_number); let Some(current_l1_batch_data) = l1_batch_data else { - Self::ensure_not_pruned(storage, l1_batch_number).await?; + Self::ensure_not_pruned(&mut storage, l1_batch_number).await?; return Ok(l1_batch_number); }; total_logs += current_l1_batch_data.storage_logs.len(); @@ -116,13 +120,14 @@ impl TreeUpdater { let load_next_l1_batch_task = async { if l1_batch_number < last_l1_batch_number { let next_l1_batch_number = l1_batch_number + 1; - L1BatchWithLogs::new(storage, next_l1_batch_number, tree_mode) - .await - .with_context(|| { - format!( - "failed fetching tree input for L1 batch #{next_l1_batch_number}" - ) - }) + let batch_result = + L1BatchWithLogs::new(&mut storage, next_l1_batch_number, tree_mode).await; + // Drop storage at the earliest possible moment so that it doesn't block logic running concurrently, + // such as tree pruning. + drop(storage); + batch_result.with_context(|| { + format!("failed fetching tree input for L1 batch #{next_l1_batch_number}") + }) } else { Ok(None) // Don't need to load the next L1 batch after the last one we're processing. } @@ -135,11 +140,12 @@ impl TreeUpdater { hash: metadata.root_hash, rollup_last_leaf_index: metadata.rollup_last_leaf_index, }; + + let mut storage = pool.connection_tagged("metadata_calculator").await?; storage .blocks_dal() .save_l1_batch_tree_data(l1_batch_number, &tree_data) - .await - .context("failed saving tree data")?; + .await?; // ^ Note that `save_l1_batch_tree_data()` will not blindly overwrite changes if L1 batch // metadata already exists; instead, it'll check that the old and new metadata match. // That is, if we run multiple tree instances, we'll get metadata correspondence @@ -156,6 +162,7 @@ impl TreeUpdater { .insert_proof_generation_details(l1_batch_number, object_key) .await?; } + drop(storage); save_postgres_latency.observe(); tracing::info!("Updated metadata for L1 batch #{l1_batch_number} in Postgres"); @@ -187,9 +194,10 @@ impl TreeUpdater { async fn step( &mut self, - mut storage: Connection<'_, Core>, + pool: &ConnectionPool, next_l1_batch_to_process: &mut L1BatchNumber, ) -> anyhow::Result<()> { + let mut storage = pool.connection_tagged("metadata_calculator").await?; let last_l1_batch_with_protective_reads = if self.tree.mode() == MerkleTreeMode::Lightweight || self.sealed_batches_have_protective_reads { @@ -210,6 +218,8 @@ impl TreeUpdater { .await .context("failed loading latest L1 batch number with protective reads")? }; + drop(storage); + let last_requested_l1_batch = next_l1_batch_to_process.0 + self.max_l1_batches_per_iter as u32 - 1; let last_requested_l1_batch = @@ -222,7 +232,7 @@ impl TreeUpdater { } else { tracing::info!("Updating Merkle tree with L1 batches #{l1_batch_numbers:?}"); *next_l1_batch_to_process = self - .process_multiple_batches(&mut storage, l1_batch_numbers) + .process_multiple_batches(pool, l1_batch_numbers) .await?; } Ok(()) @@ -248,10 +258,9 @@ impl TreeUpdater { tracing::info!("Stop signal received, metadata_calculator is shutting down"); break; } - let storage = pool.connection_tagged("metadata_calculator").await?; let snapshot = *next_l1_batch_to_process; - self.step(storage, &mut next_l1_batch_to_process).await?; + self.step(pool, &mut next_l1_batch_to_process).await?; let delay = if snapshot == *next_l1_batch_to_process { tracing::trace!( "Metadata calculator (next L1 batch: #{next_l1_batch_to_process}) \