Skip to content

Commit

Permalink
Regularly check for panics in the state upgrade task
Browse files Browse the repository at this point in the history
  • Loading branch information
teor2345 committed Jun 21, 2023
1 parent 6633f2f commit c82aa58
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 13 deletions.
2 changes: 2 additions & 0 deletions zebra-state/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1183,6 +1183,8 @@ impl Service<ReadRequest> for ReadStateService {
}
}

self.db.check_for_panics();

Poll::Ready(Ok(()))
}

Expand Down
74 changes: 63 additions & 11 deletions zebra-state/src/service/finalized_state/disk_format/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use std::{
cmp::Ordering,
panic,
sync::{mpsc, Arc},
thread::{self, JoinHandle},
};
Expand Down Expand Up @@ -51,7 +52,7 @@ pub struct DbFormatChangeThreadHandle {
/// A handle that can wait for the running format change thread to finish.
///
/// Panics from this thread are propagated into Zebra's state service.
join_handle: Arc<JoinHandle<()>>,
update_task: Option<Arc<JoinHandle<()>>>,

/// A channel that tells the running format thread to finish early.
cancel_handle: mpsc::SyncSender<CancelFormatChange>,
Expand Down Expand Up @@ -133,16 +134,20 @@ impl DbFormatChange {
let (cancel_handle, cancel_receiver) = mpsc::sync_channel(1);

let span = Span::current();
let join_handle = thread::spawn(move || {
let update_task = thread::spawn(move || {
span.in_scope(move || {
self.apply_format_change(config, network, initial_tip_height, cancel_receiver);
})
});

DbFormatChangeThreadHandle {
join_handle: Arc::new(join_handle),
let mut handle = DbFormatChangeThreadHandle {
update_task: Some(Arc::new(update_task)),
cancel_handle,
}
};

handle.check_for_panics();

handle
}

/// Apply this format change to the database.
Expand Down Expand Up @@ -237,9 +242,11 @@ impl DbFormatChangeThreadHandle {
// the same time.
//
// If cancelling the thread is important, the owner of the handle must call force_cancel().
if Arc::strong_count(&self.join_handle) <= 1 {
self.force_cancel();
return true;
if let Some(update_task) = self.update_task.as_ref() {
if Arc::strong_count(update_task) <= 1 {
self.force_cancel();
return true;
}
}

false
Expand All @@ -252,13 +259,58 @@ impl DbFormatChangeThreadHandle {
// If it's full, it's already been cancelled.
let _ = self.cancel_handle.try_send(CancelFormatChange);
}

/// Propagates a panic from the spawned format change thread, without blocking.
///
/// - If this handle no longer holds the task, nothing happens.
/// - If the thread is still running, the task is kept for a later check.
/// - If the thread has finished, this handle gives up its reference to the task.
///   When that was the last reference, the thread is joined, and any panic it
///   exited with is resumed on the calling thread.
///
/// Call this method regularly, so that panics are detected as soon as possible.
pub fn check_for_panics(&mut self) {
    let Some(shared_task) = self.update_task.take() else {
        return;
    };

    if !shared_task.is_finished() {
        // The thread is still running, so keep the task for the next check.
        self.update_task = Some(shared_task);
        return;
    }

    // into_inner() guarantees that exactly one of the handles sharing this task
    // receives the JoinHandle. try_unwrap() would let us keep the JoinHandle,
    // but it can also miss panics.
    let Some(join_handle) = Arc::into_inner(shared_task) else {
        // Another handle still references the task, so that handle will join it.
        return;
    };

    // This was the last reference to the task, so any panic is propagated here.
    if let Err(thread_panic) = join_handle.join() {
        panic::resume_unwind(thread_panic);
    }
}

/// Blocks until the spawned thread finishes, then resumes its panic, if any.
///
/// This handle always gives up its reference to the task. The thread is only
/// joined when that was the last reference; otherwise this method returns
/// without waiting.
///
/// This method should be called during shutdown.
pub fn wait_for_panics(&mut self) {
    // into_inner() guarantees that exactly one of the handles sharing this task
    // receives the JoinHandle, so a panic is never reported twice or missed
    // because of a race between handles.
    let last_join_handle = self.update_task.take().and_then(Arc::into_inner);

    if let Some(join_handle) = last_join_handle {
        // This was the last reference to the task, so any panic is propagated here.
        if let Err(thread_panic) = join_handle.join() {
            panic::resume_unwind(thread_panic);
        }
    }
}
}

impl Drop for DbFormatChangeThreadHandle {
fn drop(&mut self) {
// Only cancel the format change if the state service is shutting down.
self.cancel_if_needed();

// There's no point waiting for the task to finish here.
if self.cancel_if_needed() {
self.wait_for_panics();
} else {
self.check_for_panics();
}
}
}
17 changes: 15 additions & 2 deletions zebra-state/src/service/finalized_state/zebra_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,19 @@ impl ZebraDb {
self.db.path()
}

/// Resumes any panic raised by code running in this database's spawned threads.
///
/// If a format change handle is present, its own `check_for_panics()` is called.
/// The maximum on-disk tip height check is then run as well.
///
/// Call this method regularly, so that panics are detected as soon as possible.
pub fn check_for_panics(&mut self) {
    if let Some(handle) = &mut self.format_change_handle {
        handle.check_for_panics();
    }

    // This check doesn't panic, but running it here means it also happens regularly.
    self.check_max_on_disk_tip_height();
}

/// Shut down the database, cleaning up background tasks and ephemeral data.
///
/// If `force` is true, clean up regardless of any shared references.
Expand All @@ -111,12 +124,12 @@ impl ZebraDb {
//
// See also the correctness note in `DiskDb::shutdown()`.
if force || self.db.shared_database_owners() <= 1 {
if let Some(format_change_handle) = self.format_change_handle.as_ref() {
if let Some(format_change_handle) = self.format_change_handle.as_mut() {
format_change_handle.force_cancel();
}
}

self.check_max_on_disk_tip_height();
self.check_for_panics();

self.db.shutdown(force);
}
Expand Down

0 comments on commit c82aa58

Please sign in to comment.