Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ignore: Avoid contention on num_pending #2642

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 20 additions & 25 deletions crates/ignore/src/walk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1279,7 +1279,7 @@ impl WalkParallel {
}
// Create the workers and then wait for them to finish.
let quit_now = Arc::new(AtomicBool::new(false));
let num_pending = Arc::new(AtomicUsize::new(stack.len()));
let active_workers = Arc::new(AtomicUsize::new(threads));
let stacks = Stack::new_for_each_thread(threads, stack);
std::thread::scope(|s| {
let handles: Vec<_> = stacks
Expand All @@ -1288,7 +1288,7 @@ impl WalkParallel {
visitor: builder.build(),
stack,
quit_now: quit_now.clone(),
num_pending: num_pending.clone(),
active_workers: active_workers.clone(),
max_depth: self.max_depth,
max_filesize: self.max_filesize,
follow_links: self.follow_links,
Expand Down Expand Up @@ -1471,8 +1471,8 @@ struct Worker<'s> {
/// that we need this because we don't want other `Work` to be done after
/// we quit. We wouldn't need this if we had a priority channel.
quit_now: Arc<AtomicBool>,
/// The number of outstanding work items.
num_pending: Arc<AtomicUsize>,
/// The number of currently active workers.
active_workers: Arc<AtomicUsize>,
/// The maximum depth of directories to descend. A value of `0` means no
/// descent at all.
max_depth: Option<usize>,
Expand Down Expand Up @@ -1500,7 +1500,6 @@ impl<'s> Worker<'s> {
if let WalkState::Quit = self.run_one(work) {
self.quit_now();
}
self.work_done();
}
}

Expand Down Expand Up @@ -1682,23 +1681,20 @@ impl<'s> Worker<'s> {
return None;
}
None => {
// Once num_pending reaches 0, it is impossible for it to
// ever increase again. Namely, it only reaches 0 once
// all jobs have run such that no jobs have produced more
// work. We have this guarantee because num_pending is
// always incremented before each job is submitted and only
// decremented once each job is completely finished.
// Therefore, if this reaches zero, then there can be no
// other job running.
if self.num_pending() == 0 {
// Every other thread is blocked at the next recv().
// Send the initial quit message and quit.
if self.deactivate_worker() == 0 {
// If deactivate_worker() returns 0, every worker thread
// is currently within the critical section between the
// acquire in deactivate_worker() and the release in
// activate_worker() below. For this to happen, every
// worker's local deque must be simultaneously empty,
// meaning there is no more work left at all.
self.send_quit();
return None;
}
// Wait for next `Work` or `Quit` message.
loop {
if let Some(v) = self.recv() {
self.activate_worker();
value = Some(v);
break;
}
Expand All @@ -1724,14 +1720,8 @@ impl<'s> Worker<'s> {
self.quit_now.load(AtomicOrdering::SeqCst)
}

/// Returns the number of pending jobs.
fn num_pending(&self) -> usize {
self.num_pending.load(AtomicOrdering::SeqCst)
}

/// Send work.
fn send(&self, work: Work) {
self.num_pending.fetch_add(1, AtomicOrdering::SeqCst);
self.stack.push(Message::Work(work));
}

Expand All @@ -1745,9 +1735,14 @@ impl<'s> Worker<'s> {
self.stack.pop()
}

/// Signal that work has been finished.
fn work_done(&self) {
self.num_pending.fetch_sub(1, AtomicOrdering::SeqCst);
/// Deactivates a worker and returns the number of currently active workers.
fn deactivate_worker(&self) -> usize {
self.active_workers.fetch_sub(1, AtomicOrdering::Acquire) - 1
}

/// Reactivates a worker.
fn activate_worker(&self) {
self.active_workers.fetch_add(1, AtomicOrdering::Release);
}
}

Expand Down