Rayon support for walkdir iterators #21
Is there documentation that will help me write such an implementation? Which traits do I need to implement? I wouldn't expect this to be an easy task. In fact, I'd expect this to require a complete rewrite of the current implementation. (If my expectations turn out to be true, then perhaps there should be a separate crate for it.)

If you made the iterator
Ah, I see the problem now. It's the "sorter" closure. You could probably just require that it be

We don't have a good "So You Want To Implement Parallel Iterator" tutorial yet. The closest thing is the README describing the internals.
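To illustrate the point about the sorter closure: a minimal std-only sketch, under the assumption that the comparison closure is bounded by `Send + Sync` (walkdir's actual bound at the time was a plain `FnMut`, which is neither, and the `Sorter` type alias here is hypothetical). With that bound, one shared closure can drive sorting on every worker thread:

```rust
use std::cmp::Ordering;
use std::sync::Arc;
use std::thread;

// Hypothetical sorter type with a `Send + Sync` bound, so it can be
// shared across threads behind an `Arc`.
type Sorter = Arc<dyn Fn(&str, &str) -> Ordering + Send + Sync>;

// Sort a list of names with the shared sorter.
fn sorted_names(sorter: &Sorter, mut names: Vec<&'static str>) -> Vec<&'static str> {
    names.sort_by(|a, b| sorter(a, b));
    names
}

fn main() {
    // Sort by length first, then lexicographically.
    let sorter: Sorter = Arc::new(|a: &str, b: &str| a.len().cmp(&b.len()).then(a.cmp(b)));
    // Each worker thread gets its own clone of the shared sorter.
    let handles: Vec<_> = (0..2)
        .map(|_| {
            let sorter = Arc::clone(&sorter);
            thread::spawn(move || sorted_names(&sorter, vec!["ccc", "a", "bb"]))
        })
        .collect();
    for h in handles {
        assert_eq!(h.join().unwrap(), vec!["a", "bb", "ccc"]);
    }
    println!("ok");
}
```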
@nikomatsakis Awesome! Perhaps my expectations were wrong. :-) I will give the internal docs a read soon. (I also secretly hope that I can use this in ripgrep, which uses its own home-grown parallel directory traversal... The part I've struggled most with is when to terminate.)
@BurntSushi glancing through the code, it does seem like it'd require something of a rewrite. I'd expect this code to probably act as its own "producer", that is, to drive a consumer directly. Basically you'd get a callback

Basically, parallel iterators generally require a rewrite to work in "push mode", versus sequential iterators, which act in pull mode.

But I've been playing with various "adapters" that will take sequential iterators and drive parallel code with them; such things get easier if the iterator is
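The pull-to-push bridge described above can be sketched with std primitives only (the name `drive_parallel` is hypothetical, and rayon's real adapters use its work-stealing pool and job splitting rather than a fixed pool): workers repeatedly pull the next item from a shared sequential iterator, then push results out through a channel.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

// Pull items from a sequential iterator behind a mutex and let a fixed
// pool of workers "push" results out. Result order is nondeterministic.
fn drive_parallel<I, T, F, R>(iter: I, nworkers: usize, f: F) -> Vec<R>
where
    I: Iterator<Item = T> + Send,
    T: Send,
    F: Fn(T) -> R + Send + Sync,
    R: Send,
{
    let iter = Arc::new(Mutex::new(iter));
    let f = Arc::new(f);
    let (tx, rx) = mpsc::channel();
    thread::scope(|s| {
        for _ in 0..nworkers {
            let iter = Arc::clone(&iter);
            let f = Arc::clone(&f);
            let tx = tx.clone();
            s.spawn(move || loop {
                // Pull the next item while holding the lock briefly...
                let item = iter.lock().unwrap().next();
                match item {
                    // ...then process it outside the lock and push the result.
                    Some(item) => tx.send(f(item)).unwrap(),
                    None => break,
                }
            });
        }
        drop(tx); // close our sender so `rx` ends when the workers finish
        rx.into_iter().collect()
    })
}

fn main() {
    let mut out = drive_parallel(1..=100, 4, |x: i64| x * x);
    out.sort(); // workers finish in arbitrary order
    assert_eq!(out.len(), 100);
    assert_eq!(out[0], 1);
    assert_eq!(out[99], 10_000);
    println!("ok");
}
```

Note the lock is only held for `next()`, so the expensive per-item work runs truly in parallel; this is the "adapter" shape, as opposed to rewriting the producer itself in push mode.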
@nikomatsakis Yup. This is exactly why I (personally) found it difficult to write a parallel directory traversal.
Running in parallel on a spinning disk should give a speedup because it allows the io scheduler to group reads that are close together on the disk. I've tried to confirm this with a simple program. It runs serially (`s`), with one thread per root path (`t`), or with rayon (`r`); see `main` below. Perhaps the data that I'm testing on is not very fragmented.

```rust
extern crate walkdir;
extern crate rayon;

use walkdir::WalkDir;
use rayon::prelude::*;
use std::env;
use std::thread;

fn walk(path: &str) {
    for entry in WalkDir::new(path) {
        let entry = entry.unwrap();
        println!("{}", entry.path().display());
    }
}

// Rayon: a parallel iterator over the root paths.
fn r(paths: Vec<String>) {
    paths.par_iter().for_each(|path| walk(path));
}

// One OS thread per root path.
fn t(paths: Vec<String>) {
    let mut handles = Vec::new();
    for path in paths {
        handles.push(thread::spawn(move || walk(&path)));
    }
    for handle in handles {
        handle.join().expect("Panic waiting for thread.");
    }
}

// Serial baseline.
fn s(paths: Vec<String>) {
    for path in paths {
        walk(&path);
    }
}

fn main() {
    let mut args = env::args();
    args.next(); // skip the program name
    let mode = args.next().unwrap();
    let paths: Vec<String> = args.collect();
    match mode.as_str() {
        "s" => s(paths),
        "t" => t(paths),
        _ => r(paths),
    }
}
```
Some more measuring on 1M files does show a difference. The results stay within a few seconds when repeated.

```sh
echo threads
sync && echo 3 > /proc/sys/vm/drop_caches
time target/release/wd t $dir/*/* | wc -l
sleep 5

echo serial
sync && echo 3 > /proc/sys/vm/drop_caches
time target/release/wd s $dir/*/* | wc -l
sleep 5

echo rayon
sync && echo 3 > /proc/sys/vm/drop_caches
time target/release/wd r $dir/*/* | wc -l
sleep 5

echo find
sync && echo 3 > /proc/sys/vm/drop_caches
time find $dir/*/* | wc -l
```
Running in parallel with threads is a few percent faster than

Running in parallel also helps many SSDs, which benefit from having many transactions in flight at once ("queue depth").
Doing a naive parallel implementation does give a nice speedup. The code below runs 32 threads and handles the same case as in the timings above in 3m9s, which is 28% less time than the fastest so far (threads). The speedup comes at a cost of CPU use; CPU use was 3% during this run.

```rust
use std::path::PathBuf;
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use std::{thread, time};

fn u(paths: Vec<String>) {
    let mut vec = Vec::new();
    for v in paths {
        vec.push(PathBuf::from(v));
    }
    // Shared stack of directories still to be read.
    let data = Arc::new(Mutex::new(vec));
    let (tx, rx) = channel();
    let n = 32;
    for _ in 0..n {
        let (data, tx) = (data.clone(), tx.clone());
        thread::spawn(move || {
            let mut buf = Vec::new();
            loop {
                // Pop the next directory while holding the lock briefly.
                let p: Option<PathBuf> = {
                    let mut data = data.lock().unwrap();
                    data.pop()
                };
                match p.map(|p| p.read_dir()) {
                    Some(Ok(p)) => {
                        for f in p {
                            let entry = f.unwrap();
                            let path = entry.path();
                            if entry.file_type().unwrap().is_dir() {
                                buf.push(path.clone());
                            }
                            tx.send(Some(path)).unwrap();
                        }
                        // Push discovered subdirectories back onto the stack.
                        if !buf.is_empty() {
                            let mut data = data.lock().unwrap();
                            for v in buf.drain(..) {
                                data.push(v);
                            }
                        }
                    }
                    Some(Err(_)) => {}
                    None => {
                        // Stack looked empty: this worker is done.
                        tx.send(None).unwrap();
                        break;
                    }
                }
            }
        });
        let ten_millis = time::Duration::from_millis(10);
        thread::sleep(ten_millis); // hack to let the stack populate
    }
    let mut nclosed = 0;
    while nclosed < n {
        match rx.recv() {
            Ok(Some(p)) => println!("{}", p.to_string_lossy()),
            _ => nclosed += 1,
        }
    }
}
```
with 1024 threads:
Here's another version. This one only adds threads when the io is slow. With a warm cache it is slower than

Threads are used to wait for the blocking function `read_dir` to return. More waiting threads means more information for the io scheduler to come up with an efficient route for the disk head. The number of threads should not be derived from the number of CPU cores, because the threads are mostly sleeping. The number should probably be lower than

For the warm cache case, most of the extra time is spent in channel communication: with one

So using threads certainly has advantages. It would be nice if the cost of communication could be brought down to make the warm cache case faster.

```rust
extern crate spmc; // single-producer, multi-consumer channel crate

use std::fs::DirEntry;
use std::path::PathBuf;
use std::sync::mpsc;
use std::{thread, time};

// A "waiter" blocks on the input channel, reads one directory at a
// time, and streams its entries back; a `None` marks a finished job.
fn waiter(irx: spmc::Receiver<Option<PathBuf>>, otx: mpsc::Sender<Option<DirEntry>>) {
    thread::spawn(move || {
        loop {
            match irx.recv().unwrap().map(|p| p.read_dir()) {
                Some(Ok(p)) => {
                    for f in p {
                        otx.send(Some(f.unwrap())).unwrap();
                    }
                    otx.send(None).unwrap();
                }
                Some(Err(_)) => {
                    otx.send(None).unwrap();
                }
                None => break,
            }
        }
    });
}

fn o(paths: Vec<String>) {
    let mut nwaiters = 1;
    let max_waiters = 100;
    let wait_time = time::Duration::from_millis(20);
    let (mut itx, irx) = spmc::channel();
    let (otx, orx) = mpsc::channel();
    waiter(irx.clone(), otx.clone());
    let mut jobs = paths.len();
    for v in paths {
        itx.send(Some(PathBuf::from(v))).unwrap();
    }
    loop {
        match orx.try_recv() {
            Ok(Some(entry)) => {
                let path = entry.path();
                println!("{}", path.to_string_lossy());
                if entry.file_type().unwrap().is_dir() {
                    jobs += 1;
                    itx.send(Some(path)).unwrap();
                }
            }
            Ok(None) => {
                jobs -= 1;
                if jobs == 0 {
                    break;
                }
            }
            Err(_) => {
                // slow io, add a waiter
                if nwaiters < max_waiters {
                    nwaiters += 1;
                    waiter(irx.clone(), otx.clone());
                }
                thread::sleep(wait_time);
            }
        }
    }
    // Tell every waiter to shut down.
    for _ in 0..nwaiters {
        itx.send(None).unwrap();
    }
    println!("used waiters: {}", nwaiters);
}
```
I found this thread through https://www.reddit.com/r/rust/comments/6eif7r/walkdir_users_we_need_you/

This is interesting to me because trying to use rayon with walkdir was my first toy project with Rust :) But I failed due to the complexity of trying something way beyond my hello-world-level Rust skills at the time. I ended up writing a workaround using channels instead (also a BurntSushi crate): https://github.com/anderejd/treesum/blob/master/src/main.rs (the toy project)

I suspect the code linked above does not provide much value for this issue, but you never know.
Some more numbers: running on a 4TB hard disk with 60M files:
Turns out walkdir had a trick up its sleeve in its example (buffered stdout, here as the `wout!` macro). On an SSD with 1.5M files:

```rust
extern crate spmc; // single-producer, multi-consumer channel crate

use std::fs::DirEntry;
use std::io::{self, Write};
use std::path::PathBuf;
use std::sync::mpsc;
use std::{thread, time};

// Buffered, unwrapped writeln!.
macro_rules! wout { ($($tt:tt)*) => { {writeln!($($tt)*)}.unwrap() } }

fn waiter(irx: spmc::Receiver<Option<PathBuf>>, otx: mpsc::Sender<Option<DirEntry>>) {
    thread::spawn(move || {
        loop {
            match irx.recv().unwrap().map(|p| p.read_dir()) {
                Some(Ok(p)) => {
                    for f in p {
                        otx.send(Some(f.unwrap())).unwrap();
                    }
                    otx.send(None).unwrap();
                }
                Some(Err(_)) => {
                    otx.send(None).unwrap();
                }
                None => break,
            }
        }
    });
}

fn hundred_waiters(paths: Vec<String>) {
    let mut nwaiters = 1;
    let max_waiters = 100;
    let wait_time = time::Duration::from_millis(20);
    let (mut itx, irx) = spmc::channel();
    let (otx, orx) = mpsc::channel();
    waiter(irx.clone(), otx.clone());
    let mut jobs = paths.len();
    for v in paths {
        itx.send(Some(PathBuf::from(v))).unwrap();
    }
    // Buffer stdout instead of locking it per line.
    let mut out = io::BufWriter::new(io::stdout());
    loop {
        match orx.try_recv() {
            Ok(Some(entry)) => {
                let path = entry.path();
                wout!(out, "{}", path.to_string_lossy());
                if entry.file_type().unwrap().is_dir() {
                    jobs += 1;
                    itx.send(Some(path)).unwrap();
                }
            }
            Ok(None) => {
                jobs -= 1;
                if jobs == 0 {
                    break;
                }
            }
            Err(_) => {
                if nwaiters < max_waiters {
                    nwaiters += 1;
                    waiter(irx.clone(), otx.clone());
                }
                thread::sleep(wait_time);
            }
        }
    }
    for _ in 0..nwaiters {
        itx.send(None).unwrap();
    }
    eprintln!("waiters: {}", nwaiters); // was `error!` from the log crate
}
```

The issue that remains to be solved is how to efficiently sort while working in parallel. A simple solution would be to only support sorting when not running in parallel. Another option is to "fire and forget": do a readdir, but only collect and handle the result that is needed at the moment. That way, the filesystem cache is prepped with the right data when you need it.
Doesn't the ignore package of ripgrep accomplish this, or am I missing something? Although I would find it strange to use the ignore package for parallel walkdir.

The ignore package does have a recursive parallel directory iterator, but the API is strange and it doesn't use rayon.

I am going to close this. I don't think anyone has any concrete ideas on how to use rayon for this, and even if someone did (or some other parallel implementation), I think it would be best prototyped in a separate crate.
Parallel traversal wouldn't just benefit HDDs: on one NFS system it takes me ~16s just to traverse a mere 1000 directories with 40000 files. The main cost there is the opendir latency. The filesystem can sustain a decent amount of IOPS and throughput, but it is terrible in latency compared to local NVMe drives.

Also note that internally performing the work in parallel and exposing parallel result iteration via rayon APIs are somewhat orthogonal. In principle the former could even be done with zero API changes by coordinating
I hope I'm not completely out of context here and not saying something irrelevant, but I remember asking about having

The idea is to dispatch work to parallel workers with sequential IDs, and then assemble it back when converting to a sorted iter, keeping
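The dispatch-and-reassemble idea can be sketched with std primitives (all names here are hypothetical): workers finish in arbitrary order and send `(index, result)` pairs; the consumer buffers out-of-order results and releases them strictly in index order, so the caller sees the original ordering even though the work ran in parallel.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

// Buffer out-of-order (index, value) pairs and emit the values in
// strictly increasing index order.
fn reassemble_in_order<T>(rx: mpsc::Receiver<(usize, T)>) -> Vec<T> {
    let mut pending: HashMap<usize, T> = HashMap::new();
    let mut next = 0;
    let mut out = Vec::new();
    for (idx, val) in rx {
        pending.insert(idx, val);
        // Flush the contiguous prefix that is now available.
        while let Some(v) = pending.remove(&next) {
            out.push(v);
            next += 1;
        }
    }
    out
}

fn main() {
    let (tx, rx) = mpsc::channel();
    for (idx, x) in [10u64, 20, 30, 40].into_iter().enumerate() {
        let tx = tx.clone();
        // Each worker tags its result with the dispatch index.
        thread::spawn(move || tx.send((idx, x * x)).unwrap());
    }
    drop(tx); // close the channel once only the workers hold senders
    assert_eq!(reassemble_in_order(rx), vec![100, 400, 900, 1600]);
    println!("ok");
}
```

The cost is the `pending` buffer, which grows when a slow item blocks the release of many fast ones; that is the usual trade-off of reordering a parallel stream.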
For some uses, only parallelizing when no sorting function is passed and operating in single-threaded mode when sorting is requested would also be acceptable.

Parallel processing of entries returned by
Found this on Google, and with some more effort I found an alternative. Pasting here for others: https://github.com/Byron/jwalk.
I'd love to use Rayon parallel iterators with walkdir. Walkdir's iterators don't implement the necessary traits to do so. @nikomatsakis provided a different pattern that provided some thread-pool-based parallelism, but it would be nice to directly use `par_iter`/`into_par_iter` with walkdir. walkdir itself could actually run the walk in parallel in that case, processing multiple directories in parallel.