Skip to content

Commit

Permalink
ignore: Use a crossbeam deque instead of an Arc<Mutex<Vec<_>>>
Browse files Browse the repository at this point in the history
  • Loading branch information
tavianator committed Aug 30, 2023
1 parent 6cd9479 commit 1272b70
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 20 deletions.
40 changes: 40 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/ignore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ name = "ignore"
bench = false

[dependencies]
crossbeam-deque = "0.8.3"
globset = { version = "0.4.10", path = "../globset" }
lazy_static = "1.1"
log = "0.4.5"
Expand Down
101 changes: 81 additions & 20 deletions crates/ignore/src/walk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ use std::ffi::OsStr;
use std::fmt;
use std::fs::{self, FileType, Metadata};
use std::io;
use std::iter::FusedIterator;
use std::iter::{self, FusedIterator};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::vec;

use crossbeam_deque::{Stealer, Worker as Deque};
use same_file::Handle;
use walkdir::{self, WalkDir};

Expand Down Expand Up @@ -1231,9 +1232,8 @@ impl WalkParallel {
/// can be merged together into a single data structure.
pub fn visit(mut self, builder: &mut dyn ParallelVisitorBuilder<'_>) {
let threads = self.threads();
let stack = Arc::new(Mutex::new(vec![]));
let mut stack = vec![];
{
let mut stack = stack.lock().unwrap();
let mut visitor = builder.build();
let mut paths = Vec::new().into_iter();
std::mem::swap(&mut paths, &mut self.paths);
Expand Down Expand Up @@ -1283,24 +1283,24 @@ impl WalkParallel {
}
// Create the workers and then wait for them to finish.
let quit_now = Arc::new(AtomicBool::new(false));
let num_pending =
Arc::new(AtomicUsize::new(stack.lock().unwrap().len()));
let num_pending = Arc::new(AtomicUsize::new(stack.len()));
let stacks = Stack::make_stacks(threads, stack);
std::thread::scope(|s| {
let mut handles = vec![];
for _ in 0..threads {
let worker = Worker {
let handles: Vec<_> = stacks
.into_iter()
.map(|stack| Worker {
visitor: builder.build(),
stack: stack.clone(),
stack,
quit_now: quit_now.clone(),
num_pending: num_pending.clone(),
max_depth: self.max_depth,
max_filesize: self.max_filesize,
follow_links: self.follow_links,
skip: self.skip.clone(),
filter: self.filter.clone(),
};
handles.push(s.spawn(|| worker.run()));
}
})
.map(|worker| s.spawn(|| worker.run()))
.collect();
for handle in handles {
handle.join().unwrap();
}
Expand Down Expand Up @@ -1390,6 +1390,70 @@ impl Work {
}
}

/// A work-stealing stack.
struct Stack {
/// This thread's index.
index: usize,
/// The thread-local stack.
deque: Deque<Message>,
/// The work stealers.
stealers: Arc<[Stealer<Message>]>,
}

impl Stack {
/// Create a stack for each thread.
fn make_stacks(threads: usize, msgs: Vec<Message>) -> Vec<Self> {
// Use new_lifo() to ensure each worker operates depth-first, not breadth-first
let deques: Vec<_> =
iter::repeat_with(Deque::new_lifo).take(threads).collect();

let stealers: Vec<_> = deques.iter().map(Deque::stealer).collect();
let stealers: Arc<[_]> = stealers.into();

let stacks: Vec<_> = deques
.into_iter()
.enumerate()
.map(|(index, deque)| Self {
index,
deque,
stealers: stealers.clone(),
})
.collect();

// Distribute the initial messages round-robin amongst the stacks
msgs.into_iter()
.zip(stacks.iter().cycle())
.for_each(|(m, s)| s.push(m));

stacks
}

/// Push a message.
fn push(&self, msg: Message) {
self.deque.push(msg);
}

/// Pop a message.
fn pop(&self) -> Option<Message> {
self.deque.pop().or_else(|| self.steal())
}

/// Steal a message from another queue.
fn steal(&self) -> Option<Message> {
// For fairness, try to steal from index - 1, then index - 2, ... 0,
// then wrap around to len - 1, len - 2, ... index + 1.
let (left, right) = self.stealers.split_at(self.index);
// Don't steal from ourselves
let right = &right[1..];

left.iter()
.rev()
.chain(right.iter().rev())
.map(|s| s.steal_batch_and_pop(&self.deque))
.find_map(|s| s.success())
}
}

/// A worker is responsible for descending into directories, updating the
/// ignore matchers, producing new work and invoking the caller's callback.
///
Expand All @@ -1403,7 +1467,7 @@ struct Worker<'s> {
/// directories in depth first order. This can substantially reduce peak
/// memory usage by keeping both the number of files path and gitignore
/// matchers in memory lower.
stack: Arc<Mutex<Vec<Message>>>,
stack: Stack,
/// Whether all workers should terminate at the next opportunity. Note
/// that we need this because we don't want other `Work` to be done after
/// we quit. We wouldn't need this if have a priority channel.
Expand Down Expand Up @@ -1668,20 +1732,17 @@ impl<'s> Worker<'s> {
/// Send work.
fn send(&self, work: Work) {
self.num_pending.fetch_add(1, Ordering::SeqCst);
let mut stack = self.stack.lock().unwrap();
stack.push(Message::Work(work));
self.stack.push(Message::Work(work));
}

/// Send a quit message.
fn send_quit(&self) {
let mut stack = self.stack.lock().unwrap();
stack.push(Message::Quit);
self.stack.push(Message::Quit);
}

/// Receive work.
fn recv(&self) -> Option<Message> {
let mut stack = self.stack.lock().unwrap();
stack.pop()
self.stack.pop()
}

/// Signal that work has been finished.
Expand Down

0 comments on commit 1272b70

Please sign in to comment.