Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move TopKAggregation rule into physical-optimizer crate #12334

Merged
merged 5 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 32 additions & 18 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion datafusion/core/src/physical_optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ pub mod replace_with_order_preserving_variants;
pub mod sanity_checker;
#[cfg(test)]
pub mod test_utils;
pub mod topk_aggregation;
pub mod update_aggr_exprs;

mod sort_pushdown;
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-optimizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ rust-version = { workspace = true }
workspace = true

[dependencies]
arrow-schema = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-execution = { workspace = true }
datafusion-physical-expr = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ pub mod limit_pushdown;
pub mod limited_distinct_aggregation;
mod optimizer;
pub mod output_requirements;
pub mod topk_aggregation;

pub use optimizer::PhysicalOptimizerRule;
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@

use std::sync::Arc;

use crate::physical_plan::aggregates::AggregateExec;
use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::ExecutionPlan;
use datafusion_physical_plan::aggregates::AggregateExec;
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::ExecutionPlan;

use arrow_schema::DataType;
use datafusion_common::config::ConfigOptions;
Expand All @@ -33,7 +33,7 @@ use datafusion_common::Result;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalSortExpr;

use datafusion_physical_optimizer::PhysicalOptimizerRule;
use crate::PhysicalOptimizerRule;
use itertools::Itertools;

/// An optimizer rule that passes a `limit` hint to aggregations if the whole result is not needed
Expand Down Expand Up @@ -76,7 +76,7 @@ impl TopKAggregation {
aggr.group_expr().clone(),
aggr.aggr_expr().to_vec(),
aggr.filter_expr().to_vec(),
aggr.input().clone(),
Arc::clone(aggr.input()),
aggr.input_schema(),
)
.expect("Unable to copy Aggregate!")
Expand Down Expand Up @@ -114,13 +114,13 @@ impl TopKAggregation {
}
} else {
// or we continue down whitelisted nodes of other types
if !is_cardinality_preserving(plan.clone()) {
if !is_cardinality_preserving(Arc::clone(&plan)) {
cardinality_preserved = false;
}
}
Ok(Transformed::no(plan))
};
let child = child.clone().transform_down(closure).data().ok()?;
let child = Arc::clone(child).transform_down(closure).data().ok()?;
let sort = SortExec::new(sort.expr().to_vec(), child)
.with_fetch(sort.fetch())
.with_preserve_partitioning(sort.preserve_partitioning());
Expand Down