Skip to content
This repository has been archived by the owner on Aug 21, 2024. It is now read-only.

Commit

Permalink
fix(concurrency): stopping the program if one of the threads panics
Browse files Browse the repository at this point in the history
  • Loading branch information
meship-starkware committed Jul 16, 2024
1 parent f6fa5af commit b555dda
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 2 deletions.
23 changes: 21 additions & 2 deletions crates/blockifier/src/blockifier/transaction_executor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
#[cfg(feature = "concurrency")]
use std::collections::{HashMap, HashSet};
#[cfg(feature = "concurrency")]
use std::io::{self, Write};
#[cfg(feature = "concurrency")]
use std::panic::{self, catch_unwind, AssertUnwindSafe};
#[cfg(feature = "concurrency")]
use std::sync::Arc;
#[cfg(feature = "concurrency")]
use std::sync::Mutex;
Expand Down Expand Up @@ -227,6 +231,14 @@ impl<S: StateReader + Send + Sync> TransactionExecutor<S> {
Mutex::new(&mut self.bouncer),
));

// Change the defult way of handling panics to write to stderr.
// this ensure that the panic massge and location will be printed to stderr.
// without adding flags to the cargo command.
let orig_hook = panic::take_hook();
panic::set_hook(Box::new(move |panic_info| {
let _ = writeln!(io::stderr(), "{}", panic_info);
}));

// No thread pool implementation is needed here since we already have our scheduler. The
// initialized threads below will "busy wait" for new tasks using the `run` method until the
// chunk execution is completed, and then they will be joined together in a for loop.
Expand All @@ -236,11 +248,19 @@ impl<S: StateReader + Send + Sync> TransactionExecutor<S> {
for _ in 0..self.config.concurrency_config.n_workers {
let worker_executor = Arc::clone(&worker_executor);
s.spawn(move || {
worker_executor.run();
let result = catch_unwind(AssertUnwindSafe(|| {
worker_executor.run();
}));
if result.is_err() {
::std::process::abort();
}
});
}
});

// Restore the original panic hook.
panic::set_hook(orig_hook);

let n_committed_txs = worker_executor.scheduler.get_n_committed_txs();
let mut tx_execution_results = Vec::new();
let mut visited_pcs: HashMap<ClassHash, HashSet<usize>> = HashMap::new();
Expand Down Expand Up @@ -270,7 +290,6 @@ impl<S: StateReader + Send + Sync> TransactionExecutor<S> {
})
.commit_chunk_and_recover_block_state(n_committed_txs, visited_pcs);
self.block_state.replace(block_state_after_commit);

tx_execution_results
}
}
3 changes: 3 additions & 0 deletions crates/blockifier/src/concurrency/worker_logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> {

fn execute(&self, tx_index: TxIndex) {
self.execute_tx(tx_index);
if tx_index ==1 {
panic!("test concurrency panic behaviour");
}
self.scheduler.finish_execution(tx_index)
}

Expand Down

0 comments on commit b555dda

Please sign in to comment.