From cb2695109e014c4ac031e9aa3c928f0574386a13 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 22 Dec 2023 09:02:31 -0700 Subject: [PATCH 01/11] Add serde support for ParquetWriterOptions --- datafusion/proto/proto/datafusion.proto | 21 ++ datafusion/proto/src/generated/pbjson.rs | 316 ++++++++++++++++++ datafusion/proto/src/generated/prost.rs | 34 +- .../proto/src/physical_plan/from_proto.rs | 19 ++ .../tests/cases/roundtrip_logical_plan.rs | 1 - 5 files changed, 389 insertions(+), 2 deletions(-) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 05f0b6434368..fdf2acc67a80 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1206,6 +1206,7 @@ message PartitionColumn { message FileTypeWriterOptions { oneof FileType { JsonWriterOptions json_options = 1; + ParquetWriterOptions parquet_options = 2; } } @@ -1213,6 +1214,26 @@ message JsonWriterOptions { CompressionTypeVariant compression = 1; } +message ParquetWriterOptions { + WriterProperties writer_properties = 1; +} + +message WriterProperties { + int32 data_page_size_limit = 1; + int32 dictionary_page_size_limit = 2; + int32 data_page_row_count_limit = 3; + int32 write_batch_size = 4; + int32 max_row_group_size = 5; + string writer_version = 6; + string created_by = 7; + // TODO add advanced writer options + // key_value_metadata: Option>, + // default_column_properties: ColumnProperties, + // column_properties: HashMap, + // sorting_columns: Option>, + // column_index_truncate_length: Option, +} + message FileSinkConfig { reserved 6; // writer_mode diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 0fdeab0a40f6..2d1ff8ca4b8b 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -7890,6 +7890,9 @@ impl serde::Serialize for FileTypeWriterOptions { file_type_writer_options::FileType::JsonOptions(v) => { struct_ser.serialize_field("jsonOptions", v)?; } + file_type_writer_options::FileType::ParquetOptions(v) => { + struct_ser.serialize_field("parquetOptions", v)?; + } } } struct_ser.end() @@ -7904,11 +7907,14 @@ impl<'de> serde::Deserialize<'de> for FileTypeWriterOptions { const FIELDS: &[&str] = &[ "json_options", "jsonOptions", + "parquet_options", + "parquetOptions", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { JsonOptions, + ParquetOptions, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -7931,6 +7937,7 @@ impl<'de> serde::Deserialize<'de> for FileTypeWriterOptions { { match value { "jsonOptions" | "json_options" => Ok(GeneratedField::JsonOptions), + "parquetOptions" | "parquet_options" => Ok(GeneratedField::ParquetOptions), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -7958,6 +7965,13 @@ impl<'de> serde::Deserialize<'de> for FileTypeWriterOptions { return Err(serde::de::Error::duplicate_field("jsonOptions")); } file_type__ = map_.next_value::<::std::option::Option<_>>()?.map(file_type_writer_options::FileType::JsonOptions) +; + } + GeneratedField::ParquetOptions => { + if file_type__.is_some() { + return Err(serde::de::Error::duplicate_field("parquetOptions")); + } + file_type__ = map_.next_value::<::std::option::Option<_>>()?.map(file_type_writer_options::FileType::ParquetOptions) ; } } @@ -15171,6 +15185,98 @@ impl<'de> serde::Deserialize<'de> for ParquetScanExecNode { deserializer.deserialize_struct("datafusion.ParquetScanExecNode", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for ParquetWriterOptions { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.writer_properties.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.ParquetWriterOptions", len)?; + if let Some(v) = self.writer_properties.as_ref() { + struct_ser.serialize_field("writerProperties", v)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for ParquetWriterOptions { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "writer_properties", + "writerProperties", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + WriterProperties, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "writerProperties" | "writer_properties" => Ok(GeneratedField::WriterProperties), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = ParquetWriterOptions; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.ParquetWriterOptions") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut writer_properties__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::WriterProperties => { + if writer_properties__.is_some() { + return Err(serde::de::Error::duplicate_field("writerProperties")); + } + writer_properties__ = map_.next_value()?; + } + } + } + Ok(ParquetWriterOptions { + writer_properties: writer_properties__, + }) + } + } + deserializer.deserialize_struct("datafusion.ParquetWriterOptions", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for PartialTableReference { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -27144,3 +27250,213 @@ impl<'de> serde::Deserialize<'de> for WindowNode { deserializer.deserialize_struct("datafusion.WindowNode", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for WriterProperties { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.data_page_size_limit != 0 { + len += 1; + } + if self.dictionary_page_size_limit != 0 { + len += 1; + } + if self.data_page_row_count_limit != 0 { + len += 1; + } + if self.write_batch_size != 0 { + len += 1; + } + if self.max_row_group_size != 0 { + len += 1; + } + if !self.writer_version.is_empty() { + len += 1; + } + if !self.created_by.is_empty() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.WriterProperties", len)?; + if self.data_page_size_limit != 0 { + struct_ser.serialize_field("dataPageSizeLimit", &self.data_page_size_limit)?; + } + if self.dictionary_page_size_limit != 0 { + struct_ser.serialize_field("dictionaryPageSizeLimit", &self.dictionary_page_size_limit)?; + } + if self.data_page_row_count_limit != 0 { + struct_ser.serialize_field("dataPageRowCountLimit", &self.data_page_row_count_limit)?; + } + if self.write_batch_size != 0 { + struct_ser.serialize_field("writeBatchSize", &self.write_batch_size)?; + } + if self.max_row_group_size != 0 { + struct_ser.serialize_field("maxRowGroupSize", &self.max_row_group_size)?; + } + if !self.writer_version.is_empty() { + struct_ser.serialize_field("writerVersion", &self.writer_version)?; + } + if !self.created_by.is_empty() { + struct_ser.serialize_field("createdBy", &self.created_by)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for WriterProperties { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "data_page_size_limit", + "dataPageSizeLimit", + "dictionary_page_size_limit", + "dictionaryPageSizeLimit", + "data_page_row_count_limit", + "dataPageRowCountLimit", + "write_batch_size", + "writeBatchSize", + "max_row_group_size", + "maxRowGroupSize", + "writer_version", + "writerVersion", + "created_by", + "createdBy", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + DataPageSizeLimit, + DictionaryPageSizeLimit, + DataPageRowCountLimit, + WriteBatchSize, + MaxRowGroupSize, + WriterVersion, + CreatedBy, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "dataPageSizeLimit" | "data_page_size_limit" => Ok(GeneratedField::DataPageSizeLimit), + "dictionaryPageSizeLimit" | "dictionary_page_size_limit" => Ok(GeneratedField::DictionaryPageSizeLimit), + "dataPageRowCountLimit" | "data_page_row_count_limit" => Ok(GeneratedField::DataPageRowCountLimit), + "writeBatchSize" | "write_batch_size" => Ok(GeneratedField::WriteBatchSize), + "maxRowGroupSize" | "max_row_group_size" => Ok(GeneratedField::MaxRowGroupSize), + "writerVersion" | "writer_version" => Ok(GeneratedField::WriterVersion), + "createdBy" | "created_by" => Ok(GeneratedField::CreatedBy), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = WriterProperties; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.WriterProperties") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut data_page_size_limit__ = None; + let mut dictionary_page_size_limit__ = None; + let mut data_page_row_count_limit__ = None; + let mut write_batch_size__ = None; + let mut max_row_group_size__ = None; + let mut writer_version__ = None; + let mut created_by__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::DataPageSizeLimit => { + if data_page_size_limit__.is_some() { + return Err(serde::de::Error::duplicate_field("dataPageSizeLimit")); + } + data_page_size_limit__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::DictionaryPageSizeLimit => { + if dictionary_page_size_limit__.is_some() { + return Err(serde::de::Error::duplicate_field("dictionaryPageSizeLimit")); + } + dictionary_page_size_limit__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::DataPageRowCountLimit => { + if data_page_row_count_limit__.is_some() { + return Err(serde::de::Error::duplicate_field("dataPageRowCountLimit")); + } + data_page_row_count_limit__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::WriteBatchSize => { + if write_batch_size__.is_some() { + return Err(serde::de::Error::duplicate_field("writeBatchSize")); + } + write_batch_size__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::MaxRowGroupSize => { + if max_row_group_size__.is_some() { + return Err(serde::de::Error::duplicate_field("maxRowGroupSize")); + } + max_row_group_size__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::WriterVersion => { + if writer_version__.is_some() { + return Err(serde::de::Error::duplicate_field("writerVersion")); + } + writer_version__ = Some(map_.next_value()?); + } + GeneratedField::CreatedBy => { + if created_by__.is_some() { + return Err(serde::de::Error::duplicate_field("createdBy")); + } + created_by__ = Some(map_.next_value()?); + } + } + } + Ok(WriterProperties { + data_page_size_limit: data_page_size_limit__.unwrap_or_default(), + dictionary_page_size_limit: dictionary_page_size_limit__.unwrap_or_default(), + data_page_row_count_limit: data_page_row_count_limit__.unwrap_or_default(), + write_batch_size: write_batch_size__.unwrap_or_default(), + max_row_group_size: max_row_group_size__.unwrap_or_default(), + writer_version: writer_version__.unwrap_or_default(), + created_by: created_by__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.WriterProperties", FIELDS, GeneratedVisitor) + } +} diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index e44355859d65..4a5bbeb11d65 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1642,7 +1642,7 @@ pub struct PartitionColumn { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FileTypeWriterOptions { - #[prost(oneof = "file_type_writer_options::FileType", tags = "1")] + #[prost(oneof = "file_type_writer_options::FileType", tags = "1, 2")] pub file_type: ::core::option::Option, } /// Nested message and enum types in `FileTypeWriterOptions`. @@ -1652,6 +1652,8 @@ pub mod file_type_writer_options { pub enum FileType { #[prost(message, tag = "1")] JsonOptions(super::JsonWriterOptions), + #[prost(message, tag = "2")] + ParquetOptions(super::ParquetWriterOptions), } } #[allow(clippy::derive_partial_eq_without_eq)] @@ -1662,6 +1664,36 @@ pub struct JsonWriterOptions { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct ParquetWriterOptions { + #[prost(message, optional, tag = "1")] + pub writer_properties: ::core::option::Option, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct WriterProperties { + #[prost(int32, tag = "1")] + pub data_page_size_limit: i32, + #[prost(int32, tag = "2")] + pub dictionary_page_size_limit: i32, + #[prost(int32, tag = "3")] + pub data_page_row_count_limit: i32, + #[prost(int32, tag = "4")] + pub write_batch_size: i32, + #[prost(int32, tag = "5")] + pub max_row_group_size: i32, + #[prost(string, tag = "6")] + pub writer_version: ::prost::alloc::string::String, + /// TODO add support for all writer properties + /// key_value_metadata: Option>, + /// default_column_properties: ColumnProperties, + /// column_properties: HashMap, + /// sorting_columns: Option>, + /// int32 column_index_truncate_length: Option, + #[prost(string, tag = "7")] + pub created_by: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct FileSinkConfig { #[prost(string, tag = "1")] pub object_store_url: ::prost::alloc::string::String, diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 65f9f139a87b..b15a7a01e013 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -40,6 +40,7 @@ use datafusion::physical_plan::{ functions, ColumnStatistics, Partitioning, PhysicalExpr, Statistics, WindowExpr, }; use datafusion_common::file_options::json_writer::JsonWriterOptions; +use datafusion_common::file_options::parquet_writer::ParquetWriterOptions; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::stats::Precision; use datafusion_common::{ @@ -53,6 +54,7 @@ use crate::protobuf; use crate::protobuf::physical_expr_node::ExprType; use chrono::{TimeZone, Utc}; +use datafusion::parquet::file::properties::WriterProperties; use object_store::path::Path; use object_store::ObjectMeta; @@ -769,6 +771,23 @@ impl TryFrom<&protobuf::FileTypeWriterOptions> for FileTypeWriterOptions { protobuf::file_type_writer_options::FileType::JsonOptions(opts) => Ok( Self::JSON(JsonWriterOptions::new(opts.compression().into())), ), + protobuf::file_type_writer_options::FileType::ParquetOptions(opt) => { + let props = opt.writer_properties.clone().unwrap_or_default(); + let writer_properties = WriterProperties::builder() + //.set_writer_version(props.writer_version) + .set_created_by(props.created_by) + .set_dictionary_page_size_limit( + props.dictionary_page_size_limit as usize, + ) + .set_data_page_row_count_limit( + props.data_page_row_count_limit as usize, + ) + .set_data_page_size_limit(props.data_page_size_limit as usize) + .set_write_batch_size(props.write_batch_size as usize) + .set_max_row_group_size(props.max_row_group_size as usize) + .build(); + Ok(Self::Parquet(ParquetWriterOptions::new(writer_properties))) + } } } } diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index 9798b06f4724..a2996db9e891 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -330,7 +330,6 @@ async fn roundtrip_logical_plan_copy_to_sql_options() -> Result<()> { } #[tokio::test] -#[ignore] // see https://github.com/apache/arrow-datafusion/issues/8619 async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> { let ctx = SessionContext::new(); From 4b387a1cd755b54e888f0f697a3ef2b13d1f2022 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 22 Dec 2023 10:12:05 -0700 Subject: [PATCH 02/11] save progress --- datafusion/proto/src/logical_plan/mod.rs | 95 +++++++++++++------ .../tests/cases/roundtrip_logical_plan.rs | 13 +++ 2 files changed, 79 insertions(+), 29 deletions(-) diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index e03b3ffa7b84..ffcf564ac148 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -23,7 +23,8 @@ use std::sync::Arc; use crate::common::{byte_to_string, proto_error, str_to_byte}; use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan; use crate::protobuf::{ - copy_to_node, CustomTableScanNode, LogicalExprNodeCollection, SqlOption, + copy_to_node, file_type_writer_options, CustomTableScanNode, + LogicalExprNodeCollection, SqlOption, }; use crate::{ convert_required, @@ -49,7 +50,7 @@ use datafusion::{ use datafusion_common::{ context, file_options::StatementOptions, internal_err, not_impl_err, parsers::CompressionTypeVariant, plan_datafusion_err, DataFusionError, FileType, - OwnedTableReference, Result, + FileTypeWriterOptions, OwnedTableReference, Result, }; use datafusion_expr::{ dml, @@ -62,6 +63,8 @@ use datafusion_expr::{ DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder, }; +use datafusion::parquet::file::properties::WriterProperties; +use datafusion_common::file_options::parquet_writer::ParquetWriterOptions; use datafusion_expr::dml::CopyOptions; use prost::bytes::BufMut; use prost::Message; @@ -833,19 +836,47 @@ impl AsLogicalPlan for LogicalPlanNode { let copy_options = match ©.copy_options { Some(copy_to_node::CopyOptions::SqlOptions(opt)) => { - let options = opt.option.iter().map(|o| (o.key.clone(), o.value.clone())).collect(); - CopyOptions::SQLOptions(StatementOptions::from( - &options, - )) + let options = opt + .option + .iter() + .map(|o| (o.key.clone(), o.value.clone())) + .collect(); + CopyOptions::SQLOptions(StatementOptions::from(&options)) } - Some(copy_to_node::CopyOptions::WriterOptions(_)) => { - return Err(proto_error( - "LogicalPlan serde is not yet implemented for CopyTo with WriterOptions", - )) + Some(copy_to_node::CopyOptions::WriterOptions(opt)) => { + match &opt.file_type { + Some(x) => match x { + file_type_writer_options::FileType::ParquetOptions( + writer_options, + ) => { + // TODO in progress + let props = + writer_options.writer_properties.clone().unwrap(); + let writer_properties = WriterProperties::builder() + .set_created_by(props.created_by) + .build(); + let parquet_writer_options = + ParquetWriterOptions::new(writer_properties); + CopyOptions::WriterOptions(Box::new( + FileTypeWriterOptions::Parquet( + parquet_writer_options, + ), + )) + } + _ => { + return Err(proto_error( + "WriterOptions unsupported file_type", + )) + } + }, + None => { + return Err(proto_error( + "WriterOptions missing file_type", + )) + } + } } - other => return Err(proto_error(format!( - "LogicalPlan serde is not yet implemented for CopyTo with CopyOptions {other:?}", - ))) + None => return Err(proto_error("CopyTo missing CopyOptions")), }; Ok(datafusion_expr::LogicalPlan::Copy( datafusion_expr::dml::CopyTo { @@ -1580,22 +1611,28 @@ impl AsLogicalPlan for LogicalPlanNode { extension_codec, )?; - let copy_options_proto: Option = match copy_options { - CopyOptions::SQLOptions(opt) => { - let options: Vec = opt.clone().into_inner().iter().map(|(k, v)| SqlOption { - key: k.to_string(), - value: v.to_string(), - }).collect(); - Some(copy_to_node::CopyOptions::SqlOptions(protobuf::SqlOptions { - option: options - })) - } - CopyOptions::WriterOptions(_) => { - return Err(proto_error( - "LogicalPlan serde is not yet implemented for CopyTo with WriterOptions", - )) - } - }; + let copy_options_proto: Option = + match copy_options { + CopyOptions::SQLOptions(opt) => { + let options: Vec = opt + .clone() + .into_inner() + .iter() + .map(|(k, v)| SqlOption { + key: k.to_string(), + value: v.to_string(), + }) + .collect(); + Some(copy_to_node::CopyOptions::SqlOptions( + protobuf::SqlOptions { option: options }, + )) + } + CopyOptions::WriterOptions(opt) => { + Some(copy_to_node::CopyOptions::WriterOptions( + protobuf::FileTypeWriterOptions { file_type: None }, + )) + } + }; Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::CopyTo(Box::new( diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index a2996db9e891..6930294f00a6 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -353,6 +353,19 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> { let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?; assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}")); + match logical_round_trip { + LogicalPlan::Copy(x) => match &x.copy_options { + CopyOptions::WriterOptions(y) => match y.as_ref() { + FileTypeWriterOptions::Parquet(p) => { + assert_eq!("DataFusion Test", p.writer_options.created_by()); + } + _ => panic!(), + }, + _ => panic!(), + }, + _ => panic!(), + } + Ok(()) } From 5a3bc8d0964399abc2a0aefbf0312ba1f1478335 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 22 Dec 2023 10:25:05 -0700 Subject: [PATCH 03/11] test passes --- datafusion/proto/src/logical_plan/mod.rs | 38 ++++++++++++++++++++++-- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index ffcf564ac148..5e26096013b7 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -1628,9 +1628,41 @@ impl AsLogicalPlan for LogicalPlanNode { )) } CopyOptions::WriterOptions(opt) => { - Some(copy_to_node::CopyOptions::WriterOptions( - protobuf::FileTypeWriterOptions { file_type: None }, - )) + match opt.as_ref() { + FileTypeWriterOptions::Parquet(parquet_opts) => { + let props = &parquet_opts.writer_options; + let writer_properties = protobuf::WriterProperties { + data_page_size_limit: props.data_page_size_limit() + as i32, + dictionary_page_size_limit: props + .dictionary_page_size_limit() + as i32, + data_page_row_count_limit: props + .data_page_row_count_limit() + as i32, + write_batch_size: props.write_batch_size() as i32, + max_row_group_size: props.max_row_group_size() + as i32, + writer_version: "".to_string(), //TODO + created_by: props.created_by().to_string(), + }; + let parquet_writer_options = + protobuf::ParquetWriterOptions { + writer_properties: Some(writer_properties), + }; + let parquet_options = file_type_writer_options::FileType::ParquetOptions(parquet_writer_options); + Some(copy_to_node::CopyOptions::WriterOptions( + protobuf::FileTypeWriterOptions { + file_type: Some(parquet_options), + }, + )) + } + _ => { + return Err(proto_error( + "Unsupported FileTypeWriterOptions in CopyTo", + )) + } + } } }; From d778fad908c18d03d870f75f191163ee1c287b30 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 22 Dec 2023 11:20:50 -0700 Subject: [PATCH 04/11] Improve test --- datafusion/proto/src/logical_plan/mod.rs | 44 ++++++++++++++----- .../tests/cases/roundtrip_logical_plan.rs | 43 +++++++++++++----- 2 files changed, 64 insertions(+), 23 deletions(-) diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 5e26096013b7..3411e66ca399 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -63,7 +63,7 @@ use datafusion_expr::{ DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder, }; -use datafusion::parquet::file::properties::WriterProperties; +use datafusion::parquet::file::properties::{WriterProperties, WriterVersion}; use datafusion_common::file_options::parquet_writer::ParquetWriterOptions; use datafusion_expr::dml::CopyOptions; use prost::bytes::BufMut; @@ -845,21 +845,22 @@ impl AsLogicalPlan for LogicalPlanNode { } Some(copy_to_node::CopyOptions::WriterOptions(opt)) => { match &opt.file_type { - Some(x) => match x { + Some(ft) => match ft { file_type_writer_options::FileType::ParquetOptions( writer_options, ) => { - // TODO in progress - let props = - writer_options.writer_properties.clone().unwrap(); - let writer_properties = WriterProperties::builder() - .set_created_by(props.created_by) - .build(); - let parquet_writer_options = - ParquetWriterOptions::new(writer_properties); + let writer_properties = + match &writer_options.writer_properties { + Some(serialized_writer_options) => { + writer_options_from_proto( + serialized_writer_options, + )? + } + _ => WriterProperties::default(), + }; CopyOptions::WriterOptions(Box::new( FileTypeWriterOptions::Parquet( - parquet_writer_options, + ParquetWriterOptions::new(writer_properties), ), )) } @@ -1643,7 +1644,10 @@ impl AsLogicalPlan for LogicalPlanNode { write_batch_size: props.write_batch_size() as i32, max_row_group_size: props.max_row_group_size() as i32, - writer_version: "".to_string(), //TODO + writer_version: format!( + "{:?}", + props.writer_version() + ), created_by: props.created_by().to_string(), }; let parquet_writer_options = @@ -1684,3 +1688,19 @@ impl AsLogicalPlan for LogicalPlanNode { } } } + +fn writer_options_from_proto( + props: &protobuf::WriterProperties, +) -> Result { + let writer_version = WriterVersion::from_str(&props.writer_version) + .map_err(|e| proto_error(format!("{}", e)))?; + Ok(WriterProperties::builder() + .set_created_by(props.created_by.clone()) + .set_writer_version(writer_version) + .set_dictionary_page_size_limit(props.dictionary_page_size_limit as usize) + .set_data_page_row_count_limit(props.data_page_row_count_limit as usize) + .set_data_page_size_limit(props.data_page_size_limit as usize) + .set_write_batch_size(props.write_batch_size as usize) + .set_max_row_group_size(props.max_row_group_size as usize) + .build()) +} diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index 6930294f00a6..3eeae01a643e 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -31,7 +31,7 @@ use datafusion::datasource::provider::TableProviderFactory; use datafusion::datasource::TableProvider; use datafusion::execution::context::SessionState; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; -use datafusion::parquet::file::properties::WriterProperties; +use datafusion::parquet::file::properties::{WriterProperties, WriterVersion}; use datafusion::physical_plan::functions::make_scalar_function; use datafusion::prelude::{create_udf, CsvReadOptions, SessionConfig, SessionContext}; use datafusion::test_util::{TestTableFactory, TestTableProvider}; @@ -338,11 +338,17 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> { let writer_properties = WriterProperties::builder() .set_bloom_filter_enabled(true) .set_created_by("DataFusion Test".to_string()) + .set_writer_version(WriterVersion::PARQUET_2_0) + .set_write_batch_size(111) + .set_data_page_size_limit(222) + .set_data_page_row_count_limit(333) + .set_dictionary_page_size_limit(444) + .set_max_row_group_size(555) .build(); let plan = LogicalPlan::Copy(CopyTo { input: Arc::new(input), - output_url: "test.csv".to_string(), - file_format: FileType::CSV, + output_url: "test.parquet".to_string(), + file_format: FileType::PARQUET, single_file_output: true, copy_options: CopyOptions::WriterOptions(Box::new( FileTypeWriterOptions::Parquet(ParquetWriterOptions::new(writer_properties)), @@ -354,15 +360,30 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> { assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}")); match logical_round_trip { - LogicalPlan::Copy(x) => match &x.copy_options { - CopyOptions::WriterOptions(y) => match y.as_ref() { - FileTypeWriterOptions::Parquet(p) => { - assert_eq!("DataFusion Test", p.writer_options.created_by()); - } + LogicalPlan::Copy(copy_to) => { + assert_eq!("test.parquet", copy_to.output_url); + assert_eq!(FileType::PARQUET, copy_to.file_format); + assert!(copy_to.single_file_output); + match ©_to.copy_options { + CopyOptions::WriterOptions(y) => match y.as_ref() { + FileTypeWriterOptions::Parquet(p) => { + let props = &p.writer_options; + assert_eq!("DataFusion Test", props.created_by()); + assert_eq!( + "PARQUET_2_0", + format!("{:?}", props.writer_version()) + ); + assert_eq!(111, props.write_batch_size()); + assert_eq!(222, props.data_page_size_limit()); + assert_eq!(333, props.data_page_row_count_limit()); + assert_eq!(444, props.dictionary_page_size_limit()); + assert_eq!(555, props.max_row_group_size()); + } + _ => panic!(), + }, _ => panic!(), - }, - _ => panic!(), - }, + } + } _ => panic!(), } From 06db77bfb794bf0e0b7c605b408888a66433d62f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 22 Dec 2023 11:33:28 -0700 Subject: [PATCH 05/11] Refactor and add link to follow on issue --- datafusion/proto/proto/datafusion.proto | 1 + datafusion/proto/src/logical_plan/mod.rs | 37 +++++++++++------------- 2 files changed, 18 insertions(+), 20 deletions(-) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index fdf2acc67a80..0c843cb948f1 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1227,6 +1227,7 @@ message WriterProperties { string writer_version = 6; string created_by = 7; // TODO add advanced writer options + // tracking issue: https://github.com/apache/arrow-datafusion/issues/8632 // key_value_metadata: Option>, // default_column_properties: ColumnProperties, // column_properties: HashMap, diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 3411e66ca399..033b991b5849 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -1631,28 +1631,13 @@ impl AsLogicalPlan for LogicalPlanNode { CopyOptions::WriterOptions(opt) => { match opt.as_ref() { FileTypeWriterOptions::Parquet(parquet_opts) => { - let props = &parquet_opts.writer_options; - let writer_properties = protobuf::WriterProperties { - data_page_size_limit: props.data_page_size_limit() - as i32, - dictionary_page_size_limit: props - .dictionary_page_size_limit() - as i32, - data_page_row_count_limit: props - .data_page_row_count_limit() - as i32, - write_batch_size: props.write_batch_size() as i32, - max_row_group_size: props.max_row_group_size() - as i32, - writer_version: format!( - "{:?}", - props.writer_version() - ), - created_by: props.created_by().to_string(), - }; let parquet_writer_options = protobuf::ParquetWriterOptions { - writer_properties: Some(writer_properties), + writer_properties: Some( + writer_properties_to_proto( + &parquet_opts.writer_options, + ), + ), }; let parquet_options = file_type_writer_options::FileType::ParquetOptions(parquet_writer_options); Some(copy_to_node::CopyOptions::WriterOptions( @@ -1689,6 +1674,18 @@ impl AsLogicalPlan for LogicalPlanNode { } } +fn writer_properties_to_proto(props: &WriterProperties) -> protobuf::WriterProperties { + protobuf::WriterProperties { + data_page_size_limit: props.data_page_size_limit() as i32, + dictionary_page_size_limit: props.dictionary_page_size_limit() as i32, + data_page_row_count_limit: props.data_page_row_count_limit() as i32, + write_batch_size: props.write_batch_size() as i32, + max_row_group_size: props.max_row_group_size() as i32, + writer_version: format!("{:?}", props.writer_version()), + created_by: props.created_by().to_string(), + } +} + fn writer_options_from_proto( props: &protobuf::WriterProperties, ) -> Result { From f902d6e8e865acecbd6a22407a3e8f2b43c9e8be Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 22 Dec 2023 11:38:52 -0700 Subject: [PATCH 06/11] remove duplicate code --- datafusion/proto/src/logical_plan/mod.rs | 6 ++++-- datafusion/proto/src/physical_plan/from_proto.rs | 16 ++-------------- 2 files changed, 6 insertions(+), 16 deletions(-) diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 033b991b5849..8480d3840079 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -1674,7 +1674,9 @@ impl AsLogicalPlan for LogicalPlanNode { } } -fn writer_properties_to_proto(props: &WriterProperties) -> protobuf::WriterProperties { +pub(crate) fn writer_properties_to_proto( + props: &WriterProperties, +) -> protobuf::WriterProperties { protobuf::WriterProperties { data_page_size_limit: props.data_page_size_limit() as i32, dictionary_page_size_limit: props.dictionary_page_size_limit() as i32, @@ -1686,7 +1688,7 @@ fn writer_properties_to_proto(props: &WriterProperties) -> protobuf::WriterPrope } } -fn writer_options_from_proto( +pub(crate) fn writer_options_from_proto( props: &protobuf::WriterProperties, ) -> Result { let writer_version = WriterVersion::from_str(&props.writer_version) diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index b15a7a01e013..fa09f72ead2c 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -53,8 +53,8 @@ use crate::logical_plan; use crate::protobuf; use crate::protobuf::physical_expr_node::ExprType; +use crate::logical_plan::writer_options_from_proto; use chrono::{TimeZone, Utc}; -use datafusion::parquet::file::properties::WriterProperties; use object_store::path::Path; use object_store::ObjectMeta; @@ -773,19 +773,7 @@ impl TryFrom<&protobuf::FileTypeWriterOptions> for FileTypeWriterOptions { ), protobuf::file_type_writer_options::FileType::ParquetOptions(opt) => { let props = opt.writer_properties.clone().unwrap_or_default(); - let writer_properties = WriterProperties::builder() - //.set_writer_version(props.writer_version) - .set_created_by(props.created_by) - .set_dictionary_page_size_limit( - props.dictionary_page_size_limit as usize, - ) - .set_data_page_row_count_limit( - props.data_page_row_count_limit as usize, - ) - .set_data_page_size_limit(props.data_page_size_limit as usize) - .set_write_batch_size(props.write_batch_size as usize) - .set_max_row_group_size(props.max_row_group_size as usize) - .build(); + let writer_properties = writer_options_from_proto(&props)?; Ok(Self::Parquet(ParquetWriterOptions::new(writer_properties))) } } From 9f267b410ca1813f0b01fe6353b5e736a7457a57 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 22 Dec 2023 12:12:47 -0700 Subject: [PATCH 07/11] clippy --- datafusion/proto/src/logical_plan/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 8480d3840079..793b5a223008 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -1692,7 +1692,7 @@ pub(crate) fn writer_options_from_proto( props: &protobuf::WriterProperties, ) -> Result { let writer_version = WriterVersion::from_str(&props.writer_version) - .map_err(|e| proto_error(format!("{}", e)))?; + .map_err(|e| proto_error(e.to_string()))?; Ok(WriterProperties::builder() .set_created_by(props.created_by.clone()) .set_writer_version(writer_version) From 62e23f8785351f61e94d67ac232f9973858e12d3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 22 Dec 2023 14:06:58 -0700 Subject: [PATCH 08/11] Regen --- datafusion/proto/src/generated/prost.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 4a5bbeb11d65..ca2313b181b6 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1683,12 +1683,13 @@ pub struct WriterProperties { pub max_row_group_size: i32, #[prost(string, tag = "6")] pub writer_version: ::prost::alloc::string::String, - /// TODO add support for all writer properties - /// key_value_metadata: Option>, - /// default_column_properties: ColumnProperties, - /// column_properties: HashMap, - /// sorting_columns: Option>, - /// int32 column_index_truncate_length: Option, + /// TODO add advanced writer options + /// tracking issue: + /// key_value_metadata: Option>, + /// default_column_properties: ColumnProperties, + /// column_properties: HashMap, + /// sorting_columns: Option>, + /// column_index_truncate_length: Option, #[prost(string, tag = "7")] pub created_by: ::prost::alloc::string::String, } From 0c5b0d59d056e005e10790b1b2df0baf3ccdcff3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 22 Dec 2023 14:50:52 -0700 Subject: [PATCH 09/11] remove comments from proto file --- datafusion/proto/proto/datafusion.proto | 7 ------- datafusion/proto/src/generated/prost.rs | 7 ------- 2 files changed, 14 deletions(-) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 0c843cb948f1..caeb2641beac 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1226,13 +1226,6 @@ message WriterProperties { int32 max_row_group_size = 5; string writer_version = 6; string created_by = 7; - // TODO add advanced writer options - // tracking issue: https://github.com/apache/arrow-datafusion/issues/8632 - // key_value_metadata: Option>, - // default_column_properties: ColumnProperties, - // column_properties: HashMap, - // sorting_columns: Option>, - // column_index_truncate_length: Option, } message FileSinkConfig { diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index ca2313b181b6..93bd0f805292 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1683,13 +1683,6 @@ pub struct WriterProperties { pub max_row_group_size: i32, #[prost(string, tag = "6")] pub writer_version: ::prost::alloc::string::String, - /// TODO add advanced writer options - /// tracking issue: - /// key_value_metadata: Option>, - /// default_column_properties: ColumnProperties, - /// column_properties: HashMap, - /// sorting_columns: Option>, - /// column_index_truncate_length: Option, #[prost(string, tag = "7")] pub created_by: ::prost::alloc::string::String, } From e02c2581e716a429e0b0a6a484bcf408be2a0cb1 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 23 Dec 2023 08:45:33 -0700 Subject: [PATCH 10/11] change proto types from i32 to u32 pre feedback on PR --- datafusion/proto/proto/datafusion.proto | 10 +++++----- datafusion/proto/src/generated/prost.rs | 20 +++++++++---------- datafusion/proto/src/logical_plan/mod.rs | 14 ++++++------- .../proto/src/physical_plan/from_proto.rs | 4 ++-- 4 files changed, 24 insertions(+), 24 deletions(-) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index caeb2641beac..ff0a5377a089 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1219,11 +1219,11 @@ message ParquetWriterOptions { } message WriterProperties { - int32 data_page_size_limit = 1; - int32 dictionary_page_size_limit = 2; - int32 data_page_row_count_limit = 3; - int32 write_batch_size = 4; - int32 max_row_group_size = 5; + uint32 data_page_size_limit = 1; + uint32 dictionary_page_size_limit = 2; + uint32 data_page_row_count_limit = 3; + uint32 write_batch_size = 4; + uint32 max_row_group_size = 5; string writer_version = 6; string created_by = 7; } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 93bd0f805292..be2702f6f0e4 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1671,16 +1671,16 @@ pub struct ParquetWriterOptions { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct WriterProperties { - #[prost(int32, tag = "1")] - pub data_page_size_limit: i32, - #[prost(int32, tag = "2")] - pub dictionary_page_size_limit: i32, - #[prost(int32, tag = "3")] - pub data_page_row_count_limit: i32, - #[prost(int32, tag = "4")] - pub write_batch_size: i32, - #[prost(int32, tag = "5")] - pub max_row_group_size: i32, + #[prost(uint32, tag = "1")] + pub data_page_size_limit: u32, + #[prost(uint32, tag = "2")] + pub dictionary_page_size_limit: u32, + #[prost(uint32, tag = "3")] + pub data_page_row_count_limit: u32, + #[prost(uint32, tag = "4")] + pub write_batch_size: u32, + #[prost(uint32, tag = "5")] + pub max_row_group_size: u32, #[prost(string, tag = "6")] pub writer_version: ::prost::alloc::string::String, #[prost(string, tag = "7")] diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 793b5a223008..5b96e96504f1 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -852,7 +852,7 @@ impl AsLogicalPlan for LogicalPlanNode { let writer_properties = match &writer_options.writer_properties { Some(serialized_writer_options) => { - writer_options_from_proto( + writer_properties_from_proto( serialized_writer_options, )? } @@ -1678,17 +1678,17 @@ pub(crate) fn writer_properties_to_proto( props: &WriterProperties, ) -> protobuf::WriterProperties { protobuf::WriterProperties { - data_page_size_limit: props.data_page_size_limit() as i32, - dictionary_page_size_limit: props.dictionary_page_size_limit() as i32, - data_page_row_count_limit: props.data_page_row_count_limit() as i32, - write_batch_size: props.write_batch_size() as i32, - max_row_group_size: props.max_row_group_size() as i32, + data_page_size_limit: props.data_page_size_limit() as u32, + dictionary_page_size_limit: props.dictionary_page_size_limit() as u32, + data_page_row_count_limit: props.data_page_row_count_limit() as u32, + write_batch_size: props.write_batch_size() as u32, + max_row_group_size: props.max_row_group_size() as u32, writer_version: format!("{:?}", props.writer_version()), created_by: props.created_by().to_string(), } } -pub(crate) fn writer_options_from_proto( +pub(crate) fn writer_properties_from_proto( props: &protobuf::WriterProperties, ) -> Result { let writer_version = WriterVersion::from_str(&props.writer_version) diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index fa09f72ead2c..824eb60a5715 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -53,7 +53,7 @@ use crate::logical_plan; use crate::protobuf; use crate::protobuf::physical_expr_node::ExprType; -use crate::logical_plan::writer_options_from_proto; +use crate::logical_plan::writer_properties_from_proto; use chrono::{TimeZone, Utc}; use object_store::path::Path; use object_store::ObjectMeta; @@ -773,7 +773,7 @@ impl TryFrom<&protobuf::FileTypeWriterOptions> for FileTypeWriterOptions { ), protobuf::file_type_writer_options::FileType::ParquetOptions(opt) => { let props = opt.writer_properties.clone().unwrap_or_default(); - let writer_properties = writer_options_from_proto(&props)?; + let writer_properties = writer_properties_from_proto(&props)?; Ok(Self::Parquet(ParquetWriterOptions::new(writer_properties))) } } From fd85f186670a89bbd98face53bfb3f47533ef4be Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 23 Dec 2023 08:56:20 -0700 Subject: [PATCH 11/11] change to u64 --- datafusion/proto/proto/datafusion.proto | 10 +++++----- datafusion/proto/src/generated/pbjson.rs | 15 ++++++++++----- datafusion/proto/src/generated/prost.rs | 20 ++++++++++---------- datafusion/proto/src/logical_plan/mod.rs | 10 +++++----- 4 files changed, 30 insertions(+), 25 deletions(-) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index ff0a5377a089..d02fc8e91b41 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1219,11 +1219,11 @@ message ParquetWriterOptions { } message WriterProperties { - uint32 data_page_size_limit = 1; - uint32 dictionary_page_size_limit = 2; - uint32 data_page_row_count_limit = 3; - uint32 write_batch_size = 4; - uint32 max_row_group_size = 5; + uint64 data_page_size_limit = 1; + uint64 dictionary_page_size_limit = 2; + uint64 data_page_row_count_limit = 3; + uint64 write_batch_size = 4; + uint64 max_row_group_size = 5; string writer_version = 6; string created_by = 7; } diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 2d1ff8ca4b8b..f860b1f1e6a0 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -27281,19 +27281,24 @@ impl serde::Serialize for WriterProperties { } let mut struct_ser = serializer.serialize_struct("datafusion.WriterProperties", len)?; if self.data_page_size_limit != 0 { - struct_ser.serialize_field("dataPageSizeLimit", &self.data_page_size_limit)?; + #[allow(clippy::needless_borrow)] + struct_ser.serialize_field("dataPageSizeLimit", ToString::to_string(&self.data_page_size_limit).as_str())?; } if self.dictionary_page_size_limit != 0 { - struct_ser.serialize_field("dictionaryPageSizeLimit", &self.dictionary_page_size_limit)?; + #[allow(clippy::needless_borrow)] + struct_ser.serialize_field("dictionaryPageSizeLimit", ToString::to_string(&self.dictionary_page_size_limit).as_str())?; } if self.data_page_row_count_limit != 0 { - struct_ser.serialize_field("dataPageRowCountLimit", &self.data_page_row_count_limit)?; + #[allow(clippy::needless_borrow)] + struct_ser.serialize_field("dataPageRowCountLimit", ToString::to_string(&self.data_page_row_count_limit).as_str())?; } if self.write_batch_size != 0 { - struct_ser.serialize_field("writeBatchSize", &self.write_batch_size)?; + #[allow(clippy::needless_borrow)] + struct_ser.serialize_field("writeBatchSize", ToString::to_string(&self.write_batch_size).as_str())?; } if self.max_row_group_size != 0 { - struct_ser.serialize_field("maxRowGroupSize", &self.max_row_group_size)?; + #[allow(clippy::needless_borrow)] + struct_ser.serialize_field("maxRowGroupSize", ToString::to_string(&self.max_row_group_size).as_str())?; } if !self.writer_version.is_empty() { struct_ser.serialize_field("writerVersion", &self.writer_version)?; diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index be2702f6f0e4..459d5a965cd3 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1671,16 +1671,16 @@ pub struct ParquetWriterOptions { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct WriterProperties { - #[prost(uint32, tag = "1")] - pub data_page_size_limit: u32, - #[prost(uint32, tag = "2")] - pub dictionary_page_size_limit: u32, - #[prost(uint32, tag = "3")] - pub data_page_row_count_limit: u32, - #[prost(uint32, tag = "4")] - pub write_batch_size: u32, - #[prost(uint32, tag = "5")] - pub max_row_group_size: u32, + #[prost(uint64, tag = "1")] + pub data_page_size_limit: u64, + #[prost(uint64, tag = "2")] + pub dictionary_page_size_limit: u64, + #[prost(uint64, tag = "3")] + pub data_page_row_count_limit: u64, + #[prost(uint64, tag = "4")] + pub write_batch_size: u64, + #[prost(uint64, tag = "5")] + pub max_row_group_size: u64, #[prost(string, tag = "6")] pub writer_version: ::prost::alloc::string::String, #[prost(string, tag = "7")] diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 5b96e96504f1..d137a41fa19b 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -1678,11 +1678,11 @@ pub(crate) fn writer_properties_to_proto( props: &WriterProperties, ) -> protobuf::WriterProperties { protobuf::WriterProperties { - data_page_size_limit: props.data_page_size_limit() as u32, - dictionary_page_size_limit: props.dictionary_page_size_limit() as u32, - data_page_row_count_limit: props.data_page_row_count_limit() as u32, - write_batch_size: props.write_batch_size() as u32, - max_row_group_size: props.max_row_group_size() as u32, + data_page_size_limit: props.data_page_size_limit() as u64, + dictionary_page_size_limit: props.dictionary_page_size_limit() as u64, + data_page_row_count_limit: props.data_page_row_count_limit() as u64, + write_batch_size: props.write_batch_size() as u64, + max_row_group_size: props.max_row_group_size() as u64, writer_version: format!("{:?}", props.writer_version()), created_by: props.created_by().to_string(), }