diff --git a/crates/aptos-drop-helper/src/async_concurrent_dropper.rs b/crates/aptos-drop-helper/src/async_concurrent_dropper.rs index 7464af019f308..6fcb7ffc2e2f0 100644 --- a/crates/aptos-drop-helper/src/async_concurrent_dropper.rs +++ b/crates/aptos-drop-helper/src/async_concurrent_dropper.rs @@ -27,7 +27,7 @@ impl AsyncConcurrentDropper { pub fn new(name: &'static str, max_tasks: usize, num_threads: usize) -> Self { Self { name, - num_tasks_tracker: Arc::new(NumTasksTracker::new(max_tasks)), + num_tasks_tracker: Arc::new(NumTasksTracker::new(name, max_tasks)), thread_pool: ThreadPool::with_name(format!("{}_conc_dropper", name), num_threads), } } @@ -49,8 +49,7 @@ impl AsyncConcurrentDropper { fn schedule_drop_impl(&self, v: V, notif_sender_opt: Option>) { let _timer = TIMER.timer_with(&[self.name, "enqueue_drop"]); - let num_tasks = self.num_tasks_tracker.inc(); - GAUGE.set_with(&[self.name, "num_tasks"], num_tasks as i64); + self.num_tasks_tracker.inc(); let name = self.name; let num_tasks_tracker = self.num_tasks_tracker.clone(); @@ -70,32 +69,35 @@ impl AsyncConcurrentDropper { } struct NumTasksTracker { + name: &'static str, lock: Mutex, cvar: Condvar, max_tasks: usize, } impl NumTasksTracker { - fn new(max_tasks: usize) -> Self { + fn new(name: &'static str, max_tasks: usize) -> Self { Self { + name, lock: Mutex::new(0), cvar: Condvar::new(), max_tasks, } } - fn inc(&self) -> usize { + fn inc(&self) { let mut num_tasks = self.lock.lock(); while *num_tasks >= self.max_tasks { num_tasks = self.cvar.wait(num_tasks).expect("lock poisoned."); } *num_tasks += 1; - *num_tasks + GAUGE.set_with(&[self.name, "num_tasks"], *num_tasks as i64); } fn dec(&self) { let mut num_tasks = self.lock.lock(); *num_tasks -= 1; + GAUGE.set_with(&[self.name, "num_tasks"], *num_tasks as i64); self.cvar.notify_all(); }