refactor(state-keeper): Propagate errors in batch executor #2090

Merged · 1 commit · May 30, 2024
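This change replaces `unwrap()`/`expect()` calls around the batch executor's command channels with error propagation: `BatchExecutorHandle` methods now return `anyhow::Result`, the executor task returns `anyhow::Result<()>` instead of panicking, and a closed channel is treated as a sign that the executor task has died, in which case the task's own error (or panic) is surfaced through the new `HandleOrError::wait_for_error`. The standalone sketch below illustrates that pattern with tokio; the names (`Handle`, `Command::Ping`, `spawn_worker`) are illustrative and not taken from the repository.

```rust
use anyhow::Context as _;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;

enum Command {
    Ping(oneshot::Sender<u64>),
}

struct Handle {
    task: JoinHandle<anyhow::Result<()>>,
    commands: mpsc::Sender<Command>,
}

impl Handle {
    /// Converts an unexpected channel closure into the background task's own error.
    /// (The PR caches the error in an `Arc` so it can be reported repeatedly; this
    /// sketch assumes the method is awaited at most once.)
    async fn wait_for_error(&mut self) -> anyhow::Error {
        match (&mut self.task).await {
            Ok(Ok(())) => anyhow::anyhow!("task unexpectedly stopped"),
            Ok(Err(err)) => err,
            Err(err) => anyhow::Error::new(err).context("task panicked"),
        }
    }

    async fn ping(&mut self) -> anyhow::Result<u64> {
        let (response_sender, response_receiver) = oneshot::channel();
        // A failed send means the task dropped its command receiver, i.e. it exited.
        if self.commands.send(Command::Ping(response_sender)).await.is_err() {
            return Err(self.wait_for_error().await);
        }
        match response_receiver.await {
            Ok(value) => Ok(value),
            Err(_) => Err(self.wait_for_error().await),
        }
    }
}

fn spawn_worker() -> Handle {
    let (commands, mut command_receiver) = mpsc::channel(8);
    let task = tokio::spawn(async move {
        let mut counter = 0_u64;
        while let Some(cmd) = command_receiver.recv().await {
            match cmd {
                Command::Ping(resp) => {
                    counter += 1;
                    // The handle dropped the response receiver: stop gracefully.
                    if resp.send(counter).is_err() {
                        break;
                    }
                }
            }
        }
        anyhow::Ok(())
    });
    Handle { task, commands }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut handle = spawn_worker();
    let n = handle.ping().await.context("worker failed")?;
    println!("ping #{n}");
    Ok(())
}
```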
25 changes: 16 additions & 9 deletions core/node/state_keeper/src/batch_executor/main_executor.rs
@@ -1,5 +1,6 @@
use std::sync::Arc;

use anyhow::Context as _;
use async_trait::async_trait;
use multivm::{
interface::{
@@ -67,17 +68,15 @@ impl BatchExecutor for MainBatchExecutor {
.block_on(
storage_factory.access_storage(&stop_receiver, l1_batch_params.number - 1),
)
.expect("failed getting access to state keeper storage")
.context("failed accessing state keeper storage")?
{
executor.run(storage, l1_batch_params, system_env);
} else {
tracing::info!("Interrupted while trying to access state keeper storage");
}
anyhow::Ok(())
});
Some(BatchExecutorHandle {
handle,
commands: commands_sender,
})
Some(BatchExecutorHandle::from_raw(handle, commands_sender))
}
}

@@ -111,19 +110,27 @@ impl CommandReceiver {
match cmd {
Command::ExecuteTx(tx, resp) => {
let result = self.execute_tx(&tx, &mut vm);
resp.send(result).unwrap();
if resp.send(result).is_err() {
break;
}
}
Command::RollbackLastTx(resp) => {
self.rollback_last_tx(&mut vm);
resp.send(()).unwrap();
if resp.send(()).is_err() {
break;
}
}
Command::StartNextL2Block(l2_block_env, resp) => {
self.start_next_l2_block(l2_block_env, &mut vm);
resp.send(()).unwrap();
if resp.send(()).is_err() {
break;
}
}
Command::FinishBatch(resp) => {
let vm_block_result = self.finish_batch(&mut vm);
resp.send(vm_block_result).unwrap();
if resp.send(vm_block_result).is_err() {
break;
}

// `storage_view` cannot be accessed while borrowed by the VM,
// so this is the only point at which storage metrics can be obtained
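In the command loop above, a dropped response receiver now breaks out of the loop instead of panicking on `unwrap()`; the spawned closure then returns `anyhow::Ok(())`, and it is the handle side in `mod.rs` (below) that decides whether that shutdown was expected and, if not, reports the executor task's actual error.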
117 changes: 94 additions & 23 deletions core/node/state_keeper/src/batch_executor/mod.rs
@@ -1,5 +1,6 @@
use std::{fmt, sync::Arc};
use std::{error::Error as StdError, fmt, sync::Arc};

use anyhow::Context as _;
use async_trait::async_trait;
use multivm::interface::{
FinishedL1Batch, Halt, L1BatchEnv, L2BlockEnv, SystemEnv, VmExecutionResultAndLogs,
@@ -66,36 +67,82 @@ pub trait BatchExecutor: 'static + Send + Sync + fmt::Debug {
) -> Option<BatchExecutorHandle>;
}

#[derive(Debug)]
enum HandleOrError {
Handle(JoinHandle<anyhow::Result<()>>),
Err(Arc<dyn StdError + Send + Sync>),
}

impl HandleOrError {
async fn wait_for_error(&mut self) -> anyhow::Error {
let err_arc = match self {
Self::Handle(handle) => {
let err = match handle.await {
Ok(Ok(())) => anyhow::anyhow!("batch executor unexpectedly stopped"),
Ok(Err(err)) => err,
Err(err) => anyhow::Error::new(err).context("batch executor panicked"),
};
let err: Box<dyn StdError + Send + Sync> = err.into();
let err: Arc<dyn StdError + Send + Sync> = err.into();
*self = Self::Err(err.clone());
err
}
Self::Err(err) => err.clone(),
};
anyhow::Error::new(err_arc)
}

async fn wait(self) -> anyhow::Result<()> {
match self {
Self::Handle(handle) => handle.await.context("batch executor panicked")?,
Self::Err(err_arc) => Err(anyhow::Error::new(err_arc)),
}
}
}

/// A public interface for interaction with the `BatchExecutor`.
/// `BatchExecutorHandle` is stored in the state keeper and is used to invoke or rollback transactions, and also seal
/// the batches.
#[derive(Debug)]
pub struct BatchExecutorHandle {
handle: JoinHandle<()>,
handle: HandleOrError,
commands: mpsc::Sender<Command>,
}

impl BatchExecutorHandle {
/// Creates a batch executor handle from the provided sender and thread join handle.
/// Can be used to inject an alternative batch executor implementation.
#[doc(hidden)]
pub(super) fn from_raw(handle: JoinHandle<()>, commands: mpsc::Sender<Command>) -> Self {
Self { handle, commands }
pub(super) fn from_raw(
handle: JoinHandle<anyhow::Result<()>>,
commands: mpsc::Sender<Command>,
) -> Self {
Self {
handle: HandleOrError::Handle(handle),
commands,
}
}

pub async fn execute_tx(&self, tx: Transaction) -> TxExecutionResult {
pub async fn execute_tx(&mut self, tx: Transaction) -> anyhow::Result<TxExecutionResult> {
let tx_gas_limit = tx.gas_limit().as_u64();

let (response_sender, response_receiver) = oneshot::channel();
self.commands
let send_failed = self
.commands
.send(Command::ExecuteTx(Box::new(tx), response_sender))
.await
.unwrap();
.is_err();
if send_failed {
return Err(self.handle.wait_for_error().await);
}

let latency = EXECUTOR_METRICS.batch_executor_command_response_time
[&ExecutorCommand::ExecuteTx]
.start();
let res = response_receiver.await.unwrap();
let res = match response_receiver.await {
Ok(res) => res,
Err(_) => return Err(self.handle.wait_for_error().await),
};
let elapsed = latency.observe();

if let TxExecutionResult::Success { tx_metrics, .. } = &res {
@@ -112,52 +159,76 @@ impl BatchExecutorHandle {
.failed_tx_gas_limit_per_nanosecond
.observe(tx_gas_limit as f64 / elapsed.as_nanos() as f64);
}
res
Ok(res)
}

pub async fn start_next_l2_block(&self, env: L2BlockEnv) {
pub async fn start_next_l2_block(&mut self, env: L2BlockEnv) -> anyhow::Result<()> {
// While we don't get anything from the channel, it's useful to have it as a confirmation that the operation
// indeed has been processed.
let (response_sender, response_receiver) = oneshot::channel();
self.commands
let send_failed = self
.commands
.send(Command::StartNextL2Block(env, response_sender))
.await
.unwrap();
.is_err();
if send_failed {
return Err(self.handle.wait_for_error().await);
}

let latency = EXECUTOR_METRICS.batch_executor_command_response_time
[&ExecutorCommand::StartNextL2Block]
.start();
response_receiver.await.unwrap();
if response_receiver.await.is_err() {
return Err(self.handle.wait_for_error().await);
}
latency.observe();
Ok(())
}

pub async fn rollback_last_tx(&self) {
pub async fn rollback_last_tx(&mut self) -> anyhow::Result<()> {
// While we don't get anything from the channel, it's useful to have it as a confirmation that the operation
// indeed has been processed.
let (response_sender, response_receiver) = oneshot::channel();
self.commands
let send_failed = self
.commands
.send(Command::RollbackLastTx(response_sender))
.await
.unwrap();
.is_err();
if send_failed {
return Err(self.handle.wait_for_error().await);
}

let latency = EXECUTOR_METRICS.batch_executor_command_response_time
[&ExecutorCommand::RollbackLastTx]
.start();
response_receiver.await.unwrap();
if response_receiver.await.is_err() {
return Err(self.handle.wait_for_error().await);
}
latency.observe();
Ok(())
}

pub async fn finish_batch(self) -> FinishedL1Batch {
pub async fn finish_batch(mut self) -> anyhow::Result<FinishedL1Batch> {
let (response_sender, response_receiver) = oneshot::channel();
self.commands
let send_failed = self
.commands
.send(Command::FinishBatch(response_sender))
.await
.unwrap();
.is_err();
if send_failed {
return Err(self.handle.wait_for_error().await);
}

let latency = EXECUTOR_METRICS.batch_executor_command_response_time
[&ExecutorCommand::FinishBatch]
.start();
let finished_batch = response_receiver.await.unwrap();
self.handle.await.unwrap();
let finished_batch = match response_receiver.await {
Ok(batch) => batch,
Err(_) => return Err(self.handle.wait_for_error().await),
};
self.handle.wait().await?;
latency.observe();
finished_batch
Ok(finished_batch)
}
}
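A detail worth noting in `HandleOrError` above: `anyhow::Error` does not implement `Clone`, so the first error produced by the executor task is converted into an `Arc<dyn Error + Send + Sync>` and cached in the `Err` variant; every later `wait_for_error` call clones the `Arc` and wraps it back into an `anyhow::Error`, so repeated failed commands report the same root cause without awaiting the already-consumed `JoinHandle` again. A minimal standalone sketch of just those conversions (not code from the PR):

```rust
use std::{error::Error as StdError, sync::Arc};

// Cache a (non-Clone) `anyhow::Error` so it can be reported more than once:
// anyhow::Error -> Box<dyn Error + Send + Sync> -> Arc<dyn Error + Send + Sync>.
fn cache(err: anyhow::Error) -> Arc<dyn StdError + Send + Sync> {
    let boxed: Box<dyn StdError + Send + Sync> = err.into();
    boxed.into()
}

// Rebuild an `anyhow::Error` from the cached root cause; the `Arc` itself
// implements `std::error::Error`, so it can back a fresh `anyhow::Error`.
fn report(cached: &Arc<dyn StdError + Send + Sync>) -> anyhow::Error {
    anyhow::Error::new(Arc::clone(cached))
}

fn main() {
    let cached = cache(anyhow::anyhow!("batch executor failed"));
    println!("{:#}", report(&cached));
    println!("{:#}", report(&cached)); // the same failure can be reported again
}
```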
