Skip to content

Commit

Permalink
fix(state-keeper): ensure unsealed batch is present during IO init (#…
Browse files Browse the repository at this point in the history
…3071)

## What ❔

Ensures unsealed L1 batch is present in the DB even if we start with
re-execution.

## Why ❔

Leftover bug after #2846 

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk_supervisor fmt` and `zk_supervisor
lint`.
  • Loading branch information
itegulov authored Oct 11, 2024
1 parent 0841f1e commit bdeb411
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 61 deletions.
89 changes: 58 additions & 31 deletions core/lib/dal/src/blocks_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,11 +578,14 @@ impl BlocksDal<'_, '_> {
/// null or set to default value for the corresponding type).
pub async fn insert_l1_batch(
&mut self,
number: L1BatchNumber,
timestamp: u64,
protocol_version: Option<ProtocolVersionId>,
fee_address: Address,
batch_fee_input: BatchFeeInput,
unsealed_batch_header: UnsealedL1BatchHeader,
) -> DalResult<()> {
Self::insert_l1_batch_inner(unsealed_batch_header, self.storage).await
}

async fn insert_l1_batch_inner(
unsealed_batch_header: UnsealedL1BatchHeader,
conn: &mut Connection<'_, Core>,
) -> DalResult<()> {
sqlx::query!(
r#"
Expand Down Expand Up @@ -625,21 +628,51 @@ impl BlocksDal<'_, '_> {
FALSE
)
"#,
i64::from(number.0),
timestamp as i64,
protocol_version.map(|v| v as i32),
fee_address.as_bytes(),
batch_fee_input.l1_gas_price() as i64,
batch_fee_input.fair_l2_gas_price() as i64,
batch_fee_input.fair_pubdata_price() as i64,
i64::from(unsealed_batch_header.number.0),
unsealed_batch_header.timestamp as i64,
unsealed_batch_header.protocol_version.map(|v| v as i32),
unsealed_batch_header.fee_address.as_bytes(),
unsealed_batch_header.fee_input.l1_gas_price() as i64,
unsealed_batch_header.fee_input.fair_l2_gas_price() as i64,
unsealed_batch_header.fee_input.fair_pubdata_price() as i64,
)
.instrument("insert_l1_batch")
.with_arg("number", &number)
.execute(self.storage)
.with_arg("number", &unsealed_batch_header.number)
.execute(conn)
.await?;
Ok(())
}

pub async fn ensure_unsealed_l1_batch_exists(
&mut self,
unsealed_batch: UnsealedL1BatchHeader,
) -> anyhow::Result<()> {
let mut transaction = self.storage.start_transaction().await?;
let unsealed_batch_fetched = Self::get_unsealed_l1_batch_inner(&mut transaction).await?;

match unsealed_batch_fetched {
None => {
tracing::info!(
"Unsealed batch #{} could not be found; inserting",
unsealed_batch.number
);
Self::insert_l1_batch_inner(unsealed_batch, &mut transaction).await?;
}
Some(unsealed_batch_fetched) => {
if unsealed_batch_fetched.number != unsealed_batch.number {
anyhow::bail!(
"fetched unsealed L1 batch #{} does not conform to expected L1 batch #{}",
unsealed_batch_fetched.number,
unsealed_batch.number
)
}
}
}

transaction.commit().await?;
Ok(())
}

/// Marks provided L1 batch as sealed and populates it with all the runtime information.
///
/// Errors if the batch does not exist.
Expand Down Expand Up @@ -744,6 +777,12 @@ impl BlocksDal<'_, '_> {
}

pub async fn get_unsealed_l1_batch(&mut self) -> DalResult<Option<UnsealedL1BatchHeader>> {
Self::get_unsealed_l1_batch_inner(self.storage).await
}

async fn get_unsealed_l1_batch_inner(
conn: &mut Connection<'_, Core>,
) -> DalResult<Option<UnsealedL1BatchHeader>> {
let batch = sqlx::query_as!(
UnsealedStorageL1Batch,
r#"
Expand All @@ -761,8 +800,8 @@ impl BlocksDal<'_, '_> {
NOT is_sealed
"#,
)
.instrument("get_last_committed_to_eth_l1_batch")
.fetch_optional(self.storage)
.instrument("get_unsealed_l1_batch")
.fetch_optional(conn)
.await?;

Ok(batch.map(|b| b.into()))
Expand Down Expand Up @@ -2621,11 +2660,7 @@ impl BlocksDal<'_, '_> {

pub async fn insert_mock_l1_batch(&mut self, header: &L1BatchHeader) -> anyhow::Result<()> {
self.insert_l1_batch(
header.number,
header.timestamp,
header.protocol_version,
header.fee_address,
BatchFeeInput::pubdata_independent(100, 100, 100),
header.to_unsealed_header(BatchFeeInput::pubdata_independent(100, 100, 100)),
)
.await?;
self.mark_l1_batch_as_sealed(
Expand Down Expand Up @@ -2940,11 +2975,7 @@ mod tests {
};
conn.blocks_dal()
.insert_l1_batch(
header.number,
header.timestamp,
header.protocol_version,
header.fee_address,
BatchFeeInput::pubdata_independent(100, 100, 100),
header.to_unsealed_header(BatchFeeInput::pubdata_independent(100, 100, 100)),
)
.await
.unwrap();
Expand All @@ -2958,11 +2989,7 @@ mod tests {
predicted_gas += predicted_gas;
conn.blocks_dal()
.insert_l1_batch(
header.number,
header.timestamp,
header.protocol_version,
header.fee_address,
BatchFeeInput::pubdata_independent(100, 100, 100),
header.to_unsealed_header(BatchFeeInput::pubdata_independent(100, 100, 100)),
)
.await
.unwrap();
Expand Down
12 changes: 12 additions & 0 deletions core/lib/types/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ pub struct L1BatchHeader {
pub fee_address: Address,
}

impl L1BatchHeader {
pub fn to_unsealed_header(&self, fee_input: BatchFeeInput) -> UnsealedL1BatchHeader {
UnsealedL1BatchHeader {
number: self.number,
timestamp: self.timestamp,
protocol_version: self.protocol_version,
fee_address: self.fee_address,
fee_input,
}
}
}

#[derive(Debug, Clone, PartialEq)]
pub struct UnsealedL1BatchHeader {
pub number: L1BatchNumber,
Expand Down
20 changes: 19 additions & 1 deletion core/lib/vm_interface/src/types/inputs/l1_batch_env.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use serde::{Deserialize, Serialize};
use zksync_types::{fee_model::BatchFeeInput, Address, L1BatchNumber, H256};
use zksync_types::{
block::UnsealedL1BatchHeader, fee_model::BatchFeeInput, Address, L1BatchNumber,
ProtocolVersionId, H256,
};

use super::L2BlockEnv;

Expand All @@ -21,3 +24,18 @@ pub struct L1BatchEnv {
pub enforced_base_fee: Option<u64>,
pub first_l2_block: L2BlockEnv,
}

impl L1BatchEnv {
pub fn into_unsealed_header(
self,
protocol_version: Option<ProtocolVersionId>,
) -> UnsealedL1BatchHeader {
UnsealedL1BatchHeader {
number: self.number,
timestamp: self.timestamp,
protocol_version,
fee_address: self.fee_account,
fee_input: self.fee_input,
}
}
}
8 changes: 1 addition & 7 deletions core/node/genesis/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,13 +419,7 @@ pub async fn create_genesis_l1_batch(
.await?;
transaction
.blocks_dal()
.insert_l1_batch(
genesis_l1_batch_header.number,
genesis_l1_batch_header.timestamp,
genesis_l1_batch_header.protocol_version,
genesis_l1_batch_header.fee_address,
batch_fee_input,
)
.insert_l1_batch(genesis_l1_batch_header.to_unsealed_header(batch_fee_input))
.await?;
transaction
.blocks_dal()
Expand Down
23 changes: 16 additions & 7 deletions core/node/node_sync/src/external_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use zksync_state_keeper::{
updates::UpdatesManager,
};
use zksync_types::{
block::UnsealedL1BatchHeader,
protocol_upgrade::ProtocolUpgradeTx,
protocol_version::{ProtocolSemanticVersion, VersionPatch},
L1BatchNumber, L2BlockNumber, L2ChainId, ProtocolVersionId, Transaction, H256,
Expand Down Expand Up @@ -200,6 +201,14 @@ impl StateKeeperIO for ExternalIO {
cursor.l1_batch
)
})?;
storage
.blocks_dal()
.ensure_unsealed_l1_batch_exists(
l1_batch_env
.clone()
.into_unsealed_header(Some(system_env.version)),
)
.await?;
let data = load_pending_batch(&mut storage, system_env, l1_batch_env)
.await
.with_context(|| {
Expand Down Expand Up @@ -241,13 +250,13 @@ impl StateKeeperIO for ExternalIO {
.connection()
.await?
.blocks_dal()
.insert_l1_batch(
cursor.l1_batch,
params.first_l2_block.timestamp,
None,
params.operator_address,
params.fee_input,
)
.insert_l1_batch(UnsealedL1BatchHeader {
number: cursor.l1_batch,
timestamp: params.first_l2_block.timestamp,
protocol_version: None,
fee_address: params.operator_address,
fee_input: params.fee_input,
})
.await?;
return Ok(Some(params));
}
Expand Down
25 changes: 17 additions & 8 deletions core/node/state_keeper/src/io/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use zksync_mempool::L2TxFilter;
use zksync_multivm::{interface::Halt, utils::derive_base_fee_and_gas_per_pubdata};
use zksync_node_fee_model::BatchFeeModelInputProvider;
use zksync_types::{
protocol_upgrade::ProtocolUpgradeTx, utils::display_timestamp, Address, L1BatchNumber,
L2BlockNumber, L2ChainId, ProtocolVersionId, Transaction, H256, U256,
block::UnsealedL1BatchHeader, protocol_upgrade::ProtocolUpgradeTx, utils::display_timestamp,
Address, L1BatchNumber, L2BlockNumber, L2ChainId, ProtocolVersionId, Transaction, H256, U256,
};
// TODO (SMA-1206): use seconds instead of milliseconds.
use zksync_utils::time::millis_since_epoch;
Expand Down Expand Up @@ -133,6 +133,15 @@ impl StateKeeperIO for MempoolIO {
gas_per_pubdata: gas_per_pubdata as u32,
};

storage
.blocks_dal()
.ensure_unsealed_l1_batch_exists(
l1_batch_env
.clone()
.into_unsealed_header(Some(system_env.version)),
)
.await?;

Ok((
cursor,
Some(PendingBatchData {
Expand Down Expand Up @@ -219,13 +228,13 @@ impl StateKeeperIO for MempoolIO {
.connection()
.await?
.blocks_dal()
.insert_l1_batch(
cursor.l1_batch,
.insert_l1_batch(UnsealedL1BatchHeader {
number: cursor.l1_batch,
timestamp,
Some(protocol_version),
self.fee_account,
self.filter.fee_input,
)
protocol_version: Some(protocol_version),
fee_address: self.fee_account,
fee_input: self.filter.fee_input,
})
.await?;

return Ok(Some(L1BatchParams {
Expand Down
8 changes: 1 addition & 7 deletions core/node/state_keeper/src/io/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,13 +456,7 @@ mod tests {
.await
.unwrap()
.blocks_dal()
.insert_l1_batch(
l1_batch_env.number,
l1_batch_env.timestamp,
None,
l1_batch_env.fee_account,
l1_batch_env.fee_input,
)
.insert_l1_batch(l1_batch_env.into_unsealed_header(None))
.await
.unwrap();

Expand Down
34 changes: 34 additions & 0 deletions core/node/state_keeper/src/io/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,3 +606,37 @@ async fn continue_unsealed_batch_on_restart(commitment_mode: L1BatchCommitmentMo

assert_eq!(old_l1_batch_params, new_l1_batch_params);
}

#[test_casing(2, COMMITMENT_MODES)]
#[tokio::test]
async fn insert_unsealed_batch_on_init(commitment_mode: L1BatchCommitmentMode) {
let connection_pool = ConnectionPool::<Core>::test_pool().await;
let mut tester = Tester::new(commitment_mode);
tester.genesis(&connection_pool).await;
let fee_input = BatchFeeInput::pubdata_independent(55, 555, 5555);
let tx_result = tester
.insert_l2_block(&connection_pool, 1, 5, fee_input)
.await;
tester
.insert_sealed_batch(&connection_pool, 1, &[tx_result])
.await;
// Pre-insert L2 block without its unsealed L1 batch counterpart
tester.set_timestamp(2);
tester
.insert_l2_block(&connection_pool, 2, 5, fee_input)
.await;

let (mut mempool, _) = tester.create_test_mempool_io(connection_pool.clone()).await;
// Initialization is supposed to recognize that the current L1 batch is not present in the DB and
// insert it itself.
let (cursor, _) = mempool.initialize().await.unwrap();

// Make sure we are able to fetch the newly inserted batch's params
let l1_batch_params = mempool
.wait_for_new_batch_params(&cursor, Duration::from_secs(10))
.await
.unwrap()
.expect("no batch params generated");
assert_eq!(l1_batch_params.fee_input, fee_input);
assert_eq!(l1_batch_params.first_l2_block.timestamp, 2);
}

0 comments on commit bdeb411

Please sign in to comment.