From bdeb411c593ac3d5e16158e64c4210bb00edcb0c Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Fri, 11 Oct 2024 19:48:44 +1100 Subject: [PATCH] fix(state-keeper): ensure unsealed batch is present during IO init (#3071) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Ensures unsealed L1 batch is present in the DB even if we start with re-execution. ## Why ❔ Leftover bug after #2846 ## Checklist - [x] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [x] Tests for the changes have been added / updated. - [x] Documentation comments have been added / updated. - [x] Code has been formatted via `zk_supervisor fmt` and `zk_supervisor lint`. --- core/lib/dal/src/blocks_dal.rs | 89 ++++++++++++------- core/lib/types/src/block.rs | 12 +++ .../src/types/inputs/l1_batch_env.rs | 20 ++++- core/node/genesis/src/lib.rs | 8 +- core/node/node_sync/src/external_io.rs | 23 +++-- core/node/state_keeper/src/io/mempool.rs | 25 ++++-- core/node/state_keeper/src/io/persistence.rs | 8 +- core/node/state_keeper/src/io/tests/mod.rs | 34 +++++++ 8 files changed, 158 insertions(+), 61 deletions(-) diff --git a/core/lib/dal/src/blocks_dal.rs b/core/lib/dal/src/blocks_dal.rs index 347152e3c38d..d59f95192c6b 100644 --- a/core/lib/dal/src/blocks_dal.rs +++ b/core/lib/dal/src/blocks_dal.rs @@ -578,11 +578,14 @@ impl BlocksDal<'_, '_> { /// null or set to default value for the corresponding type). pub async fn insert_l1_batch( &mut self, - number: L1BatchNumber, - timestamp: u64, - protocol_version: Option, - fee_address: Address, - batch_fee_input: BatchFeeInput, + unsealed_batch_header: UnsealedL1BatchHeader, + ) -> DalResult<()> { + Self::insert_l1_batch_inner(unsealed_batch_header, self.storage).await + } + + async fn insert_l1_batch_inner( + unsealed_batch_header: UnsealedL1BatchHeader, + conn: &mut Connection<'_, Core>, ) -> DalResult<()> { sqlx::query!( r#" @@ -625,21 +628,51 @@ impl BlocksDal<'_, '_> { FALSE ) "#, - i64::from(number.0), - timestamp as i64, - protocol_version.map(|v| v as i32), - fee_address.as_bytes(), - batch_fee_input.l1_gas_price() as i64, - batch_fee_input.fair_l2_gas_price() as i64, - batch_fee_input.fair_pubdata_price() as i64, + i64::from(unsealed_batch_header.number.0), + unsealed_batch_header.timestamp as i64, + unsealed_batch_header.protocol_version.map(|v| v as i32), + unsealed_batch_header.fee_address.as_bytes(), + unsealed_batch_header.fee_input.l1_gas_price() as i64, + unsealed_batch_header.fee_input.fair_l2_gas_price() as i64, + unsealed_batch_header.fee_input.fair_pubdata_price() as i64, ) .instrument("insert_l1_batch") - .with_arg("number", &number) - .execute(self.storage) + .with_arg("number", &unsealed_batch_header.number) + .execute(conn) .await?; Ok(()) } + pub async fn ensure_unsealed_l1_batch_exists( + &mut self, + unsealed_batch: UnsealedL1BatchHeader, + ) -> anyhow::Result<()> { + let mut transaction = self.storage.start_transaction().await?; + let unsealed_batch_fetched = Self::get_unsealed_l1_batch_inner(&mut transaction).await?; + + match unsealed_batch_fetched { + None => { + tracing::info!( + "Unsealed batch #{} could not be found; inserting", + unsealed_batch.number + ); + Self::insert_l1_batch_inner(unsealed_batch, &mut transaction).await?; + } + Some(unsealed_batch_fetched) => { + if unsealed_batch_fetched.number != unsealed_batch.number { + anyhow::bail!( + "fetched unsealed L1 batch #{} does not conform to expected L1 batch #{}", + unsealed_batch_fetched.number, + unsealed_batch.number + ) + } + } + } + + transaction.commit().await?; + Ok(()) + } + /// Marks provided L1 batch as sealed and populates it with all the runtime information. /// /// Errors if the batch does not exist. @@ -744,6 +777,12 @@ impl BlocksDal<'_, '_> { } pub async fn get_unsealed_l1_batch(&mut self) -> DalResult> { + Self::get_unsealed_l1_batch_inner(self.storage).await + } + + async fn get_unsealed_l1_batch_inner( + conn: &mut Connection<'_, Core>, + ) -> DalResult> { let batch = sqlx::query_as!( UnsealedStorageL1Batch, r#" @@ -761,8 +800,8 @@ impl BlocksDal<'_, '_> { NOT is_sealed "#, ) - .instrument("get_last_committed_to_eth_l1_batch") - .fetch_optional(self.storage) + .instrument("get_unsealed_l1_batch") + .fetch_optional(conn) .await?; Ok(batch.map(|b| b.into())) @@ -2621,11 +2660,7 @@ impl BlocksDal<'_, '_> { pub async fn insert_mock_l1_batch(&mut self, header: &L1BatchHeader) -> anyhow::Result<()> { self.insert_l1_batch( - header.number, - header.timestamp, - header.protocol_version, - header.fee_address, - BatchFeeInput::pubdata_independent(100, 100, 100), + header.to_unsealed_header(BatchFeeInput::pubdata_independent(100, 100, 100)), ) .await?; self.mark_l1_batch_as_sealed( @@ -2940,11 +2975,7 @@ mod tests { }; conn.blocks_dal() .insert_l1_batch( - header.number, - header.timestamp, - header.protocol_version, - header.fee_address, - BatchFeeInput::pubdata_independent(100, 100, 100), + header.to_unsealed_header(BatchFeeInput::pubdata_independent(100, 100, 100)), ) .await .unwrap(); @@ -2958,11 +2989,7 @@ mod tests { predicted_gas += predicted_gas; conn.blocks_dal() .insert_l1_batch( - header.number, - header.timestamp, - header.protocol_version, - header.fee_address, - BatchFeeInput::pubdata_independent(100, 100, 100), + header.to_unsealed_header(BatchFeeInput::pubdata_independent(100, 100, 100)), ) .await .unwrap(); diff --git a/core/lib/types/src/block.rs b/core/lib/types/src/block.rs index 361e9ea56d28..9211a6f1d8cf 100644 --- a/core/lib/types/src/block.rs +++ b/core/lib/types/src/block.rs @@ -68,6 +68,18 @@ pub struct L1BatchHeader { pub fee_address: Address, } +impl L1BatchHeader { + pub fn to_unsealed_header(&self, fee_input: BatchFeeInput) -> UnsealedL1BatchHeader { + UnsealedL1BatchHeader { + number: self.number, + timestamp: self.timestamp, + protocol_version: self.protocol_version, + fee_address: self.fee_address, + fee_input, + } + } +} + #[derive(Debug, Clone, PartialEq)] pub struct UnsealedL1BatchHeader { pub number: L1BatchNumber, diff --git a/core/lib/vm_interface/src/types/inputs/l1_batch_env.rs b/core/lib/vm_interface/src/types/inputs/l1_batch_env.rs index dbc942476170..0011f0b138b5 100644 --- a/core/lib/vm_interface/src/types/inputs/l1_batch_env.rs +++ b/core/lib/vm_interface/src/types/inputs/l1_batch_env.rs @@ -1,5 +1,8 @@ use serde::{Deserialize, Serialize}; -use zksync_types::{fee_model::BatchFeeInput, Address, L1BatchNumber, H256}; +use zksync_types::{ + block::UnsealedL1BatchHeader, fee_model::BatchFeeInput, Address, L1BatchNumber, + ProtocolVersionId, H256, +}; use super::L2BlockEnv; @@ -21,3 +24,18 @@ pub struct L1BatchEnv { pub enforced_base_fee: Option, pub first_l2_block: L2BlockEnv, } + +impl L1BatchEnv { + pub fn into_unsealed_header( + self, + protocol_version: Option, + ) -> UnsealedL1BatchHeader { + UnsealedL1BatchHeader { + number: self.number, + timestamp: self.timestamp, + protocol_version, + fee_address: self.fee_account, + fee_input: self.fee_input, + } + } +} diff --git a/core/node/genesis/src/lib.rs b/core/node/genesis/src/lib.rs index ba5e10dfb22c..3e4c0ee30b94 100644 --- a/core/node/genesis/src/lib.rs +++ b/core/node/genesis/src/lib.rs @@ -419,13 +419,7 @@ pub async fn create_genesis_l1_batch( .await?; transaction .blocks_dal() - .insert_l1_batch( - genesis_l1_batch_header.number, - genesis_l1_batch_header.timestamp, - genesis_l1_batch_header.protocol_version, - genesis_l1_batch_header.fee_address, - batch_fee_input, - ) + .insert_l1_batch(genesis_l1_batch_header.to_unsealed_header(batch_fee_input)) .await?; transaction .blocks_dal() diff --git a/core/node/node_sync/src/external_io.rs b/core/node/node_sync/src/external_io.rs index d4e7240da346..9148f9638684 100644 --- a/core/node/node_sync/src/external_io.rs +++ b/core/node/node_sync/src/external_io.rs @@ -15,6 +15,7 @@ use zksync_state_keeper::{ updates::UpdatesManager, }; use zksync_types::{ + block::UnsealedL1BatchHeader, protocol_upgrade::ProtocolUpgradeTx, protocol_version::{ProtocolSemanticVersion, VersionPatch}, L1BatchNumber, L2BlockNumber, L2ChainId, ProtocolVersionId, Transaction, H256, @@ -200,6 +201,14 @@ impl StateKeeperIO for ExternalIO { cursor.l1_batch ) })?; + storage + .blocks_dal() + .ensure_unsealed_l1_batch_exists( + l1_batch_env + .clone() + .into_unsealed_header(Some(system_env.version)), + ) + .await?; let data = load_pending_batch(&mut storage, system_env, l1_batch_env) .await .with_context(|| { @@ -241,13 +250,13 @@ impl StateKeeperIO for ExternalIO { .connection() .await? .blocks_dal() - .insert_l1_batch( - cursor.l1_batch, - params.first_l2_block.timestamp, - None, - params.operator_address, - params.fee_input, - ) + .insert_l1_batch(UnsealedL1BatchHeader { + number: cursor.l1_batch, + timestamp: params.first_l2_block.timestamp, + protocol_version: None, + fee_address: params.operator_address, + fee_input: params.fee_input, + }) .await?; return Ok(Some(params)); } diff --git a/core/node/state_keeper/src/io/mempool.rs b/core/node/state_keeper/src/io/mempool.rs index f771a2dda4ce..5a3fb8e4c4fc 100644 --- a/core/node/state_keeper/src/io/mempool.rs +++ b/core/node/state_keeper/src/io/mempool.rs @@ -14,8 +14,8 @@ use zksync_mempool::L2TxFilter; use zksync_multivm::{interface::Halt, utils::derive_base_fee_and_gas_per_pubdata}; use zksync_node_fee_model::BatchFeeModelInputProvider; use zksync_types::{ - protocol_upgrade::ProtocolUpgradeTx, utils::display_timestamp, Address, L1BatchNumber, - L2BlockNumber, L2ChainId, ProtocolVersionId, Transaction, H256, U256, + block::UnsealedL1BatchHeader, protocol_upgrade::ProtocolUpgradeTx, utils::display_timestamp, + Address, L1BatchNumber, L2BlockNumber, L2ChainId, ProtocolVersionId, Transaction, H256, U256, }; // TODO (SMA-1206): use seconds instead of milliseconds. use zksync_utils::time::millis_since_epoch; @@ -133,6 +133,15 @@ impl StateKeeperIO for MempoolIO { gas_per_pubdata: gas_per_pubdata as u32, }; + storage + .blocks_dal() + .ensure_unsealed_l1_batch_exists( + l1_batch_env + .clone() + .into_unsealed_header(Some(system_env.version)), + ) + .await?; + Ok(( cursor, Some(PendingBatchData { @@ -219,13 +228,13 @@ impl StateKeeperIO for MempoolIO { .connection() .await? .blocks_dal() - .insert_l1_batch( - cursor.l1_batch, + .insert_l1_batch(UnsealedL1BatchHeader { + number: cursor.l1_batch, timestamp, - Some(protocol_version), - self.fee_account, - self.filter.fee_input, - ) + protocol_version: Some(protocol_version), + fee_address: self.fee_account, + fee_input: self.filter.fee_input, + }) .await?; return Ok(Some(L1BatchParams { diff --git a/core/node/state_keeper/src/io/persistence.rs b/core/node/state_keeper/src/io/persistence.rs index 16275ec672df..3e11285e11f1 100644 --- a/core/node/state_keeper/src/io/persistence.rs +++ b/core/node/state_keeper/src/io/persistence.rs @@ -456,13 +456,7 @@ mod tests { .await .unwrap() .blocks_dal() - .insert_l1_batch( - l1_batch_env.number, - l1_batch_env.timestamp, - None, - l1_batch_env.fee_account, - l1_batch_env.fee_input, - ) + .insert_l1_batch(l1_batch_env.into_unsealed_header(None)) .await .unwrap(); diff --git a/core/node/state_keeper/src/io/tests/mod.rs b/core/node/state_keeper/src/io/tests/mod.rs index cd60bc68b36a..566eebf7ab72 100644 --- a/core/node/state_keeper/src/io/tests/mod.rs +++ b/core/node/state_keeper/src/io/tests/mod.rs @@ -606,3 +606,37 @@ async fn continue_unsealed_batch_on_restart(commitment_mode: L1BatchCommitmentMo assert_eq!(old_l1_batch_params, new_l1_batch_params); } + +#[test_casing(2, COMMITMENT_MODES)] +#[tokio::test] +async fn insert_unsealed_batch_on_init(commitment_mode: L1BatchCommitmentMode) { + let connection_pool = ConnectionPool::::test_pool().await; + let mut tester = Tester::new(commitment_mode); + tester.genesis(&connection_pool).await; + let fee_input = BatchFeeInput::pubdata_independent(55, 555, 5555); + let tx_result = tester + .insert_l2_block(&connection_pool, 1, 5, fee_input) + .await; + tester + .insert_sealed_batch(&connection_pool, 1, &[tx_result]) + .await; + // Pre-insert L2 block without its unsealed L1 batch counterpart + tester.set_timestamp(2); + tester + .insert_l2_block(&connection_pool, 2, 5, fee_input) + .await; + + let (mut mempool, _) = tester.create_test_mempool_io(connection_pool.clone()).await; + // Initialization is supposed to recognize that the current L1 batch is not present in the DB and + // insert it itself. + let (cursor, _) = mempool.initialize().await.unwrap(); + + // Make sure we are able to fetch the newly inserted batch's params + let l1_batch_params = mempool + .wait_for_new_batch_params(&cursor, Duration::from_secs(10)) + .await + .unwrap() + .expect("no batch params generated"); + assert_eq!(l1_batch_params.fee_input, fee_input); + assert_eq!(l1_batch_params.first_l2_block.timestamp, 2); +}