Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(external-node): delete empty unsealed batch on EN initialization #3125

Merged
merged 9 commits on Oct 18, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 38 additions & 0 deletions core/lib/dal/src/blocks_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2058,6 +2058,30 @@ impl BlocksDal<'_, '_> {
Ok(())
}

/// Deletes unsealed L1 batches with a number strictly greater than `batch_to_keep` from the
/// storage. Expects the caller to make sure there are no associated L2 blocks.
///
/// Accepts `batch_to_keep` as a safety mechanism: sealed batches are never touched (the query
/// filters on `NOT is_sealed`), and neither is anything at or below `batch_to_keep`.
pub async fn delete_unsealed_l1_batch(
    &mut self,
    batch_to_keep: L1BatchNumber,
) -> DalResult<()> {
    sqlx::query!(
        r#"
        DELETE FROM l1_batches
        WHERE
            number > $1
            AND NOT is_sealed
        "#,
        i64::from(batch_to_keep.0)
    )
    .instrument("delete_unsealed_l1_batch")
    .with_arg("batch_to_keep", &batch_to_keep)
    .execute(self.storage)
    .await?;
    Ok(())
}

/// Deletes all L1 batches from the storage so that the specified batch number is the last one left.
pub async fn delete_l1_batches(&mut self, last_batch_to_keep: L1BatchNumber) -> DalResult<()> {
self.delete_l1_batches_inner(Some(last_batch_to_keep)).await
Expand Down Expand Up @@ -2184,6 +2208,20 @@ impl BlocksDal<'_, '_> {
Ok(Some((L2BlockNumber(min as u32), L2BlockNumber(max as u32))))
}

/// Returns `true` if there exists a non-sealed batch (i.e. there is one+ stored L2 block that isn't assigned
/// to any batch yet).
pub async fn pending_batch_exists(&mut self) -> DalResult<bool> {
    // Counts `miniblocks` rows whose `l1_batch_number` is still NULL, i.e. L2 blocks that
    // belong to the batch currently being built.
    let count = sqlx::query_scalar!(
        "SELECT COUNT(miniblocks.number) FROM miniblocks WHERE l1_batch_number IS NULL"
    )
    .instrument("pending_batch_exists")
    .fetch_one(self.storage)
    .await?
    // sqlx types `COUNT(...)` as nullable, so an `Option` is returned; presumably COUNT never
    // actually yields NULL here, and 0 is a safe default either way.
    .unwrap_or(0);

    Ok(count != 0)
}

// methods used for measuring Eth tx stage transition latencies
// and emitting metrics base on these measured data
pub async fn oldest_uncommitted_batch_timestamp(&mut self) -> DalResult<Option<u64>> {
Expand Down
9 changes: 4 additions & 5 deletions core/node/consensus/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,10 @@ impl StateKeeper {
.wait(IoCursor::for_fetcher(&mut conn.0))
.await?
.context("IoCursor::new()")?;
let batch_sealed = ctx
.wait(conn.0.blocks_dal().get_unsealed_l1_batch())
let pending_batch = ctx
.wait(conn.0.blocks_dal().pending_batch_exists())
.await?
.context("get_unsealed_l1_batch()")?
.is_none();
.context("pending_batch_exists()")?;
let (actions_sender, actions_queue) = ActionQueue::new();
let addr = sync::watch::channel(None).0;
let sync_state = SyncState::default();
Expand Down Expand Up @@ -259,7 +258,7 @@ impl StateKeeper {
last_batch: cursor.l1_batch,
last_block: cursor.next_l2_block - 1,
last_timestamp: cursor.prev_l2_block_timestamp,
batch_sealed,
batch_sealed: !pending_batch,
next_priority_op: PriorityOpId(1),
actions_sender,
sync_state: sync_state.clone(),
Expand Down
1 change: 1 addition & 0 deletions core/node/node_sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,4 @@ zksync_node_test_utils.workspace = true
assert_matches.workspace = true
once_cell.workspace = true
test-casing.workspace = true
backon.workspace = true
10 changes: 9 additions & 1 deletion core/node/node_sync/src/external_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,14 @@ impl StateKeeperIO for ExternalIO {
)
})?;
let Some(mut pending_l2_block_header) = pending_l2_block_header else {
tracing::info!(
l1_batch_number = %cursor.l1_batch,
"Empty unsealed batch found; deleting it as we need at least one L2 block to initialize"
itegulov marked this conversation as resolved.
Show resolved Hide resolved
);
storage
.blocks_dal()
.delete_unsealed_l1_batch(cursor.l1_batch - 1)
.await?;
return Ok((cursor, None));
};
itegulov marked this conversation as resolved.
Show resolved Hide resolved

Expand Down Expand Up @@ -247,7 +255,7 @@ impl StateKeeperIO for ExternalIO {
);

self.pool
.connection_tagged("sync_layer")
.connection()
.await?
.blocks_dal()
.insert_l1_batch(UnsealedL1BatchHeader {
Expand Down
36 changes: 2 additions & 34 deletions core/node/node_sync/src/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ impl IoCursorExt for IoCursor {
let mut this = Self::new(storage).await?;
// It's important to know whether we have opened a new batch already or just sealed the previous one.
// Depending on it, we must either insert `OpenBatch` item into the queue, or not.
let unsealed_batch = storage.blocks_dal().get_unsealed_l1_batch().await?;
if unsealed_batch.is_none() {
let was_new_batch_open = storage.blocks_dal().pending_batch_exists().await?;
if !was_new_batch_open {
this.l1_batch -= 1; // Should continue from the last L1 batch present in the storage
}
Ok(this)
Expand Down Expand Up @@ -201,35 +201,3 @@ impl IoCursorExt for IoCursor {
new_actions
}
}

#[cfg(test)]
mod tests {
    use zksync_dal::{ConnectionPool, Core, CoreDal};
    use zksync_node_genesis::{insert_genesis_batch, GenesisParams};
    use zksync_state_keeper::io::IoCursor;
    use zksync_types::{block::UnsealedL1BatchHeader, L1BatchNumber};

    use crate::fetcher::IoCursorExt;

    /// Checks that `IoCursor::for_fetcher` keeps pointing at an unsealed L1 batch even when that
    /// batch contains no L2 blocks yet (the cursor's `l1_batch` must stay at the unsealed batch's
    /// number rather than being decremented to the last sealed one).
    #[tokio::test]
    async fn io_cursor_recognizes_empty_unsealed_batch() -> anyhow::Result<()> {
        let pool = ConnectionPool::<Core>::test_pool().await;
        let mut conn = pool.connection().await.unwrap();
        insert_genesis_batch(&mut conn, &GenesisParams::mock())
            .await
            .unwrap();
        // Insert an unsealed batch header with no associated L2 blocks.
        conn.blocks_dal()
            .insert_l1_batch(UnsealedL1BatchHeader {
                number: L1BatchNumber(1),
                timestamp: 1,
                protocol_version: None,
                fee_address: Default::default(),
                fee_input: Default::default(),
            })
            .await?;

        let io_cursor = IoCursor::for_fetcher(&mut conn).await?;
        assert_eq!(io_cursor.l1_batch, L1BatchNumber(1));
        Ok(())
    }
}
15 changes: 15 additions & 0 deletions core/node/node_sync/src/sync_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,21 @@ impl ActionQueueSender {
Ok(())
}

/// Pushes a single action into the queue without checking validity of the sequence.
///
/// Useful to simulate situations where only a part of the sequence was executed on the node
/// (e.g. a restart in the middle of processing an action sequence).
#[cfg(test)]
pub async fn push_action_unchecked(&self, action: SyncAction) -> anyhow::Result<()> {
    self.0
        .send(action)
        .await
        .map_err(|_| anyhow::anyhow!("node action processor stopped"))?;
    // Gauge the current queue length as the number of channel slots consumed.
    QUEUE_METRICS
        .action_queue_size
        .set(self.0.max_capacity() - self.0.capacity());
    Ok(())
}

/// Checks whether the action sequence is valid.
/// Returned error is meant to be used as a panic message, since an invalid sequence represents an unrecoverable
/// error. This function itself does not panic for the ease of testing.
Expand Down
101 changes: 100 additions & 1 deletion core/node/node_sync/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use std::{iter, sync::Arc, time::Duration};

use backon::{ConstantBuilder, Retryable};
use test_casing::test_casing;
use tokio::{sync::watch, task::JoinHandle};
use zksync_contracts::BaseSystemContractsHashes;
Expand All @@ -18,7 +19,7 @@ use zksync_state_keeper::{
};
use zksync_types::{
api,
block::L2BlockHasher,
block::{L2BlockHasher, UnsealedL1BatchHeader},
fee_model::{BatchFeeInput, PubdataIndependentBatchFeeModelInput},
snapshots::SnapshotRecoveryStatus,
Address, L1BatchNumber, L2BlockNumber, L2ChainId, ProtocolVersionId, Transaction, H256,
Expand Down Expand Up @@ -652,3 +653,101 @@ async fn external_io_with_multiple_l1_batches() {
assert_eq!(fictive_l2_block.timestamp, 2);
assert_eq!(fictive_l2_block.l2_tx_count, 0);
}

/// Polls the DB until an unsealed L1 batch with the given `number` is present and returns its
/// header. Retries up to 20 times with a constant 200 ms delay between attempts; errors out if
/// the retries are exhausted or a DAL call fails.
async fn wait_for_batch_to_be_open(
    pool: &ConnectionPool<Core>,
    number: L1BatchNumber,
) -> anyhow::Result<UnsealedL1BatchHeader> {
    // Retried closure: every error (including "not found yet") triggers another attempt.
    (|| async {
        let mut storage = pool.connection().await.unwrap();
        let unsealed_batch = storage.blocks_dal().get_unsealed_l1_batch().await?;

        if let Some(unsealed_batch) = unsealed_batch {
            if unsealed_batch.number == number {
                Ok(unsealed_batch)
            } else {
                Err(anyhow::anyhow!("L1 batch #{number} is not open yet"))
            }
        } else {
            Err(anyhow::anyhow!("No unsealed L1 batch found yet"))
        }
    })
    .retry(
        &ConstantBuilder::default()
            .with_delay(Duration::from_millis(200))
            .with_max_times(20),
    )
    .await
}

// Simulates an EN restart that leaves a completely empty unsealed batch (#2) in storage and
// verifies that, after the restart, the state keeper recovers: the empty batch is handled and
// processing continues from batch #2 with new blocks.
#[tokio::test]
async fn external_io_empty_unsealed_batch() {
    let pool = ConnectionPool::<Core>::test_pool().await;
    let mut storage = pool.connection().await.unwrap();
    ensure_genesis(&mut storage).await;
    drop(storage);

    // Batch #1: one L2 block with a single tx, then a fictive block + seal.
    let open_batch_one = open_l1_batch(1, 1, 1);
    let tx = create_l2_transaction(10, 100);
    let tx_hash = tx.hash();
    let tx = FetchedTransaction::new(tx.into());
    let open_batch_two = open_l1_batch(2, 2, 3);
    let fictive_l2_block = SyncAction::L2Block {
        params: L2BlockParams {
            timestamp: 2,
            virtual_blocks: 0,
        },
        number: L2BlockNumber(2),
    };
    let actions1 = vec![open_batch_one, tx.into(), SyncAction::SealL2Block];
    let actions2 = vec![fictive_l2_block, SyncAction::SealBatch];

    let (actions_sender, action_queue) = ActionQueue::new();
    let client = MockMainNodeClient::default();
    let state_keeper =
        StateKeeperHandles::new(pool.clone(), client, action_queue, &[&[tx_hash]]).await;
    actions_sender.push_actions(actions1).await.unwrap();
    actions_sender.push_actions(actions2).await.unwrap();
    // Unchecked insert of batch #2 to simulate restart in the middle of processing an action sequence
    // In other words batch #2 is inserted completely empty with no blocks/txs present in it
    actions_sender
        .push_action_unchecked(open_batch_two.clone())
        .await
        .unwrap();
    // Wait until the L2 block is sealed.
    state_keeper.wait_for_local_block(L2BlockNumber(2)).await;

    // Wait until L1 batch #2 is opened and persisted.
    let unsealed_batch = wait_for_batch_to_be_open(&pool, L1BatchNumber(2))
        .await
        .unwrap();
    assert_eq!(unsealed_batch.number, L1BatchNumber(2));
    assert_eq!(unsealed_batch.timestamp, 2);

    // Prepare the rest of batch #2
    let tx = create_l2_transaction(20, 200);
    let tx_hash = tx.hash();
    let tx = FetchedTransaction::new(tx.into());
    let fictive_l2_block = SyncAction::L2Block {
        params: L2BlockParams {
            timestamp: 4,
            virtual_blocks: 0,
        },
        number: L2BlockNumber(4),
    };
    let actions1 = vec![open_batch_two, tx.into(), SyncAction::SealL2Block];
    let actions2 = vec![fictive_l2_block, SyncAction::SealBatch];

    // Restart state keeper
    let (actions_sender, action_queue) = ActionQueue::new();
    let client = MockMainNodeClient::default();
    let state_keeper =
        StateKeeperHandles::new(pool.clone(), client, action_queue, &[&[tx_hash]]).await;
    actions_sender.push_actions(actions1).await.unwrap();
    actions_sender.push_actions(actions2).await.unwrap();

    // Compute the hash for batch #1 in the background so batch #2 can be sealed.
    let hash_task = tokio::spawn(mock_l1_batch_hash_computation(pool.clone(), 1));
    // Wait until the block #4 is sealed.
    state_keeper.wait_for_local_block(L2BlockNumber(4)).await;
    hash_task.await.unwrap();
}
2 changes: 1 addition & 1 deletion core/node/state_keeper/src/io/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ impl StateKeeperIO for MempoolIO {
}

self.pool
.connection_tagged("state_keeper")
.connection()
.await?
.blocks_dal()
.insert_l1_batch(UnsealedL1BatchHeader {
Expand Down
Loading