diff --git a/crates/ignore/src/walk.rs b/crates/ignore/src/walk.rs index 2d754da1b..2b7476d73 100644 --- a/crates/ignore/src/walk.rs +++ b/crates/ignore/src/walk.rs @@ -1279,7 +1279,7 @@ impl WalkParallel { } // Create the workers and then wait for them to finish. let quit_now = Arc::new(AtomicBool::new(false)); - let num_pending = Arc::new(AtomicUsize::new(stack.len())); + let active_workers = Arc::new(AtomicUsize::new(threads)); let stacks = Stack::new_for_each_thread(threads, stack); std::thread::scope(|s| { let handles: Vec<_> = stacks @@ -1288,7 +1288,7 @@ impl WalkParallel { visitor: builder.build(), stack, quit_now: quit_now.clone(), - num_pending: num_pending.clone(), + active_workers: active_workers.clone(), max_depth: self.max_depth, max_filesize: self.max_filesize, follow_links: self.follow_links, @@ -1471,8 +1471,8 @@ struct Worker<'s> { /// that we need this because we don't want other `Work` to be done after /// we quit. We wouldn't need this if have a priority channel. quit_now: Arc, - /// The number of outstanding work items. - num_pending: Arc, + /// The number of currently active workers. + active_workers: Arc, /// The maximum depth of directories to descend. A value of `0` means no /// descension at all. max_depth: Option, @@ -1500,7 +1500,6 @@ impl<'s> Worker<'s> { if let WalkState::Quit = self.run_one(work) { self.quit_now(); } - self.work_done(); } } @@ -1682,23 +1681,20 @@ impl<'s> Worker<'s> { return None; } None => { - // Once num_pending reaches 0, it is impossible for it to - // ever increase again. Namely, it only reaches 0 once - // all jobs have run such that no jobs have produced more - // work. We have this guarantee because num_pending is - // always incremented before each job is submitted and only - // decremented once each job is completely finished. - // Therefore, if this reaches zero, then there can be no - // other job running. - if self.num_pending() == 0 { - // Every other thread is blocked at the next recv(). - // Send the initial quit message and quit. + if self.deactivate_worker() == 0 { + // If deactivate_worker() returns 0, every worker thread + // is currently within the critical section between the + // acquire in deactivate_worker() and the release in + // activate_worker() below. For this to happen, every + // worker's local deque must be simultaneously empty, + // meaning there is no more work left at all. self.send_quit(); return None; } // Wait for next `Work` or `Quit` message. loop { if let Some(v) = self.recv() { + self.activate_worker(); value = Some(v); break; } @@ -1724,14 +1720,8 @@ impl<'s> Worker<'s> { self.quit_now.load(AtomicOrdering::SeqCst) } - /// Returns the number of pending jobs. - fn num_pending(&self) -> usize { - self.num_pending.load(AtomicOrdering::SeqCst) - } - /// Send work. fn send(&self, work: Work) { - self.num_pending.fetch_add(1, AtomicOrdering::SeqCst); self.stack.push(Message::Work(work)); } @@ -1745,9 +1735,14 @@ impl<'s> Worker<'s> { self.stack.pop() } - /// Signal that work has been finished. - fn work_done(&self) { - self.num_pending.fetch_sub(1, AtomicOrdering::SeqCst); + /// Deactivates a worker and returns the number of currently active workers. + fn deactivate_worker(&self) -> usize { + self.active_workers.fetch_sub(1, AtomicOrdering::Acquire) - 1 + } + + /// Reactivates a worker. + fn activate_worker(&self) { + self.active_workers.fetch_add(1, AtomicOrdering::Release); } }