Skip to content

Commit

Permalink
fix deadlock on nested DropHelper
Browse files Browse the repository at this point in the history
  • Loading branch information
msmouse committed Nov 19, 2024
1 parent 6acc3d5 commit 265c821
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/aptos-drop-helper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,6 @@ aptos-metrics-core = { workspace = true }
derive_more = { workspace = true }
once_cell = { workspace = true }
threadpool = { workspace = true }

[dev-dependencies]
rayon = { workspace = true }
46 changes: 44 additions & 2 deletions crates/aptos-drop-helper/src/async_concurrent_dropper.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::metrics::{GAUGE, TIMER};
use crate::{
metrics::{GAUGE, TIMER},
IN_ANY_DROP_POOL,
};
use aptos_infallible::Mutex;
use aptos_metrics_core::{IntGaugeHelper, TimerHelper};
use std::sync::{
Expand Down Expand Up @@ -42,12 +45,25 @@ impl AsyncConcurrentDropper {
rx
}

pub fn max_tasks(&self) -> usize {
self.num_tasks_tracker.max_tasks
}

pub fn num_threads(&self) -> usize {
self.thread_pool.max_count()
}

pub fn wait_for_backlog_drop(&self, no_more_than: usize) {
let _timer = TIMER.timer_with(&[self.name, "wait_for_backlog_drop"]);
self.num_tasks_tracker.wait_for_backlog_drop(no_more_than);
}

fn schedule_drop_impl<V: Send + 'static>(&self, v: V, notif_sender_opt: Option<Sender<()>>) {
if IN_ANY_DROP_POOL.get() {
drop(v);
return;
}

let _timer = TIMER.timer_with(&[self.name, "enqueue_drop"]);
self.num_tasks_tracker.inc();

Expand All @@ -56,6 +72,9 @@ impl AsyncConcurrentDropper {

self.thread_pool.execute(move || {
let _timer = TIMER.timer_with(&[name, "real_drop"]);
IN_ANY_DROP_POOL.with(|flag| {
flag.set(true);
});

drop(v);

Expand Down Expand Up @@ -111,10 +130,12 @@ impl NumTasksTracker {

#[cfg(test)]
mod tests {
use crate::AsyncConcurrentDropper;
use crate::{AsyncConcurrentDropper, DropHelper, DEFAULT_DROPPER};
use rayon::prelude::*;
use std::{sync::Arc, thread::sleep, time::Duration};
use threadpool::ThreadPool;

#[derive(Clone, Default)]
struct SlowDropper;

impl Drop for SlowDropper {
Expand Down Expand Up @@ -197,4 +218,25 @@ mod tests {
s.wait_for_backlog_drop(0);
assert!(now.elapsed() < Duration::from_millis(600));
}

#[test]
fn test_nested_drops() {
#[derive(Clone, Default)]
struct Nested {
_inner: DropHelper<SlowDropper>,
}

// pump 2 x max_tasks to the drop queue
let num_items = DEFAULT_DROPPER.max_tasks() * 2;
let items = vec![DropHelper::new(Nested::default()); num_items];
let drop_thread = std::thread::spawn(move || {
items.into_par_iter().for_each(drop);
});

// expect no deadlock and the whole thing to be dropped in full concurrency (with some leeway)
sleep(Duration::from_millis(
200 + 200 * num_items as u64 / DEFAULT_DROPPER.num_threads() as u64,
));
assert!(drop_thread.is_finished(), "Drop queue deadlocked.");
}
}
6 changes: 5 additions & 1 deletion crates/aptos-drop-helper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@
use crate::async_concurrent_dropper::AsyncConcurrentDropper;
use derive_more::{Deref, DerefMut};
use once_cell::sync::Lazy;
use std::mem::ManuallyDrop;
use std::{cell::Cell, mem::ManuallyDrop};

pub mod async_concurrent_dropper;
pub mod async_drop_queue;
mod metrics;

thread_local! {
static IN_ANY_DROP_POOL: Cell<bool> = const { Cell::new(false) };
}

pub static DEFAULT_DROPPER: Lazy<AsyncConcurrentDropper> =
Lazy::new(|| AsyncConcurrentDropper::new("default", 32, 8));

Expand Down

0 comments on commit 265c821

Please sign in to comment.