refactor(state-keeper): Propagate errors in batch executor (#2090)
## What ❔

Propagates errors in the batch executor instead of panicking.

## Why ❔

The batch executor fairly frequently panics on node shutdown. This is poor UX and can lead to false-positive bug reports, etc.
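
To make the shape of the change concrete, here is a minimal, self-contained sketch of the pattern — not the actual zksync-era types: `Ping`, `WorkerHandle`, and the channel layout are invented for illustration, and it assumes the `tokio` and `anyhow` crates. The worker loop treats a dropped response receiver as a shutdown signal and exits instead of unwrapping, while the handle converts channel failures into `anyhow` errors for the caller.

```rust
use anyhow::Context as _;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;

// Hypothetical command carrying a one-shot response channel (illustration only).
struct Ping(oneshot::Sender<u64>);

struct WorkerHandle {
    commands: mpsc::Sender<Ping>,
    handle: JoinHandle<anyhow::Result<()>>,
}

impl WorkerHandle {
    async fn ping(&mut self) -> anyhow::Result<u64> {
        let (response_sender, response_receiver) = oneshot::channel();
        // A failed send means the worker is gone (e.g. it is shutting down);
        // report that as an error instead of panicking.
        if self.commands.send(Ping(response_sender)).await.is_err() {
            anyhow::bail!("worker dropped its command receiver (shutting down?)");
        }
        response_receiver
            .await
            .context("worker dropped the response sender")
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (commands, mut command_receiver) = mpsc::channel::<Ping>(8);
    let handle = tokio::spawn(async move {
        let mut counter = 0_u64;
        while let Some(Ping(response)) = command_receiver.recv().await {
            counter += 1;
            // The caller hung up; stop quietly instead of unwrapping the send.
            if response.send(counter).is_err() {
                break;
            }
        }
        anyhow::Ok(())
    });
    let mut worker = WorkerHandle { commands, handle };

    println!("ping #{}", worker.ping().await?);

    // Simulate shutdown: closing the command channel makes the worker loop exit...
    drop(worker.commands);
    // ...and its outcome is propagated as a `Result` rather than a panic.
    worker.handle.await.context("worker panicked")??;
    Ok(())
}
```

On shutdown the caller then sees either a clean exit or a descriptive error instead of a panic backtrace.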

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
slowli authored May 30, 2024
1 parent c8c8ea9 commit b3248e4
Showing 8 changed files with 275 additions and 150 deletions.
25 changes: 16 additions & 9 deletions core/node/state_keeper/src/batch_executor/main_executor.rs
@@ -1,5 +1,6 @@
 use std::sync::Arc;
 
+use anyhow::Context as _;
 use async_trait::async_trait;
 use multivm::{
     interface::{
@@ -67,17 +68,15 @@ impl BatchExecutor for MainBatchExecutor {
                 .block_on(
                     storage_factory.access_storage(&stop_receiver, l1_batch_params.number - 1),
                 )
-                .expect("failed getting access to state keeper storage")
+                .context("failed accessing state keeper storage")?
             {
                 executor.run(storage, l1_batch_params, system_env);
             } else {
                 tracing::info!("Interrupted while trying to access state keeper storage");
             }
+            anyhow::Ok(())
         });
-        Some(BatchExecutorHandle {
-            handle,
-            commands: commands_sender,
-        })
+        Some(BatchExecutorHandle::from_raw(handle, commands_sender))
     }
 }

@@ -111,19 +110,27 @@ impl CommandReceiver {
             match cmd {
                 Command::ExecuteTx(tx, resp) => {
                     let result = self.execute_tx(&tx, &mut vm);
-                    resp.send(result).unwrap();
+                    if resp.send(result).is_err() {
+                        break;
+                    }
                 }
                 Command::RollbackLastTx(resp) => {
                     self.rollback_last_tx(&mut vm);
-                    resp.send(()).unwrap();
+                    if resp.send(()).is_err() {
+                        break;
+                    }
                 }
                 Command::StartNextL2Block(l2_block_env, resp) => {
                     self.start_next_l2_block(l2_block_env, &mut vm);
-                    resp.send(()).unwrap();
+                    if resp.send(()).is_err() {
+                        break;
+                    }
                 }
                 Command::FinishBatch(resp) => {
                     let vm_block_result = self.finish_batch(&mut vm);
-                    resp.send(vm_block_result).unwrap();
+                    if resp.send(vm_block_result).is_err() {
+                        break;
+                    }
 
                     // `storage_view` cannot be accessed while borrowed by the VM,
                     // so this is the only point at which storage metrics can be obtained
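
A side note on the `.expect(...)` → `.context(...)?` switch earlier in this file's diff: it only works because the spawned closure now ends with `anyhow::Ok(())` and therefore returns `anyhow::Result<()>`. A tiny standalone illustration of that idiom (assuming only the `anyhow` crate; `load_state` is a made-up function):

```rust
use anyhow::Context as _;

// Hypothetical fallible lookup used only for this illustration.
fn load_state(id: u32) -> Result<String, std::io::Error> {
    if id == 0 {
        Err(std::io::Error::new(std::io::ErrorKind::NotFound, "no state"))
    } else {
        Ok(format!("state #{id}"))
    }
}

fn run(id: u32) -> anyhow::Result<()> {
    // Before: load_state(id).expect("failed accessing state") would panic here.
    // After: the error is wrapped with context and returned to the caller.
    let state = load_state(id).context("failed accessing state")?;
    println!("loaded {state}");
    Ok(())
}

fn main() {
    if let Err(err) = run(0) {
        // Prints the contextual message followed by the source error.
        eprintln!("{err:#}");
    }
}
```

The caller decides how to report the failure; nothing panics.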
117 changes: 94 additions & 23 deletions core/node/state_keeper/src/batch_executor/mod.rs
@@ -1,5 +1,6 @@
-use std::{fmt, sync::Arc};
+use std::{error::Error as StdError, fmt, sync::Arc};
 
+use anyhow::Context as _;
 use async_trait::async_trait;
 use multivm::interface::{
     FinishedL1Batch, Halt, L1BatchEnv, L2BlockEnv, SystemEnv, VmExecutionResultAndLogs,
@@ -66,36 +67,82 @@ pub trait BatchExecutor: 'static + Send + Sync + fmt::Debug {
     ) -> Option<BatchExecutorHandle>;
 }
 
+#[derive(Debug)]
+enum HandleOrError {
+    Handle(JoinHandle<anyhow::Result<()>>),
+    Err(Arc<dyn StdError + Send + Sync>),
+}
+
+impl HandleOrError {
+    async fn wait_for_error(&mut self) -> anyhow::Error {
+        let err_arc = match self {
+            Self::Handle(handle) => {
+                let err = match handle.await {
+                    Ok(Ok(())) => anyhow::anyhow!("batch executor unexpectedly stopped"),
+                    Ok(Err(err)) => err,
+                    Err(err) => anyhow::Error::new(err).context("batch executor panicked"),
+                };
+                let err: Box<dyn StdError + Send + Sync> = err.into();
+                let err: Arc<dyn StdError + Send + Sync> = err.into();
+                *self = Self::Err(err.clone());
+                err
+            }
+            Self::Err(err) => err.clone(),
+        };
+        anyhow::Error::new(err_arc)
+    }
+
+    async fn wait(self) -> anyhow::Result<()> {
+        match self {
+            Self::Handle(handle) => handle.await.context("batch executor panicked")?,
+            Self::Err(err_arc) => Err(anyhow::Error::new(err_arc)),
+        }
+    }
+}
+
 /// A public interface for interaction with the `BatchExecutor`.
 /// `BatchExecutorHandle` is stored in the state keeper and is used to invoke or rollback transactions, and also seal
 /// the batches.
 #[derive(Debug)]
 pub struct BatchExecutorHandle {
-    handle: JoinHandle<()>,
+    handle: HandleOrError,
     commands: mpsc::Sender<Command>,
 }
 
 impl BatchExecutorHandle {
     /// Creates a batch executor handle from the provided sender and thread join handle.
     /// Can be used to inject an alternative batch executor implementation.
     #[doc(hidden)]
-    pub(super) fn from_raw(handle: JoinHandle<()>, commands: mpsc::Sender<Command>) -> Self {
-        Self { handle, commands }
+    pub(super) fn from_raw(
+        handle: JoinHandle<anyhow::Result<()>>,
+        commands: mpsc::Sender<Command>,
+    ) -> Self {
+        Self {
+            handle: HandleOrError::Handle(handle),
+            commands,
+        }
     }
 
-    pub async fn execute_tx(&self, tx: Transaction) -> TxExecutionResult {
+    pub async fn execute_tx(&mut self, tx: Transaction) -> anyhow::Result<TxExecutionResult> {
         let tx_gas_limit = tx.gas_limit().as_u64();
 
         let (response_sender, response_receiver) = oneshot::channel();
-        self.commands
+        let send_failed = self
+            .commands
             .send(Command::ExecuteTx(Box::new(tx), response_sender))
             .await
-            .unwrap();
+            .is_err();
+        if send_failed {
+            return Err(self.handle.wait_for_error().await);
+        }
 
         let latency = EXECUTOR_METRICS.batch_executor_command_response_time
             [&ExecutorCommand::ExecuteTx]
             .start();
-        let res = response_receiver.await.unwrap();
+        let res = match response_receiver.await {
+            Ok(res) => res,
+            Err(_) => return Err(self.handle.wait_for_error().await),
+        };
         let elapsed = latency.observe();
 
         if let TxExecutionResult::Success { tx_metrics, .. } = &res {
@@ -112,52 +159,76 @@ impl BatchExecutorHandle {
                 .failed_tx_gas_limit_per_nanosecond
                 .observe(tx_gas_limit as f64 / elapsed.as_nanos() as f64);
         }
-        res
+        Ok(res)
     }
 
-    pub async fn start_next_l2_block(&self, env: L2BlockEnv) {
+    pub async fn start_next_l2_block(&mut self, env: L2BlockEnv) -> anyhow::Result<()> {
         // While we don't get anything from the channel, it's useful to have it as a confirmation that the operation
         // indeed has been processed.
         let (response_sender, response_receiver) = oneshot::channel();
-        self.commands
+        let send_failed = self
+            .commands
             .send(Command::StartNextL2Block(env, response_sender))
             .await
-            .unwrap();
+            .is_err();
+        if send_failed {
+            return Err(self.handle.wait_for_error().await);
+        }
 
         let latency = EXECUTOR_METRICS.batch_executor_command_response_time
             [&ExecutorCommand::StartNextL2Block]
             .start();
-        response_receiver.await.unwrap();
+        if response_receiver.await.is_err() {
+            return Err(self.handle.wait_for_error().await);
+        }
         latency.observe();
+        Ok(())
     }
 
-    pub async fn rollback_last_tx(&self) {
+    pub async fn rollback_last_tx(&mut self) -> anyhow::Result<()> {
         // While we don't get anything from the channel, it's useful to have it as a confirmation that the operation
         // indeed has been processed.
         let (response_sender, response_receiver) = oneshot::channel();
-        self.commands
+        let send_failed = self
+            .commands
             .send(Command::RollbackLastTx(response_sender))
             .await
-            .unwrap();
+            .is_err();
+        if send_failed {
+            return Err(self.handle.wait_for_error().await);
+        }
 
         let latency = EXECUTOR_METRICS.batch_executor_command_response_time
            [&ExecutorCommand::RollbackLastTx]
            .start();
-        response_receiver.await.unwrap();
+        if response_receiver.await.is_err() {
+            return Err(self.handle.wait_for_error().await);
+        }
         latency.observe();
+        Ok(())
    }
 
-    pub async fn finish_batch(self) -> FinishedL1Batch {
+    pub async fn finish_batch(mut self) -> anyhow::Result<FinishedL1Batch> {
         let (response_sender, response_receiver) = oneshot::channel();
-        self.commands
+        let send_failed = self
+            .commands
             .send(Command::FinishBatch(response_sender))
             .await
-            .unwrap();
+            .is_err();
+        if send_failed {
+            return Err(self.handle.wait_for_error().await);
+        }
 
         let latency = EXECUTOR_METRICS.batch_executor_command_response_time
             [&ExecutorCommand::FinishBatch]
             .start();
-        let finished_batch = response_receiver.await.unwrap();
-        self.handle.await.unwrap();
+        let finished_batch = match response_receiver.await {
+            Ok(batch) => batch,
+            Err(_) => return Err(self.handle.wait_for_error().await),
+        };
+        self.handle.wait().await?;
         latency.observe();
-        finished_batch
+        Ok(finished_batch)
     }
 }
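
The `HandleOrError` helper above is what lets several handle methods report the same underlying failure: once the background task has exited or panicked, its error is stored as a cloneable `Arc<dyn Error>` and re-wrapped into a fresh `anyhow::Error` on every later call. Below is a self-contained sketch of that caching idea, not the repository code: the failing task and messages are invented, and it assumes the `tokio` and `anyhow` crates.

```rust
use std::{error::Error as StdError, sync::Arc};

use tokio::task::JoinHandle;

// Once the task has finished (or panicked), keep its error as a cloneable
// `Arc<dyn Error>` so every later caller gets the original cause back.
enum HandleOrError {
    Handle(JoinHandle<anyhow::Result<()>>),
    Err(Arc<dyn StdError + Send + Sync>),
}

impl HandleOrError {
    async fn wait_for_error(&mut self) -> anyhow::Error {
        let err_arc = match self {
            Self::Handle(handle) => {
                let err = match handle.await {
                    Ok(Ok(())) => anyhow::anyhow!("task unexpectedly stopped"),
                    Ok(Err(err)) => err,
                    Err(err) => anyhow::Error::new(err).context("task panicked"),
                };
                // `anyhow::Error` -> `Box<dyn Error>` -> `Arc<dyn Error>`; the
                // `Arc` itself implements `Error`, so it can be wrapped back
                // into a fresh `anyhow::Error` on every call.
                let err: Box<dyn StdError + Send + Sync> = err.into();
                let err: Arc<dyn StdError + Send + Sync> = err.into();
                *self = Self::Err(err.clone());
                err
            }
            Self::Err(err) => err.clone(),
        };
        anyhow::Error::new(err_arc)
    }
}

#[tokio::main]
async fn main() {
    // A task that fails immediately, standing in for a crashed batch executor.
    let handle: JoinHandle<anyhow::Result<()>> =
        tokio::spawn(async { anyhow::bail!("VM crashed") });
    let mut state = HandleOrError::Handle(handle);

    // The first call consumes the join handle; later calls reuse the cached error.
    println!("first:  {:#}", state.wait_for_error().await);
    println!("second: {:#}", state.wait_for_error().await);
}
```

In the diff, `execute_tx`, `start_next_l2_block`, `rollback_last_tx`, and `finish_batch` call `wait_for_error` whenever a channel operation fails, so the state keeper receives a descriptive `anyhow::Result` instead of hitting an `unwrap` panic.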
