From 1272b709c1745662b899a19d7bd5592b812f5eae Mon Sep 17 00:00:00 2001 From: Tavian Barnes Date: Mon, 21 Aug 2023 13:58:27 -0400 Subject: [PATCH] ignore: Use a crossbeam deque instead of an Arc>> --- Cargo.lock | 40 +++++++++++++++ crates/ignore/Cargo.toml | 1 + crates/ignore/src/walk.rs | 101 ++++++++++++++++++++++++++++++-------- 3 files changed, 122 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6ddf1a9c5..414b64e2a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -84,6 +84,30 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce6fd6f855243022dcecf8702fef0c297d4338e226845fe067f6341ad9fa0cef" +dependencies = [ + "cfg-if", + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae211234986c545741a7dc064309f67ee1e5ad243d0e48335adc0484d960bcc7" +dependencies = [ + "autocfg", + "cfg-if", + "crossbeam-utils", + "memoffset", + "scopeguard", +] + [[package]] name = "crossbeam-utils" version = "0.8.16" @@ -230,6 +254,7 @@ name = "ignore" version = "0.4.20" dependencies = [ "crossbeam-channel", + "crossbeam-deque", "globset", "lazy_static", "log", @@ -315,6 +340,15 @@ dependencies = [ "libc", ] +[[package]] +name = "memoffset" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a634b1c61a95585bd15607c6ab0c4e5b226e695ff2800ba0cdccddf208c406c" +dependencies = [ + "autocfg", +] + [[package]] name = "num-traits" version = "0.2.16" @@ -450,6 +484,12 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "serde" version = "1.0.188" diff --git 
a/crates/ignore/Cargo.toml b/crates/ignore/Cargo.toml index a9495aa33..c659bb2d4 100644 --- a/crates/ignore/Cargo.toml +++ b/crates/ignore/Cargo.toml @@ -19,6 +19,7 @@ name = "ignore" bench = false [dependencies] +crossbeam-deque = "0.8.3" globset = { version = "0.4.10", path = "../globset" } lazy_static = "1.1" log = "0.4.5" diff --git a/crates/ignore/src/walk.rs b/crates/ignore/src/walk.rs index 734b87667..a3c29a3fe 100644 --- a/crates/ignore/src/walk.rs +++ b/crates/ignore/src/walk.rs @@ -3,14 +3,15 @@ use std::ffi::OsStr; use std::fmt; use std::fs::{self, FileType, Metadata}; use std::io; -use std::iter::FusedIterator; +use std::iter::{self, FusedIterator}; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::thread; use std::time::Duration; use std::vec; +use crossbeam_deque::{Stealer, Worker as Deque}; use same_file::Handle; use walkdir::{self, WalkDir}; @@ -1231,9 +1232,8 @@ impl WalkParallel { /// can be merged together into a single data structure. pub fn visit(mut self, builder: &mut dyn ParallelVisitorBuilder<'_>) { let threads = self.threads(); - let stack = Arc::new(Mutex::new(vec![])); + let mut stack = vec![]; { - let mut stack = stack.lock().unwrap(); let mut visitor = builder.build(); let mut paths = Vec::new().into_iter(); std::mem::swap(&mut paths, &mut self.paths); @@ -1283,14 +1283,14 @@ impl WalkParallel { } // Create the workers and then wait for them to finish. 
let quit_now = Arc::new(AtomicBool::new(false)); - let num_pending = - Arc::new(AtomicUsize::new(stack.lock().unwrap().len())); + let num_pending = Arc::new(AtomicUsize::new(stack.len())); + let stacks = Stack::make_stacks(threads, stack); std::thread::scope(|s| { - let mut handles = vec![]; - for _ in 0..threads { - let worker = Worker { + let handles: Vec<_> = stacks + .into_iter() + .map(|stack| Worker { visitor: builder.build(), - stack: stack.clone(), + stack, quit_now: quit_now.clone(), num_pending: num_pending.clone(), max_depth: self.max_depth, @@ -1298,9 +1298,9 @@ impl WalkParallel { follow_links: self.follow_links, skip: self.skip.clone(), filter: self.filter.clone(), - }; - handles.push(s.spawn(|| worker.run())); - } + }) + .map(|worker| s.spawn(|| worker.run())) + .collect(); for handle in handles { handle.join().unwrap(); } @@ -1390,6 +1390,70 @@ impl Work { } } +/// A work-stealing stack. +struct Stack { + /// This thread's index. + index: usize, + /// The thread-local stack. + deque: Deque<Message>, + /// The work stealers. + stealers: Arc<[Stealer<Message>]>, +} + +impl Stack { + /// Create a stack for each thread. + fn make_stacks(threads: usize, msgs: Vec<Message>) -> Vec<Stack> { + // Use new_lifo() to ensure each worker operates depth-first, not breadth-first + let deques: Vec<_> = + iter::repeat_with(Deque::new_lifo).take(threads).collect(); + + let stealers: Vec<_> = deques.iter().map(Deque::stealer).collect(); + let stealers: Arc<[_]> = stealers.into(); + + let stacks: Vec<_> = deques + .into_iter() + .enumerate() + .map(|(index, deque)| Self { + index, + deque, + stealers: stealers.clone(), + }) + .collect(); + + // Distribute the initial messages round-robin amongst the stacks + msgs.into_iter() + .zip(stacks.iter().cycle()) + .for_each(|(m, s)| s.push(m)); + + stacks + } + + /// Push a message. + fn push(&self, msg: Message) { + self.deque.push(msg); + } + + /// Pop a message. 
+ fn pop(&self) -> Option<Message> { + self.deque.pop().or_else(|| self.steal()) + } + + /// Steal a message from another queue. + fn steal(&self) -> Option<Message> { + // For fairness, try to steal from index - 1, then index - 2, ... 0, + // then wrap around to len - 1, len - 2, ... index + 1. + let (left, right) = self.stealers.split_at(self.index); + // Don't steal from ourselves + let right = &right[1..]; + + left.iter() + .rev() + .chain(right.iter().rev()) + .map(|s| s.steal_batch_and_pop(&self.deque)) + .find_map(|s| s.success()) + } +} + /// A worker is responsible for descending into directories, updating the /// ignore matchers, producing new work and invoking the caller's callback. /// @@ -1403,7 +1467,7 @@ struct Worker<'s> { /// directories in depth first order. This can substantially reduce peak /// memory usage by keeping both the number of files path and gitignore /// matchers in memory lower. - stack: Arc<Mutex<Vec<Message>>>, + stack: Stack, /// Whether all workers should terminate at the next opportunity. Note /// that we need this because we don't want other `Work` to be done after /// we quit. We wouldn't need this if have a priority channel. @@ -1668,20 +1732,17 @@ impl<'s> Worker<'s> { /// Send work. fn send(&self, work: Work) { self.num_pending.fetch_add(1, Ordering::SeqCst); - let mut stack = self.stack.lock().unwrap(); - stack.push(Message::Work(work)); + self.stack.push(Message::Work(work)); } /// Send a quit message. fn send_quit(&self) { - let mut stack = self.stack.lock().unwrap(); - stack.push(Message::Quit); + self.stack.push(Message::Quit); } /// Receive work. fn recv(&self) -> Option<Message> { - let mut stack = self.stack.lock().unwrap(); - stack.pop() + self.stack.pop() } /// Signal that work has been finished.