
Multiple files per partition for CSV, Avro, JSON #1138

Merged: 3 commits, Oct 20, 2021

Conversation

@rdettai (Contributor) commented Oct 18, 2021:

Which issue does this PR close?

Closes #1122.

Rationale for this change

Enable multiple files per partition in CsvExec, NdJsonExec and AvroExec.
Factorize the [Csv,Json,Avro]Stream code.

What changes are included in this PR?

  • CsvExec, NdJsonExec and AvroExec now take a grouped list of files as input
  • A generic FileStream abstraction was created that serves as the internal Stream implementation for CsvExec, NdJsonExec and AvroExec (a simplified sketch follows below)

Are there any user-facing changes?

No, apart from users who instantiate CsvExec, NdJsonExec or AvroExec directly (unlikely).

@github-actions bot added the ballista and datafusion (Changes in the datafusion crate) labels on Oct 18, 2021
Comment on lines +138 to +149
if *remain >= item.num_rows() {
*remain -= item.num_rows();
Some(Ok(item))
} else {
let len = *remain;
*remain = 0;
Some(Ok(RecordBatch::try_new(
item.schema(),
item.columns()
.iter()
.map(|column| column.slice(0, len))
.collect(),
@rdettai (PR author) commented:
This factorizes the current implementations for AvroStream and NdJsonStream as they are today, but I am wondering if it is really worth it to slice the batch to match the limit exactly. According to the TableProvider doc, limit indicates that "the datasource should return *at least* this number of rows if available". The slicing operation is "zero copy" anyway, so mostly free, but it is surprising to have this extra operation if it is not required by the TableProvider API.

A reviewer (Contributor) replied:
Even in plans where limit is pushed down to the TableProvider scan, there is still at least one LimitExec above it. Thus I think it is likely fine to avoid slicing up the record batches here (though, as you point out, it likely won't hurt either).

@alamb (Contributor) left a comment:
Looks really nice to me. Great job @rdettai 👍

In case it is not clear to anyone else (it took me a bit of reading), this PR makes the csv, avro, and json file readers use the same (nicely created) pattern as the parquet file reader.


datafusion/src/physical_plan/file_format/file_stream.rs (outdated review thread, resolved)
//! Note: Most traits here need to be marked `Sync + Send` to be
//! compliant with the `SendableRecordBatchStream` trait.

use crate::{
A reviewer (Contributor) commented:
nice tests

datafusion/src/physical_plan/file_format/json.rs (outdated review thread, resolved)
datafusion/src/physical_plan/file_format/json.rs (outdated review thread, resolved)
/// A stream that iterates record batch by record batch, file over file.
pub struct FileStream<F>
where
F: FnMut(Box<dyn Read + Send + Sync>, &Option<usize>) -> BatchIter
A reviewer (Contributor) commented:
Something about this feels overly complicated to me.

I wonder if it would be possible to combine the file_iter and file_reader together into an iterator that returns BatchIters? The only thing FileStream seems to do is take the output of the file_iterator and pass it to file_reader.

I may be missing something too.

@rdettai (PR author) commented Oct 20, 2021:

I agree that closures are a bit complex in Rust 😅... But I don't think our usage of one here is particularly convoluted.

The problem is that different readers take different configurations (in particular CSV, with settings such as the delimiter). Thus you need a way to carry this variable set of configurations into the context where you initialize the reader. You could achieve this with a trait. In fact, the closure here acts as an anonymous version of a trait that would look like:

pub trait ReaderOpener {
    fn open_reader(&mut self, file: Box<dyn Read + Send + Sync>, remaining: &Option<usize>) -> BatchIter;
}

Using the trait instead of a closure would require us to create an implementation of that trait for each format (AvroReaderOpener, JsonReaderOpener...). These structs would contain, as fields, all the configurations required to initialize the reader. Here, instead of making this explicit declaration, the closures capture the configurations they need. Under the hood, the closure does exactly the same thing as the explicit ReaderOpener implementations would, except that ReaderOpener is replaced by FnMut(Box<dyn Read + Send + Sync>, &Option<usize>) -> BatchIter and the implementation types are created by the compiler.

Now, if you prefer the trait version, I don't mind changing the implementation. But as always, what you gain in explicitness you lose in verbosity (a bit like declaring variables with/without types 😉).

@rdettai (PR author) commented:

Your comment made me think and I found some sort of intermediate solution:

  • We keep using a closure to avoid having to explicitly declare the trait implementation
  • We "alias" the FnMut(...) -> ... trait to explicitly define what it means in this context (FormatReaderOpener)

A reviewer (Contributor) replied:

I guess I was imagining that the configuration / file format differences would be handled by some closure for each format that would take a file reader and produce an appropriate BatchIter.

Or, put another way, it might be possible to move the code that loops over each Read (aka file) and returns a BatchIter into the different formats (Csv, Avro, etc).

However, this would make the formats somewhat more complicated, so it would be a tradeoff.

I think what you have here is good and I think giving the closure a name makes the code more readable 👍

@@ -64,8 +64,7 @@ impl FileFormat for AvroFormat {
) -> Result<Arc<dyn ExecutionPlan>> {
let exec = AvroExec::new(
conf.object_store,
// flattening this for now because CsvExec does not support partitioning yet
A reviewer (Contributor) commented:
❤️

datafusion/src/physical_plan/file_format/avro.rs (outdated review thread, resolved)
@alamb alamb merged commit 4b577f3 into apache:master Oct 20, 2021
@rdettai (PR author) commented Oct 20, 2021:

thanks for the review Andrew! ❤️

Labels: datafusion (Changes in the datafusion crate), enhancement (New feature or request)

Successfully merging this pull request may close these issues:

  • Multiple files per partition for CSV Json and Avro exec plans (#1122)

3 participants