Feature/infer schema upstream #43

Closed · wants to merge 2 commits
6 changes: 3 additions & 3 deletions datafusion/common/src/config.rs
@@ -1648,7 +1648,7 @@ config_namespace! {
/// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
pub newlines_in_values: Option<bool>, default = None
pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
- pub schema_infer_max_rec: usize, default = 100
+ pub schema_infer_max_rec: Option<usize>, default = None
pub date_format: Option<String>, default = None
pub datetime_format: Option<String>, default = None
pub timestamp_format: Option<String>, default = None
@@ -1673,7 +1673,7 @@ impl CsvOptions {
/// Set a limit in terms of records to scan to infer the schema
/// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
- self.schema_infer_max_rec = max_rec;
+ self.schema_infer_max_rec = Some(max_rec);
self
}

@@ -1773,7 +1773,7 @@ config_namespace! {
/// Options controlling JSON format
pub struct JsonOptions {
pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
- pub schema_infer_max_rec: usize, default = 100
+ pub schema_infer_max_rec: Option<usize>, default = None
}
}

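
Reviewer note: the config change above is the heart of this PR — `schema_infer_max_rec` becomes `Option<usize>` so that "not configured" is distinguishable from an explicit value, and the builder now wraps its argument in `Some`. A minimal sketch of the resulting semantics (it assumes `CsvOptions` implements `Default`, which the `config_namespace!` macro provides):

```rust
use datafusion_common::config::CsvOptions;

fn main() {
    // Left unset, the option stays None; the actual record budget is
    // resolved later, at schema-inference time (see csv.rs below).
    let defaults = CsvOptions::default();
    assert_eq!(defaults.schema_infer_max_rec, None);

    // Set through the builder, the value is wrapped in Some(..).
    let tuned = CsvOptions::default().with_schema_infer_max_rec(500);
    assert_eq!(tuned.schema_infer_max_rec, Some(500));
}
```
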
24 changes: 15 additions & 9 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -23,7 +23,7 @@ use std::fmt::{self, Debug};
use std::sync::Arc;

use super::write::orchestration::stateless_multipart_put;
- use super::{FileFormat, FileFormatFactory};
+ use super::{FileFormat, FileFormatFactory, DEFAULT_SCHEMA_INFER_MAX_RECORD};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::write::BatchSerializer;
use crate::datasource::physical_plan::{
@@ -137,11 +137,11 @@ impl CsvFormat {
/// Return a newline delimited stream from the specified file on
/// Stream, decompressing if necessary
/// Each returned `Bytes` has a whole number of newline delimited rows
- async fn read_to_delimited_chunks(
+ async fn read_to_delimited_chunks<'a>(
&self,
store: &Arc<dyn ObjectStore>,
object: &ObjectMeta,
- ) -> BoxStream<'static, Result<Bytes>> {
+ ) -> BoxStream<'a, Result<Bytes>> {
// stream to only read as many rows as needed into memory
let stream = store
.get(&object.location)
@@ -165,10 +165,10 @@ impl CsvFormat {
stream.boxed()
}

- async fn read_to_delimited_chunks_from_stream(
+ async fn read_to_delimited_chunks_from_stream<'a>(
&self,
- stream: BoxStream<'static, Result<Bytes>>,
- ) -> BoxStream<'static, Result<Bytes>> {
+ stream: BoxStream<'a, Result<Bytes>>,
+ ) -> BoxStream<'a, Result<Bytes>> {
let file_compression_type: FileCompressionType = self.options.compression.into();
let decoder = file_compression_type.convert_stream(stream);
let steam = match decoder {
@@ -204,7 +204,7 @@ impl CsvFormat {
/// Set a limit in terms of records to scan to infer the schema
/// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
- self.options.schema_infer_max_rec = max_rec;
+ self.options.schema_infer_max_rec = Some(max_rec);
self
}

@@ -319,7 +319,7 @@ impl FileFormat for CsvFormat {
) -> Result<SchemaRef> {
let mut schemas = vec![];

- let mut records_to_read = self.options.schema_infer_max_rec;
+ let mut records_to_read = self
+     .options
+     .schema_infer_max_rec
+     .unwrap_or(DEFAULT_SCHEMA_INFER_MAX_RECORD);

for object in objects {
let stream = self.read_to_delimited_chunks(store, object).await;
@@ -891,7 +894,10 @@ mod tests {
let integration = LocalFileSystem::new_with_prefix(arrow_test_data()).unwrap();
let path = Path::from("csv/aggregate_test_100.csv");
let csv = CsvFormat::default().with_has_header(true);
- let records_to_read = csv.options().schema_infer_max_rec;
+ let records_to_read = csv
+     .options()
+     .schema_infer_max_rec
+     .unwrap_or(DEFAULT_SCHEMA_INFER_MAX_RECORD);
let store = Arc::new(integration) as Arc<dyn ObjectStore>;
let original_stream = store.get(&path).await?;

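
Reviewer note: the `unwrap_or` chain added to `infer_schema` is where an unset option falls back to the crate-wide default; the new `DEFAULT_SCHEMA_INFER_MAX_RECORD` import pulls that constant from the parent `file_format` module. A standalone sketch of the resolution logic (the constant's value here is an assumption, for illustration only):

```rust
// Assumed value; the real constant is defined in the parent
// `file_format` module, as the new `use super::{..}` import shows.
const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;

fn records_to_read(configured: Option<usize>) -> usize {
    configured.unwrap_or(DEFAULT_SCHEMA_INFER_MAX_RECORD)
}

fn main() {
    assert_eq!(records_to_read(None), 1000); // unset -> crate default
    assert_eq!(records_to_read(Some(50)), 50); // explicit setting wins
}
```
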
datafusion/core/src/datasource/file_format/file_compression_type.rs
@@ -123,10 +123,10 @@ impl FileCompressionType {
}

/// Given a `Stream`, create a `Stream` which data are compressed with `FileCompressionType`.
- pub fn convert_to_compress_stream(
+ pub fn convert_to_compress_stream<'a>(
&self,
- s: BoxStream<'static, Result<Bytes>>,
- ) -> Result<BoxStream<'static, Result<Bytes>>> {
+ s: BoxStream<'a, Result<Bytes>>,
+ ) -> Result<BoxStream<'a, Result<Bytes>>> {
Ok(match self.variant {
#[cfg(feature = "compression")]
GZIP => ReaderStream::new(AsyncGzEncoder::new(StreamReader::new(s)))
@@ -180,10 +180,10 @@ impl FileCompressionType {
}

/// Given a `Stream`, create a `Stream` which data are decompressed with `FileCompressionType`.
- pub fn convert_stream(
+ pub fn convert_stream<'a>(
&self,
- s: BoxStream<'static, Result<Bytes>>,
- ) -> Result<BoxStream<'static, Result<Bytes>>> {
+ s: BoxStream<'a, Result<Bytes>>,
+ ) -> Result<BoxStream<'a, Result<Bytes>>> {
Ok(match self.variant {
#[cfg(feature = "compression")]
GZIP => {
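
Reviewer note: relaxing `'static` to a generic `'a` on these `BoxStream` signatures lets a stream that borrows local data flow through `convert_stream` and `convert_to_compress_stream`. A self-contained sketch of the difference, using only the `futures` and `bytes` crates (not DataFusion itself):

```rust
use bytes::Bytes;
use futures::stream::{self, BoxStream, StreamExt};

// With a `'static` bound this signature would reject borrowed streams;
// the generic lifetime lets them pass through unchanged.
fn pass_through<'a>(s: BoxStream<'a, Bytes>) -> BoxStream<'a, Bytes> {
    s
}

fn main() {
    let chunks = vec![Bytes::from_static(b"a,b\n1,2\n")];
    // `chunks.iter()` borrows `chunks`, so the boxed stream is not 'static.
    let borrowed = stream::iter(chunks.iter().cloned()).boxed();
    let _out = pass_through(borrowed);
}
```
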
11 changes: 8 additions & 3 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -25,7 +25,9 @@ use std::io::BufReader;
use std::sync::Arc;

use super::write::orchestration::stateless_multipart_put;
- use super::{FileFormat, FileFormatFactory, FileScanConfig};
+ use super::{
+     FileFormat, FileFormatFactory, FileScanConfig, DEFAULT_SCHEMA_INFER_MAX_RECORD,
+ };
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::write::BatchSerializer;
use crate::datasource::physical_plan::FileGroupDisplay;
@@ -147,7 +149,7 @@ impl JsonFormat {
/// Set a limit in terms of records to scan to infer the schema
/// - defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
- self.options.schema_infer_max_rec = max_rec;
+ self.options.schema_infer_max_rec = Some(max_rec);
self
}

@@ -187,7 +189,10 @@ impl FileFormat for JsonFormat {
objects: &[ObjectMeta],
) -> Result<SchemaRef> {
let mut schemas = Vec::new();
- let mut records_to_read = self.options.schema_infer_max_rec;
+ let mut records_to_read = self
+     .options
+     .schema_infer_max_rec
+     .unwrap_or(DEFAULT_SCHEMA_INFER_MAX_RECORD);
let file_compression_type = FileCompressionType::from(self.options.compression);
for object in objects {
let mut take_while = || {
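
Reviewer note: the JSON builder mirrors the CSV one. A hedged usage sketch (the import path is the `datafusion` crate's re-export of this module; only the builder call itself comes from the diff):

```rust
use datafusion::datasource::file_format::json::JsonFormat;

fn make_format() -> JsonFormat {
    // Scan at most 200 records during schema inference. Skipping this
    // call leaves the option at None, deferring to
    // DEFAULT_SCHEMA_INFER_MAX_RECORD inside infer_schema.
    JsonFormat::default().with_schema_infer_max_rec(200)
}

fn main() {
    let _format = make_format();
}
```
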
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/listing_table_factory.rs
@@ -254,7 +254,7 @@ mod tests {
let format = listing_table.options().format.clone();
let csv_format = format.as_any().downcast_ref::<CsvFormat>().unwrap();
let csv_options = csv_format.options().clone();
- assert_eq!(csv_options.schema_infer_max_rec, 1000);
+ assert_eq!(csv_options.schema_infer_max_rec, Some(1000));
let listing_options = listing_table.options();
assert_eq!(".tbl", listing_options.file_extension);
}
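
Reviewer note: the updated assertion expects a table option supplied as the string "1000" to land as `Some(1000)`. A sketch of just that parse-and-wrap contract (the real path runs through DataFusion's config machinery; this helper is hypothetical):

```rust
// Hypothetical stand-in for the option-string parsing step.
fn parse_schema_infer_max_rec(raw: &str) -> Option<usize> {
    raw.parse::<usize>().ok() // unparseable or empty -> unset
}

fn main() {
    assert_eq!(parse_schema_infer_max_rec("1000"), Some(1000));
    assert_eq!(parse_schema_infer_max_rec(""), None);
}
```
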
4 changes: 2 additions & 2 deletions datafusion/proto-common/proto/datafusion_common.proto
@@ -414,7 +414,7 @@ message CsvOptions {
bytes quote = 3; // Quote character as a byte
bytes escape = 4; // Optional escape character as a byte
CompressionTypeVariant compression = 5; // Compression type
- uint64 schema_infer_max_rec = 6; // Max records for schema inference
+ optional uint64 schema_infer_max_rec = 6; // Optional max records for schema inference
string date_format = 7; // Optional date format
string datetime_format = 8; // Optional datetime format
string timestamp_format = 9; // Optional timestamp format
@@ -430,7 +430,7 @@ message CsvOptions {
// Options controlling CSV format
message JsonOptions {
CompressionTypeVariant compression = 1; // Compression type
- uint64 schema_infer_max_rec = 2; // Max records for schema inference
+ optional uint64 schema_infer_max_rec = 2; // Optional max records for schema inference
}

message TableParquetOptions {
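
Reviewer note: marking a proto3 scalar `optional` gives it explicit field presence, which is what lets the Rust side distinguish "unset" from `0`. A minimal round-trip sketch with a hand-written message mirroring the field above (`prost` is the only dependency; the struct is illustrative, not the generated one):

```rust
use prost::Message;

#[derive(Clone, PartialEq, Message)]
struct Opts {
    #[prost(uint64, optional, tag = "2")]
    schema_infer_max_rec: Option<u64>,
}

fn main() {
    // An unset field round-trips as None...
    let bytes = Opts { schema_infer_max_rec: None }.encode_to_vec();
    assert_eq!(Opts::decode(&bytes[..]).unwrap().schema_infer_max_rec, None);

    // ...and Some(0) survives, which a plain uint64 field cannot express.
    let bytes = Opts { schema_infer_max_rec: Some(0) }.encode_to_vec();
    assert_eq!(Opts::decode(&bytes[..]).unwrap().schema_infer_max_rec, Some(0));
}
```
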
4 changes: 2 additions & 2 deletions datafusion/proto-common/src/from_proto/mod.rs
@@ -869,7 +869,7 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
double_quote: proto_opts.has_header.first().map(|h| *h != 0),
newlines_in_values: proto_opts.newlines_in_values.first().map(|h| *h != 0),
compression: proto_opts.compression().into(),
- schema_infer_max_rec: proto_opts.schema_infer_max_rec as usize,
+ schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
date_format: (!proto_opts.date_format.is_empty())
.then(|| proto_opts.date_format.clone()),
datetime_format: (!proto_opts.datetime_format.is_empty())
@@ -1050,7 +1050,7 @@ impl TryFrom<&protobuf::JsonOptions> for JsonOptions {
let compression: protobuf::CompressionTypeVariant = proto_opts.compression();
Ok(JsonOptions {
compression: compression.into(),
- schema_infer_max_rec: proto_opts.schema_infer_max_rec as usize,
+ schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
})
}
}
20 changes: 10 additions & 10 deletions datafusion/proto-common/src/generated/pbjson.rs
@@ -1512,7 +1512,7 @@ impl serde::Serialize for CsvOptions {
if self.compression != 0 {
len += 1;
}
- if self.schema_infer_max_rec != 0 {
+ if self.schema_infer_max_rec.is_some() {
len += 1;
}
if !self.date_format.is_empty() {
@@ -1571,10 +1571,10 @@ impl serde::Serialize for CsvOptions {
.map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.compression)))?;
struct_ser.serialize_field("compression", &v)?;
}
- if self.schema_infer_max_rec != 0 {
+ if let Some(v) = self.schema_infer_max_rec.as_ref() {
#[allow(clippy::needless_borrow)]
#[allow(clippy::needless_borrows_for_generic_args)]
struct_ser.serialize_field("schemaInferMaxRec", ToString::to_string(&self.schema_infer_max_rec).as_str())?;
struct_ser.serialize_field("schemaInferMaxRec", ToString::to_string(&v).as_str())?;
}
if !self.date_format.is_empty() {
struct_ser.serialize_field("dateFormat", &self.date_format)?;
@@ -1787,7 +1787,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
return Err(serde::de::Error::duplicate_field("schemaInferMaxRec"));
}
schema_infer_max_rec__ =
- Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
+ map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0)
;
}
GeneratedField::DateFormat => {
@@ -1866,7 +1866,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
quote: quote__.unwrap_or_default(),
escape: escape__.unwrap_or_default(),
compression: compression__.unwrap_or_default(),
- schema_infer_max_rec: schema_infer_max_rec__.unwrap_or_default(),
+ schema_infer_max_rec: schema_infer_max_rec__,
date_format: date_format__.unwrap_or_default(),
datetime_format: datetime_format__.unwrap_or_default(),
timestamp_format: timestamp_format__.unwrap_or_default(),
@@ -3929,7 +3929,7 @@ impl serde::Serialize for JsonOptions {
if self.compression != 0 {
len += 1;
}
- if self.schema_infer_max_rec != 0 {
+ if self.schema_infer_max_rec.is_some() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("datafusion_common.JsonOptions", len)?;
@@ -3938,10 +3938,10 @@ impl serde::Serialize for JsonOptions {
.map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.compression)))?;
struct_ser.serialize_field("compression", &v)?;
}
- if self.schema_infer_max_rec != 0 {
+ if let Some(v) = self.schema_infer_max_rec.as_ref() {
#[allow(clippy::needless_borrow)]
#[allow(clippy::needless_borrows_for_generic_args)]
- struct_ser.serialize_field("schemaInferMaxRec", ToString::to_string(&self.schema_infer_max_rec).as_str())?;
+ struct_ser.serialize_field("schemaInferMaxRec", ToString::to_string(&v).as_str())?;
}
struct_ser.end()
}
@@ -4019,14 +4019,14 @@ impl<'de> serde::Deserialize<'de> for JsonOptions {
return Err(serde::de::Error::duplicate_field("schemaInferMaxRec"));
}
schema_infer_max_rec__ =
- Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
+ map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0)
;
}
}
}
Ok(JsonOptions {
compression: compression__.unwrap_or_default(),
- schema_infer_max_rec: schema_infer_max_rec__.unwrap_or_default(),
+ schema_infer_max_rec: schema_infer_max_rec__,
})
}
}
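
Reviewer note: the generated serializer encodes two protobuf JSON-mapping rules — 64-bit integers are rendered as JSON strings, and an optional field is omitted when absent. A hand-rolled sketch of the same contract using only `serde_json` (illustrative, not pbjson output):

```rust
use serde_json::{json, Value};

fn to_json(schema_infer_max_rec: Option<u64>) -> Value {
    match schema_infer_max_rec {
        // Present: string-encoded, mirroring ToString::to_string(&v) above.
        Some(v) => json!({ "schemaInferMaxRec": v.to_string() }),
        // Absent: the field is skipped, mirroring the is_some() length check.
        None => json!({}),
    }
}

fn main() {
    assert_eq!(to_json(Some(100)).to_string(), r#"{"schemaInferMaxRec":"100"}"#);
    assert_eq!(to_json(None).to_string(), "{}");
}
```
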
12 changes: 6 additions & 6 deletions datafusion/proto-common/src/generated/prost.rs
@@ -572,9 +572,9 @@ pub struct CsvOptions {
/// Compression type
#[prost(enumeration = "CompressionTypeVariant", tag = "5")]
pub compression: i32,
- /// Max records for schema inference
- #[prost(uint64, tag = "6")]
- pub schema_infer_max_rec: u64,
+ /// Optional max records for schema inference
+ #[prost(uint64, optional, tag = "6")]
+ pub schema_infer_max_rec: ::core::option::Option<u64>,
/// Optional date format
#[prost(string, tag = "7")]
pub date_format: ::prost::alloc::string::String,
@@ -612,9 +612,9 @@ pub struct JsonOptions {
/// Compression type
#[prost(enumeration = "CompressionTypeVariant", tag = "1")]
pub compression: i32,
- /// Max records for schema inference
- #[prost(uint64, tag = "2")]
- pub schema_infer_max_rec: u64,
+ /// Optional max records for schema inference
+ #[prost(uint64, optional, tag = "2")]
+ pub schema_infer_max_rec: ::core::option::Option<u64>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TableParquetOptions {
4 changes: 2 additions & 2 deletions datafusion/proto-common/src/to_proto/mod.rs
@@ -921,7 +921,7 @@ impl TryFrom<&CsvOptions> for protobuf::CsvOptions {
.newlines_in_values
.map_or_else(Vec::new, |h| vec![h as u8]),
compression: compression.into(),
- schema_infer_max_rec: opts.schema_infer_max_rec as u64,
+ schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
date_format: opts.date_format.clone().unwrap_or_default(),
datetime_format: opts.datetime_format.clone().unwrap_or_default(),
timestamp_format: opts.timestamp_format.clone().unwrap_or_default(),
@@ -940,7 +940,7 @@ impl TryFrom<&JsonOptions> for protobuf::JsonOptions {
let compression: protobuf::CompressionTypeVariant = opts.compression.into();
Ok(protobuf::JsonOptions {
compression: compression.into(),
- schema_infer_max_rec: opts.schema_infer_max_rec as u64,
+ schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
})
}
}
12 changes: 6 additions & 6 deletions datafusion/proto/src/generated/datafusion_proto_common.rs
@@ -572,9 +572,9 @@ pub struct CsvOptions {
/// Compression type
#[prost(enumeration = "CompressionTypeVariant", tag = "5")]
pub compression: i32,
- /// Max records for schema inference
- #[prost(uint64, tag = "6")]
- pub schema_infer_max_rec: u64,
+ /// Optional max records for schema inference
+ #[prost(uint64, optional, tag = "6")]
+ pub schema_infer_max_rec: ::core::option::Option<u64>,
/// Optional date format
#[prost(string, tag = "7")]
pub date_format: ::prost::alloc::string::String,
@@ -612,9 +612,9 @@ pub struct JsonOptions {
/// Compression type
#[prost(enumeration = "CompressionTypeVariant", tag = "1")]
pub compression: i32,
- /// Max records for schema inference
- #[prost(uint64, tag = "2")]
- pub schema_infer_max_rec: u64,
+ /// Optional max records for schema inference
+ #[prost(uint64, optional, tag = "2")]
+ pub schema_infer_max_rec: ::core::option::Option<u64>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TableParquetOptions {
8 changes: 4 additions & 4 deletions datafusion/proto/src/logical_plan/file_formats.rs
@@ -57,7 +57,7 @@ impl CsvOptionsProto {
escape: options.escape.map_or(vec![], |v| vec![v]),
double_quote: options.double_quote.map_or(vec![], |v| vec![v as u8]),
compression: options.compression as i32,
- schema_infer_max_rec: options.schema_infer_max_rec as u64,
+ schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
date_format: options.date_format.clone().unwrap_or_default(),
datetime_format: options.datetime_format.clone().unwrap_or_default(),
timestamp_format: options.timestamp_format.clone().unwrap_or_default(),
@@ -110,7 +110,7 @@ impl From<&CsvOptionsProto> for CsvOptions {
3 => CompressionTypeVariant::ZSTD,
_ => CompressionTypeVariant::UNCOMPRESSED,
},
- schema_infer_max_rec: proto.schema_infer_max_rec as usize,
+ schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
date_format: if proto.date_format.is_empty() {
None
} else {
@@ -239,7 +239,7 @@ impl JsonOptionsProto {
if let Some(options) = &factory.options {
JsonOptionsProto {
compression: options.compression as i32,
- schema_infer_max_rec: options.schema_infer_max_rec as u64,
+ schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
}
} else {
JsonOptionsProto::default()
@@ -257,7 +257,7 @@ impl From<&JsonOptionsProto> for JsonOptions {
3 => CompressionTypeVariant::ZSTD,
_ => CompressionTypeVariant::UNCOMPRESSED,
},
- schema_infer_max_rec: proto.schema_infer_max_rec as usize,
+ schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
}
}
}
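
Reviewer note: both conversions above apply a plain cast inside `Option::map`; on values that fit in both `usize` and `u64` the two maps compose to the identity, which is what the round-trip tests rely on. A sketch of that property:

```rust
fn to_proto(v: Option<usize>) -> Option<u64> {
    v.map(|x| x as u64)
}

fn from_proto(v: Option<u64>) -> Option<usize> {
    v.map(|x| x as usize)
}

fn main() {
    // None and in-range values survive the round trip unchanged.
    for case in [None, Some(0), Some(1000)] {
        assert_eq!(from_proto(to_proto(case)), case);
    }
}
```
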
2 changes: 1 addition & 1 deletion datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -556,7 +556,7 @@ async fn roundtrip_logical_plan_copy_to_json() -> Result<()> {

// Set specific JSON format options
json_format.compression = CompressionTypeVariant::GZIP;
- json_format.schema_infer_max_rec = 1000;
+ json_format.schema_infer_max_rec = Some(1000);

let file_type = format_as_file_type(Arc::new(JsonFormatFactory::new_with_options(
json_format.clone(),