diff --git a/pkg/planner/cascades/BUILD.bazel b/pkg/planner/cascades/BUILD.bazel index b84eeb50002a8..1c5dc15b9abac 100644 --- a/pkg/planner/cascades/BUILD.bazel +++ b/pkg/planner/cascades/BUILD.bazel @@ -58,6 +58,7 @@ go_test( "//pkg/parser/model", "//pkg/planner/core", "//pkg/planner/core/base", + "//pkg/planner/core/operator/logicalop", "//pkg/planner/memo", "//pkg/planner/pattern", "//pkg/planner/property", diff --git a/pkg/planner/cascades/implementation_rules.go b/pkg/planner/cascades/implementation_rules.go index c6292a16bd5bd..5f74b68205c4f 100644 --- a/pkg/planner/cascades/implementation_rules.go +++ b/pkg/planner/cascades/implementation_rules.go @@ -326,7 +326,7 @@ func (*ImplHashAgg) Match(_ *memo.GroupExpr, prop *property.PhysicalProperty) (m // OnImplement implements ImplementationRule OnImplement interface. func (*ImplHashAgg) OnImplement(expr *memo.GroupExpr, reqProp *property.PhysicalProperty) ([]memo.Implementation, error) { - la := expr.ExprNode.(*plannercore.LogicalAggregation) + la := expr.ExprNode.(*logicalop.LogicalAggregation) hashAgg := plannercore.NewPhysicalHashAgg( la, expr.Group.Prop.Stats.ScaleByExpectCnt(reqProp.ExpectedCnt), diff --git a/pkg/planner/cascades/optimize_test.go b/pkg/planner/cascades/optimize_test.go index 18b32dd16a7ed..0ae8cc0e6db64 100644 --- a/pkg/planner/cascades/optimize_test.go +++ b/pkg/planner/cascades/optimize_test.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/model" plannercore "github.com/pingcap/tidb/pkg/planner/core" "github.com/pingcap/tidb/pkg/planner/core/base" + "github.com/pingcap/tidb/pkg/planner/core/operator/logicalop" "github.com/pingcap/tidb/pkg/planner/memo" "github.com/pingcap/tidb/pkg/planner/pattern" "github.com/pingcap/tidb/pkg/planner/property" @@ -154,7 +155,7 @@ func TestPreparePossibleProperties(t *testing.T) { require.NotNil(t, columnF) require.NotNil(t, columnA) - agg, ok := logic.Children()[0].(*plannercore.LogicalAggregation) + agg, ok := logic.Children()[0].(*logicalop.LogicalAggregation) require.True(t, ok) group := memo.Convert2Group(agg) diff --git a/pkg/planner/cascades/transformation_rules.go b/pkg/planner/cascades/transformation_rules.go index 8072771a1ad86..a8273c035c98b 100644 --- a/pkg/planner/cascades/transformation_rules.go +++ b/pkg/planner/cascades/transformation_rules.go @@ -411,7 +411,7 @@ func (r *PushAggDownGather) Match(expr *memo.ExprIter) bool { if expr.GetExpr().HasAppliedRule(r) { return false } - agg := expr.GetExpr().ExprNode.(*plannercore.LogicalAggregation) + agg := expr.GetExpr().ExprNode.(*logicalop.LogicalAggregation) for _, aggFunc := range agg.AggFuncs { if aggFunc.Mode != aggregation.CompleteMode { return false @@ -435,7 +435,7 @@ func (r *PushAggDownGather) Match(expr *memo.ExprIter) bool { // OnTransform implements Transformation interface. // It will transform `Agg->Gather` to `Agg(Final) -> Gather -> Agg(Partial1)`. func (r *PushAggDownGather) OnTransform(old *memo.ExprIter) (newExprs []*memo.GroupExpr, eraseOld bool, eraseAll bool, err error) { - agg := old.GetExpr().ExprNode.(*plannercore.LogicalAggregation) + agg := old.GetExpr().ExprNode.(*logicalop.LogicalAggregation) aggSchema := old.GetExpr().Group.Prop.Schema gather := old.Children[0].GetExpr().ExprNode.(*plannercore.TiKVSingleGather) childGroup := old.Children[0].GetExpr().Children[0] @@ -461,13 +461,13 @@ func (r *PushAggDownGather) OnTransform(old *memo.ExprIter) (newExprs []*memo.Gr partialPref.AggFuncs = plannercore.RemoveUnnecessaryFirstRow(agg.SCtx(), finalPref.GroupByItems, partialPref.AggFuncs, partialPref.GroupByItems, partialPref.Schema, firstRowFuncMap) - partialAgg := plannercore.LogicalAggregation{ + partialAgg := logicalop.LogicalAggregation{ AggFuncs: partialPref.AggFuncs, GroupByItems: partialPref.GroupByItems, }.Init(agg.SCtx(), agg.QueryBlockOffset()) partialAgg.CopyAggHints(agg) - finalAgg := plannercore.LogicalAggregation{ + finalAgg := logicalop.LogicalAggregation{ AggFuncs: finalPref.AggFuncs, GroupByItems: finalPref.GroupByItems, }.Init(agg.SCtx(), agg.QueryBlockOffset()) @@ -604,7 +604,7 @@ func NewRulePushSelDownAggregation() Transformation { // or just keep the selection unchanged. func (*PushSelDownAggregation) OnTransform(old *memo.ExprIter) (newExprs []*memo.GroupExpr, eraseOld bool, eraseAll bool, err error) { sel := old.GetExpr().ExprNode.(*logicalop.LogicalSelection) - agg := old.Children[0].GetExpr().ExprNode.(*plannercore.LogicalAggregation) + agg := old.Children[0].GetExpr().ExprNode.(*logicalop.LogicalAggregation) aggSchema := old.Children[0].Prop.Schema var pushedExprs []expression.Expression var remainedExprs []expression.Expression @@ -1525,7 +1525,7 @@ func (*MergeAggregationProjection) Match(old *memo.ExprIter) bool { // OnTransform implements Transformation interface. // It will transform `Aggregation->Projection->X` to `Aggregation->X`. func (*MergeAggregationProjection) OnTransform(old *memo.ExprIter) (newExprs []*memo.GroupExpr, eraseOld bool, eraseAll bool, err error) { - oldAgg := old.GetExpr().ExprNode.(*plannercore.LogicalAggregation) + oldAgg := old.GetExpr().ExprNode.(*logicalop.LogicalAggregation) proj := old.Children[0].GetExpr().ExprNode.(*logicalop.LogicalProjection) projSchema := old.Children[0].GetExpr().Schema() @@ -1545,7 +1545,7 @@ func (*MergeAggregationProjection) OnTransform(old *memo.ExprIter) (newExprs []* aggFuncs[i].Args = newArgs } - newAgg := plannercore.LogicalAggregation{ + newAgg := logicalop.LogicalAggregation{ GroupByItems: groupByItems, AggFuncs: aggFuncs, }.Init(ctx, oldAgg.QueryBlockOffset()) @@ -1579,7 +1579,7 @@ func (r *EliminateSingleMaxMin) Match(expr *memo.ExprIter) bool { return false } - agg := expr.GetExpr().ExprNode.(*plannercore.LogicalAggregation) + agg := expr.GetExpr().ExprNode.(*logicalop.LogicalAggregation) // EliminateSingleMaxMin only works on the complete mode. if !agg.IsCompleteModeAgg() { return false @@ -1604,7 +1604,7 @@ func (r *EliminateSingleMaxMin) Match(expr *memo.ExprIter) bool { // OnTransform implements Transformation interface. // It will transform `max/min->X` to `max/min->top1->sel->X`. func (r *EliminateSingleMaxMin) OnTransform(old *memo.ExprIter) (newExprs []*memo.GroupExpr, eraseOld bool, eraseAll bool, err error) { - agg := old.GetExpr().ExprNode.(*plannercore.LogicalAggregation) + agg := old.GetExpr().ExprNode.(*logicalop.LogicalAggregation) ectx := agg.SCtx().GetExprCtx().GetEvalCtx() childGroup := old.GetExpr().Children[0] ctx := agg.SCtx() @@ -1946,7 +1946,7 @@ func (*EliminateOuterJoinBelowAggregation) Match(expr *memo.ExprIter) bool { // OnTransform implements Transformation interface. // This rule tries to eliminate outer join which below aggregation. func (r *EliminateOuterJoinBelowAggregation) OnTransform(old *memo.ExprIter) (newExprs []*memo.GroupExpr, eraseOld bool, eraseAll bool, err error) { - agg := old.GetExpr().ExprNode.(*plannercore.LogicalAggregation) + agg := old.GetExpr().ExprNode.(*logicalop.LogicalAggregation) joinExpr := old.Children[0].GetExpr() join := joinExpr.ExprNode.(*logicalop.LogicalJoin) @@ -2054,14 +2054,14 @@ func NewRuleTransformAggregateCaseToSelection() Transformation { // Match implements Transformation interface. func (r *TransformAggregateCaseToSelection) Match(expr *memo.ExprIter) bool { - agg := expr.GetExpr().ExprNode.(*plannercore.LogicalAggregation) + agg := expr.GetExpr().ExprNode.(*logicalop.LogicalAggregation) return agg.IsCompleteModeAgg() && len(agg.GroupByItems) == 0 && len(agg.AggFuncs) == 1 && len(agg.AggFuncs[0].Args) == 1 && r.isTwoOrThreeArgCase(agg.AggFuncs[0].Args[0]) } // OnTransform implements Transformation interface. // This rule tries to convert Agg(case when) to Agg->Selection. func (r *TransformAggregateCaseToSelection) OnTransform(old *memo.ExprIter) (newExprs []*memo.GroupExpr, eraseOld bool, eraseAll bool, err error) { - agg := old.GetExpr().ExprNode.(*plannercore.LogicalAggregation) + agg := old.GetExpr().ExprNode.(*logicalop.LogicalAggregation) ok, newConditions, newAggFuncs := r.transform(agg) if !ok { @@ -2073,7 +2073,7 @@ func (r *TransformAggregateCaseToSelection) OnTransform(old *memo.ExprIter) (new newSelExpr.SetChildren(old.GetExpr().Children...) newSelGroup := memo.NewGroupWithSchema(newSelExpr, old.GetExpr().Children[0].Prop.Schema) - newAgg := plannercore.LogicalAggregation{ + newAgg := logicalop.LogicalAggregation{ AggFuncs: newAggFuncs, GroupByItems: agg.GroupByItems, }.Init(agg.SCtx(), agg.QueryBlockOffset()) @@ -2083,7 +2083,7 @@ func (r *TransformAggregateCaseToSelection) OnTransform(old *memo.ExprIter) (new return []*memo.GroupExpr{newAggExpr}, true, false, nil } -func (r *TransformAggregateCaseToSelection) transform(agg *plannercore.LogicalAggregation) (ok bool, newConditions []expression.Expression, newAggFuncs []*aggregation.AggFuncDesc) { +func (r *TransformAggregateCaseToSelection) transform(agg *logicalop.LogicalAggregation) (ok bool, newConditions []expression.Expression, newAggFuncs []*aggregation.AggFuncDesc) { aggFuncDesc := agg.AggFuncs[0] aggFuncName := aggFuncDesc.Name ctx := agg.SCtx() @@ -2174,7 +2174,7 @@ func NewRuleTransformAggToProj() Transformation { // Match implements Transformation interface. func (*TransformAggToProj) Match(expr *memo.ExprIter) bool { - agg := expr.GetExpr().ExprNode.(*plannercore.LogicalAggregation) + agg := expr.GetExpr().ExprNode.(*logicalop.LogicalAggregation) if !agg.IsCompleteModeAgg() { return false @@ -2203,7 +2203,7 @@ func (*TransformAggToProj) Match(expr *memo.ExprIter) bool { // OnTransform implements Transformation interface. // This rule tries to convert agg to proj. func (*TransformAggToProj) OnTransform(old *memo.ExprIter) (newExprs []*memo.GroupExpr, eraseOld bool, eraseAll bool, err error) { - agg := old.GetExpr().ExprNode.(*plannercore.LogicalAggregation) + agg := old.GetExpr().ExprNode.(*logicalop.LogicalAggregation) if ok, proj := plannercore.ConvertAggToProj(agg, old.GetExpr().Schema()); ok { newProjExpr := memo.NewGroupExpr(proj) newProjExpr.SetChildren(old.GetExpr().Children...) @@ -2333,14 +2333,14 @@ func NewRuleInjectProjectionBelowAgg() Transformation { // Match implements Transformation interface. func (*InjectProjectionBelowAgg) Match(expr *memo.ExprIter) bool { - agg := expr.GetExpr().ExprNode.(*plannercore.LogicalAggregation) + agg := expr.GetExpr().ExprNode.(*logicalop.LogicalAggregation) return agg.IsCompleteModeAgg() } // OnTransform implements Transformation interface. // It will convert `Agg -> X` to `Agg -> Proj -> X`. func (*InjectProjectionBelowAgg) OnTransform(old *memo.ExprIter) (newExprs []*memo.GroupExpr, eraseOld bool, eraseAll bool, err error) { - agg := old.GetExpr().ExprNode.(*plannercore.LogicalAggregation) + agg := old.GetExpr().ExprNode.(*logicalop.LogicalAggregation) ectx := agg.SCtx().GetExprCtx().GetEvalCtx() hasScalarFunc := false @@ -2417,7 +2417,7 @@ func (*InjectProjectionBelowAgg) OnTransform(old *memo.ExprIter) (newExprs []*me projExpr.SetChildren(old.GetExpr().Children[0]) projGroup := memo.NewGroupWithSchema(projExpr, projSchema) - newAgg := plannercore.LogicalAggregation{ + newAgg := logicalop.LogicalAggregation{ AggFuncs: copyFuncs, GroupByItems: newGroupByItems, }.Init(agg.SCtx(), agg.QueryBlockOffset()) diff --git a/pkg/planner/core/BUILD.bazel b/pkg/planner/core/BUILD.bazel index 86bffb8678eb9..3ea41de60ef5e 100644 --- a/pkg/planner/core/BUILD.bazel +++ b/pkg/planner/core/BUILD.bazel @@ -22,7 +22,6 @@ go_library( "indexmerge_path.go", "indexmerge_unfinished_path.go", "initialize.go", - "logical_aggregation.go", "logical_apply.go", "logical_cte.go", "logical_datasource.go", diff --git a/pkg/planner/core/casetest/stats_test.go b/pkg/planner/core/casetest/stats_test.go index 1125c15317c96..f4a816991cb2b 100644 --- a/pkg/planner/core/casetest/stats_test.go +++ b/pkg/planner/core/casetest/stats_test.go @@ -68,13 +68,13 @@ func TestGroupNDVs(t *testing.T) { lp := p.(base.LogicalPlan) _, err = core.RecursiveDeriveStats4Test(lp) require.NoError(t, err, comment) - var agg *core.LogicalAggregation + var agg *logicalop.LogicalAggregation var join *logicalop.LogicalJoin stack := make([]base.LogicalPlan, 0, 2) traversed := false for !traversed { switch v := lp.(type) { - case *core.LogicalAggregation: + case *logicalop.LogicalAggregation: agg = v lp = lp.Children()[0] case *logicalop.LogicalJoin: diff --git a/pkg/planner/core/collect_column_stats_usage.go b/pkg/planner/core/collect_column_stats_usage.go index 3a6ffdef2d8ae..28a2661e09a43 100644 --- a/pkg/planner/core/collect_column_stats_usage.go +++ b/pkg/planner/core/collect_column_stats_usage.go @@ -250,7 +250,7 @@ func (c *columnStatsUsageCollector) collectFromPlan(lp base.LogicalPlan) { // Though the conditions in LogicalSelection are complex conditions which cannot be pushed down to DataSource, we still // regard statistics of the columns in the conditions as needed. c.addPredicateColumnsFromExpressions(x.Conditions) - case *LogicalAggregation: + case *logicalop.LogicalAggregation: // Just assume statistics of all the columns in GroupByItems are needed. c.addPredicateColumnsFromExpressions(x.GroupByItems) // Schema change from children to self. diff --git a/pkg/planner/core/core_init.go b/pkg/planner/core/core_init.go index 23d47d8bbb61d..cdb35778df1dc 100644 --- a/pkg/planner/core/core_init.go +++ b/pkg/planner/core/core_init.go @@ -29,12 +29,10 @@ func init() { // For code refactor init. utilfuncp.AddSelection = addSelection utilfuncp.FindBestTask = findBestTask + utilfuncp.PruneByItems = pruneByItems utilfuncp.HasMaxOneRowUtil = HasMaxOneRow utilfuncp.GetTaskPlanCost = getTaskPlanCost utilfuncp.CanPushToCopImpl = canPushToCopImpl - utilfuncp.GetStreamAggs = getStreamAggs - utilfuncp.GetHashAggs = getHashAggs - utilfuncp.PruneByItems = pruneByItems utilfuncp.PushDownTopNForBaseLogicalPlan = pushDownTopNForBaseLogicalPlan utilfuncp.FindBestTask4LogicalShow = findBestTask4LogicalShow utilfuncp.FindBestTask4LogicalCTETable = findBestTask4LogicalCTETable @@ -52,6 +50,7 @@ func init() { utilfuncp.ExhaustPhysicalPlans4LogicalMaxOneRow = exhaustPhysicalPlans4LogicalMaxOneRow utilfuncp.ExhaustPhysicalPlans4LogicalUnionScan = exhaustPhysicalPlans4LogicalUnionScan utilfuncp.ExhaustPhysicalPlans4LogicalProjection = exhaustPhysicalPlans4LogicalProjection + utilfuncp.ExhaustPhysicalPlans4LogicalAggregation = exhaustPhysicalPlans4LogicalAggregation utilfuncp.GetActualProbeCntFromProbeParents = getActualProbeCntFromProbeParents utilfuncp.GetEstimatedProbeCntFromProbeParents = getEstimatedProbeCntFromProbeParents diff --git a/pkg/planner/core/exhaust_physical_plans.go b/pkg/planner/core/exhaust_physical_plans.go index e3f72039cac8b..f0bd7f98c8c7c 100644 --- a/pkg/planner/core/exhaust_physical_plans.go +++ b/pkg/planner/core/exhaust_physical_plans.go @@ -765,7 +765,7 @@ childLoop: case *DataSource: wrapper.ds = child break childLoop - case *logicalop.LogicalProjection, *logicalop.LogicalSelection, *LogicalAggregation: + case *logicalop.LogicalProjection, *logicalop.LogicalSelection, *logicalop.LogicalAggregation: if !p.SCtx().GetSessionVars().EnableINLJoinInnerMultiPattern { return nil } @@ -1036,14 +1036,14 @@ func constructInnerByZippedChildren(prop *property.PhysicalProperty, zippedChild child = constructInnerProj(prop, x, child) case *logicalop.LogicalSelection: child = constructInnerSel(prop, x, child) - case *LogicalAggregation: + case *logicalop.LogicalAggregation: child = constructInnerAgg(prop, x, child) } } return child } -func constructInnerAgg(prop *property.PhysicalProperty, logicalAgg *LogicalAggregation, child base.PhysicalPlan) base.PhysicalPlan { +func constructInnerAgg(prop *property.PhysicalProperty, logicalAgg *logicalop.LogicalAggregation, child base.PhysicalPlan) base.PhysicalPlan { if logicalAgg == nil { return child } @@ -1337,10 +1337,10 @@ func constructInnerIndexScanTask( // // Step2: build other inner plan node to task func constructIndexJoinInnerSideTask(p *logicalop.LogicalJoin, prop *property.PhysicalProperty, dsCopTask *CopTask, ds *DataSource, path *util.AccessPath, wrapper *indexJoinInnerChildWrapper) base.Task { - var la *LogicalAggregation + var la *logicalop.LogicalAggregation var canPushAggToCop bool if len(wrapper.zippedChildren) > 0 { - la, canPushAggToCop = wrapper.zippedChildren[len(wrapper.zippedChildren)-1].(*LogicalAggregation) + la, canPushAggToCop = wrapper.zippedChildren[len(wrapper.zippedChildren)-1].(*logicalop.LogicalAggregation) if la != nil && la.HasDistinct() { // TODO: remove AllowDistinctAggPushDown after the cost estimation of distinct pushdown is implemented. // If AllowDistinctAggPushDown is set to true, we should not consider RootTask. @@ -2277,7 +2277,7 @@ func disableAggPushDownToCop(p base.LogicalPlan) { for _, child := range p.Children() { disableAggPushDownToCop(child) } - if agg, ok := p.(*LogicalAggregation); ok { + if agg, ok := p.(*logicalop.LogicalAggregation); ok { agg.NoCopPushDown = true } } @@ -2466,7 +2466,7 @@ func canPushToCopImpl(lp base.LogicalPlan, storeTp kv.StoreType, considerDual bo ret = ret && canPushToCopImpl(&c.BaseLogicalPlan, storeTp, considerDual) case *logicalop.LogicalTableDual: return storeTp == kv.TiFlash && considerDual - case *LogicalAggregation, *logicalop.LogicalSelection, *logicalop.LogicalJoin, *logicalop.LogicalWindow: + case *logicalop.LogicalAggregation, *logicalop.LogicalSelection, *logicalop.LogicalJoin, *logicalop.LogicalWindow: if storeTp != kv.TiFlash { return false } @@ -2493,7 +2493,7 @@ func canPushToCopImpl(lp base.LogicalPlan, storeTp kv.StoreType, considerDual bo return ret } -func getEnforcedStreamAggs(la *LogicalAggregation, prop *property.PhysicalProperty) []base.PhysicalPlan { +func getEnforcedStreamAggs(la *logicalop.LogicalAggregation, prop *property.PhysicalProperty) []base.PhysicalPlan { if prop.IsFlashProp() { return nil } @@ -2538,21 +2538,8 @@ func getEnforcedStreamAggs(la *LogicalAggregation, prop *property.PhysicalProper return enforcedAggs } -func (la *LogicalAggregation) distinctArgsMeetsProperty() bool { - for _, aggFunc := range la.AggFuncs { - if aggFunc.HasDistinct { - for _, distinctArg := range aggFunc.Args { - if !expression.Contains(la.SCtx().GetExprCtx().GetEvalCtx(), la.GroupByItems, distinctArg) { - return false - } - } - } - } - return true -} - func getStreamAggs(lp base.LogicalPlan, prop *property.PhysicalProperty) []base.PhysicalPlan { - la := lp.(*LogicalAggregation) + la := lp.(*logicalop.LogicalAggregation) // TODO: support CopTiFlash task type in stream agg if prop.IsFlashProp() { return nil @@ -2630,7 +2617,7 @@ func getStreamAggs(lp base.LogicalPlan, prop *property.PhysicalProperty) []base. } // TODO: support more operators and distinct later -func checkCanPushDownToMPP(la *LogicalAggregation) bool { +func checkCanPushDownToMPP(la *logicalop.LogicalAggregation) bool { hasUnsupportedDistinct := false for _, agg := range la.AggFuncs { // MPP does not support distinct except count distinct now @@ -2656,7 +2643,7 @@ func checkCanPushDownToMPP(la *LogicalAggregation) bool { return CheckAggCanPushCop(la.SCtx(), la.AggFuncs, la.GroupByItems, kv.TiFlash) } -func tryToGetMppHashAggs(la *LogicalAggregation, prop *property.PhysicalProperty) (hashAggs []base.PhysicalPlan) { +func tryToGetMppHashAggs(la *logicalop.LogicalAggregation, prop *property.PhysicalProperty) (hashAggs []base.PhysicalPlan) { if !prop.IsSortItemEmpty() { return nil } @@ -2804,7 +2791,7 @@ func tryToGetMppHashAggs(la *LogicalAggregation, prop *property.PhysicalProperty // // That is to say, the non-complete positive judgement of canPushDownToMPP/canPushDownToTiFlash/canPushDownToTiKV is not that for sure here. func getHashAggs(lp base.LogicalPlan, prop *property.PhysicalProperty) []base.PhysicalPlan { - la := lp.(*LogicalAggregation) + la := lp.(*logicalop.LogicalAggregation) if !prop.IsSortItemEmpty() { return nil } @@ -2864,6 +2851,32 @@ func getHashAggs(lp base.LogicalPlan, prop *property.PhysicalProperty) []base.Ph return hashAggs } +func exhaustPhysicalPlans4LogicalAggregation(lp base.LogicalPlan, prop *property.PhysicalProperty) ([]base.PhysicalPlan, bool, error) { + la := lp.(*logicalop.LogicalAggregation) + if la.PreferAggToCop { + if !la.CanPushToCop(kv.TiKV) { + la.SCtx().GetSessionVars().StmtCtx.SetHintWarning( + "Optimizer Hint AGG_TO_COP is inapplicable") + la.PreferAggToCop = false + } + } + preferHash, preferStream := la.ResetHintIfConflicted() + hashAggs := getHashAggs(la, prop) + if hashAggs != nil && preferHash { + return hashAggs, true, nil + } + streamAggs := getStreamAggs(la, prop) + if streamAggs != nil && preferStream { + return streamAggs, true, nil + } + aggs := append(hashAggs, streamAggs...) + + if streamAggs == nil && preferStream && !prop.IsSortItemEmpty() { + la.SCtx().GetSessionVars().StmtCtx.SetHintWarning("Optimizer Hint STREAM_AGG is inapplicable") + } + return aggs, !(preferStream || preferHash), nil +} + func exhaustPhysicalPlans4LogicalSelection(lp base.LogicalPlan, prop *property.PhysicalProperty) ([]base.PhysicalPlan, bool, error) { p := lp.(*logicalop.LogicalSelection) newProps := make([]*property.PhysicalProperty, 0, 2) diff --git a/pkg/planner/core/expression_rewriter.go b/pkg/planner/core/expression_rewriter.go index 9c7c7e5f82203..7a1b14d28a702 100644 --- a/pkg/planner/core/expression_rewriter.go +++ b/pkg/planner/core/expression_rewriter.go @@ -814,7 +814,7 @@ func (er *expressionRewriter) handleCompareSubquery(ctx context.Context, planCtx // it will be rewrote to t.id < (select max(s.id) from s). func (er *expressionRewriter) handleOtherComparableSubq(planCtx *exprRewriterPlanCtx, lexpr, rexpr expression.Expression, np base.LogicalPlan, useMin bool, cmpFunc string, all, markNoDecorrelate bool) { intest.AssertNotNil(planCtx) - plan4Agg := LogicalAggregation{}.Init(planCtx.builder.ctx, planCtx.builder.getSelectOffset()) + plan4Agg := logicalop.LogicalAggregation{}.Init(planCtx.builder.ctx, planCtx.builder.getSelectOffset()) if hintinfo := planCtx.builder.TableHints(); hintinfo != nil { plan4Agg.PreferAggType = hintinfo.PreferAggType plan4Agg.PreferAggToCop = hintinfo.PreferAggToCop @@ -849,7 +849,7 @@ func (er *expressionRewriter) handleOtherComparableSubq(planCtx *exprRewriterPla } // buildQuantifierPlan adds extra condition for any / all subquery. -func (er *expressionRewriter) buildQuantifierPlan(planCtx *exprRewriterPlanCtx, plan4Agg *LogicalAggregation, cond, lexpr, rexpr expression.Expression, all, markNoDecorrelate bool) { +func (er *expressionRewriter) buildQuantifierPlan(planCtx *exprRewriterPlanCtx, plan4Agg *logicalop.LogicalAggregation, cond, lexpr, rexpr expression.Expression, all, markNoDecorrelate bool) { intest.AssertNotNil(planCtx) innerIsNull := expression.NewFunctionInternal(er.sctx, ast.IsNull, types.NewFieldType(mysql.TypeTiny), rexpr) outerIsNull := expression.NewFunctionInternal(er.sctx, ast.IsNull, types.NewFieldType(mysql.TypeTiny), lexpr) @@ -951,7 +951,7 @@ func (er *expressionRewriter) handleNEAny(planCtx *exprRewriterPlanCtx, lexpr, r er.err = err return } - plan4Agg := LogicalAggregation{ + plan4Agg := logicalop.LogicalAggregation{ AggFuncs: []*aggregation.AggFuncDesc{maxFunc, countFunc}, }.Init(sctx, planCtx.builder.getSelectOffset()) if hintinfo := planCtx.builder.TableHints(); hintinfo != nil { @@ -994,7 +994,7 @@ func (er *expressionRewriter) handleEQAll(planCtx *exprRewriterPlanCtx, lexpr, r er.err = err return } - plan4Agg := LogicalAggregation{ + plan4Agg := logicalop.LogicalAggregation{ AggFuncs: []*aggregation.AggFuncDesc{maxFunc, countFunc}, }.Init(sctx, planCtx.builder.getSelectOffset()) if hintinfo := planCtx.builder.TableHints(); hintinfo != nil { @@ -1120,7 +1120,7 @@ out: // e.g. exists(select count(*) from t order by a) is equal to exists t. case *logicalop.LogicalProjection, *logicalop.LogicalSort: p = p.Children()[0] - case *LogicalAggregation: + case *logicalop.LogicalAggregation: if len(plan.GroupByItems) == 0 { p = logicalop.LogicalTableDual{RowCount: 1}.Init(planCtx.builder.ctx, planCtx.builder.getSelectOffset()) break out diff --git a/pkg/planner/core/logical_plan_builder.go b/pkg/planner/core/logical_plan_builder.go index d84189121e075..0b7c94f26b083 100644 --- a/pkg/planner/core/logical_plan_builder.go +++ b/pkg/planner/core/logical_plan_builder.go @@ -266,7 +266,7 @@ func (b *PlanBuilder) buildAggregation(ctx context.Context, p base.LogicalPlan, rollupExpand = expand } - plan4Agg := LogicalAggregation{AggFuncs: make([]*aggregation.AggFuncDesc, 0, len(aggFuncList))}.Init(b.ctx, b.getSelectOffset()) + plan4Agg := logicalop.LogicalAggregation{AggFuncs: make([]*aggregation.AggFuncDesc, 0, len(aggFuncList))}.Init(b.ctx, b.getSelectOffset()) if hintinfo := b.TableHints(); hintinfo != nil { plan4Agg.PreferAggType = hintinfo.PreferAggType plan4Agg.PreferAggToCop = hintinfo.PreferAggToCop @@ -1489,10 +1489,10 @@ func (b *PlanBuilder) buildProjection(ctx context.Context, p base.LogicalPlan, f return proj, proj.Exprs, oldLen, nil } -func (b *PlanBuilder) buildDistinct(child base.LogicalPlan, length int) (*LogicalAggregation, error) { +func (b *PlanBuilder) buildDistinct(child base.LogicalPlan, length int) (*logicalop.LogicalAggregation, error) { b.optFlag = b.optFlag | flagBuildKeyInfo b.optFlag = b.optFlag | flagPushDownAgg - plan4Agg := LogicalAggregation{ + plan4Agg := logicalop.LogicalAggregation{ AggFuncs: make([]*aggregation.AggFuncDesc, 0, child.Schema().Len()), GroupByItems: expression.Column2Exprs(child.Schema().Clone().Columns[:length]), }.Init(b.ctx, child.QueryBlockOffset()) diff --git a/pkg/planner/core/logical_plans.go b/pkg/planner/core/logical_plans.go index a1ddecb4e62ca..f529aaebef64f 100644 --- a/pkg/planner/core/logical_plans.go +++ b/pkg/planner/core/logical_plans.go @@ -21,7 +21,7 @@ import ( var ( _ base.LogicalPlan = &logicalop.LogicalJoin{} - _ base.LogicalPlan = &LogicalAggregation{} + _ base.LogicalPlan = &logicalop.LogicalAggregation{} _ base.LogicalPlan = &logicalop.LogicalProjection{} _ base.LogicalPlan = &logicalop.LogicalSelection{} _ base.LogicalPlan = &LogicalApply{} diff --git a/pkg/planner/core/logical_plans_test.go b/pkg/planner/core/logical_plans_test.go index 2820f4825823f..dbb9ff3a0db7c 100644 --- a/pkg/planner/core/logical_plans_test.go +++ b/pkg/planner/core/logical_plans_test.go @@ -163,7 +163,7 @@ func TestImplicitCastNotNullFlag(t *testing.T) { p, err = logicalOptimize(context.TODO(), flagPredicatePushDown|flagJoinReOrder|flagPrunColumns|flagEliminateProjection, p.(base.LogicalPlan)) require.NoError(t, err) // AggFuncs[0] is count; AggFuncs[1] is bit_and, args[0] is return type of the implicit cast - castNotNullFlag := (p.(*logicalop.LogicalProjection).Children()[0].(*logicalop.LogicalSelection).Children()[0].(*LogicalAggregation).AggFuncs[1].Args[0].GetType(s.ctx.GetExprCtx().GetEvalCtx()).GetFlag()) & mysql.NotNullFlag + castNotNullFlag := (p.(*logicalop.LogicalProjection).Children()[0].(*logicalop.LogicalSelection).Children()[0].(*logicalop.LogicalAggregation).AggFuncs[1].Args[0].GetType(s.ctx.GetExprCtx().GetEvalCtx()).GetFlag()) & mysql.NotNullFlag var nullableFlag uint = 0 require.Equal(t, nullableFlag, castNotNullFlag) } @@ -392,7 +392,7 @@ func TestExtraPKNotNullFlag(t *testing.T) { require.NoError(t, err, comment) p, err := BuildLogicalPlanForTest(ctx, s.sctx, stmt, s.is) require.NoError(t, err, comment) - ds := p.(*logicalop.LogicalProjection).Children()[0].(*LogicalAggregation).Children()[0].(*DataSource) + ds := p.(*logicalop.LogicalProjection).Children()[0].(*logicalop.LogicalAggregation).Children()[0].(*DataSource) require.Equal(t, "_tidb_rowid", ds.Columns[2].Name.L) require.Equal(t, mysql.PriKeyFlag|mysql.NotNullFlag, ds.Columns[2].GetFlag()) require.Equal(t, mysql.PriKeyFlag|mysql.NotNullFlag, ds.Schema().Columns[2].RetType.GetFlag()) @@ -761,7 +761,7 @@ func TestCS3389(t *testing.T) { require.True(t, isProj) require.True(t, len(proj.Exprs) > 0) child := proj.Children()[0] - agg, isAgg := child.(*LogicalAggregation) + agg, isAgg := child.(*logicalop.LogicalAggregation) require.True(t, isAgg) child = agg.Children()[0] _, isJoin := child.(*logicalop.LogicalJoin) diff --git a/pkg/planner/core/operator/logicalop/BUILD.bazel b/pkg/planner/core/operator/logicalop/BUILD.bazel index 3a1d6c7f4b030..fa57f0fe3c68d 100644 --- a/pkg/planner/core/operator/logicalop/BUILD.bazel +++ b/pkg/planner/core/operator/logicalop/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "logicalop", srcs = [ "base_logical_plan.go", + "logical_aggregation.go", "logical_cte_table.go", "logical_join.go", "logical_limit.go", diff --git a/pkg/planner/core/logical_aggregation.go b/pkg/planner/core/operator/logicalop/logical_aggregation.go similarity index 96% rename from pkg/planner/core/logical_aggregation.go rename to pkg/planner/core/operator/logicalop/logical_aggregation.go index 8b65f7352121b..b27538c1b558d 100644 --- a/pkg/planner/core/logical_aggregation.go +++ b/pkg/planner/core/operator/logicalop/logical_aggregation.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package core +package logicalop import ( "bytes" @@ -24,7 +24,6 @@ import ( "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/planner/cardinality" "github.com/pingcap/tidb/pkg/planner/core/base" - "github.com/pingcap/tidb/pkg/planner/core/operator/logicalop" ruleutil "github.com/pingcap/tidb/pkg/planner/core/rule/util" fd "github.com/pingcap/tidb/pkg/planner/funcdep" "github.com/pingcap/tidb/pkg/planner/property" @@ -39,7 +38,7 @@ import ( // LogicalAggregation represents an aggregate plan. type LogicalAggregation struct { - logicalop.LogicalSchemaProducer + LogicalSchemaProducer AggFuncs []*aggregation.AggFuncDesc GroupByItems []expression.Expression @@ -58,7 +57,7 @@ type LogicalAggregation struct { // Init initializes LogicalAggregation. func (la LogicalAggregation) Init(ctx base.PlanContext, offset int) *LogicalAggregation { - la.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeAgg, &la, offset) + la.BaseLogicalPlan = NewBaseLogicalPlan(ctx, plancodec.TypeAgg, &la, offset) return &la } @@ -207,7 +206,7 @@ func (la *LogicalAggregation) BuildKeyInfo(selfSchema *expression.Schema, childS return } la.LogicalSchemaProducer.BuildKeyInfo(selfSchema, childSchema) - la.buildSelfKeyInfo(selfSchema) + la.BuildSelfKeyInfo(selfSchema) } // PushDownTopN inherits BaseLogicalPlan.LogicalPlan.<5rd> implementation. @@ -291,33 +290,7 @@ func (la *LogicalAggregation) PreparePossibleProperties(_ *expression.Schema, ch // ExhaustPhysicalPlans implements base.LogicalPlan.<14th> interface. func (la *LogicalAggregation) ExhaustPhysicalPlans(prop *property.PhysicalProperty) ([]base.PhysicalPlan, bool, error) { - if la.PreferAggToCop { - if !la.CanPushToCop(kv.TiKV) { - la.SCtx().GetSessionVars().StmtCtx.SetHintWarning( - "Optimizer Hint AGG_TO_COP is inapplicable") - la.PreferAggToCop = false - } - } - - preferHash, preferStream := la.ResetHintIfConflicted() - - hashAggs := utilfuncp.GetHashAggs(la, prop) - if hashAggs != nil && preferHash { - return hashAggs, true, nil - } - - streamAggs := utilfuncp.GetStreamAggs(la, prop) - if streamAggs != nil && preferStream { - return streamAggs, true, nil - } - - aggs := append(hashAggs, streamAggs...) - - if streamAggs == nil && preferStream && !prop.IsSortItemEmpty() { - la.SCtx().GetSessionVars().StmtCtx.SetHintWarning("Optimizer Hint STREAM_AGG is inapplicable") - } - - return aggs, !(preferStream || preferHash), nil + return utilfuncp.ExhaustPhysicalPlans4LogicalAggregation(la, prop) } // ExtractCorrelatedCols implements base.LogicalPlan.<15th> interface. @@ -705,7 +678,8 @@ func (la *LogicalAggregation) pushDownPredicatesForAggregation(cond expression.E return condsToPush, ret } -func (la *LogicalAggregation) buildSelfKeyInfo(selfSchema *expression.Schema) { +// BuildSelfKeyInfo builds the key information for the aggregation itself. +func (la *LogicalAggregation) BuildSelfKeyInfo(selfSchema *expression.Schema) { groupByCols := la.GetGroupByCols() if len(groupByCols) == len(la.GroupByItems) && len(la.GroupByItems) > 0 { indices := selfSchema.ColumnsIndices(groupByCols) @@ -722,8 +696,8 @@ func (la *LogicalAggregation) buildSelfKeyInfo(selfSchema *expression.Schema) { } } -// canPullUp checks if an aggregation can be pulled up. An aggregate function like count(*) cannot be pulled up. -func (la *LogicalAggregation) canPullUp() bool { +// CanPullUp checks if an aggregation can be pulled up. An aggregate function like count(*) cannot be pulled up. +func (la *LogicalAggregation) CanPullUp() bool { if len(la.GroupByItems) > 0 { return false } diff --git a/pkg/planner/core/physical_plans.go b/pkg/planner/core/physical_plans.go index d36f976bc6ee5..f988fd11c19f3 100644 --- a/pkg/planner/core/physical_plans.go +++ b/pkg/planner/core/physical_plans.go @@ -2135,7 +2135,7 @@ func (p *PhysicalHashAgg) MemoryUsage() (sum int64) { } // NewPhysicalHashAgg creates a new PhysicalHashAgg from a LogicalAggregation. -func NewPhysicalHashAgg(la *LogicalAggregation, newStats *property.StatsInfo, prop *property.PhysicalProperty) *PhysicalHashAgg { +func NewPhysicalHashAgg(la *logicalop.LogicalAggregation, newStats *property.StatsInfo, prop *property.PhysicalProperty) *PhysicalHashAgg { newGbyItems := make([]expression.Expression, len(la.GroupByItems)) copy(newGbyItems, la.GroupByItems) newAggFuncs := make([]*aggregation.AggFuncDesc, len(la.AggFuncs)) diff --git a/pkg/planner/core/plan.go b/pkg/planner/core/plan.go index e239bcba95300..054a38f6eec2a 100644 --- a/pkg/planner/core/plan.go +++ b/pkg/planner/core/plan.go @@ -252,7 +252,7 @@ func HasMaxOneRow(p base.LogicalPlan, childMaxOneRow []bool) bool { } switch x := p.(type) { case *logicalop.LogicalLock, *logicalop.LogicalLimit, *logicalop.LogicalSort, *logicalop.LogicalSelection, - *LogicalApply, *logicalop.LogicalProjection, *logicalop.LogicalWindow, *LogicalAggregation: + *LogicalApply, *logicalop.LogicalProjection, *logicalop.LogicalWindow, *logicalop.LogicalAggregation: return childMaxOneRow[0] case *logicalop.LogicalMaxOneRow: return true diff --git a/pkg/planner/core/rule_aggregation_elimination.go b/pkg/planner/core/rule_aggregation_elimination.go index 14f0fea535d82..6b5a7796e81b8 100644 --- a/pkg/planner/core/rule_aggregation_elimination.go +++ b/pkg/planner/core/rule_aggregation_elimination.go @@ -51,7 +51,7 @@ type aggregationEliminateChecker struct { // e.g. select min(b) from t group by a. If a is a unique key, then this sql is equal to `select b from t group by a`. // For count(expr), sum(expr), avg(expr), count(distinct expr, [expr...]) we may need to rewrite the expr. Details are shown below. // If we can eliminate agg successful, we return a projection. Else we return a nil pointer. -func (a *aggregationEliminateChecker) tryToEliminateAggregation(agg *LogicalAggregation, opt *optimizetrace.LogicalOptimizeOp) *logicalop.LogicalProjection { +func (a *aggregationEliminateChecker) tryToEliminateAggregation(agg *logicalop.LogicalAggregation, opt *optimizetrace.LogicalOptimizeOp) *logicalop.LogicalProjection { for _, af := range agg.AggFuncs { // TODO(issue #9968): Actually, we can rewrite GROUP_CONCAT when all the // arguments it accepts are promised to be NOT-NULL. @@ -92,7 +92,7 @@ func (a *aggregationEliminateChecker) tryToEliminateAggregation(agg *LogicalAggr // tryToEliminateDistinct will eliminate distinct in the aggregation function if the aggregation args // have unique key column. see detail example in https://github.com/pingcap/tidb/issues/23436 -func (*aggregationEliminateChecker) tryToEliminateDistinct(agg *LogicalAggregation, opt *optimizetrace.LogicalOptimizeOp) { +func (*aggregationEliminateChecker) tryToEliminateDistinct(agg *logicalop.LogicalAggregation, opt *optimizetrace.LogicalOptimizeOp) { for _, af := range agg.AggFuncs { if af.HasDistinct { cols := make([]*expression.Column, 0, len(af.Args)) @@ -132,7 +132,7 @@ func (*aggregationEliminateChecker) tryToEliminateDistinct(agg *LogicalAggregati } } -func appendAggregationEliminateTraceStep(agg *LogicalAggregation, proj *logicalop.LogicalProjection, uniqueKey expression.KeyInfo, opt *optimizetrace.LogicalOptimizeOp) { +func appendAggregationEliminateTraceStep(agg *logicalop.LogicalAggregation, proj *logicalop.LogicalProjection, uniqueKey expression.KeyInfo, opt *optimizetrace.LogicalOptimizeOp) { reason := func() string { return fmt.Sprintf("%s is a unique key", uniqueKey.String()) } @@ -143,7 +143,7 @@ func appendAggregationEliminateTraceStep(agg *LogicalAggregation, proj *logicalo opt.AppendStepToCurrent(agg.ID(), agg.TP(), reason, action) } -func appendDistinctEliminateTraceStep(agg *LogicalAggregation, uniqueKey expression.KeyInfo, af *aggregation.AggFuncDesc, +func appendDistinctEliminateTraceStep(agg *logicalop.LogicalAggregation, uniqueKey expression.KeyInfo, af *aggregation.AggFuncDesc, opt *optimizetrace.LogicalOptimizeOp) { reason := func() string { return fmt.Sprintf("%s is a unique key", uniqueKey.String()) @@ -156,7 +156,7 @@ func appendDistinctEliminateTraceStep(agg *LogicalAggregation, uniqueKey express // CheckCanConvertAggToProj check whether a special old aggregation (which has already been pushed down) to projection. // link: issue#44795 -func CheckCanConvertAggToProj(agg *LogicalAggregation) bool { +func CheckCanConvertAggToProj(agg *logicalop.LogicalAggregation) bool { var mayNullSchema *expression.Schema if join, ok := agg.Children()[0].(*logicalop.LogicalJoin); ok { if join.JoinType == logicalop.LeftOuterJoin { @@ -183,7 +183,7 @@ func CheckCanConvertAggToProj(agg *LogicalAggregation) bool { } // ConvertAggToProj convert aggregation to projection. -func ConvertAggToProj(agg *LogicalAggregation, schema *expression.Schema) (bool, *logicalop.LogicalProjection) { +func ConvertAggToProj(agg *logicalop.LogicalAggregation, schema *expression.Schema) (bool, *logicalop.LogicalProjection) { proj := logicalop.LogicalProjection{ Exprs: make([]expression.Expression, 0, len(agg.AggFuncs)), }.Init(agg.SCtx(), agg.QueryBlockOffset()) @@ -269,7 +269,7 @@ func (a *AggregationEliminator) Optimize(ctx context.Context, p base.LogicalPlan newChildren = append(newChildren, newChild) } p.SetChildren(newChildren...) - agg, ok := p.(*LogicalAggregation) + agg, ok := p.(*logicalop.LogicalAggregation) if !ok { return p, planChanged, nil } diff --git a/pkg/planner/core/rule_aggregation_push_down.go b/pkg/planner/core/rule_aggregation_push_down.go index 93cf8a1a98983..b4221671ef4bf 100644 --- a/pkg/planner/core/rule_aggregation_push_down.go +++ b/pkg/planner/core/rule_aggregation_push_down.go @@ -104,7 +104,7 @@ func (*AggregationPushDownSolver) getAggFuncChildIdx(aggFunc *aggregation.AggFun // collectAggFuncs collects all aggregate functions and splits them into two parts: "leftAggFuncs" and "rightAggFuncs" whose // arguments are all from left child or right child separately. If some aggregate functions have the arguments that have // columns both from left and right children, the whole aggregation is forbidden to push down. -func (a *AggregationPushDownSolver) collectAggFuncs(agg *LogicalAggregation, join *logicalop.LogicalJoin) (valid bool, leftAggFuncs, rightAggFuncs []*aggregation.AggFuncDesc) { +func (a *AggregationPushDownSolver) collectAggFuncs(agg *logicalop.LogicalAggregation, join *logicalop.LogicalJoin) (valid bool, leftAggFuncs, rightAggFuncs []*aggregation.AggFuncDesc) { valid = true leftChild := join.Children()[0] rightChild := join.Children()[1] @@ -147,7 +147,7 @@ func (a *AggregationPushDownSolver) collectAggFuncs(agg *LogicalAggregation, joi // query should be "SELECT SUM(B.agg) FROM A, (SELECT SUM(id) as agg, c1, c2, c3 FROM B GROUP BY id, c1, c2, c3) as B // WHERE A.c1 = B.c1 AND A.c2 != B.c2 GROUP BY B.c3". As you see, all the columns appearing in join-conditions should be // treated as group by columns in join subquery. -func (a *AggregationPushDownSolver) collectGbyCols(agg *LogicalAggregation, join *logicalop.LogicalJoin) (leftGbyCols, rightGbyCols []*expression.Column) { +func (a *AggregationPushDownSolver) collectGbyCols(agg *logicalop.LogicalAggregation, join *logicalop.LogicalJoin) (leftGbyCols, rightGbyCols []*expression.Column) { leftChild := join.Children()[0] ctx := agg.SCtx() for _, gbyExpr := range agg.GroupByItems { @@ -186,7 +186,7 @@ func (a *AggregationPushDownSolver) collectGbyCols(agg *LogicalAggregation, join return } -func (a *AggregationPushDownSolver) splitAggFuncsAndGbyCols(agg *LogicalAggregation, join *logicalop.LogicalJoin) (valid bool, +func (a *AggregationPushDownSolver) splitAggFuncsAndGbyCols(agg *logicalop.LogicalAggregation, join *logicalop.LogicalJoin) (valid bool, leftAggFuncs, rightAggFuncs []*aggregation.AggFuncDesc, leftGbyCols, rightGbyCols []*expression.Column) { valid, leftAggFuncs, rightAggFuncs = a.collectAggFuncs(agg, join) @@ -253,7 +253,7 @@ func (*AggregationPushDownSolver) decompose(ctx base.PlanContext, aggFunc *aggre // tryToPushDownAgg tries to push down an aggregate function into a join path. If all aggFuncs are first row, we won't // process it temporarily. If not, We will add additional group by columns and first row functions. We make a new aggregation operator. // If the pushed aggregation is grouped by unique key, it's no need to push it down. -func (a *AggregationPushDownSolver) tryToPushDownAgg(oldAgg *LogicalAggregation, aggFuncs []*aggregation.AggFuncDesc, gbyCols []*expression.Column, +func (a *AggregationPushDownSolver) tryToPushDownAgg(oldAgg *logicalop.LogicalAggregation, aggFuncs []*aggregation.AggFuncDesc, gbyCols []*expression.Column, join *logicalop.LogicalJoin, childIdx int, blockOffset int, opt *optimizetrace.LogicalOptimizeOp) (_ base.LogicalPlan, err error) { child := join.Children()[childIdx] if aggregation.IsAllFirstRow(aggFuncs) { @@ -294,7 +294,7 @@ func (a *AggregationPushDownSolver) tryToPushDownAgg(oldAgg *LogicalAggregation, return agg, nil } -func (*AggregationPushDownSolver) getDefaultValues(agg *LogicalAggregation) ([]types.Datum, bool) { +func (*AggregationPushDownSolver) getDefaultValues(agg *logicalop.LogicalAggregation) ([]types.Datum, bool) { defaultValues := make([]types.Datum, 0, agg.Schema().Len()) for _, aggFunc := range agg.AggFuncs { value, existsDefaultValue := aggFunc.EvalNullValueInOuterJoin(agg.SCtx().GetExprCtx(), agg.Children()[0].Schema()) @@ -331,8 +331,8 @@ func (*AggregationPushDownSolver) checkAllArgsColumn(fun *aggregation.AggFuncDes // 1. https://github.com/pingcap/tidb/issues/16355, push avg & distinct functions across join // 2. remove this method and use splitPartialAgg instead for clean code. func (a *AggregationPushDownSolver) makeNewAgg(ctx base.PlanContext, aggFuncs []*aggregation.AggFuncDesc, - gbyCols []*expression.Column, preferAggType uint, preferAggToCop bool, blockOffset int, nullGenerating bool) (*LogicalAggregation, error) { - agg := LogicalAggregation{ + gbyCols []*expression.Column, preferAggType uint, preferAggToCop bool, blockOffset int, nullGenerating bool) (*logicalop.LogicalAggregation, error) { + agg := logicalop.LogicalAggregation{ GroupByItems: expression.Column2Exprs(gbyCols), PreferAggType: preferAggType, PreferAggToCop: preferAggToCop, @@ -362,7 +362,7 @@ func (a *AggregationPushDownSolver) makeNewAgg(ctx base.PlanContext, aggFuncs [] return agg, nil } -func (*AggregationPushDownSolver) splitPartialAgg(agg *LogicalAggregation) (pushedAgg *LogicalAggregation) { +func (*AggregationPushDownSolver) splitPartialAgg(agg *logicalop.LogicalAggregation) (pushedAgg *logicalop.LogicalAggregation) { partial, final, _ := BuildFinalModeAggregation(agg.SCtx(), &AggInfo{ AggFuncs: agg.AggFuncs, GroupByItems: agg.GroupByItems, @@ -376,7 +376,7 @@ func (*AggregationPushDownSolver) splitPartialAgg(agg *LogicalAggregation) (push agg.AggFuncs = final.AggFuncs agg.GroupByItems = final.GroupByItems - pushedAgg = LogicalAggregation{ + pushedAgg = logicalop.LogicalAggregation{ AggFuncs: partial.AggFuncs, GroupByItems: partial.GroupByItems, PreferAggType: agg.PreferAggType, @@ -388,9 +388,9 @@ func (*AggregationPushDownSolver) splitPartialAgg(agg *LogicalAggregation) (push // pushAggCrossUnion will try to push the agg down to the union. If the new aggregation's group-by columns doesn't contain unique key. // We will return the new aggregation. Otherwise we will transform the aggregation to projection. -func (*AggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, unionSchema *expression.Schema, unionChild base.LogicalPlan) (base.LogicalPlan, error) { +func (*AggregationPushDownSolver) pushAggCrossUnion(agg *logicalop.LogicalAggregation, unionSchema *expression.Schema, unionChild base.LogicalPlan) (base.LogicalPlan, error) { ctx := agg.SCtx() - newAgg := LogicalAggregation{ + newAgg := logicalop.LogicalAggregation{ AggFuncs: make([]*aggregation.AggFuncDesc, 0, len(agg.AggFuncs)), GroupByItems: make([]expression.Expression, 0, len(agg.GroupByItems)), PreferAggType: agg.PreferAggType, @@ -446,7 +446,7 @@ func (a *AggregationPushDownSolver) Optimize(_ context.Context, p base.LogicalPl return newLogicalPlan, planChanged, err } -func (a *AggregationPushDownSolver) tryAggPushDownForUnion(union *LogicalUnionAll, agg *LogicalAggregation, opt *optimizetrace.LogicalOptimizeOp) error { +func (a *AggregationPushDownSolver) tryAggPushDownForUnion(union *LogicalUnionAll, agg *logicalop.LogicalAggregation, opt *optimizetrace.LogicalOptimizeOp) error { for _, aggFunc := range agg.AggFuncs { if !a.isDecomposableWithUnion(aggFunc) { return nil @@ -482,7 +482,7 @@ func (a *AggregationPushDownSolver) tryAggPushDownForUnion(union *LogicalUnionAl // aggPushDown tries to push down aggregate functions to join paths. func (a *AggregationPushDownSolver) aggPushDown(p base.LogicalPlan, opt *optimizetrace.LogicalOptimizeOp) (_ base.LogicalPlan, err error) { - if agg, ok := p.(*LogicalAggregation); ok { + if agg, ok := p.(*logicalop.LogicalAggregation); ok { proj := a.tryToEliminateAggregation(agg, opt) if proj != nil { p = proj @@ -537,14 +537,14 @@ func (a *AggregationPushDownSolver) aggPushDown(p base.LogicalPlan, opt *optimiz // Notice that even if we eliminate new agg below if possible, the agg's schema is inherited by proj. // Therefore, we don't need to set the join's schema again, just build the keyInfo again. changed := false - if newAgg, ok1 := lChild.(*LogicalAggregation); ok1 { + if newAgg, ok1 := lChild.(*logicalop.LogicalAggregation); ok1 { proj := a.tryToEliminateAggregation(newAgg, opt) if proj != nil { lChild = proj changed = true } } - if newAgg, ok2 := rChild.(*LogicalAggregation); ok2 { + if newAgg, ok2 := rChild.(*logicalop.LogicalAggregation); ok2 { proj := a.tryToEliminateAggregation(newAgg, opt) if proj != nil { rChild = proj @@ -691,7 +691,7 @@ func (*AggregationPushDownSolver) Name() string { return "aggregation_push_down" } -func appendAggPushDownAcrossJoinTraceStep(oldAgg, newAgg *LogicalAggregation, aggFuncs []*aggregation.AggFuncDesc, join *logicalop.LogicalJoin, +func appendAggPushDownAcrossJoinTraceStep(oldAgg, newAgg *logicalop.LogicalAggregation, aggFuncs []*aggregation.AggFuncDesc, join *logicalop.LogicalJoin, childIdx int, opt *optimizetrace.LogicalOptimizeOp) { evalCtx := oldAgg.SCtx().GetExprCtx().GetEvalCtx() reason := func() string { @@ -718,7 +718,7 @@ func appendAggPushDownAcrossJoinTraceStep(oldAgg, newAgg *LogicalAggregation, ag opt.AppendStepToCurrent(join.ID(), join.TP(), reason, action) } -func appendAggPushDownAcrossProjTraceStep(agg *LogicalAggregation, proj *logicalop.LogicalProjection, opt *optimizetrace.LogicalOptimizeOp) { +func appendAggPushDownAcrossProjTraceStep(agg *logicalop.LogicalAggregation, proj *logicalop.LogicalProjection, opt *optimizetrace.LogicalOptimizeOp) { evalCtx := agg.SCtx().GetExprCtx().GetEvalCtx() action := func() string { buffer := bytes.NewBufferString(fmt.Sprintf("%v_%v is eliminated, and %v_%v's functions changed into[", proj.TP(), proj.ID(), agg.TP(), agg.ID())) @@ -737,7 +737,7 @@ func appendAggPushDownAcrossProjTraceStep(agg *LogicalAggregation, proj *logical opt.AppendStepToCurrent(agg.ID(), agg.TP(), reason, action) } -func appendAggPushDownAcrossUnionTraceStep(union *LogicalUnionAll, agg *LogicalAggregation, opt *optimizetrace.LogicalOptimizeOp) { +func appendAggPushDownAcrossUnionTraceStep(union *LogicalUnionAll, agg *logicalop.LogicalAggregation, opt *optimizetrace.LogicalOptimizeOp) { evalCtx := union.SCtx().GetExprCtx().GetEvalCtx() reason := func() string { buffer := bytes.NewBufferString(fmt.Sprintf("%v_%v functions[", agg.TP(), agg.ID())) diff --git a/pkg/planner/core/rule_aggregation_skew_rewrite.go b/pkg/planner/core/rule_aggregation_skew_rewrite.go index 1f11b68996fc9..f2123902ece00 100644 --- a/pkg/planner/core/rule_aggregation_skew_rewrite.go +++ b/pkg/planner/core/rule_aggregation_skew_rewrite.go @@ -50,7 +50,7 @@ type SkewDistinctAggRewriter struct { // - The aggregate has 1 and only 1 distinct aggregate function (limited to count, avg, sum) // // This rule is disabled by default. Use tidb_opt_skew_distinct_agg to enable the rule. -func (a *SkewDistinctAggRewriter) rewriteSkewDistinctAgg(agg *LogicalAggregation, opt *optimizetrace.LogicalOptimizeOp) base.LogicalPlan { +func (a *SkewDistinctAggRewriter) rewriteSkewDistinctAgg(agg *logicalop.LogicalAggregation, opt *optimizetrace.LogicalOptimizeOp) base.LogicalPlan { // only group aggregate is applicable if len(agg.GroupByItems) == 0 { return nil @@ -194,7 +194,7 @@ func (a *SkewDistinctAggRewriter) rewriteSkewDistinctAgg(agg *LogicalAggregation } // now create the bottom and top aggregate operators - bottomAgg := LogicalAggregation{ + bottomAgg := logicalop.LogicalAggregation{ AggFuncs: bottomAggFuncs, GroupByItems: bottomAggGroupbyItems, PreferAggType: agg.PreferAggType, @@ -202,7 +202,7 @@ func (a *SkewDistinctAggRewriter) rewriteSkewDistinctAgg(agg *LogicalAggregation bottomAgg.SetChildren(agg.Children()...) bottomAgg.SetSchema(bottomAggSchema) - topAgg := LogicalAggregation{ + topAgg := logicalop.LogicalAggregation{ AggFuncs: topAggFuncs, GroupByItems: agg.GroupByItems, PreferAggToCop: agg.PreferAggToCop, @@ -266,7 +266,7 @@ func (*SkewDistinctAggRewriter) isQualifiedAgg(aggFunc *aggregation.AggFuncDesc) } } -func appendSkewDistinctAggRewriteTraceStep(agg *LogicalAggregation, result base.LogicalPlan, opt *optimizetrace.LogicalOptimizeOp) { +func appendSkewDistinctAggRewriteTraceStep(agg *logicalop.LogicalAggregation, result base.LogicalPlan, opt *optimizetrace.LogicalOptimizeOp) { reason := func() string { return fmt.Sprintf("%v_%v has a distinct agg function", agg.TP(), agg.ID()) } @@ -289,7 +289,7 @@ func (a *SkewDistinctAggRewriter) Optimize(ctx context.Context, p base.LogicalPl newChildren = append(newChildren, newChild) } p.SetChildren(newChildren...) - agg, ok := p.(*LogicalAggregation) + agg, ok := p.(*logicalop.LogicalAggregation) if !ok { return p, planChanged, nil } diff --git a/pkg/planner/core/rule_decorrelate.go b/pkg/planner/core/rule_decorrelate.go index 15b977a3af870..9fac608f1bfa2 100644 --- a/pkg/planner/core/rule_decorrelate.go +++ b/pkg/planner/core/rule_decorrelate.go @@ -108,7 +108,7 @@ func extractOuterApplyCorrelatedColsHelper(p base.PhysicalPlan, outerSchemas []* // DecorrelateSolver tries to convert apply plan to join plan. type DecorrelateSolver struct{} -func (*DecorrelateSolver) aggDefaultValueMap(agg *LogicalAggregation) map[int]*expression.Constant { +func (*DecorrelateSolver) aggDefaultValueMap(agg *logicalop.LogicalAggregation) map[int]*expression.Constant { defaultValueMap := make(map[int]*expression.Constant, len(agg.AggFuncs)) for i, f := range agg.AggFuncs { switch f.Name { @@ -230,8 +230,8 @@ func (s *DecorrelateSolver) Optimize(ctx context.Context, p base.LogicalPlan, op appendRemoveLimitTraceStep(li, opt) return s.Optimize(ctx, p, opt) } - } else if agg, ok := innerPlan.(*LogicalAggregation); ok { - if apply.CanPullUpAgg() && agg.canPullUp() { + } else if agg, ok := innerPlan.(*logicalop.LogicalAggregation); ok { + if apply.CanPullUpAgg() && agg.CanPullUp() { innerPlan = agg.Children()[0] apply.JoinType = logicalop.LeftOuterJoin apply.SetChildren(outerPlan, innerPlan) @@ -463,7 +463,7 @@ func appendRemoveSortTraceStep(sort *logicalop.LogicalSort, opt *optimizetrace.L opt.AppendStepToCurrent(sort.ID(), sort.TP(), reason, action) } -func appendPullUpAggTraceStep(p *LogicalApply, np base.LogicalPlan, agg *LogicalAggregation, opt *optimizetrace.LogicalOptimizeOp) { +func appendPullUpAggTraceStep(p *LogicalApply, np base.LogicalPlan, agg *logicalop.LogicalAggregation, opt *optimizetrace.LogicalOptimizeOp) { action := func() string { return fmt.Sprintf("%v_%v pulled up as %v_%v's parent, and %v_%v's join type becomes %v", agg.TP(), agg.ID(), np.TP(), np.ID(), p.TP(), p.ID(), p.JoinType.String()) @@ -485,7 +485,7 @@ func appendAddProjTraceStep(p *LogicalApply, proj *logicalop.LogicalProjection, opt.AppendStepToCurrent(proj.ID(), proj.TP(), reason, action) } -func appendModifyAggTraceStep(outerPlan base.LogicalPlan, p *LogicalApply, agg *LogicalAggregation, sel *logicalop.LogicalSelection, +func appendModifyAggTraceStep(outerPlan base.LogicalPlan, p *LogicalApply, agg *logicalop.LogicalAggregation, sel *logicalop.LogicalSelection, appendedGroupByCols *expression.Schema, appendedAggFuncs []*aggregation.AggFuncDesc, eqCondWithCorCol []*expression.ScalarFunction, opt *optimizetrace.LogicalOptimizeOp) { evalCtx := outerPlan.SCtx().GetExprCtx().GetEvalCtx() diff --git a/pkg/planner/core/rule_eliminate_projection.go b/pkg/planner/core/rule_eliminate_projection.go index ac2e17a358350..f9fb992358e16 100644 --- a/pkg/planner/core/rule_eliminate_projection.go +++ b/pkg/planner/core/rule_eliminate_projection.go @@ -167,7 +167,7 @@ func (pe *ProjectionEliminator) eliminate(p base.LogicalPlan, replace map[string childFlag := canEliminate if _, isUnion := p.(*LogicalUnionAll); isUnion { childFlag = false - } else if _, isAgg := p.(*LogicalAggregation); isAgg || isProj { + } else if _, isAgg := p.(*logicalop.LogicalAggregation); isAgg || isProj { childFlag = true } else if _, isWindow := p.(*logicalop.LogicalWindow); isWindow { childFlag = true diff --git a/pkg/planner/core/rule_generate_column_substitute.go b/pkg/planner/core/rule_generate_column_substitute.go index 97274eb4b03c0..2f593849d0266 100644 --- a/pkg/planner/core/rule_generate_column_substitute.go +++ b/pkg/planner/core/rule_generate_column_substitute.go @@ -204,7 +204,7 @@ func (gc *GcSubstituter) substitute(ctx context.Context, lp base.LogicalPlan, ex tryToSubstituteExpr(&x.ByItems[i].Expr, lp, candidateExpr, tp, x.Schema(), column, opt) } } - case *LogicalAggregation: + case *logicalop.LogicalAggregation: for _, aggFunc := range x.AggFuncs { for i := 0; i < len(aggFunc.Args); i++ { tp = aggFunc.Args[i].GetType(ectx).EvalType() diff --git a/pkg/planner/core/rule_join_elimination.go b/pkg/planner/core/rule_join_elimination.go index 334d8efbbe621..2fa4e41cceb64 100644 --- a/pkg/planner/core/rule_join_elimination.go +++ b/pkg/planner/core/rule_join_elimination.go @@ -174,7 +174,7 @@ func GetDupAgnosticAggCols( p base.LogicalPlan, oldAggCols []*expression.Column, // Reuse the original buffer. ) (isAgg bool, newAggCols []*expression.Column) { - agg, ok := p.(*LogicalAggregation) + agg, ok := p.(*logicalop.LogicalAggregation) if !ok { return false, nil } @@ -219,7 +219,7 @@ func (o *OuterJoinEliminator) doOptimize(p base.LogicalPlan, aggCols []*expressi for _, expr := range x.Exprs { parentCols = append(parentCols, expression.ExtractColumns(expr)...) } - case *LogicalAggregation: + case *logicalop.LogicalAggregation: parentCols = parentCols[:0] for _, groupByItem := range x.GroupByItems { parentCols = append(parentCols, expression.ExtractColumns(groupByItem)...) diff --git a/pkg/planner/core/rule_max_min_eliminate.go b/pkg/planner/core/rule_max_min_eliminate.go index ca2d37ae3c9b9..c6cb256fcf0f0 100644 --- a/pkg/planner/core/rule_max_min_eliminate.go +++ b/pkg/planner/core/rule_max_min_eliminate.go @@ -46,7 +46,7 @@ func (a *MaxMinEliminator) Optimize(_ context.Context, p base.LogicalPlan, opt * } // composeAggsByInnerJoin composes the scalar aggregations by cartesianJoin. -func (*MaxMinEliminator) composeAggsByInnerJoin(originAgg *LogicalAggregation, aggs []*LogicalAggregation, opt *optimizetrace.LogicalOptimizeOp) (plan base.LogicalPlan) { +func (*MaxMinEliminator) composeAggsByInnerJoin(originAgg *logicalop.LogicalAggregation, aggs []*logicalop.LogicalAggregation, opt *optimizetrace.LogicalOptimizeOp) (plan base.LogicalPlan) { plan = aggs[0] sctx := plan.SCtx() joins := make([]*logicalop.LogicalJoin, 0) @@ -142,7 +142,7 @@ func (a *MaxMinEliminator) cloneSubPlans(plan base.LogicalPlan) base.LogicalPlan // `select max(a) from t` + `select min(a) from t` + `select max(b) from t`. // Then we check whether `a` and `b` have indices. If any of the used column has no index, we cannot eliminate // this aggregation. -func (a *MaxMinEliminator) splitAggFuncAndCheckIndices(agg *LogicalAggregation, opt *optimizetrace.LogicalOptimizeOp) (aggs []*LogicalAggregation, canEliminate bool) { +func (a *MaxMinEliminator) splitAggFuncAndCheckIndices(agg *logicalop.LogicalAggregation, opt *optimizetrace.LogicalOptimizeOp) (aggs []*logicalop.LogicalAggregation, canEliminate bool) { for _, f := range agg.AggFuncs { // We must make sure the args of max/min is a simple single column. col, ok := f.Args[0].(*expression.Column) @@ -153,10 +153,10 @@ func (a *MaxMinEliminator) splitAggFuncAndCheckIndices(agg *LogicalAggregation, return nil, false } } - aggs = make([]*LogicalAggregation, 0, len(agg.AggFuncs)) + aggs = make([]*logicalop.LogicalAggregation, 0, len(agg.AggFuncs)) // we can split the aggregation only if all of the aggFuncs pass the check. for i, f := range agg.AggFuncs { - newAgg := LogicalAggregation{AggFuncs: []*aggregation.AggFuncDesc{f}}.Init(agg.SCtx(), agg.QueryBlockOffset()) + newAgg := logicalop.LogicalAggregation{AggFuncs: []*aggregation.AggFuncDesc{f}}.Init(agg.SCtx(), agg.QueryBlockOffset()) newAgg.SetChildren(a.cloneSubPlans(agg.Children()[0])) newAgg.SetSchema(expression.NewSchema(agg.Schema().Columns[i])) // Since LogicalAggregation doesn’t use the parent base.LogicalPlan, passing an incorrect parameter here won’t affect subsequent optimizations. @@ -167,14 +167,14 @@ func (a *MaxMinEliminator) splitAggFuncAndCheckIndices(agg *LogicalAggregation, if p, err = newAgg.PruneColumns([]*expression.Column{newAgg.Schema().Columns[0]}, opt); err != nil { return nil, false } - newAgg = p.(*LogicalAggregation) + newAgg = p.(*logicalop.LogicalAggregation) aggs = append(aggs, newAgg) } return aggs, true } // eliminateSingleMaxMin tries to convert a single max/min to Limit+Sort operators. -func (*MaxMinEliminator) eliminateSingleMaxMin(agg *LogicalAggregation, opt *optimizetrace.LogicalOptimizeOp) *LogicalAggregation { +func (*MaxMinEliminator) eliminateSingleMaxMin(agg *logicalop.LogicalAggregation, opt *optimizetrace.LogicalOptimizeOp) *logicalop.LogicalAggregation { f := agg.AggFuncs[0] child := agg.Children()[0] ctx := agg.SCtx() @@ -225,7 +225,7 @@ func (a *MaxMinEliminator) eliminateMaxMin(p base.LogicalPlan, opt *optimizetrac newChildren = append(newChildren, a.eliminateMaxMin(child, opt)) } p.SetChildren(newChildren...) - if agg, ok := p.(*LogicalAggregation); ok { + if agg, ok := p.(*logicalop.LogicalAggregation); ok { if len(agg.GroupByItems) != 0 { return agg } @@ -266,7 +266,7 @@ func (*MaxMinEliminator) Name() string { return "max_min_eliminate" } -func appendEliminateSingleMaxMinTrace(agg *LogicalAggregation, sel *logicalop.LogicalSelection, sort *logicalop.LogicalSort, limit *logicalop.LogicalLimit, opt *optimizetrace.LogicalOptimizeOp) { +func appendEliminateSingleMaxMinTrace(agg *logicalop.LogicalAggregation, sel *logicalop.LogicalSelection, sort *logicalop.LogicalSort, limit *logicalop.LogicalLimit, opt *optimizetrace.LogicalOptimizeOp) { action := func() string { buffer := bytes.NewBufferString("") if sel != nil { @@ -291,7 +291,7 @@ func appendEliminateSingleMaxMinTrace(agg *LogicalAggregation, sel *logicalop.Lo opt.AppendStepToCurrent(agg.ID(), agg.TP(), reason, action) } -func appendEliminateMultiMinMaxTraceStep(originAgg *LogicalAggregation, aggs []*LogicalAggregation, joins []*logicalop.LogicalJoin, opt *optimizetrace.LogicalOptimizeOp) { +func appendEliminateMultiMinMaxTraceStep(originAgg *logicalop.LogicalAggregation, aggs []*logicalop.LogicalAggregation, joins []*logicalop.LogicalJoin, opt *optimizetrace.LogicalOptimizeOp) { action := func() string { buffer := bytes.NewBufferString(fmt.Sprintf("%v_%v splited into [", originAgg.TP(), originAgg.ID())) for i, agg := range aggs { diff --git a/pkg/planner/core/rule_push_down_sequence.go b/pkg/planner/core/rule_push_down_sequence.go index 69d7f97c9a7cb..ef433dbe78c85 100644 --- a/pkg/planner/core/rule_push_down_sequence.go +++ b/pkg/planner/core/rule_push_down_sequence.go @@ -62,7 +62,7 @@ func (pdss *PushDownSequenceSolver) recursiveOptimize(pushedSequence *logicalop. pushedSequence = logicalop.LogicalSequence{}.Init(lp.SCtx(), lp.QueryBlockOffset()) pushedSequence.SetChildren(append(allCTEs, mainQuery)...) return pdss.recursiveOptimize(pushedSequence, mainQuery) - case *DataSource, *LogicalAggregation, *LogicalCTE: + case *DataSource, *logicalop.LogicalAggregation, *LogicalCTE: pushedSequence.SetChild(pushedSequence.ChildLen()-1, pdss.recursiveOptimize(nil, lp)) return pushedSequence default: diff --git a/pkg/planner/core/rule_semi_join_rewrite.go b/pkg/planner/core/rule_semi_join_rewrite.go index 44d194bcda57c..8cafc180a16dc 100644 --- a/pkg/planner/core/rule_semi_join_rewrite.go +++ b/pkg/planner/core/rule_semi_join_rewrite.go @@ -100,7 +100,7 @@ func (smj *SemiJoinRewriter) recursivePlan(p base.LogicalPlan) (base.LogicalPlan innerChild = sel } - subAgg := LogicalAggregation{ + subAgg := logicalop.LogicalAggregation{ AggFuncs: make([]*aggregation.AggFuncDesc, 0, len(join.EqualConditions)), GroupByItems: make([]expression.Expression, 0, len(join.EqualConditions)), }.Init(p.SCtx(), p.Children()[1].QueryBlockOffset()) @@ -118,7 +118,7 @@ func (smj *SemiJoinRewriter) recursivePlan(p base.LogicalPlan) (base.LogicalPlan } subAgg.SetChildren(innerChild) subAgg.SetSchema(expression.NewSchema(aggOutputCols...)) - subAgg.buildSelfKeyInfo(subAgg.Schema()) + subAgg.BuildSelfKeyInfo(subAgg.Schema()) innerJoin := logicalop.LogicalJoin{ JoinType: logicalop.InnerJoin, diff --git a/pkg/planner/core/stringer.go b/pkg/planner/core/stringer.go index 2273b4aed9659..5a0191b3cecfd 100644 --- a/pkg/planner/core/stringer.go +++ b/pkg/planner/core/stringer.go @@ -64,7 +64,7 @@ func fdToString(in base.LogicalPlan, strs []string, idxs []int) ([]string, []int for _, child := range x.Children() { strs, idxs = fdToString(child, strs, idxs) } - case *LogicalAggregation: + case *logicalop.LogicalAggregation: strs = append(strs, "{"+x.FDs().String()+"}") for _, child := range x.Children() { strs, idxs = fdToString(child, strs, idxs) @@ -248,7 +248,7 @@ func toString(in base.Plan, strs []string, idxs []int) ([]string, []int) { str = "HashAgg" case *PhysicalStreamAgg: str = "StreamAgg" - case *LogicalAggregation: + case *logicalop.LogicalAggregation: str = "Aggr(" for i, aggFunc := range x.AggFuncs { str += aggFunc.StringWithCtx(ectx, perrors.RedactLogDisable) diff --git a/pkg/planner/pattern/pattern.go b/pkg/planner/pattern/pattern.go index c21503139cb22..e2469c585a97e 100644 --- a/pkg/planner/pattern/pattern.go +++ b/pkg/planner/pattern/pattern.go @@ -81,7 +81,7 @@ func GetOperand(p base.LogicalPlan) Operand { return OperandApply case *logicalop.LogicalJoin: return OperandJoin - case *plannercore.LogicalAggregation: + case *logicalop.LogicalAggregation: return OperandAggregation case *logicalop.LogicalProjection: return OperandProjection diff --git a/pkg/planner/pattern/pattern_test.go b/pkg/planner/pattern/pattern_test.go index 707a65b66a73f..0f8b0fbf2bbb0 100644 --- a/pkg/planner/pattern/pattern_test.go +++ b/pkg/planner/pattern/pattern_test.go @@ -24,7 +24,7 @@ import ( func TestGetOperand(t *testing.T) { require.Equal(t, OperandJoin, GetOperand(&logicalop.LogicalJoin{})) - require.Equal(t, OperandAggregation, GetOperand(&plannercore.LogicalAggregation{})) + require.Equal(t, OperandAggregation, GetOperand(&logicalop.LogicalAggregation{})) require.Equal(t, OperandProjection, GetOperand(&logicalop.LogicalProjection{})) require.Equal(t, OperandSelection, GetOperand(&logicalop.LogicalSelection{})) require.Equal(t, OperandApply, GetOperand(&plannercore.LogicalApply{})) diff --git a/pkg/planner/util/utilfuncp/func_pointer_misc.go b/pkg/planner/util/utilfuncp/func_pointer_misc.go index 6159d7cc5a5d0..abcf1117c9821 100644 --- a/pkg/planner/util/utilfuncp/func_pointer_misc.go +++ b/pkg/planner/util/utilfuncp/func_pointer_misc.go @@ -73,19 +73,9 @@ var FindBestTask func(p base.LogicalPlan, prop *property.PhysicalProperty, planC // todo: (7) arenatlx, remove this util func pointer when logical operators are all moved from core to logicalOp. var CanPushToCopImpl func(p base.LogicalPlan, storeTp kv.StoreType, considerDual bool) bool -// GetStreamAggs will be called by baseLogicalPlan in logicalOp pkg. The logic inside covers concrete physical -// operators. -// todo: (8) arenatlx, move this util func pointer to physicalOp when physical operators are all moved. -var GetStreamAggs func(lp base.LogicalPlan, prop *property.PhysicalProperty) []base.PhysicalPlan - -// GetHashAggs will be called by baseLogicalPlan in logicalOp pkg. The logic inside covers concrete physical -// operators. -// todo: (9) arenatlx, move this util func pointer to physicalOp when physical operators are all moved. -var GetHashAggs func(la base.LogicalPlan, prop *property.PhysicalProperty) []base.PhysicalPlan - // PruneByItems will be called by baseLogicalPlan in logicalOp pkg. The logic current exists for rule logic // inside core. -// todo: (10) arenatlx, when rule is moved out of core, we should direct ref the rule.Func instead of this +// todo: (8) arenatlx, when rule is moved out of core, we should direct ref the rule.Func instead of this // util func pointer. var PruneByItems func(p base.LogicalPlan, old []*util.ByItems, opt *optimizetrace.LogicalOptimizeOp) ( byItems []*util.ByItems, parentUsedCols []*expression.Column) @@ -155,6 +145,10 @@ var ExhaustPhysicalPlans4LogicalSelection func(lp base.LogicalPlan, prop *proper var ExhaustPhysicalPlans4LogicalJoin func(lp base.LogicalPlan, prop *property.PhysicalProperty) ( []base.PhysicalPlan, bool, error) +// ExhaustPhysicalPlans4LogicalAggregation will be called by LogicalAggregation in logicalOp pkg. +var ExhaustPhysicalPlans4LogicalAggregation func(lp base.LogicalPlan, prop *property.PhysicalProperty) ( + []base.PhysicalPlan, bool, error) + // *************************************** physical op related ******************************************* // GetEstimatedProbeCntFromProbeParents will be called by BasePhysicalPlan in physicalOp pkg.