From 1307e1392a8184ba2083d54f50878205c8240557 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Fri, 19 Apr 2024 15:56:04 +0300 Subject: [PATCH 01/14] Propagate errors in block reverter --- Cargo.lock | 1 + core/bin/block_reverter/src/main.rs | 16 +- core/bin/external_node/src/main.rs | 4 +- core/node/block_reverter/Cargo.toml | 3 +- core/node/block_reverter/src/lib.rs | 380 ++++++++++++++++------------ 5 files changed, 232 insertions(+), 172 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a1d3481d0009..bdbea0c2e18f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8132,6 +8132,7 @@ dependencies = [ name = "zksync_block_reverter" version = "0.1.0" dependencies = [ + "anyhow", "bitflags 1.3.2", "serde", "tokio", diff --git a/core/bin/block_reverter/src/main.rs b/core/bin/block_reverter/src/main.rs index b8e1d8a3eb23..2e506c78b871 100644 --- a/core/bin/block_reverter/src/main.rs +++ b/core/bin/block_reverter/src/main.rs @@ -104,11 +104,11 @@ async fn main() -> anyhow::Result<()> { operator_address, .. } = &command { - Some(operator_address) + Some(*operator_address) } else { None }; - let config = BlockReverterEthConfig::new(eth_sender, contracts, operator_address.copied()); + let config = BlockReverterEthConfig::new(eth_sender, contracts, operator_address)?; let connection_pool = ConnectionPool::::builder( postgres_config.master_url()?, @@ -128,9 +128,9 @@ async fn main() -> anyhow::Result<()> { match command { Command::Display { json, .. } => { - let suggested_values = block_reverter.suggested_values().await; + let suggested_values = block_reverter.suggested_values().await?; if json { - println!("{}", serde_json::to_string(&suggested_values).unwrap()); + println!("{}", serde_json::to_string(&suggested_values)?); } else { println!("Suggested values for rollback: {:#?}", suggested_values); } @@ -148,7 +148,7 @@ async fn main() -> anyhow::Result<()> { priority_fee_per_gas, nonce, ) - .await + .await?; } Command::RollbackDB { l1_batch_number, @@ -200,9 +200,11 @@ async fn main() -> anyhow::Result<()> { block_reverter .rollback_db(L1BatchNumber(l1_batch_number), flags) - .await + .await?; + } + Command::ClearFailedL1Transactions => { + block_reverter.clear_failed_l1_transactions().await?; } - Command::ClearFailedL1Transactions => block_reverter.clear_failed_l1_transactions().await, } Ok(()) } diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index 80da90c89c0f..d620ca0f40b7 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -954,7 +954,7 @@ async fn run_node( tracing::info!("Rolling back to l1 batch number {last_correct_l1_batch}"); reverter .rollback_db(last_correct_l1_batch, BlockReverterFlags::all()) - .await; + .await?; tracing::info!("Rollback successfully completed"); } Err(err) => return Err(err).context("reorg_detector.check_consistency()"), @@ -974,7 +974,7 @@ async fn run_node( tracing::info!("Rolling back to l1 batch number {sealed_l1_batch_number}"); reverter .rollback_db(sealed_l1_batch_number, BlockReverterFlags::all()) - .await; + .await?; tracing::info!("Rollback successfully completed"); } diff --git a/core/node/block_reverter/Cargo.toml b/core/node/block_reverter/Cargo.toml index bcbfec519b87..4b345b768de3 100644 --- a/core/node/block_reverter/Cargo.toml +++ b/core/node/block_reverter/Cargo.toml @@ -19,7 +19,8 @@ zksync_eth_signer.workspace = true zksync_state.workspace = true zksync_merkle_tree.workspace = true -tokio = { workspace = true, features = ["time"] } +anyhow.workspace = true +tokio = { 
workspace = true, features = ["time", "fs"] } bitflags.workspace = true serde.workspace = true tracing.workspace = true diff --git a/core/node/block_reverter/src/lib.rs b/core/node/block_reverter/src/lib.rs index 1645bfbd8b5e..4041cfff793b 100644 --- a/core/node/block_reverter/src/lib.rs +++ b/core/node/block_reverter/src/lib.rs @@ -1,8 +1,9 @@ use std::{path::Path, time::Duration}; +use anyhow::Context as _; use bitflags::bitflags; use serde::Serialize; -use tokio::time::sleep; +use tokio::fs; use zksync_config::{ContractsConfig, EthConfig}; use zksync_contracts::zksync_contract; use zksync_dal::{ConnectionPool, Core, CoreDal}; @@ -32,7 +33,7 @@ bitflags! { /// Flag determining whether the reverter is allowed to revert the state /// past the last batch finalized on L1. If this flag is set to `Disallowed`, -/// block reverter will panic upon such an attempt. +/// block reverter will error upon such an attempt. /// /// Main use case for the `Allowed` flag is the external node, where may obtain an /// incorrect state even for a block that was marked as executed. On the EN, this mode is not destructive. @@ -57,12 +58,15 @@ impl BlockReverterEthConfig { eth_config: EthConfig, contract: ContractsConfig, reverter_address: Option
, - ) -> Self { + ) -> anyhow::Result { #[allow(deprecated)] // `BlockReverter` doesn't support non env configs yet - let pk = eth_config.sender.expect("eth_sender_config").private_key(); + let pk = eth_config + .sender + .context("eth_sender_config")? + .private_key(); - Self { + Ok(Self { eth_client_url: eth_config.web3_url, reverter_private_key: pk, reverter_address, @@ -70,9 +74,9 @@ impl BlockReverterEthConfig { validator_timelock_addr: contract.validator_timelock_addr, default_priority_fee_per_gas: eth_config .gas_adjuster - .expect("gas adjuster") + .context("gas adjuster")? .default_priority_fee_per_gas, - } + }) } } @@ -132,7 +136,7 @@ impl BlockReverter { &self, last_l1_batch_to_keep: L1BatchNumber, flags: BlockReverterFlags, - ) { + ) -> anyhow::Result<()> { let rollback_tree = flags.contains(BlockReverterFlags::TREE); let rollback_postgres = flags.contains(BlockReverterFlags::POSTGRES); let rollback_sk_cache = flags.contains(BlockReverterFlags::SK_CACHE); @@ -145,21 +149,20 @@ impl BlockReverter { let last_executed_l1_batch = storage .blocks_dal() .get_number_of_last_l1_batch_executed_on_eth() - .await - .unwrap() - .expect("failed to get last executed L1 batch"); - assert!( - last_l1_batch_to_keep >= last_executed_l1_batch, - "Attempt to revert already executed L1 batches" + .await?; + anyhow::ensure!( + Some(last_l1_batch_to_keep) >= last_executed_l1_batch, + "Attempt to revert already executed L1 batches; the last executed batch is: {last_executed_l1_batch:?}" ); } // Tree needs to be reverted first to keep state recoverable self.rollback_rocks_dbs(last_l1_batch_to_keep, rollback_tree, rollback_sk_cache) - .await; + .await?; if rollback_postgres { - self.rollback_postgres(last_l1_batch_to_keep).await; + self.rollback_postgres(last_l1_batch_to_keep).await?; } + Ok(()) } async fn rollback_rocks_dbs( @@ -167,192 +170,229 @@ impl BlockReverter { last_l1_batch_to_keep: L1BatchNumber, rollback_tree: bool, rollback_sk_cache: bool, - ) { + ) -> anyhow::Result<()> { if rollback_tree { let storage_root_hash = self .connection_pool .connection() - .await - .unwrap() + .await? .blocks_dal() .get_l1_batch_state_root(last_l1_batch_to_keep) - .await - .unwrap() - .expect("failed to fetch root hash for target L1 batch"); + .await? 
+ .context("failed to fetch root hash for target L1 batch")?; // Rolling back Merkle tree let merkle_tree_path = Path::new(&self.merkle_tree_path); - if merkle_tree_path.exists() { - tracing::info!("Rolling back Merkle tree..."); - Self::rollback_new_tree(last_l1_batch_to_keep, merkle_tree_path, storage_root_hash); + let merkle_tree_exists = fs::try_exists(merkle_tree_path).await.with_context(|| { + format!( + "cannot check whether Merkle tree path `{}` exists", + merkle_tree_path.display() + ) + })?; + if merkle_tree_exists { + tracing::info!("Reverting Merkle tree at {}", merkle_tree_path.display()); + let merkle_tree_path = merkle_tree_path.to_path_buf(); + tokio::task::spawn_blocking(move || { + Self::revert_tree_blocking( + last_l1_batch_to_keep, + &merkle_tree_path, + storage_root_hash, + ) + }) + .await + .context("reverting Merkle tree panicked")??; } else { - tracing::info!("Merkle tree not found; skipping"); + tracing::info!( + "Merkle tree not found at `{}`; skipping", + merkle_tree_path.display() + ); } } if rollback_sk_cache { - assert!( - Path::new(&self.state_keeper_cache_path).exists(), - "Path with state keeper cache DB doesn't exist" + let sk_cache_exists = fs::try_exists(&self.state_keeper_cache_path) + .await + .with_context(|| { + format!( + "cannot check whether state keeper cache path `{}` exists", + self.state_keeper_cache_path + ) + })?; + anyhow::ensure!( + sk_cache_exists, + "Path with state keeper cache DB doesn't exist at {}", + self.state_keeper_cache_path ); self.rollback_state_keeper_cache(last_l1_batch_to_keep) - .await; + .await?; } + Ok(()) } - fn rollback_new_tree( + fn revert_tree_blocking( last_l1_batch_to_keep: L1BatchNumber, path: &Path, storage_root_hash: H256, - ) { - let db = RocksDB::new(path).expect("Failed initializing RocksDB for Merkle tree"); + ) -> anyhow::Result<()> { + let db = RocksDB::new(path).context("failed initializing RocksDB for Merkle tree")?; let mut tree = ZkSyncTree::new_lightweight(db.into()); if tree.next_l1_batch_number() <= last_l1_batch_to_keep { tracing::info!("Tree is behind the L1 batch to revert to; skipping"); - return; + return Ok(()); } tree.revert_logs(last_l1_batch_to_keep); - tracing::info!("checking match of the tree root hash and root hash from Postgres..."); - assert_eq!(tree.root_hash(), storage_root_hash); - tracing::info!("saving tree changes to disk..."); + tracing::info!("Checking match of the tree root hash and root hash from Postgres"); + let tree_root_hash = tree.root_hash(); + anyhow::ensure!( + tree_root_hash == storage_root_hash, + "Mismatch between the tree root hash {tree_root_hash:?} and storage root hash {storage_root_hash:?} after revert" + ); + tracing::info!("Saving tree changes to disk"); tree.save(); + Ok(()) } /// Reverts blocks in the state keeper cache. 
- async fn rollback_state_keeper_cache(&self, last_l1_batch_to_keep: L1BatchNumber) { - tracing::info!("opening DB with state keeper cache..."); + async fn rollback_state_keeper_cache( + &self, + last_l1_batch_to_keep: L1BatchNumber, + ) -> anyhow::Result<()> { + tracing::info!( + "Opening DB with state keeper cache at `{}`", + self.state_keeper_cache_path + ); let sk_cache = RocksdbStorage::builder(self.state_keeper_cache_path.as_ref()) .await - .expect("Failed initializing state keeper cache"); + .context("failed initializing state keeper cache")?; if sk_cache.l1_batch_number().await > Some(last_l1_batch_to_keep + 1) { - let mut storage = self.connection_pool.connection().await.unwrap(); + let mut storage = self.connection_pool.connection().await?; tracing::info!("Rolling back state keeper cache..."); sk_cache .rollback(&mut storage, last_l1_batch_to_keep) .await - .expect("Failed rolling back state keeper cache"); + .context("failed rolling back state keeper cache")?; } else { tracing::info!("Nothing to revert in state keeper cache"); } + Ok(()) } /// Reverts data in the Postgres database. /// If `node_role` is `Main` a consensus hard-fork is performed. - async fn rollback_postgres(&self, last_l1_batch_to_keep: L1BatchNumber) { - tracing::info!("rolling back postgres data..."); - let mut storage = self.connection_pool.connection().await.unwrap(); - let mut transaction = storage.start_transaction().await.unwrap(); + async fn rollback_postgres(&self, last_l1_batch_to_keep: L1BatchNumber) -> anyhow::Result<()> { + tracing::info!("Rolling back postgres data"); + let mut storage = self.connection_pool.connection().await?; + let mut transaction = storage.start_transaction().await?; - let (_, last_miniblock_to_keep) = transaction + let (_, last_l2_block_to_keep) = transaction .blocks_dal() .get_l2_block_range_of_l1_batch(last_l1_batch_to_keep) - .await - .unwrap() - .expect("L1 batch should contain at least one miniblock"); + .await? 
+ .with_context(|| { + format!("L1 batch #{last_l1_batch_to_keep} doesn't contain L2 blocks") + })?; - tracing::info!("rolling back transactions state..."); + tracing::info!("Rolling back transactions state"); transaction .transactions_dal() - .reset_transactions_state(last_miniblock_to_keep) - .await - .expect("failed resetting transaction state"); - tracing::info!("rolling back events..."); + .reset_transactions_state(last_l2_block_to_keep) + .await?; + tracing::info!("Rolling back events"); transaction .events_dal() - .rollback_events(last_miniblock_to_keep) - .await - .expect("failed rolling back events"); - tracing::info!("rolling back l2 to l1 logs..."); + .rollback_events(last_l2_block_to_keep) + .await?; + tracing::info!("Rolling back L2 to L1 logs"); transaction .events_dal() - .rollback_l2_to_l1_logs(last_miniblock_to_keep) - .await - .expect("failed rolling back L2-to-L1 logs"); - tracing::info!("rolling back created tokens..."); + .rollback_l2_to_l1_logs(last_l2_block_to_keep) + .await?; + tracing::info!("Rolling back created tokens"); transaction .tokens_dal() - .rollback_tokens(last_miniblock_to_keep) - .await - .expect("failed rolling back created tokens"); - tracing::info!("rolling back factory deps...."); + .rollback_tokens(last_l2_block_to_keep) + .await?; + tracing::info!("Rolling back factory deps"); transaction .factory_deps_dal() - .rollback_factory_deps(last_miniblock_to_keep) - .await - .expect("Failed rolling back factory dependencies"); - tracing::info!("rolling back storage..."); + .rollback_factory_deps(last_l2_block_to_keep) + .await?; + tracing::info!("Rolling back storage"); #[allow(deprecated)] transaction .storage_logs_dal() - .rollback_storage(last_miniblock_to_keep) - .await - .expect("failed rolling back storage"); - tracing::info!("rolling back storage logs..."); + .rollback_storage(last_l2_block_to_keep) + .await?; + tracing::info!("Rolling back storage logs"); transaction .storage_logs_dal() - .rollback_storage_logs(last_miniblock_to_keep) - .await - .unwrap(); - tracing::info!("rolling back eth_txs..."); + .rollback_storage_logs(last_l2_block_to_keep) + .await?; + tracing::info!("Rolling back Ethereum transactions"); transaction .eth_sender_dal() .delete_eth_txs(last_l1_batch_to_keep) - .await - .unwrap(); - tracing::info!("rolling back l1 batches..."); + .await?; + tracing::info!("Rolling back L1 batches"); transaction .blocks_dal() .delete_l1_batches(last_l1_batch_to_keep) - .await - .unwrap(); + .await?; transaction .blocks_dal() .delete_initial_writes(last_l1_batch_to_keep) - .await - .unwrap(); - tracing::info!("rolling back miniblocks..."); + .await?; + tracing::info!("Rolling back L2 blocks..."); transaction .blocks_dal() - .delete_l2_blocks(last_miniblock_to_keep) - .await - .unwrap(); + .delete_l2_blocks(last_l2_block_to_keep) + .await?; + if self.node_role == NodeRole::Main { - tracing::info!("performing consensus hard fork"); - transaction.consensus_dal().fork().await.unwrap(); + tracing::info!("Performing consensus hard fork"); + transaction.consensus_dal().fork().await?; } - transaction.commit().await.unwrap(); + + transaction.commit().await?; + Ok(()) } - /// Sends revert transaction to L1. + /// Sends a revert transaction to L1. 
pub async fn send_ethereum_revert_transaction( &self, last_l1_batch_to_keep: L1BatchNumber, priority_fee_per_gas: U256, nonce: u64, - ) { + ) -> anyhow::Result<()> { let eth_config = self .eth_config .as_ref() - .expect("eth_config is not provided"); + .context("eth_config is not provided")?; - let web3 = Web3::new(Http::new(ð_config.eth_client_url).unwrap()); + let web3 = + Web3::new(Http::new(ð_config.eth_client_url).context("cannot create L1 client")?); let contract = zksync_contract(); let signer = PrivateKeySigner::new( eth_config .reverter_private_key - .expect("Private key is required to send revert transaction"), + .context("private key is required to send revert transaction")?, ); - let chain_id = web3.eth().chain_id().await.unwrap().as_u64(); + let chain_id = web3 + .eth() + .chain_id() + .await + .context("failed getting L1 chain ID")? + .as_u64(); let revert_function = contract .function("revertBlocks") .or_else(|_| contract.function("revertBatches")) - .expect( + .context( "Either `revertBlocks` or `revertBatches` function must be present in contract", - ); + )?; let data = revert_function .encode_input(&[Token::Uint(last_l1_batch_to_keep.0.into())]) .unwrap(); @@ -361,8 +401,13 @@ impl BlockReverter { .eth() .block(BlockId::Number(BlockNumber::Pending)) .await - .unwrap() - .map(|block| block.base_fee_per_gas.unwrap()); + .context("failed getting pending L1 block")? + .map(|block| { + block + .base_fee_per_gas + .context("no base_fee_per_gas in pending block") + }) + .transpose()?; let base_fee = if let Some(base_fee) = base_fee { base_fee } else { @@ -370,10 +415,10 @@ impl BlockReverter { web3.eth() .block(BlockId::Number(BlockNumber::Latest)) .await - .unwrap() - .unwrap() + .context("failed geting latest L1 block")? + .context("no latest L1 block")? .base_fee_per_gas - .unwrap() + .context("no base_fee_per_gas in latest block")? 
}; let tx = TransactionParameters { @@ -387,105 +432,116 @@ impl BlockReverter { ..Default::default() }; - let signed_tx = signer.sign_transaction(tx).await.unwrap(); + let signed_tx = signer + .sign_transaction(tx) + .await + .context("cannot sign revert transaction")?; let hash = web3 .eth() .send_raw_transaction(signed_tx.into()) .await - .unwrap(); + .context("failed sending revert transaction")?; + tracing::info!("Sent revert transaction to L1 with hash {hash:?}"); loop { - if let Some(receipt) = web3.eth().transaction_receipt(hash).await.unwrap() { - assert_eq!(receipt.status, Some(1.into()), "revert transaction failed"); - tracing::info!("revert transaction has completed"); - return; + let maybe_receipt = web3 + .eth() + .transaction_receipt(hash) + .await + .context("failed getting receipt for revert transaction")?; + if let Some(receipt) = maybe_receipt { + anyhow::ensure!( + receipt.status == Some(1.into()), + "Revert transaction {hash:?} failed with status {:?}", + receipt.status + ); + tracing::info!("Revert transaction has completed"); + return Ok(()); } else { tracing::info!("waiting for L1 transaction confirmation..."); - sleep(Duration::from_secs(5)).await; + tokio::time::sleep(Duration::from_secs(5)).await; } } } - async fn get_l1_batch_number_from_contract(&self, op: AggregatedActionType) -> L1BatchNumber { + #[tracing::instrument(skip(contract), err, fields(contract.address = ?contract.address()))] + async fn get_l1_batch_number_from_contract( + contract: &Contract, + op: AggregatedActionType, + ) -> anyhow::Result { let function_name = match op { AggregatedActionType::Commit => "getTotalBatchesCommitted", AggregatedActionType::PublishProofOnchain => "getTotalBatchesVerified", AggregatedActionType::Execute => "getTotalBatchesExecuted", }; - let eth_config = self - .eth_config - .as_ref() - .expect("eth_config is not provided"); - - let web3 = Web3::new(Http::new(ð_config.eth_client_url).unwrap()); - let contract = { - let abi = zksync_contract(); - let contract_address = eth_config.diamond_proxy_addr; - Contract::new(web3.eth(), contract_address, abi) - }; - let block_number: U256 = contract .query(function_name, (), None, Options::default(), None) .await - .unwrap(); - - L1BatchNumber(block_number.as_u32()) + .with_context(|| { + format!( + "failed calling `{function_name}` for contract {:?}", + contract.address() + ) + })?; + Ok(L1BatchNumber(block_number.as_u32())) } /// Returns suggested values for rollback. 
- pub async fn suggested_values(&self) -> SuggestedRollbackValues { - let last_committed_l1_batch_number = self - .get_l1_batch_number_from_contract(AggregatedActionType::Commit) - .await; - let last_verified_l1_batch_number = self - .get_l1_batch_number_from_contract(AggregatedActionType::PublishProofOnchain) - .await; - let last_executed_l1_batch_number = self - .get_l1_batch_number_from_contract(AggregatedActionType::Execute) - .await; + pub async fn suggested_values(&self) -> anyhow::Result { + let eth_config = self + .eth_config + .as_ref() + .context("eth_config is not provided")?; + let web3 = + Web3::new(Http::new(ð_config.eth_client_url).context("cannot create L1 client")?); + let contract_address = eth_config.diamond_proxy_addr; + let contract = Contract::new(web3.eth(), contract_address, zksync_contract()); + + let last_committed_l1_batch_number = + Self::get_l1_batch_number_from_contract(&contract, AggregatedActionType::Commit) + .await?; + let last_verified_l1_batch_number = Self::get_l1_batch_number_from_contract( + &contract, + AggregatedActionType::PublishProofOnchain, + ) + .await?; + let last_executed_l1_batch_number = + Self::get_l1_batch_number_from_contract(&contract, AggregatedActionType::Execute) + .await?; + tracing::info!( "Last L1 batch numbers on contract: committed {last_committed_l1_batch_number}, \ verified {last_verified_l1_batch_number}, executed {last_executed_l1_batch_number}" ); - let eth_config = self - .eth_config - .as_ref() - .expect("eth_config is not provided"); - let priority_fee = eth_config.default_priority_fee_per_gas; - - let web3 = Web3::new(Http::new(ð_config.eth_client_url).unwrap()); + let reverter_address = eth_config + .reverter_address + .context("need to provide operator address to suggest reversion values")?; let nonce = web3 .eth() - .transaction_count( - eth_config - .reverter_address - .expect("Need to provide operator address to suggest revertion values"), - Some(BlockNumber::Pending), - ) + .transaction_count(reverter_address, Some(BlockNumber::Pending)) .await - .unwrap() + .with_context(|| format!("failed getting transaction count for {reverter_address:?}"))? .as_u64(); - SuggestedRollbackValues { + Ok(SuggestedRollbackValues { last_executed_l1_batch_number, nonce, priority_fee, - } + }) } - /// Clears failed L1 transactions - pub async fn clear_failed_l1_transactions(&self) { - tracing::info!("clearing failed L1 transactions..."); + /// Clears failed L1 transactions. + pub async fn clear_failed_l1_transactions(&self) -> anyhow::Result<()> { + tracing::info!("Clearing failed L1 transactions"); self.connection_pool .connection() - .await - .unwrap() + .await? 
.eth_sender_dal() .clear_failed_transactions() - .await - .unwrap(); + .await?; + Ok(()) } pub fn change_rollback_executed_l1_batches_allowance( From a6080d30057fc7cad3751fa624c1bcb8e5275352 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Fri, 19 Apr 2024 16:07:55 +0300 Subject: [PATCH 02/14] Brush up reverter initialization --- core/bin/block_reverter/src/main.rs | 13 ++---- core/bin/external_node/src/main.rs | 8 ++-- core/node/block_reverter/src/lib.rs | 67 ++++++++++------------------- 3 files changed, 31 insertions(+), 57 deletions(-) diff --git a/core/bin/block_reverter/src/main.rs b/core/bin/block_reverter/src/main.rs index 2e506c78b871..cbcd1a0a2acb 100644 --- a/core/bin/block_reverter/src/main.rs +++ b/core/bin/block_reverter/src/main.rs @@ -1,9 +1,7 @@ use anyhow::Context as _; use clap::{Parser, Subcommand}; use tokio::io::{self, AsyncReadExt}; -use zksync_block_reverter::{ - BlockReverter, BlockReverterEthConfig, BlockReverterFlags, L1ExecutedBatchesRevert, NodeRole, -}; +use zksync_block_reverter::{BlockReverter, BlockReverterEthConfig, BlockReverterFlags, NodeRole}; use zksync_config::{ configs::ObservabilityConfig, ContractsConfig, DBConfig, EthConfig, PostgresConfig, }; @@ -121,14 +119,12 @@ async fn main() -> anyhow::Result<()> { NodeRole::Main, db_config.state_keeper_db_path, db_config.merkle_tree.path, - Some(config), connection_pool, - L1ExecutedBatchesRevert::Disallowed, ); match command { Command::Display { json, .. } => { - let suggested_values = block_reverter.suggested_values().await?; + let suggested_values = block_reverter.suggested_values(&config).await?; if json { println!("{}", serde_json::to_string(&suggested_values)?); } else { @@ -144,6 +140,7 @@ async fn main() -> anyhow::Result<()> { priority_fee_per_gas.map_or(default_priority_fee_per_gas, U256::from); block_reverter .send_ethereum_revert_transaction( + &config, L1BatchNumber(l1_batch_number), priority_fee_per_gas, nonce, @@ -182,9 +179,7 @@ async fn main() -> anyhow::Result<()> { if input[0] != b'y' && input[0] != b'Y' { std::process::exit(0); } - block_reverter.change_rollback_executed_l1_batches_allowance( - L1ExecutedBatchesRevert::Allowed, - ); + block_reverter.allow_reverting_executed_batches(); } let mut flags = BlockReverterFlags::empty(); diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index d620ca0f40b7..b28167ee9632 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -8,7 +8,7 @@ use tokio::{ sync::{oneshot, watch, RwLock}, task::{self, JoinHandle}, }; -use zksync_block_reverter::{BlockReverter, BlockReverterFlags, L1ExecutedBatchesRevert, NodeRole}; +use zksync_block_reverter::{BlockReverter, BlockReverterFlags, NodeRole}; use zksync_commitment_generator::CommitmentGenerator; use zksync_concurrency::{ctx, scope}; use zksync_config::configs::{ @@ -934,14 +934,14 @@ async fn run_node( let sigint_receiver = env.setup_sigint_handler(); // Revert the storage if needed. - let reverter = BlockReverter::new( + let mut reverter = BlockReverter::new( NodeRole::External, config.required.state_cache_path.clone(), config.required.merkle_tree_path.clone(), - None, connection_pool.clone(), - L1ExecutedBatchesRevert::Allowed, ); + // Reverting executed batches is more-or-less safe for external nodes. 
+ reverter.allow_reverting_executed_batches(); let mut reorg_detector = ReorgDetector::new(main_node_client.clone(), connection_pool.clone()); // We're checking for the reorg in the beginning because we expect that if reorg is detected during diff --git a/core/node/block_reverter/src/lib.rs b/core/node/block_reverter/src/lib.rs index 4041cfff793b..96583c9f0710 100644 --- a/core/node/block_reverter/src/lib.rs +++ b/core/node/block_reverter/src/lib.rs @@ -31,18 +31,6 @@ bitflags! { } } -/// Flag determining whether the reverter is allowed to revert the state -/// past the last batch finalized on L1. If this flag is set to `Disallowed`, -/// block reverter will error upon such an attempt. -/// -/// Main use case for the `Allowed` flag is the external node, where may obtain an -/// incorrect state even for a block that was marked as executed. On the EN, this mode is not destructive. -#[derive(Debug)] -pub enum L1ExecutedBatchesRevert { - Allowed, - Disallowed, -} - #[derive(Debug)] pub struct BlockReverterEthConfig { eth_client_url: String, @@ -95,10 +83,11 @@ pub enum NodeRole { /// It is also used to automatically perform a rollback on the external node /// after it is detected on the main node. /// -/// There are a few state components that we can roll back +/// There are a few state components that we can roll back: +/// /// - State of the Postgres database -/// - State of the merkle tree -/// - State of the state_keeper cache +/// - State of the Merkle tree +/// - State of the state keeper cache /// - State of the Ethereum contract (if the block was committed) #[derive(Debug)] pub struct BlockReverter { @@ -107,9 +96,8 @@ pub struct BlockReverter { node_role: NodeRole, state_keeper_cache_path: String, merkle_tree_path: String, - eth_config: Option, connection_pool: ConnectionPool, - executed_batches_revert_mode: L1ExecutedBatchesRevert, + allow_reverting_executed_batches: bool, } impl BlockReverter { @@ -117,20 +105,26 @@ impl BlockReverter { node_role: NodeRole, state_keeper_cache_path: String, merkle_tree_path: String, - eth_config: Option, connection_pool: ConnectionPool, - executed_batches_revert_mode: L1ExecutedBatchesRevert, ) -> Self { Self { node_role, state_keeper_cache_path, merkle_tree_path, - eth_config, connection_pool, - executed_batches_revert_mode, + allow_reverting_executed_batches: false, } } + /// Allows reverting the state past the last batch finalized on L1. If this is disallowed (which is the default), + /// block reverter will error upon such an attempt. + /// + /// Main use case for the setting this flag is the external node, where may obtain an + /// incorrect state even for a block that was marked as executed. On the EN, this mode is not destructive. + pub fn allow_reverting_executed_batches(&mut self) { + self.allow_reverting_executed_batches = true; + } + /// Rolls back DBs (Postgres + RocksDB) to a previous state. 
pub async fn rollback_db( &self, @@ -141,11 +135,8 @@ impl BlockReverter { let rollback_postgres = flags.contains(BlockReverterFlags::POSTGRES); let rollback_sk_cache = flags.contains(BlockReverterFlags::SK_CACHE); - if matches!( - self.executed_batches_revert_mode, - L1ExecutedBatchesRevert::Disallowed - ) { - let mut storage = self.connection_pool.connection().await.unwrap(); + if !self.allow_reverting_executed_batches { + let mut storage = self.connection_pool.connection().await?; let last_executed_l1_batch = storage .blocks_dal() .get_number_of_last_l1_batch_executed_on_eth() @@ -363,15 +354,11 @@ impl BlockReverter { /// Sends a revert transaction to L1. pub async fn send_ethereum_revert_transaction( &self, + eth_config: &BlockReverterEthConfig, last_l1_batch_to_keep: L1BatchNumber, priority_fee_per_gas: U256, nonce: u64, ) -> anyhow::Result<()> { - let eth_config = self - .eth_config - .as_ref() - .context("eth_config is not provided")?; - let web3 = Web3::new(Http::new(ð_config.eth_client_url).context("cannot create L1 client")?); let contract = zksync_contract(); @@ -395,7 +382,7 @@ impl BlockReverter { )?; let data = revert_function .encode_input(&[Token::Uint(last_l1_batch_to_keep.0.into())]) - .unwrap(); + .context("failed encoding revert function input")?; let base_fee = web3 .eth() @@ -487,11 +474,10 @@ impl BlockReverter { } /// Returns suggested values for rollback. - pub async fn suggested_values(&self) -> anyhow::Result { - let eth_config = self - .eth_config - .as_ref() - .context("eth_config is not provided")?; + pub async fn suggested_values( + &self, + eth_config: &BlockReverterEthConfig, + ) -> anyhow::Result { let web3 = Web3::new(Http::new(ð_config.eth_client_url).context("cannot create L1 client")?); let contract_address = eth_config.diamond_proxy_addr; @@ -543,13 +529,6 @@ impl BlockReverter { .await?; Ok(()) } - - pub fn change_rollback_executed_l1_batches_allowance( - &mut self, - revert_executed_batches: L1ExecutedBatchesRevert, - ) { - self.executed_batches_revert_mode = revert_executed_batches - } } #[derive(Debug, Serialize)] From b7387d4a416cbff0d02f930f36803e7e495624d4 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Fri, 19 Apr 2024 16:39:40 +0300 Subject: [PATCH 03/14] Add DB query to remove snapshot metadata --- ...a192a143f6a7501b7da65bb7df40a0a4ead70.json | 40 +++++++++++++ core/lib/dal/src/snapshots_dal.rs | 60 ++++++++++++++++++- 2 files changed, 98 insertions(+), 2 deletions(-) create mode 100644 core/lib/dal/.sqlx/query-b42fc86726ac40d0ca38640884da192a143f6a7501b7da65bb7df40a0a4ead70.json diff --git a/core/lib/dal/.sqlx/query-b42fc86726ac40d0ca38640884da192a143f6a7501b7da65bb7df40a0a4ead70.json b/core/lib/dal/.sqlx/query-b42fc86726ac40d0ca38640884da192a143f6a7501b7da65bb7df40a0a4ead70.json new file mode 100644 index 000000000000..625fa4ab36e6 --- /dev/null +++ b/core/lib/dal/.sqlx/query-b42fc86726ac40d0ca38640884da192a143f6a7501b7da65bb7df40a0a4ead70.json @@ -0,0 +1,40 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM snapshots\n WHERE\n l1_batch_number > $1\n RETURNING\n VERSION,\n l1_batch_number,\n factory_deps_filepath,\n storage_logs_filepaths\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "version", + "type_info": "Int4" + }, + { + "ordinal": 1, + "name": "l1_batch_number", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "factory_deps_filepath", + "type_info": "Text" + }, + { + "ordinal": 3, + "name": "storage_logs_filepaths", + "type_info": "TextArray" + } + ], + "parameters": { + "Left": [ + "Int8" 
+ ] + }, + "nullable": [ + false, + false, + false, + false + ] + }, + "hash": "b42fc86726ac40d0ca38640884da192a143f6a7501b7da65bb7df40a0a4ead70" +} diff --git a/core/lib/dal/src/snapshots_dal.rs b/core/lib/dal/src/snapshots_dal.rs index 4fcfd019de18..b5883550aa27 100644 --- a/core/lib/dal/src/snapshots_dal.rs +++ b/core/lib/dal/src/snapshots_dal.rs @@ -185,6 +185,35 @@ impl SnapshotsDal<'_, '_> { .fetch_optional(self.storage) .await } + + /// Deletes all snapshots after the specified L1 batch number and returns their metadata. + pub async fn delete_snapshots_after( + &mut self, + last_retained_l1_batch_number: L1BatchNumber, + ) -> DalResult> { + sqlx::query_as!( + StorageSnapshotMetadata, + r#" + DELETE FROM snapshots + WHERE + l1_batch_number > $1 + RETURNING + VERSION, + l1_batch_number, + factory_deps_filepath, + storage_logs_filepaths + "#, + last_retained_l1_batch_number.0 as i32 + ) + .try_map(SnapshotMetadata::try_from) + .instrument("delete_snapshots_after") + .with_arg( + "last_retained_l1_batch_number", + &last_retained_l1_batch_number, + ) + .fetch_all(self.storage) + .await + } } #[cfg(test)] @@ -233,9 +262,36 @@ mod tests { let snapshot_metadata = dal .get_snapshot_metadata(l1_batch_number) .await - .expect("Failed to retrieve snapshot") - .unwrap(); + .unwrap() + .expect("snapshot is not persisted"); assert_eq!(snapshot_metadata.l1_batch_number, l1_batch_number); + + let deleted_snapshots = dal.delete_snapshots_after(l1_batch_number).await.unwrap(); + assert!(deleted_snapshots.is_empty(), "{deleted_snapshots:?}"); + let deleted_snapshots = dal + .delete_snapshots_after(l1_batch_number - 1) + .await + .unwrap(); + assert_eq!(deleted_snapshots.len(), 1); + assert_eq!(deleted_snapshots[0].version, snapshot_metadata.version); + assert_eq!( + deleted_snapshots[0].l1_batch_number, + snapshot_metadata.l1_batch_number + ); + assert_eq!( + deleted_snapshots[0].factory_deps_filepath, + snapshot_metadata.factory_deps_filepath + ); + assert_eq!( + deleted_snapshots[0].storage_logs_filepaths, + snapshot_metadata.storage_logs_filepaths + ); + + let deleted_snapshot_metadata = dal.get_snapshot_metadata(l1_batch_number).await.unwrap(); + assert!( + deleted_snapshot_metadata.is_none(), + "{deleted_snapshot_metadata:?}" + ); } #[tokio::test] From 5b76f1b4e14052991fef02768e808d131671e8ab Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Fri, 19 Apr 2024 17:40:59 +0300 Subject: [PATCH 04/14] Sketch deleting snapshot files on revert --- Cargo.lock | 2 + core/bin/block_reverter/Cargo.toml | 1 + core/bin/block_reverter/src/main.rs | 17 ++++- core/bin/external_node/src/main.rs | 4 +- core/lib/object_store/src/objects.rs | 10 +++ core/node/block_reverter/Cargo.toml | 1 + core/node/block_reverter/src/lib.rs | 100 +++++++++++++++++++++++++-- 7 files changed, 126 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bdbea0c2e18f..bbb4fc8fbed2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -661,6 +661,7 @@ dependencies = [ "zksync_config", "zksync_dal", "zksync_env_config", + "zksync_object_store", "zksync_types", ] @@ -8142,6 +8143,7 @@ dependencies = [ "zksync_dal", "zksync_eth_signer", "zksync_merkle_tree", + "zksync_object_store", "zksync_state", "zksync_storage", "zksync_types", diff --git a/core/bin/block_reverter/Cargo.toml b/core/bin/block_reverter/Cargo.toml index 8f546e1be59a..5f32f68acbd8 100644 --- a/core/bin/block_reverter/Cargo.toml +++ b/core/bin/block_reverter/Cargo.toml @@ -14,6 +14,7 @@ publish = false zksync_config.workspace = true zksync_env_config.workspace = true 
zksync_dal.workspace = true +zksync_object_store.workspace = true zksync_types.workspace = true zksync_block_reverter.workspace = true vlog.workspace = true diff --git a/core/bin/block_reverter/src/main.rs b/core/bin/block_reverter/src/main.rs index cbcd1a0a2acb..91aaea0f9afa 100644 --- a/core/bin/block_reverter/src/main.rs +++ b/core/bin/block_reverter/src/main.rs @@ -6,7 +6,8 @@ use zksync_config::{ configs::ObservabilityConfig, ContractsConfig, DBConfig, EthConfig, PostgresConfig, }; use zksync_dal::{ConnectionPool, Core}; -use zksync_env_config::FromEnv; +use zksync_env_config::{object_store::SnapshotsObjectStoreConfig, FromEnv}; +use zksync_object_store::ObjectStoreFactory; use zksync_types::{Address, L1BatchNumber, U256}; #[derive(Debug, Parser)] @@ -182,9 +183,17 @@ async fn main() -> anyhow::Result<()> { block_reverter.allow_reverting_executed_batches(); } + let mut object_store = None; let mut flags = BlockReverterFlags::empty(); if rollback_postgres { flags |= BlockReverterFlags::POSTGRES; + let object_store_config = SnapshotsObjectStoreConfig::from_env() + .context("SnapshotsObjectStoreConfig::from_env()")?; + object_store = Some( + ObjectStoreFactory::new(object_store_config.0) + .create_store() + .await, + ); } if rollback_tree { flags |= BlockReverterFlags::TREE; @@ -194,7 +203,11 @@ async fn main() -> anyhow::Result<()> { } block_reverter - .rollback_db(L1BatchNumber(l1_batch_number), flags) + .rollback_db( + L1BatchNumber(l1_batch_number), + flags, + object_store.as_deref(), + ) .await?; } Command::ClearFailedL1Transactions => { diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index b28167ee9632..79057c850f66 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -953,7 +953,7 @@ async fn run_node( Err(reorg_detector::Error::ReorgDetected(last_correct_l1_batch)) => { tracing::info!("Rolling back to l1 batch number {last_correct_l1_batch}"); reverter - .rollback_db(last_correct_l1_batch, BlockReverterFlags::all()) + .rollback_db(last_correct_l1_batch, BlockReverterFlags::all(), None) .await?; tracing::info!("Rollback successfully completed"); } @@ -973,7 +973,7 @@ async fn run_node( tracing::info!("Rolling back to l1 batch number {sealed_l1_batch_number}"); reverter - .rollback_db(sealed_l1_batch_number, BlockReverterFlags::all()) + .rollback_db(sealed_l1_batch_number, BlockReverterFlags::all(), None) .await?; tracing::info!("Rollback successfully completed"); } diff --git a/core/lib/object_store/src/objects.rs b/core/lib/object_store/src/objects.rs index 75c0f5460ad4..0fee5af8fb9b 100644 --- a/core/lib/object_store/src/objects.rs +++ b/core/lib/object_store/src/objects.rs @@ -159,6 +159,16 @@ impl dyn ObjectStore + '_ { Ok(key) } + /// Removes a value associated with the key. + /// + /// # Errors + /// + /// Returns I/O errors specific to the storage. 
+ pub async fn remove(&self, key: V::Key<'_>) -> Result<(), ObjectStoreError> { + let key = V::encode_key(key); + self.remove_raw(V::BUCKET, &key).await + } + pub fn get_storage_prefix(&self) -> String { self.storage_prefix_raw(V::BUCKET) } diff --git a/core/node/block_reverter/Cargo.toml b/core/node/block_reverter/Cargo.toml index 4b345b768de3..6cb30aba3b90 100644 --- a/core/node/block_reverter/Cargo.toml +++ b/core/node/block_reverter/Cargo.toml @@ -14,6 +14,7 @@ zksync_types.workspace = true zksync_dal.workspace = true zksync_config.workspace = true zksync_contracts.workspace = true +zksync_object_store.workspace = true zksync_storage.workspace = true zksync_eth_signer.workspace = true zksync_state.workspace = true diff --git a/core/node/block_reverter/src/lib.rs b/core/node/block_reverter/src/lib.rs index 96583c9f0710..dcb5e472cbed 100644 --- a/core/node/block_reverter/src/lib.rs +++ b/core/node/block_reverter/src/lib.rs @@ -9,11 +9,16 @@ use zksync_contracts::zksync_contract; use zksync_dal::{ConnectionPool, Core, CoreDal}; use zksync_eth_signer::{EthereumSigner, PrivateKeySigner, TransactionParameters}; use zksync_merkle_tree::domain::ZkSyncTree; +use zksync_object_store::{ObjectStore, ObjectStoreError}; use zksync_state::RocksdbStorage; use zksync_storage::RocksDB; use zksync_types::{ aggregated_operations::AggregatedActionType, ethabi::Token, + snapshots::{ + SnapshotFactoryDependencies, SnapshotMetadata, SnapshotStorageLogsChunk, + SnapshotStorageLogsStorageKey, + }, web3::{ contract::{Contract, Options}, transports::Http, @@ -125,11 +130,13 @@ impl BlockReverter { self.allow_reverting_executed_batches = true; } - /// Rolls back DBs (Postgres + RocksDB) to a previous state. + /// Rolls back DBs (Postgres + RocksDB) to a previous state. If Postgres is rolled back and `snapshots_object_store` + /// is specified, snapshot files will be deleted as well. pub async fn rollback_db( &self, last_l1_batch_to_keep: L1BatchNumber, flags: BlockReverterFlags, + snapshots_object_store: Option<&dyn ObjectStore>, ) -> anyhow::Result<()> { let rollback_tree = flags.contains(BlockReverterFlags::TREE); let rollback_postgres = flags.contains(BlockReverterFlags::POSTGRES); @@ -150,9 +157,20 @@ impl BlockReverter { // Tree needs to be reverted first to keep state recoverable self.rollback_rocks_dbs(last_l1_batch_to_keep, rollback_tree, rollback_sk_cache) .await?; - if rollback_postgres { - self.rollback_postgres(last_l1_batch_to_keep).await?; + let deleted_snapshots = if rollback_postgres { + self.rollback_postgres(last_l1_batch_to_keep).await? + } else { + vec![] + }; + if let Some(object_store) = snapshots_object_store { + Self::delete_snapshot_files(object_store, &deleted_snapshots).await?; + } else if !deleted_snapshots.is_empty() { + tracing::info!( + "Did not remove snapshot files in object store since it was not provided; \ + metadata for deleted snapshots: {deleted_snapshots:?}" + ); } + Ok(()) } @@ -273,7 +291,10 @@ impl BlockReverter { /// Reverts data in the Postgres database. /// If `node_role` is `Main` a consensus hard-fork is performed. 
- async fn rollback_postgres(&self, last_l1_batch_to_keep: L1BatchNumber) -> anyhow::Result<()> { + async fn rollback_postgres( + &self, + last_l1_batch_to_keep: L1BatchNumber, + ) -> anyhow::Result> { tracing::info!("Rolling back postgres data"); let mut storage = self.connection_pool.connection().await?; let mut transaction = storage.start_transaction().await?; @@ -327,6 +348,14 @@ impl BlockReverter { .eth_sender_dal() .delete_eth_txs(last_l1_batch_to_keep) .await?; + + tracing::info!("Rolling back snapshots"); + let deleted_snapshots = transaction + .snapshots_dal() + .delete_snapshots_after(last_l1_batch_to_keep) + .await?; + + // Remove data from main tables (L2 blocks and L1 batches). tracing::info!("Rolling back L1 batches"); transaction .blocks_dal() @@ -348,7 +377,68 @@ impl BlockReverter { } transaction.commit().await?; - Ok(()) + Ok(deleted_snapshots) + } + + async fn delete_snapshot_files( + object_store: &dyn ObjectStore, + deleted_snapshots: &[SnapshotMetadata], + ) -> anyhow::Result<()> { + fn ignore_not_found_errors(err: ObjectStoreError) -> Result<(), ObjectStoreError> { + match err { + ObjectStoreError::KeyNotFound(err) => { + tracing::debug!("Ignoring 'not found' object store error: {err}"); + Ok(()) + } + _ => Err(err), + } + } + + fn combine_results(output: &mut anyhow::Result<()>, result: anyhow::Result<()>) { + if let Err(err) = result { + tracing::warn!("{err:?}"); + *output = Err(err); + } + } + + if deleted_snapshots.is_empty() { + return Ok(()); + } + + let mut overall_result = Ok(()); + for snapshot in deleted_snapshots { + tracing::info!( + "Removing factory deps for snapshot for L1 batch #{}", + snapshot.l1_batch_number + ); + let result = object_store + .remove::(snapshot.l1_batch_number) + .await + .or_else(ignore_not_found_errors) + .with_context(|| { + format!( + "failed removing factory deps for snapshot for L1 batch #{}", + snapshot.l1_batch_number + ) + }); + combine_results(&mut overall_result, result); + + for chunk_id in 0..snapshot.storage_logs_filepaths.len() as u64 { + let key = SnapshotStorageLogsStorageKey { + l1_batch_number: snapshot.l1_batch_number, + chunk_id, + }; + tracing::info!("Removing storage logs chunk {key:?}"); + + let result = object_store + .remove::(key) + .await + .or_else(ignore_not_found_errors) + .with_context(|| format!("failed removing storage logs chunk {key:?}")); + combine_results(&mut overall_result, result); + } + } + overall_result } /// Sends a revert transaction to L1. From 460664010567d5e6c100dc181881d20e364c1939 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Mon, 22 Apr 2024 12:23:10 +0300 Subject: [PATCH 05/14] Fix tree serialization with single node --- .../merkle_tree/src/storage/serialization.rs | 9 ++++++- .../merkle_tree/tests/integration/domain.rs | 27 +++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/core/lib/merkle_tree/src/storage/serialization.rs b/core/lib/merkle_tree/src/storage/serialization.rs index 09a06a3630cd..6ad6e1ff0b2f 100644 --- a/core/lib/merkle_tree/src/storage/serialization.rs +++ b/core/lib/merkle_tree/src/storage/serialization.rs @@ -168,7 +168,14 @@ impl Root { })?; let node = match leaf_count { 0 => return Ok(Self::Empty), - 1 => Node::Leaf(LeafNode::deserialize(bytes)?), + 1 => { + // Try both the leaf and internal node serialization; in some cases, a single leaf + // may still be persisted as an internal node. 
Since serialization of an internal node with a single child + // is always shorter than that a leaf, the order (first leaf, then internal node) is chosen intentionally. + LeafNode::deserialize(bytes) + .map(Node::Leaf) + .or_else(|_| InternalNode::deserialize(bytes).map(Node::Internal))? + } _ => Node::Internal(InternalNode::deserialize(bytes)?), }; Ok(Self::new(leaf_count, node)) diff --git a/core/lib/merkle_tree/tests/integration/domain.rs b/core/lib/merkle_tree/tests/integration/domain.rs index ebf2cbce7bc8..adc96694f1a5 100644 --- a/core/lib/merkle_tree/tests/integration/domain.rs +++ b/core/lib/merkle_tree/tests/integration/domain.rs @@ -101,6 +101,33 @@ fn basic_workflow_multiblock() { assert_eq!(tree.next_l1_batch_number(), L1BatchNumber(12)); } +#[test] +fn tree_with_single_leaf_works_correctly() { + let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); + let storage_logs = gen_storage_logs(); + let db = RocksDB::new(temp_dir.as_ref()).unwrap(); + { + let mut tree = ZkSyncTree::new(db.clone().into()); + tree.process_l1_batch(&storage_logs[0..1]); + tree.save(); + } + let mut tree = ZkSyncTree::new(db.into()); + tree.verify_consistency(L1BatchNumber(0)); + + // Add more logs to the tree. + for single_log_slice in storage_logs[1..].chunks(1) { + tree.process_l1_batch(single_log_slice); + tree.save(); + } + assert_eq!( + tree.root_hash(), + H256([ + 125, 25, 107, 171, 182, 155, 32, 70, 138, 108, 238, 150, 140, 205, 193, 39, 90, 92, + 122, 233, 118, 238, 248, 201, 160, 55, 58, 206, 244, 216, 188, 10 + ]), + ); +} + #[test] fn filtering_out_no_op_writes() { let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); From 0302f3371037950aedb99c81b4f2460e1921a0f7 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Mon, 22 Apr 2024 12:24:11 +0300 Subject: [PATCH 06/14] Use `tempfile` in object store tests --- Cargo.lock | 21 +-------------------- Cargo.toml | 1 - core/lib/object_store/Cargo.toml | 2 +- core/lib/object_store/src/file.rs | 8 ++++---- 4 files changed, 6 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bbb4fc8fbed2..9e0871506069 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5133,15 +5133,6 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" -[[package]] -name = "remove_dir_all" -version = "0.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" -dependencies = [ - "winapi", -] - [[package]] name = "rend" version = "0.4.1" @@ -6571,16 +6562,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" -[[package]] -name = "tempdir" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8" -dependencies = [ - "rand 0.4.6", - "remove_dir_all", -] - [[package]] name = "tempfile" version = "3.8.0" @@ -8831,7 +8812,7 @@ dependencies = [ "http", "prost 0.12.1", "serde_json", - "tempdir", + "tempfile", "tokio", "tracing", "vise", diff --git a/Cargo.toml b/Cargo.toml index 622b711ddb24..12f6647cab0b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -139,7 +139,6 @@ sqlx = "0.7.3" static_assertions = "1.1" structopt = "0.3.20" strum = "0.24" -tempdir = "0.3.7" tempfile = "3.0.2" test-casing = "0.1.2" 
test-log = "0.2.15" diff --git a/core/lib/object_store/Cargo.toml b/core/lib/object_store/Cargo.toml index f3f53e082391..e8d5322765ec 100644 --- a/core/lib/object_store/Cargo.toml +++ b/core/lib/object_store/Cargo.toml @@ -27,4 +27,4 @@ tracing.workspace = true prost.workspace = true [dev-dependencies] -tempdir.workspace = true +tempfile.workspace = true diff --git a/core/lib/object_store/src/file.rs b/core/lib/object_store/src/file.rs index 2d77366a952c..72b7d2dd35fc 100644 --- a/core/lib/object_store/src/file.rs +++ b/core/lib/object_store/src/file.rs @@ -78,13 +78,13 @@ impl ObjectStore for FileBackedObjectStore { #[cfg(test)] mod test { - use tempdir::TempDir; + use tempfile::TempDir; use super::*; #[tokio::test] async fn test_get() { - let dir = TempDir::new("test-data").unwrap(); + let dir = TempDir::new().unwrap(); let path = dir.into_path().into_os_string().into_string().unwrap(); let object_store = FileBackedObjectStore::new(path).await; let expected = vec![9, 0, 8, 9, 0, 7]; @@ -101,7 +101,7 @@ mod test { #[tokio::test] async fn test_put() { - let dir = TempDir::new("test-data").unwrap(); + let dir = TempDir::new().unwrap(); let path = dir.into_path().into_os_string().into_string().unwrap(); let object_store = FileBackedObjectStore::new(path).await; let bytes = vec![9, 0, 8, 9, 0, 7]; @@ -113,7 +113,7 @@ mod test { #[tokio::test] async fn test_remove() { - let dir = TempDir::new("test-data").unwrap(); + let dir = TempDir::new().unwrap(); let path = dir.into_path().into_os_string().into_string().unwrap(); let object_store = FileBackedObjectStore::new(path).await; let result = object_store From ec08b52fbc348a47867b62eab6800a4f1be727a9 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Mon, 22 Apr 2024 13:16:02 +0300 Subject: [PATCH 07/14] Test block reverter library --- Cargo.lock | 3 + core/node/block_reverter/Cargo.toml | 5 + core/node/block_reverter/src/lib.rs | 5 +- core/node/block_reverter/src/tests.rs | 317 ++++++++++++++++++++++++++ 4 files changed, 329 insertions(+), 1 deletion(-) create mode 100644 core/node/block_reverter/src/tests.rs diff --git a/Cargo.lock b/Cargo.lock index 9e0871506069..5d94a587a975 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8115,8 +8115,11 @@ name = "zksync_block_reverter" version = "0.1.0" dependencies = [ "anyhow", + "assert_matches", "bitflags 1.3.2", "serde", + "tempfile", + "test-casing", "tokio", "tracing", "zksync_config", diff --git a/core/node/block_reverter/Cargo.toml b/core/node/block_reverter/Cargo.toml index 6cb30aba3b90..dd3a0b55e014 100644 --- a/core/node/block_reverter/Cargo.toml +++ b/core/node/block_reverter/Cargo.toml @@ -25,3 +25,8 @@ tokio = { workspace = true, features = ["time", "fs"] } bitflags.workspace = true serde.workspace = true tracing.workspace = true + +[dev-dependencies] +assert_matches.workspace = true +tempfile.workspace = true +test-casing.workspace = true diff --git a/core/node/block_reverter/src/lib.rs b/core/node/block_reverter/src/lib.rs index dcb5e472cbed..3e8afb5aa6bd 100644 --- a/core/node/block_reverter/src/lib.rs +++ b/core/node/block_reverter/src/lib.rs @@ -28,6 +28,9 @@ use zksync_types::{ Address, L1BatchNumber, H160, H256, U256, }; +#[cfg(test)] +mod tests; + bitflags! { pub struct BlockReverterFlags: u32 { const POSTGRES = 0b_0001; @@ -188,7 +191,7 @@ impl BlockReverter { .blocks_dal() .get_l1_batch_state_root(last_l1_batch_to_keep) .await? 
- .context("failed to fetch root hash for target L1 batch")?; + .context("no state root hash for target L1 batch")?; // Rolling back Merkle tree let merkle_tree_path = Path::new(&self.merkle_tree_path); diff --git a/core/node/block_reverter/src/tests.rs b/core/node/block_reverter/src/tests.rs new file mode 100644 index 000000000000..6ece230b5bb4 --- /dev/null +++ b/core/node/block_reverter/src/tests.rs @@ -0,0 +1,317 @@ +//! Tests for block reverter. + +use assert_matches::assert_matches; +use test_casing::test_casing; +use tokio::sync::watch; +use zksync_dal::Connection; +use zksync_merkle_tree::TreeInstruction; +use zksync_object_store::ObjectStoreFactory; +use zksync_state::ReadStorage; +use zksync_types::{ + block::{L1BatchHeader, L2BlockHeader}, + snapshots::SnapshotVersion, + AccountTreeId, L2BlockNumber, ProtocolVersion, ProtocolVersionId, StorageKey, StorageLog, +}; + +use super::*; + +fn gen_storage_logs() -> Vec { + (0..10) + .map(|i| { + let key = StorageKey::new(AccountTreeId::default(), H256::from_low_u64_be(i)); + StorageLog::new_write_log(key, H256::repeat_byte(0xff)) + }) + .collect() +} + +fn initialize_merkle_tree(path: &Path, storage_logs: &[StorageLog]) -> Vec { + let db = RocksDB::new(path).unwrap().with_sync_writes(); + let mut tree = ZkSyncTree::new(db.into()); + let hashes = storage_logs.iter().enumerate().map(|(i, log)| { + let output = + tree.process_l1_batch(&[TreeInstruction::write(log.key, i as u64 + 1, log.value)]); + tree.save(); + output.root_hash + }); + hashes.collect() +} + +async fn setup_storage(storage: &mut Connection<'_, Core>, storage_logs: &[StorageLog]) { + storage + .protocol_versions_dal() + .save_protocol_version_with_tx(&ProtocolVersion::default()) + .await + .unwrap(); + + for (number, storage_log) in (0..).zip(storage_logs) { + let l2_block_header = L2BlockHeader { + number: L2BlockNumber(number), + timestamp: number.into(), + hash: H256::from_low_u64_be(number.into()), + l1_tx_count: 0, + l2_tx_count: 0, + fee_account_address: Address::default(), + base_fee_per_gas: 0, + batch_fee_input: Default::default(), + gas_per_pubdata_limit: 0, + base_system_contracts_hashes: Default::default(), + protocol_version: Some(ProtocolVersionId::latest()), + virtual_blocks: 1, + gas_limit: 0, + }; + storage + .blocks_dal() + .insert_l2_block(&l2_block_header) + .await + .unwrap(); + let l1_batch_header = L1BatchHeader { + number: L1BatchNumber(number), + timestamp: number.into(), + l1_tx_count: 0, + l2_tx_count: 0, + priority_ops_onchain_data: vec![], + l2_to_l1_logs: vec![], + l2_to_l1_messages: vec![], + bloom: Default::default(), + used_contract_hashes: vec![], + base_system_contracts_hashes: Default::default(), + system_logs: vec![], + protocol_version: Some(ProtocolVersionId::latest()), + pubdata_input: None, + }; + storage + .blocks_dal() + .insert_mock_l1_batch(&l1_batch_header) + .await + .unwrap(); + storage + .blocks_dal() + .mark_l2_blocks_as_executed_in_l1_batch(l1_batch_header.number) + .await + .unwrap(); + + storage + .storage_logs_dal() + .insert_storage_logs( + l2_block_header.number, + &[(H256::zero(), vec![*storage_log])], + ) + .await + .unwrap(); + storage + .storage_logs_dedup_dal() + .insert_initial_writes(l1_batch_header.number, &[storage_log.key]) + .await + .unwrap(); + } +} + +#[test_casing(2, [false, true])] +#[tokio::test] +async fn block_reverter_basics(sync_merkle_tree: bool) { + let storage_logs = gen_storage_logs(); + let pool = ConnectionPool::::test_pool().await; + let mut storage = pool.connection().await.unwrap(); + 
setup_storage(&mut storage, &storage_logs).await; + + let temp_dir = tempfile::tempdir().unwrap(); + let merkle_tree_path = temp_dir.path().join("tree"); + let storage_logs_for_merkle_tree = if sync_merkle_tree { + &storage_logs + } else { + &storage_logs[..7] // include the target L1 batch #5, but don't process some newer batches + }; + let l1_batch_hashes = initialize_merkle_tree(&merkle_tree_path, storage_logs_for_merkle_tree); + for (number, hash) in (0..).zip(l1_batch_hashes) { + storage + .blocks_dal() + .set_l1_batch_hash(L1BatchNumber(number), hash) + .await + .unwrap(); + } + + let sk_cache_path = temp_dir.path().join("sk_cache"); + let sk_cache = RocksdbStorage::builder(&sk_cache_path).await.unwrap(); + let (_stop_sender, stop_receiver) = watch::channel(false); + sk_cache + .synchronize(&mut storage, &stop_receiver) + .await + .unwrap(); + + let block_reverter = BlockReverter::new( + NodeRole::External, + sk_cache_path.to_str().unwrap().to_owned(), + merkle_tree_path.to_str().unwrap().to_owned(), + pool.clone(), + ); + block_reverter + .rollback_db(L1BatchNumber(5), BlockReverterFlags::all(), None) + .await + .unwrap(); + + let last_l1_batch_number = storage + .blocks_dal() + .get_sealed_l1_batch_number() + .await + .unwrap(); + assert_eq!(last_l1_batch_number, Some(L1BatchNumber(5))); + let last_l2_block_number = storage + .blocks_dal() + .get_sealed_l2_block_number() + .await + .unwrap(); + assert_eq!(last_l2_block_number, Some(L2BlockNumber(5))); + + let all_storage_logs = storage + .storage_logs_dal() + .dump_all_storage_logs_for_tests() + .await; + assert_eq!(all_storage_logs.len(), 6); + for (i, log) in all_storage_logs.iter().enumerate() { + assert_eq!(log.l2_block_number, L2BlockNumber(i as u32)); + assert_eq!(log.value, H256::repeat_byte(0xff)); + } + + let mut all_initial_writes = storage + .storage_logs_dedup_dal() + .dump_all_initial_writes_for_tests() + .await; + assert_eq!(all_initial_writes.len(), 6); + all_initial_writes.sort_unstable_by_key(|write| write.l1_batch_number); + for (i, write) in all_initial_writes.iter().enumerate() { + assert_eq!(write.l1_batch_number, L1BatchNumber(i as u32)); + assert_eq!(write.index, i as u64 + 1); + } + + let db = RocksDB::new(&merkle_tree_path).unwrap(); + let tree = ZkSyncTree::new(db.into()); + assert_eq!(tree.next_l1_batch_number(), L1BatchNumber(6)); + + let sk_cache = RocksdbStorage::builder(&sk_cache_path).await.unwrap(); + let mut sk_cache = sk_cache + .synchronize(&mut storage, &stop_receiver) + .await + .unwrap() + .expect("sk_cache syncing unexpectedly stopped"); + for (i, log) in storage_logs.iter().enumerate() { + let expected_value = if i <= 5 { log.value } else { H256::zero() }; + assert_eq!(sk_cache.read_value(&log.key), expected_value); + } +} + +async fn create_mock_snapshot( + storage: &mut Connection<'_, Core>, + object_store: &dyn ObjectStore, + l1_batch_number: L1BatchNumber, +) { + let storage_logs_chunk_count = 5; + + let factory_deps_key = object_store + .put( + l1_batch_number, + &SnapshotFactoryDependencies { + factory_deps: vec![], + }, + ) + .await + .unwrap(); + storage + .snapshots_dal() + .add_snapshot( + SnapshotVersion::Version0, + l1_batch_number, + storage_logs_chunk_count, + &factory_deps_key, + ) + .await + .unwrap(); + + for chunk_id in 0..storage_logs_chunk_count { + let key = SnapshotStorageLogsStorageKey { + l1_batch_number, + chunk_id, + }; + let key = object_store + .put( + key, + &SnapshotStorageLogsChunk { + storage_logs: vec![], + }, + ) + .await + .unwrap(); + storage + 
.snapshots_dal()
+            .add_storage_logs_filepath_for_snapshot(l1_batch_number, chunk_id, &key)
+            .await
+            .unwrap();
+    }
+}
+
+#[test_casing(2, [false, true])]
+#[tokio::test]
+async fn reverting_snapshot(remove_objects: bool) {
+    let storage_logs = gen_storage_logs();
+    let pool = ConnectionPool::<Core>::test_pool().await;
+    let mut storage = pool.connection().await.unwrap();
+    setup_storage(&mut storage, &storage_logs).await;
+
+    let object_store = ObjectStoreFactory::mock().create_store().await;
+    create_mock_snapshot(&mut storage, &object_store, L1BatchNumber(7)).await;
+    // Sanity check: snapshot should be visible.
+    let all_snapshots = storage
+        .snapshots_dal()
+        .get_all_complete_snapshots()
+        .await
+        .unwrap();
+    assert_eq!(all_snapshots.snapshots_l1_batch_numbers, [L1BatchNumber(7)]);
+
+    let block_reverter = BlockReverter::new(
+        NodeRole::External,
+        "unused".to_owned(), // FIXME: don't require these params
+        "unused".to_owned(),
+        pool.clone(),
+    );
+    let maybe_object_store = remove_objects.then_some(object_store.as_ref());
+    block_reverter
+        .rollback_db(
+            L1BatchNumber(5),
+            BlockReverterFlags::POSTGRES,
+            maybe_object_store,
+        )
+        .await
+        .unwrap();
+
+    // Check that snapshot has been removed.
+    let all_snapshots = storage
+        .snapshots_dal()
+        .get_all_complete_snapshots()
+        .await
+        .unwrap();
+    assert_eq!(all_snapshots.snapshots_l1_batch_numbers, []);
+
+    let factory_deps_result = object_store
+        .get::<SnapshotFactoryDependencies>(L1BatchNumber(7))
+        .await;
+    if remove_objects {
+        assert_matches!(
+            factory_deps_result.unwrap_err(),
+            ObjectStoreError::KeyNotFound(_)
+        );
+    } else {
+        factory_deps_result.unwrap();
+    }
+
+    for chunk_id in 0..5 {
+        let key = SnapshotStorageLogsStorageKey {
+            l1_batch_number: L1BatchNumber(7),
+            chunk_id,
+        };
+        let chunk_result = object_store.get::<SnapshotStorageLogsChunk>(key).await;
+        if remove_objects {
+            assert_matches!(chunk_result.unwrap_err(), ObjectStoreError::KeyNotFound(_));
+        } else {
+            chunk_result.unwrap();
+        }
+    }
+}
From 94f92a497a81506606465a8e9d38a5dcbd155ae7 Mon Sep 17 00:00:00 2001
From: Alex Ostrovski 
Date: Mon, 22 Apr 2024 14:08:02 +0300
Subject: [PATCH 08/14] Rework passing params to reverter

---
 Cargo.lock                            |   2 -
 Cargo.toml                            |   1 -
 core/bin/block_reverter/src/main.rs   |  25 ++----
 core/bin/external_node/src/main.rs    |  37 ++++----
 core/lib/zksync_core/Cargo.toml       |   1 -
 core/node/block_reverter/Cargo.toml   |   1 -
 core/node/block_reverter/src/lib.rs   | 119 +++++++++++++------------
 core/node/block_reverter/src/tests.rs |  34 +++-----
 8 files changed, 92 insertions(+), 128 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 5d94a587a975..3d67e267b830 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8116,7 +8116,6 @@ version = "0.1.0"
 dependencies = [
  "anyhow",
  "assert_matches",
- "bitflags 1.3.2",
  "serde",
  "tempfile",
  "test-casing",
@@ -8402,7 +8401,6 @@ dependencies = [
 "assert_matches",
 "async-trait",
 "axum",
- "bitflags 1.3.2",
 "chrono",
 "ctrlc",
 "futures 0.3.28",
diff --git a/Cargo.toml b/Cargo.toml
index 12f6647cab0b..d4e02aab90d2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -80,7 +80,6 @@ async-trait = "0.1"
 axum = "0.6.19"
 bigdecimal = "0.3.0"
 bincode = "1"
-bitflags = "1.3.2"
 blake2 = "0.10"
 chrono = "0.4"
 clap = "4.2.2"
diff --git a/core/bin/block_reverter/src/main.rs b/core/bin/block_reverter/src/main.rs
index 91aaea0f9afa..ec0f2ec5ab58 100644
--- a/core/bin/block_reverter/src/main.rs
+++ b/core/bin/block_reverter/src/main.rs
@@ -1,7 +1,7 @@
 use anyhow::Context as _;
 use clap::{Parser, Subcommand};
 use tokio::io::{self, AsyncReadExt};
-use zksync_block_reverter::{BlockReverter, 
BlockReverterEthConfig, BlockReverterFlags, NodeRole}; +use zksync_block_reverter::{BlockReverter, BlockReverterEthConfig, NodeRole}; use zksync_config::{ configs::ObservabilityConfig, ContractsConfig, DBConfig, EthConfig, PostgresConfig, }; @@ -116,12 +116,7 @@ async fn main() -> anyhow::Result<()> { .build() .await .context("failed to build a connection pool")?; - let mut block_reverter = BlockReverter::new( - NodeRole::Main, - db_config.state_keeper_db_path, - db_config.merkle_tree.path, - connection_pool, - ); + let mut block_reverter = BlockReverter::new(NodeRole::Main, connection_pool); match command { Command::Display { json, .. } => { @@ -183,31 +178,25 @@ async fn main() -> anyhow::Result<()> { block_reverter.allow_reverting_executed_batches(); } - let mut object_store = None; - let mut flags = BlockReverterFlags::empty(); if rollback_postgres { - flags |= BlockReverterFlags::POSTGRES; + block_reverter.enable_reverting_postgres(); let object_store_config = SnapshotsObjectStoreConfig::from_env() .context("SnapshotsObjectStoreConfig::from_env()")?; - object_store = Some( + block_reverter.enable_reverting_snapshot_objects( ObjectStoreFactory::new(object_store_config.0) .create_store() .await, ); } if rollback_tree { - flags |= BlockReverterFlags::TREE; + block_reverter.enable_reverting_merkle_tree(db_config.merkle_tree.path); } if rollback_sk_cache { - flags |= BlockReverterFlags::SK_CACHE; + block_reverter.enable_reverting_state_keeper_cache(db_config.state_keeper_db_path); } block_reverter - .rollback_db( - L1BatchNumber(l1_batch_number), - flags, - object_store.as_deref(), - ) + .revert(L1BatchNumber(l1_batch_number)) .await?; } Command::ClearFailedL1Transactions => { diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index 79057c850f66..b827f6e28018 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -8,7 +8,7 @@ use tokio::{ sync::{oneshot, watch, RwLock}, task::{self, JoinHandle}, }; -use zksync_block_reverter::{BlockReverter, BlockReverterFlags, NodeRole}; +use zksync_block_reverter::{BlockReverter, NodeRole}; use zksync_commitment_generator::CommitmentGenerator; use zksync_concurrency::{ctx, scope}; use zksync_config::configs::{ @@ -934,14 +934,13 @@ async fn run_node( let sigint_receiver = env.setup_sigint_handler(); // Revert the storage if needed. - let mut reverter = BlockReverter::new( - NodeRole::External, - config.required.state_cache_path.clone(), - config.required.merkle_tree_path.clone(), - connection_pool.clone(), - ); + let mut reverter = BlockReverter::new(NodeRole::External, connection_pool.clone()); // Reverting executed batches is more-or-less safe for external nodes. 
- reverter.allow_reverting_executed_batches(); + let reverter = reverter + .allow_reverting_executed_batches() + .enable_reverting_postgres() + .enable_reverting_merkle_tree(config.required.merkle_tree_path.clone()) + .enable_reverting_state_keeper_cache(config.required.state_cache_path.clone()); let mut reorg_detector = ReorgDetector::new(main_node_client.clone(), connection_pool.clone()); // We're checking for the reorg in the beginning because we expect that if reorg is detected during @@ -951,31 +950,25 @@ async fn run_node( match reorg_detector.check_consistency().await { Ok(()) => {} Err(reorg_detector::Error::ReorgDetected(last_correct_l1_batch)) => { - tracing::info!("Rolling back to l1 batch number {last_correct_l1_batch}"); - reverter - .rollback_db(last_correct_l1_batch, BlockReverterFlags::all(), None) - .await?; - tracing::info!("Rollback successfully completed"); + tracing::info!("Reverting to l1 batch number {last_correct_l1_batch}"); + reverter.revert(last_correct_l1_batch).await?; + tracing::info!("Revert successfully completed"); } Err(err) => return Err(err).context("reorg_detector.check_consistency()"), } if opt.revert_pending_l1_batch { - tracing::info!("Rolling pending L1 batch back.."); + tracing::info!("Reverting pending L1 batch"); let mut connection = connection_pool.connection().await?; let sealed_l1_batch_number = connection .blocks_dal() .get_sealed_l1_batch_number() .await? - .context( - "Cannot roll back pending L1 batch since there are no L1 batches in Postgres", - )?; + .context("Cannot revert pending L1 batch since there are no L1 batches in Postgres")?; drop(connection); - tracing::info!("Rolling back to l1 batch number {sealed_l1_batch_number}"); - reverter - .rollback_db(sealed_l1_batch_number, BlockReverterFlags::all(), None) - .await?; - tracing::info!("Rollback successfully completed"); + tracing::info!("Reverting to l1 batch number {sealed_l1_batch_number}"); + reverter.revert(sealed_l1_batch_number).await?; + tracing::info!("Revert successfully completed"); } app_health.insert_component(reorg_detector.health_check().clone()); diff --git a/core/lib/zksync_core/Cargo.toml b/core/lib/zksync_core/Cargo.toml index e3e05226a751..97d3a45a38cb 100644 --- a/core/lib/zksync_core/Cargo.toml +++ b/core/lib/zksync_core/Cargo.toml @@ -76,7 +76,6 @@ chrono = { workspace = true, features = ["serde"] } anyhow.workspace = true thiserror.workspace = true async-trait.workspace = true -bitflags.workspace = true thread_local.workspace = true reqwest = { workspace = true, features = ["blocking", "json"] } diff --git a/core/node/block_reverter/Cargo.toml b/core/node/block_reverter/Cargo.toml index dd3a0b55e014..586b6c78cd5e 100644 --- a/core/node/block_reverter/Cargo.toml +++ b/core/node/block_reverter/Cargo.toml @@ -22,7 +22,6 @@ zksync_merkle_tree.workspace = true anyhow.workspace = true tokio = { workspace = true, features = ["time", "fs"] } -bitflags.workspace = true serde.workspace = true tracing.workspace = true diff --git a/core/node/block_reverter/src/lib.rs b/core/node/block_reverter/src/lib.rs index 3e8afb5aa6bd..5bff5508ed10 100644 --- a/core/node/block_reverter/src/lib.rs +++ b/core/node/block_reverter/src/lib.rs @@ -1,7 +1,6 @@ -use std::{path::Path, time::Duration}; +use std::{path::Path, sync::Arc, time::Duration}; use anyhow::Context as _; -use bitflags::bitflags; use serde::Serialize; use tokio::fs; use zksync_config::{ContractsConfig, EthConfig}; @@ -31,14 +30,6 @@ use zksync_types::{ #[cfg(test)] mod tests; -bitflags! 
{
-    pub struct BlockReverterFlags: u32 {
-        const POSTGRES = 0b_0001;
-        const TREE = 0b_0010;
-        const SK_CACHE = 0b_0100;
-    }
-}
-
 #[derive(Debug)]
 pub struct BlockReverterEthConfig {
     eth_client_url: String,
@@ -102,25 +93,24 @@ pub struct BlockReverter {
     /// It affects the interactions with the consensus state.
     /// This distinction will be removed once consensus genesis is moved to the L1 state.
     node_role: NodeRole,
-    state_keeper_cache_path: String,
-    merkle_tree_path: String,
-    connection_pool: ConnectionPool<Core>,
     allow_reverting_executed_batches: bool,
+    connection_pool: ConnectionPool<Core>,
+    should_revert_postgres: bool,
+    state_keeper_cache_path: Option<String>,
+    merkle_tree_path: Option<String>,
+    snapshots_object_store: Option<Arc<dyn ObjectStore>>,
 }
 
 impl BlockReverter {
-    pub fn new(
-        node_role: NodeRole,
-        state_keeper_cache_path: String,
-        merkle_tree_path: String,
-        connection_pool: ConnectionPool<Core>,
-    ) -> Self {
+    pub fn new(node_role: NodeRole, connection_pool: ConnectionPool<Core>) -> Self {
         Self {
             node_role,
-            state_keeper_cache_path,
-            merkle_tree_path,
-            connection_pool,
             allow_reverting_executed_batches: false,
+            connection_pool,
+            should_revert_postgres: false,
+            state_keeper_cache_path: None,
+            merkle_tree_path: None,
+            snapshots_object_store: None,
         }
     }
 
@@ -129,22 +119,37 @@ impl BlockReverter {
     ///
     /// Main use case for the setting this flag is the external node, where may obtain an
     /// incorrect state even for a block that was marked as executed. On the EN, this mode is not destructive.
-    pub fn allow_reverting_executed_batches(&mut self) {
+    pub fn allow_reverting_executed_batches(&mut self) -> &mut Self {
         self.allow_reverting_executed_batches = true;
+        self
     }
 
-    /// Rolls back DBs (Postgres + RocksDB) to a previous state. If Postgres is rolled back and `snapshots_object_store`
-    /// is specified, snapshot files will be deleted as well.
-    pub async fn rollback_db(
-        &self,
-        last_l1_batch_to_keep: L1BatchNumber,
-        flags: BlockReverterFlags,
-        snapshots_object_store: Option<&dyn ObjectStore>,
-    ) -> anyhow::Result<()> {
-        let rollback_tree = flags.contains(BlockReverterFlags::TREE);
-        let rollback_postgres = flags.contains(BlockReverterFlags::POSTGRES);
-        let rollback_sk_cache = flags.contains(BlockReverterFlags::SK_CACHE);
+    pub fn enable_reverting_postgres(&mut self) -> &mut Self {
+        self.should_revert_postgres = true;
+        self
+    }
+
+    pub fn enable_reverting_merkle_tree(&mut self, path: String) -> &mut Self {
+        self.merkle_tree_path = Some(path);
+        self
+    }
+
+    pub fn enable_reverting_state_keeper_cache(&mut self, path: String) -> &mut Self {
+        self.state_keeper_cache_path = Some(path);
+        self
+    }
+
+    pub fn enable_reverting_snapshot_objects(
+        &mut self,
+        object_store: Arc<dyn ObjectStore>,
+    ) -> &mut Self {
+        self.snapshots_object_store = Some(object_store);
+        self
+    }
 
+    /// Reverts DBs (Postgres + RocksDB) to a previous state. If Postgres is rolled back and `snapshots_object_store`
+    /// is specified, snapshot files will be deleted as well.
+    pub async fn revert(&self, last_l1_batch_to_keep: L1BatchNumber) -> anyhow::Result<()> {
         if !self.allow_reverting_executed_batches {
             let mut storage = self.connection_pool.connection().await?;
             let last_executed_l1_batch = storage
@@ -157,15 +162,15 @@ impl BlockReverter {
             );
         }
 
-        // Tree needs to be reverted first to keep state recoverable
-        self.rollback_rocks_dbs(last_l1_batch_to_keep, rollback_tree, rollback_sk_cache)
-            .await?;
-        let deleted_snapshots = if rollback_postgres {
-            self.rollback_postgres(last_l1_batch_to_keep).await? 
+ // Tree needs to be reverted first to keep the state recoverable + self.revert_rocksdb_instances(last_l1_batch_to_keep).await?; + let deleted_snapshots = if self.should_revert_postgres { + self.revert_postgres(last_l1_batch_to_keep).await? } else { vec![] }; - if let Some(object_store) = snapshots_object_store { + + if let Some(object_store) = &self.snapshots_object_store { Self::delete_snapshot_files(object_store, &deleted_snapshots).await?; } else if !deleted_snapshots.is_empty() { tracing::info!( @@ -177,13 +182,11 @@ impl BlockReverter { Ok(()) } - async fn rollback_rocks_dbs( + async fn revert_rocksdb_instances( &self, last_l1_batch_to_keep: L1BatchNumber, - rollback_tree: bool, - rollback_sk_cache: bool, ) -> anyhow::Result<()> { - if rollback_tree { + if let Some(merkle_tree_path) = &self.merkle_tree_path { let storage_root_hash = self .connection_pool .connection() @@ -194,7 +197,7 @@ impl BlockReverter { .context("no state root hash for target L1 batch")?; // Rolling back Merkle tree - let merkle_tree_path = Path::new(&self.merkle_tree_path); + let merkle_tree_path = Path::new(merkle_tree_path); let merkle_tree_exists = fs::try_exists(merkle_tree_path).await.with_context(|| { format!( "cannot check whether Merkle tree path `{}` exists", @@ -221,21 +224,19 @@ impl BlockReverter { } } - if rollback_sk_cache { - let sk_cache_exists = fs::try_exists(&self.state_keeper_cache_path) + if let Some(state_keeper_cache_path) = &self.state_keeper_cache_path { + let sk_cache_exists = fs::try_exists(state_keeper_cache_path) .await .with_context(|| { format!( - "cannot check whether state keeper cache path `{}` exists", - self.state_keeper_cache_path + "cannot check whether state keeper cache path `{state_keeper_cache_path}` exists" ) })?; anyhow::ensure!( sk_cache_exists, - "Path with state keeper cache DB doesn't exist at {}", - self.state_keeper_cache_path + "Path with state keeper cache DB doesn't exist at `{state_keeper_cache_path}`" ); - self.rollback_state_keeper_cache(last_l1_batch_to_keep) + self.revert_state_keeper_cache(last_l1_batch_to_keep, state_keeper_cache_path) .await?; } Ok(()) @@ -267,21 +268,19 @@ impl BlockReverter { } /// Reverts blocks in the state keeper cache. - async fn rollback_state_keeper_cache( + async fn revert_state_keeper_cache( &self, last_l1_batch_to_keep: L1BatchNumber, + state_keeper_cache_path: &str, ) -> anyhow::Result<()> { - tracing::info!( - "Opening DB with state keeper cache at `{}`", - self.state_keeper_cache_path - ); - let sk_cache = RocksdbStorage::builder(self.state_keeper_cache_path.as_ref()) + tracing::info!("Opening DB with state keeper cache at `{state_keeper_cache_path}`"); + let sk_cache = RocksdbStorage::builder(state_keeper_cache_path.as_ref()) .await .context("failed initializing state keeper cache")?; if sk_cache.l1_batch_number().await > Some(last_l1_batch_to_keep + 1) { let mut storage = self.connection_pool.connection().await?; - tracing::info!("Rolling back state keeper cache..."); + tracing::info!("Rolling back state keeper cache"); sk_cache .rollback(&mut storage, last_l1_batch_to_keep) .await @@ -294,7 +293,7 @@ impl BlockReverter { /// Reverts data in the Postgres database. /// If `node_role` is `Main` a consensus hard-fork is performed. 
- async fn rollback_postgres( + async fn revert_postgres( &self, last_l1_batch_to_keep: L1BatchNumber, ) -> anyhow::Result> { diff --git a/core/node/block_reverter/src/tests.rs b/core/node/block_reverter/src/tests.rs index 6ece230b5bb4..04b5fe521056 100644 --- a/core/node/block_reverter/src/tests.rs +++ b/core/node/block_reverter/src/tests.rs @@ -138,14 +138,11 @@ async fn block_reverter_basics(sync_merkle_tree: bool) { .await .unwrap(); - let block_reverter = BlockReverter::new( - NodeRole::External, - sk_cache_path.to_str().unwrap().to_owned(), - merkle_tree_path.to_str().unwrap().to_owned(), - pool.clone(), - ); - block_reverter - .rollback_db(L1BatchNumber(5), BlockReverterFlags::all(), None) + BlockReverter::new(NodeRole::External, pool.clone()) + .enable_reverting_postgres() + .enable_reverting_merkle_tree(merkle_tree_path.to_str().unwrap().to_owned()) + .enable_reverting_state_keeper_cache(sk_cache_path.to_str().unwrap().to_owned()) + .revert(L1BatchNumber(5)) .await .unwrap(); @@ -266,21 +263,12 @@ async fn reverting_snapshot(remove_objects: bool) { .unwrap(); assert_eq!(all_snapshots.snapshots_l1_batch_numbers, [L1BatchNumber(7)]); - let block_reverter = BlockReverter::new( - NodeRole::External, - "unused".to_owned(), // FIXME: don't require these params - "unused".to_owned(), - pool.clone(), - ); - let maybe_object_store = remove_objects.then_some(object_store.as_ref()); - block_reverter - .rollback_db( - L1BatchNumber(5), - BlockReverterFlags::POSTGRES, - maybe_object_store, - ) - .await - .unwrap(); + let mut block_reverter = BlockReverter::new(NodeRole::External, pool.clone()); + block_reverter.enable_reverting_postgres(); + if remove_objects { + block_reverter.enable_reverting_snapshot_objects(object_store.clone()); + } + block_reverter.revert(L1BatchNumber(5)).await.unwrap(); // Check that snapshot has been removed. let all_snapshots = storage From 4409e49ae4f7b26321352d78891d8350d700dc09 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Mon, 22 Apr 2024 14:37:35 +0300 Subject: [PATCH 09/14] Use "revert" term consistently --- core/bin/block_reverter/src/main.rs | 22 +++---- core/lib/dal/src/events_dal.rs | 12 ++-- core/lib/dal/src/factory_deps_dal.rs | 4 +- core/lib/dal/src/storage_logs_dal.rs | 14 ++--- core/lib/dal/src/tokens_dal.rs | 10 +-- core/lib/state/src/rocksdb/mod.rs | 14 ++--- core/lib/state/src/rocksdb/tests.rs | 2 +- core/lib/state/src/test_utils.rs | 2 +- .../src/metadata_calculator/tests.rs | 2 +- core/node/block_reverter/README.md | 2 +- core/node/block_reverter/src/lib.rs | 62 +++++++++---------- 11 files changed, 71 insertions(+), 75 deletions(-) diff --git a/core/bin/block_reverter/src/main.rs b/core/bin/block_reverter/src/main.rs index ec0f2ec5ab58..6ad6657fd427 100644 --- a/core/bin/block_reverter/src/main.rs +++ b/core/bin/block_reverter/src/main.rs @@ -32,15 +32,15 @@ enum Command { /// Sends revert transaction to L1. #[command(name = "send-eth-transaction")] SendEthTransaction { - /// L1 batch number used to rollback to. + /// L1 batch number used to revert to. #[arg(long)] l1_batch_number: u32, - /// Priority fee used for rollback Ethereum transaction. + /// Priority fee used for the reverting Ethereum transaction. // We operate only by priority fee because we want to use base fee from Ethereum // and send transaction as soon as possible without any resend logic #[arg(long)] priority_fee_per_gas: Option, - /// Nonce used for rollback Ethereum transaction. + /// Nonce used for reverting Ethereum transaction. 
#[arg(long)] nonce: u64, }, @@ -48,19 +48,19 @@ enum Command { /// Reverts internal database state to previous block. #[command(name = "rollback-db")] RollbackDB { - /// L1 batch number used to rollback to. + /// L1 batch number used to revert to. #[arg(long)] l1_batch_number: u32, - /// Flag that specifies if Postgres DB should be rolled back. + /// Flag that specifies if Postgres DB should be reverted. #[arg(long)] rollback_postgres: bool, - /// Flag that specifies if RocksDB with tree should be rolled back. + /// Flag that specifies if RocksDB with tree should be reverted. #[arg(long)] rollback_tree: bool, - /// Flag that specifies if RocksDB with state keeper cache should be rolled back. + /// Flag that specifies if RocksDB with state keeper cache should be reverted. #[arg(long)] rollback_sk_cache: bool, - /// Flag that allows to revert already executed blocks, it's ultra dangerous and required only for fixing external nodes + /// Flag that allows to revert already executed blocks. It's ultra dangerous and required only for fixing external nodes. #[arg(long)] allow_executed_block_reversion: bool, }, @@ -124,7 +124,7 @@ async fn main() -> anyhow::Result<()> { if json { println!("{}", serde_json::to_string(&suggested_values)?); } else { - println!("Suggested values for rollback: {:#?}", suggested_values); + println!("Suggested values for reversion: {:#?}", suggested_values); } } Command::SendEthTransaction { @@ -151,9 +151,9 @@ async fn main() -> anyhow::Result<()> { allow_executed_block_reversion, } => { if !rollback_tree && rollback_postgres { - println!("You want to rollback Postgres DB without rolling back tree."); + println!("You want to revert Postgres DB without reverting back tree."); println!( - "If tree is not yet rolled back to this block then the only way \ + "If tree is not yet reverted back to this block then the only way \ to make it synced with Postgres will be to completely rebuild it." ); println!("Are you sure? Print y/n"); diff --git a/core/lib/dal/src/events_dal.rs b/core/lib/dal/src/events_dal.rs index aa5995795460..7b7459b9b921 100644 --- a/core/lib/dal/src/events_dal.rs +++ b/core/lib/dal/src/events_dal.rs @@ -102,7 +102,7 @@ impl EventsDal<'_, '_> { } /// Removes events with a block number strictly greater than the specified `block_number`. - pub async fn rollback_events(&mut self, block_number: L2BlockNumber) -> DalResult<()> { + pub async fn revert_events(&mut self, block_number: L2BlockNumber) -> DalResult<()> { sqlx::query!( r#" DELETE FROM events @@ -111,7 +111,7 @@ impl EventsDal<'_, '_> { "#, i64::from(block_number.0) ) - .instrument("rollback_events") + .instrument("revert_events") .with_arg("block_number", &block_number) .execute(self.storage) .await?; @@ -182,7 +182,7 @@ impl EventsDal<'_, '_> { } /// Removes all L2-to-L1 logs with a L2 block number strictly greater than the specified `block_number`. 
- pub async fn rollback_l2_to_l1_logs(&mut self, block_number: L2BlockNumber) -> DalResult<()> { + pub async fn revert_l2_to_l1_logs(&mut self, block_number: L2BlockNumber) -> DalResult<()> { sqlx::query!( r#" DELETE FROM l2_to_l1_logs @@ -191,7 +191,7 @@ impl EventsDal<'_, '_> { "#, i64::from(block_number.0) ) - .instrument("rollback_l2_to_l1_logs") + .instrument("revert_l2_to_l1_logs") .with_arg("block_number", &block_number) .execute(self.storage) .await?; @@ -433,7 +433,7 @@ mod tests { let pool = ConnectionPool::::test_pool().await; let mut conn = pool.connection().await.unwrap(); conn.events_dal() - .rollback_events(L2BlockNumber(0)) + .revert_events(L2BlockNumber(0)) .await .unwrap(); conn.blocks_dal() @@ -514,7 +514,7 @@ mod tests { let pool = ConnectionPool::::test_pool().await; let mut conn = pool.connection().await.unwrap(); conn.events_dal() - .rollback_l2_to_l1_logs(L2BlockNumber(0)) + .revert_l2_to_l1_logs(L2BlockNumber(0)) .await .unwrap(); conn.blocks_dal() diff --git a/core/lib/dal/src/factory_deps_dal.rs b/core/lib/dal/src/factory_deps_dal.rs index ad0c0c7c1312..52f122b1c514 100644 --- a/core/lib/dal/src/factory_deps_dal.rs +++ b/core/lib/dal/src/factory_deps_dal.rs @@ -165,7 +165,7 @@ impl FactoryDepsDal<'_, '_> { } /// Removes all factory deps with a miniblock number strictly greater than the specified `block_number`. - pub async fn rollback_factory_deps(&mut self, block_number: L2BlockNumber) -> DalResult<()> { + pub async fn revert_factory_deps(&mut self, block_number: L2BlockNumber) -> DalResult<()> { sqlx::query!( r#" DELETE FROM factory_deps @@ -174,7 +174,7 @@ impl FactoryDepsDal<'_, '_> { "#, i64::from(block_number.0) ) - .instrument("rollback_factory_deps") + .instrument("revert_factory_deps") .with_arg("block_number", &block_number) .execute(self.storage) .await?; diff --git a/core/lib/dal/src/storage_logs_dal.rs b/core/lib/dal/src/storage_logs_dal.rs index 9f9428fda4aa..cda07a449b64 100644 --- a/core/lib/dal/src/storage_logs_dal.rs +++ b/core/lib/dal/src/storage_logs_dal.rs @@ -260,7 +260,7 @@ impl StorageLogsDal<'_, '_> { } /// Removes all storage logs with a L2 block number strictly greater than the specified `block_number`. 
- pub async fn rollback_storage_logs(&mut self, block_number: L2BlockNumber) -> DalResult<()> { + pub async fn revert_storage_logs(&mut self, block_number: L2BlockNumber) -> DalResult<()> { sqlx::query!( r#" DELETE FROM storage_logs @@ -269,7 +269,7 @@ impl StorageLogsDal<'_, '_> { "#, i64::from(block_number.0) ) - .instrument("rollback_storage_logs") + .instrument("revert_storage_logs") .with_arg("block_number", &block_number) .execute(self.storage) .await?; @@ -863,14 +863,10 @@ mod tests { assert_eq!(touched_slots[&first_key], H256::repeat_byte(3)); assert_eq!(touched_slots[&second_key], H256::repeat_byte(2)); - test_rollback(&mut conn, first_key, second_key).await; + test_revert(&mut conn, first_key, second_key).await; } - async fn test_rollback( - conn: &mut Connection<'_, Core>, - key: StorageKey, - second_key: StorageKey, - ) { + async fn test_revert(conn: &mut Connection<'_, Core>, key: StorageKey, second_key: StorageKey) { let new_account = AccountTreeId::new(Address::repeat_byte(2)); let new_key = StorageKey::new(new_account, H256::zero()); let log = StorageLog::new_write_log(key, H256::repeat_byte(0xff)); @@ -927,7 +923,7 @@ mod tests { } conn.storage_logs_dal() - .rollback_storage_logs(L2BlockNumber(1)) + .revert_storage_logs(L2BlockNumber(1)) .await .unwrap(); diff --git a/core/lib/dal/src/tokens_dal.rs b/core/lib/dal/src/tokens_dal.rs index 608add2e32eb..d1db87099203 100644 --- a/core/lib/dal/src/tokens_dal.rs +++ b/core/lib/dal/src/tokens_dal.rs @@ -86,7 +86,7 @@ impl TokensDal<'_, '_> { } /// Removes token records that were deployed after `block_number`. - pub async fn rollback_tokens(&mut self, block_number: L2BlockNumber) -> DalResult<()> { + pub async fn revert_tokens(&mut self, block_number: L2BlockNumber) -> DalResult<()> { let all_token_addresses = self.get_all_l2_token_addresses().await?; let token_deployment_data = self .storage @@ -242,7 +242,7 @@ mod tests { storage .tokens_dal() - .rollback_tokens(L2BlockNumber(2)) + .revert_tokens(L2BlockNumber(2)) .await .unwrap(); // Should be a no-op. @@ -257,7 +257,7 @@ mod tests { storage .tokens_dal() - .rollback_tokens(L2BlockNumber(1)) + .revert_tokens(L2BlockNumber(1)) .await .unwrap(); // The custom token should be removed; Ether shouldn't. @@ -330,7 +330,7 @@ mod tests { .await .unwrap(); - // Sanity check: before rollback the token must be present. + // Sanity check: before revert the token must be present. assert_eq!( storage .tokens_dal() @@ -342,7 +342,7 @@ mod tests { storage .tokens_dal() - .rollback_tokens(L2BlockNumber(99)) + .revert_tokens(L2BlockNumber(99)) .await .unwrap(); // Token must be removed despite it's failed deployment being earlier than the last retained miniblock. diff --git a/core/lib/state/src/rocksdb/mod.rs b/core/lib/state/src/rocksdb/mod.rs index f41724f3ea07..60be19d8abc9 100644 --- a/core/lib/state/src/rocksdb/mod.rs +++ b/core/lib/state/src/rocksdb/mod.rs @@ -180,17 +180,17 @@ impl RocksdbStorageBuilder { } } - /// Rolls back the state to a previous L1 batch number. + /// Reverts the state to a previous L1 batch number. /// /// # Errors /// /// Propagates RocksDB and Postgres errors. 
- pub async fn rollback( + pub async fn revert( mut self, storage: &mut Connection<'_, Core>, last_l1_batch_to_keep: L1BatchNumber, ) -> anyhow::Result<()> { - self.0.rollback(storage, last_l1_batch_to_keep).await + self.0.revert(storage, last_l1_batch_to_keep).await } } @@ -375,20 +375,20 @@ impl RocksdbStorage { self.pending_patch.factory_deps.insert(hash, bytecode); } - async fn rollback( + async fn revert( &mut self, connection: &mut Connection<'_, Core>, last_l1_batch_to_keep: L1BatchNumber, ) -> anyhow::Result<()> { - tracing::info!("Rolling back state keeper storage to L1 batch #{last_l1_batch_to_keep}..."); + tracing::info!("Reverting state keeper storage to L1 batch #{last_l1_batch_to_keep}..."); - tracing::info!("Getting logs that should be applied to rollback state..."); + tracing::info!("Getting logs that should be applied to revert the state..."); let stage_start = Instant::now(); let logs = connection .storage_logs_dal() .get_storage_logs_for_revert(last_l1_batch_to_keep) .await - .context("failed getting logs for rollback")?; + .context("failed getting logs for revert")?; tracing::info!("Got {} logs, took {:?}", logs.len(), stage_start.elapsed()); tracing::info!("Getting number of last miniblock for L1 batch #{last_l1_batch_to_keep}..."); diff --git a/core/lib/state/src/rocksdb/tests.rs b/core/lib/state/src/rocksdb/tests.rs index 86bb2447bfe2..6c28fdfa2313 100644 --- a/core/lib/state/src/rocksdb/tests.rs +++ b/core/lib/state/src/rocksdb/tests.rs @@ -219,7 +219,7 @@ async fn rocksdb_storage_revert() { } } - storage.rollback(&mut conn, L1BatchNumber(1)).await.unwrap(); + storage.revert(&mut conn, L1BatchNumber(1)).await.unwrap(); assert_eq!(storage.l1_batch_number().await, Some(L1BatchNumber(2))); { for log in &inserted_storage_logs { diff --git a/core/lib/state/src/test_utils.rs b/core/lib/state/src/test_utils.rs index a8bb304794a4..ee6d869e35c1 100644 --- a/core/lib/state/src/test_utils.rs +++ b/core/lib/state/src/test_utils.rs @@ -24,7 +24,7 @@ pub(crate) async fn prepare_postgres(conn: &mut Connection<'_, Core>) { } conn.storage_logs_dal() - .rollback_storage_logs(L2BlockNumber(0)) + .revert_storage_logs(L2BlockNumber(0)) .await .unwrap(); conn.blocks_dal() diff --git a/core/lib/zksync_core/src/metadata_calculator/tests.rs b/core/lib/zksync_core/src/metadata_calculator/tests.rs index fad46873433d..79362526d9ec 100644 --- a/core/lib/zksync_core/src/metadata_calculator/tests.rs +++ b/core/lib/zksync_core/src/metadata_calculator/tests.rs @@ -459,7 +459,7 @@ pub(crate) async fn reset_db_state(pool: &ConnectionPool, num_batches: usi // Drops all L1 batches (except the L1 batch with number 0) and their storage logs. storage .storage_logs_dal() - .rollback_storage_logs(L2BlockNumber(0)) + .revert_storage_logs(L2BlockNumber(0)) .await .unwrap(); storage diff --git a/core/node/block_reverter/README.md b/core/node/block_reverter/README.md index bcbe098144e5..51a60fc24c2e 100644 --- a/core/node/block_reverter/README.md +++ b/core/node/block_reverter/README.md @@ -1,3 +1,3 @@ # zkSync Era Block reverter -This crate contains functionality for zkSync state rollback. +This crate contains functionality for reverting state of a zkSync Era node. diff --git a/core/node/block_reverter/src/lib.rs b/core/node/block_reverter/src/lib.rs index 5bff5508ed10..53a2e09291a8 100644 --- a/core/node/block_reverter/src/lib.rs +++ b/core/node/block_reverter/src/lib.rs @@ -74,15 +74,16 @@ pub enum NodeRole { External, } -/// This struct is used to perform a rollback of the state. 
-/// Rollback is a rare event of manual intervention, when the node operator +/// This struct is used to revert node state. +/// +/// Reversion is a rare event of manual intervention, when the node operator /// decides to revert some of the not yet finalized batches for some reason /// (e.g. inability to generate a proof). /// -/// It is also used to automatically perform a rollback on the external node -/// after it is detected on the main node. +/// It is also used to automatically revert the external node state +/// after it detects a reorg on the main node. /// -/// There are a few state components that we can roll back: +/// There are a few state components that `BlockReverter` can revert: /// /// - State of the Postgres database /// - State of the Merkle tree @@ -90,7 +91,7 @@ pub enum NodeRole { /// - State of the Ethereum contract (if the block was committed) #[derive(Debug)] pub struct BlockReverter { - /// It affects the interactions with the consensus state. + /// Role affects the interactions with the consensus state. /// This distinction will be removed once consensus genesis is moved to the L1 state. node_role: NodeRole, allow_reverting_executed_batches: bool, @@ -147,7 +148,7 @@ impl BlockReverter { self } - /// Reverts DBs (Postgres + RocksDB) to a previous state. If Postgres is rolled back and `snapshots_object_store` + /// Reverts DBs (Postgres + RocksDB) to a previous state. If Postgres is reverted and `snapshots_object_store` /// is specified, snapshot files will be deleted as well. pub async fn revert(&self, last_l1_batch_to_keep: L1BatchNumber) -> anyhow::Result<()> { if !self.allow_reverting_executed_batches { @@ -196,7 +197,6 @@ impl BlockReverter { .await? .context("no state root hash for target L1 batch")?; - // Rolling back Merkle tree let merkle_tree_path = Path::new(merkle_tree_path); let merkle_tree_exists = fs::try_exists(merkle_tree_path).await.with_context(|| { format!( @@ -280,11 +280,11 @@ impl BlockReverter { if sk_cache.l1_batch_number().await > Some(last_l1_batch_to_keep + 1) { let mut storage = self.connection_pool.connection().await?; - tracing::info!("Rolling back state keeper cache"); + tracing::info!("Reverting state keeper cache"); sk_cache - .rollback(&mut storage, last_l1_batch_to_keep) + .revert(&mut storage, last_l1_batch_to_keep) .await - .context("failed rolling back state keeper cache")?; + .context("failed reverting state keeper cache")?; } else { tracing::info!("Nothing to revert in state keeper cache"); } @@ -297,7 +297,7 @@ impl BlockReverter { &self, last_l1_batch_to_keep: L1BatchNumber, ) -> anyhow::Result> { - tracing::info!("Rolling back postgres data"); + tracing::info!("Reverting Postgres data"); let mut storage = self.connection_pool.connection().await?; let mut transaction = storage.start_transaction().await?; @@ -309,30 +309,30 @@ impl BlockReverter { format!("L1 batch #{last_l1_batch_to_keep} doesn't contain L2 blocks") })?; - tracing::info!("Rolling back transactions state"); + tracing::info!("Reverting transactions state"); transaction .transactions_dal() .reset_transactions_state(last_l2_block_to_keep) .await?; - tracing::info!("Rolling back events"); + tracing::info!("Reverting events"); transaction .events_dal() - .rollback_events(last_l2_block_to_keep) + .revert_events(last_l2_block_to_keep) .await?; - tracing::info!("Rolling back L2 to L1 logs"); + tracing::info!("Reverting L2-to-L1 logs"); transaction .events_dal() - .rollback_l2_to_l1_logs(last_l2_block_to_keep) + .revert_l2_to_l1_logs(last_l2_block_to_keep) .await?; 
-        tracing::info!("Rolling back created tokens");
+        tracing::info!("Reverting created tokens");
         transaction
             .tokens_dal()
-            .rollback_tokens(last_l2_block_to_keep)
+            .revert_tokens(last_l2_block_to_keep)
             .await?;
-        tracing::info!("Rolling back factory deps");
+        tracing::info!("Reverting factory deps");
         transaction
             .factory_deps_dal()
-            .rollback_factory_deps(last_l2_block_to_keep)
+            .revert_factory_deps(last_l2_block_to_keep)
             .await?;
         tracing::info!("Rolling back storage");
         #[allow(deprecated)]
         transaction
             .storage_logs_dal()
             .rollback_storage(last_l2_block_to_keep)
             .await?;
-        tracing::info!("Rolling back storage logs");
+        tracing::info!("Reverting storage logs");
         transaction
             .storage_logs_dal()
-            .rollback_storage_logs(last_l2_block_to_keep)
+            .revert_storage_logs(last_l2_block_to_keep)
             .await?;
-        tracing::info!("Rolling back Ethereum transactions");
+        tracing::info!("Reverting Ethereum transactions");
         transaction
             .eth_sender_dal()
             .delete_eth_txs(last_l1_batch_to_keep)
             .await?;
-        tracing::info!("Rolling back snapshots");
+        tracing::info!("Reverting snapshots");
         let deleted_snapshots = transaction
             .snapshots_dal()
             .delete_snapshots_after(last_l1_batch_to_keep)
             .await?;
 
         // Remove data from main tables (L2 blocks and L1 batches).
-        tracing::info!("Rolling back L1 batches");
+        tracing::info!("Reverting L1 batches");
         transaction
             .blocks_dal()
             .delete_l1_batches(last_l1_batch_to_keep)
             .await?;
         transaction
             .blocks_dal()
             .delete_initial_writes(last_l1_batch_to_keep)
             .await?;
-        tracing::info!("Rolling back L2 blocks...");
+        tracing::info!("Reverting L2 blocks");
         transaction
             .blocks_dal()
             .delete_l2_blocks(last_l2_block_to_keep)
@@ -565,11 +565,11 @@ impl BlockReverter {
         Ok(L1BatchNumber(block_number.as_u32()))
     }
 
-    /// Returns suggested values for rollback.
+    /// Returns suggested values for a reversion.
     pub async fn suggested_values(
         &self,
         eth_config: &BlockReverterEthConfig,
-    ) -> anyhow::Result<SuggestedRollbackValues> {
+    ) -> anyhow::Result<SuggestedRevertValues> {
         let web3 =
             Web3::new(Http::new(&eth_config.eth_client_url).context("cannot create L1 client")?);
         let contract_address = eth_config.diamond_proxy_addr;
@@ -603,7 +603,7 @@ impl BlockReverter {
         .with_context(|| format!("failed getting transaction count for {reverter_address:?}"))?
         .as_u64();
 
-        Ok(SuggestedRollbackValues {
+        Ok(SuggestedRevertValues {
             last_executed_l1_batch_number,
             nonce,
             priority_fee,
@@ -624,7 +624,7 @@ impl BlockReverter {
 }
 
 #[derive(Debug, Serialize)]
-pub struct SuggestedRollbackValues {
+pub struct SuggestedRevertValues {
     pub last_executed_l1_batch_number: L1BatchNumber,
     pub nonce: u64,
     pub priority_fee: u64,
From cc582844eb8d556b98e78b5d579106097c905760 Mon Sep 17 00:00:00 2001
From: Alex Ostrovski 
Date: Mon, 22 Apr 2024 14:47:57 +0300
Subject: [PATCH 10/14] Remove obsolete `rollback_storage()` method

---
 core/lib/dal/src/storage_logs_dal.rs | 58 ----------------------
 core/node/block_reverter/src/lib.rs  |  6 ---
 2 files changed, 64 deletions(-)

diff --git a/core/lib/dal/src/storage_logs_dal.rs b/core/lib/dal/src/storage_logs_dal.rs
index 09a98fdfd528..8687c0cbec78 100644
--- a/core/lib/dal/src/storage_logs_dal.rs
+++ b/core/lib/dal/src/storage_logs_dal.rs
@@ -142,64 +142,6 @@ impl StorageLogsDal<'_, '_> {
         .await
     }
 
-    /// Rolls back storage to the specified point in time. 
- #[deprecated(note = "`storage` table is soft-removed")] - pub async fn rollback_storage( - &mut self, - last_l2_block_to_keep: L2BlockNumber, - ) -> DalResult<()> { - let stage_start = Instant::now(); - let modified_keys = self - .modified_keys_in_l2_blocks(last_l2_block_to_keep.next()..=L2BlockNumber(u32::MAX)) - .await?; - tracing::info!( - "Loaded {} keys changed after L2 block #{last_l2_block_to_keep} in {:?}", - modified_keys.len(), - stage_start.elapsed() - ); - - let stage_start = Instant::now(); - let prev_values = self - .get_storage_values(&modified_keys, last_l2_block_to_keep) - .await?; - tracing::info!( - "Loaded previous storage values for modified keys in {:?}", - stage_start.elapsed() - ); - - let stage_start = Instant::now(); - let mut keys_to_delete = vec![]; - let mut keys_to_update = vec![]; - let mut values_to_update = vec![]; - for (key, maybe_value) in &prev_values { - if let Some(prev_value) = maybe_value { - keys_to_update.push(key.as_bytes()); - values_to_update.push(prev_value.as_bytes()); - } else { - keys_to_delete.push(key.as_bytes()); - } - } - tracing::info!( - "Created revert plan (keys to update: {}, to delete: {}) in {:?}", - keys_to_update.len(), - keys_to_delete.len(), - stage_start.elapsed() - ); - - tracing::info!( - "Removed {} keys in {:?}", - keys_to_delete.len(), - stage_start.elapsed() - ); - - tracing::info!( - "Updated {} keys to previous values in {:?}", - keys_to_update.len(), - stage_start.elapsed() - ); - Ok(()) - } - /// Returns distinct hashed storage keys that were modified in the specified L2 block range. pub async fn modified_keys_in_l2_blocks( &mut self, diff --git a/core/node/block_reverter/src/lib.rs b/core/node/block_reverter/src/lib.rs index 53a2e09291a8..f2850b02a588 100644 --- a/core/node/block_reverter/src/lib.rs +++ b/core/node/block_reverter/src/lib.rs @@ -334,12 +334,6 @@ impl BlockReverter { .factory_deps_dal() .revert_factory_deps(last_l2_block_to_keep) .await?; - tracing::info!("Rolling back storage"); - #[allow(deprecated)] - transaction - .storage_logs_dal() - .rollback_storage(last_l2_block_to_keep) - .await?; tracing::info!("Reverting storage logs"); transaction .storage_logs_dal() From 7595dfef7c1b9f1d4880e6d589565086de61fa5d Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Tue, 23 Apr 2024 22:07:16 +0300 Subject: [PATCH 11/14] Brush up new logic --- core/bin/block_reverter/src/main.rs | 32 +++++++++++++--------- core/node/block_reverter/src/lib.rs | 42 ++++++++++++----------------- 2 files changed, 37 insertions(+), 37 deletions(-) diff --git a/core/bin/block_reverter/src/main.rs b/core/bin/block_reverter/src/main.rs index 6ad6657fd427..29fc2078fa57 100644 --- a/core/bin/block_reverter/src/main.rs +++ b/core/bin/block_reverter/src/main.rs @@ -1,9 +1,12 @@ +use std::env; + use anyhow::Context as _; use clap::{Parser, Subcommand}; use tokio::io::{self, AsyncReadExt}; use zksync_block_reverter::{BlockReverter, BlockReverterEthConfig, NodeRole}; use zksync_config::{ - configs::ObservabilityConfig, ContractsConfig, DBConfig, EthConfig, PostgresConfig, + configs::{chain::NetworkConfig, ObservabilityConfig}, + ContractsConfig, DBConfig, EthConfig, PostgresConfig, }; use zksync_dal::{ConnectionPool, Core}; use zksync_env_config::{object_store::SnapshotsObjectStoreConfig, FromEnv}; @@ -98,16 +101,16 @@ async fn main() -> anyhow::Result<()> { .default_priority_fee_per_gas, ); let contracts = ContractsConfig::from_env().context("ContractsConfig::from_env()")?; + // FIXME: is it correct to parse the entire `NetworkConfig`? 
+ let network = NetworkConfig::from_env().context("NetworkConfig::from_env()")?; let postgres_config = PostgresConfig::from_env().context("PostgresConfig::from_env()")?; - let operator_address = if let Command::Display { - operator_address, .. - } = &command - { - Some(*operator_address) - } else { - None - }; - let config = BlockReverterEthConfig::new(eth_sender, contracts, operator_address)?; + let era_chain_id = env::var("CONTRACTS_ERA_CHAIN_ID") + .context("`CONTRACTS_ERA_CHAIN_ID` env variable is not set")? + .parse() + .map_err(|err| { + anyhow::anyhow!("failed parsing `CONTRACTS_ERA_CHAIN_ID` env variable: {err}") + })?; + let config = BlockReverterEthConfig::new(eth_sender, &contracts, &network, era_chain_id)?; let connection_pool = ConnectionPool::::builder( postgres_config.master_url()?, @@ -119,8 +122,13 @@ async fn main() -> anyhow::Result<()> { let mut block_reverter = BlockReverter::new(NodeRole::Main, connection_pool); match command { - Command::Display { json, .. } => { - let suggested_values = block_reverter.suggested_values(&config).await?; + Command::Display { + json, + operator_address, + } => { + let suggested_values = block_reverter + .suggested_values(&config, operator_address) + .await?; if json { println!("{}", serde_json::to_string(&suggested_values)?); } else { diff --git a/core/node/block_reverter/src/lib.rs b/core/node/block_reverter/src/lib.rs index 300837fab5af..934b494d1083 100644 --- a/core/node/block_reverter/src/lib.rs +++ b/core/node/block_reverter/src/lib.rs @@ -3,7 +3,7 @@ use std::{path::Path, sync::Arc, time::Duration}; use anyhow::Context as _; use serde::Serialize; use tokio::fs; -use zksync_config::{ContractsConfig, EthConfig}; +use zksync_config::{configs::chain::NetworkConfig, ContractsConfig, EthConfig}; use zksync_contracts::hyperchain_contract; use zksync_dal::{ConnectionPool, Core, CoreDal}; use zksync_eth_signer::{EthereumSigner, PrivateKeySigner, TransactionParameters}; @@ -24,7 +24,7 @@ use zksync_types::{ types::{BlockId, BlockNumber}, Web3, }, - Address, L1BatchNumber, H160, H256, U256, + Address, L1BatchNumber, L2ChainId, H160, H256, U256, }; #[cfg(test)] @@ -34,17 +34,19 @@ mod tests; pub struct BlockReverterEthConfig { eth_client_url: String, reverter_private_key: Option, - reverter_address: Option
, diamond_proxy_addr: H160, validator_timelock_addr: H160, default_priority_fee_per_gas: u64, + hyperchain_id: L2ChainId, + era_chain_id: L2ChainId, } impl BlockReverterEthConfig { pub fn new( eth_config: EthConfig, - contract: ContractsConfig, - reverter_address: Option
, + contract: &ContractsConfig, + network_config: &NetworkConfig, + era_chain_id: L2ChainId, ) -> anyhow::Result { #[allow(deprecated)] // `BlockReverter` doesn't support non env configs yet @@ -56,13 +58,14 @@ impl BlockReverterEthConfig { Ok(Self { eth_client_url: eth_config.web3_url, reverter_private_key: pk, - reverter_address, diamond_proxy_addr: contract.diamond_proxy_addr, validator_timelock_addr: contract.validator_timelock_addr, default_priority_fee_per_gas: eth_config .gas_adjuster .context("gas adjuster")? .default_priority_fee_per_gas, + hyperchain_id: network_config.zksync_network_id, + era_chain_id, }) } } @@ -459,36 +462,27 @@ impl BlockReverter { .await .context("failed getting L1 chain ID")? .as_u64(); - // FIXME: brush up - let hyperchain_id = std::env::var("CHAIN_ETH_ZKSYNC_NETWORK_ID") - .ok() - .and_then(|val| val.parse::().ok()) - .expect("`CHAIN_ETH_ZKSYNC_NETWORK_ID` has to be set in config"); - let era_chain_id = std::env::var("CONTRACTS_ERA_CHAIN_ID") - .ok() - .and_then(|val| val.parse::().ok()) - .expect("`CONTRACTS_ERA_CHAIN_ID` has to be set in config"); // It is expected that for all new chains `revertBatchesSharedBridge` can be used. - // For Era we are using `revertBatches` function for backwards compatibility in case the migration + // For Era, we are using `revertBatches` function for backwards compatibility in case the migration // to the shared bridge is not yet complete. - let data = if hyperchain_id == era_chain_id { + let data = if eth_config.hyperchain_id == eth_config.era_chain_id { let revert_function = contract .function("revertBatches") - .expect("`revertBatches` function must be present in contract"); + .context("`revertBatches` function must be present in contract")?; revert_function .encode_input(&[Token::Uint(last_l1_batch_to_keep.0.into())]) - .unwrap() + .context("failed encoding `revertBatches` input")? } else { let revert_function = contract .function("revertBatchesSharedBridge") - .expect("`revertBatchesSharedBridge` function must be present in contract"); + .context("`revertBatchesSharedBridge` function must be present in contract")?; revert_function .encode_input(&[ - Token::Uint(hyperchain_id.into()), + Token::Uint(eth_config.hyperchain_id.as_u64().into()), Token::Uint(last_l1_batch_to_keep.0.into()), ]) - .unwrap() + .context("failed encoding `revertBatchesSharedBridge` input")? 
}; let base_fee = web3 @@ -584,6 +578,7 @@ impl BlockReverter { pub async fn suggested_values( &self, eth_config: &BlockReverterEthConfig, + reverter_address: Address, ) -> anyhow::Result { let web3 = Web3::new(Http::new(ð_config.eth_client_url).context("cannot create L1 client")?); @@ -608,9 +603,6 @@ impl BlockReverter { ); let priority_fee = eth_config.default_priority_fee_per_gas; - let reverter_address = eth_config - .reverter_address - .context("need to provide operator address to suggest reversion values")?; let nonce = web3 .eth() .transaction_count(reverter_address, Some(BlockNumber::Pending)) From fbd658652704d5bfba48e35873d5002a9280f9f4 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Tue, 23 Apr 2024 22:46:09 +0300 Subject: [PATCH 12/14] Use "roll back" where appropriate --- core/bin/block_reverter/src/main.rs | 31 ++--- core/bin/external_node/src/main.rs | 12 +- core/lib/dal/src/events_dal.rs | 12 +- core/lib/dal/src/factory_deps_dal.rs | 4 +- core/lib/dal/src/storage_logs_dal.rs | 6 +- core/lib/dal/src/tokens_dal.rs | 10 +- core/lib/merkle_tree/src/domain.rs | 4 +- .../merkle_tree/tests/integration/domain.rs | 6 +- core/lib/state/src/rocksdb/mod.rs | 2 +- core/lib/state/src/test_utils.rs | 2 +- .../src/metadata_calculator/helpers.rs | 2 +- .../src/metadata_calculator/tests.rs | 2 +- core/node/block_reverter/README.md | 3 +- core/node/block_reverter/src/lib.rs | 125 +++++++++--------- core/node/block_reverter/src/tests.rs | 14 +- 15 files changed, 122 insertions(+), 113 deletions(-) diff --git a/core/bin/block_reverter/src/main.rs b/core/bin/block_reverter/src/main.rs index 29fc2078fa57..be7e0d751c47 100644 --- a/core/bin/block_reverter/src/main.rs +++ b/core/bin/block_reverter/src/main.rs @@ -48,22 +48,22 @@ enum Command { nonce: u64, }, - /// Reverts internal database state to previous block. + /// Rolls back internal database state to a previous L1 batch. #[command(name = "rollback-db")] RollbackDB { - /// L1 batch number used to revert to. + /// L1 batch number used to roll back to. #[arg(long)] l1_batch_number: u32, - /// Flag that specifies if Postgres DB should be reverted. + /// Flag that specifies if Postgres DB should be rolled back. #[arg(long)] rollback_postgres: bool, - /// Flag that specifies if RocksDB with tree should be reverted. + /// Flag that specifies if RocksDB with tree should be rolled back. #[arg(long)] rollback_tree: bool, - /// Flag that specifies if RocksDB with state keeper cache should be reverted. + /// Flag that specifies if RocksDB with state keeper cache should be rolled back. #[arg(long)] rollback_sk_cache: bool, - /// Flag that allows to revert already executed blocks. It's ultra dangerous and required only for fixing external nodes. + /// Flag that allows to roll back already executed blocks. It's ultra dangerous and required only for fixing external nodes. #[arg(long)] allow_executed_block_reversion: bool, }, @@ -159,9 +159,9 @@ async fn main() -> anyhow::Result<()> { allow_executed_block_reversion, } => { if !rollback_tree && rollback_postgres { - println!("You want to revert Postgres DB without reverting back tree."); + println!("You want to roll back Postgres DB without rolling back tree."); println!( - "If tree is not yet reverted back to this block then the only way \ + "If the tree is not yet rolled back to this L1 batch, then the only way \ to make it synced with Postgres will be to completely rebuild it." ); println!("Are you sure? 
Print y/n"); @@ -174,7 +174,7 @@ async fn main() -> anyhow::Result<()> { } if allow_executed_block_reversion { - println!("You want to revert already executed blocks. It's impossible to restore them for the main node"); + println!("You want to roll back already executed blocks. It's impossible to restore them for the main node"); println!("Make sure you are doing it ONLY for external node"); println!("Are you sure? Print y/n"); @@ -183,28 +183,29 @@ async fn main() -> anyhow::Result<()> { if input[0] != b'y' && input[0] != b'Y' { std::process::exit(0); } - block_reverter.allow_reverting_executed_batches(); + block_reverter.allow_rolling_back_executed_batches(); } if rollback_postgres { - block_reverter.enable_reverting_postgres(); + block_reverter.enable_rolling_back_postgres(); let object_store_config = SnapshotsObjectStoreConfig::from_env() .context("SnapshotsObjectStoreConfig::from_env()")?; - block_reverter.enable_reverting_snapshot_objects( + block_reverter.enable_rolling_back_snapshot_objects( ObjectStoreFactory::new(object_store_config.0) .create_store() .await, ); } if rollback_tree { - block_reverter.enable_reverting_merkle_tree(db_config.merkle_tree.path); + block_reverter.enable_rolling_back_merkle_tree(db_config.merkle_tree.path); } if rollback_sk_cache { - block_reverter.enable_reverting_state_keeper_cache(db_config.state_keeper_db_path); + block_reverter + .enable_rolling_back_state_keeper_cache(db_config.state_keeper_db_path); } block_reverter - .revert(L1BatchNumber(l1_batch_number)) + .roll_back(L1BatchNumber(l1_batch_number)) .await?; } Command::ClearFailedL1Transactions => { diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index 2a8c968664ac..6af3c731253c 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -940,10 +940,10 @@ async fn run_node( let mut reverter = BlockReverter::new(NodeRole::External, connection_pool.clone()); // Reverting executed batches is more-or-less safe for external nodes. 
let reverter = reverter - .allow_reverting_executed_batches() - .enable_reverting_postgres() - .enable_reverting_merkle_tree(config.required.merkle_tree_path.clone()) - .enable_reverting_state_keeper_cache(config.required.state_cache_path.clone()); + .allow_rolling_back_executed_batches() + .enable_rolling_back_postgres() + .enable_rolling_back_merkle_tree(config.required.merkle_tree_path.clone()) + .enable_rolling_back_state_keeper_cache(config.required.state_cache_path.clone()); let mut reorg_detector = ReorgDetector::new(main_node_client.clone(), connection_pool.clone()); // We're checking for the reorg in the beginning because we expect that if reorg is detected during @@ -954,7 +954,7 @@ async fn run_node( Ok(()) => {} Err(reorg_detector::Error::ReorgDetected(last_correct_l1_batch)) => { tracing::info!("Reverting to l1 batch number {last_correct_l1_batch}"); - reverter.revert(last_correct_l1_batch).await?; + reverter.roll_back(last_correct_l1_batch).await?; tracing::info!("Revert successfully completed"); } Err(err) => return Err(err).context("reorg_detector.check_consistency()"), @@ -970,7 +970,7 @@ async fn run_node( drop(connection); tracing::info!("Reverting to l1 batch number {sealed_l1_batch_number}"); - reverter.revert(sealed_l1_batch_number).await?; + reverter.roll_back(sealed_l1_batch_number).await?; tracing::info!("Revert successfully completed"); } diff --git a/core/lib/dal/src/events_dal.rs b/core/lib/dal/src/events_dal.rs index a6e285989e18..3a6b86afee97 100644 --- a/core/lib/dal/src/events_dal.rs +++ b/core/lib/dal/src/events_dal.rs @@ -102,7 +102,7 @@ impl EventsDal<'_, '_> { } /// Removes events with a block number strictly greater than the specified `block_number`. - pub async fn revert_events(&mut self, block_number: L2BlockNumber) -> DalResult<()> { + pub async fn roll_back_events(&mut self, block_number: L2BlockNumber) -> DalResult<()> { sqlx::query!( r#" DELETE FROM events @@ -111,7 +111,7 @@ impl EventsDal<'_, '_> { "#, i64::from(block_number.0) ) - .instrument("revert_events") + .instrument("roll_back_events") .with_arg("block_number", &block_number) .execute(self.storage) .await?; @@ -182,7 +182,7 @@ impl EventsDal<'_, '_> { } /// Removes all L2-to-L1 logs with a L2 block number strictly greater than the specified `block_number`. 
- pub async fn revert_l2_to_l1_logs(&mut self, block_number: L2BlockNumber) -> DalResult<()> { + pub async fn roll_back_l2_to_l1_logs(&mut self, block_number: L2BlockNumber) -> DalResult<()> { sqlx::query!( r#" DELETE FROM l2_to_l1_logs @@ -191,7 +191,7 @@ impl EventsDal<'_, '_> { "#, i64::from(block_number.0) ) - .instrument("revert_l2_to_l1_logs") + .instrument("roll_back_l2_to_l1_logs") .with_arg("block_number", &block_number) .execute(self.storage) .await?; @@ -433,7 +433,7 @@ mod tests { let pool = ConnectionPool::::test_pool().await; let mut conn = pool.connection().await.unwrap(); conn.events_dal() - .revert_events(L2BlockNumber(0)) + .roll_back_events(L2BlockNumber(0)) .await .unwrap(); conn.blocks_dal() @@ -514,7 +514,7 @@ mod tests { let pool = ConnectionPool::::test_pool().await; let mut conn = pool.connection().await.unwrap(); conn.events_dal() - .revert_l2_to_l1_logs(L2BlockNumber(0)) + .roll_back_l2_to_l1_logs(L2BlockNumber(0)) .await .unwrap(); conn.blocks_dal() diff --git a/core/lib/dal/src/factory_deps_dal.rs b/core/lib/dal/src/factory_deps_dal.rs index 89f0333ca496..d846a233a3bf 100644 --- a/core/lib/dal/src/factory_deps_dal.rs +++ b/core/lib/dal/src/factory_deps_dal.rs @@ -165,7 +165,7 @@ impl FactoryDepsDal<'_, '_> { } /// Removes all factory deps with a miniblock number strictly greater than the specified `block_number`. - pub async fn revert_factory_deps(&mut self, block_number: L2BlockNumber) -> DalResult<()> { + pub async fn roll_back_factory_deps(&mut self, block_number: L2BlockNumber) -> DalResult<()> { sqlx::query!( r#" DELETE FROM factory_deps @@ -174,7 +174,7 @@ impl FactoryDepsDal<'_, '_> { "#, i64::from(block_number.0) ) - .instrument("revert_factory_deps") + .instrument("roll_back_factory_deps") .with_arg("block_number", &block_number) .execute(self.storage) .await?; diff --git a/core/lib/dal/src/storage_logs_dal.rs b/core/lib/dal/src/storage_logs_dal.rs index 8687c0cbec78..9a483ce2b44b 100644 --- a/core/lib/dal/src/storage_logs_dal.rs +++ b/core/lib/dal/src/storage_logs_dal.rs @@ -171,7 +171,7 @@ impl StorageLogsDal<'_, '_> { } /// Removes all storage logs with a L2 block number strictly greater than the specified `block_number`. - pub async fn revert_storage_logs(&mut self, block_number: L2BlockNumber) -> DalResult<()> { + pub async fn roll_back_storage_logs(&mut self, block_number: L2BlockNumber) -> DalResult<()> { sqlx::query!( r#" DELETE FROM storage_logs @@ -180,7 +180,7 @@ impl StorageLogsDal<'_, '_> { "#, i64::from(block_number.0) ) - .instrument("revert_storage_logs") + .instrument("roll_back_storage_logs") .with_arg("block_number", &block_number) .execute(self.storage) .await?; @@ -805,7 +805,7 @@ mod tests { assert_eq!(prev_values[&prev_keys[2]], None); conn.storage_logs_dal() - .revert_storage_logs(L2BlockNumber(1)) + .roll_back_storage_logs(L2BlockNumber(1)) .await .unwrap(); diff --git a/core/lib/dal/src/tokens_dal.rs b/core/lib/dal/src/tokens_dal.rs index 6c4349513dd3..9a892c20bb58 100644 --- a/core/lib/dal/src/tokens_dal.rs +++ b/core/lib/dal/src/tokens_dal.rs @@ -86,7 +86,7 @@ impl TokensDal<'_, '_> { } /// Removes token records that were deployed after `block_number`. 
- pub async fn revert_tokens(&mut self, block_number: L2BlockNumber) -> DalResult<()> { + pub async fn roll_back_tokens(&mut self, block_number: L2BlockNumber) -> DalResult<()> { let all_token_addresses = self.get_all_l2_token_addresses().await?; let token_deployment_data = self .storage @@ -105,7 +105,7 @@ impl TokensDal<'_, '_> { "#, &token_addresses_to_be_removed as &[_] ) - .instrument("rollback_tokens") + .instrument("roll_back_tokens") .with_arg("block_number", &block_number) .with_arg( "token_addresses_to_be_removed.len", @@ -242,7 +242,7 @@ mod tests { storage .tokens_dal() - .revert_tokens(L2BlockNumber(2)) + .roll_back_tokens(L2BlockNumber(2)) .await .unwrap(); // Should be a no-op. @@ -257,7 +257,7 @@ mod tests { storage .tokens_dal() - .revert_tokens(L2BlockNumber(1)) + .roll_back_tokens(L2BlockNumber(1)) .await .unwrap(); // The custom token should be removed; Ether shouldn't. @@ -342,7 +342,7 @@ mod tests { storage .tokens_dal() - .revert_tokens(L2BlockNumber(99)) + .roll_back_tokens(L2BlockNumber(99)) .await .unwrap(); // Token must be removed despite it's failed deployment being earlier than the last retained miniblock. diff --git a/core/lib/merkle_tree/src/domain.rs b/core/lib/merkle_tree/src/domain.rs index e8a634057785..0eeaaddaa7ef 100644 --- a/core/lib/merkle_tree/src/domain.rs +++ b/core/lib/merkle_tree/src/domain.rs @@ -311,10 +311,10 @@ impl ZkSyncTree { kvs.collect() } - /// Reverts the tree to a previous state. + /// Rolls back this tree to a previous state. /// /// This method will overwrite all unsaved changes in the tree. - pub fn revert_logs(&mut self, last_l1_batch_to_keep: L1BatchNumber) { + pub fn roll_back_logs(&mut self, last_l1_batch_to_keep: L1BatchNumber) { self.tree.db.reset(); let retained_version_count = u64::from(last_l1_batch_to_keep.0 + 1); self.tree.truncate_recent_versions(retained_version_count); diff --git a/core/lib/merkle_tree/tests/integration/domain.rs b/core/lib/merkle_tree/tests/integration/domain.rs index adc96694f1a5..50a4a74afdcb 100644 --- a/core/lib/merkle_tree/tests/integration/domain.rs +++ b/core/lib/merkle_tree/tests/integration/domain.rs @@ -230,7 +230,7 @@ fn revert_blocks() { { let mut tree = ZkSyncTree::new_lightweight(storage.into()); assert_eq!(tree.root_hash(), tree_metadata.last().unwrap().root_hash); - tree.revert_logs(L1BatchNumber(3)); + tree.roll_back_logs(L1BatchNumber(3)); assert_eq!(tree.root_hash(), tree_metadata[3].root_hash); tree.save(); } @@ -239,7 +239,7 @@ fn revert_blocks() { let storage = RocksDB::new(temp_dir.as_ref()).unwrap(); { let mut tree = ZkSyncTree::new_lightweight(storage.into()); - tree.revert_logs(L1BatchNumber(1)); + tree.roll_back_logs(L1BatchNumber(1)); assert_eq!(tree.root_hash(), tree_metadata[1].root_hash); tree.save(); } @@ -248,7 +248,7 @@ fn revert_blocks() { let storage = RocksDB::new(temp_dir.as_ref()).unwrap(); { let mut tree = ZkSyncTree::new_lightweight(storage.into()); - tree.revert_logs(L1BatchNumber(1)); + tree.roll_back_logs(L1BatchNumber(1)); assert_eq!(tree.root_hash(), tree_metadata[1].root_hash); tree.save(); } diff --git a/core/lib/state/src/rocksdb/mod.rs b/core/lib/state/src/rocksdb/mod.rs index 8ffb1707eea5..83dab6a8dc94 100644 --- a/core/lib/state/src/rocksdb/mod.rs +++ b/core/lib/state/src/rocksdb/mod.rs @@ -220,7 +220,7 @@ impl RocksdbStorageBuilder { /// # Errors /// /// Propagates RocksDB and Postgres errors. 
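To make the version arithmetic in `roll_back_logs` above concrete: tree version `N` is assumed to hold the state after L1 batch `#N`, so keeping everything up to `last_l1_batch_to_keep` means retaining exactly one more version than that number. A minimal worked example under that assumption:

#[test]
fn retained_version_count_is_one_past_the_last_kept_batch() {
    use zksync_types::L1BatchNumber;

    // Keeping L1 batches #0..=#3 retains tree versions 0..=3, i.e. 4 versions.
    let last_l1_batch_to_keep = L1BatchNumber(3);
    let retained_version_count = u64::from(last_l1_batch_to_keep.0 + 1);
    assert_eq!(retained_version_count, 4);
}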
- pub async fn revert( + pub async fn roll_back( mut self, storage: &mut Connection<'_, Core>, last_l1_batch_to_keep: L1BatchNumber, diff --git a/core/lib/state/src/test_utils.rs b/core/lib/state/src/test_utils.rs index 475782fb7668..8a0b56588f30 100644 --- a/core/lib/state/src/test_utils.rs +++ b/core/lib/state/src/test_utils.rs @@ -24,7 +24,7 @@ pub(crate) async fn prepare_postgres(conn: &mut Connection<'_, Core>) { } conn.storage_logs_dal() - .revert_storage_logs(L2BlockNumber(0)) + .roll_back_storage_logs(L2BlockNumber(0)) .await .unwrap(); conn.blocks_dal() diff --git a/core/lib/zksync_core/src/metadata_calculator/helpers.rs b/core/lib/zksync_core/src/metadata_calculator/helpers.rs index 49f51a95116a..e46d46f69f3c 100644 --- a/core/lib/zksync_core/src/metadata_calculator/helpers.rs +++ b/core/lib/zksync_core/src/metadata_calculator/helpers.rs @@ -218,7 +218,7 @@ impl AsyncTree { } pub fn revert_logs(&mut self, last_l1_batch_to_keep: L1BatchNumber) { - self.as_mut().revert_logs(last_l1_batch_to_keep); + self.as_mut().roll_back_logs(last_l1_batch_to_keep); } } diff --git a/core/lib/zksync_core/src/metadata_calculator/tests.rs b/core/lib/zksync_core/src/metadata_calculator/tests.rs index 79362526d9ec..549996710d29 100644 --- a/core/lib/zksync_core/src/metadata_calculator/tests.rs +++ b/core/lib/zksync_core/src/metadata_calculator/tests.rs @@ -459,7 +459,7 @@ pub(crate) async fn reset_db_state(pool: &ConnectionPool, num_batches: usi // Drops all L1 batches (except the L1 batch with number 0) and their storage logs. storage .storage_logs_dal() - .revert_storage_logs(L2BlockNumber(0)) + .roll_back_storage_logs(L2BlockNumber(0)) .await .unwrap(); storage diff --git a/core/node/block_reverter/README.md b/core/node/block_reverter/README.md index 51a60fc24c2e..9d82fb0d189d 100644 --- a/core/node/block_reverter/README.md +++ b/core/node/block_reverter/README.md @@ -1,3 +1,4 @@ # zkSync Era Block reverter -This crate contains functionality for reverting state of a zkSync Era node. +This crate contains functionality for rolling back state of a zkSync Era node and reverting committed L1 batches on +Ethereum. diff --git a/core/node/block_reverter/src/lib.rs b/core/node/block_reverter/src/lib.rs index 934b494d1083..f9eeb1adfb5d 100644 --- a/core/node/block_reverter/src/lib.rs +++ b/core/node/block_reverter/src/lib.rs @@ -77,29 +77,33 @@ pub enum NodeRole { External, } -/// This struct is used to revert node state. +/// This struct is used to roll back node state and revert batches committed (but generally not finalized) on L1. /// /// Reversion is a rare event of manual intervention, when the node operator /// decides to revert some of the not yet finalized batches for some reason /// (e.g. inability to generate a proof). /// -/// It is also used to automatically revert the external node state -/// after it detects a reorg on the main node. +/// It is also used to automatically roll back the external node state +/// after it detects a reorg on the main node. Notice the difference in terminology; from the network +/// perspective, *reversion* leaves a trace (L1 transactions etc.), while from the perspective of the node state, a *rollback* +/// leaves the node in the same state as if the rolled back batches were never sealed in the first place. 
 ///
-/// There are a few state components that `BlockReverter` can revert:
+/// `BlockReverter` can roll back the following pieces of node state:
 ///
 /// - State of the Postgres database
 /// - State of the Merkle tree
 /// - State of the state keeper cache
-/// - State of the Ethereum contract (if the block was committed)
+/// - Object store for protocol snapshots
+///
+/// In addition, it can revert the state of the Ethereum contract (if the reverted L1 batches were committed).
 #[derive(Debug)]
 pub struct BlockReverter {
     /// Role affects the interactions with the consensus state.
     /// This distinction will be removed once consensus genesis is moved to the L1 state.
     node_role: NodeRole,
-    allow_reverting_executed_batches: bool,
+    allow_rolling_back_executed_batches: bool,
     connection_pool: ConnectionPool<Core>,
-    should_revert_postgres: bool,
+    should_roll_back_postgres: bool,
     state_keeper_cache_path: Option<String>,
     merkle_tree_path: Option<String>,
     snapshots_object_store: Option<Arc<dyn ObjectStore>>,
@@ -109,41 +113,41 @@ impl BlockReverter {
     pub fn new(node_role: NodeRole, connection_pool: ConnectionPool<Core>) -> Self {
         Self {
             node_role,
-            allow_reverting_executed_batches: false,
+            allow_rolling_back_executed_batches: false,
             connection_pool,
-            should_revert_postgres: false,
+            should_roll_back_postgres: false,
             state_keeper_cache_path: None,
             merkle_tree_path: None,
             snapshots_object_store: None,
         }
     }

-    /// Allows reverting the state past the last batch finalized on L1. If this is disallowed (which is the default),
+    /// Allows rolling back the state past the last batch finalized on L1. If this is disallowed (which is the default),
     /// block reverter will error upon such an attempt.
     ///
     /// The main use case for setting this flag is the external node, which may obtain an
     /// incorrect state even for a block that was marked as executed. On the EN, this mode is not destructive.
-    pub fn allow_reverting_executed_batches(&mut self) -> &mut Self {
-        self.allow_reverting_executed_batches = true;
+    pub fn allow_rolling_back_executed_batches(&mut self) -> &mut Self {
+        self.allow_rolling_back_executed_batches = true;
         self
     }

-    pub fn enable_reverting_postgres(&mut self) -> &mut Self {
-        self.should_revert_postgres = true;
+    pub fn enable_rolling_back_postgres(&mut self) -> &mut Self {
+        self.should_roll_back_postgres = true;
         self
     }

-    pub fn enable_reverting_merkle_tree(&mut self, path: String) -> &mut Self {
+    pub fn enable_rolling_back_merkle_tree(&mut self, path: String) -> &mut Self {
         self.merkle_tree_path = Some(path);
         self
     }

-    pub fn enable_reverting_state_keeper_cache(&mut self, path: String) -> &mut Self {
+    pub fn enable_rolling_back_state_keeper_cache(&mut self, path: String) -> &mut Self {
         self.state_keeper_cache_path = Some(path);
         self
     }

-    pub fn enable_reverting_snapshot_objects(
+    pub fn enable_rolling_back_snapshot_objects(
         &mut self,
         object_store: Arc<dyn ObjectStore>,
     ) -> &mut Self {
@@ -151,10 +155,9 @@ impl BlockReverter {
         self
     }

-    /// Reverts DBs (Postgres + RocksDB) to a previous state. If Postgres is reverted and `snapshots_object_store`
-    /// is specified, snapshot files will be deleted as well.
-    pub async fn revert(&self, last_l1_batch_to_keep: L1BatchNumber) -> anyhow::Result<()> {
-        if !self.allow_reverting_executed_batches {
+    /// Rolls back previously enabled DBs (Postgres + RocksDB) and the snapshot object store to a previous state.
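For orientation, a minimal sketch of how the renamed builder API is driven end to end; the pool, paths, and object store handle are placeholders, and real call sites appear in the external node entry point and in this crate's tests:

// Hypothetical wiring; the actual values come from node configuration.
let mut reverter = BlockReverter::new(NodeRole::External, connection_pool.clone());
reverter
    .enable_rolling_back_postgres()
    .enable_rolling_back_merkle_tree("/db/tree".to_owned())
    .enable_rolling_back_state_keeper_cache("/db/state_keeper".to_owned())
    .enable_rolling_back_snapshot_objects(object_store.clone());
reverter.roll_back(L1BatchNumber(5)).await?;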
+ pub async fn roll_back(&self, last_l1_batch_to_keep: L1BatchNumber) -> anyhow::Result<()> { + if !self.allow_rolling_back_executed_batches { let mut storage = self.connection_pool.connection().await?; let last_executed_l1_batch = storage .blocks_dal() @@ -162,14 +165,15 @@ impl BlockReverter { .await?; anyhow::ensure!( Some(last_l1_batch_to_keep) >= last_executed_l1_batch, - "Attempt to revert already executed L1 batches; the last executed batch is: {last_executed_l1_batch:?}" + "Attempt to roll back already executed L1 batches; the last executed batch is: {last_executed_l1_batch:?}" ); } - // Tree needs to be reverted first to keep the state recoverable - self.revert_rocksdb_instances(last_l1_batch_to_keep).await?; - let deleted_snapshots = if self.should_revert_postgres { - self.revert_postgres(last_l1_batch_to_keep).await? + // Tree needs to be rolled back first to keep the state recoverable + self.roll_back_rocksdb_instances(last_l1_batch_to_keep) + .await?; + let deleted_snapshots = if self.should_roll_back_postgres { + self.roll_back_postgres(last_l1_batch_to_keep).await? } else { vec![] }; @@ -186,7 +190,7 @@ impl BlockReverter { Ok(()) } - async fn revert_rocksdb_instances( + async fn roll_back_rocksdb_instances( &self, last_l1_batch_to_keep: L1BatchNumber, ) -> anyhow::Result<()> { @@ -208,17 +212,20 @@ impl BlockReverter { ) })?; if merkle_tree_exists { - tracing::info!("Reverting Merkle tree at {}", merkle_tree_path.display()); + tracing::info!( + "Rolling back Merkle tree at `{}`", + merkle_tree_path.display() + ); let merkle_tree_path = merkle_tree_path.to_path_buf(); tokio::task::spawn_blocking(move || { - Self::revert_tree_blocking( + Self::roll_back_tree_blocking( last_l1_batch_to_keep, &merkle_tree_path, storage_root_hash, ) }) .await - .context("reverting Merkle tree panicked")??; + .context("rolling back Merkle tree panicked")??; } else { tracing::info!( "Merkle tree not found at `{}`; skipping", @@ -239,13 +246,13 @@ impl BlockReverter { sk_cache_exists, "Path with state keeper cache DB doesn't exist at `{state_keeper_cache_path}`" ); - self.revert_state_keeper_cache(last_l1_batch_to_keep, state_keeper_cache_path) + self.roll_back_state_keeper_cache(last_l1_batch_to_keep, state_keeper_cache_path) .await?; } Ok(()) } - fn revert_tree_blocking( + fn roll_back_tree_blocking( last_l1_batch_to_keep: L1BatchNumber, path: &Path, storage_root_hash: H256, @@ -254,24 +261,24 @@ impl BlockReverter { let mut tree = ZkSyncTree::new_lightweight(db.into()); if tree.next_l1_batch_number() <= last_l1_batch_to_keep { - tracing::info!("Tree is behind the L1 batch to revert to; skipping"); + tracing::info!("Tree is behind the L1 batch to roll back to; skipping"); return Ok(()); } - tree.revert_logs(last_l1_batch_to_keep); + tree.roll_back_logs(last_l1_batch_to_keep); tracing::info!("Checking match of the tree root hash and root hash from Postgres"); let tree_root_hash = tree.root_hash(); anyhow::ensure!( tree_root_hash == storage_root_hash, - "Mismatch between the tree root hash {tree_root_hash:?} and storage root hash {storage_root_hash:?} after revert" + "Mismatch between the tree root hash {tree_root_hash:?} and storage root hash {storage_root_hash:?} after rollback" ); tracing::info!("Saving tree changes to disk"); tree.save(); Ok(()) } - /// Reverts blocks in the state keeper cache. - async fn revert_state_keeper_cache( + /// Rolls back changes in the state keeper cache. 
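One detail in the Merkle tree branch above that is easy to misread is the double `?` after `spawn_blocking`. A self-contained sketch of the same pattern (the function and its body are made up for illustration):

use anyhow::Context as _;

async fn run_blocking_step() -> anyhow::Result<()> {
    tokio::task::spawn_blocking(|| -> anyhow::Result<()> {
        // Blocking (e.g. RocksDB-bound) work would go here.
        Ok(())
    })
    .await
    .context("blocking task panicked")??; // first `?`: join/panic error; second `?`: the closure's own error
    Ok(())
}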
+ async fn roll_back_state_keeper_cache( &self, last_l1_batch_to_keep: L1BatchNumber, state_keeper_cache_path: &str, @@ -283,24 +290,24 @@ impl BlockReverter { if sk_cache.l1_batch_number().await > Some(last_l1_batch_to_keep + 1) { let mut storage = self.connection_pool.connection().await?; - tracing::info!("Reverting state keeper cache"); + tracing::info!("Rolling back state keeper cache"); sk_cache - .revert(&mut storage, last_l1_batch_to_keep) + .roll_back(&mut storage, last_l1_batch_to_keep) .await - .context("failed reverting state keeper cache")?; + .context("failed rolling back state keeper cache")?; } else { - tracing::info!("Nothing to revert in state keeper cache"); + tracing::info!("Nothing to roll back in state keeper cache"); } Ok(()) } - /// Reverts data in the Postgres database. + /// Rolls back data in the Postgres database. /// If `node_role` is `Main` a consensus hard-fork is performed. - async fn revert_postgres( + async fn roll_back_postgres( &self, last_l1_batch_to_keep: L1BatchNumber, ) -> anyhow::Result> { - tracing::info!("Reverting Postgres data"); + tracing::info!("Rolling back Postgres data"); let mut storage = self.connection_pool.connection().await?; let mut transaction = storage.start_transaction().await?; @@ -312,50 +319,50 @@ impl BlockReverter { format!("L1 batch #{last_l1_batch_to_keep} doesn't contain L2 blocks") })?; - tracing::info!("Reverting transactions state"); + tracing::info!("Rolling back transactions state"); transaction .transactions_dal() .reset_transactions_state(last_l2_block_to_keep) .await?; - tracing::info!("Reverting events"); + tracing::info!("Rolling back events"); transaction .events_dal() - .revert_events(last_l2_block_to_keep) + .roll_back_events(last_l2_block_to_keep) .await?; - tracing::info!("Reverting L2-to-L1 logs"); + tracing::info!("Rolling back L2-to-L1 logs"); transaction .events_dal() - .revert_l2_to_l1_logs(last_l2_block_to_keep) + .roll_back_l2_to_l1_logs(last_l2_block_to_keep) .await?; - tracing::info!("Reverting created tokens"); + tracing::info!("Rolling back created tokens"); transaction .tokens_dal() - .revert_tokens(last_l2_block_to_keep) + .roll_back_tokens(last_l2_block_to_keep) .await?; - tracing::info!("Revering factory deps"); + tracing::info!("Rolling back factory deps"); transaction .factory_deps_dal() - .revert_factory_deps(last_l2_block_to_keep) + .roll_back_factory_deps(last_l2_block_to_keep) .await?; - tracing::info!("Reverting storage logs"); + tracing::info!("Rolling back storage logs"); transaction .storage_logs_dal() - .revert_storage_logs(last_l2_block_to_keep) + .roll_back_storage_logs(last_l2_block_to_keep) .await?; - tracing::info!("Reverting Ethereum transactions"); + tracing::info!("Rolling back Ethereum transactions"); transaction .eth_sender_dal() .delete_eth_txs(last_l1_batch_to_keep) .await?; - tracing::info!("Reverting snapshots"); + tracing::info!("Rolling back snapshots"); let deleted_snapshots = transaction .snapshots_dal() .delete_snapshots_after(last_l1_batch_to_keep) .await?; // Remove data from main tables (L2 blocks and L1 batches). 
- tracing::info!("Reverting L1 batches"); + tracing::info!("Rolling back L1 batches"); transaction .blocks_dal() .delete_l1_batches(last_l1_batch_to_keep) @@ -364,7 +371,7 @@ impl BlockReverter { .blocks_dal() .delete_initial_writes(last_l1_batch_to_keep) .await?; - tracing::info!("Reverting L2 blocks"); + tracing::info!("Rolling back L2 blocks"); transaction .blocks_dal() .delete_l2_blocks(last_l2_block_to_keep) diff --git a/core/node/block_reverter/src/tests.rs b/core/node/block_reverter/src/tests.rs index 9c5dbd5a7583..668704b27153 100644 --- a/core/node/block_reverter/src/tests.rs +++ b/core/node/block_reverter/src/tests.rs @@ -139,10 +139,10 @@ async fn block_reverter_basics(sync_merkle_tree: bool) { .unwrap(); BlockReverter::new(NodeRole::External, pool.clone()) - .enable_reverting_postgres() - .enable_reverting_merkle_tree(merkle_tree_path.to_str().unwrap().to_owned()) - .enable_reverting_state_keeper_cache(sk_cache_path.to_str().unwrap().to_owned()) - .revert(L1BatchNumber(5)) + .enable_rolling_back_postgres() + .enable_rolling_back_merkle_tree(merkle_tree_path.to_str().unwrap().to_owned()) + .enable_rolling_back_state_keeper_cache(sk_cache_path.to_str().unwrap().to_owned()) + .roll_back(L1BatchNumber(5)) .await .unwrap(); @@ -264,11 +264,11 @@ async fn reverting_snapshot(remove_objects: bool) { assert_eq!(all_snapshots.snapshots_l1_batch_numbers, [L1BatchNumber(7)]); let mut block_reverter = BlockReverter::new(NodeRole::External, pool.clone()); - block_reverter.enable_reverting_postgres(); + block_reverter.enable_rolling_back_postgres(); if remove_objects { - block_reverter.enable_reverting_snapshot_objects(object_store.clone()); + block_reverter.enable_rolling_back_snapshot_objects(object_store.clone()); } - block_reverter.revert(L1BatchNumber(5)).await.unwrap(); + block_reverter.roll_back(L1BatchNumber(5)).await.unwrap(); // Check that snapshot has been removed. 
let all_snapshots = storage From 28bae451c2494eb7023c285c398e87870096bc63 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Wed, 24 Apr 2024 12:10:20 +0300 Subject: [PATCH 13/14] Test deleting snapshots separately --- core/lib/dal/src/snapshots_dal.rs | 36 +++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/core/lib/dal/src/snapshots_dal.rs b/core/lib/dal/src/snapshots_dal.rs index b5883550aa27..009efd3da078 100644 --- a/core/lib/dal/src/snapshots_dal.rs +++ b/core/lib/dal/src/snapshots_dal.rs @@ -265,6 +265,39 @@ mod tests { .unwrap() .expect("snapshot is not persisted"); assert_eq!(snapshot_metadata.l1_batch_number, l1_batch_number); + } + + #[tokio::test] + async fn deleting_snapshots() { + let pool = ConnectionPool::::test_pool().await; + let mut conn = pool.connection().await.unwrap(); + let mut dal = conn.snapshots_dal(); + let l1_batch_number = L1BatchNumber(100); + dal.add_snapshot( + SnapshotVersion::Version0, + l1_batch_number, + 2, + "gs:///bucket/factory_deps.bin", + ) + .await + .unwrap(); + + for i in 0..2 { + dal.add_storage_logs_filepath_for_snapshot( + l1_batch_number, + i, + "gs:///bucket/chunk.bin", + ) + .await + .unwrap(); + } + + let snapshot_metadata = dal + .get_snapshot_metadata(l1_batch_number) + .await + .unwrap() + .expect("snapshot is not persisted"); + assert!(snapshot_metadata.is_complete()); let deleted_snapshots = dal.delete_snapshots_after(l1_batch_number).await.unwrap(); assert!(deleted_snapshots.is_empty(), "{deleted_snapshots:?}"); @@ -292,6 +325,9 @@ mod tests { deleted_snapshot_metadata.is_none(), "{deleted_snapshot_metadata:?}" ); + + let complete_snapshots = dal.get_all_complete_snapshots().await.unwrap(); + assert_eq!(complete_snapshots.snapshots_l1_batch_numbers, []); } #[tokio::test] From 5fd06d80f5ff5530bd3740445c9f7100da925ac3 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Fri, 26 Apr 2024 14:31:54 +0300 Subject: [PATCH 14/14] Remove obsolete FIXME --- core/bin/block_reverter/src/main.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/core/bin/block_reverter/src/main.rs b/core/bin/block_reverter/src/main.rs index be7e0d751c47..e24c1f72655d 100644 --- a/core/bin/block_reverter/src/main.rs +++ b/core/bin/block_reverter/src/main.rs @@ -101,7 +101,6 @@ async fn main() -> anyhow::Result<()> { .default_priority_fee_per_gas, ); let contracts = ContractsConfig::from_env().context("ContractsConfig::from_env()")?; - // FIXME: is it correct to parse the entire `NetworkConfig`? let network = NetworkConfig::from_env().context("NetworkConfig::from_env()")?; let postgres_config = PostgresConfig::from_env().context("PostgresConfig::from_env()")?; let era_chain_id = env::var("CONTRACTS_ERA_CHAIN_ID")