Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): save enum indices in RocksDB #162

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions core/bin/external_node/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ pub struct OptionalENConfig {
/// Whether to try running EN with MultiVM.
#[serde(default)]
pub experimental_multivm_support: bool,
/// Number of enum_index migration chunks that State Keeper processes each L1 batch.
perekopskiy marked this conversation as resolved.
Show resolved Hide resolved
#[serde(default = "OptionalENConfig::default_enum_index_migration_chunks")]
pub enum_index_migration_chunks: u16,
}

impl OptionalENConfig {
Expand Down Expand Up @@ -282,6 +285,10 @@ impl OptionalENConfig {
10
}

const fn default_enum_index_migration_chunks() -> u16 {
1
perekopskiy marked this conversation as resolved.
Show resolved Hide resolved
}

pub fn polling_interval(&self) -> Duration {
Duration::from_millis(self.polling_interval)
}
Expand Down
1 change: 1 addition & 0 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ async fn build_state_keeper(
max_allowed_l2_tx_gas_limit,
save_call_traces,
false,
config.optional.enum_index_migration_chunks,
));

let io = Box::new(
Expand Down
9 changes: 9 additions & 0 deletions core/lib/config/src/configs/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ pub struct StateKeeperConfig {
/// Flag which will enable storage to cache witness_inputs during State Keeper's run.
/// NOTE: This will slow down StateKeeper, to be used in non-production environments!
pub upload_witness_inputs_to_gcs: bool,

/// Number of enum_index migration chunks that State Keeper processes each L1 batch.
pub enum_index_migration_chunks: Option<u16>,
}

impl StateKeeperConfig {
Expand All @@ -122,6 +125,10 @@ impl StateKeeperConfig {
default_aa: self.default_aa_hash,
}
}

pub fn enum_index_migration_chunks(&self) -> u16 {
self.enum_index_migration_chunks.unwrap_or(1)
}
}

