diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index ffe49dd2ba11..b12f37ed7531 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -843,8 +843,13 @@ impl TableProvider for ListingTable {
         });
         // TODO (https://github.com/apache/datafusion/issues/11600) remove downcast_ref from here?
         let session_state = state.as_any().downcast_ref::<SessionState>().unwrap();
+
+        // We should not limit the number of partitioned files to scan if there are filters and limit
+        // at the same time. This is because the limit should be applied after the filters are applied.
+        let statistic_file_limit = if filters.is_empty() { limit } else { None };
+
         let (mut partitioned_file_lists, statistics) = self
-            .list_files_for_scan(session_state, &partition_filters, limit)
+            .list_files_for_scan(session_state, &partition_filters, statistic_file_limit)
             .await?;
 
         // if no files need to be read, return an `EmptyExec`
diff --git a/datafusion/sqllogictest/test_files/push_down_filter.slt b/datafusion/sqllogictest/test_files/push_down_filter.slt
index 64cc51b3c4ff..521aa3340981 100644
--- a/datafusion/sqllogictest/test_files/push_down_filter.slt
+++ b/datafusion/sqllogictest/test_files/push_down_filter.slt
@@ -122,3 +122,76 @@ logical_plan
 
 statement ok
 drop table d;
+
+
+# Test push down filter with limit for parquet
+statement ok
+set datafusion.execution.parquet.pushdown_filters = true;
+
+# this one is also required to make DF skip second file due to "sufficient" amount of rows
+statement ok
+set datafusion.execution.collect_statistics = true;
+
+# Create a table as a data source
+statement ok
+CREATE TABLE src_table (
+    part_key INT,
+    value INT
+) AS VALUES(1, 0), (1, 1), (1, 100), (2, 0), (2, 2), (2, 2), (2, 100), (3, 4), (3, 5), (3, 6);
+
+
+# There will be more than 2 records filtered from the table to check that `limit 1` actually applied.
+# Setup 3 files, i.e., as many as there are partitions:
+
+# File 1:
+query I
+COPY (SELECT * FROM src_table where part_key = 1)
+TO 'test_files/scratch/parquet/test_filter_with_limit/part-0.parquet'
+STORED AS PARQUET;
+----
+3
+
+# File 2:
+query I
+COPY (SELECT * FROM src_table where part_key = 2)
+TO 'test_files/scratch/parquet/test_filter_with_limit/part-1.parquet'
+STORED AS PARQUET;
+----
+4
+
+# File 3:
+query I
+COPY (SELECT * FROM src_table where part_key = 3)
+TO 'test_files/scratch/parquet/test_filter_with_limit/part-2.parquet'
+STORED AS PARQUET;
+----
+3
+
+statement ok
+CREATE EXTERNAL TABLE test_filter_with_limit
+(
+    part_key INT,
+    value INT
+)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/parquet/test_filter_with_limit/';
+
+query TT
+explain select * from test_filter_with_limit where value = 2 limit 1;
+----
+logical_plan
+01)Limit: skip=0, fetch=1
+02)--TableScan: test_filter_with_limit projection=[part_key, value], full_filters=[test_filter_with_limit.value = Int32(2)], fetch=1
+
+query II
+select * from test_filter_with_limit where value = 2 limit 1;
+----
+2 2
+
+# Tear down test_filter_with_limit table:
+statement ok
+DROP TABLE test_filter_with_limit;
+
+# Tear down src_table table:
+statement ok
+DROP TABLE src_table;
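
For context on why passing `limit` straight through was unsafe: as the test comments above suggest, with `collect_statistics = true` the file listing could stop as soon as the first file's row count satisfied the fetch limit, so for `where value = 2 limit 1` the matching rows in `part-1.parquet` would never be scanned. The sketch below is a minimal standalone illustration of the rule the Rust change encodes, not part of the patch; the free function `statistic_file_limit` and its `has_filters` parameter are hypothetical names chosen to mirror the inline `if filters.is_empty() { limit } else { None }` expression in `ListingTable::scan`.

fn statistic_file_limit(has_filters: bool, limit: Option<usize>) -> Option<usize> {
    if has_filters {
        // Filters may drop rows, so the fetch limit cannot bound how many
        // files need to be listed; the limit must apply after filtering.
        None
    } else {
        // No filters: every listed row reaches the output, so the limit can
        // safely bound the file listing.
        limit
    }
}

fn main() {
    // No filters: the query's fetch limit may prune the file listing.
    assert_eq!(statistic_file_limit(false, Some(1)), Some(1));
    // With filters: list all files and let the limit apply to filtered rows.
    assert_eq!(statistic_file_limit(true, Some(1)), None);
}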