Skip to content

Commit

Permalink
Draft PR is moved to new repo
Browse files Browse the repository at this point in the history
  • Loading branch information
berkaysynnada committed Feb 15, 2024
1 parent 1d8576a commit 017e78f
Show file tree
Hide file tree
Showing 18 changed files with 4,984 additions and 2,505 deletions.
2 changes: 1 addition & 1 deletion datafusion/core/src/dataframe/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ mod tests {
let plan = df.explain(false, false)?.collect().await?;
// Filters all the way to Parquet
let formatted = pretty::pretty_format_batches(&plan)?.to_string();
assert!(formatted.contains("FilterExec: id@0 = 1"));
assert!(formatted.contains("Filter: test.id = Int32(1)"));

Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
AggregateMode::Partial
) && can_combine(
(
agg_exec.group_by(),
agg_exec.group_expr(),
agg_exec.aggr_expr(),
agg_exec.filter_expr(),
),
(
input_agg_exec.group_by(),
input_agg_exec.group_expr(),
input_agg_exec.aggr_expr(),
input_agg_exec.filter_expr(),
),
Expand All @@ -88,7 +88,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
};
AggregateExec::try_new(
mode,
input_agg_exec.group_by().clone(),
input_agg_exec.group_expr().clone(),
input_agg_exec.aggr_expr().to_vec(),
input_agg_exec.filter_expr().to_vec(),
input_agg_exec.input().clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ fn reorder_aggregate_keys(
) -> Result<PlanWithKeyRequirements> {
let parent_required = &agg_node.data;
let output_columns = agg_exec
.group_by()
.group_expr()
.expr()
.iter()
.enumerate()
Expand All @@ -473,15 +473,15 @@ fn reorder_aggregate_keys(
.collect::<Vec<_>>();

if parent_required.len() == output_exprs.len()
&& agg_exec.group_by().null_expr().is_empty()
&& agg_exec.group_expr().null_expr().is_empty()
&& !physical_exprs_equal(&output_exprs, parent_required)
{
if let Some(positions) = expected_expr_positions(&output_exprs, parent_required) {
if let Some(agg_exec) =
agg_exec.input().as_any().downcast_ref::<AggregateExec>()
{
if matches!(agg_exec.mode(), &AggregateMode::Partial) {
let group_exprs = agg_exec.group_by().expr();
let group_exprs = agg_exec.group_expr().expr();
let new_group_exprs = positions
.into_iter()
.map(|idx| group_exprs[idx].clone())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl LimitedDistinctAggregation {
// We found what we want: clone, copy the limit down, and return modified node
let new_aggr = AggregateExec::try_new(
*aggr.mode(),
aggr.group_by().clone(),
aggr.group_expr().clone(),
aggr.aggr_expr().to_vec(),
aggr.filter_expr().to_vec(),
aggr.input().clone(),
Expand Down Expand Up @@ -113,7 +113,7 @@ impl LimitedDistinctAggregation {
if let Some(parent_aggr) =
match_aggr.as_any().downcast_ref::<AggregateExec>()
{
if !parent_aggr.group_by().eq(aggr.group_by()) {
if !parent_aggr.group_expr().eq(aggr.group_expr()) {
// a partial and final aggregation with different groupings disqualifies
// rewriting the child aggregation
rewrite_applicable = false;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ pub mod enforce_distribution;
pub mod enforce_sorting;
pub mod join_selection;
pub mod limited_distinct_aggregation;
mod optimize_projections;
pub mod optimizer;
pub mod output_requirements;
pub mod pipeline_checker;
mod projection_pushdown;
pub mod pruning;
pub mod replace_with_order_preserving_variants;
mod sort_pushdown;
Expand Down
Loading

0 comments on commit 017e78f

Please sign in to comment.