#[derive(Debug, Deserialize, Clone, PartialEq)]
Expand Down Expand Up @@ -226,6 +233,7 @@ mod tests {
virtual_blocks_interval: 1,
virtual_blocks_per_miniblock: 1,
upload_witness_inputs_to_gcs: false,
enum_index_migration_chunks: Some(2),
},
operations_manager: OperationsManagerConfig {
delay_interval: 100,
Expand Down Expand Up @@ -273,6 +281,7 @@ mod tests {
CHAIN_STATE_KEEPER_VALIDATION_COMPUTATIONAL_GAS_LIMIT="10000000"
CHAIN_STATE_KEEPER_SAVE_CALL_TRACES="false"
CHAIN_STATE_KEEPER_UPLOAD_WITNESS_INPUTS_TO_GCS="false"
CHAIN_STATE_KEEPER_ENUM_INDEX_MIGRATION_CHUNKS="2"
CHAIN_OPERATIONS_MANAGER_DELAY_INTERVAL="100"
CHAIN_MEMPOOL_SYNC_INTERVAL_MS="10"
CHAIN_MEMPOOL_SYNC_BATCH_SIZE="1000"
Expand Down
82 changes: 70 additions & 12 deletions core/lib/dal/sqlx-data.json
Original file line number Diff line number Diff line change
Expand Up @@ -222,19 +222,19 @@
}
],
"nullable": [
false,
false,
false,
false,
false,
false,
false,
false,
true,
true,
true,
true,
true,
true,
true,
true,
true,
true,
true,
true,
true
false,
false,
false,
false
],
"parameters": {
"Left": [
Expand Down Expand Up @@ -9135,6 +9135,38 @@
},
"query": "\n UPDATE prover_jobs_fri\n SET status = 'in_progress', attempts = attempts + 1,\n updated_at = now(), processing_started_at = now(),\n picked_by = $2\n WHERE id = (\n SELECT id\n FROM prover_jobs_fri\n WHERE status = 'queued'\n AND protocol_version = ANY($1)\n ORDER BY aggregation_round DESC, l1_batch_number ASC, id ASC\n LIMIT 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING prover_jobs_fri.id, prover_jobs_fri.l1_batch_number, prover_jobs_fri.circuit_id,\n prover_jobs_fri.aggregation_round, prover_jobs_fri.sequence_number, prover_jobs_fri.depth,\n prover_jobs_fri.is_node_final_proof\n "
},
"d1c82bd0b3c010569937ad7600760fa0c3aca7c9585bbf9598a5c0515b431b26": {
"describe": {
"columns": [
{
"name": "hashed_key",
"ordinal": 0,
"type_info": "Bytea"
},
{
"name": "l1_batch_number",
"ordinal": 1,
"type_info": "Int8"
},
{
"name": "index",
"ordinal": 2,
"type_info": "Int8"
}
],
"nullable": [
false,
false,
false
],
"parameters": {
"Left": [
"ByteaArray"
]
}
},
"query": "SELECT hashed_key, l1_batch_number, index FROM initial_writes WHERE hashed_key = ANY($1::bytea[])"
},
"d5dea31f2a325bb44e8ef2cbbabbeb73fd6996a3e6cb99d62c6b97a4aa49c1ca": {
"describe": {
"columns": [
Expand Down Expand Up @@ -9360,6 +9392,32 @@
},
"query": "SELECT number, timestamp, is_finished, l1_tx_count, l2_tx_count, fee_account_address, bloom, priority_ops_onchain_data, hash, parent_hash, commitment, compressed_write_logs, compressed_contracts, eth_prove_tx_id, eth_commit_tx_id, eth_execute_tx_id, merkle_root_hash, l2_to_l1_logs, l2_to_l1_messages, used_contract_hashes, compressed_initial_writes, compressed_repeated_writes, l2_l1_compressed_messages, l2_l1_merkle_root, l1_gas_price, l2_fair_gas_price, rollup_last_leaf_index, zkporter_is_available, bootloader_code_hash, default_aa_code_hash, base_fee_per_gas, aux_data_hash, pass_through_data_hash, meta_parameters_hash, protocol_version FROM (SELECT l1_batches.*, row_number() OVER (ORDER BY number ASC) AS row_number FROM l1_batches WHERE eth_commit_tx_id IS NOT NULL AND l1_batches.skip_proof = TRUE AND l1_batches.number > $1 ORDER BY number LIMIT $2) inn WHERE number - row_number = $1"
},
"d611418a9f5ea1ab6b1af674ee44a824cd7e68e41f6065868dd66d94af5e0987": {
"describe": {
"columns": [
{
"name": "hashed_key",
"ordinal": 0,
"type_info": "Bytea"
},
{
"name": "index",
"ordinal": 1,
"type_info": "Int8"
}
],
"nullable": [
false,
false
],
"parameters": {
"Left": [
"ByteaArray"
]
}
},
"query": "SELECT hashed_key, index FROM initial_writes WHERE hashed_key = ANY($1)"
},
"d6709f3ce8f08f988e10a0e0fb5c06db9488834a85066babaf3d56cf212b4ea0": {
"describe": {
"columns": [],
Expand Down
52 changes: 44 additions & 8 deletions core/lib/dal/src/storage_logs_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ impl StorageLogsDal<'_, '_> {
pub async fn get_storage_logs_for_revert(
&mut self,
l1_batch_number: L1BatchNumber,
) -> HashMap<H256, Option<H256>> {
) -> HashMap<H256, Option<(H256, u64)>> {
let miniblock_range = self
.storage
.blocks_dal()
Expand All @@ -268,7 +268,9 @@ impl StorageLogsDal<'_, '_> {
// as per `initial_writes`, so if we return such keys from this method, it will lead to
// the incorrect state after revert.
let stage_start = Instant::now();
let l1_batch_by_key = self.get_l1_batches_for_initial_writes(&modified_keys).await;
perekopskiy marked this conversation as resolved.
Show resolved Hide resolved
let l1_batch_and_index_by_key = self
.get_l1_batches_and_indices_for_initial_writes(&modified_keys)
.await;
tracing::info!(
"Loaded initial write info for modified keys in {:?}",
stage_start.elapsed()
Expand All @@ -277,12 +279,12 @@ impl StorageLogsDal<'_, '_> {
let stage_start = Instant::now();
let mut output = HashMap::with_capacity(modified_keys.len());
modified_keys.retain(|key| {
match l1_batch_by_key.get(key) {
match l1_batch_and_index_by_key.get(key) {
None => {
// Key is completely deduped. It should not be present in the output map.
false
}
Some(write_batch) if *write_batch > l1_batch_number => {
Some((write_batch, _)) if *write_batch > l1_batch_number => {
// Key was initially written to after the specified L1 batch.
output.insert(*key, None);
false
Expand All @@ -295,18 +297,24 @@ impl StorageLogsDal<'_, '_> {
stage_start.elapsed()
);

let deduped_count = modified_keys_count - l1_batch_by_key.len();
let deduped_count = modified_keys_count - l1_batch_and_index_by_key.len();
tracing::info!(
"Keys to update: {update_count}, to delete: {delete_count}; {deduped_count} modified keys \
are deduped and will be ignored",
update_count = modified_keys.len(),
delete_count = l1_batch_by_key.len() - modified_keys.len()
delete_count = l1_batch_and_index_by_key.len() - modified_keys.len()
);

let stage_start = Instant::now();
let prev_values_for_updated_keys = self
.get_storage_values(&modified_keys, last_miniblock)
.await;
.await
.into_iter()
.map(|(key, value)| {
let value = value.unwrap(); // We already filtered out keys that weren't touched.
let index = l1_batch_and_index_by_key.get(&key).unwrap().1;
perekopskiy marked this conversation as resolved.
Show resolved Hide resolved
(key, Some((value, index)))
});
tracing::info!(
"Loaded previous values for {} keys in {:?}",
prev_values_for_updated_keys.len(),
Expand Down Expand Up @@ -344,6 +352,34 @@ impl StorageLogsDal<'_, '_> {
.collect()
}

pub async fn get_l1_batches_and_indices_for_initial_writes(
&mut self,
hashed_keys: &[H256],
) -> HashMap<H256, (L1BatchNumber, u64)> {
if hashed_keys.is_empty() {
return HashMap::new(); // Shortcut to save time on communication with DB in the common case
}

let hashed_keys: Vec<_> = hashed_keys.iter().map(H256::as_bytes).collect();
let rows = sqlx::query!(
"SELECT hashed_key, l1_batch_number, index FROM initial_writes \
WHERE hashed_key = ANY($1::bytea[])",
&hashed_keys as &[&[u8]],
)
.fetch_all(self.storage.conn())
.await
.unwrap();

rows.into_iter()
.map(|row| {
(
H256::from_slice(&row.hashed_key),
(L1BatchNumber(row.l1_batch_number as u32), row.index as u64),
)
})
.collect()
}

/// Gets previous values for the specified storage keys before the specified L1 batch number.
///
/// # Return value
Expand Down Expand Up @@ -696,7 +732,7 @@ mod tests {
.await;
assert_eq!(logs_for_revert.len(), 15); // 5 updated + 10 new keys
for log in &logs[5..] {
let prev_value = logs_for_revert[&log.key.hashed_key()].unwrap();
let prev_value = logs_for_revert[&log.key.hashed_key()].unwrap().0;
assert_eq!(prev_value, log.value);
}
for log in &new_logs[5..] {
Expand Down
25 changes: 24 additions & 1 deletion core/lib/dal/src/storage_logs_dedup_dal.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::StorageProcessor;
use sqlx::types::chrono::Utc;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use zksync_types::{AccountTreeId, Address, L1BatchNumber, LogQuery, StorageKey, H256};
use zksync_utils::u256_to_h256;

Expand Down Expand Up @@ -133,4 +133,27 @@ impl StorageLogsDedupDal<'_, '_> {
.map(|row| H256::from_slice(&row.hashed_key))
.collect()
}

pub async fn enum_indices_for_keys(&mut self, hashed_keys: &[H256]) -> Vec<u64> {
perekopskiy marked this conversation as resolved.
Show resolved Hide resolved
let hashed_keys_order: HashMap<_, _> = hashed_keys
.iter()
.enumerate()
.map(|(index, key)| (key, index))
.collect();
let hashed_keys: Vec<_> = hashed_keys.iter().map(H256::as_bytes).collect();
let mut indices: Vec<(H256, u64)> = sqlx::query!(
"SELECT hashed_key, index FROM initial_writes \
WHERE hashed_key = ANY($1)",
&hashed_keys as &[&[u8]]
)
perekopskiy marked this conversation as resolved.
Show resolved Hide resolved
.fetch_all(self.storage.conn())
.await
.unwrap()
.into_iter()
.map(|row| (H256::from_slice(&row.hashed_key), row.index as u64))
.collect();

indices.sort_by_key(|(hashed_key, _)| hashed_keys_order.get(hashed_key).unwrap());
indices.into_iter().map(|(_, index)| index).collect()
}
}
1 change: 1 addition & 0 deletions core/lib/state/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ anyhow = "1.0"
mini-moka = "0.10.0"
tokio = { version = "1", features = ["rt"] }
tracing = "0.1"
itertools = "0.10.3"

[dev-dependencies]
db_test_macro = { path = "../db_test_macro" }
Expand Down
Loading