diff --git a/core/node/state_keeper/src/batch_executor/main_executor.rs b/core/node/state_keeper/src/batch_executor/main_executor.rs
index ddbe166a04c1..a16b9920dd6e 100644
--- a/core/node/state_keeper/src/batch_executor/main_executor.rs
+++ b/core/node/state_keeper/src/batch_executor/main_executor.rs
@@ -1,5 +1,6 @@
 use std::sync::Arc;
 
+use anyhow::Context as _;
 use async_trait::async_trait;
 use multivm::{
     interface::{
@@ -67,17 +68,15 @@ impl BatchExecutor for MainBatchExecutor {
                 .block_on(
                     storage_factory.access_storage(&stop_receiver, l1_batch_params.number - 1),
                 )
-                .expect("failed getting access to state keeper storage")
+                .context("failed accessing state keeper storage")?
             {
                 executor.run(storage, l1_batch_params, system_env);
             } else {
                 tracing::info!("Interrupted while trying to access state keeper storage");
             }
+            anyhow::Ok(())
         });
 
-        Some(BatchExecutorHandle {
-            handle,
-            commands: commands_sender,
-        })
+        Some(BatchExecutorHandle::from_raw(handle, commands_sender))
     }
 }
 
@@ -111,19 +110,27 @@
             match cmd {
                 Command::ExecuteTx(tx, resp) => {
                     let result = self.execute_tx(&tx, &mut vm);
-                    resp.send(result).unwrap();
+                    if resp.send(result).is_err() {
+                        break;
+                    }
                 }
                 Command::RollbackLastTx(resp) => {
                     self.rollback_last_tx(&mut vm);
-                    resp.send(()).unwrap();
+                    if resp.send(()).is_err() {
+                        break;
+                    }
                 }
                 Command::StartNextL2Block(l2_block_env, resp) => {
                     self.start_next_l2_block(l2_block_env, &mut vm);
-                    resp.send(()).unwrap();
+                    if resp.send(()).is_err() {
+                        break;
+                    }
                 }
                 Command::FinishBatch(resp) => {
                     let vm_block_result = self.finish_batch(&mut vm);
-                    resp.send(vm_block_result).unwrap();
+                    if resp.send(vm_block_result).is_err() {
+                        break;
+                    }
 
                     // `storage_view` cannot be accessed while borrowed by the VM,
                     // so this is the only point at which storage metrics can be obtained
diff --git a/core/node/state_keeper/src/batch_executor/mod.rs b/core/node/state_keeper/src/batch_executor/mod.rs
index cc216c07bd44..eb6292ee1daf 100644
--- a/core/node/state_keeper/src/batch_executor/mod.rs
+++ b/core/node/state_keeper/src/batch_executor/mod.rs
@@ -1,5 +1,6 @@
-use std::{fmt, sync::Arc};
+use std::{error::Error as StdError, fmt, sync::Arc};
 
+use anyhow::Context as _;
 use async_trait::async_trait;
 use multivm::interface::{
     FinishedL1Batch, Halt, L1BatchEnv, L2BlockEnv, SystemEnv, VmExecutionResultAndLogs,
@@ -66,12 +67,45 @@ pub trait BatchExecutor: 'static + Send + Sync + fmt::Debug {
     ) -> Option<BatchExecutorHandle>;
 }
 
+#[derive(Debug)]
+enum HandleOrError {
+    Handle(JoinHandle<anyhow::Result<()>>),
+    Err(Arc<dyn StdError + Send + Sync>),
+}
+
+impl HandleOrError {
+    async fn wait_for_error(&mut self) -> anyhow::Error {
+        let err_arc = match self {
+            Self::Handle(handle) => {
+                let err = match handle.await {
+                    Ok(Ok(())) => anyhow::anyhow!("batch executor unexpectedly stopped"),
+                    Ok(Err(err)) => err,
+                    Err(err) => anyhow::Error::new(err).context("batch executor panicked"),
+                };
+                let err: Box<dyn StdError + Send + Sync> = err.into();
+                let err: Arc<dyn StdError + Send + Sync> = err.into();
+                *self = Self::Err(err.clone());
+                err
+            }
+            Self::Err(err) => err.clone(),
+        };
+        anyhow::Error::new(err_arc)
+    }
+
+    async fn wait(self) -> anyhow::Result<()> {
+        match self {
+            Self::Handle(handle) => handle.await.context("batch executor panicked")?,
+            Self::Err(err_arc) => Err(anyhow::Error::new(err_arc)),
+        }
+    }
+}
+
 /// A public interface for interaction with the `BatchExecutor`.
 /// `BatchExecutorHandle` is stored in the state keeper and is used to invoke or rollback transactions, and also seal
 /// the batches.
 #[derive(Debug)]
 pub struct BatchExecutorHandle {
-    handle: JoinHandle<()>,
+    handle: HandleOrError,
     commands: mpsc::Sender<Command>,
 }
 
@@ -79,23 +113,36 @@ impl BatchExecutorHandle {
     /// Creates a batch executor handle from the provided sender and thread join handle.
     /// Can be used to inject an alternative batch executor implementation.
     #[doc(hidden)]
-    pub(super) fn from_raw(handle: JoinHandle<()>, commands: mpsc::Sender<Command>) -> Self {
-        Self { handle, commands }
+    pub(super) fn from_raw(
+        handle: JoinHandle<anyhow::Result<()>>,
+        commands: mpsc::Sender<Command>,
+    ) -> Self {
+        Self {
+            handle: HandleOrError::Handle(handle),
+            commands,
+        }
     }
 
-    pub async fn execute_tx(&self, tx: Transaction) -> TxExecutionResult {
+    pub async fn execute_tx(&mut self, tx: Transaction) -> anyhow::Result<TxExecutionResult> {
         let tx_gas_limit = tx.gas_limit().as_u64();
 
         let (response_sender, response_receiver) = oneshot::channel();
-        self.commands
+        let send_failed = self
+            .commands
             .send(Command::ExecuteTx(Box::new(tx), response_sender))
             .await
-            .unwrap();
+            .is_err();
+        if send_failed {
+            return Err(self.handle.wait_for_error().await);
+        }
 
         let latency = EXECUTOR_METRICS.batch_executor_command_response_time
             [&ExecutorCommand::ExecuteTx]
             .start();
-        let res = response_receiver.await.unwrap();
+        let res = match response_receiver.await {
+            Ok(res) => res,
+            Err(_) => return Err(self.handle.wait_for_error().await),
+        };
         let elapsed = latency.observe();
 
         if let TxExecutionResult::Success { tx_metrics, .. } = &res {
@@ -112,52 +159,76 @@ impl BatchExecutorHandle {
                 .failed_tx_gas_limit_per_nanosecond
                 .observe(tx_gas_limit as f64 / elapsed.as_nanos() as f64);
         }
-        res
+        Ok(res)
     }
 
-    pub async fn start_next_l2_block(&self, env: L2BlockEnv) {
+    pub async fn start_next_l2_block(&mut self, env: L2BlockEnv) -> anyhow::Result<()> {
         // While we don't get anything from the channel, it's useful to have it as a confirmation that the operation
         // indeed has been processed.
         let (response_sender, response_receiver) = oneshot::channel();
-        self.commands
+        let send_failed = self
+            .commands
             .send(Command::StartNextL2Block(env, response_sender))
             .await
-            .unwrap();
+            .is_err();
+        if send_failed {
+            return Err(self.handle.wait_for_error().await);
+        }
+
         let latency = EXECUTOR_METRICS.batch_executor_command_response_time
             [&ExecutorCommand::StartNextL2Block]
             .start();
-        response_receiver.await.unwrap();
+        if response_receiver.await.is_err() {
+            return Err(self.handle.wait_for_error().await);
+        }
         latency.observe();
+        Ok(())
     }
 
-    pub async fn rollback_last_tx(&self) {
+    pub async fn rollback_last_tx(&mut self) -> anyhow::Result<()> {
         // While we don't get anything from the channel, it's useful to have it as a confirmation that the operation
         // indeed has been processed.
         let (response_sender, response_receiver) = oneshot::channel();
-        self.commands
+        let send_failed = self
+            .commands
             .send(Command::RollbackLastTx(response_sender))
             .await
-            .unwrap();
+            .is_err();
+        if send_failed {
+            return Err(self.handle.wait_for_error().await);
+        }
+
         let latency = EXECUTOR_METRICS.batch_executor_command_response_time
             [&ExecutorCommand::RollbackLastTx]
             .start();
-        response_receiver.await.unwrap();
+        if response_receiver.await.is_err() {
+            return Err(self.handle.wait_for_error().await);
+        }
         latency.observe();
+        Ok(())
     }
 
-    pub async fn finish_batch(self) -> FinishedL1Batch {
+    pub async fn finish_batch(mut self) -> anyhow::Result<FinishedL1Batch> {
         let (response_sender, response_receiver) = oneshot::channel();
-        self.commands
+        let send_failed = self
+            .commands
            .send(Command::FinishBatch(response_sender))
            .await
-            .unwrap();
+            .is_err();
+        if send_failed {
+            return Err(self.handle.wait_for_error().await);
+        }
+
         let latency = EXECUTOR_METRICS.batch_executor_command_response_time
             [&ExecutorCommand::FinishBatch]
             .start();
-        let finished_batch = response_receiver.await.unwrap();
-        self.handle.await.unwrap();
+        let finished_batch = match response_receiver.await {
+            Ok(batch) => batch,
+            Err(_) => return Err(self.handle.wait_for_error().await),
+        };
+        self.handle.wait().await?;
         latency.observe();
-        finished_batch
+        Ok(finished_batch)
     }
 }
diff --git a/core/node/state_keeper/src/batch_executor/tests/mod.rs b/core/node/state_keeper/src/batch_executor/tests/mod.rs
index 829e2d66f8f2..c2196a7b6b28 100644
--- a/core/node/state_keeper/src/batch_executor/tests/mod.rs
+++ b/core/node/state_keeper/src/batch_executor/tests/mod.rs
@@ -50,11 +50,11 @@ async fn execute_l2_tx(storage_type: StorageType) {
     let mut tester = Tester::new(connection_pool);
     tester.genesis().await;
     tester.fund(&[alice.address()]).await;
-    let executor = tester.create_batch_executor(storage_type).await;
+    let mut executor = tester.create_batch_executor(storage_type).await;
 
-    let res = executor.execute_tx(alice.execute()).await;
+    let res = executor.execute_tx(alice.execute()).await.unwrap();
     assert_executed(&res);
-    executor.finish_batch().await;
+    executor.finish_batch().await.unwrap();
 }
 
 #[derive(Debug, Clone, Copy)]
@@ -107,13 +107,13 @@ async fn execute_l2_tx_after_snapshot_recovery(
     let snapshot = storage_snapshot.recover(&connection_pool).await;
     let mut tester = Tester::new(connection_pool);
-    let executor = tester
+    let mut executor = tester
         .recover_batch_executor_custom(&storage_type, &snapshot)
         .await;
-    let res = executor.execute_tx(alice.execute()).await;
+    let res = executor.execute_tx(alice.execute()).await.unwrap();
     if mutation.is_none() {
         assert_executed(&res);
-        executor.finish_batch().await;
+        executor.finish_batch().await.unwrap();
     } else {
         assert_rejected(&res);
     }
@@ -129,13 +129,16 @@ async fn execute_l1_tx() {
     tester.genesis().await;
     tester.fund(&[alice.address()]).await;
 
-    let executor = tester
+    let mut executor = tester
         .create_batch_executor(StorageType::AsyncRocksdbCache)
         .await;
-    let res = executor.execute_tx(alice.l1_execute(PriorityOpId(1))).await;
+    let res = executor
+        .execute_tx(alice.l1_execute(PriorityOpId(1)))
+        .await
+        .unwrap();
     assert_executed(&res);
-    executor.finish_batch().await;
+    executor.finish_batch().await.unwrap();
 }
 
 /// Checks that we can successfully execute a single L2 tx and a single L1 tx in batch executor.
@@ -147,17 +150,20 @@ async fn execute_l2_and_l1_txs() {
     let mut tester = Tester::new(connection_pool);
     tester.genesis().await;
     tester.fund(&[alice.address()]).await;
-    let executor = tester
+    let mut executor = tester
         .create_batch_executor(StorageType::AsyncRocksdbCache)
         .await;
 
-    let res = executor.execute_tx(alice.execute()).await;
+    let res = executor.execute_tx(alice.execute()).await.unwrap();
     assert_executed(&res);
 
-    let res = executor.execute_tx(alice.l1_execute(PriorityOpId(1))).await;
+    let res = executor
+        .execute_tx(alice.l1_execute(PriorityOpId(1)))
+        .await
+        .unwrap();
     assert_executed(&res);
 
-    executor.finish_batch().await;
+    executor.finish_batch().await.unwrap();
 }
 
 /// Checks that we can successfully rollback the transaction and execute it once again.
@@ -170,18 +176,18 @@ async fn rollback() {
     tester.genesis().await;
     tester.fund(&[alice.address()]).await;
 
-    let executor = tester
+    let mut executor = tester
         .create_batch_executor(StorageType::AsyncRocksdbCache)
         .await;
 
     let tx = alice.execute();
-    let res_old = executor.execute_tx(tx.clone()).await;
+    let res_old = executor.execute_tx(tx.clone()).await.unwrap();
     assert_executed(&res_old);
 
-    executor.rollback_last_tx().await;
+    executor.rollback_last_tx().await.unwrap();
 
     // Execute the same transaction, it must succeed.
-    let res_new = executor.execute_tx(tx).await;
+    let res_new = executor.execute_tx(tx).await.unwrap();
     assert_executed(&res_new);
 
     let (
@@ -203,7 +209,7 @@ async fn rollback() {
         "Execution results must be the same"
     );
 
-    executor.finish_batch().await;
+    executor.finish_batch().await.unwrap();
 }
 
 /// Checks that incorrect transactions are marked as rejected.
@@ -215,12 +221,12 @@ async fn reject_tx() {
     let mut tester = Tester::new(connection_pool);
 
     tester.genesis().await;
-    let executor = tester
+    let mut executor = tester
         .create_batch_executor(StorageType::AsyncRocksdbCache)
         .await;
 
     // Wallet is not funded, it can't pay for fees.
-    let res = executor.execute_tx(alice.execute()).await;
+    let res = executor.execute_tx(alice.execute()).await.unwrap();
     assert_rejected(&res);
 }
 
@@ -234,15 +240,15 @@ async fn too_big_gas_limit() {
     let mut tester = Tester::new(connection_pool);
     tester.genesis().await;
     tester.fund(&[alice.address()]).await;
-    let executor = tester
+    let mut executor = tester
         .create_batch_executor(StorageType::AsyncRocksdbCache)
         .await;
 
     let big_gas_limit_tx = alice.execute_with_gas_limit(u32::MAX);
 
-    let res = executor.execute_tx(big_gas_limit_tx).await;
+    let res = executor.execute_tx(big_gas_limit_tx).await.unwrap();
     assert_executed(&res);
 
-    executor.finish_batch().await;
+    executor.finish_batch().await.unwrap();
 }
 
 /// Checks that we can't execute the same transaction twice.
@@ -254,16 +260,16 @@ async fn tx_cant_be_reexecuted() {
     let mut tester = Tester::new(connection_pool);
     tester.genesis().await;
     tester.fund(&[alice.address()]).await;
-    let executor = tester
+    let mut executor = tester
         .create_batch_executor(StorageType::AsyncRocksdbCache)
         .await;
 
     let tx = alice.execute();
-    let res1 = executor.execute_tx(tx.clone()).await;
+    let res1 = executor.execute_tx(tx.clone()).await.unwrap();
     assert_executed(&res1);
 
     // Nonce is used for the second tx.
-    let res2 = executor.execute_tx(tx).await;
+    let res2 = executor.execute_tx(tx).await.unwrap();
     assert_rejected(&res2);
 }
 
@@ -276,23 +282,25 @@ async fn deploy_and_call_loadtest() {
     let mut tester = Tester::new(connection_pool);
     tester.genesis().await;
     tester.fund(&[alice.address()]).await;
-    let executor = tester
+    let mut executor = tester
         .create_batch_executor(StorageType::AsyncRocksdbCache)
         .await;
 
     let tx = alice.deploy_loadnext_tx();
-    assert_executed(&executor.execute_tx(tx.tx).await);
+    assert_executed(&executor.execute_tx(tx.tx).await.unwrap());
     assert_executed(
         &executor
             .execute_tx(alice.loadnext_custom_gas_call(tx.address, 10, 10_000_000))
-            .await,
+            .await
+            .unwrap(),
     );
     assert_executed(
         &executor
             .execute_tx(alice.loadnext_custom_writes_call(tx.address, 1, 500_000_000))
-            .await,
+            .await
+            .unwrap(),
     );
-    executor.finish_batch().await;
+    executor.finish_batch().await.unwrap();
 }
 
 /// Checks that a tx that is reverted by the VM still can be included into a batch.
@@ -305,12 +313,12 @@ async fn execute_reverted_tx() {
     tester.genesis().await;
     tester.fund(&[alice.address()]).await;
 
-    let executor = tester
+    let mut executor = tester
         .create_batch_executor(StorageType::AsyncRocksdbCache)
         .await;
 
     let tx = alice.deploy_loadnext_tx();
-    assert_executed(&executor.execute_tx(tx.tx).await);
+    assert_executed(&executor.execute_tx(tx.tx).await.unwrap());
 
     assert_reverted(
         &executor
@@ -318,9 +326,10 @@ async fn execute_reverted_tx() {
                 tx.address, 1, 1_000_000, // We provide enough gas for tx to be executed, but not enough for the call to be successful.
             ))
-            .await,
+            .await
+            .unwrap(),
     );
-    executor.finish_batch().await;
+    executor.finish_batch().await.unwrap();
 }
 
 /// Runs the batch executor through a semi-realistic basic scenario:
@@ -336,44 +345,53 @@ async fn execute_realistic_scenario() {
     tester.genesis().await;
     tester.fund(&[alice.address()]).await;
     tester.fund(&[bob.address()]).await;
-    let executor = tester
+    let mut executor = tester
         .create_batch_executor(StorageType::AsyncRocksdbCache)
         .await;
 
     // A good tx should be executed successfully.
-    let res = executor.execute_tx(alice.execute()).await;
+    let res = executor.execute_tx(alice.execute()).await.unwrap();
     assert_executed(&res);
 
     // Execute a good tx successfully, roll if back, and execute it again.
     let tx_to_be_rolled_back = alice.execute();
-    let res = executor.execute_tx(tx_to_be_rolled_back.clone()).await;
+    let res = executor
+        .execute_tx(tx_to_be_rolled_back.clone())
+        .await
+        .unwrap();
     assert_executed(&res);
 
-    executor.rollback_last_tx().await;
+    executor.rollback_last_tx().await.unwrap();
 
-    let res = executor.execute_tx(tx_to_be_rolled_back.clone()).await;
+    let res = executor
+        .execute_tx(tx_to_be_rolled_back.clone())
+        .await
+        .unwrap();
     assert_executed(&res);
 
     // A good tx from a different account should be executed successfully.
-    let res = executor.execute_tx(bob.execute()).await;
+    let res = executor.execute_tx(bob.execute()).await.unwrap();
     assert_executed(&res);
 
     // If we try to execute an already executed again it should be rejected.
-    let res = executor.execute_tx(tx_to_be_rolled_back).await;
+    let res = executor.execute_tx(tx_to_be_rolled_back).await.unwrap();
     assert_rejected(&res);
 
     // An unrelated good tx should be executed successfully.
-    executor.rollback_last_tx().await; // Roll back the vm to the pre-rejected-tx state.
+    executor.rollback_last_tx().await.unwrap(); // Roll back the vm to the pre-rejected-tx state.
 
     // No need to reset the nonce because a tx with the current nonce was indeed executed.
-    let res = executor.execute_tx(alice.execute()).await;
+    let res = executor.execute_tx(alice.execute()).await.unwrap();
     assert_executed(&res);
 
     // A good L1 tx should also be executed successfully.
-    let res = executor.execute_tx(alice.l1_execute(PriorityOpId(1))).await;
+    let res = executor
+        .execute_tx(alice.l1_execute(PriorityOpId(1)))
+        .await
+        .unwrap();
     assert_executed(&res);
 
-    executor.finish_batch().await;
+    executor.finish_batch().await.unwrap();
 }
 
 /// Checks that we handle the bootloader out of gas error on execution phase.
@@ -393,11 +411,11 @@ async fn bootloader_out_of_gas_for_any_tx() {
     tester.genesis().await;
     tester.fund(&[alice.address()]).await;
 
-    let executor = tester
+    let mut executor = tester
         .create_batch_executor(StorageType::AsyncRocksdbCache)
         .await;
 
-    let res = executor.execute_tx(alice.execute()).await;
+    let res = executor.execute_tx(alice.execute()).await.unwrap();
     assert_matches!(res, TxExecutionResult::BootloaderOutOfGasForTx);
 }
 
@@ -412,14 +430,14 @@ async fn bootloader_tip_out_of_gas() {
     tester.genesis().await;
     tester.fund(&[alice.address()]).await;
 
-    let executor = tester
+    let mut executor = tester
         .create_batch_executor(StorageType::AsyncRocksdbCache)
         .await;
 
-    let res = executor.execute_tx(alice.execute()).await;
+    let res = executor.execute_tx(alice.execute()).await.unwrap();
     assert_executed(&res);
 
-    let finished_batch = executor.finish_batch().await;
+    let finished_batch = executor.finish_batch().await.unwrap();
 
     // Just a bit below the gas used for the previous batch execution should be fine to execute the tx
     // but not enough to execute the block tip.
@@ -435,11 +453,11 @@ async fn bootloader_tip_out_of_gas() {
         validation_computational_gas_limit: u32::MAX,
     });
 
-    let second_executor = tester
+    let mut second_executor = tester
         .create_batch_executor(StorageType::AsyncRocksdbCache)
         .await;
 
-    let res = second_executor.execute_tx(alice.execute()).await;
+    let res = second_executor.execute_tx(alice.execute()).await.unwrap();
     assert_matches!(res, TxExecutionResult::BootloaderOutOfGasForTx);
 }
 
@@ -455,34 +473,34 @@ async fn catchup_rocksdb_cache() {
     tester.fund(&[alice.address(), bob.address()]).await;
 
     // Execute a bunch of transactions to populate Postgres-based storage (note that RocksDB stays empty)
-    let executor = tester.create_batch_executor(StorageType::Postgres).await;
+    let mut executor = tester.create_batch_executor(StorageType::Postgres).await;
     for _ in 0..10 {
-        let res = executor.execute_tx(alice.execute()).await;
+        let res = executor.execute_tx(alice.execute()).await.unwrap();
         assert_executed(&res);
     }
 
     // Execute one more tx on PG
     let tx = alice.execute();
-    let res = executor.execute_tx(tx.clone()).await;
+    let res = executor.execute_tx(tx.clone()).await.unwrap();
     assert_executed(&res);
-    executor.finish_batch().await;
+    executor.finish_batch().await.unwrap();
 
     // Async RocksDB cache should be aware of the tx and should reject it
-    let executor = tester
+    let mut executor = tester
         .create_batch_executor(StorageType::AsyncRocksdbCache)
         .await;
-    let res = executor.execute_tx(tx.clone()).await;
+    let res = executor.execute_tx(tx.clone()).await.unwrap();
     assert_rejected(&res);
 
     // Execute one tx just so we can finish the batch
-    executor.rollback_last_tx().await; // Roll back the vm to the pre-rejected-tx state.
-    let res = executor.execute_tx(bob.execute()).await;
+    executor.rollback_last_tx().await.unwrap(); // Roll back the vm to the pre-rejected-tx state.
+    let res = executor.execute_tx(bob.execute()).await.unwrap();
     assert_executed(&res);
-    executor.finish_batch().await;
+    executor.finish_batch().await.unwrap();
 
     // Wait for all background tasks to exit, otherwise we might still be holding a RocksDB lock
     tester.wait_for_tasks().await;
 
     // Sync RocksDB storage should be aware of the tx and should reject it
-    let executor = tester.create_batch_executor(StorageType::Rocksdb).await;
-    let res = executor.execute_tx(tx).await;
+    let mut executor = tester.create_batch_executor(StorageType::Rocksdb).await;
+    let res = executor.execute_tx(tx).await.unwrap();
     assert_rejected(&res);
 }
diff --git a/core/node/state_keeper/src/batch_executor/tests/tester.rs b/core/node/state_keeper/src/batch_executor/tests/tester.rs
index 0b8459fe6625..d091520e652a 100644
--- a/core/node/state_keeper/src/batch_executor/tests/tester.rs
+++ b/core/node/state_keeper/src/batch_executor/tests/tester.rs
@@ -495,7 +495,7 @@ impl StorageSnapshot {
             .collect();
         drop(storage);
 
-        let executor = tester
+        let mut executor = tester
             .create_batch_executor(StorageType::AsyncRocksdbCache)
             .await;
         let mut l2_block_env = L2BlockEnv {
@@ -509,7 +509,7 @@ impl StorageSnapshot {
         for _ in 0..transaction_count {
             let tx = alice.execute();
             let tx_hash = tx.hash(); // probably incorrect
-            let res = executor.execute_tx(tx).await;
+            let res = executor.execute_tx(tx).await.unwrap();
             if let TxExecutionResult::Success { tx_result, .. } = res {
                 let storage_logs = &tx_result.logs.storage_logs;
                 storage_writes_deduplicator
@@ -528,10 +528,10 @@ impl StorageSnapshot {
             l2_block_env.number += 1;
             l2_block_env.timestamp += 1;
             l2_block_env.prev_block_hash = hasher.finalize(ProtocolVersionId::latest());
-            executor.start_next_l2_block(l2_block_env).await;
+            executor.start_next_l2_block(l2_block_env).await.unwrap();
         }
 
-        let finished_batch = executor.finish_batch().await;
+        let finished_batch = executor.finish_batch().await.unwrap();
         let storage_logs = &finished_batch.block_tip_execution_result.logs.storage_logs;
         storage_writes_deduplicator.apply(storage_logs.iter().filter(|log| log.log_query.rw_flag));
         let modified_entries = storage_writes_deduplicator.into_modified_key_values();
diff --git a/core/node/state_keeper/src/keeper.rs b/core/node/state_keeper/src/keeper.rs
index d04e4c2e5920..6e315ddd6c09 100644
--- a/core/node/state_keeper/src/keeper.rs
+++ b/core/node/state_keeper/src/keeper.rs
@@ -154,14 +154,18 @@ impl ZkSyncStateKeeper {
             .await
             .ok_or(Error::Canceled)?;
 
-        self.restore_state(&batch_executor, &mut updates_manager, pending_l2_blocks)
+        self.restore_state(&mut batch_executor, &mut updates_manager, pending_l2_blocks)
             .await?;
 
         let mut l1_batch_seal_delta: Option<Instant> = None;
         while !self.is_canceled() {
             // This function will run until the batch can be sealed.
-            self.process_l1_batch(&batch_executor, &mut updates_manager, protocol_upgrade_tx)
-                .await?;
+            self.process_l1_batch(
+                &mut batch_executor,
+                &mut updates_manager,
+                protocol_upgrade_tx,
+            )
+            .await?;
 
             // Finish current batch.
             if !updates_manager.l2_block.executed_transactions.is_empty() {
@@ -173,12 +177,12 @@
                 Self::start_next_l2_block(
                     new_l2_block_params,
                     &mut updates_manager,
-                    &batch_executor,
+                    &mut batch_executor,
                 )
-                .await;
+                .await?;
             }
 
-            let finished_batch = batch_executor.finish_batch().await;
+            let finished_batch = batch_executor.finish_batch().await?;
             let sealed_batch_protocol_version = updates_manager.protocol_version();
             updates_manager.finish_batch(finished_batch);
             let mut next_cursor = updates_manager.io_cursor();
@@ -345,12 +349,16 @@
     async fn start_next_l2_block(
         params: L2BlockParams,
         updates_manager: &mut UpdatesManager,
-        batch_executor: &BatchExecutorHandle,
-    ) {
+        batch_executor: &mut BatchExecutorHandle,
+    ) -> anyhow::Result<()> {
         updates_manager.push_l2_block(params);
+        let block_env = updates_manager.l2_block.get_env();
         batch_executor
-            .start_next_l2_block(updates_manager.l2_block.get_env())
-            .await;
+            .start_next_l2_block(block_env)
+            .await
+            .with_context(|| {
+                format!("failed starting L2 block with {block_env:?} in batch executor")
+            })
     }
 
     async fn seal_l2_block(&mut self, updates_manager: &UpdatesManager) -> anyhow::Result<()> {
@@ -372,7 +380,7 @@ impl ZkSyncStateKeeper {
     /// Additionally, it initialized the next L2 block timestamp.
     async fn restore_state(
         &mut self,
-        batch_executor: &BatchExecutorHandle,
+        batch_executor: &mut BatchExecutorHandle,
         updates_manager: &mut UpdatesManager,
         l2_blocks_to_reexecute: Vec<L2BlockExecutionData>,
     ) -> Result<(), Error> {
@@ -391,7 +399,7 @@
                     updates_manager,
                     batch_executor,
                 )
-                .await;
+                .await?;
             }
 
             let l2_block_number = l2_block.number;
@@ -399,7 +407,10 @@
                 "Starting to reexecute transactions from sealed L2 block #{l2_block_number}"
             );
             for tx in l2_block.txs {
-                let result = batch_executor.execute_tx(tx.clone()).await;
+                let result = batch_executor
+                    .execute_tx(tx.clone())
+                    .await
+                    .with_context(|| format!("failed re-executing transaction {:?}", tx.hash()))?;
 
                 let TxExecutionResult::Success {
                     tx_result,
@@ -462,20 +473,20 @@
             .wait_for_new_l2_block_params(updates_manager)
             .await
             .map_err(|e| e.context("wait_for_new_l2_block_params"))?;
-        Self::start_next_l2_block(new_l2_block_params, updates_manager, batch_executor).await;
+        Self::start_next_l2_block(new_l2_block_params, updates_manager, batch_executor).await?;
 
         Ok(())
     }
 
     async fn process_l1_batch(
         &mut self,
-        batch_executor: &BatchExecutorHandle,
+        batch_executor: &mut BatchExecutorHandle,
         updates_manager: &mut UpdatesManager,
         protocol_upgrade_tx: Option<ProtocolUpgradeTx>,
     ) -> Result<(), Error> {
         if let Some(protocol_upgrade_tx) = protocol_upgrade_tx {
             self.process_upgrade_tx(batch_executor, updates_manager, protocol_upgrade_tx)
-                .await;
+                .await?;
         }
 
         while !self.is_canceled() {
@@ -509,7 +520,7 @@
                     display_timestamp(new_l2_block_params.timestamp)
                 );
                 Self::start_next_l2_block(new_l2_block_params, updates_manager, batch_executor)
-                    .await;
+                    .await?;
             }
 
             let waiting_latency = KEEPER_METRICS.waiting_for_tx.start();
@@ -528,7 +539,7 @@
             let tx_hash = tx.hash();
             let (seal_resolution, exec_result) = self
                 .process_one_tx(batch_executor, updates_manager, tx.clone())
-                .await;
+                .await?;
 
             match &seal_resolution {
                 SealResolution::NoSeal | SealResolution::IncludeAndSeal => {
@@ -558,14 +569,17 @@
                     );
                 }
                 SealResolution::ExcludeAndSeal => {
-                    batch_executor.rollback_last_tx().await;
-                    self.io
-                        .rollback(tx)
-                        .await
-                        .context("failed rolling back transaction")?;
back transaction")?; + batch_executor.rollback_last_tx().await.with_context(|| { + format!("failed rolling back transaction {tx_hash:?} in batch executor") + })?; + self.io.rollback(tx).await.with_context(|| { + format!("failed rolling back transaction {tx_hash:?} in I/O") + })?; } SealResolution::Unexecutable(reason) => { - batch_executor.rollback_last_tx().await; + batch_executor.rollback_last_tx().await.with_context(|| { + format!("failed rolling back transaction {tx_hash:?} in batch executor") + })?; self.io .reject(&tx, reason) .await @@ -587,17 +601,17 @@ impl ZkSyncStateKeeper { async fn process_upgrade_tx( &mut self, - batch_executor: &BatchExecutorHandle, + batch_executor: &mut BatchExecutorHandle, updates_manager: &mut UpdatesManager, protocol_upgrade_tx: ProtocolUpgradeTx, - ) { + ) -> anyhow::Result<()> { // Sanity check: protocol upgrade tx must be the first one in the batch. assert_eq!(updates_manager.pending_executed_transactions_len(), 0); let tx: Transaction = protocol_upgrade_tx.into(); let (seal_resolution, exec_result) = self .process_one_tx(batch_executor, updates_manager, tx.clone()) - .await; + .await?; match &seal_resolution { SealResolution::NoSeal | SealResolution::IncludeAndSeal => { @@ -608,15 +622,13 @@ impl ZkSyncStateKeeper { .. } = exec_result else { - panic!( - "Tx inclusion seal resolution must be a result of a successful tx execution", - ); + anyhow::bail!("Tx inclusion seal resolution must be a result of a successful tx execution"); }; // Despite success of upgrade transaction is not enforced by protocol, // we panic here because failed upgrade tx is not intended in any case. if tx_result.result.is_failed() { - panic!("Failed upgrade tx {:?}", tx.hash()); + anyhow::bail!("Failed upgrade tx {:?}", tx.hash()); } let ExecutionMetricsForCriteria { @@ -632,18 +644,18 @@ impl ZkSyncStateKeeper { tx_execution_metrics, vec![], ); + Ok(()) } SealResolution::ExcludeAndSeal => { - unreachable!("First tx in batch cannot result into `ExcludeAndSeal`"); + anyhow::bail!("first tx in batch cannot result into `ExcludeAndSeal`"); } SealResolution::Unexecutable(reason) => { - panic!( - "Upgrade transaction {:?} is unexecutable: {}", - tx.hash(), - reason + anyhow::bail!( + "Upgrade transaction {:?} is unexecutable: {reason}", + tx.hash() ); } - }; + } } /// Executes one transaction in the batch executor, and then decides whether the batch should be sealed. @@ -655,11 +667,14 @@ impl ZkSyncStateKeeper { /// because we use `apply_and_rollback` method of `updates_manager.storage_writes_deduplicator`. async fn process_one_tx( &mut self, - batch_executor: &BatchExecutorHandle, + batch_executor: &mut BatchExecutorHandle, updates_manager: &mut UpdatesManager, tx: Transaction, - ) -> (SealResolution, TxExecutionResult) { - let exec_result = batch_executor.execute_tx(tx.clone()).await; + ) -> anyhow::Result<(SealResolution, TxExecutionResult)> { + let exec_result = batch_executor + .execute_tx(tx.clone()) + .await + .with_context(|| format!("failed executing transaction {:?}", tx.hash()))?; // All of `TxExecutionResult::BootloaderOutOfGasForTx`, // `Halt::NotEnoughGasProvided` correspond to out-of-gas errors but of different nature. // - `BootloaderOutOfGasForTx`: it is returned when bootloader stack frame run out of gas before tx execution finished. 
@@ -770,6 +785,6 @@
                 )
             }
         };
-        (resolution, exec_result)
+        Ok((resolution, exec_result))
     }
 }
diff --git a/core/node/state_keeper/src/testonly/mod.rs b/core/node/state_keeper/src/testonly/mod.rs
index 77e913fb8b75..b50cd483fc5e 100644
--- a/core/node/state_keeper/src/testonly/mod.rs
+++ b/core/node/state_keeper/src/testonly/mod.rs
@@ -95,10 +95,11 @@ impl BatchExecutor for MockBatchExecutor {
                 Command::FinishBatch(resp) => {
                     // Blanket result, it doesn't really matter.
                     resp.send(default_vm_batch_result()).unwrap();
-                    return;
+                    break;
                 }
             }
         }
+        anyhow::Ok(())
     });
     Some(BatchExecutorHandle::from_raw(handle, send))
 }
diff --git a/core/node/state_keeper/src/testonly/test_batch_executor.rs b/core/node/state_keeper/src/testonly/test_batch_executor.rs
index c748a25ed79f..5b1bf3ceeba8 100644
--- a/core/node/state_keeper/src/testonly/test_batch_executor.rs
+++ b/core/node/state_keeper/src/testonly/test_batch_executor.rs
@@ -424,8 +424,10 @@ impl BatchExecutor for TestBatchExecutorBuilder {
             self.txs.pop_front().unwrap(),
             self.rollback_set.clone(),
         );
-        let handle = tokio::task::spawn_blocking(move || executor.run());
-
+        let handle = tokio::task::spawn_blocking(move || {
+            executor.run();
+            Ok(())
+        });
         Some(BatchExecutorHandle::from_raw(handle, commands_sender))
     }
 }
@@ -829,10 +831,11 @@ impl BatchExecutor for MockBatchExecutor {
                 Command::FinishBatch(resp) => {
                     // Blanket result, it doesn't really matter.
                     resp.send(default_vm_batch_result()).unwrap();
-                    return;
+                    break;
                 }
             }
         }
+        anyhow::Ok(())
     });
     Some(BatchExecutorHandle::from_raw(handle, send))
 }
diff --git a/core/node/vm_runner/src/process.rs b/core/node/vm_runner/src/process.rs
index 8fafc715c59f..5ff7d7cc0b87 100644
--- a/core/node/vm_runner/src/process.rs
+++ b/core/node/vm_runner/src/process.rs
@@ -56,7 +56,7 @@ impl VmRunner {
     }
 
     async fn process_batch(
-        batch_executor: BatchExecutorHandle,
+        mut batch_executor: BatchExecutorHandle,
         l2_blocks: Vec<L2BlockExecuteData>,
         mut updates_manager: UpdatesManager,
         mut output_handler: Box<dyn StateKeeperOutputHandler>,
@@ -68,12 +68,19 @@
                     timestamp: l2_block.timestamp,
                     virtual_blocks: l2_block.virtual_blocks,
                 });
+                let block_env = L2BlockEnv::from_l2_block_data(&l2_block);
                 batch_executor
-                    .start_next_l2_block(L2BlockEnv::from_l2_block_data(&l2_block))
-                    .await;
+                    .start_next_l2_block(block_env)
+                    .await
+                    .with_context(|| {
+                        format!("failed starting L2 block with {block_env:?} in batch executor")
+                    })?;
             }
             for tx in l2_block.txs {
-                let exec_result = batch_executor.execute_tx(tx.clone()).await;
+                let exec_result = batch_executor
+                    .execute_tx(tx.clone())
+                    .await
+                    .with_context(|| format!("failed executing transaction {:?}", tx.hash()))?;
                 let TxExecutionResult::Success {
                     tx_result,
                     tx_metrics,
@@ -102,7 +109,10 @@
                 .await
                 .context("VM runner failed to handle L2 block")?;
         }
-        batch_executor.finish_batch().await;
+        batch_executor
+            .finish_batch()
+            .await
+            .context("failed finishing L1 batch in executor")?;
         output_handler
             .handle_l1_batch(Arc::new(updates_manager))
             .await