Skip to content

Commit

Permalink
Add serde support for Arrow FileTypeWriterOptions (#8850)
Browse files Browse the repository at this point in the history
* refactor

* generated files

* feat

* feat

* feat

* feat

* tests

* clippy

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
tushushu and alamb authored Jan 18, 2024
1 parent d14f766 commit 9b78efa
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 1 deletion.
12 changes: 12 additions & 0 deletions datafusion/common/src/file_options/arrow_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@ use super::StatementOptions;
#[derive(Clone, Debug)]
pub struct ArrowWriterOptions {}

impl ArrowWriterOptions {
pub fn new() -> Self {
Self {}
}
}

impl Default for ArrowWriterOptions {
fn default() -> Self {
Self::new()
}
}

impl TryFrom<(&ConfigOptions, &StatementOptions)> for ArrowWriterOptions {
type Error = DataFusionError;

Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1213,6 +1213,7 @@ message FileTypeWriterOptions {
JsonWriterOptions json_options = 1;
ParquetWriterOptions parquet_options = 2;
CsvWriterOptions csv_options = 3;
ArrowWriterOptions arrow_options = 4;
}
}

Expand Down Expand Up @@ -1243,6 +1244,8 @@ message CsvWriterOptions {
string null_value = 8;
}

message ArrowWriterOptions {}

message WriterProperties {
uint64 data_page_size_limit = 1;
uint64 dictionary_page_size_limit = 2;
Expand Down
85 changes: 85 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions datafusion/proto/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use arrow::csv::WriterBuilder;
use datafusion_common::file_options::arrow_writer::ArrowWriterOptions;
use std::collections::HashMap;
use std::fmt::Debug;
use std::str::FromStr;
Expand Down Expand Up @@ -858,6 +859,13 @@ impl AsLogicalPlan for LogicalPlanNode {
Some(copy_to_node::CopyOptions::WriterOptions(opt)) => {
match &opt.file_type {
Some(ft) => match ft {
file_type_writer_options::FileType::ArrowOptions(_) => {
CopyOptions::WriterOptions(Box::new(
FileTypeWriterOptions::Arrow(
ArrowWriterOptions::new(),
),
))
}
file_type_writer_options::FileType::CsvOptions(
writer_options,
) => {
Expand Down Expand Up @@ -1659,6 +1667,17 @@ impl AsLogicalPlan for LogicalPlanNode {
}
CopyOptions::WriterOptions(opt) => {
match opt.as_ref() {
FileTypeWriterOptions::Arrow(_) => {
let arrow_writer_options =
file_type_writer_options::FileType::ArrowOptions(
protobuf::ArrowWriterOptions {},
);
Some(copy_to_node::CopyOptions::WriterOptions(
protobuf::FileTypeWriterOptions {
file_type: Some(arrow_writer_options),
},
))
}
FileTypeWriterOptions::CSV(csv_opts) => {
let csv_options = &csv_opts.writer_options;
let csv_writer_options = csv_writer_options_to_proto(
Expand Down
5 changes: 5 additions & 0 deletions datafusion/proto/src/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use datafusion::physical_plan::windows::create_window_expr;
use datafusion::physical_plan::{
functions, ColumnStatistics, Partitioning, PhysicalExpr, Statistics, WindowExpr,
};
use datafusion_common::file_options::arrow_writer::ArrowWriterOptions;
use datafusion_common::file_options::csv_writer::CsvWriterOptions;
use datafusion_common::file_options::json_writer::JsonWriterOptions;
use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
Expand Down Expand Up @@ -834,6 +835,10 @@ impl TryFrom<&protobuf::FileTypeWriterOptions> for FileTypeWriterOptions {
.ok_or_else(|| proto_error("Missing required file_type field in protobuf"))?;

match file_type {
protobuf::file_type_writer_options::FileType::ArrowOptions(_) => {
Ok(Self::Arrow(ArrowWriterOptions::new()))
}

protobuf::file_type_writer_options::FileType::JsonOptions(opts) => {
let compression: CompressionTypeVariant = opts.compression().into();
Ok(Self::JSON(JsonWriterOptions::new(compression)))
Expand Down
40 changes: 40 additions & 0 deletions datafusion/proto/tests/cases/roundtrip_logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use arrow::datatypes::{
IntervalUnit, Schema, SchemaRef, TimeUnit, UnionFields, UnionMode,
};

use datafusion_common::file_options::arrow_writer::ArrowWriterOptions;
use prost::Message;

use datafusion::datasource::provider::TableProviderFactory;
Expand Down Expand Up @@ -394,6 +395,45 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn roundtrip_logical_plan_copy_to_arrow() -> Result<()> {
let ctx = SessionContext::new();

let input = create_csv_scan(&ctx).await?;

let plan = LogicalPlan::Copy(CopyTo {
input: Arc::new(input),
output_url: "test.arrow".to_string(),
file_format: FileType::ARROW,
single_file_output: true,
copy_options: CopyOptions::WriterOptions(Box::new(FileTypeWriterOptions::Arrow(
ArrowWriterOptions::new(),
))),
});

let bytes = logical_plan_to_bytes(&plan)?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}"));

match logical_round_trip {
LogicalPlan::Copy(copy_to) => {
assert_eq!("test.arrow", copy_to.output_url);
assert_eq!(FileType::ARROW, copy_to.file_format);
assert!(copy_to.single_file_output);
match &copy_to.copy_options {
CopyOptions::WriterOptions(y) => match y.as_ref() {
FileTypeWriterOptions::Arrow(_) => {}
_ => panic!(),
},
_ => panic!(),
}
}
_ => panic!(),
}

Ok(())
}

#[tokio::test]
async fn roundtrip_logical_plan_copy_to_csv() -> Result<()> {
let ctx = SessionContext::new();
Expand Down

0 comments on commit 9b78efa

Please sign in to comment.