Fix sort order aware file group parallelization #8517

Merged: @alamb merged 3 commits into apache:main on Dec 17, 2023

Conversation

@alamb alamb commented Dec 12, 2023

Draft as it builds on #8505

Which issue does this PR close?

Closes #8451

Rationale for this change

Repartitioning data for pre-sorted listing tables can sometimes produce incorrect results. See the descriptions in #8451 and the comments in this PR for details.
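
To make the failure mode concrete, here is a minimal sketch (not DataFusion code). It assumes two hypothetical files `x` and `y`, each sorted on its own and modelled here as integer slices; `read_group` and `is_sorted` are illustrative helpers, not part of any API. A group that holds the tail of `x` followed by the head of `y` is read in that order, so its output is no longer sorted even though each file is.

```rust
/// Returns true when `values` is in non-decreasing order.
fn is_sorted(values: &[i32]) -> bool {
    values.windows(2).all(|pair| pair[0] <= pair[1])
}

/// Concatenates row ranges in the order a single partition would read them.
/// Each element of `ranges` stands in for one file range in a file group.
fn read_group(ranges: &[&[i32]]) -> Vec<i32> {
    ranges.iter().flat_map(|r| r.iter().copied()).collect()
}
```

With `x = [1, 2, 3, 4]` and `y = [1, 2, 3, 4]`, `read_group(&[&x[2..], &y[..2]])` yields `[3, 4, 1, 2]`, which fails `is_sorted`, while a group built from a single contiguous range of one file passes.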

What changes are included in this PR?

  1. Move the code / tests for redistributing files amongst groups to its own module
  2. Add code + tests to handle redistributing files and preserving sort order

Are these changes tested?

Yes, new unit tests and end-to-end coverage (updates to #8505)

Are there any user-facing changes?

Correct answers with pre-sorted data

@github-actions bot added the core (Core DataFusion crate) and sqllogictest (SQL Logic Tests (.slt)) labels on Dec 12, 2023
// ordering is lost here
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]",
"ParquetExec: file_groups={10 groups: [[x:0..20], [y:0..20], [x:20..40], [y:20..40], [x:40..60], [y:40..60], [x:60..80], [y:60..80], [x:80..100], [y:80..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]",

The file is now divided into groups that preserve its order, so no re-sort is needed
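
As a rough illustration of what "divided into groups that preserve its order" can look like, the sketch below splits one file into contiguous byte ranges, one per group. The function name `split_into_ranges` and the rounding rule (floor division, with the last range running to the end of the file) are assumptions chosen to be consistent with the `x:0..20 ... x:80..100` ranges in the plan above; this is not the code from this PR.

```rust
use std::ops::Range;

/// Splits a file of `file_size` bytes into `num_ranges` contiguous byte ranges.
/// Every range except the last covers `file_size / num_ranges` bytes; the last
/// range runs to the end of the file and absorbs any remainder.
fn split_into_ranges(file_size: u64, num_ranges: usize) -> Vec<Range<u64>> {
    if num_ranges == 0 {
        return Vec::new();
    }
    let range_size = file_size / num_ranges as u64;
    (0..num_ranges)
        .map(|i| {
            let start = i as u64 * range_size;
            let end = if i + 1 == num_ranges {
                file_size
            } else {
                start + range_size
            };
            start..end
        })
        .collect()
}
```

Because each range goes to its own group and no group receives ranges from a second file, every group's output stays in the file's original order.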

@@ -118,7 +118,7 @@ physical_plan
SortPreservingMergeExec: [column1@0 ASC NULLS LAST]
--CoalesceBatchesExec: target_batch_size=8192
----FilterExec: column1@0 != 42
------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..200], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:200..394, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..206], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:206..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1
------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..197], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..201], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:201..403], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:197..394]]}, projection=[column1], output_ordering=[column1@0 ASC NULLS LAST], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1

This shows the test added in #8505 is now fixed (the two files are not intermixed)
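
One way to arrive at a layout like the new plan above (file 1 in groups 0 and 3, file 2 in groups 1 and 2) is to hand each spare group to whichever file currently has the most bytes per range. The sketch below is a simplified illustration under several assumptions: every input group holds exactly one file, `assign_groups` is a hypothetical name, and the tie-break toward the lower file index is chosen here only for determinism. It is not presented as the implementation in this PR.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Decides which output groups each single-file input group may spread across.
///
/// `file_sizes[i]` is the size in bytes of the only file in input group `i`.
/// Returns, per input file, the output group indices that receive its ranges,
/// or `None` when there are no spare groups to hand out.
fn assign_groups(file_sizes: &[u64], target_partitions: usize) -> Option<Vec<Vec<usize>>> {
    if file_sizes.is_empty() || file_sizes.len() >= target_partitions {
        return None;
    }

    // Each file starts out owning the group it is already in.
    let mut owned: Vec<Vec<usize>> = (0..file_sizes.len()).map(|i| vec![i]).collect();

    // Max-heap keyed by each file's current bytes per range. Ties go to the
    // lower file index (an assumption made for this sketch).
    let mut heap: BinaryHeap<(u64, Reverse<usize>)> = file_sizes
        .iter()
        .enumerate()
        .map(|(i, &size)| (size, Reverse(i)))
        .collect();

    // Give each spare group to the file with the largest ranges so far.
    for new_group in file_sizes.len()..target_partitions {
        let (_, Reverse(file)) = heap.pop()?;
        owned[file].push(new_group);
        let range_size = file_sizes[file] / owned[file].len() as u64;
        heap.push((range_size, Reverse(file)));
    }
    Some(owned)
}
```

For file sizes 394 and 403 with 4 target partitions this returns `[[0, 3], [1, 2]]`: no group is shared between the two files, which is the property the comment above points out.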

use std::collections::BinaryHeap;
use std::iter::repeat_with;

/// Repartition input files into `target_partitions` partitions, if total file size exceed

I added a bunch of comments trying to clarify what this code was supposed to be doing

/// divides into 4 groups
/// ```
#[derive(Debug, Clone, Copy)]
pub struct FileGroupPartitioner {

The API is new, but the file distribution algorithm is the same for unordered inputs

&self,
file_groups: &[Vec<PartitionedFile>],
) -> Option<Vec<Vec<PartitionedFile>>> {
let target_partitions = self.target_partitions;

This is the old algorithm, unmodified
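
For readers unfamiliar with the size-based distribution, here is a minimal sketch of the idea, assuming it fills each group up to `ceil(total_size / target_partitions)` bytes before moving on. The name `split_evenly_by_size` and the `(file_index, byte_range)` representation are hypothetical; the arithmetic is chosen to reproduce the ranges in the old plan earlier in this thread (`1.parquet:200..394` and `2.parquet:0..6` sharing a group), not copied from the DataFusion source.

```rust
use std::ops::Range;

/// Spreads files (sizes in bytes, in listing order) across groups of roughly
/// equal total size, ignoring sort order. Each group entry is
/// `(file_index, byte_range)`.
fn split_evenly_by_size(
    file_sizes: &[u64],
    target_partitions: usize,
) -> Vec<Vec<(usize, Range<u64>)>> {
    let total: u64 = file_sizes.iter().sum();
    if total == 0 || target_partitions == 0 {
        return Vec::new();
    }
    let n = target_partitions as u64;
    let target_size = (total + n - 1) / n; // ceiling division

    let mut groups: Vec<Vec<(usize, Range<u64>)>> = vec![Vec::new()];
    let mut filled = 0u64; // bytes already placed in the current group

    for (file_index, &size) in file_sizes.iter().enumerate() {
        let mut start = 0u64;
        while start < size {
            // Open a new group once the current one is full.
            if filled == target_size {
                groups.push(Vec::new());
                filled = 0;
            }
            // Take as much of this file as fits in the current group.
            let take = (size - start).min(target_size - filled);
            groups
                .last_mut()
                .expect("groups is never empty")
                .push((file_index, start..start + take));
            start += take;
            filled += take;
        }
    }
    groups
}
```

With sizes 394 and 403 and 4 target partitions, the second group ends up with the tail of the first file and the first 6 bytes of the second. That is fine for unordered inputs, but it is exactly the intermixing that breaks a declared output ordering.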

mod test {
use super::*;

/// Empty file won't get partitioned

The first set of tests are the original tests, though I refactored them so they didn't rely on ParquetExec

}

#[test]
fn repartition_ordered_no_action_too_few_partitions() {

New tests start here

pub fn repartition_file_groups(
file_groups: Vec<Vec<PartitionedFile>>,
target_partitions: usize,
repartition_file_min_size: usize,
) -> Option<Vec<Vec<PartitionedFile>>> {
let flattened_files = file_groups.iter().flatten().collect::<Vec<_>>();

refactored into datafusion/core/src/datasource/physical_plan/file_groups.rs

@@ -809,345 +810,4 @@ mod tests {
extensions: None,
}
}

/// Unit tests for `repartition_file_groups()`
#[cfg(feature = "parquet")]

These tests were moved and refactored into datafusion/core/src/datasource/physical_plan/file_groups.rs

@@ -3862,6 +3863,56 @@ pub(crate) mod tests {
Ok(())
}

#[test]
fn parallelization_multiple_files() -> Result<()> {

This test fails on main

@alamb alamb changed the title Fix sort order aware redistribution Fix sort order aware file group paralleization Dec 13, 2023
@alamb alamb marked this pull request as ready for review December 13, 2023 21:09
@alamb alamb changed the title Fix sort order aware file group paralleization Fix sort order aware file group parallelization Dec 13, 2023
alamb commented Dec 17, 2023

Thank you for the review @Dandandan

@alamb alamb merged commit 2e16c75 into apache:main Dec 17, 2023
22 checks passed
Labels
core Core DataFusion crate sqllogictest SQL Logic Tests (.slt)
Development

Successfully merging this pull request may close these issues.

Incorrect results due to repartitioning a sorted ParquetExec
2 participants