
Changes to ParquetRecordBatchStream to support row filtering in DataFusion #2270

Closed
Tracked by #3147
thinkharderdev opened this issue Aug 1, 2022 · 5 comments · Fixed by #2335
Labels
enhancement Any new improvement worthy of a entry in the changelog parquet Changes to the parquet crate

Comments

@thinkharderdev
Contributor

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

The current (non-public) API for skipping records has some shortcomings when trying to implement predicate pushdown from DataFusion. It assumes that we have the entire VecDeque<RowSelection> for an entire row group when constructing the ParquetRecordBatchReader, which makes streaming filtering a challenge.

Describe the solution you'd like

I wrote a prototype of row-level filtering in DataFusion that required a few changes to the API here. Two main things:

  1. Add a method to ParquetRecordBatchStream
pub fn next_selection(
        &mut self,
        selection: &mut VecDeque<RowSelection>,
    ) -> Option<ArrowResult<RecordBatch>>

which mimics poll_next but applies a selection to the next batch.

  2. Add a method to ParquetRecordBatchReader
pub fn next_selection(
        &mut self,
        selection: &mut VecDeque<RowSelection>,
    ) -> Option<ArrowResult<RecordBatch>>

which mimics next but applies a selection and, importantly, emits a single batch containing all rows selected by selection. The current implementation just emits the next RowSelection that selects and then stops.

See https://github.com/coralogix/arrow-datafusion/blob/row-filtering/datafusion/core/src/physical_plan/file_format/parquet.rs for the (very hacky) prototype in DataFusion built on https://github.com/coralogix/arrow-rs/tree/row-filtering.
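
To make the intended semantics concrete, a hypothetical call site for the reader-side method might look like the following, given some ParquetRecordBatchReader named reader (the selection values are illustrative, and RowSelection::select/RowSelection::skip are assumed constructors for this sketch):

use std::collections::VecDeque;

// Illustrative only: read rows 0..100 and 200..300 of the row group.
let mut selection: VecDeque<RowSelection> = VecDeque::from(vec![
    RowSelection::select(100), // read 100 rows
    RowSelection::skip(100),   // skip the next 100
    RowSelection::select(100), // read 100 more
]);

// Proposed behaviour: a single call returns one 200-row batch covering every
// selected run, instead of stopping after the first selecting run as the
// current internal implementation does.
let batch: Option<ArrowResult<RecordBatch>> = reader.next_selection(&mut selection);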

Describe alternatives you've considered

Additional context

@thinkharderdev thinkharderdev added the enhancement Any new improvement worthy of a entry in the changelog label Aug 1, 2022
@tustvold
Contributor

tustvold commented Aug 1, 2022

Could you perhaps expand upon what you mean by streaming filtering? My understanding is that the two major use-cases for the row-skipping functionality are:

  • Selecting specific rows identified from the page index
  • Selecting specific rows identified by a previous scan (possibly on a different set of columns)

As such I'm not entirely sure why you would need to "refine" your selection whilst reading, or even really what this would mean?

FWIW I created #2201 to give a basic example of how filter pushdown might work beyond what is supported by the page index.

The current implementation just emits the next RowSelection that selects and then stops.

@Ted-Jiang is working on fixing this in #2237

@thinkharderdev
Contributor Author

So basically the implementation in DataFusion is:

  1. Take the filter predicate and all the projections it needs to evaluate, and wrap a ParquetRecordBatchStream in a stream that will poll, apply the filter, and emit the filtered batch along with the corresponding row selections.
  2. Take all the other columns and create a ParquetRecordBatchStream for them.
  3. Wrap those two streams in another stream which polls the filter stream, then polls the selection stream with the row selection.
  4. Graft those batches onto each other to produce the output batch (a rough sketch follows below).

Using your example, you would need to decode and filter the entire row group for your filter columns, then apply the predicate to generate the row selection for the remaining columns. So you'd have to buffer the entire row group in memory for those columns. That's not too terrible, but it seems like it would add overhead and make the decision of whether to apply a row-level filter higher stakes. I thought that ideally we would be able to do the filtering one batch at a time. That should add very little overhead (aside from computing the filter expression, which has to be done at some point anyway), so we could always apply a row filter when one is available.
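
A rough sketch of the wrapper stream described in steps 1-4, using the proposed next_selection (evaluate_predicate and merge_columns are placeholders for the predicate evaluation in step 1 and the grafting in step 4):

use std::collections::VecDeque;
use futures::StreamExt;

// Sketch of steps 1-4: poll the stream of predicate columns, compute the row
// selection, apply it to the stream of remaining columns, then graft the two
// batches together.
async fn filtered_batches<R: AsyncFileReader + Unpin + Send + 'static>(
    mut filter_stream: ParquetRecordBatchStream<R>,
    mut select_stream: ParquetRecordBatchStream<R>,
) -> ArrowResult<Vec<RecordBatch>> {
    let mut out = Vec::new();
    while let Some(batch) = filter_stream.next().await {
        // Placeholder: evaluate the predicate, returning the filtered
        // predicate columns plus the selection of surviving rows.
        let (filtered, mut selection): (RecordBatch, VecDeque<RowSelection>) =
            evaluate_predicate(batch?)?;
        // Proposed API: decode only the selected rows of the other columns.
        if let Some(rest) = select_stream.next_selection(&mut selection) {
            // Placeholder: graft the filtered predicate columns onto the
            // selected remaining columns to form the output batch.
            out.push(merge_columns(&filtered, &rest?)?);
        }
    }
    Ok(out)
}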

@tustvold
Contributor

tustvold commented Aug 1, 2022

I need to think more on this, but some immediate thoughts that may or may not make sense:

  • This system will only be able to push down to eliminate decode overheads, i.e. it will be unable to eliminate the IO needed to fetch data (which is fine; we have the page index for that)
  • I wonder if it would be simpler to push the predicate into the ParquetRecordBatchReader; that way you don't need to futz around with async, and it would also potentially allow predicate evaluation on encoded data eventually (see the sketch after this list)
  • We probably need to work on a way to represent predicates within parquet directly, so that all the various pruning and skipping logic can be centralised and not spread across two repos
  • The nature of parquet is such that skipping runs of rows shorter than the normal batch_size may in fact be slower than just reading them normally. This means that if we don't determine the ranges up front, we'll need some way to bail out if it gets too expensive
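
To illustrate the second bullet: the pushed-down predicate could be supplied when the reader is built, as a callback over decoded batches of the predicate columns. This is purely a sketch; with_row_predicate, RowPredicate and reader_builder are hypothetical names, not existing APIs:

// Sketch only: a predicate handed to the reader at construction time, so the
// reader can alternate decode/evaluate/skip internally with no async involved.
type RowPredicate =
    Box<dyn FnMut(&RecordBatch) -> ArrowResult<BooleanArray> + Send>;

let reader = reader_builder
    .with_batch_size(1024)
    .with_row_predicate(predicate_columns, predicate) // hypothetical
    .build()?;

Keeping the reader in control of decoding like this is also what would eventually allow evaluating predicates against encoded (e.g. dictionary) data.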

@thinkharderdev
Contributor Author

This system will only be able to push down to eliminate decode overheads, i.e. it will be unable to eliminate the IO needed to fetch data (which is fine; we have the page index for that)

Agreed, the ideal scenario would be to use the page index to avoid the IO altogether. I think the tradeoff there will be around when we fetch the data. Using buffered prefetch has been a big performance improvement for us, and avoiding the eager IO in order to possibly fetch less data could be a net loss, depending on how much you can reduce the data fetched.

I wonder if it would be simpler to push the predicate into the ParquetRecordBatchReader; that way you don't need to futz around with async, and it would also potentially allow predicate evaluation on encoded data eventually

Yeah, it would be nice to be able to just pass a FnOnce(RecordBatch) -> Result<(RecordBatch, VecDeque<RowSelection>)> into ParquetRecordBatchStream and let it handle all the details.
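
For illustration, such a callback might wrap one of the arrow filter kernels (a sketch; predicate_expr and mask_to_selection are placeholders for the engine's expression evaluation and for converting a boolean mask into select/skip runs):

use std::collections::VecDeque;
use arrow::array::BooleanArray;
use arrow::compute::filter_record_batch;

// Sketch of the suggested callback shape: given a decoded batch of the
// predicate columns, return the filtered batch plus the selection to apply
// to the remaining columns.
let callback = move |batch: RecordBatch| -> ArrowResult<(RecordBatch, VecDeque<RowSelection>)> {
    let mask: BooleanArray = predicate_expr(&batch)?; // placeholder
    let selection = mask_to_selection(&mask);         // placeholder
    Ok((filter_record_batch(&batch, &mask)?, selection))
};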

We probably need to work on a way to represent predicates within parquet directly, so that all the various pruning and skipping logic can be centralised and not spread across two repos

Agreed, it would be great to just be able to pass a predicate in an expression language defined in arrow-rs. That said, it would be good to expose an arbitrary computation as well, so you can use things like scalar functions for filtering.

The nature of parquet is such that skipping runs of rows shorter than the normal batch_size may in fact be slower than just reading them normally. This means that if we don't determine the ranges up front, we'll need some way to bail out if it gets too expensive

Interesting, when is this the case? Would there be situations where the decoder couldn't optimize based on the number of values to skip?

@tustvold
Contributor

tustvold commented Aug 1, 2022

Interesting, when is this the case? Would there be situations where the decoder couldn't optimize based on the number of values to skip?

It needs to be benchmarked, but with the exception of PLAIN-encoded primitives, skipping over values still requires reading block headers, interpreting them, etc., and so isn't without cost. If you're then reading very small slices, the dispatch overheads will start to add up.
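
One way to bound that cost would be to normalise the selection up front, converting skips shorter than some threshold into reads (the extra rows then need to be filtered out of the resulting batch), so the decoder only ever sees runs worth dispatching on. A self-contained sketch, with an illustrative stand-in for RowSelection:

use std::collections::VecDeque;

// Illustrative stand-in for this sketch: a run of rows to read or skip.
#[derive(Clone, Copy)]
struct RowSelection {
    row_count: usize,
    skip: bool,
}

/// Merge any skip shorter than `min_skip` into the surrounding reads, so the
/// decoder never pays per-run dispatch overhead for tiny skips.
fn coalesce(selection: VecDeque<RowSelection>, min_skip: usize) -> VecDeque<RowSelection> {
    let mut out: VecDeque<RowSelection> = VecDeque::new();
    for sel in selection {
        // Demote short skips to reads.
        let sel = if sel.skip && sel.row_count < min_skip {
            RowSelection { skip: false, ..sel }
        } else {
            sel
        };
        // Merge adjacent runs of the same kind.
        match out.back_mut() {
            Some(last) if last.skip == sel.skip => last.row_count += sel.row_count,
            _ => out.push_back(sel),
        }
    }
    out
}

Whether reading-plus-filtering actually beats skipping for a given threshold is exactly the kind of thing that would need benchmarking per encoding.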

@alamb alamb added the parquet Changes to the parquet crate label Aug 17, 2022
@alamb alamb changed the title Changes to ParquetRecordBatchStream to support row filtering in DataFusion Aug 17, 2022