# feat(vm-runner): implement VM runner storage layer (#1651)
## What ❔

Adds an abstraction that can load unprocessed batches and also implements
`ReadStorageFactory`. The implementation largely reuses existing state keeper
primitives, which had to be generalized slightly.
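
For a rough sense of the shape of this abstraction, here is a minimal sketch. The trait and type names echo the exports added to `core/lib/state/src/lib.rs` below, but the signatures are assumptions rather than the PR's exact API:

```rust
use async_trait::async_trait;
use tokio::sync::watch;

// Stand-ins so the sketch is self-contained; the real crate uses
// `zksync_types::L1BatchNumber` and `PgOrRocksdbStorage` instead.
#[derive(Clone, Copy, Debug)]
pub struct L1BatchNumber(pub u32);
pub struct StorageHandle; // placeholder for a Postgres- or RocksDB-backed view

/// Assumed shape of the factory: hand out a read-only storage view for a
/// given L1 batch, returning `None` if a stop signal arrives first.
#[async_trait]
pub trait ReadStorageFactory: Send + Sync + 'static {
    async fn access_storage(
        &self,
        stop_receiver: &watch::Receiver<bool>,
        l1_batch_number: L1BatchNumber,
    ) -> anyhow::Result<Option<StorageHandle>>;
}
```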

## Why ❔

One of the components for the upcoming VM runner.

## Checklist


- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
- [x] Linkcheck has been run via `zk linkcheck`.
itegulov authored Apr 23, 2024
1 parent 7124add commit 543f9e9
Showing 30 changed files with 1,481 additions and 232 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

5 changes: 3 additions & 2 deletions Cargo.toml
```diff
@@ -78,6 +78,7 @@ anyhow = "1"
 assert_matches = "1.5"
 async-trait = "0.1"
 axum = "0.6.19"
+backon = "0.4.4"
 bigdecimal = "0.3.0"
 bincode = "1"
 bitflags = "1.3.2"
@@ -226,6 +227,6 @@ zksync_crypto_primitives = { path = "core/lib/crypto_primitives" }
 zksync_node_framework = { path = "core/node/node_framework" }
 zksync_eth_watch = { path = "core/node/eth_watch" }
 zksync_shared_metrics = { path = "core/node/shared_metrics" }
-zksync_block_reverter = { path = "core/node/block_reverter"}
+zksync_block_reverter = { path = "core/node/block_reverter" }
 zksync_commitment_generator = { path = "core/node/commitment_generator" }
-zksync_house_keeper = { path = "core/node/house_keeper" }
+zksync_house_keeper = { path = "core/node/house_keeper" }
```
1 change: 1 addition & 0 deletions checks-config/era.dic
```diff
@@ -943,3 +943,4 @@ superset
 80M
 780kb
 hyperchain
+storages
```

(Diffs for three generated files are not rendered.)

19 changes: 19 additions & 0 deletions core/lib/dal/src/factory_deps_dal.rs
```diff
@@ -180,4 +180,23 @@ impl FactoryDepsDal<'_, '_> {
         .await?;
         Ok(())
     }
+
+    /// Retrieves all factory deps entries for testing purposes.
+    pub async fn dump_all_factory_deps_for_tests(&mut self) -> HashMap<H256, Vec<u8>> {
+        sqlx::query!(
+            r#"
+            SELECT
+                bytecode,
+                bytecode_hash
+            FROM
+                factory_deps
+            "#
+        )
+        .fetch_all(self.storage.conn())
+        .await
+        .unwrap()
+        .into_iter()
+        .map(|row| (H256::from_slice(&row.bytecode_hash), row.bytecode))
+        .collect()
+    }
 }
```
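
A sketch of how a test might call this helper; the wrapper function is hypothetical and the pool is assumed to come from the crate's test utilities:

```rust
use std::collections::HashMap;

use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_types::H256;

/// Hypothetical test helper: snapshot every persisted factory dep so a test
/// can assert that VM-produced bytecodes match what landed in Postgres.
async fn snapshot_factory_deps(pool: &ConnectionPool<Core>) -> HashMap<H256, Vec<u8>> {
    let mut conn = pool.connection().await.unwrap();
    conn.factory_deps_dal()
        .dump_all_factory_deps_for_tests()
        .await
}
```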
26 changes: 26 additions & 0 deletions core/lib/dal/src/storage_logs_dedup_dal.rs
```diff
@@ -221,6 +221,32 @@ impl StorageLogsDedupDal<'_, '_> {
         .map(|row| row.index as u64))
     }
 
+    pub async fn get_enumeration_index_in_l1_batch(
+        &mut self,
+        hashed_key: H256,
+        l1_batch_number: L1BatchNumber,
+    ) -> DalResult<Option<u64>> {
+        Ok(sqlx::query!(
+            r#"
+            SELECT
+                INDEX
+            FROM
+                initial_writes
+            WHERE
+                hashed_key = $1
+                AND l1_batch_number <= $2
+            "#,
+            hashed_key.as_bytes(),
+            l1_batch_number.0 as i32,
+        )
+        .instrument("get_enumeration_index_in_l1_batch")
+        .with_arg("hashed_key", &hashed_key)
+        .with_arg("l1_batch_number", &l1_batch_number)
+        .fetch_optional(self.storage)
+        .await?
+        .map(|row| row.index as u64))
+    }
+
     /// Returns `hashed_keys` that are both present in the input and in `initial_writes` table.
     pub async fn filter_written_slots(&mut self, hashed_keys: &[H256]) -> DalResult<HashSet<H256>> {
         let hashed_keys: Vec<_> = hashed_keys.iter().map(H256::as_bytes).collect();
```
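
The batch-bounded lookup matters for historical storage views: a key whose initial write lands in a later batch must look unwritten at this batch. A hedged sketch of the contrast (the wrapper is illustrative, not code from this PR):

```rust
use zksync_dal::{Connection, Core, CoreDal};
use zksync_types::{L1BatchNumber, H256};

/// Illustrative helper: the enumeration index as of `batch`. `None` means the
/// key's initial write happened in a later batch (or never), even where the
/// unbounded `get_enumeration_index_for_key` would return `Some`.
async fn enum_index_as_of(
    conn: &mut Connection<'_, Core>,
    hashed_key: H256,
    batch: L1BatchNumber,
) -> anyhow::Result<Option<u64>> {
    Ok(conn
        .storage_logs_dedup_dal()
        .get_enumeration_index_in_l1_batch(hashed_key, batch)
        .await?)
}
```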
34 changes: 31 additions & 3 deletions core/lib/dal/src/transactions_dal.rs
```diff
@@ -1252,7 +1252,8 @@ impl TransactionsDal<'_, '_> {
         .fetch_all(self.storage)
         .await?;
 
-        self.map_transactions_to_execution_data(transactions).await
+        self.map_transactions_to_execution_data(transactions, None)
+            .await
     }
 
     /// Returns L2 blocks with their transactions to be used in VM execution.
@@ -1282,14 +1283,37 @@
         .fetch_all(self.storage)
         .await?;
 
-        self.map_transactions_to_execution_data(transactions).await
+        let fictive_l2_block = sqlx::query!(
+            r#"
+            SELECT
+                number
+            FROM
+                miniblocks
+            WHERE
+                miniblocks.l1_batch_number = $1
+                AND l1_tx_count = 0
+                AND l2_tx_count = 0
+            ORDER BY
+                number
+            "#,
+            i64::from(l1_batch_number.0)
+        )
+        .instrument("get_l2_blocks_to_execute_for_l1_batch#fictive_l2_block")
+        .with_arg("l1_batch_number", &l1_batch_number)
+        .fetch_optional(self.storage)
+        .await?
+        .map(|row| L2BlockNumber(row.number as u32));
+
+        self.map_transactions_to_execution_data(transactions, fictive_l2_block)
+            .await
     }
 
     async fn map_transactions_to_execution_data(
         &mut self,
         transactions: Vec<StorageTransaction>,
+        fictive_l2_block: Option<L2BlockNumber>,
     ) -> DalResult<Vec<L2BlockExecutionData>> {
-        let transactions_by_l2_block: Vec<(L2BlockNumber, Vec<Transaction>)> = transactions
+        let mut transactions_by_l2_block: Vec<(L2BlockNumber, Vec<Transaction>)> = transactions
             .into_iter()
             .group_by(|tx| tx.miniblock_number.unwrap())
             .into_iter()
@@ -1300,6 +1324,10 @@
             )
         })
         .collect();
+        // Fictive L2 block is always at the end of a batch so it is safe to append it
+        if let Some(fictive_l2_block) = fictive_l2_block {
+            transactions_by_l2_block.push((fictive_l2_block, Vec::new()));
+        }
        if transactions_by_l2_block.is_empty() {
             return Ok(Vec::new());
         }
```
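
The append-at-the-end trick works because `Itertools::group_by` only groups consecutive items and transactions are fetched in block order, so the fictive (empty) block, having the highest number in the batch, belongs last. A self-contained toy illustration of that reasoning:

```rust
use itertools::Itertools;

fn main() {
    // Transactions come back ordered by L2 block, so grouping consecutive
    // items by block number yields one (block, txs) pair per block, in order.
    let tx_block_numbers = vec![101_u32, 101, 102, 102, 102];
    let mut by_block: Vec<(u32, Vec<u32>)> = tx_block_numbers
        .into_iter()
        .group_by(|&block| block)
        .into_iter()
        .map(|(block, group)| (block, group.collect()))
        .collect();

    // The fictive block closes the batch (highest number, no transactions),
    // so pushing it at the end keeps the vector sorted by block number.
    let fictive_block = 103; // hypothetical empty block closing the batch
    by_block.push((fictive_block, Vec::new()));

    assert!(by_block.windows(2).all(|w| w[0].0 < w[1].0));
    println!("{by_block:?}");
}
```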
3 changes: 3 additions & 0 deletions core/lib/state/Cargo.toml
```toml
# Dependency section after this PR (individual +/- markers not recoverable):
vise.workspace = true
zksync_dal.workspace = true
zksync_types.workspace = true
zksync_utils.workspace = true
zksync_shared_metrics.workspace = true
zksync_storage.workspace = true

anyhow.workspace = true
async-trait.workspace = true
mini-moka.workspace = true
tokio = { workspace = true, features = ["rt"] }
tracing.workspace = true
itertools.workspace = true
chrono.workspace = true
once_cell.workspace = true

[dev-dependencies]
assert_matches.workspace = true
```
80 changes: 80 additions & 0 deletions core/lib/state/src/catchup.rs
```rust
use std::{sync::Arc, time::Instant};

use anyhow::Context;
use once_cell::sync::OnceCell;
use tokio::sync::watch;
use zksync_dal::{ConnectionPool, Core};
use zksync_shared_metrics::{SnapshotRecoveryStage, APP_METRICS};
use zksync_storage::RocksDB;
use zksync_types::L1BatchNumber;

use crate::{RocksdbStorage, StateKeeperColumnFamily};

/// A runnable task that blocks until the provided RocksDB cache instance is caught up with
/// Postgres.
///
/// See [`ReadStorageFactory`] for more context.
#[derive(Debug)]
pub struct AsyncCatchupTask {
    pool: ConnectionPool<Core>,
    state_keeper_db_path: String,
    rocksdb_cell: Arc<OnceCell<RocksDB<StateKeeperColumnFamily>>>,
    to_l1_batch_number: Option<L1BatchNumber>,
}

impl AsyncCatchupTask {
    /// Create a new catch-up task with the provided Postgres and RocksDB instances. Optionally
    /// accepts the last L1 batch number to catch up to (defaults to latest if not specified).
    pub fn new(
        pool: ConnectionPool<Core>,
        state_keeper_db_path: String,
        rocksdb_cell: Arc<OnceCell<RocksDB<StateKeeperColumnFamily>>>,
        to_l1_batch_number: Option<L1BatchNumber>,
    ) -> Self {
        Self {
            pool,
            state_keeper_db_path,
            rocksdb_cell,
            to_l1_batch_number,
        }
    }

    /// Block until RocksDB cache instance is caught up with Postgres.
    ///
    /// # Errors
    ///
    /// Propagates RocksDB and Postgres errors.
    pub async fn run(self, stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
        let started_at = Instant::now();
        tracing::debug!("Catching up RocksDB asynchronously");

        let mut rocksdb_builder = RocksdbStorage::builder(self.state_keeper_db_path.as_ref())
            .await
            .context("Failed creating RocksDB storage builder")?;
        let mut connection = self.pool.connection().await?;
        let was_recovered_from_snapshot = rocksdb_builder
            .ensure_ready(&mut connection, &stop_receiver)
            .await
            .context("failed initializing state keeper RocksDB from snapshot or scratch")?;
        if was_recovered_from_snapshot {
            let elapsed = started_at.elapsed();
            APP_METRICS.snapshot_recovery_latency[&SnapshotRecoveryStage::StateKeeperCache]
                .set(elapsed);
            tracing::info!("Recovered state keeper RocksDB from snapshot in {elapsed:?}");
        }

        let rocksdb = rocksdb_builder
            .synchronize(&mut connection, &stop_receiver, self.to_l1_batch_number)
            .await
            .context("Failed to catch up RocksDB to Postgres")?;
        drop(connection);
        if let Some(rocksdb) = rocksdb {
            self.rocksdb_cell
                .set(rocksdb.into_rocksdb())
                .map_err(|_| anyhow::anyhow!("Async RocksDB cache was initialized twice"))?;
        } else {
            tracing::info!("Synchronizing RocksDB interrupted");
        }
        Ok(())
    }
}
```
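
A hedged usage sketch: spawn the task in the background and read the caught-up RocksDB handle from the shared cell once it completes. The path and wiring here are illustrative assumptions, not code from this PR:

```rust
use std::sync::Arc;

use once_cell::sync::OnceCell;
use tokio::sync::watch;
use zksync_dal::{ConnectionPool, Core};
use zksync_state::AsyncCatchupTask;

async fn catch_up_cache(pool: ConnectionPool<Core>) -> anyhow::Result<()> {
    let rocksdb_cell = Arc::new(OnceCell::new());
    let (_stop_sender, stop_receiver) = watch::channel(false);

    let task = AsyncCatchupTask::new(
        pool,
        "./db/state_keeper_cache".to_owned(), // hypothetical cache path
        rocksdb_cell.clone(),
        None, // `None` = catch up to the latest L1 batch
    );
    // Run catch-up in the background; consumers can poll `rocksdb_cell` meanwhile.
    tokio::spawn(task.run(stop_receiver)).await??;

    // Unless interrupted by a stop signal, the cell now holds the RocksDB handle.
    anyhow::ensure!(rocksdb_cell.get().is_some(), "catch-up was interrupted");
    Ok(())
}
```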
4 changes: 4 additions & 0 deletions core/lib/state/src/lib.rs
```diff
@@ -18,21 +18,25 @@ use zksync_types::{
 };
 
 mod cache;
+mod catchup;
 mod in_memory;
 mod postgres;
 mod rocksdb;
 mod shadow_storage;
+mod storage_factory;
 mod storage_view;
 #[cfg(test)]
 mod test_utils;
 mod witness;
 
 pub use self::{
     cache::sequential_cache::SequentialCache,
+    catchup::AsyncCatchupTask,
     in_memory::InMemoryStorage,
     postgres::{PostgresStorage, PostgresStorageCaches, PostgresStorageCachesTask},
     rocksdb::{RocksdbStorage, RocksdbStorageBuilder, StateKeeperColumnFamily},
     shadow_storage::ShadowStorage,
+    storage_factory::{BatchDiff, PgOrRocksdbStorage, ReadStorageFactory, RocksdbWithMemory},
     storage_view::{StorageView, StorageViewMetrics},
     witness::WitnessStorage,
 };
```
5 changes: 4 additions & 1 deletion core/lib/state/src/postgres/mod.rs
```diff
@@ -592,7 +592,10 @@ impl ReadStorage for PostgresStorage<'_> {
         let mut dal = self.connection.storage_logs_dedup_dal();
         let value = self
             .rt_handle
-            .block_on(dal.get_enumeration_index_for_key(key.hashed_key()));
+            .block_on(dal.get_enumeration_index_in_l1_batch(
+                key.hashed_key(),
+                self.l1_batch_number_for_l2_block,
+            ));
         value.expect("failed getting enumeration index for key")
     }
 }
```
(The remaining changed files are not rendered here.)