Support Writing Arrow files #8608
Conversation
```
1 Foo
2 Bar

# Copy from dict encoded values to single arrow file
```
I know @tustvold's main concern was dictionaries. I think this test shows we are OK, but let me know if I am overlooking something.
```toml
datafusion-proto = { path = "datafusion/proto", version = "34.0.0" }
datafusion-sql = { path = "datafusion/sql", version = "34.0.0" }
```
Hmm, are these changes related? It looks like lines were just moved around. Maybe you can revert the unrelated changes to keep the diff smaller?
I added arrow-ipc as a dependency for core, and ran cargo tomlfmt. I'm not sure why cargo tomlfmt changed so much of the formatting 🤔
Thank you for this PR @devinjdangelo -- I tried it out locally and it works great.
I left a few suggestions which would be nice to address, but I don't think they are required to merge. We can do them in a follow-on PR as well.
```shell
❯ copy (values (1), (2)) to '/tmp/foo.arrow';
+-------+
| count |
+-------+
| 2     |
+-------+
1 row in set. Query took 0.030 seconds.

$ datafusion-cli -c "select * from '/tmp/foo.arrow'";
DataFusion CLI v34.0.0
+---------+
| column1 |
+---------+
| 1       |
| 2       |
+---------+
2 rows in set. Query took 0.028 seconds.
```
```shell
$ datafusion-cli -c "select arrow_typeof(column1) from '/tmp/foo.arrow'";
DataFusion CLI v34.0.0
+--------------------------------------+
| arrow_typeof(/tmp/foo.arrow.column1) |
+--------------------------------------+
| Int64                                |
| Int64                                |
+--------------------------------------+
```
👍
```rust
) -> Result<u64> {
    // No props are supported yet, but can be by updating FileTypeWriterOptions
    // to populate this struct and use those options to initialize the arrow_ipc::writer::FileWriter
    let _arrow_props = self.config.file_type_writer_options.try_into_arrow()?;
```
I think we should track this as a follow on ticket and ideally leave a comment in the code pointing to the ticket so it eventually gets cleaned up
Agreed. When we update FileTypeWriterProperties for arrow files, we should also take care to maintain serialization support which @andygrove has been working on.
Filed #8635 and added comment linking to it
Also filed apache/arrow-rs#5236 which would help with #8635 (though is not blocking)
```rust
let mut file_write_tasks: JoinSet<std::result::Result<usize, DataFusionError>> =
    JoinSet::new();
while let Some((path, mut rx)) = file_stream_rx.recv().await {
    let shared_buffer = SharedBuffer::new(1048576);
```
Does this mean that if any record batch takes more than 1MB to write out we'll get an error?
Would it be possible to make this constant and the 1024000 below into named constants with comments that explain what they do?
The initial buffer size is just a size hint for efficiency. It will grow beyond the set value if needed.
We can definitely make it a named constant, or even make it configurable.
This buffer holds serialized bytes in memory until they are periodically uploaded to an object store. This is similar to how the parquet AsyncArrowWriter is implemented.
Filed #8642 to name the constants
Co-authored-by: Andrew Lamb <[email protected]>
```rust
    &self.get_writer_schema(),
)?;
let mut object_store_writer = create_writer(
    FileCompressionType::UNCOMPRESSED,
```
We probably want to default this to a compressed version (and make it configurable later)? The default in pyarrow is lz4.
I took a quick look at the arrow_ipc::FileWriter code, and it appears the writer manages compression internally, batch by batch. The referenced line controls whole-file compression (as used for CSV and JSON).
Since this PR does not set a compression explicitly in DataFusion, we inherit the arrow-rs default. I think the arrow-rs default is also lz4, but I am not 100% sure from glancing over the code.
Looking more closely, I was wrong. arrow_ipc defaults to uncompressed arrow files. I pushed up a change to default to lz4.
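The change described above (uncompressed by default, now lz4 by default) can be modeled with a toy enum; this is purely illustrative, not arrow-rs or DataFusion code:

```rust
#[derive(Debug, PartialEq)]
enum IpcCompression {
    Uncompressed,
    Lz4Frame,
}

impl Default for IpcCompression {
    // New default after this PR's change; previously Uncompressed.
    fn default() -> Self {
        IpcCompression::Lz4Frame
    }
}

fn main() {
    assert_eq!(IpcCompression::default(), IpcCompression::Lz4Frame);
    println!("{:?}", IpcCompression::default());
}
```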
Which issue does this PR close?
Closes #8504
Rationale for this change
What changes are included in this PR?
Implements initial support for writing out arrow files via COPY TO and INSERT INTO for listing tables.
Are these changes tested?
Adds new sqllogictests to cover the new functionality.
Are there any user-facing changes?
Writing arrow files is now possible.