Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-6088: [Rust] [DataFusion] Projection execution plan #4988

Closed
wants to merge 11 commits into from

Conversation

andygrove
Copy link
Member

@andygrove andygrove commented Aug 1, 2019

This PR implements the projection and CSV execution plans (I can split this into two PRs if necessary - one for CSV then one for projection).

Note that while I implement execution plans for each relational operator (projection, selection, aggregate, etc) there will be duplicate implementations because we already have the existing execution code that directly executes the logical plan. Once the new physical plan is in place, I will remove the original execution logic (and translate the logical plan to a physical plan).

@andygrove andygrove requested review from sunchao and nevi-me August 1, 2019 16:44
@andygrove andygrove marked this pull request as ready for review August 1, 2019 20:16
@andygrove andygrove self-assigned this Aug 1, 2019
@andygrove
Copy link
Member Author

@sunchao @nevi-me This is ready for review now. Please take a look when you can.

@codecov-io
Copy link

Codecov Report

Merging #4988 into master will decrease coverage by 4.78%.
The diff coverage is 89.94%.

Impacted file tree graph

@@             Coverage Diff             @@
##           master    #4988       +/-   ##
===========================================
- Coverage   87.56%   82.78%    -4.79%     
===========================================
  Files        1002       90      -912     
  Lines      143208    25599   -117609     
  Branches     1418        0     -1418     
===========================================
- Hits       125397    21191   -104206     
+ Misses      17449     4408    -13041     
+ Partials      362        0      -362
Impacted Files Coverage Δ
...afusion/src/execution/physical_plan/expressions.rs 100% <100%> (ø)
rust/datafusion/src/execution/physical_plan/csv.rs 86.88% <86.88%> (ø)
...tafusion/src/execution/physical_plan/projection.rs 91.08% <91.08%> (ø)
rust/parquet/src/encodings/encoding.rs 92.4% <0%> (-0.18%) ⬇️
python/pyarrow/ipc.pxi
cpp/src/arrow/csv/chunker-test.cc
cpp/src/parquet/column_page.h
cpp/src/parquet/bloom_filter-test.cc
cpp/src/arrow/array/builder_decimal.cc
r/src/symbols.cpp
... and 912 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update ed41cb3...5248c60. Read the comment docs.

@sunchao
Copy link
Member

sunchao commented Aug 2, 2019

@andygrove Left some comments. I've lost track of the bigger picture here - do you happen to have a design doc describing what you are trying to achieve with all the PRs?

batch_size: usize,
}

impl ExecutionPlan for CsvExec {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now thinking about it: would it be better if we rename ExecutionPlan something like DataSource? Also with the current interface how can we pushdown various filters to the storage?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A good reference may be Spark Data Source V2 APIs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is more like org.apache.spark.sql.execution.SparkPlan which is the base class for physical operators.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. The doc helped a lot, although this is different from other designs I've touched before (e.g., Hive, Impala) where there are query fragments and operators within a query fragment. The parallelism/repartitioning happens on query fragment level but instead here it is on the operator level.

for entry in fs::read_dir(dir)? {
let entry = entry?;
let path = entry.path();
let path_name = path.as_os_str().to_str().unwrap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: avoid unwrap here?

// specific language governing permissions and limitations
// under the License.

//! Defines the projection execution plan. A projection determines which columns or expressions
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. Not sure if I understand this. Shouldn't this be part of logical query optimization?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Projection can happen in many places in a query plan e.g. SELECT sqrt(x) FROM (SELECT MAX(a) AS x FROM foo). This is separate from the projection that gets pushed down to the data source.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I thought this can be done by a separate analysis on the logical plan though (in Hive this is how it is done with column pruning).

@andygrove
Copy link
Member Author

@sunchao Good point. I will write up a design doc to explain all of this.

@andygrove
Copy link
Member Author

@sunchao Here is a document explaining the changes https://docs.google.com/document/d/1ATZGIs8ry_kJeoTgmJjLrg6Ssb5VE7lNzWuz_4p6EWk/edit?usp=sharing

@sunchao
Copy link
Member

sunchao commented Aug 5, 2019

Thanks @andygrove . The doc is very helpful, and looking forward to see the entire piece to be done. It would be great if you can help to address/answer the comments I had above. You also need to rebase the PR.

@andygrove andygrove closed this in d9b0ef1 Aug 6, 2019
pprudhvi pushed a commit to pprudhvi/arrow that referenced this pull request Aug 11, 2019
This PR implements the projection and CSV execution plans (I can split this into two PRs if necessary - one for CSV then one for projection).

Note that while I implement execution plans for each relational operator (projection, selection, aggregate, etc) there will be duplicate implementations because we already have the existing execution code that directly executes the logical plan. Once the new physical plan is in place, I will remove the original execution logic (and translate the logical plan to a physical plan).

Closes apache#4988 from andygrove/ARROW-6088 and squashes the following commits:

755365c <Andy Grove> Rebase and remove unwrap
fec84af <Andy Grove> test only delete temp path if exist
8f11c81 <Andy Grove> save
6db609f <Andy Grove> test passes
717dcd8 <Andy Grove> implement mutex for iterator
abf6d5e <Andy Grove> Save
a26575e <Andy Grove> rough out CSV execution plan
e806c76 <Andy Grove> formatting
768a7ae <Andy Grove> Implement Column expression
d1ede3c <Andy Grove> Implement projection logix
1875902 <Andy Grove> Roughing out projection execution plan

Authored-by: Andy Grove <[email protected]>
Signed-off-by: Andy Grove <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants