From 4ef3bb50f816026d489a9dffd4c6380288e09ed5 Mon Sep 17 00:00:00 2001 From: waruto210 Date: Tue, 10 Sep 2024 20:09:01 +0800 Subject: [PATCH 1/2] use defeault file extention filter from FileFormat --- .../core/src/datasource/listing/table.rs | 70 ++++++++++++++++--- 1 file changed, 61 insertions(+), 9 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 225995ca4f7a..bbd595cae7f1 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -228,13 +228,13 @@ pub struct ListingOptions { impl ListingOptions { /// Creates an options instance with the given format /// Default values: - /// - no file extension filter + /// - use default file extension filter /// - no input partition to discover /// - one target partition /// - stat collection pub fn new(format: Arc) -> Self { Self { - file_extension: String::new(), + file_extension: format.get_ext(), format, table_partition_cols: vec![], collect_stat: true, @@ -1314,6 +1314,7 @@ mod tests { "test:///bucket/key-prefix/", 12, 5, + Some(""), ) .await?; @@ -1328,6 +1329,7 @@ mod tests { "test:///bucket/key-prefix/", 4, 4, + Some(""), ) .await?; @@ -1343,12 +1345,19 @@ mod tests { "test:///bucket/key-prefix/", 2, 2, + Some(""), ) .await?; // no files => no groups - assert_list_files_for_scan_grouping(&[], "test:///bucket/key-prefix/", 2, 0) - .await?; + assert_list_files_for_scan_grouping( + &[], + "test:///bucket/key-prefix/", + 2, + 0, + Some(""), + ) + .await?; // files that don't match the prefix assert_list_files_for_scan_grouping( @@ -1360,6 +1369,21 @@ mod tests { "test:///bucket/key-prefix/", 10, 2, + Some(""), + ) + .await?; + + // files that don't match the prefix or the default file extention + assert_list_files_for_scan_grouping( + &[ + "bucket/key-prefix/file0.avro", + "bucket/key-prefix/file1.parquet", + "bucket/other-prefix/roguefile.avro", + ], + "test:///bucket/key-prefix/", + 10, + 1, + None, ) .await?; Ok(()) @@ -1380,6 +1404,7 @@ mod tests { &["test:///bucket/key1/", "test:///bucket/key2/"], 12, 5, + Some(""), ) .await?; @@ -1396,6 +1421,7 @@ mod tests { &["test:///bucket/key1/", "test:///bucket/key2/"], 5, 5, + Some(""), ) .await?; @@ -1412,11 +1438,13 @@ mod tests { &["test:///bucket/key1/"], 2, 2, + Some(""), ) .await?; // no files => no groups - assert_list_files_for_multi_paths(&[], &["test:///bucket/key1/"], 2, 0).await?; + assert_list_files_for_multi_paths(&[], &["test:///bucket/key1/"], 2, 0, Some("")) + .await?; // files that don't match the prefix assert_list_files_for_multi_paths( @@ -1431,6 +1459,24 @@ mod tests { &["test:///bucket/key3/"], 2, 1, + Some(""), + ) + .await?; + + // files that don't match the prefix or the default file ext + assert_list_files_for_multi_paths( + &[ + "bucket/key1/file0.avro", + "bucket/key1/file1.csv", + "bucket/key1/file2.avro", + "bucket/key2/file3.csv", + "bucket/key2/file4.avro", + "bucket/key3/file5.csv", + ], + &["test:///bucket/key1/", "test:///bucket/key3/"], + 2, + 2, + None, ) .await?; Ok(()) @@ -1458,15 +1504,18 @@ mod tests { table_prefix: &str, target_partitions: usize, output_partitioning: usize, + file_ext: Option<&str>, ) -> Result<()> { let ctx = SessionContext::new(); register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::>()); let format = AvroFormat {}; - let opt = ListingOptions::new(Arc::new(format)) - .with_file_extension("") + let mut opt = ListingOptions::new(Arc::new(format)) .with_target_partitions(target_partitions); + if let Some(ext) = file_ext { + opt = opt.with_file_extension(ext); + } let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]); @@ -1491,15 +1540,18 @@ mod tests { table_prefix: &[&str], target_partitions: usize, output_partitioning: usize, + file_ext: Option<&str>, ) -> Result<()> { let ctx = SessionContext::new(); register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::>()); let format = AvroFormat {}; - let opt = ListingOptions::new(Arc::new(format)) - .with_file_extension("") + let mut opt = ListingOptions::new(Arc::new(format)) .with_target_partitions(target_partitions); + if let Some(ext) = file_ext { + opt = opt.with_file_extension(ext); + } let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]); From ad55fdba0c54f6b2a5c3316737ee85731d65a285 Mon Sep 17 00:00:00 2001 From: waruto210 Date: Sat, 14 Sep 2024 22:06:21 +0800 Subject: [PATCH 2/2] use with_file_extension_opt api --- datafusion/core/src/datasource/listing/table.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index bbd595cae7f1..adf907011b8d 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -1511,11 +1511,9 @@ mod tests { let format = AvroFormat {}; - let mut opt = ListingOptions::new(Arc::new(format)) + let opt = ListingOptions::new(Arc::new(format)) + .with_file_extension_opt(file_ext) .with_target_partitions(target_partitions); - if let Some(ext) = file_ext { - opt = opt.with_file_extension(ext); - } let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]); @@ -1547,11 +1545,9 @@ mod tests { let format = AvroFormat {}; - let mut opt = ListingOptions::new(Arc::new(format)) + let opt = ListingOptions::new(Arc::new(format)) + .with_file_extension_opt(file_ext) .with_target_partitions(target_partitions); - if let Some(ext) = file_ext { - opt = opt.with_file_extension(ext); - } let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);