diff --git a/cmd/explaintest/r/tpch.result b/cmd/explaintest/r/tpch.result index 34dc2a1920320..c27776be71af5 100644 --- a/cmd/explaintest/r/tpch.result +++ b/cmd/explaintest/r/tpch.result @@ -123,7 +123,11 @@ Sort_6 2.94 root tpch.lineitem.l_returnflag:asc, tpch.lineitem.l_linestatus:asc └─Projection_8 2.94 root tpch.lineitem.l_returnflag, tpch.lineitem.l_linestatus, 3_col_0, 3_col_1, 3_col_2, 3_col_3, 3_col_4, 3_col_5, 3_col_6, 3_col_7 └─HashAgg_14 2.94 root group by:col_13, col_14, funcs:sum(col_0), sum(col_1), sum(col_2), sum(col_3), avg(col_4, col_5), avg(col_6, col_7), avg(col_8, col_9), count(col_10), firstrow(col_11), firstrow(col_12) └─TableReader_15 2.94 root data:HashAgg_9 +<<<<<<< HEAD └─HashAgg_9 2.94 cop group by:tpch.lineitem.l_linestatus, tpch.lineitem.l_returnflag, funcs:sum(tpch.lineitem.l_quantity), sum(tpch.lineitem.l_extendedprice), sum(mul(tpch.lineitem.l_extendedprice, minus(1, tpch.lineitem.l_discount))), sum(mul(mul(tpch.lineitem.l_extendedprice, minus(1, tpch.lineitem.l_discount)), plus(1, tpch.lineitem.l_tax))), avg(tpch.lineitem.l_quantity), avg(tpch.lineitem.l_extendedprice), avg(tpch.lineitem.l_discount), count(1), firstrow(tpch.lineitem.l_returnflag), firstrow(tpch.lineitem.l_linestatus) +======= + └─HashAgg_9 2.94 cop group by:tpch.lineitem.l_linestatus, tpch.lineitem.l_returnflag, funcs:sum(tpch.lineitem.l_quantity), sum(tpch.lineitem.l_extendedprice), sum(mul(tpch.lineitem.l_extendedprice, minus(1, tpch.lineitem.l_discount))), sum(mul(mul(tpch.lineitem.l_extendedprice, minus(1, tpch.lineitem.l_discount)), plus(1, tpch.lineitem.l_tax))), count(tpch.lineitem.l_quantity), sum(tpch.lineitem.l_quantity), count(tpch.lineitem.l_extendedprice), sum(tpch.lineitem.l_extendedprice), count(tpch.lineitem.l_discount), sum(tpch.lineitem.l_discount), count(1) +>>>>>>> b239f2f04... planner: split avg to count and sum for TableReader cop task (#11926) └─Selection_13 293795345.00 cop le(tpch.lineitem.l_shipdate, 1998-08-15) └─TableScan_12 300005811.00 cop table:lineitem, range:[-inf,+inf], keep order:false /* @@ -982,7 +986,11 @@ Projection_16 1.00 root div(11_col_0, 7.0) │ └─TableScan_31 300005811.00 cop table:lineitem, range:[-inf,+inf], keep order:false └─HashAgg_40 9943040.00 root group by:col_3, funcs:avg(col_0, col_1), firstrow(col_2) └─TableReader_41 9943040.00 root data:HashAgg_36 +<<<<<<< HEAD └─HashAgg_36 9943040.00 cop group by:tpch.lineitem.l_partkey, funcs:avg(tpch.lineitem.l_quantity), firstrow(tpch.lineitem.l_partkey) +======= + └─HashAgg_36 9943040.00 cop group by:tpch.lineitem.l_partkey, funcs:count(tpch.lineitem.l_quantity), sum(tpch.lineitem.l_quantity) +>>>>>>> b239f2f04... planner: split avg to count and sum for TableReader cop task (#11926) └─TableScan_39 300005811.00 cop table:lineitem, range:[-inf,+inf], keep order:false /* Q18 Large Volume Customer Query diff --git a/planner/core/task.go b/planner/core/task.go index 9a2258a7c3e9d..c9989459630ca 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -200,6 +200,34 @@ func (p *PhysicalMergeJoin) attach2Task(tasks ...task) task { } } +// splitCopAvg2CountAndSum splits the cop avg function to count and sum. +// Now it's only used for TableReader. +func splitCopAvg2CountAndSum(p PhysicalPlan) { + var baseAgg *basePhysicalAgg + if agg, ok := p.(*PhysicalStreamAgg); ok { + baseAgg = &agg.basePhysicalAgg + } + if agg, ok := p.(*PhysicalHashAgg); ok { + baseAgg = &agg.basePhysicalAgg + } + if baseAgg == nil { + return + } + for i := len(baseAgg.AggFuncs) - 1; i >= 0; i-- { + f := baseAgg.AggFuncs[i] + if f.Name == ast.AggFuncAvg { + sumAgg := *f + sumAgg.Name = ast.AggFuncSum + sumAgg.RetTp = baseAgg.Schema().Columns[i+1].RetType + cntAgg := *f + cntAgg.Name = ast.AggFuncCount + cntAgg.RetTp = baseAgg.Schema().Columns[i].RetType + cntAgg.RetTp.Flag = f.RetTp.Flag + baseAgg.AggFuncs = append(baseAgg.AggFuncs[:i], append([]*aggregation.AggFuncDesc{&cntAgg, &sumAgg}, baseAgg.AggFuncs[i+1:]...)...) + } + } +} + // finishCopTask means we close the coprocessor task and create a root task. func finishCopTask(ctx sessionctx.Context, task task) task { t, ok := task.(*copTask) @@ -232,6 +260,7 @@ func finishCopTask(ctx sessionctx.Context, task task) task { p.stats = t.indexPlan.statsInfo() newTask.p = p } else { + splitCopAvg2CountAndSum(t.tablePlan) p := PhysicalTableReader{tablePlan: t.tablePlan}.Init(ctx) p.stats = t.tablePlan.statsInfo() newTask.p = p