Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor: Add repartition_file.slt end to end test for repartitioning files, and supporting tweaks #8505

Merged
merged 2 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,18 @@ const CONCURRENCY_LIMIT: usize = 100;

/// Partition the list of files into `n` groups
pub fn split_files(
partitioned_files: Vec<PartitionedFile>,
mut partitioned_files: Vec<PartitionedFile>,
n: usize,
) -> Vec<Vec<PartitionedFile>> {
if partitioned_files.is_empty() {
return vec![];
}

// ObjectStore::list does not guarantee any consistent order and for some
// implementations such as LocalFileSystem, it may be inconsistent. Thus
// Sort files by path to ensure consistent plans when run more than once.
partitioned_files.sort_by(|a, b| a.path().cmp(b.path()));

// effectively this is div with rounding up instead of truncating
let chunk_size = (partitioned_files.len() + n - 1) / n;
partitioned_files
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/datasource/listing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ impl PartitionedFile {
let size = std::fs::metadata(path.clone())?.len();
Ok(Self::new(path, size))
}

/// Return the path of this partitioned file
pub fn path(&self) -> &Path {
&self.object_meta.location
}
}

impl From<ObjectMeta> for PartitionedFile {
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/listing/url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ impl ListingTableUrl {
if is_directory {
fs::create_dir_all(path)?;
} else {
// ensure parent directory exists
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without this I can't write into /dir/table_name/1.parquet if /dir/table_name doesn't exist already

This is not enabled by deafult, it only affects when the user has specified SINGLE_FILE_OUTPUT explicitly

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aah, I still need to fix this, this is a weird hold over from append

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What needs to be fixed? I would like to write up a catalog

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no reason you should need to create these files ahead of time anymore, it was a consequence of the head request being performed to accommodate the append semantics - I can add it to my list to clean it up

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As in the object_store LocalFile::put implementation should automatically create the paths?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR in #8539

if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
}
fs::File::create(path)?;
}
}
Expand Down
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/bin/sqllogictests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ async fn run_test_file_with_postgres(test_file: TestFile) -> Result<()> {
relative_path,
} = test_file;
info!("Running with Postgres runner: {}", path.display());
setup_scratch_dir(&relative_path)?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

without this complete mode didn't clear the scratch dir resulting in confusing errors

let mut runner =
sqllogictest::Runner::new(|| Postgres::connect(relative_path.clone()));
runner.with_column_validator(strict_column_validator);
Expand Down Expand Up @@ -188,6 +189,7 @@ async fn run_complete_file(test_file: TestFile) -> Result<()> {
info!("Skipping: {}", path.display());
return Ok(());
};
setup_scratch_dir(&relative_path)?;
let mut runner = sqllogictest::Runner::new(|| async {
Ok(DataFusion::new(
test_ctx.session_ctx().clone(),
Expand Down
268 changes: 268 additions & 0 deletions datafusion/sqllogictest/test_files/repartition_scan.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

##########
# Tests for automatically reading files in parallel during scan
##########

# Set 4 partitions for deterministic output plans
statement ok
set datafusion.execution.target_partitions = 4;

# automatically partition all files over 1 byte
statement ok
set datafusion.optimizer.repartition_file_min_size = 1;

###################
### Parquet tests
###################

# create a single parquet file
# Note filename 2.parquet to test sorting (on local file systems it is often listed before 1.parquet)
statement ok
COPY (VALUES (1), (2), (3), (4), (5)) TO 'test_files/scratch/repartition_scan/parquet_table/2.parquet'
(FORMAT PARQUET, SINGLE_FILE_OUTPUT true);

statement ok
CREATE EXTERNAL TABLE parquet_table(column1 int)
STORED AS PARQUET
LOCATION 'test_files/scratch/repartition_scan/parquet_table/';

query I
select * from parquet_table;
----
1
2
3
4
5

## Expect to see the scan read the file as "4" groups with even sizes (offsets)
query TT
EXPLAIN SELECT column1 FROM parquet_table WHERE column1 <> 42;
----
logical_plan
Filter: parquet_table.column1 != Int32(42)
--TableScan: parquet_table projection=[column1], partial_filters=[parquet_table.column1 != Int32(42)]
physical_plan
CoalesceBatchesExec: target_batch_size=8192
--FilterExec: column1@0 != 42
----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1

# create a second parquet file
statement ok
COPY (VALUES (100), (200)) TO 'test_files/scratch/repartition_scan/parquet_table/1.parquet'
(FORMAT PARQUET, SINGLE_FILE_OUTPUT true);

## Still expect to see the scan read the file as "4" groups with even sizes. One group should read
## parts of both files.
query TT
EXPLAIN SELECT column1 FROM parquet_table WHERE column1 <> 42 ORDER BY column1;
----
logical_plan
Sort: parquet_table.column1 ASC NULLS LAST
--Filter: parquet_table.column1 != Int32(42)
----TableScan: parquet_table projection=[column1], partial_filters=[parquet_table.column1 != Int32(42)]
physical_plan
SortPreservingMergeExec: [column1@0 ASC NULLS LAST]
--SortExec: expr=[column1@0 ASC NULLS LAST]
----CoalesceBatchesExec: target_batch_size=8192
------FilterExec: column1@0 != 42
--------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..200], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:200..394, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..206], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:206..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1


## Read the files as though they are ordered

statement ok
CREATE EXTERNAL TABLE parquet_table_with_order(column1 int)
STORED AS PARQUET
LOCATION 'test_files/scratch/repartition_scan/parquet_table'
WITH ORDER (column1 ASC);

# output should be ordered
query I
SELECT column1 FROM parquet_table_with_order WHERE column1 <> 42 ORDER BY column1;
----
1
2
3
4
5
100
200

# explain should not have any groups with more than one file
# https://github.com/apache/arrow-datafusion/issues/8451
query TT
EXPLAIN SELECT column1 FROM parquet_table_with_order WHERE column1 <> 42 ORDER BY column1;
----
logical_plan
Sort: parquet_table_with_order.column1 ASC NULLS LAST
--Filter: parquet_table_with_order.column1 != Int32(42)
----TableScan: parquet_table_with_order projection=[column1], partial_filters=[parquet_table_with_order.column1 != Int32(42)]
physical_plan
SortPreservingMergeExec: [column1@0 ASC NULLS LAST]
--CoalesceBatchesExec: target_batch_size=8192
----FilterExec: column1@0 != 42
------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..200], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:200..394, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..206], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:206..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1

