ignore: Use a crossbeam deque instead of an Arc<Mutex<Vec<_>>> #2591
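This patch replaces the single Arc<Mutex<Vec<Message>>> work list shared by all workers with one crossbeam-deque Worker (a lock-free LIFO deque) per thread, plus a shared set of Stealers so that idle threads can take work from busy ones. As background, here is a minimal single-threaded sketch of the crossbeam-deque 0.8 primitives the patch builds on; the u32 payload is a stand-in for the crate's Message type:

use crossbeam_deque::{Steal, Stealer, Worker};

fn main() {
    // Each thread owns a LIFO deque; other threads hold a Stealer for it.
    let local: Worker<u32> = Worker::new_lifo();
    let thief: Stealer<u32> = local.stealer();

    local.push(1);
    local.push(2);

    // The owner pops in LIFO order, which keeps traversal depth-first.
    assert_eq!(local.pop(), Some(2));

    // A stealer takes the oldest item from the opposite end.
    match thief.steal() {
        Steal::Success(v) => assert_eq!(v, 1),
        Steal::Empty => println!("nothing left to steal"),
        Steal::Retry => println!("lost a race; try again"),
    }
}

The patch itself steals with Stealer::steal_batch_and_pop, which moves a batch of items into the stealing thread's own deque and pops one of them, amortizing the cost of contended steals.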

Closed · wants to merge 1 commit
40 changes: 40 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered by default.)

1 change: 1 addition & 0 deletions crates/ignore/Cargo.toml
@@ -19,6 +19,7 @@ name = "ignore"
 bench = false
 
 [dependencies]
+crossbeam-deque = "0.8.3"
 globset = { version = "0.4.10", path = "../globset" }
 lazy_static = "1.1"
 log = "0.4.5"
101 changes: 81 additions & 20 deletions crates/ignore/src/walk.rs
@@ -3,14 +3,15 @@
 use std::ffi::OsStr;
 use std::fmt;
 use std::fs::{self, FileType, Metadata};
 use std::io;
-use std::iter::FusedIterator;
+use std::iter::{self, FusedIterator};
 use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 use std::thread;
 use std::time::Duration;
 use std::vec;
 
+use crossbeam_deque::{Stealer, Worker as Deque};
 use same_file::Handle;
 use walkdir::{self, WalkDir};

@@ -1231,9 +1232,8 @@ impl WalkParallel {
     /// can be merged together into a single data structure.
     pub fn visit(mut self, builder: &mut dyn ParallelVisitorBuilder<'_>) {
         let threads = self.threads();
-        let stack = Arc::new(Mutex::new(vec![]));
+        let mut stack = vec![];
         {
-            let mut stack = stack.lock().unwrap();
             let mut visitor = builder.build();
             let mut paths = Vec::new().into_iter();
             std::mem::swap(&mut paths, &mut self.paths);
@@ -1283,24 +1283,24 @@
         }
         // Create the workers and then wait for them to finish.
         let quit_now = Arc::new(AtomicBool::new(false));
-        let num_pending =
-            Arc::new(AtomicUsize::new(stack.lock().unwrap().len()));
+        let num_pending = Arc::new(AtomicUsize::new(stack.len()));
+        let stacks = Stack::make_stacks(threads, stack);
         std::thread::scope(|s| {
-            let mut handles = vec![];
-            for _ in 0..threads {
-                let worker = Worker {
+            let handles: Vec<_> = stacks
+                .into_iter()
+                .map(|stack| Worker {
                     visitor: builder.build(),
-                    stack: stack.clone(),
+                    stack,
                     quit_now: quit_now.clone(),
                     num_pending: num_pending.clone(),
                     max_depth: self.max_depth,
                     max_filesize: self.max_filesize,
                     follow_links: self.follow_links,
                     skip: self.skip.clone(),
                     filter: self.filter.clone(),
-                };
-                handles.push(s.spawn(|| worker.run()));
-            }
+                })
+                .map(|worker| s.spawn(|| worker.run()))
+                .collect();
             for handle in handles {
                 handle.join().unwrap();
             }
@@ -1390,6 +1390,70 @@ impl Work {
     }
 }
 
+/// A work-stealing stack.
+struct Stack {
+    /// This thread's index.
+    index: usize,
+    /// The thread-local stack.
+    deque: Deque<Message>,
+    /// The work stealers.
+    stealers: Arc<[Stealer<Message>]>,
+}
+
+impl Stack {
+    /// Create a stack for each thread.
+    fn make_stacks(threads: usize, msgs: Vec<Message>) -> Vec<Self> {
+        // Use new_lifo() to ensure each worker operates depth-first, not breadth-first.
+        let deques: Vec<_> =
+            iter::repeat_with(Deque::new_lifo).take(threads).collect();
+
+        let stealers: Vec<_> = deques.iter().map(Deque::stealer).collect();
+        let stealers: Arc<[_]> = stealers.into();
+
+        let stacks: Vec<_> = deques
+            .into_iter()
+            .enumerate()
+            .map(|(index, deque)| Self {
+                index,
+                deque,
+                stealers: stealers.clone(),
+            })
+            .collect();
+
+        // Distribute the initial messages round-robin amongst the stacks.
+        msgs.into_iter()
+            .zip(stacks.iter().cycle())
+            .for_each(|(m, s)| s.push(m));
+
+        stacks
+    }
+
+    /// Push a message.
+    fn push(&self, msg: Message) {
+        self.deque.push(msg);
+    }
+
+    /// Pop a message.
+    fn pop(&self) -> Option<Message> {
+        self.deque.pop().or_else(|| self.steal())
+    }
+
+    /// Steal a message from another queue.
+    fn steal(&self) -> Option<Message> {
+        // For fairness, try to steal from index - 1, then index - 2, ..., 0,
+        // then wrap around to len - 1, len - 2, ..., index + 1.
+        let (left, right) = self.stealers.split_at(self.index);
+        // Don't steal from ourselves.
+        let right = &right[1..];
+
+        left.iter()
+            .rev()
+            .chain(right.iter().rev())
+            .map(|s| s.steal_batch_and_pop(&self.deque))
+            .find_map(|s| s.success())
+    }
+}
+
 /// A worker is responsible for descending into directories, updating the
 /// ignore matchers, producing new work and invoking the caller's callback.
 ///
@@ -1403,7 +1467,7 @@ struct Worker<'s> {
     /// directories in depth first order. This can substantially reduce peak
     /// memory usage by keeping both the number of file paths and gitignore
     /// matchers in memory lower.
-    stack: Arc<Mutex<Vec<Message>>>,
+    stack: Stack,
     /// Whether all workers should terminate at the next opportunity. Note
     /// that we need this because we don't want other `Work` to be done after
     /// we quit. We wouldn't need this if we had a priority channel.
@@ -1668,20 +1732,17 @@ impl<'s> Worker<'s> {
     /// Send work.
     fn send(&self, work: Work) {
         self.num_pending.fetch_add(1, Ordering::SeqCst);
-        let mut stack = self.stack.lock().unwrap();
-        stack.push(Message::Work(work));
+        self.stack.push(Message::Work(work));
     }
 
     /// Send a quit message.
     fn send_quit(&self) {
-        let mut stack = self.stack.lock().unwrap();
-        stack.push(Message::Quit);
+        self.stack.push(Message::Quit);
    }
 
     /// Receive work.
     fn recv(&self) -> Option<Message> {
-        let mut stack = self.stack.lock().unwrap();
-        stack.pop()
+        self.stack.pop()
    }
 
     /// Signal that work has been finished.
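For illustration, the fairness order that Stack::steal walks (index - 1 down to 0, then wrapping around to len - 1 down to index + 1) can be expressed as a small stand-alone function. Note that steal_order is a hypothetical helper written for this sketch, not part of the patch:

/// Mirrors the visiting order of Stack::steal's split_at/rev/chain logic.
fn steal_order(index: usize, len: usize) -> Vec<usize> {
    let left = (0..index).rev();        // index - 1, ..., 0
    let right = (index + 1..len).rev(); // len - 1, ..., index + 1
    left.chain(right).collect()
}

fn main() {
    // Worker 2 of 5 tries 1, then 0, then wraps around to 4, then 3.
    assert_eq!(steal_order(2, 5), vec![1, 0, 4, 3]);
}

Because each worker starts its scan at a different neighbor, steal attempts spread across the pool instead of every idle thread contending on the same deque first.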