-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Incorrect results due to repartitioning a sorted ParquetExec #8451
Comments
This could be potentially more subtle as splitting itself doesn't destroy the sort order, what destroys the sort order is if any file group has more than one entry that are not contiguous from the source file (as each entry is effectively appended to the previous one) In this case, what has happened is that one group has a portion from different files in it, which is what causes the wrong results |
I have made a reproducer on a branch and I know what is wrong -- I now need to work on the fix https://github.com/alamb/arrow-datafusion/tree/alamb/bad_redistribution |
Here is a PR that adds tests: #8505 |
Update is that I have a bunch of tests and I understand the issue. I expect to have a PR up with a fix tomorrow |
Update is that the PRs are ready for review: |
Describe the bug
We have a case where the
EnforceDistribution
rule has repatitioned a ParquetExec which parallelized the read (which is good) but that parallelization resulted in destroying the sort order (as it mixes parts of different files together in the same partition). The rest of the plan relies on the output being sorted, and thus since it is no longer sorted we see incorrect resultsTo Reproduce
The input plan looks like this:
The output of EnforceDistirbution looks like this:
Specifically, the DataFusion planner parallelized the read of the parquet files into multiple partitions and in so doing has destroyed the sort order.
(the
16666666..33333333
annotations mean read that byte range in the file)This is actually reflected correctly by the
ParquetExec
(it no longer says "output_ordering" because it is no longer sorted) however, the plan now has aSortPreservingMerge
added above it, which implies that the output is sorted, which is incorrect.Input
Output:
So things that are wrong:
SortPreservingMerge
(which avoids the required resort)datafusion.optimizer.prefer_existing_sort
and IOx sets it to true:I am working on a reproducer in DataFusion
Expected behavior
The correct answer should be produced.
I think this means that either:
ParquetExec
should not be repartitioned if it would destroy the sort order,parquet exec
repartition code should be aware of the repartition and not destroy the sort orderAdditional context
We found that setting the config setting
datafusion.optimizer.repartition_file_scans
and IOx sets to false was a workaround:The text was updated successfully, but these errors were encountered: