diff --git a/Cargo.lock b/Cargo.lock index 887b71c39ec9..c8f3c1f52995 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10777,6 +10777,7 @@ dependencies = [ "anyhow", "assert_matches", "async-trait", + "backon", "chrono", "futures 0.3.30", "once_cell", diff --git a/core/lib/dal/.sqlx/query-180cc8d88563a42423ca1d4b92181f4625ebd593aa4cd2bae79bcc0637387d78.json b/core/lib/dal/.sqlx/query-180cc8d88563a42423ca1d4b92181f4625ebd593aa4cd2bae79bcc0637387d78.json new file mode 100644 index 000000000000..b40bdca666b8 --- /dev/null +++ b/core/lib/dal/.sqlx/query-180cc8d88563a42423ca1d4b92181f4625ebd593aa4cd2bae79bcc0637387d78.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM l1_batches\n WHERE\n number > $1\n AND NOT is_sealed\n RETURNING number\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "number", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "180cc8d88563a42423ca1d4b92181f4625ebd593aa4cd2bae79bcc0637387d78" +} diff --git a/core/lib/dal/src/blocks_dal.rs b/core/lib/dal/src/blocks_dal.rs index bf1b48130c40..f71dc68ce757 100644 --- a/core/lib/dal/src/blocks_dal.rs +++ b/core/lib/dal/src/blocks_dal.rs @@ -2058,6 +2058,37 @@ impl BlocksDal<'_, '_> { Ok(()) } + /// Deletes the unsealed L1 batch from the storage. Expects the caller to make sure there are no + /// associated L2 blocks. + /// + /// Accepts `batch_to_keep` as a safety mechanism. + pub async fn delete_unsealed_l1_batch( + &mut self, + batch_to_keep: L1BatchNumber, + ) -> DalResult<()> { + let deleted_row = sqlx::query!( + r#" + DELETE FROM l1_batches + WHERE + number > $1 + AND NOT is_sealed + RETURNING number + "#, + i64::from(batch_to_keep.0) + ) + .instrument("delete_unsealed_l1_batch") + .with_arg("batch_to_keep", &batch_to_keep) + .fetch_optional(self.storage) + .await?; + if let Some(deleted_row) = deleted_row { + tracing::info!( + l1_batch_number = %deleted_row.number, + "Deleted unsealed batch" + ); + } + Ok(()) + } + /// Deletes all L1 batches from the storage so that the specified batch number is the last one left. pub async fn delete_l1_batches(&mut self, last_batch_to_keep: L1BatchNumber) -> DalResult<()> { self.delete_l1_batches_inner(Some(last_batch_to_keep)).await @@ -2184,6 +2215,20 @@ impl BlocksDal<'_, '_> { Ok(Some((L2BlockNumber(min as u32), L2BlockNumber(max as u32)))) } + /// Returns `true` if there exists a non-sealed batch (i.e. there is one+ stored L2 block that isn't assigned + /// to any batch yet). + pub async fn pending_batch_exists(&mut self) -> DalResult { + let count = sqlx::query_scalar!( + "SELECT COUNT(miniblocks.number) FROM miniblocks WHERE l1_batch_number IS NULL" + ) + .instrument("pending_batch_exists") + .fetch_one(self.storage) + .await? + .unwrap_or(0); + + Ok(count != 0) + } + // methods used for measuring Eth tx stage transition latencies // and emitting metrics base on these measured data pub async fn oldest_uncommitted_batch_timestamp(&mut self) -> DalResult> { diff --git a/core/node/consensus/src/testonly.rs b/core/node/consensus/src/testonly.rs index 98c0d6b08131..4ebcf5c9a617 100644 --- a/core/node/consensus/src/testonly.rs +++ b/core/node/consensus/src/testonly.rs @@ -219,11 +219,10 @@ impl StateKeeper { .wait(IoCursor::for_fetcher(&mut conn.0)) .await? .context("IoCursor::new()")?; - let batch_sealed = ctx - .wait(conn.0.blocks_dal().get_unsealed_l1_batch()) + let pending_batch = ctx + .wait(conn.0.blocks_dal().pending_batch_exists()) .await? - .context("get_unsealed_l1_batch()")? - .is_none(); + .context("pending_batch_exists()")?; let (actions_sender, actions_queue) = ActionQueue::new(); let addr = sync::watch::channel(None).0; let sync_state = SyncState::default(); @@ -259,7 +258,7 @@ impl StateKeeper { last_batch: cursor.l1_batch, last_block: cursor.next_l2_block - 1, last_timestamp: cursor.prev_l2_block_timestamp, - batch_sealed, + batch_sealed: !pending_batch, next_priority_op: PriorityOpId(1), actions_sender, sync_state: sync_state.clone(), diff --git a/core/node/node_sync/Cargo.toml b/core/node/node_sync/Cargo.toml index ccfc8dd8a4e9..9c5b0c000700 100644 --- a/core/node/node_sync/Cargo.toml +++ b/core/node/node_sync/Cargo.toml @@ -43,3 +43,4 @@ zksync_node_test_utils.workspace = true assert_matches.workspace = true once_cell.workspace = true test-casing.workspace = true +backon.workspace = true diff --git a/core/node/node_sync/src/external_io.rs b/core/node/node_sync/src/external_io.rs index 10fb2925015f..5e3a5ce9f46e 100644 --- a/core/node/node_sync/src/external_io.rs +++ b/core/node/node_sync/src/external_io.rs @@ -155,6 +155,14 @@ impl StateKeeperIO for ExternalIO { ) })?; let Some(mut pending_l2_block_header) = pending_l2_block_header else { + tracing::info!( + l1_batch_number = %cursor.l1_batch, + "No pending L2 blocks found; pruning unsealed batch if exists as we need at least one L2 block to initialize" + ); + storage + .blocks_dal() + .delete_unsealed_l1_batch(cursor.l1_batch - 1) + .await?; return Ok((cursor, None)); }; diff --git a/core/node/node_sync/src/fetcher.rs b/core/node/node_sync/src/fetcher.rs index 3f8558ed0ac5..51b9f7c7a060 100644 --- a/core/node/node_sync/src/fetcher.rs +++ b/core/node/node_sync/src/fetcher.rs @@ -114,8 +114,8 @@ impl IoCursorExt for IoCursor { let mut this = Self::new(storage).await?; // It's important to know whether we have opened a new batch already or just sealed the previous one. // Depending on it, we must either insert `OpenBatch` item into the queue, or not. - let unsealed_batch = storage.blocks_dal().get_unsealed_l1_batch().await?; - if unsealed_batch.is_none() { + let was_new_batch_open = storage.blocks_dal().pending_batch_exists().await?; + if !was_new_batch_open { this.l1_batch -= 1; // Should continue from the last L1 batch present in the storage } Ok(this) @@ -201,35 +201,3 @@ impl IoCursorExt for IoCursor { new_actions } } - -#[cfg(test)] -mod tests { - use zksync_dal::{ConnectionPool, Core, CoreDal}; - use zksync_node_genesis::{insert_genesis_batch, GenesisParams}; - use zksync_state_keeper::io::IoCursor; - use zksync_types::{block::UnsealedL1BatchHeader, L1BatchNumber}; - - use crate::fetcher::IoCursorExt; - - #[tokio::test] - async fn io_cursor_recognizes_empty_unsealed_batch() -> anyhow::Result<()> { - let pool = ConnectionPool::::test_pool().await; - let mut conn = pool.connection().await.unwrap(); - insert_genesis_batch(&mut conn, &GenesisParams::mock()) - .await - .unwrap(); - conn.blocks_dal() - .insert_l1_batch(UnsealedL1BatchHeader { - number: L1BatchNumber(1), - timestamp: 1, - protocol_version: None, - fee_address: Default::default(), - fee_input: Default::default(), - }) - .await?; - - let io_cursor = IoCursor::for_fetcher(&mut conn).await?; - assert_eq!(io_cursor.l1_batch, L1BatchNumber(1)); - Ok(()) - } -} diff --git a/core/node/node_sync/src/sync_action.rs b/core/node/node_sync/src/sync_action.rs index 8cb90d24fe84..e3fd56ae9bb0 100644 --- a/core/node/node_sync/src/sync_action.rs +++ b/core/node/node_sync/src/sync_action.rs @@ -33,6 +33,18 @@ impl ActionQueueSender { Ok(()) } + /// Pushes a single action into the queue without checking validity of the sequence. + /// + /// Useful to simulate situations where only a part of the sequence was executed on the node. + #[cfg(test)] + pub async fn push_action_unchecked(&self, action: SyncAction) -> anyhow::Result<()> { + self.0 + .send(action) + .await + .map_err(|_| anyhow::anyhow!("node action processor stopped"))?; + Ok(()) + } + /// Checks whether the action sequence is valid. /// Returned error is meant to be used as a panic message, since an invalid sequence represents an unrecoverable /// error. This function itself does not panic for the ease of testing. diff --git a/core/node/node_sync/src/tests.rs b/core/node/node_sync/src/tests.rs index 3f5791cdf24c..1ae148709b22 100644 --- a/core/node/node_sync/src/tests.rs +++ b/core/node/node_sync/src/tests.rs @@ -2,6 +2,7 @@ use std::{iter, sync::Arc, time::Duration}; +use backon::{ConstantBuilder, Retryable}; use test_casing::test_casing; use tokio::{sync::watch, task::JoinHandle}; use zksync_contracts::BaseSystemContractsHashes; @@ -18,7 +19,7 @@ use zksync_state_keeper::{ }; use zksync_types::{ api, - block::L2BlockHasher, + block::{L2BlockHasher, UnsealedL1BatchHeader}, fee_model::{BatchFeeInput, PubdataIndependentBatchFeeModelInput}, snapshots::SnapshotRecoveryStatus, Address, L1BatchNumber, L2BlockNumber, L2ChainId, ProtocolVersionId, Transaction, H256, @@ -652,3 +653,101 @@ async fn external_io_with_multiple_l1_batches() { assert_eq!(fictive_l2_block.timestamp, 2); assert_eq!(fictive_l2_block.l2_tx_count, 0); } + +async fn wait_for_batch_to_be_open( + pool: &ConnectionPool, + number: L1BatchNumber, +) -> anyhow::Result { + (|| async { + let mut storage = pool.connection().await.unwrap(); + let unsealed_batch = storage.blocks_dal().get_unsealed_l1_batch().await?; + + if let Some(unsealed_batch) = unsealed_batch { + if unsealed_batch.number == number { + Ok(unsealed_batch) + } else { + Err(anyhow::anyhow!("L1 batch #{number} is not open yet")) + } + } else { + Err(anyhow::anyhow!("No unsealed L1 batch found yet")) + } + }) + .retry( + &ConstantBuilder::default() + .with_delay(Duration::from_millis(200)) + .with_max_times(20), + ) + .await +} + +#[tokio::test] +async fn external_io_empty_unsealed_batch() { + let pool = ConnectionPool::::test_pool().await; + let mut storage = pool.connection().await.unwrap(); + ensure_genesis(&mut storage).await; + drop(storage); + + let open_batch_one = open_l1_batch(1, 1, 1); + let tx = create_l2_transaction(10, 100); + let tx_hash = tx.hash(); + let tx = FetchedTransaction::new(tx.into()); + let open_batch_two = open_l1_batch(2, 2, 3); + let fictive_l2_block = SyncAction::L2Block { + params: L2BlockParams { + timestamp: 2, + virtual_blocks: 0, + }, + number: L2BlockNumber(2), + }; + let actions1 = vec![open_batch_one, tx.into(), SyncAction::SealL2Block]; + let actions2 = vec![fictive_l2_block, SyncAction::SealBatch]; + + let (actions_sender, action_queue) = ActionQueue::new(); + let client = MockMainNodeClient::default(); + let state_keeper = + StateKeeperHandles::new(pool.clone(), client, action_queue, &[&[tx_hash]]).await; + actions_sender.push_actions(actions1).await.unwrap(); + actions_sender.push_actions(actions2).await.unwrap(); + // Unchecked insert of batch #2 to simulate restart in the middle of processing an action sequence + // In other words batch #2 is inserted completely empty with no blocks/txs present in it + actions_sender + .push_action_unchecked(open_batch_two.clone()) + .await + .unwrap(); + // Wait until the L2 block is sealed. + state_keeper.wait_for_local_block(L2BlockNumber(2)).await; + + // Wait until L1 batch #2 is opened and persisted. + let unsealed_batch = wait_for_batch_to_be_open(&pool, L1BatchNumber(2)) + .await + .unwrap(); + assert_eq!(unsealed_batch.number, L1BatchNumber(2)); + assert_eq!(unsealed_batch.timestamp, 2); + + // Prepare the rest of batch #2 + let tx = create_l2_transaction(20, 200); + let tx_hash = tx.hash(); + let tx = FetchedTransaction::new(tx.into()); + let fictive_l2_block = SyncAction::L2Block { + params: L2BlockParams { + timestamp: 4, + virtual_blocks: 0, + }, + number: L2BlockNumber(4), + }; + let actions1 = vec![open_batch_two, tx.into(), SyncAction::SealL2Block]; + let actions2 = vec![fictive_l2_block, SyncAction::SealBatch]; + + // Restart state keeper + let (actions_sender, action_queue) = ActionQueue::new(); + let client = MockMainNodeClient::default(); + let state_keeper = + StateKeeperHandles::new(pool.clone(), client, action_queue, &[&[tx_hash]]).await; + actions_sender.push_actions(actions1).await.unwrap(); + actions_sender.push_actions(actions2).await.unwrap(); + + let hash_task = tokio::spawn(mock_l1_batch_hash_computation(pool.clone(), 1)); + // Wait until the block #4 is sealed. + state_keeper.wait_for_local_block(L2BlockNumber(4)).await; + hash_task.await.unwrap(); +}