From bab099d83d9640c965bc02b32d90cce86a3f53cb Mon Sep 17 00:00:00 2001 From: perekopskiy <53865202+perekopskiy@users.noreply.github.com> Date: Tue, 10 Oct 2023 14:14:47 +0300 Subject: [PATCH] feat(storage): save enum indices in RocksDB (#162) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # What ❔ Enumeration indices now are saved along with values in the same column family. Indices are added gradually for old DB entries. The number of keys processed each L1 batch is configurable. ## Why ❔ Enumeration indices in storage are necessary for boojum upgrade. ## Checklist - [x] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [x] Tests for the changes have been added / updated. - [x] Documentation comments have been added / updated. - [x] Code has been formatted via `zk fmt` and `zk lint`. --- Cargo.lock | 1 + core/bin/external_node/src/config/mod.rs | 7 + core/bin/external_node/src/main.rs | 1 + core/lib/config/src/configs/chain.rs | 9 + core/lib/dal/sqlx-data.json | 58 +-- core/lib/dal/src/storage_logs_dal.rs | 36 +- core/lib/state/Cargo.toml | 1 + core/lib/state/src/in_memory.rs | 31 +- core/lib/state/src/rocksdb/mod.rs | 338 ++++++++++++++++-- core/lib/storage/src/db.rs | 18 +- .../src/metadata_calculator/helpers.rs | 4 +- .../src/metadata_calculator/tests.rs | 18 +- .../src/state_keeper/batch_executor/mod.rs | 4 + .../batch_executor/tests/tester.rs | 17 +- core/lib/zksync_core/src/state_keeper/mod.rs | 1 + 15 files changed, 461 insertions(+), 83 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e5c438cb51ec..6fa750f187e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8055,6 +8055,7 @@ version = "0.1.0" dependencies = [ "anyhow", "db_test_macro", + "itertools", "mini-moka", "rand 0.8.5", "tempfile", diff --git a/core/bin/external_node/src/config/mod.rs b/core/bin/external_node/src/config/mod.rs index 66f4e54ff571..5835e516d07d 100644 --- a/core/bin/external_node/src/config/mod.rs +++ b/core/bin/external_node/src/config/mod.rs @@ -197,6 +197,9 @@ pub struct OptionalENConfig { /// Whether to try running EN with MultiVM. #[serde(default)] pub experimental_multivm_support: bool, + /// Number of keys that is processed by enum_index migration in State Keeper each L1 batch. + #[serde(default = "OptionalENConfig::default_enum_index_migration_chunk_size")] + pub enum_index_migration_chunk_size: usize, } impl OptionalENConfig { @@ -283,6 +286,10 @@ impl OptionalENConfig { 10 } + const fn default_enum_index_migration_chunk_size() -> usize { + 1000 + } + pub fn polling_interval(&self) -> Duration { Duration::from_millis(self.polling_interval) } diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index a65f3178b2ce..7f59f856ae9d 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -76,6 +76,7 @@ async fn build_state_keeper( max_allowed_l2_tx_gas_limit, save_call_traces, false, + config.optional.enum_index_migration_chunk_size, )); let io = Box::new( diff --git a/core/lib/config/src/configs/chain.rs b/core/lib/config/src/configs/chain.rs index afb928716946..ddf0c85e63e5 100644 --- a/core/lib/config/src/configs/chain.rs +++ b/core/lib/config/src/configs/chain.rs @@ -109,6 +109,9 @@ pub struct StateKeeperConfig { /// Flag which will enable storage to cache witness_inputs during State Keeper's run. /// NOTE: This will slow down StateKeeper, to be used in non-production environments! 
pub upload_witness_inputs_to_gcs: bool, + + /// Number of keys that is processed by enum_index migration in State Keeper each L1 batch. + pub enum_index_migration_chunk_size: Option, } impl StateKeeperConfig { @@ -122,6 +125,10 @@ impl StateKeeperConfig { default_aa: self.default_aa_hash, } } + + pub fn enum_index_migration_chunk_size(&self) -> usize { + self.enum_index_migration_chunk_size.unwrap_or(1_000) + } } #[derive(Debug, Deserialize, Clone, PartialEq)] @@ -226,6 +233,7 @@ mod tests { virtual_blocks_interval: 1, virtual_blocks_per_miniblock: 1, upload_witness_inputs_to_gcs: false, + enum_index_migration_chunk_size: Some(2_000), }, operations_manager: OperationsManagerConfig { delay_interval: 100, @@ -273,6 +281,7 @@ mod tests { CHAIN_STATE_KEEPER_VALIDATION_COMPUTATIONAL_GAS_LIMIT="10000000" CHAIN_STATE_KEEPER_SAVE_CALL_TRACES="false" CHAIN_STATE_KEEPER_UPLOAD_WITNESS_INPUTS_TO_GCS="false" + CHAIN_STATE_KEEPER_ENUM_INDEX_MIGRATION_CHUNK_SIZE="2000" CHAIN_OPERATIONS_MANAGER_DELAY_INTERVAL="100" CHAIN_MEMPOOL_SYNC_INTERVAL_MS="10" CHAIN_MEMPOOL_SYNC_BATCH_SIZE="1000" diff --git a/core/lib/dal/sqlx-data.json b/core/lib/dal/sqlx-data.json index ba14fa8b6968..e6dba044f12b 100644 --- a/core/lib/dal/sqlx-data.json +++ b/core/lib/dal/sqlx-data.json @@ -9099,6 +9099,38 @@ }, "query": "\n UPDATE prover_jobs_fri\n SET status = 'in_progress', attempts = attempts + 1,\n updated_at = now(), processing_started_at = now(),\n picked_by = $2\n WHERE id = (\n SELECT id\n FROM prover_jobs_fri\n WHERE status = 'queued'\n AND protocol_version = ANY($1)\n ORDER BY aggregation_round DESC, l1_batch_number ASC, id ASC\n LIMIT 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING prover_jobs_fri.id, prover_jobs_fri.l1_batch_number, prover_jobs_fri.circuit_id,\n prover_jobs_fri.aggregation_round, prover_jobs_fri.sequence_number, prover_jobs_fri.depth,\n prover_jobs_fri.is_node_final_proof\n " }, + "d1c82bd0b3c010569937ad7600760fa0c3aca7c9585bbf9598a5c0515b431b26": { + "describe": { + "columns": [ + { + "name": "hashed_key", + "ordinal": 0, + "type_info": "Bytea" + }, + { + "name": "l1_batch_number", + "ordinal": 1, + "type_info": "Int8" + }, + { + "name": "index", + "ordinal": 2, + "type_info": "Int8" + } + ], + "nullable": [ + false, + false, + false + ], + "parameters": { + "Left": [ + "ByteaArray" + ] + } + }, + "query": "SELECT hashed_key, l1_batch_number, index FROM initial_writes WHERE hashed_key = ANY($1::bytea[])" + }, "d5dea31f2a325bb44e8ef2cbbabbeb73fd6996a3e6cb99d62c6b97a4aa49c1ca": { "describe": { "columns": [ @@ -9394,32 +9426,6 @@ }, "query": "UPDATE l1_batches SET skip_proof = TRUE WHERE number = $1" }, - "da01d59119023c822cffa5dc226e82b2abd4cbd46d3856d7db16289868a27fa1": { - "describe": { - "columns": [ - { - "name": "hashed_key", - "ordinal": 0, - "type_info": "Bytea" - }, - { - "name": "l1_batch_number", - "ordinal": 1, - "type_info": "Int8" - } - ], - "nullable": [ - false, - false - ], - "parameters": { - "Left": [ - "ByteaArray" - ] - } - }, - "query": "SELECT hashed_key, l1_batch_number FROM initial_writes WHERE hashed_key = ANY($1::bytea[])" - }, "dc16d0fac093a52480b66dfcb5976fb01e6629e8c982c265f2af1d5000090572": { "describe": { "columns": [ diff --git a/core/lib/dal/src/storage_logs_dal.rs b/core/lib/dal/src/storage_logs_dal.rs index 9633dd856e2d..b0424c534fe9 100644 --- a/core/lib/dal/src/storage_logs_dal.rs +++ b/core/lib/dal/src/storage_logs_dal.rs @@ -3,7 +3,7 @@ use sqlx::Row; use std::{collections::HashMap, time::Instant}; -use crate::StorageProcessor; +use 
crate::{instrument::InstrumentExt, StorageProcessor}; use zksync_types::{ get_code_key, AccountTreeId, Address, L1BatchNumber, MiniblockNumber, StorageKey, StorageLog, FAILED_CONTRACT_DEPLOYMENT_BYTECODE_HASH, H256, @@ -244,7 +244,7 @@ impl StorageLogsDal<'_, '_> { pub async fn get_storage_logs_for_revert( &mut self, l1_batch_number: L1BatchNumber, - ) -> HashMap> { + ) -> HashMap> { let miniblock_range = self .storage .blocks_dal() @@ -268,7 +268,9 @@ impl StorageLogsDal<'_, '_> { // as per `initial_writes`, so if we return such keys from this method, it will lead to // the incorrect state after revert. let stage_start = Instant::now(); - let l1_batch_by_key = self.get_l1_batches_for_initial_writes(&modified_keys).await; + let l1_batch_and_index_by_key = self + .get_l1_batches_and_indices_for_initial_writes(&modified_keys) + .await; tracing::info!( "Loaded initial write info for modified keys in {:?}", stage_start.elapsed() @@ -277,12 +279,12 @@ impl StorageLogsDal<'_, '_> { let stage_start = Instant::now(); let mut output = HashMap::with_capacity(modified_keys.len()); modified_keys.retain(|key| { - match l1_batch_by_key.get(key) { + match l1_batch_and_index_by_key.get(key) { None => { // Key is completely deduped. It should not be present in the output map. false } - Some(write_batch) if *write_batch > l1_batch_number => { + Some((write_batch, _)) if *write_batch > l1_batch_number => { // Key was initially written to after the specified L1 batch. output.insert(*key, None); false @@ -295,18 +297,24 @@ impl StorageLogsDal<'_, '_> { stage_start.elapsed() ); - let deduped_count = modified_keys_count - l1_batch_by_key.len(); + let deduped_count = modified_keys_count - l1_batch_and_index_by_key.len(); tracing::info!( "Keys to update: {update_count}, to delete: {delete_count}; {deduped_count} modified keys \ are deduped and will be ignored", update_count = modified_keys.len(), - delete_count = l1_batch_by_key.len() - modified_keys.len() + delete_count = l1_batch_and_index_by_key.len() - modified_keys.len() ); let stage_start = Instant::now(); let prev_values_for_updated_keys = self .get_storage_values(&modified_keys, last_miniblock) - .await; + .await + .into_iter() + .map(|(key, value)| { + let value = value.unwrap(); // We already filtered out keys that weren't touched. 
+ let index = l1_batch_and_index_by_key[&key].1; + (key, Some((value, index))) + }); tracing::info!( "Loaded previous values for {} keys in {:?}", prev_values_for_updated_keys.len(), @@ -316,20 +324,22 @@ impl StorageLogsDal<'_, '_> { output } - pub async fn get_l1_batches_for_initial_writes( + pub async fn get_l1_batches_and_indices_for_initial_writes( &mut self, hashed_keys: &[H256], - ) -> HashMap { + ) -> HashMap { if hashed_keys.is_empty() { return HashMap::new(); // Shortcut to save time on communication with DB in the common case } let hashed_keys: Vec<_> = hashed_keys.iter().map(H256::as_bytes).collect(); let rows = sqlx::query!( - "SELECT hashed_key, l1_batch_number FROM initial_writes \ + "SELECT hashed_key, l1_batch_number, index FROM initial_writes \ WHERE hashed_key = ANY($1::bytea[])", &hashed_keys as &[&[u8]], ) + .instrument("get_l1_batches_and_indices_for_initial_writes") + .report_latency() .fetch_all(self.storage.conn()) .await .unwrap(); @@ -338,7 +348,7 @@ impl StorageLogsDal<'_, '_> { .map(|row| { ( H256::from_slice(&row.hashed_key), - L1BatchNumber(row.l1_batch_number as u32), + (L1BatchNumber(row.l1_batch_number as u32), row.index as u64), ) }) .collect() @@ -696,7 +706,7 @@ mod tests { .await; assert_eq!(logs_for_revert.len(), 15); // 5 updated + 10 new keys for log in &logs[5..] { - let prev_value = logs_for_revert[&log.key.hashed_key()].unwrap(); + let prev_value = logs_for_revert[&log.key.hashed_key()].unwrap().0; assert_eq!(prev_value, log.value); } for log in &new_logs[5..] { diff --git a/core/lib/state/Cargo.toml b/core/lib/state/Cargo.toml index cf8a09d91590..f89a1707e0e2 100644 --- a/core/lib/state/Cargo.toml +++ b/core/lib/state/Cargo.toml @@ -20,6 +20,7 @@ anyhow = "1.0" mini-moka = "0.10.0" tokio = { version = "1", features = ["rt"] } tracing = "0.1" +itertools = "0.10.3" [dev-dependencies] db_test_macro = { path = "../db_test_macro" } diff --git a/core/lib/state/src/in_memory.rs b/core/lib/state/src/in_memory.rs index e44187e34d95..9fadc8813718 100644 --- a/core/lib/state/src/in_memory.rs +++ b/core/lib/state/src/in_memory.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{hash_map::Entry, BTreeMap, HashMap}; use crate::ReadStorage; use zksync_types::{ @@ -14,8 +14,9 @@ pub const IN_MEMORY_STORAGE_DEFAULT_NETWORK_ID: u32 = 270; /// In-memory storage. #[derive(Debug, Default)] pub struct InMemoryStorage { - pub(crate) state: HashMap, + pub(crate) state: HashMap, pub(crate) factory_deps: HashMap>, + last_enum_index_set: u64, } impl InMemoryStorage { @@ -47,7 +48,7 @@ impl InMemoryStorage { ) -> Self { let system_context_init_log = get_system_context_init_logs(chain_id); - let state = contracts + let state_without_indices: BTreeMap<_, _> = contracts .iter() .flat_map(|contract| { let bytecode_hash = bytecode_hasher(&contract.bytecode); @@ -63,20 +64,36 @@ impl InMemoryStorage { .chain(system_context_init_log) .filter_map(|log| (log.kind == StorageLogKind::Write).then_some((log.key, log.value))) .collect(); + let state: HashMap<_, _> = state_without_indices + .into_iter() + .enumerate() + .map(|(idx, (key, value))| (key, (value, idx as u64 + 1))) + .collect(); let factory_deps = contracts .into_iter() .map(|contract| (bytecode_hasher(&contract.bytecode), contract.bytecode)) .collect(); + + let last_enum_index_set = state.len() as u64; Self { state, factory_deps, + last_enum_index_set, } } /// Sets the storage `value` at the specified `key`. 
pub fn set_value(&mut self, key: StorageKey, value: StorageValue) { - self.state.insert(key, value); + match self.state.entry(key) { + Entry::Occupied(mut entry) => { + entry.get_mut().0 = value; + } + Entry::Vacant(entry) => { + self.last_enum_index_set += 1; + entry.insert((value, self.last_enum_index_set)); + } + } } /// Stores a factory dependency with the specified `hash` and `bytecode`. @@ -87,7 +104,11 @@ impl InMemoryStorage { impl ReadStorage for &InMemoryStorage { fn read_value(&mut self, key: &StorageKey) -> StorageValue { - self.state.get(key).copied().unwrap_or_default() + self.state + .get(key) + .map(|(value, _)| value) + .copied() + .unwrap_or_default() } fn is_write_initial(&mut self, key: &StorageKey) -> bool { diff --git a/core/lib/state/src/rocksdb/mod.rs b/core/lib/state/src/rocksdb/mod.rs index ab6db6bbc891..8723efd31d52 100644 --- a/core/lib/state/src/rocksdb/mod.rs +++ b/core/lib/state/src/rocksdb/mod.rs @@ -8,18 +8,24 @@ //! - Contracts //! - Factory dependencies //! -//! | Column | Key | Value | Description | -//! | ------------ | ---------------------- | ----------------------- | ------------------------------------ | -//! | State | 'block_number' | serialized block number | Last processed L1 batch number (u32) | -//! | State | hashed `StorageKey` | 32 bytes value | State for the given key | -//! | Contracts | address (20 bytes) | `Vec` | Contract contents | -//! | Factory deps | hash (32 bytes) | `Vec` | Bytecodes for new contracts that a certain contract may deploy. | - -use std::{collections::HashMap, mem, path::Path, time::Instant}; +//! | Column | Key | Value | Description | +//! | ------------ | ------------------------------- | ------------------------------- | ----------------------------------------- | +//! | State | 'block_number' | serialized block number | Last processed L1 batch number (u32) | +//! | State | 'enum_index_migration_cursor' | serialized hashed key or empty | If key is not present it means that the migration hasn't started. | +//! | | | bytes | If value is of length 32 then it represents hashed_key migration should start from. | +//! | | | | If value is empty then it means the migration has finished | +//! | State | hashed `StorageKey` | 32 bytes value ++ 8 bytes index | State value for the given key | +//! | | | (big-endian) | | +//! | Contracts | address (20 bytes) | `Vec` | Contract contents | +//! | Factory deps | hash (32 bytes) | `Vec` | Bytecodes for new contracts that a certain contract may deploy. 
| + +use itertools::{Either, Itertools}; +use std::{collections::HashMap, convert::TryInto, mem, path::Path, time::Instant}; use zksync_dal::StorageProcessor; use zksync_storage::{db::NamedColumnFamily, RocksDB}; -use zksync_types::{L1BatchNumber, StorageKey, StorageValue, H256}; +use zksync_types::{L1BatchNumber, StorageKey, StorageValue, H256, U256}; +use zksync_utils::{h256_to_u256, u256_to_h256}; mod metrics; @@ -55,15 +61,56 @@ impl NamedColumnFamily for StateKeeperColumnFamily { } } +#[derive(Debug, Clone, Copy)] +struct StateValue { + pub value: H256, + pub enum_index: Option, +} + +impl StateValue { + pub fn new(value: H256, enum_index: Option) -> Self { + Self { value, enum_index } + } + + pub fn deserialize(bytes: &[u8]) -> Self { + if bytes.len() == 32 { + Self { + value: H256::from_slice(bytes), + enum_index: None, + } + } else { + Self { + value: H256::from_slice(&bytes[..32]), + enum_index: Some(u64::from_be_bytes(bytes[32..40].try_into().unwrap())), + } + } + } + + pub fn serialize(&self) -> Vec { + let mut buffer = Vec::with_capacity(40); + buffer.extend_from_slice(self.value.as_bytes()); + if let Some(index) = self.enum_index { + buffer.extend_from_slice(&index.to_be_bytes()); + } + buffer + } +} + /// [`ReadStorage`] implementation backed by RocksDB. #[derive(Debug)] pub struct RocksdbStorage { db: RocksDB, pending_patch: InMemoryStorage, + enum_index_migration_chunk_size: usize, } impl RocksdbStorage { const BLOCK_NUMBER_KEY: &'static [u8] = b"block_number"; + const ENUM_INDEX_MIGRATION_CURSOR: &'static [u8] = b"enum_index_migration_cursor"; + + fn is_special_key(key: &[u8]) -> bool { + key == Self::BLOCK_NUMBER_KEY || key == Self::ENUM_INDEX_MIGRATION_CURSOR + } /// Creates a new storage with the provided RocksDB `path`. pub fn new(path: &Path) -> Self { @@ -71,9 +118,15 @@ impl RocksdbStorage { Self { db, pending_patch: InMemoryStorage::default(), + enum_index_migration_chunk_size: 0, } } + /// Enables enum indices migration. + pub fn enable_enum_index_migration(&mut self, chunk_size: usize) { + self.enum_index_migration_chunk_size = chunk_size; + } + /// Synchronizes this storage with Postgres using the provided connection. 
/// /// # Panics @@ -108,7 +161,7 @@ impl RocksdbStorage { .storage_logs_dal() .get_touched_slots_for_l1_batch(L1BatchNumber(current_l1_batch_number)) .await; - self.process_transaction_logs(&storage_logs); + self.apply_storage_logs(storage_logs, conn).await; tracing::debug!("loading factory deps for l1 batch {current_l1_batch_number}"); let factory_deps = conn @@ -131,23 +184,141 @@ impl RocksdbStorage { tracing::info!( "Secondary storage for L1 batch #{latest_l1_batch_number} initialized, size is {estimated_size}" ); + + self.save_missing_enum_indices(conn).await; + } + + async fn apply_storage_logs( + &mut self, + storage_logs: HashMap, + conn: &mut StorageProcessor<'_>, + ) { + let (logs_with_known_indices, logs_with_unknown_indices): (Vec<_>, Vec<_>) = self + .process_transaction_logs(storage_logs) + .partition_map(|(key, StateValue { value, enum_index })| match enum_index { + Some(index) => Either::Left((key, (value, index))), + None => Either::Right((key, value)), + }); + let keys_with_unknown_indices: Vec<_> = logs_with_unknown_indices + .iter() + .map(|(key, _)| key.hashed_key()) + .collect(); + + let enum_indices_and_batches = conn + .storage_logs_dal() + .get_l1_batches_and_indices_for_initial_writes(&keys_with_unknown_indices) + .await; + assert_eq!( + keys_with_unknown_indices.len(), + enum_indices_and_batches.len() + ); + self.pending_patch.state = + logs_with_known_indices + .into_iter() + .chain(logs_with_unknown_indices.into_iter().map(|(key, value)| { + (key, (value, enum_indices_and_batches[&key.hashed_key()].1)) + })) + .collect(); + } + + async fn save_missing_enum_indices(&self, conn: &mut StorageProcessor<'_>) { + let (Some(start_from), true) = ( + self.enum_migration_start_from(), + self.enum_index_migration_chunk_size > 0, + ) else { + return; + }; + + let started_at = Instant::now(); + tracing::info!( + "RocksDB enum index migration is not finished, starting from key {start_from:0>64x}" + ); + + let mut write_batch = self.db.new_write_batch(); + let (keys, values): (Vec<_>, Vec<_>) = self + .db + .from_iterator_cf(StateKeeperColumnFamily::State, start_from.as_bytes()) + .filter_map(|(key, value)| { + if Self::is_special_key(&key) { + return None; + } + let state_value = StateValue::deserialize(&value); + (state_value.enum_index.is_none()) + .then(|| (H256::from_slice(&key), state_value.value)) + }) + .take(self.enum_index_migration_chunk_size) + .unzip(); + let enum_indices_and_batches = conn + .storage_logs_dal() + .get_l1_batches_and_indices_for_initial_writes(&keys) + .await; + assert_eq!(keys.len(), enum_indices_and_batches.len()); + + for (key, value) in keys.iter().zip(values) { + let index = enum_indices_and_batches[key].1; + write_batch.put_cf( + StateKeeperColumnFamily::State, + key.as_bytes(), + &StateValue::new(value, Some(index)).serialize(), + ); + } + + let next_key = keys + .last() + .and_then(|last_key| h256_to_u256(*last_key).checked_add(U256::one())) + .map(u256_to_h256); + match (next_key, keys.len()) { + (Some(next_key), keys_len) if keys_len == self.enum_index_migration_chunk_size => { + write_batch.put_cf( + StateKeeperColumnFamily::State, + Self::ENUM_INDEX_MIGRATION_CURSOR, + next_key.as_bytes(), + ); + } + _ => { + write_batch.put_cf( + StateKeeperColumnFamily::State, + Self::ENUM_INDEX_MIGRATION_CURSOR, + &[], + ); + tracing::info!("RocksDB enum index migration finished"); + } + } + self.db + .write(write_batch) + .expect("failed to save state data into rocksdb"); + tracing::info!( + "RocksDB enum index migration chunk took {:?}, migrated 
{} keys", + started_at.elapsed(), + keys.len() + ); } fn read_value_inner(&self, key: &StorageKey) -> Option { + self.read_state_value(key) + .map(|state_value| state_value.value) + } + + fn read_state_value(&self, key: &StorageKey) -> Option { let cf = StateKeeperColumnFamily::State; self.db .get_cf(cf, &Self::serialize_state_key(key)) .expect("failed to read rocksdb state value") - .map(|value| H256::from_slice(&value)) + .map(|value| StateValue::deserialize(&value)) } - /// Processes storage `logs` produced by transactions. - fn process_transaction_logs(&mut self, updates: &HashMap) { - for (&key, &value) in updates { - if !value.is_zero() || self.read_value_inner(&key).is_some() { - self.pending_patch.state.insert(key, value); + /// Returns storage logs to apply. + fn process_transaction_logs( + &self, + updates: HashMap, + ) -> impl Iterator + '_ { + updates.into_iter().filter_map(|(key, new_value)| { + if let Some(state_value) = self.read_state_value(&key) { + Some((key, StateValue::new(new_value, state_value.enum_index))) + } else { + (!new_value.is_zero()).then_some((key, StateValue::new(new_value, None))) } - } + }) } /// Stores a factory dependency with the specified `hash` and `bytecode`. @@ -206,8 +377,12 @@ impl RocksdbStorage { let cf = StateKeeperColumnFamily::State; for (key, maybe_value) in logs { - if let Some(prev_value) = maybe_value { - batch.put_cf(cf, key.as_bytes(), prev_value.as_bytes()); + if let Some((prev_value, prev_index)) = maybe_value { + batch.put_cf( + cf, + key.as_bytes(), + &StateValue::new(prev_value, Some(prev_index)).serialize(), + ); } else { batch.delete_cf(cf, key.as_bytes()); } @@ -243,8 +418,12 @@ impl RocksdbStorage { Self::BLOCK_NUMBER_KEY, &serialize_block_number(l1_batch_number.0), ); - for (key, value) in pending_patch.state { - batch.put_cf(cf, &Self::serialize_state_key(&key), value.as_ref()); + for (key, (value, enum_index)) in pending_patch.state { + batch.put_cf( + cf, + &Self::serialize_state_key(&key), + &StateValue::new(value, Some(enum_index)).serialize(), + ); } let cf = StateKeeperColumnFamily::FactoryDeps; @@ -279,6 +458,21 @@ impl RocksdbStorage { self.db .estimated_number_of_entries(StateKeeperColumnFamily::State) } + + fn enum_migration_start_from(&self) -> Option { + let value = self + .db + .get_cf( + StateKeeperColumnFamily::State, + Self::ENUM_INDEX_MIGRATION_CURSOR, + ) + .expect("failed to read `ENUM_INDEX_MIGRATION_CURSOR`"); + match value { + Some(v) if v.is_empty() => None, + Some(cursor) => Some(H256::from_slice(&cursor)), + None => Some(H256::zero()), + } + } } impl ReadStorage for RocksdbStorage { @@ -314,11 +508,14 @@ mod tests { async fn rocksdb_storage_basics() { let dir = TempDir::new().expect("cannot create temporary dir for state keeper"); let mut storage = RocksdbStorage::new(dir.path()); - let mut storage_logs = gen_storage_logs(0..20) + let mut storage_logs: HashMap<_, _> = gen_storage_logs(0..20) .into_iter() .map(|log| (log.key, log.value)) .collect(); - storage.process_transaction_logs(&storage_logs); + let changed_keys = storage.process_transaction_logs(storage_logs.clone()); + storage.pending_patch.state = changed_keys + .map(|(key, state_value)| (key, (state_value.value, 1))) // enum index doesn't matter in the test + .collect(); storage.save(L1BatchNumber(0)).await; { for (key, value) in &storage_logs { @@ -331,7 +528,10 @@ mod tests { for log in storage_logs.values_mut().step_by(2) { *log = StorageValue::zero(); } - storage.process_transaction_logs(&storage_logs); + let changed_keys = 
storage.process_transaction_logs(storage_logs.clone()); + storage.pending_patch.state = changed_keys + .map(|(key, state_value)| (key, (state_value.value, 1))) // enum index doesn't matter in the test + .collect(); storage.save(L1BatchNumber(1)).await; for (key, value) in &storage_logs { @@ -441,4 +641,94 @@ mod tests { } } } + + #[db_test] + async fn rocksdb_enum_index_migration(pool: ConnectionPool) { + let mut conn = pool.access_storage().await.unwrap(); + prepare_postgres(&mut conn).await; + let storage_logs = gen_storage_logs(20..40); + create_miniblock(&mut conn, MiniblockNumber(1), storage_logs.clone()).await; + create_l1_batch(&mut conn, L1BatchNumber(1), &storage_logs).await; + + let enum_indices: HashMap<_, _> = conn + .storage_logs_dedup_dal() + .initial_writes_for_batch(L1BatchNumber(1)) + .await + .into_iter() + .collect(); + + let dir = TempDir::new().expect("cannot create temporary dir for state keeper"); + let mut storage = RocksdbStorage::new(dir.path()); + storage.update_from_postgres(&mut conn).await; + + assert_eq!(storage.l1_batch_number(), L1BatchNumber(2)); + // Check that enum indices are correct after syncing with postgres. + for log in &storage_logs { + let expected_index = enum_indices[&log.key.hashed_key()]; + assert_eq!( + storage.read_state_value(&log.key).unwrap().enum_index, + Some(expected_index) + ); + } + + // Remove enum indices for some keys. + let mut write_batch = storage.db.new_write_batch(); + for log in &storage_logs { + write_batch.put_cf( + StateKeeperColumnFamily::State, + log.key.hashed_key().as_bytes(), + log.value.as_bytes(), + ); + write_batch.delete_cf( + StateKeeperColumnFamily::State, + RocksdbStorage::ENUM_INDEX_MIGRATION_CURSOR, + ); + } + storage.db.write(write_batch).unwrap(); + + // Check that migration works as expected. + let ordered_keys_to_migrate: Vec = storage_logs + .iter() + .map(|log| log.key) + .sorted_by_key(StorageKey::hashed_key) + .collect(); + + storage.enable_enum_index_migration(10); + let start_from = storage.enum_migration_start_from(); + assert_eq!(start_from, Some(H256::zero())); + + // Migrate the first half. + storage.save_missing_enum_indices(&mut conn).await; + for key in ordered_keys_to_migrate.iter().take(10) { + let expected_index = enum_indices[&key.hashed_key()]; + assert_eq!( + storage.read_state_value(key).unwrap().enum_index, + Some(expected_index) + ); + } + assert!(storage + .read_state_value(&ordered_keys_to_migrate[10]) + .unwrap() + .enum_index + .is_none()); + + // Migrate the second half. + storage.save_missing_enum_indices(&mut conn).await; + for key in ordered_keys_to_migrate.iter().skip(10) { + let expected_index = enum_indices[&key.hashed_key()]; + assert_eq!( + storage.read_state_value(key).unwrap().enum_index, + Some(expected_index) + ); + } + + // 20 keys were processed but we haven't checked that no keys to migrate are left. + let start_from = storage.enum_migration_start_from(); + assert!(start_from.is_some()); + + // Check that migration will be marked as completed after the next iteration. 
+ storage.save_missing_enum_indices(&mut conn).await; + let start_from = storage.enum_migration_start_from(); + assert!(start_from.is_none()); + } } diff --git a/core/lib/storage/src/db.rs b/core/lib/storage/src/db.rs index c1cad72226db..44808af94581 100644 --- a/core/lib/storage/src/db.rs +++ b/core/lib/storage/src/db.rs @@ -1,6 +1,6 @@ use rocksdb::{ properties, BlockBasedOptions, Cache, ColumnFamily, ColumnFamilyDescriptor, DBPinnableSlice, - IteratorMode, Options, PrefixRange, ReadOptions, WriteOptions, DB, + Direction, IteratorMode, Options, PrefixRange, ReadOptions, WriteOptions, DB, }; use std::ffi::CStr; @@ -329,6 +329,22 @@ impl RocksDB { // We panic on RocksDB errors elsewhere and fuse it to prevent polling after the end of the range. // Thus, `unwrap()` should be safe. } + + /// Iterates over key-value pairs in the specified column family `cf` in the lexical + /// key order starting from the given `key_from`. + pub fn from_iterator_cf( + &self, + cf: CF, + key_from: &[u8], + ) -> impl Iterator, Box<[u8]>)> + '_ { + let cf = self.column_family(cf); + self.inner + .db + .iterator_cf(cf, IteratorMode::From(key_from, Direction::Forward)) + .map(Result::unwrap) + .fuse() + // ^ unwrap() is safe for the same reasons as in `prefix_iterator_cf()`. + } } impl RocksDB<()> { diff --git a/core/lib/zksync_core/src/metadata_calculator/helpers.rs b/core/lib/zksync_core/src/metadata_calculator/helpers.rs index 0abcc30c6444..26707c731178 100644 --- a/core/lib/zksync_core/src/metadata_calculator/helpers.rs +++ b/core/lib/zksync_core/src/metadata_calculator/helpers.rs @@ -244,7 +244,7 @@ impl L1BatchWithLogs { let latency = LoadChangesStage::InitialWritesForZeroValues.start(); let l1_batches_for_initial_writes = storage .storage_logs_dal() - .get_l1_batches_for_initial_writes(&hashed_keys_for_zero_values) + .get_l1_batches_and_indices_for_initial_writes(&hashed_keys_for_zero_values) .await; latency.report_with_count(hashed_keys_for_zero_values.len()); @@ -252,7 +252,7 @@ impl L1BatchWithLogs { let write_matters = if value.is_zero() { let initial_write_batch_for_key = l1_batches_for_initial_writes.get(&storage_key.hashed_key()); - initial_write_batch_for_key.map_or(false, |&number| number <= l1_batch_number) + initial_write_batch_for_key.map_or(false, |&(number, _)| number <= l1_batch_number) } else { true }; diff --git a/core/lib/zksync_core/src/metadata_calculator/tests.rs b/core/lib/zksync_core/src/metadata_calculator/tests.rs index e5e6e1f43ba5..269b48f28208 100644 --- a/core/lib/zksync_core/src/metadata_calculator/tests.rs +++ b/core/lib/zksync_core/src/metadata_calculator/tests.rs @@ -669,12 +669,12 @@ async fn deduplication_works_as_expected(pool: ConnectionPool) { let initial_writes = storage .storage_logs_dal() - .get_l1_batches_for_initial_writes(&hashed_keys) + .get_l1_batches_and_indices_for_initial_writes(&hashed_keys) .await; assert_eq!(initial_writes.len(), hashed_keys.len()); assert!(initial_writes .values() - .all(|&batch| batch == L1BatchNumber(1))); + .all(|&(batch, _)| batch == L1BatchNumber(1))); let mut new_logs = gen_storage_logs(120..140, 1).pop().unwrap(); let new_hashed_keys: Vec<_> = new_logs.iter().map(|log| log.key.hashed_key()).collect(); @@ -688,21 +688,21 @@ async fn deduplication_works_as_expected(pool: ConnectionPool) { // Initial writes for previously inserted keys should not change. 
let initial_writes = storage .storage_logs_dal() - .get_l1_batches_for_initial_writes(&hashed_keys) + .get_l1_batches_and_indices_for_initial_writes(&hashed_keys) .await; assert_eq!(initial_writes.len(), hashed_keys.len()); assert!(initial_writes .values() - .all(|&batch| batch == L1BatchNumber(1))); + .all(|&(batch, _)| batch == L1BatchNumber(1))); let initial_writes = storage .storage_logs_dal() - .get_l1_batches_for_initial_writes(&new_hashed_keys) + .get_l1_batches_and_indices_for_initial_writes(&new_hashed_keys) .await; assert_eq!(initial_writes.len(), new_hashed_keys.len()); assert!(initial_writes .values() - .all(|&batch| batch == L1BatchNumber(2))); + .all(|&(batch, _)| batch == L1BatchNumber(2))); let mut no_op_logs = gen_storage_logs(140..160, 1).pop().unwrap(); let no_op_hashed_keys: Vec<_> = no_op_logs.iter().map(|log| log.key.hashed_key()).collect(); @@ -713,7 +713,7 @@ async fn deduplication_works_as_expected(pool: ConnectionPool) { let initial_writes = storage .storage_logs_dal() - .get_l1_batches_for_initial_writes(&no_op_hashed_keys) + .get_l1_batches_and_indices_for_initial_writes(&no_op_hashed_keys) .await; assert!(initial_writes.is_empty()); @@ -730,10 +730,10 @@ async fn deduplication_works_as_expected(pool: ConnectionPool) { let initial_writes = storage .storage_logs_dal() - .get_l1_batches_for_initial_writes(&no_op_hashed_keys) + .get_l1_batches_and_indices_for_initial_writes(&no_op_hashed_keys) .await; assert_eq!(initial_writes.len(), no_op_hashed_keys.len() / 2); for key in no_op_hashed_keys.iter().step_by(2) { - assert_eq!(initial_writes[key], L1BatchNumber(4)); + assert_eq!(initial_writes[key].0, L1BatchNumber(4)); } } diff --git a/core/lib/zksync_core/src/state_keeper/batch_executor/mod.rs b/core/lib/zksync_core/src/state_keeper/batch_executor/mod.rs index de4b0289cea0..5ebb14f99b78 100644 --- a/core/lib/zksync_core/src/state_keeper/batch_executor/mod.rs +++ b/core/lib/zksync_core/src/state_keeper/batch_executor/mod.rs @@ -84,6 +84,7 @@ pub struct MainBatchExecutorBuilder { save_call_traces: bool, max_allowed_tx_gas_limit: U256, upload_witness_inputs_to_gcs: bool, + enum_index_migration_chunk_size: usize, } impl MainBatchExecutorBuilder { @@ -93,6 +94,7 @@ impl MainBatchExecutorBuilder { max_allowed_tx_gas_limit: U256, save_call_traces: bool, upload_witness_inputs_to_gcs: bool, + enum_index_migration_chunk_size: usize, ) -> Self { Self { state_keeper_db_path, @@ -100,6 +102,7 @@ impl MainBatchExecutorBuilder { save_call_traces, max_allowed_tx_gas_limit, upload_witness_inputs_to_gcs, + enum_index_migration_chunk_size, } } } @@ -112,6 +115,7 @@ impl L1BatchExecutorBuilder for MainBatchExecutorBuilder { system_env: SystemEnv, ) -> BatchExecutorHandle { let mut secondary_storage = RocksdbStorage::new(self.state_keeper_db_path.as_ref()); + secondary_storage.enable_enum_index_migration(self.enum_index_migration_chunk_size); let mut conn = self .pool .access_storage_tagged("state_keeper") diff --git a/core/lib/zksync_core/src/state_keeper/batch_executor/tests/tester.rs b/core/lib/zksync_core/src/state_keeper/batch_executor/tests/tester.rs index d41b0c98a82a..27e59c0110e5 100644 --- a/core/lib/zksync_core/src/state_keeper/batch_executor/tests/tester.rs +++ b/core/lib/zksync_core/src/state_keeper/batch_executor/tests/tester.rs @@ -172,16 +172,27 @@ impl Tester { address, ); let value = u256_to_h256(eth_amount); - let storage_logs = vec![StorageLog::new_write_log(key, value)]; + let storage_log = StorageLog::new_write_log(key, value); storage .storage_logs_dal() - 
.append_storage_logs(MiniblockNumber(0), &[(H256::zero(), storage_logs.clone())]) + .append_storage_logs(MiniblockNumber(0), &[(H256::zero(), vec![storage_log])]) .await; storage .storage_dal() - .apply_storage_logs(&[(H256::zero(), storage_logs)]) + .apply_storage_logs(&[(H256::zero(), vec![storage_log])]) .await; + if storage + .storage_logs_dedup_dal() + .filter_written_slots(&[storage_log.key.hashed_key()]) + .await + .is_empty() + { + storage + .storage_logs_dedup_dal() + .insert_initial_writes(L1BatchNumber(0), &[storage_log.key]) + .await + } } } } diff --git a/core/lib/zksync_core/src/state_keeper/mod.rs b/core/lib/zksync_core/src/state_keeper/mod.rs index 8eef5d6adbc3..5d5b2dac7a35 100644 --- a/core/lib/zksync_core/src/state_keeper/mod.rs +++ b/core/lib/zksync_core/src/state_keeper/mod.rs @@ -59,6 +59,7 @@ where state_keeper_config.max_allowed_l2_tx_gas_limit.into(), state_keeper_config.save_call_traces, state_keeper_config.upload_witness_inputs_to_gcs, + state_keeper_config.enum_index_migration_chunk_size(), ); let io = MempoolIO::new(
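
The new `State` column-family records described in the table above are either 32 bytes long (legacy entries without an index) or 40 bytes long (the value followed by the big-endian enum index). Below is a minimal, dependency-free sketch of that encoding, mirroring `StateValue::serialize`/`deserialize` from this patch; `H256` is replaced by a plain `[u8; 32]` so the snippet compiles on its own and is not the actual implementation.

use std::convert::TryInto;

// Sketch only: 32-byte value, optionally followed by the 8-byte big-endian enum index.
fn serialize(value: [u8; 32], enum_index: Option<u64>) -> Vec<u8> {
    let mut buf = Vec::with_capacity(40);
    buf.extend_from_slice(&value);
    if let Some(index) = enum_index {
        // Appending the index yields the new 40-byte record format.
        buf.extend_from_slice(&index.to_be_bytes());
    }
    buf
}

fn deserialize(bytes: &[u8]) -> ([u8; 32], Option<u64>) {
    let value: [u8; 32] = bytes[..32].try_into().unwrap();
    if bytes.len() == 32 {
        // Legacy record written before this patch: the index is not known yet.
        (value, None)
    } else {
        (value, Some(u64::from_be_bytes(bytes[32..40].try_into().unwrap())))
    }
}

fn main() {
    let value = [0xab_u8; 32];
    let encoded = serialize(value, Some(7));
    assert_eq!(encoded.len(), 40);
    assert_eq!(deserialize(&encoded), (value, Some(7)));
    // A 32-byte record round-trips with `None`, i.e. "index unknown".
    assert_eq!(deserialize(&serialize(value, None)), (value, None));
}

Because a 32-byte record deserializes to `enum_index: None`, entries written before this patch stay readable and are later filled in by the background migration.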
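
The gradual migration driven by `save_missing_enum_indices` processes at most `enum_index_migration_chunk_size` keys per L1 batch (1000 by default on both the main node and the external node) and tracks its progress in the `enum_index_migration_cursor` entry of the `State` column family. The sketch below illustrates the cursor convention from `enum_migration_start_from`, with the RocksDB read replaced by a plain argument; it assumes only the three states documented in the column-family table above.

use std::convert::TryInto;

// Sketch of the cursor convention: the real code reads the
// `enum_index_migration_cursor` entry; here the read result is passed in.
fn migration_start_from(cursor: Option<Vec<u8>>) -> Option<[u8; 32]> {
    match cursor {
        // Empty value: the migration has finished, nothing left to scan.
        Some(v) if v.is_empty() => None,
        // 32-byte value: resume the scan from this hashed key.
        Some(v) => Some(v.as_slice().try_into().expect("cursor must be 32 bytes")),
        // Key absent: the migration has not started yet; scan from zero.
        None => Some([0u8; 32]),
    }
}

fn main() {
    assert_eq!(migration_start_from(None), Some([0u8; 32]));
    assert_eq!(migration_start_from(Some(Vec::new())), None);
    assert_eq!(migration_start_from(Some(vec![1u8; 32])), Some([1u8; 32]));
}

After each chunk, the patch either advances the cursor to the key following the last migrated one (when a full chunk of `enum_index_migration_chunk_size` keys was processed) or writes an empty value to mark the migration as finished.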
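
`InMemoryStorage` now tracks enum indices as well: genesis keys receive indices `1..=N` in sorted-key order (hence the intermediate `BTreeMap`), and every subsequent first write gets `last_enum_index_set + 1`, while updates keep their existing index. A toy sketch of that numbering rule follows, with `u32`/`u64` standing in for `StorageKey`/`StorageValue`; it only demonstrates the rule and is not the in-memory storage itself.

use std::collections::{BTreeMap, HashMap};

struct Store {
    state: HashMap<u32, (u64, u64)>, // key -> (value, enum index)
    last_enum_index_set: u64,
}

impl Store {
    // Genesis: indices 1..=N follow the sorted key order (hence the BTreeMap).
    fn new(genesis: BTreeMap<u32, u64>) -> Self {
        let state: HashMap<_, _> = genesis
            .into_iter()
            .enumerate()
            .map(|(idx, (key, value))| (key, (value, idx as u64 + 1)))
            .collect();
        let last_enum_index_set = state.len() as u64;
        Self { state, last_enum_index_set }
    }

    // Updates keep the existing index; first writes get the next free one.
    fn set_value(&mut self, key: u32, value: u64) {
        match self.state.entry(key) {
            std::collections::hash_map::Entry::Occupied(mut e) => e.get_mut().0 = value,
            std::collections::hash_map::Entry::Vacant(e) => {
                self.last_enum_index_set += 1;
                e.insert((value, self.last_enum_index_set));
            }
        }
    }
}

fn main() {
    let mut store = Store::new(BTreeMap::from([(30, 0), (10, 0), (20, 0)]));
    assert_eq!(store.state[&10].1, 1); // smallest key gets index 1
    store.set_value(40, 7);            // first write of a new key
    assert_eq!(store.state[&40], (7, 4));
    store.set_value(10, 9);            // update keeps index 1
    assert_eq!(store.state[&10], (9, 1));
}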