Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(external-node): make fetcher rely on unsealed batches #3088

Merged
merged 6 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions core/lib/dal/src/blocks_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2184,20 +2184,6 @@ impl BlocksDal<'_, '_> {
Ok(Some((L2BlockNumber(min as u32), L2BlockNumber(max as u32))))
}

/// Returns `true` if there exists a non-sealed batch (i.e. there is one+ stored L2 block that isn't assigned
/// to any batch yet).
pub async fn pending_batch_exists(&mut self) -> DalResult<bool> {
let count = sqlx::query_scalar!(
"SELECT COUNT(miniblocks.number) FROM miniblocks WHERE l1_batch_number IS NULL"
)
.instrument("pending_batch_exists")
.fetch_one(self.storage)
.await?
.unwrap_or(0);

Ok(count != 0)
}

// methods used for measuring Eth tx stage transition latencies
// and emitting metrics base on these measured data
pub async fn oldest_uncommitted_batch_timestamp(&mut self) -> DalResult<Option<u64>> {
Expand Down
9 changes: 5 additions & 4 deletions core/node/consensus/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,11 @@ impl StateKeeper {
.wait(IoCursor::for_fetcher(&mut conn.0))
.await?
.context("IoCursor::new()")?;
let pending_batch = ctx
.wait(conn.0.blocks_dal().pending_batch_exists())
let batch_sealed = ctx
.wait(conn.0.blocks_dal().get_unsealed_l1_batch())
.await?
.context("pending_batch_exists()")?;
.context("get_unsealed_l1_batch()")?
.is_none();
let (actions_sender, actions_queue) = ActionQueue::new();
let addr = sync::watch::channel(None).0;
let sync_state = SyncState::default();
Expand Down Expand Up @@ -258,7 +259,7 @@ impl StateKeeper {
last_batch: cursor.l1_batch,
last_block: cursor.next_l2_block - 1,
last_timestamp: cursor.prev_l2_block_timestamp,
batch_sealed: !pending_batch,
batch_sealed,
next_priority_op: PriorityOpId(1),
actions_sender,
sync_state: sync_state.clone(),
Expand Down
2 changes: 1 addition & 1 deletion core/node/node_sync/src/external_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ impl StateKeeperIO for ExternalIO {
);

self.pool
.connection()
.connection_tagged("sync_layer")
.await?
.blocks_dal()
.insert_l1_batch(UnsealedL1BatchHeader {
Expand Down
36 changes: 34 additions & 2 deletions core/node/node_sync/src/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ impl IoCursorExt for IoCursor {
let mut this = Self::new(storage).await?;
// It's important to know whether we have opened a new batch already or just sealed the previous one.
// Depending on it, we must either insert `OpenBatch` item into the queue, or not.
let was_new_batch_open = storage.blocks_dal().pending_batch_exists().await?;
if !was_new_batch_open {
let unsealed_batch = storage.blocks_dal().get_unsealed_l1_batch().await?;
if unsealed_batch.is_none() {
this.l1_batch -= 1; // Should continue from the last L1 batch present in the storage
}
Ok(this)
Expand Down Expand Up @@ -201,3 +201,35 @@ impl IoCursorExt for IoCursor {
new_actions
}
}

#[cfg(test)]
mod tests {
use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_node_genesis::{insert_genesis_batch, GenesisParams};
use zksync_state_keeper::io::IoCursor;
use zksync_types::{block::UnsealedL1BatchHeader, L1BatchNumber};

use crate::fetcher::IoCursorExt;

#[tokio::test]
async fn io_cursor_recognizes_empty_unsealed_batch() -> anyhow::Result<()> {
let pool = ConnectionPool::<Core>::test_pool().await;
let mut conn = pool.connection().await.unwrap();
insert_genesis_batch(&mut conn, &GenesisParams::mock())
.await
.unwrap();
conn.blocks_dal()
.insert_l1_batch(UnsealedL1BatchHeader {
number: L1BatchNumber(1),
timestamp: 1,
protocol_version: None,
fee_address: Default::default(),
fee_input: Default::default(),
})
.await?;

let io_cursor = IoCursor::for_fetcher(&mut conn).await?;
assert_eq!(io_cursor.l1_batch, L1BatchNumber(1));
Ok(())
}
}
2 changes: 1 addition & 1 deletion core/node/state_keeper/src/io/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ impl StateKeeperIO for MempoolIO {
}

self.pool
.connection()
.connection_tagged("state_keeper")
.await?
.blocks_dal()
.insert_l1_batch(UnsealedL1BatchHeader {
Expand Down
Loading