Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Do not try and preserve order when there is no order to preserve in RepartitionExec #8127

Merged
merged 11 commits into from
Nov 12, 2023

Conversation

alamb
Copy link
Contributor

@alamb alamb commented Nov 10, 2023

Which issue does this PR close?

Closes #8043

Rationale for this change

We encountered problem is our downstream tests that RepartitionExec was trying to preserve an order even when there is no order to preserve.

This was because one codepath (RepartitionExec::new_with_children) it set the preserve_order flag to true when it creates a new RepartitionExec even if the new child had no order to preserve. During execution, this cause the code to try and merge a stream with no sort exprs, which causes an internal error

What changes are included in this PR?

  1. Update RepartitionExec::with_preserve_order to take existing sortedness into account
  2. Improve documentation
  3. add test

Are these changes tested?

Yes, though I am not thrilled with the test (I will comment inline)

Are there any user-facing changes?

@github-actions github-actions bot added the core Core DataFusion crate label Nov 10, 2023
@@ -1053,6 +1074,28 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_repartition_with_unecessary_preserve_order() -> Result<()> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not super happy with the test as it isn't a test of the optimizer, but the only time this can cause problems is when RepartitionExec::with_new_child is called, which I found really hard to
trigger with the optimizer tests.

Without the code in this PR, this test fails like:

[
    "SortPreservingRepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
    "  UnionExec",
    "    MemoryExec: partitions=1, partition_sizes=[0]",
    "    MemoryExec: partitions=1, partition_sizes=[0]",
]
actual:

[
    "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
    "  UnionExec",
    "    MemoryExec: partitions=1, partition_sizes=[0]",
    "    MemoryExec: partitions=1, partition_sizes=[0]",
]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to move this test to the same file with struct RepartitionExec as a unit test?

Copy link
Contributor

@ozankabak ozankabak left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two call sites of with_preserve_order in enforce_distribution.rs that implements the same logic but passes the is_some term from the outside. I think we should simplify those call sites to with_preserve_order(true) in this PR.

I also see one call with_preserve_order(false) which is unnecessary and confusing in enforce_distribution.rs as well as one such call in replace_with_order_preserving_variants.rs. Let's remove them.

@@ -1053,6 +1074,28 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_repartition_with_unecessary_preserve_order() -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to move this test to the same file with struct RepartitionExec as a unit test?

@alamb
Copy link
Contributor Author

alamb commented Nov 11, 2023

Is it possible to move this test to the same file with struct RepartitionExec as a unit test?

yes, that is a good idea -- I did so in ee55ec3

I also see one call with_preserve_order(false) which is unnecessary and confusing in enforce_distribution.rs as well as one such call in replace_with_order_preserving_variants.rs. Let's remove them.

That is an excellent idea, I did so in 0c492e1 and I think the logic is much clearer now

Copy link
Contributor

@ozankabak ozankabak left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thank you

RepartitionExec::try_new(
updated_children.swap_remove(0),
repartition.partitioning().clone(),
)?
.with_preserve_order(false),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per @ozankabak 's suggestion, I removed the with_preserve_order(false) as well as the boolean argument which was confusing. Now with_preserve_order() is only called when the code is attempting to maintain the order

@alamb alamb merged commit 5a2e0ba into apache:main Nov 12, 2023
23 checks passed
@mustafasrepo
Copy link
Contributor

Thanks @alamb for this PR. Having this check at initialization is great, and doesn't lead to unexpected behavior.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Internal error with repartitioning after equivalence consolidation
3 participants