From 6d7550d58e88583deeb142b56e0dbe52f5102cbf Mon Sep 17 00:00:00 2001 From: Tavian Barnes Date: Mon, 30 Oct 2023 15:56:08 -0400 Subject: [PATCH] ignore: Avoid contention on num_pending Previously, every worker would increment the shared num_pending count on every new work item, and decrement it after finishing them, leading to lots of contention. Now, we only track the number of workers actively running, so there is no contention except when workers go to sleep or wake up. Closes #2642 --- CHANGELOG.md | 2 ++ crates/ignore/src/walk.rs | 45 +++++++++++++++++---------------------- 2 files changed, 22 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d8c7e6abe..795d26b9e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,8 @@ Performance improvements: Make most searches with `\b` look-arounds (among others) much faster. * [PERF #2591](https://github.com/BurntSushi/ripgrep/pull/2591): Parallel directory traversal now uses work stealing for faster searches. +* [PERF #2642](https://github.com/BurntSushi/ripgrep/pull/2642): + Parallel directory traversal has some contention reduced. Feature enhancements: diff --git a/crates/ignore/src/walk.rs b/crates/ignore/src/walk.rs index 4fee1d88a..2288fe0bd 100644 --- a/crates/ignore/src/walk.rs +++ b/crates/ignore/src/walk.rs @@ -1279,7 +1279,7 @@ impl WalkParallel { } // Create the workers and then wait for them to finish. 
let quit_now = Arc::new(AtomicBool::new(false)); - let num_pending = Arc::new(AtomicUsize::new(stack.len())); + let active_workers = Arc::new(AtomicUsize::new(threads)); let stacks = Stack::new_for_each_thread(threads, stack); std::thread::scope(|s| { let handles: Vec<_> = stacks @@ -1288,7 +1288,7 @@ impl WalkParallel { visitor: builder.build(), stack, quit_now: quit_now.clone(), - num_pending: num_pending.clone(), + active_workers: active_workers.clone(), max_depth: self.max_depth, max_filesize: self.max_filesize, follow_links: self.follow_links, @@ -1471,8 +1471,8 @@ struct Worker<'s> { /// that we need this because we don't want other `Work` to be done after /// we quit. We wouldn't need this if have a priority channel. quit_now: Arc<AtomicBool>, - /// The number of outstanding work items. - num_pending: Arc<AtomicUsize>, + /// The number of currently active workers. + active_workers: Arc<AtomicUsize>, /// The maximum depth of directories to descend. A value of `0` means no /// descension at all. max_depth: Option<usize>, @@ -1500,7 +1500,6 @@ impl<'s> Worker<'s> { if let WalkState::Quit = self.run_one(work) { self.quit_now(); } - self.work_done(); } } @@ -1682,23 +1681,20 @@ impl<'s> Worker<'s> { return None; } None => { - // Once num_pending reaches 0, it is impossible for it to - // ever increase again. Namely, it only reaches 0 once - // all jobs have run such that no jobs have produced more - // work. We have this guarantee because num_pending is - // always incremented before each job is submitted and only - // decremented once each job is completely finished. - // Therefore, if this reaches zero, then there can be no - // other job running. - if self.num_pending() == 0 { - // Every other thread is blocked at the next recv(). - // Send the initial quit message and quit.
+ if self.deactivate_worker() == 0 { + // If deactivate_worker() returns 0, every worker thread + // is currently within the critical section between the + // acquire in deactivate_worker() and the release in + // activate_worker() below. For this to happen, every + // worker's local deque must be simultaneously empty, + // meaning there is no more work left at all. self.send_quit(); return None; } // Wait for next `Work` or `Quit` message. loop { if let Some(v) = self.recv() { + self.activate_worker(); value = Some(v); break; } @@ -1724,14 +1720,8 @@ impl<'s> Worker<'s> { self.quit_now.load(AtomicOrdering::SeqCst) } - /// Returns the number of pending jobs. - fn num_pending(&self) -> usize { - self.num_pending.load(AtomicOrdering::SeqCst) - } - /// Send work. fn send(&self, work: Work) { - self.num_pending.fetch_add(1, AtomicOrdering::SeqCst); self.stack.push(Message::Work(work)); } @@ -1745,9 +1735,14 @@ impl<'s> Worker<'s> { self.stack.pop() } - /// Signal that work has been finished. - fn work_done(&self) { - self.num_pending.fetch_sub(1, AtomicOrdering::SeqCst); + /// Deactivates a worker and returns the number of currently active workers. + fn deactivate_worker(&self) -> usize { + self.active_workers.fetch_sub(1, AtomicOrdering::Acquire) - 1 + } + + /// Reactivates a worker. + fn activate_worker(&self) { + self.active_workers.fetch_add(1, AtomicOrdering::Release); } }