# Cleanup
statement ok
DROP TABLE parquet_table;

statement ok
DROP TABLE parquet_table_with_order;


###################
### CSV tests
###################

# Since parquet and CSV share most of the same implementation, this test checks
# that the basics are connected properly

# create a single csv file
statement ok
COPY (VALUES (1), (2), (3), (4), (5)) TO 'test_files/scratch/repartition_scan/csv_table/1.csv'
(FORMAT csv, SINGLE_FILE_OUTPUT true, HEADER true);

statement ok
CREATE EXTERNAL TABLE csv_table(column1 int)
STORED AS csv
WITH HEADER ROW
LOCATION 'test_files/scratch/repartition_scan/csv_table/';

query I
select * from csv_table;
----
1
2
3
4
5

## Expect to see the scan read the file as "4" groups with even sizes (offsets)
query TT
EXPLAIN SELECT column1 FROM csv_table WHERE column1 <> 42;
----
logical_plan
Filter: csv_table.column1 != Int32(42)
--TableScan: csv_table projection=[column1], partial_filters=[csv_table.column1 != Int32(42)]
physical_plan
CoalesceBatchesExec: target_batch_size=8192
--FilterExec: column1@0 != 42
----CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/csv_table/1.csv:0..5], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/csv_table/1.csv:5..10], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/csv_table/1.csv:10..15], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/csv_table/1.csv:15..18]]}, projection=[column1], has_header=true

# Cleanup
statement ok
DROP TABLE csv_table;


###################
### JSON tests
###################

# Since parquet and json share most of the same implementation, this test checks
# that the basics are connected properly

# create a single json file
statement ok
COPY (VALUES (1), (2), (3), (4), (5)) TO 'test_files/scratch/repartition_scan/json_table/1.json'
(FORMAT json, SINGLE_FILE_OUTPUT true);

statement ok
CREATE EXTERNAL TABLE json_table(column1 int)
STORED AS json
LOCATION 'test_files/scratch/repartition_scan/json_table/';

query I
select * from json_table;
----
1
2
3
4
5

## In the future it would be cool to see the file read as "4" groups with even sizes (offsets)
## but for now it is just one group
## https://github.com/apache/arrow-datafusion/issues/8502
query TT
EXPLAIN SELECT column1 FROM json_table WHERE column1 <> 42;
----
logical_plan
Filter: json_table.column1 != Int32(42)
--TableScan: json_table projection=[column1], partial_filters=[json_table.column1 != Int32(42)]
physical_plan
CoalesceBatchesExec: target_batch_size=8192
--FilterExec: column1@0 != 42
----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------JsonExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/json_table/1.json]]}, projection=[column1]


# Cleanup
statement ok
DROP TABLE json_table;


###################
### Arrow File tests
###################

## Use pre-existing files we don't have a way to create arrow files yet
## (https://github.com/apache/arrow-datafusion/issues/8504)
statement ok
CREATE EXTERNAL TABLE arrow_table
STORED AS ARROW
LOCATION '../core/tests/data/example.arrow';


# It would be great to see the file read as "4" groups with even sizes (offsets) eventually
# https://github.com/apache/arrow-datafusion/issues/8503
query TT
EXPLAIN SELECT * FROM arrow_table
----
logical_plan TableScan: arrow_table projection=[f0, f1, f2]
physical_plan ArrowExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.arrow]]}, projection=[f0, f1, f2]

# Cleanup
statement ok
DROP TABLE arrow_table;

###################
### Avro File tests
###################

## Use pre-existing files we don't have a way to create avro files yet

statement ok
CREATE EXTERNAL TABLE avro_table
STORED AS AVRO
WITH HEADER ROW
LOCATION '../../testing/data/avro/simple_enum.avro'


# It would be great to see the file read as "4" groups with even sizes (offsets) eventually
query TT
EXPLAIN SELECT * FROM avro_table
----
logical_plan TableScan: avro_table projection=[f1, f2, f3]
physical_plan AvroExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/avro/simple_enum.avro]]}, projection=[f1, f2, f3]

# Cleanup
statement ok
DROP TABLE avro_table;