Skip to content

Commit

Permalink
fix(merkle-tree): Fix connection timeouts during tree pruning (#2372)
Browse files Browse the repository at this point in the history
## What ❔

- Makes DB connections short-lived during the main tree update loop.
- Propagates errors when queuing sync actions on EN.

## Why ❔

- This reduces the chance of a DB connection timing out during tree
pruning. Realistically, such timeouts can occur while the tree is syncing
if pruning is enabled.
- Propagating errors gets rid of potential panics during EN shutdown.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.

fix(en): Fix panics when queuing sync actions during shutdown
  • Loading branch information
slowli authored Jul 3, 2024
1 parent 39709f5 commit d5935c7
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 49 deletions.
4 changes: 2 additions & 2 deletions core/node/consensus/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl PayloadQueue {

/// Advances the cursor by converting the block into actions and pushing them
/// to the actions queue.
/// Does nothing and returns Ok() if the block has been already processed.
/// Does nothing and returns `Ok(())` if the block has been already processed.
/// Returns an error if a block with an earlier block number was expected.
pub(crate) async fn send(&mut self, block: FetchedBlock) -> anyhow::Result<()> {
let want = self.inner.next_l2_block;
Expand All @@ -53,7 +53,7 @@ impl PayloadQueue {
if block.number < want {
return Ok(());
}
self.actions.push_actions(self.inner.advance(block)).await;
self.actions.push_actions(self.inner.advance(block)).await?;
Ok(())
}
}
4 changes: 2 additions & 2 deletions core/node/consensus/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,15 +260,15 @@ impl StateKeeper {
actions.push(FetchedTransaction::new(tx).into());
}
actions.push(SyncAction::SealL2Block);
self.actions_sender.push_actions(actions).await;
self.actions_sender.push_actions(actions).await.unwrap();
}

/// Pushes `SealBatch` command to the `StateKeeper`.
pub async fn seal_batch(&mut self) {
// Each batch ends with an empty block (aka fictive block).
let mut actions = vec![self.open_block()];
actions.push(SyncAction::SealBatch);
self.actions_sender.push_actions(actions).await;
self.actions_sender.push_actions(actions).await.unwrap();
self.batch_sealed = true;
}

Expand Down
49 changes: 29 additions & 20 deletions core/node/metadata_calculator/src/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,26 +88,30 @@ impl TreeUpdater {
/// is slow for whatever reason.
async fn process_multiple_batches(
&mut self,
storage: &mut Connection<'_, Core>,
pool: &ConnectionPool<Core>,
l1_batch_numbers: ops::RangeInclusive<u32>,
) -> anyhow::Result<L1BatchNumber> {
let tree_mode = self.tree.mode();
let start = Instant::now();
tracing::info!("Processing L1 batches #{l1_batch_numbers:?} in {tree_mode:?} mode");
let mut storage = pool.connection_tagged("metadata_calculator").await?;
let first_l1_batch_number = L1BatchNumber(*l1_batch_numbers.start());
let last_l1_batch_number = L1BatchNumber(*l1_batch_numbers.end());
let mut l1_batch_data = L1BatchWithLogs::new(storage, first_l1_batch_number, tree_mode)
.await
.with_context(|| {
format!("failed fetching tree input for L1 batch #{first_l1_batch_number}")
})?;
let mut l1_batch_data =
L1BatchWithLogs::new(&mut storage, first_l1_batch_number, tree_mode)
.await
.with_context(|| {
format!("failed fetching tree input for L1 batch #{first_l1_batch_number}")
})?;
drop(storage);

let mut total_logs = 0;
let mut updated_headers = vec![];
for l1_batch_number in l1_batch_numbers {
let mut storage = pool.connection_tagged("metadata_calculator").await?;
let l1_batch_number = L1BatchNumber(l1_batch_number);
let Some(current_l1_batch_data) = l1_batch_data else {
Self::ensure_not_pruned(storage, l1_batch_number).await?;
Self::ensure_not_pruned(&mut storage, l1_batch_number).await?;
return Ok(l1_batch_number);
};
total_logs += current_l1_batch_data.storage_logs.len();
Expand All @@ -116,13 +120,14 @@ impl TreeUpdater {
let load_next_l1_batch_task = async {
if l1_batch_number < last_l1_batch_number {
let next_l1_batch_number = l1_batch_number + 1;
L1BatchWithLogs::new(storage, next_l1_batch_number, tree_mode)
.await
.with_context(|| {
format!(
"failed fetching tree input for L1 batch #{next_l1_batch_number}"
)
})
let batch_result =
L1BatchWithLogs::new(&mut storage, next_l1_batch_number, tree_mode).await;
// Drop storage at the earliest possible moment so that it doesn't block logic running concurrently,
// such as tree pruning.
drop(storage);
batch_result.with_context(|| {
format!("failed fetching tree input for L1 batch #{next_l1_batch_number}")
})
} else {
Ok(None) // Don't need to load the next L1 batch after the last one we're processing.
}
Expand All @@ -135,11 +140,12 @@ impl TreeUpdater {
hash: metadata.root_hash,
rollup_last_leaf_index: metadata.rollup_last_leaf_index,
};

let mut storage = pool.connection_tagged("metadata_calculator").await?;
storage
.blocks_dal()
.save_l1_batch_tree_data(l1_batch_number, &tree_data)
.await
.context("failed saving tree data")?;
.await?;
// ^ Note that `save_l1_batch_tree_data()` will not blindly overwrite changes if L1 batch
// metadata already exists; instead, it'll check that the old and new metadata match.
// That is, if we run multiple tree instances, we'll get metadata correspondence
Expand All @@ -156,6 +162,7 @@ impl TreeUpdater {
.insert_proof_generation_details(l1_batch_number, object_key)
.await?;
}
drop(storage);
save_postgres_latency.observe();
tracing::info!("Updated metadata for L1 batch #{l1_batch_number} in Postgres");

Expand Down Expand Up @@ -187,9 +194,10 @@ impl TreeUpdater {

async fn step(
&mut self,
mut storage: Connection<'_, Core>,
pool: &ConnectionPool<Core>,
next_l1_batch_to_process: &mut L1BatchNumber,
) -> anyhow::Result<()> {
let mut storage = pool.connection_tagged("metadata_calculator").await?;
let last_l1_batch_with_protective_reads = if self.tree.mode() == MerkleTreeMode::Lightweight
|| self.sealed_batches_have_protective_reads
{
Expand All @@ -210,6 +218,8 @@ impl TreeUpdater {
.await
.context("failed loading latest L1 batch number with protective reads")?
};
drop(storage);

let last_requested_l1_batch =
next_l1_batch_to_process.0 + self.max_l1_batches_per_iter as u32 - 1;
let last_requested_l1_batch =
Expand All @@ -222,7 +232,7 @@ impl TreeUpdater {
} else {
tracing::info!("Updating Merkle tree with L1 batches #{l1_batch_numbers:?}");
*next_l1_batch_to_process = self
.process_multiple_batches(&mut storage, l1_batch_numbers)
.process_multiple_batches(pool, l1_batch_numbers)
.await?;
}
Ok(())
Expand All @@ -248,10 +258,9 @@ impl TreeUpdater {
tracing::info!("Stop signal received, metadata_calculator is shutting down");
break;
}
let storage = pool.connection_tagged("metadata_calculator").await?;

let snapshot = *next_l1_batch_to_process;
self.step(storage, &mut next_l1_batch_to_process).await?;
self.step(pool, &mut next_l1_batch_to_process).await?;
let delay = if snapshot == *next_l1_batch_to_process {
tracing::trace!(
"Metadata calculator (next L1 batch: #{next_l1_batch_to_process}) \
Expand Down
39 changes: 22 additions & 17 deletions core/node/node_sync/src/sync_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,30 @@ impl ActionQueueSender {
/// Requires that the actions are in the correct order: starts with a new open L1 batch / L2 block,
/// followed by 0 or more transactions, have mandatory `SealL2Block` and optional `SealBatch` at the end.
/// Would panic if the order is incorrect.
pub async fn push_actions(&self, actions: Vec<SyncAction>) {
Self::check_action_sequence(&actions).unwrap();
///
/// # Errors
///
/// Errors correspond to incorrect action order, or to `ExternalIO` instance that the queue is connected to shutting down.
/// Hence, returned errors must be treated as unrecoverable by the caller; it is unsound to continue
/// operating a node if some of the `actions` may be lost.
pub async fn push_actions(&self, actions: Vec<SyncAction>) -> anyhow::Result<()> {
Self::check_action_sequence(&actions)?;
for action in actions {
self.0.send(action).await.expect("EN sync logic panicked");
self.0
.send(action)
.await
.map_err(|_| anyhow::anyhow!("node action processor stopped"))?;
QUEUE_METRICS
.action_queue_size
.set(self.0.max_capacity() - self.0.capacity());
}
Ok(())
}

/// Checks whether the action sequence is valid.
/// Returned error is meant to be used as a panic message, since an invalid sequence represents an unrecoverable
/// error. This function itself does not panic for the ease of testing.
fn check_action_sequence(actions: &[SyncAction]) -> Result<(), String> {
fn check_action_sequence(actions: &[SyncAction]) -> anyhow::Result<()> {
// Rules for the sequence:
// 1. Must start with either `OpenBatch` or `L2Block`, both of which may be met only once.
// 2. Followed by a sequence of `Tx` actions which consists of 0 or more elements.
Expand All @@ -38,27 +48,22 @@ impl ActionQueueSender {
for action in actions {
match action {
SyncAction::OpenBatch { .. } | SyncAction::L2Block { .. } => {
if opened {
return Err(format!("Unexpected OpenBatch / L2Block: {actions:?}"));
}
anyhow::ensure!(!opened, "Unexpected OpenBatch / L2Block: {actions:?}");
opened = true;
}
SyncAction::Tx(_) => {
if !opened || l2_block_sealed {
return Err(format!("Unexpected Tx: {actions:?}"));
}
anyhow::ensure!(opened && !l2_block_sealed, "Unexpected Tx: {actions:?}");
}
SyncAction::SealL2Block | SyncAction::SealBatch => {
if !opened || l2_block_sealed {
return Err(format!("Unexpected SealL2Block / SealBatch: {actions:?}"));
}
anyhow::ensure!(
opened && !l2_block_sealed,
"Unexpected SealL2Block / SealBatch: {actions:?}"
);
l2_block_sealed = true;
}
}
}
if !l2_block_sealed {
return Err(format!("Incomplete sequence: {actions:?}"));
}
anyhow::ensure!(l2_block_sealed, "Incomplete sequence: {actions:?}");
Ok(())
}
}
Expand Down Expand Up @@ -287,7 +292,7 @@ mod tests {
panic!("Invalid sequence passed the test. Sequence #{idx}, expected error: {expected_err}");
};
assert!(
err.starts_with(expected_err),
err.to_string().contains(expected_err),
"Sequence #{idx} failed. Expected error: {expected_err}, got: {err}"
);
}
Expand Down
31 changes: 23 additions & 8 deletions core/node/node_sync/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ async fn external_io_basics(snapshot_recovery: bool) {
&[&extract_tx_hashes(&actions)],
)
.await;
actions_sender.push_actions(actions).await;
actions_sender.push_actions(actions).await.unwrap();
// Wait until the L2 block is sealed.
state_keeper
.wait_for_local_block(snapshot.l2_block_number + 1)
Expand Down Expand Up @@ -316,7 +316,7 @@ async fn external_io_works_without_local_protocol_version(snapshot_recovery: boo
&[&extract_tx_hashes(&actions)],
)
.await;
actions_sender.push_actions(actions).await;
actions_sender.push_actions(actions).await.unwrap();
// Wait until the L2 block is sealed.
state_keeper
.wait_for_local_block(snapshot.l2_block_number + 1)
Expand Down Expand Up @@ -407,8 +407,14 @@ pub(super) async fn run_state_keeper_with_multiple_l2_blocks(
let (actions_sender, action_queue) = ActionQueue::new();
let client = MockMainNodeClient::default();
let state_keeper = StateKeeperHandles::new(pool, client, action_queue, &[&tx_hashes]).await;
actions_sender.push_actions(first_l2_block_actions).await;
actions_sender.push_actions(second_l2_block_actions).await;
actions_sender
.push_actions(first_l2_block_actions)
.await
.unwrap();
actions_sender
.push_actions(second_l2_block_actions)
.await
.unwrap();
// Wait until both L2 blocks are sealed.
state_keeper
.wait_for_local_block(snapshot.l2_block_number + 2)
Expand Down Expand Up @@ -490,7 +496,7 @@ async fn test_external_io_recovery(
number: snapshot.l2_block_number + 3,
};
let actions = vec![open_l2_block, new_tx.into(), SyncAction::SealL2Block];
actions_sender.push_actions(actions).await;
actions_sender.push_actions(actions).await.unwrap();
state_keeper
.wait_for_local_block(snapshot.l2_block_number + 3)
.await;
Expand Down Expand Up @@ -580,9 +586,18 @@ pub(super) async fn run_state_keeper_with_multiple_l1_batches(
&[&[first_tx_hash], &[second_tx_hash]],
)
.await;
actions_sender.push_actions(first_l1_batch_actions).await;
actions_sender.push_actions(fictive_l2_block_actions).await;
actions_sender.push_actions(second_l1_batch_actions).await;
actions_sender
.push_actions(first_l1_batch_actions)
.await
.unwrap();
actions_sender
.push_actions(fictive_l2_block_actions)
.await
.unwrap();
actions_sender
.push_actions(second_l1_batch_actions)
.await
.unwrap();

let hash_task = tokio::spawn(mock_l1_batch_hash_computation(
pool.clone(),
Expand Down

0 comments on commit d5935c7

Please sign in to comment.