-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ARROW-6101: [Rust] [DataFusion] Parallel execution of physical query plan #5111
Conversation
Codecov Report
@@ Coverage Diff @@
## master #5111 +/- ##
===========================================
- Coverage 87.66% 82.79% -4.87%
===========================================
Files 1022 92 -930
Lines 146443 25783 -120660
Branches 1437 0 -1437
===========================================
- Hits 128372 21346 -107026
+ Misses 17709 4437 -13272
+ Partials 362 0 -362 Continue to review full report at Codecov.
|
Hi @andygrove, I'll catch up on this and other PRs during the week |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@andygrove I'm not too familiar with the design of datafusion
so my review is relatively high level, but overall LGTM.
"Table provider returned no partitions".to_string(), | ||
)) | ||
} else { | ||
let partition = partitions[0].lock().unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we avoid the unwrap
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case, I think we want the unwrap. From https://doc.rust-lang.org/std/sync/struct.Mutex.html:
Most usage of a mutex will simply unwrap() these results, propagating panics among threads to ensure that a possibly invalid invariant is not witnessed.
These queries are now happening on threads, so although a panic here would fail the query, it would not fail the main thread that launched the thread(s) to execute the query.
Ok(Arc::new(exec)) | ||
} | ||
} | ||
_ => panic!(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
panic
should provide some error message
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops. That one was not intentional ... fixed.
.map(|p| { | ||
let p = p.clone(); | ||
thread::spawn(move || { | ||
let it = p.execute().unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again do we need to unwrap
here two lines in a row?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
// combine the results from each thread | ||
let mut combined_results: Vec<RecordBatch> = vec![]; | ||
for thread in threads { | ||
let result = thread.join().unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unwrap
again
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
Thanks @paddyhoran. I have addressed the comments, except for the two uses of |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM pending CI
Travis failure unrelated, so going ahead with merge.
|
…plan This PR implements parallel query execution, with the ability to create a physical query plan from a logical plan, and call `ExecutionContext.collect()` to execute the plan and collect the results into a single vector. This currently only supports trivial projections against partitioned CSV files. I will create separate PRs to add the other operators (selection, limit, aggregation, etc). Closes apache#5111 from andygrove/ARROW-6101 and squashes the following commits: b820e69 <Andy Grove> trigger build 0210c70 <Andy Grove> remove an unwrap d8ff02f <Andy Grove> remove hard-coded batch size, remove comment, remove panic 21e646f <Andy Grove> test passes dd3cbe6 <Andy Grove> format 104f3ad <Andy Grove> csv provider now supports partitioned csv files 20216bc <Andy Grove> realistic test (failing) 7ff8ab7 <Andy Grove> gmt 5515033 <Andy Grove> unit test works a292cce <Andy Grove> unit test (currently fails) 63c88d8 <Andy Grove> error handling 42566c3 <Andy Grove> implement collect() c692217 <Andy Grove> rebase Authored-by: Andy Grove <[email protected]> Signed-off-by: Andy Grove <[email protected]>
This PR implements parallel query execution, with the ability to create a physical query plan from a logical plan, and call
ExecutionContext.collect()
to execute the plan and collect the results into a single vector.This currently only supports trivial projections against partitioned CSV files.
I will create separate PRs to add the other operators (selection, limit, aggregation, etc).