From fc7069a8c50861c2dda99bba42832795c86cffd9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alihan=20=C3=87elikcan?=
Date: Mon, 4 Nov 2024 14:38:00 +0300
Subject: [PATCH 1/2] Make schema_infer_max_rec an Option

---
 datafusion/common/src/config.rs               |  6 +++---
 .../core/src/datasource/file_format/csv.rs    | 14 +++++++++----
 .../core/src/datasource/file_format/json.rs   | 11 +++++++---
 .../src/datasource/listing_table_factory.rs   |  2 +-
 .../proto/datafusion_common.proto             |  4 ++--
 datafusion/proto-common/src/from_proto/mod.rs |  4 ++--
 .../proto-common/src/generated/pbjson.rs      | 20 +++++++++----------
 .../proto-common/src/generated/prost.rs       | 12 +++++------
 datafusion/proto-common/src/to_proto/mod.rs   |  4 ++--
 .../src/generated/datafusion_proto_common.rs  | 12 +++++------
 .../proto/src/logical_plan/file_formats.rs    |  8 ++++----
 .../tests/cases/roundtrip_logical_plan.rs     |  2 +-
 12 files changed, 55 insertions(+), 44 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 336513035036..1ad10d164868 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -1648,7 +1648,7 @@ config_namespace! {
         /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
         pub newlines_in_values: Option<bool>, default = None
         pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
-        pub schema_infer_max_rec: usize, default = 100
+        pub schema_infer_max_rec: Option<usize>, default = None
         pub date_format: Option<String>, default = None
         pub datetime_format: Option<String>, default = None
         pub timestamp_format: Option<String>, default = None
@@ -1673,7 +1673,7 @@ impl CsvOptions {
     /// Set a limit in terms of records to scan to infer the schema
     /// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
     pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
-        self.schema_infer_max_rec = max_rec;
+        self.schema_infer_max_rec = Some(max_rec);
         self
     }
 
@@ -1773,7 +1773,7 @@ config_namespace! {
     /// Options controlling JSON format
     pub struct JsonOptions {
         pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
-        pub schema_infer_max_rec: usize, default = 100
+        pub schema_infer_max_rec: Option<usize>, default = None
     }
 }
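The shape of this change is the same in every file it touches: store None for "not set", and resolve the shared default at the point of use instead of at construction time. A minimal standalone sketch of that pattern (struct, method, and constant value are illustrative, not DataFusion's exact API):

    /// Stand-in for DataFusion's DEFAULT_SCHEMA_INFER_MAX_RECORD.
    const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;

    #[derive(Debug, Default, Clone)]
    struct FormatOptions {
        /// None means "not set by the user"; the default applies at read time.
        schema_infer_max_rec: Option<usize>,
    }

    impl FormatOptions {
        /// Builder-style setter: an explicit call always stores Some(..).
        fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
            self.schema_infer_max_rec = Some(max_rec);
            self
        }

        /// The effective limit: the user's value, or else the shared default.
        fn records_to_read(&self) -> usize {
            self.schema_infer_max_rec
                .unwrap_or(DEFAULT_SCHEMA_INFER_MAX_RECORD)
        }
    }

    fn main() {
        assert_eq!(FormatOptions::default().records_to_read(), 1000);
        let custom = FormatOptions::default().with_schema_infer_max_rec(50);
        assert_eq!(custom.records_to_read(), 50);
    }

One payoff is visible in the config diff itself: an unset option no longer has to be encoded as a magic scalar (the old default = 100) that is indistinguishable from a deliberate user choice.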
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 2aaef2cda1c8..022cd8cded33 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -23,7 +23,7 @@ use std::fmt::{self, Debug};
 use std::sync::Arc;
 
 use super::write::orchestration::stateless_multipart_put;
-use super::{FileFormat, FileFormatFactory};
+use super::{FileFormat, FileFormatFactory, DEFAULT_SCHEMA_INFER_MAX_RECORD};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::file_format::write::BatchSerializer;
 use crate::datasource::physical_plan::{
@@ -204,7 +204,7 @@ impl CsvFormat {
     /// Set a limit in terms of records to scan to infer the schema
     /// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
     pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
-        self.options.schema_infer_max_rec = max_rec;
+        self.options.schema_infer_max_rec = Some(max_rec);
         self
     }
 
@@ -319,7 +319,10 @@ impl FileFormat for CsvFormat {
     ) -> Result<SchemaRef> {
         let mut schemas = vec![];
 
-        let mut records_to_read = self.options.schema_infer_max_rec;
+        let mut records_to_read = self
+            .options
+            .schema_infer_max_rec
+            .unwrap_or(DEFAULT_SCHEMA_INFER_MAX_RECORD);
 
         for object in objects {
             let stream = self.read_to_delimited_chunks(store, object).await;
@@ -891,7 +894,10 @@ mod tests {
         let integration = LocalFileSystem::new_with_prefix(arrow_test_data()).unwrap();
         let path = Path::from("csv/aggregate_test_100.csv");
         let csv = CsvFormat::default().with_has_header(true);
-        let records_to_read = csv.options().schema_infer_max_rec;
+        let records_to_read = csv
+            .options()
+            .schema_infer_max_rec
+            .unwrap_or(DEFAULT_SCHEMA_INFER_MAX_RECORD);
         let store = Arc::new(integration) as Arc<dyn ObjectStore>;
         let original_stream = store.get(&path).await?;
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index fd97da52165b..6a7bfd2040f0 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -25,7 +25,9 @@ use std::io::BufReader;
 use std::sync::Arc;
 
 use super::write::orchestration::stateless_multipart_put;
-use super::{FileFormat, FileFormatFactory, FileScanConfig};
+use super::{
+    FileFormat, FileFormatFactory, FileScanConfig, DEFAULT_SCHEMA_INFER_MAX_RECORD,
+};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::file_format::write::BatchSerializer;
 use crate::datasource::physical_plan::FileGroupDisplay;
@@ -147,7 +149,7 @@ impl JsonFormat {
     /// Set a limit in terms of records to scan to infer the schema
     /// - defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
     pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
-        self.options.schema_infer_max_rec = max_rec;
+        self.options.schema_infer_max_rec = Some(max_rec);
         self
     }
 
@@ -187,7 +189,10 @@ impl FileFormat for JsonFormat {
         objects: &[ObjectMeta],
     ) -> Result<SchemaRef> {
         let mut schemas = Vec::new();
-        let mut records_to_read = self.options.schema_infer_max_rec;
+        let mut records_to_read = self
+            .options
+            .schema_infer_max_rec
+            .unwrap_or(DEFAULT_SCHEMA_INFER_MAX_RECORD);
         let file_compression_type = FileCompressionType::from(self.options.compression);
         for object in objects {
             let mut take_while = || {
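In both infer_schema implementations the resolved limit then acts as a single budget shared across all input files: each file consumes part of it, and inference stops opening files once it is exhausted. A simplified synchronous sketch of that control flow (plan_inference is a hypothetical helper; the real code is async and streams bytes from an object store):

    /// Stand-in for DataFusion's DEFAULT_SCHEMA_INFER_MAX_RECORD.
    const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;

    /// Given per-file record counts and an optional user limit, return how
    /// many records each file would contribute to schema inference.
    fn plan_inference(file_sizes: &[usize], limit: Option<usize>) -> Vec<usize> {
        let mut records_to_read = limit.unwrap_or(DEFAULT_SCHEMA_INFER_MAX_RECORD);
        let mut scanned = Vec::new();
        for &len in file_sizes {
            if records_to_read == 0 {
                break; // budget exhausted: remaining files are not read at all
            }
            let take = len.min(records_to_read);
            records_to_read -= take;
            scanned.push(take);
        }
        scanned
    }

    fn main() {
        // Three files of 400 records, explicit limit 1000: the first two are
        // read fully, the third only partially, and inference stops there.
        assert_eq!(plan_inference(&[400, 400, 400], Some(1000)), vec![400, 400, 200]);
        // With no user limit, the default budget of 1000 yields the same plan.
        assert_eq!(plan_inference(&[400, 400, 400], None), vec![400, 400, 200]);
    }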
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index 581d88d25884..1f6a19ceb55c 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -254,7 +254,7 @@ mod tests {
         let format = listing_table.options().format.clone();
         let csv_format = format.as_any().downcast_ref::<CsvFormat>().unwrap();
         let csv_options = csv_format.options().clone();
-        assert_eq!(csv_options.schema_infer_max_rec, 1000);
+        assert_eq!(csv_options.schema_infer_max_rec, Some(1000));
         let listing_options = listing_table.options();
         assert_eq!(".tbl", listing_options.file_extension);
     }
diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto
index 65cd33d523cd..2da8b6066742 100644
--- a/datafusion/proto-common/proto/datafusion_common.proto
+++ b/datafusion/proto-common/proto/datafusion_common.proto
@@ -414,7 +414,7 @@ message CsvOptions {
   bytes quote = 3; // Quote character as a byte
   bytes escape = 4; // Optional escape character as a byte
   CompressionTypeVariant compression = 5; // Compression type
-  uint64 schema_infer_max_rec = 6; // Max records for schema inference
+  optional uint64 schema_infer_max_rec = 6; // Optional max records for schema inference
   string date_format = 7; // Optional date format
   string datetime_format = 8; // Optional datetime format
   string timestamp_format = 9; // Optional timestamp format
@@ -430,7 +430,7 @@ message CsvOptions {
 // Options controlling CSV format
 message JsonOptions {
   CompressionTypeVariant compression = 1; // Compression type
-  uint64 schema_infer_max_rec = 2; // Max records for schema inference
+  optional uint64 schema_infer_max_rec = 2; // Optional max records for schema inference
 }
 
 message TableParquetOptions {
diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs
index a554e4ed2805..14375c0590a4 100644
--- a/datafusion/proto-common/src/from_proto/mod.rs
+++ b/datafusion/proto-common/src/from_proto/mod.rs
@@ -869,7 +869,7 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
             double_quote: proto_opts.has_header.first().map(|h| *h != 0),
             newlines_in_values: proto_opts.newlines_in_values.first().map(|h| *h != 0),
             compression: proto_opts.compression().into(),
-            schema_infer_max_rec: proto_opts.schema_infer_max_rec as usize,
+            schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
             date_format: (!proto_opts.date_format.is_empty())
                 .then(|| proto_opts.date_format.clone()),
             datetime_format: (!proto_opts.datetime_format.is_empty())
@@ -1050,7 +1050,7 @@ impl TryFrom<&protobuf::JsonOptions> for JsonOptions {
         let compression: protobuf::CompressionTypeVariant = proto_opts.compression();
         Ok(JsonOptions {
             compression: compression.into(),
-            schema_infer_max_rec: proto_opts.schema_infer_max_rec as usize,
+            schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
         })
     }
 }
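The proto3 `optional` keyword is what lets the Rust-side `Option` survive the wire: an unset field is simply absent from the encoding, and presence is preserved even for the value 0, which the old non-optional `uint64` could not distinguish from "not set". A self-contained sketch of that behaviour, assuming the `prost` crate and trimming the message down to the one field in question:

    use prost::Message;

    /// Hand-written equivalent of what prost-build generates for
    /// `optional uint64 schema_infer_max_rec = 2;`.
    #[derive(Clone, PartialEq, Message)]
    struct JsonOptionsSketch {
        #[prost(uint64, optional, tag = "2")]
        schema_infer_max_rec: Option<u64>,
    }

    fn main() {
        // An unset optional field is absent from the wire entirely...
        let unset = JsonOptionsSketch { schema_infer_max_rec: None };
        assert!(unset.encode_to_vec().is_empty());

        // ...while Some(0) is encoded, so presence survives a round trip.
        let zero = JsonOptionsSketch { schema_infer_max_rec: Some(0) };
        let bytes = zero.encode_to_vec();
        assert!(!bytes.is_empty());
        let decoded = JsonOptionsSketch::decode(bytes.as_slice()).unwrap();
        assert_eq!(decoded.schema_infer_max_rec, Some(0));
    }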
diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs
index e8235ef7b9dd..6a75b14d35a8 100644
--- a/datafusion/proto-common/src/generated/pbjson.rs
+++ b/datafusion/proto-common/src/generated/pbjson.rs
@@ -1512,7 +1512,7 @@ impl serde::Serialize for CsvOptions {
         if self.compression != 0 {
             len += 1;
         }
-        if self.schema_infer_max_rec != 0 {
+        if self.schema_infer_max_rec.is_some() {
             len += 1;
         }
         if !self.date_format.is_empty() {
@@ -1571,10 +1571,10 @@ impl serde::Serialize for CsvOptions {
                 .map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.compression)))?;
             struct_ser.serialize_field("compression", &v)?;
         }
-        if self.schema_infer_max_rec != 0 {
+        if let Some(v) = self.schema_infer_max_rec.as_ref() {
             #[allow(clippy::needless_borrow)]
             #[allow(clippy::needless_borrows_for_generic_args)]
-            struct_ser.serialize_field("schemaInferMaxRec", ToString::to_string(&self.schema_infer_max_rec).as_str())?;
+            struct_ser.serialize_field("schemaInferMaxRec", ToString::to_string(&v).as_str())?;
         }
         if !self.date_format.is_empty() {
             struct_ser.serialize_field("dateFormat", &self.date_format)?;
@@ -1787,7 +1787,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
                                 return Err(serde::de::Error::duplicate_field("schemaInferMaxRec"));
                             }
                             schema_infer_max_rec__ =
-                                Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
+                                map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0)
                             ;
                         }
                         GeneratedField::DateFormat => {
@@ -1866,7 +1866,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
                     quote: quote__.unwrap_or_default(),
                     escape: escape__.unwrap_or_default(),
                     compression: compression__.unwrap_or_default(),
-                    schema_infer_max_rec: schema_infer_max_rec__.unwrap_or_default(),
+                    schema_infer_max_rec: schema_infer_max_rec__,
                     date_format: date_format__.unwrap_or_default(),
                     datetime_format: datetime_format__.unwrap_or_default(),
                     timestamp_format: timestamp_format__.unwrap_or_default(),
@@ -3929,7 +3929,7 @@ impl serde::Serialize for JsonOptions {
         if self.compression != 0 {
             len += 1;
         }
-        if self.schema_infer_max_rec != 0 {
+        if self.schema_infer_max_rec.is_some() {
             len += 1;
         }
         let mut struct_ser = serializer.serialize_struct("datafusion_common.JsonOptions", len)?;
@@ -3938,10 +3938,10 @@ impl serde::Serialize for JsonOptions {
                 .map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.compression)))?;
             struct_ser.serialize_field("compression", &v)?;
         }
-        if self.schema_infer_max_rec != 0 {
+        if let Some(v) = self.schema_infer_max_rec.as_ref() {
             #[allow(clippy::needless_borrow)]
             #[allow(clippy::needless_borrows_for_generic_args)]
-            struct_ser.serialize_field("schemaInferMaxRec", ToString::to_string(&self.schema_infer_max_rec).as_str())?;
+            struct_ser.serialize_field("schemaInferMaxRec", ToString::to_string(&v).as_str())?;
         }
         struct_ser.end()
     }
@@ -4019,14 +4019,14 @@ impl<'de> serde::Deserialize<'de> for JsonOptions {
                                 return Err(serde::de::Error::duplicate_field("schemaInferMaxRec"));
                             }
                             schema_infer_max_rec__ =
-                                Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
+                                map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0)
                             ;
                         }
                     }
                 }
                 Ok(JsonOptions {
                     compression: compression__.unwrap_or_default(),
-                    schema_infer_max_rec: schema_infer_max_rec__.unwrap_or_default(),
+                    schema_infer_max_rec: schema_infer_max_rec__,
                 })
             }
         }
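The regenerated pbjson code follows a fixed recipe: first count the fields that will actually be emitted, then emit them, skipping absent ones, with 64-bit integers rendered as JSON strings per the protobuf JSON mapping. Presence used to be approximated by `!= 0`; with the optional field it becomes an exact `is_some()`. A hand-written sketch of the same recipe (assumes the `serde` and `serde_json` crates; type and field names are illustrative):

    use serde::ser::{Serialize, SerializeStruct, Serializer};

    struct JsonOptionsSketch {
        compression: i32,
        schema_infer_max_rec: Option<u64>,
    }

    impl Serialize for JsonOptionsSketch {
        fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
            // Pass 1: count the fields that will be written.
            let mut len = 0;
            if self.compression != 0 {
                len += 1;
            }
            if self.schema_infer_max_rec.is_some() {
                len += 1;
            }
            // Pass 2: write them, omitting absent fields entirely.
            let mut s = serializer.serialize_struct("JsonOptions", len)?;
            if self.compression != 0 {
                s.serialize_field("compression", &self.compression)?;
            }
            if let Some(v) = self.schema_infer_max_rec {
                // u64 is emitted as a string, as in the generated code.
                s.serialize_field("schemaInferMaxRec", &v.to_string())?;
            }
            s.end()
        }
    }

    fn main() {
        let opts = JsonOptionsSketch { compression: 0, schema_infer_max_rec: Some(0) };
        // Some(0) is serialized; None would be omitted from the JSON object.
        assert_eq!(serde_json::to_string(&opts).unwrap(), r#"{"schemaInferMaxRec":"0"}"#);
    }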
diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs
index 68e7f74c7f49..fa77d23a6ae6 100644
--- a/datafusion/proto-common/src/generated/prost.rs
+++ b/datafusion/proto-common/src/generated/prost.rs
@@ -572,9 +572,9 @@ pub struct CsvOptions {
     /// Compression type
     #[prost(enumeration = "CompressionTypeVariant", tag = "5")]
     pub compression: i32,
-    /// Max records for schema inference
-    #[prost(uint64, tag = "6")]
-    pub schema_infer_max_rec: u64,
+    /// Optional max records for schema inference
+    #[prost(uint64, optional, tag = "6")]
+    pub schema_infer_max_rec: ::core::option::Option<u64>,
     /// Optional date format
     #[prost(string, tag = "7")]
     pub date_format: ::prost::alloc::string::String,
@@ -612,9 +612,9 @@ pub struct JsonOptions {
     /// Compression type
     #[prost(enumeration = "CompressionTypeVariant", tag = "1")]
     pub compression: i32,
-    /// Max records for schema inference
-    #[prost(uint64, tag = "2")]
-    pub schema_infer_max_rec: u64,
+    /// Optional max records for schema inference
+    #[prost(uint64, optional, tag = "2")]
+    pub schema_infer_max_rec: ::core::option::Option<u64>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct TableParquetOptions {
diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs
index 02a642a4af93..1b9583516ced 100644
--- a/datafusion/proto-common/src/to_proto/mod.rs
+++ b/datafusion/proto-common/src/to_proto/mod.rs
@@ -921,7 +921,7 @@ impl TryFrom<&CsvOptions> for protobuf::CsvOptions {
                 .newlines_in_values
                 .map_or_else(Vec::new, |h| vec![h as u8]),
             compression: compression.into(),
-            schema_infer_max_rec: opts.schema_infer_max_rec as u64,
+            schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
             date_format: opts.date_format.clone().unwrap_or_default(),
             datetime_format: opts.datetime_format.clone().unwrap_or_default(),
             timestamp_format: opts.timestamp_format.clone().unwrap_or_default(),
@@ -940,7 +940,7 @@ impl TryFrom<&JsonOptions> for protobuf::JsonOptions {
         let compression: protobuf::CompressionTypeVariant = opts.compression.into();
         Ok(protobuf::JsonOptions {
             compression: compression.into(),
-            schema_infer_max_rec: opts.schema_infer_max_rec as u64,
+            schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
         })
     }
 }
diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs
index 68e7f74c7f49..fa77d23a6ae6 100644
--- a/datafusion/proto/src/generated/datafusion_proto_common.rs
+++ b/datafusion/proto/src/generated/datafusion_proto_common.rs
@@ -572,9 +572,9 @@ pub struct CsvOptions {
     /// Compression type
     #[prost(enumeration = "CompressionTypeVariant", tag = "5")]
     pub compression: i32,
-    /// Max records for schema inference
-    #[prost(uint64, tag = "6")]
-    pub schema_infer_max_rec: u64,
+    /// Optional max records for schema inference
+    #[prost(uint64, optional, tag = "6")]
+    pub schema_infer_max_rec: ::core::option::Option<u64>,
     /// Optional date format
     #[prost(string, tag = "7")]
     pub date_format: ::prost::alloc::string::String,
@@ -612,9 +612,9 @@ pub struct JsonOptions {
     /// Compression type
     #[prost(enumeration = "CompressionTypeVariant", tag = "1")]
     pub compression: i32,
-    /// Max records for schema inference
-    #[prost(uint64, tag = "2")]
-    pub schema_infer_max_rec: u64,
+    /// Optional max records for schema inference
+    #[prost(uint64, optional, tag = "2")]
+    pub schema_infer_max_rec: ::core::option::Option<u64>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct TableParquetOptions {
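Both conversion directions above reduce to Option::map plus a width cast. usize to u64 is lossless, while the reverse `as usize` would truncate silently on a 32-bit target; usize::try_from is the stricter alternative if that case ever matters. A small sketch with hypothetical function names:

    fn to_wire(v: Option<usize>) -> Option<u64> {
        // usize -> u64 never loses information.
        v.map(|n| n as u64)
    }

    fn from_wire(v: Option<u64>) -> Option<usize> {
        // Stricter than `as usize`: fails loudly instead of truncating.
        v.map(|n| usize::try_from(n).expect("schema_infer_max_rec out of range"))
    }

    fn main() {
        assert_eq!(to_wire(Some(1000)), Some(1000));
        assert_eq!(from_wire(Some(1000)), Some(1000));
        assert_eq!(from_wire(None), None);
    }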
diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs
index 02be3e11c1cb..1e2b12dacc29 100644
--- a/datafusion/proto/src/logical_plan/file_formats.rs
+++ b/datafusion/proto/src/logical_plan/file_formats.rs
@@ -57,7 +57,7 @@ impl CsvOptionsProto {
                 escape: options.escape.map_or(vec![], |v| vec![v]),
                 double_quote: options.double_quote.map_or(vec![], |v| vec![v as u8]),
                 compression: options.compression as i32,
-                schema_infer_max_rec: options.schema_infer_max_rec as u64,
+                schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
                 date_format: options.date_format.clone().unwrap_or_default(),
                 datetime_format: options.datetime_format.clone().unwrap_or_default(),
                 timestamp_format: options.timestamp_format.clone().unwrap_or_default(),
@@ -110,7 +110,7 @@ impl From<&CsvOptionsProto> for CsvOptions {
                 3 => CompressionTypeVariant::ZSTD,
                 _ => CompressionTypeVariant::UNCOMPRESSED,
             },
-            schema_infer_max_rec: proto.schema_infer_max_rec as usize,
+            schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
             date_format: if proto.date_format.is_empty() {
                 None
             } else {
@@ -239,7 +239,7 @@ impl JsonOptionsProto {
         if let Some(options) = &factory.options {
             JsonOptionsProto {
                 compression: options.compression as i32,
-                schema_infer_max_rec: options.schema_infer_max_rec as u64,
+                schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
             }
         } else {
             JsonOptionsProto::default()
@@ -257,7 +257,7 @@ impl From<&JsonOptionsProto> for JsonOptions {
                 3 => CompressionTypeVariant::ZSTD,
                 _ => CompressionTypeVariant::UNCOMPRESSED,
             },
-            schema_infer_max_rec: proto.schema_infer_max_rec as usize,
+            schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
         }
     }
 }
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 14d91913e7cd..531563717482 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -556,7 +556,7 @@ async fn roundtrip_logical_plan_copy_to_json() -> Result<()> {
 
     // Set specific JSON format options
     json_format.compression = CompressionTypeVariant::GZIP;
-    json_format.schema_infer_max_rec = 1000;
+    json_format.schema_infer_max_rec = Some(1000);
 
     let file_type = format_as_file_type(Arc::new(JsonFormatFactory::new_with_options(
         json_format.clone(),
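The updated roundtrip test pins down the property this first patch is really after: format options must come back from serialization exactly as written, including the unset case, which a plain u64 field would collapse into 0. A condensed sketch of that property with illustrative stand-in types:

    #[derive(Debug, Clone, PartialEq)]
    struct Options {
        schema_infer_max_rec: Option<usize>,
    }

    #[derive(Debug, Clone, PartialEq)]
    struct OptionsProto {
        schema_infer_max_rec: Option<u64>,
    }

    fn to_proto(o: &Options) -> OptionsProto {
        OptionsProto { schema_infer_max_rec: o.schema_infer_max_rec.map(|v| v as u64) }
    }

    fn from_proto(p: &OptionsProto) -> Options {
        Options { schema_infer_max_rec: p.schema_infer_max_rec.map(|v| v as usize) }
    }

    fn main() {
        // Both the explicit and the unset case must survive the round trip.
        for opts in [
            Options { schema_infer_max_rec: Some(1000) },
            Options { schema_infer_max_rec: None },
        ] {
            assert_eq!(from_proto(&to_proto(&opts)), opts);
        }
    }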
From a80609c474c54a06e43d2d6d98b0186756797144 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alihan=20=C3=87elikcan?=
Date: Mon, 4 Nov 2024 14:43:08 +0300
Subject: [PATCH 2/2] Add lifetime parameter to CSV and compression BoxStreams

---
 datafusion/core/src/datasource/file_format/csv.rs             | 10 +++++-----
 .../datasource/file_format/file_compression_type.rs           | 12 ++++++------
 2 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 022cd8cded33..848c7f7319e6 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -137,11 +137,11 @@ impl CsvFormat {
     /// Return a newline delimited stream from the specified file on
     /// Stream, decompressing if necessary
     /// Each returned `Bytes` has a whole number of newline delimited rows
-    async fn read_to_delimited_chunks(
+    async fn read_to_delimited_chunks<'a>(
         &self,
         store: &Arc<dyn ObjectStore>,
         object: &ObjectMeta,
-    ) -> BoxStream<'static, Result<Bytes>> {
+    ) -> BoxStream<'a, Result<Bytes>> {
         // stream to only read as many rows as needed into memory
         let stream = store
             .get(&object.location)
@@ -165,10 +165,10 @@ impl CsvFormat {
         stream.boxed()
     }
 
-    async fn read_to_delimited_chunks_from_stream(
+    async fn read_to_delimited_chunks_from_stream<'a>(
         &self,
-        stream: BoxStream<'static, Result<Bytes>>,
-    ) -> BoxStream<'static, Result<Bytes>> {
+        stream: BoxStream<'a, Result<Bytes>>,
+    ) -> BoxStream<'a, Result<Bytes>> {
         let file_compression_type: FileCompressionType = self.options.compression.into();
         let decoder = file_compression_type.convert_stream(stream);
         let steam = match decoder {
diff --git a/datafusion/core/src/datasource/file_format/file_compression_type.rs b/datafusion/core/src/datasource/file_format/file_compression_type.rs
index a054094822d0..6612de077988 100644
--- a/datafusion/core/src/datasource/file_format/file_compression_type.rs
+++ b/datafusion/core/src/datasource/file_format/file_compression_type.rs
@@ -123,10 +123,10 @@ impl FileCompressionType {
     }
 
     /// Given a `Stream`, create a `Stream` which data are compressed with `FileCompressionType`.
-    pub fn convert_to_compress_stream(
+    pub fn convert_to_compress_stream<'a>(
         &self,
-        s: BoxStream<'static, Result<Bytes>>,
-    ) -> Result<BoxStream<'static, Result<Bytes>>> {
+        s: BoxStream<'a, Result<Bytes>>,
+    ) -> Result<BoxStream<'a, Result<Bytes>>> {
         Ok(match self.variant {
             #[cfg(feature = "compression")]
             GZIP => ReaderStream::new(AsyncGzEncoder::new(StreamReader::new(s)))
@@ -180,10 +180,10 @@ impl FileCompressionType {
     }
 
     /// Given a `Stream`, create a `Stream` which data are decompressed with `FileCompressionType`.
-    pub fn convert_stream(
+    pub fn convert_stream<'a>(
         &self,
-        s: BoxStream<'static, Result<Bytes>>,
-    ) -> Result<BoxStream<'static, Result<Bytes>>> {
+        s: BoxStream<'a, Result<Bytes>>,
+    ) -> Result<BoxStream<'a, Result<Bytes>>> {
         Ok(match self.variant {
             #[cfg(feature = "compression")]
             GZIP => {
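The second patch works because BoxStream<'a, _> is strictly more general than BoxStream<'static, _>: callers that own their streams are unaffected, while streams borrowing local data become acceptable arguments. A minimal sketch assuming only the futures crate (convert stands in for convert_stream and is not DataFusion's API):

    use futures::executor::block_on;
    use futures::stream::{self, BoxStream, StreamExt};

    /// Identity-style adapter: the output stream lives exactly as long as
    /// the input stream, rather than being forced to 'static.
    fn convert<'a>(s: BoxStream<'a, u32>) -> BoxStream<'a, u32> {
        s.map(|v| v + 1).boxed()
    }

    fn main() {
        // An owned ('static) stream still works, as before.
        let owned: BoxStream<'static, u32> = stream::iter(vec![1, 2, 3]).boxed();
        assert_eq!(block_on(convert(owned).collect::<Vec<_>>()), vec![2, 3, 4]);

        // A stream borrowing local data is now also accepted; a
        // BoxStream<'static, _> parameter would reject this borrow.
        let data = vec![10u32, 20, 30];
        let borrowed: BoxStream<'_, u32> = stream::iter(data.iter().copied()).boxed();
        assert_eq!(block_on(convert(borrowed).collect::<Vec<_>>()), vec![11, 21, 31]);
    }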