Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add serde support for Arrow FileTypeWriterOptions #8850

Merged
merged 9 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions datafusion/common/src/file_options/arrow_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@ use super::StatementOptions;
#[derive(Clone, Debug)]
pub struct ArrowWriterOptions {}

impl ArrowWriterOptions {
pub fn new() -> Self {
Self {}
}
}

impl Default for ArrowWriterOptions {
fn default() -> Self {
Self::new()
}
}

impl TryFrom<(&ConfigOptions, &StatementOptions)> for ArrowWriterOptions {
type Error = DataFusionError;

Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1213,6 +1213,7 @@ message FileTypeWriterOptions {
JsonWriterOptions json_options = 1;
ParquetWriterOptions parquet_options = 2;
CsvWriterOptions csv_options = 3;
ArrowWriterOptions arrow_options = 4;
}
}

Expand Down Expand Up @@ -1243,6 +1244,8 @@ message CsvWriterOptions {
string null_value = 8;
}

message ArrowWriterOptions {}

message WriterProperties {
uint64 data_page_size_limit = 1;
uint64 dictionary_page_size_limit = 2;
Expand Down
85 changes: 85 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions datafusion/proto/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use arrow::csv::WriterBuilder;
use datafusion_common::file_options::arrow_writer::ArrowWriterOptions;
use std::collections::HashMap;
use std::fmt::Debug;
use std::str::FromStr;
Expand Down Expand Up @@ -858,6 +859,13 @@ impl AsLogicalPlan for LogicalPlanNode {
Some(copy_to_node::CopyOptions::WriterOptions(opt)) => {
match &opt.file_type {
Some(ft) => match ft {
file_type_writer_options::FileType::ArrowOptions(_) => {
CopyOptions::WriterOptions(Box::new(
FileTypeWriterOptions::Arrow(
ArrowWriterOptions::new(),
),
))
}
file_type_writer_options::FileType::CsvOptions(
writer_options,
) => {
Expand Down Expand Up @@ -1659,6 +1667,17 @@ impl AsLogicalPlan for LogicalPlanNode {
}
CopyOptions::WriterOptions(opt) => {
match opt.as_ref() {
FileTypeWriterOptions::Arrow(_) => {
let arrow_writer_options =
file_type_writer_options::FileType::ArrowOptions(
protobuf::ArrowWriterOptions {},
);
Some(copy_to_node::CopyOptions::WriterOptions(
protobuf::FileTypeWriterOptions {
file_type: Some(arrow_writer_options),
},
))
}
FileTypeWriterOptions::CSV(csv_opts) => {
let csv_options = &csv_opts.writer_options;
let csv_writer_options = csv_writer_options_to_proto(
Expand Down
5 changes: 5 additions & 0 deletions datafusion/proto/src/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use datafusion::physical_plan::windows::create_window_expr;
use datafusion::physical_plan::{
functions, ColumnStatistics, Partitioning, PhysicalExpr, Statistics, WindowExpr,
};
use datafusion_common::file_options::arrow_writer::ArrowWriterOptions;
use datafusion_common::file_options::csv_writer::CsvWriterOptions;
use datafusion_common::file_options::json_writer::JsonWriterOptions;
use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
Expand Down Expand Up @@ -834,6 +835,10 @@ impl TryFrom<&protobuf::FileTypeWriterOptions> for FileTypeWriterOptions {
.ok_or_else(|| proto_error("Missing required file_type field in protobuf"))?;

match file_type {
protobuf::file_type_writer_options::FileType::ArrowOptions(_) => {
Ok(Self::Arrow(ArrowWriterOptions::new()))
}

protobuf::file_type_writer_options::FileType::JsonOptions(opts) => {
let compression: CompressionTypeVariant = opts.compression().into();
Ok(Self::JSON(JsonWriterOptions::new(compression)))
Expand Down
40 changes: 40 additions & 0 deletions datafusion/proto/tests/cases/roundtrip_logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use arrow::datatypes::{
IntervalUnit, Schema, SchemaRef, TimeUnit, UnionFields, UnionMode,
};

use datafusion_common::file_options::arrow_writer::ArrowWriterOptions;
use prost::Message;

use datafusion::datasource::provider::TableProviderFactory;
Expand Down Expand Up @@ -394,6 +395,45 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn roundtrip_logical_plan_copy_to_arrow() -> Result<()> {
let ctx = SessionContext::new();

let input = create_csv_scan(&ctx).await?;

let plan = LogicalPlan::Copy(CopyTo {
input: Arc::new(input),
output_url: "test.arrow".to_string(),
file_format: FileType::ARROW,
single_file_output: true,
copy_options: CopyOptions::WriterOptions(Box::new(FileTypeWriterOptions::Arrow(
ArrowWriterOptions::new(),
))),
});

let bytes = logical_plan_to_bytes(&plan)?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}"));

match logical_round_trip {
LogicalPlan::Copy(copy_to) => {
assert_eq!("test.arrow", copy_to.output_url);
assert_eq!(FileType::ARROW, copy_to.file_format);
assert!(copy_to.single_file_output);
match &copy_to.copy_options {
CopyOptions::WriterOptions(y) => match y.as_ref() {
FileTypeWriterOptions::Arrow(_) => {}
_ => panic!(),
},
_ => panic!(),
}
}
_ => panic!(),
}

Ok(())
}

#[tokio::test]
async fn roundtrip_logical_plan_copy_to_csv() -> Result<()> {
let ctx = SessionContext::new();
Expand Down