Skip to content

Commit

Permalink
Use more granular DB connections
Browse files Browse the repository at this point in the history
  • Loading branch information
slowli committed Jul 2, 2024
1 parent 99ea820 commit 1f0e6aa
Showing 1 changed file with 29 additions and 20 deletions.
49 changes: 29 additions & 20 deletions core/node/metadata_calculator/src/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,26 +88,30 @@ impl TreeUpdater {
/// is slow for whatever reason.
async fn process_multiple_batches(
&mut self,
storage: &mut Connection<'_, Core>,
pool: &ConnectionPool<Core>,
l1_batch_numbers: ops::RangeInclusive<u32>,
) -> anyhow::Result<L1BatchNumber> {
let tree_mode = self.tree.mode();
let start = Instant::now();
tracing::info!("Processing L1 batches #{l1_batch_numbers:?} in {tree_mode:?} mode");
let mut storage = pool.connection_tagged("metadata_calculator").await?;
let first_l1_batch_number = L1BatchNumber(*l1_batch_numbers.start());
let last_l1_batch_number = L1BatchNumber(*l1_batch_numbers.end());
let mut l1_batch_data = L1BatchWithLogs::new(storage, first_l1_batch_number, tree_mode)
.await
.with_context(|| {
format!("failed fetching tree input for L1 batch #{first_l1_batch_number}")
})?;
let mut l1_batch_data =
L1BatchWithLogs::new(&mut storage, first_l1_batch_number, tree_mode)
.await
.with_context(|| {
format!("failed fetching tree input for L1 batch #{first_l1_batch_number}")
})?;
drop(storage);

let mut total_logs = 0;
let mut updated_headers = vec![];
for l1_batch_number in l1_batch_numbers {
let mut storage = pool.connection_tagged("metadata_calculator").await?;
let l1_batch_number = L1BatchNumber(l1_batch_number);
let Some(current_l1_batch_data) = l1_batch_data else {
Self::ensure_not_pruned(storage, l1_batch_number).await?;
Self::ensure_not_pruned(&mut storage, l1_batch_number).await?;
return Ok(l1_batch_number);
};
total_logs += current_l1_batch_data.storage_logs.len();
Expand All @@ -116,13 +120,14 @@ impl TreeUpdater {
let load_next_l1_batch_task = async {
if l1_batch_number < last_l1_batch_number {
let next_l1_batch_number = l1_batch_number + 1;
L1BatchWithLogs::new(storage, next_l1_batch_number, tree_mode)
.await
.with_context(|| {
format!(
"failed fetching tree input for L1 batch #{next_l1_batch_number}"
)
})
let batch_result =
L1BatchWithLogs::new(&mut storage, next_l1_batch_number, tree_mode).await;
// Drop storage at the earliest possible moment so that it doesn't block logic running concurrently,
// such as tree pruning.
drop(storage);
batch_result.with_context(|| {
format!("failed fetching tree input for L1 batch #{next_l1_batch_number}")
})
} else {
Ok(None) // Don't need to load the next L1 batch after the last one we're processing.
}
Expand All @@ -135,11 +140,12 @@ impl TreeUpdater {
hash: metadata.root_hash,
rollup_last_leaf_index: metadata.rollup_last_leaf_index,
};

let mut storage = pool.connection_tagged("metadata_calculator").await?;
storage
.blocks_dal()
.save_l1_batch_tree_data(l1_batch_number, &tree_data)
.await
.context("failed saving tree data")?;
.await?;
// ^ Note that `save_l1_batch_tree_data()` will not blindly overwrite changes if L1 batch
// metadata already exists; instead, it'll check that the old and new metadata match.
// That is, if we run multiple tree instances, we'll get metadata correspondence
Expand All @@ -156,6 +162,7 @@ impl TreeUpdater {
.insert_proof_generation_details(l1_batch_number, object_key)
.await?;
}
drop(storage);
save_postgres_latency.observe();
tracing::info!("Updated metadata for L1 batch #{l1_batch_number} in Postgres");

Expand Down Expand Up @@ -187,9 +194,10 @@ impl TreeUpdater {

async fn step(
&mut self,
mut storage: Connection<'_, Core>,
pool: &ConnectionPool<Core>,
next_l1_batch_to_process: &mut L1BatchNumber,
) -> anyhow::Result<()> {
let mut storage = pool.connection_tagged("metadata_calculator").await?;
let last_l1_batch_with_protective_reads = if self.tree.mode() == MerkleTreeMode::Lightweight
|| self.sealed_batches_have_protective_reads
{
Expand All @@ -210,6 +218,8 @@ impl TreeUpdater {
.await
.context("failed loading latest L1 batch number with protective reads")?
};
drop(storage);

let last_requested_l1_batch =
next_l1_batch_to_process.0 + self.max_l1_batches_per_iter as u32 - 1;
let last_requested_l1_batch =
Expand All @@ -222,7 +232,7 @@ impl TreeUpdater {
} else {
tracing::info!("Updating Merkle tree with L1 batches #{l1_batch_numbers:?}");
*next_l1_batch_to_process = self
.process_multiple_batches(&mut storage, l1_batch_numbers)
.process_multiple_batches(pool, l1_batch_numbers)
.await?;
}
Ok(())
Expand All @@ -248,10 +258,9 @@ impl TreeUpdater {
tracing::info!("Stop signal received, metadata_calculator is shutting down");
break;
}
let storage = pool.connection_tagged("metadata_calculator").await?;

let snapshot = *next_l1_batch_to_process;
self.step(storage, &mut next_l1_batch_to_process).await?;
self.step(pool, &mut next_l1_batch_to_process).await?;
let delay = if snapshot == *next_l1_batch_to_process {
tracing::trace!(
"Metadata calculator (next L1 batch: #{next_l1_batch_to_process}) \
Expand Down

0 comments on commit 1f0e6aa

Please sign in to comment.