Skip to content

Commit

Permalink
use FileFormat::get_ext as the default file extension filter (#12417)
Browse files Browse the repository at this point in the history
* use default file extension filter from FileFormat

* use with_file_extension_opt api
  • Loading branch information
waruto210 authored Sep 15, 2024
1 parent 1e31093 commit 3ac92ad
Showing 1 changed file with 55 additions and 7 deletions.
62 changes: 55 additions & 7 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,13 @@ pub struct ListingOptions {
impl ListingOptions {
/// Creates an options instance with the given format
/// Default values:
/// - no file extension filter
/// - use default file extension filter
/// - no input partition to discover
/// - one target partition
/// - stat collection
pub fn new(format: Arc<dyn FileFormat>) -> Self {
Self {
file_extension: String::new(),
file_extension: format.get_ext(),
format,
table_partition_cols: vec![],
collect_stat: true,
Expand Down Expand Up @@ -1314,6 +1314,7 @@ mod tests {
"test:///bucket/key-prefix/",
12,
5,
Some(""),
)
.await?;

Expand All @@ -1328,6 +1329,7 @@ mod tests {
"test:///bucket/key-prefix/",
4,
4,
Some(""),
)
.await?;

Expand All @@ -1343,12 +1345,19 @@ mod tests {
"test:///bucket/key-prefix/",
2,
2,
Some(""),
)
.await?;

// no files => no groups
assert_list_files_for_scan_grouping(&[], "test:///bucket/key-prefix/", 2, 0)
.await?;
assert_list_files_for_scan_grouping(
&[],
"test:///bucket/key-prefix/",
2,
0,
Some(""),
)
.await?;

// files that don't match the prefix
assert_list_files_for_scan_grouping(
Expand All @@ -1360,6 +1369,21 @@ mod tests {
"test:///bucket/key-prefix/",
10,
2,
Some(""),
)
.await?;

// files that don't match the prefix or the default file extension
assert_list_files_for_scan_grouping(
&[
"bucket/key-prefix/file0.avro",
"bucket/key-prefix/file1.parquet",
"bucket/other-prefix/roguefile.avro",
],
"test:///bucket/key-prefix/",
10,
1,
None,
)
.await?;
Ok(())
Expand All @@ -1380,6 +1404,7 @@ mod tests {
&["test:///bucket/key1/", "test:///bucket/key2/"],
12,
5,
Some(""),
)
.await?;

Expand All @@ -1396,6 +1421,7 @@ mod tests {
&["test:///bucket/key1/", "test:///bucket/key2/"],
5,
5,
Some(""),
)
.await?;

Expand All @@ -1412,11 +1438,13 @@ mod tests {
&["test:///bucket/key1/"],
2,
2,
Some(""),
)
.await?;

// no files => no groups
assert_list_files_for_multi_paths(&[], &["test:///bucket/key1/"], 2, 0).await?;
assert_list_files_for_multi_paths(&[], &["test:///bucket/key1/"], 2, 0, Some(""))
.await?;

// files that don't match the prefix
assert_list_files_for_multi_paths(
Expand All @@ -1431,6 +1459,24 @@ mod tests {
&["test:///bucket/key3/"],
2,
1,
Some(""),
)
.await?;

// files that don't match the prefix or the default file ext
assert_list_files_for_multi_paths(
&[
"bucket/key1/file0.avro",
"bucket/key1/file1.csv",
"bucket/key1/file2.avro",
"bucket/key2/file3.csv",
"bucket/key2/file4.avro",
"bucket/key3/file5.csv",
],
&["test:///bucket/key1/", "test:///bucket/key3/"],
2,
2,
None,
)
.await?;
Ok(())
Expand Down Expand Up @@ -1458,14 +1504,15 @@ mod tests {
table_prefix: &str,
target_partitions: usize,
output_partitioning: usize,
file_ext: Option<&str>,
) -> Result<()> {
let ctx = SessionContext::new();
register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());

let format = AvroFormat {};

let opt = ListingOptions::new(Arc::new(format))
.with_file_extension("")
.with_file_extension_opt(file_ext)
.with_target_partitions(target_partitions);

let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
Expand All @@ -1491,14 +1538,15 @@ mod tests {
table_prefix: &[&str],
target_partitions: usize,
output_partitioning: usize,
file_ext: Option<&str>,
) -> Result<()> {
let ctx = SessionContext::new();
register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());

let format = AvroFormat {};

let opt = ListingOptions::new(Arc::new(format))
.with_file_extension("")
.with_file_extension_opt(file_ext)
.with_target_partitions(target_partitions);

let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
Expand Down

0 comments on commit 3ac92ad

Please sign in to comment.