Skip to content

Commit

Permalink
planner: split avg to count and sum for TableReader cop task (pingcap…
Browse files Browse the repository at this point in the history
  • Loading branch information
lzmhhh123 committed Jan 17, 2020
1 parent 2b917dd commit 49b9bb0
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 0 deletions.
8 changes: 8 additions & 0 deletions cmd/explaintest/r/tpch.result
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,11 @@ Sort_6 2.94 root tpch.lineitem.l_returnflag:asc, tpch.lineitem.l_linestatus:asc
└─Projection_8 2.94 root tpch.lineitem.l_returnflag, tpch.lineitem.l_linestatus, 3_col_0, 3_col_1, 3_col_2, 3_col_3, 3_col_4, 3_col_5, 3_col_6, 3_col_7
└─HashAgg_14 2.94 root group by:col_13, col_14, funcs:sum(col_0), sum(col_1), sum(col_2), sum(col_3), avg(col_4, col_5), avg(col_6, col_7), avg(col_8, col_9), count(col_10), firstrow(col_11), firstrow(col_12)
└─TableReader_15 2.94 root data:HashAgg_9
<<<<<<< HEAD
└─HashAgg_9 2.94 cop group by:tpch.lineitem.l_linestatus, tpch.lineitem.l_returnflag, funcs:sum(tpch.lineitem.l_quantity), sum(tpch.lineitem.l_extendedprice), sum(mul(tpch.lineitem.l_extendedprice, minus(1, tpch.lineitem.l_discount))), sum(mul(mul(tpch.lineitem.l_extendedprice, minus(1, tpch.lineitem.l_discount)), plus(1, tpch.lineitem.l_tax))), avg(tpch.lineitem.l_quantity), avg(tpch.lineitem.l_extendedprice), avg(tpch.lineitem.l_discount), count(1), firstrow(tpch.lineitem.l_returnflag), firstrow(tpch.lineitem.l_linestatus)
=======
└─HashAgg_9 2.94 cop group by:tpch.lineitem.l_linestatus, tpch.lineitem.l_returnflag, funcs:sum(tpch.lineitem.l_quantity), sum(tpch.lineitem.l_extendedprice), sum(mul(tpch.lineitem.l_extendedprice, minus(1, tpch.lineitem.l_discount))), sum(mul(mul(tpch.lineitem.l_extendedprice, minus(1, tpch.lineitem.l_discount)), plus(1, tpch.lineitem.l_tax))), count(tpch.lineitem.l_quantity), sum(tpch.lineitem.l_quantity), count(tpch.lineitem.l_extendedprice), sum(tpch.lineitem.l_extendedprice), count(tpch.lineitem.l_discount), sum(tpch.lineitem.l_discount), count(1)
>>>>>>> b239f2f04... planner: split avg to count and sum for TableReader cop task (#11926)
└─Selection_13 293795345.00 cop le(tpch.lineitem.l_shipdate, 1998-08-15)
└─TableScan_12 300005811.00 cop table:lineitem, range:[-inf,+inf], keep order:false
/*
Expand Down Expand Up @@ -982,7 +986,11 @@ Projection_16 1.00 root div(11_col_0, 7.0)
│ └─TableScan_31 300005811.00 cop table:lineitem, range:[-inf,+inf], keep order:false
└─HashAgg_40 9943040.00 root group by:col_3, funcs:avg(col_0, col_1), firstrow(col_2)
└─TableReader_41 9943040.00 root data:HashAgg_36
<<<<<<< HEAD
└─HashAgg_36 9943040.00 cop group by:tpch.lineitem.l_partkey, funcs:avg(tpch.lineitem.l_quantity), firstrow(tpch.lineitem.l_partkey)
=======
└─HashAgg_36 9943040.00 cop group by:tpch.lineitem.l_partkey, funcs:count(tpch.lineitem.l_quantity), sum(tpch.lineitem.l_quantity)
>>>>>>> b239f2f04... planner: split avg to count and sum for TableReader cop task (#11926)
└─TableScan_39 300005811.00 cop table:lineitem, range:[-inf,+inf], keep order:false
/*
Q18 Large Volume Customer Query
Expand Down
29 changes: 29 additions & 0 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,34 @@ func (p *PhysicalMergeJoin) attach2Task(tasks ...task) task {
}
}

// splitCopAvg2CountAndSum splits the cop avg function to count and sum.
// Now it's only used for TableReader.
func splitCopAvg2CountAndSum(p PhysicalPlan) {
var baseAgg *basePhysicalAgg
if agg, ok := p.(*PhysicalStreamAgg); ok {
baseAgg = &agg.basePhysicalAgg
}
if agg, ok := p.(*PhysicalHashAgg); ok {
baseAgg = &agg.basePhysicalAgg
}
if baseAgg == nil {
return
}
for i := len(baseAgg.AggFuncs) - 1; i >= 0; i-- {
f := baseAgg.AggFuncs[i]
if f.Name == ast.AggFuncAvg {
sumAgg := *f
sumAgg.Name = ast.AggFuncSum
sumAgg.RetTp = baseAgg.Schema().Columns[i+1].RetType
cntAgg := *f
cntAgg.Name = ast.AggFuncCount
cntAgg.RetTp = baseAgg.Schema().Columns[i].RetType
cntAgg.RetTp.Flag = f.RetTp.Flag
baseAgg.AggFuncs = append(baseAgg.AggFuncs[:i], append([]*aggregation.AggFuncDesc{&cntAgg, &sumAgg}, baseAgg.AggFuncs[i+1:]...)...)
}
}
}

// finishCopTask means we close the coprocessor task and create a root task.
func finishCopTask(ctx sessionctx.Context, task task) task {
t, ok := task.(*copTask)
Expand Down Expand Up @@ -232,6 +260,7 @@ func finishCopTask(ctx sessionctx.Context, task task) task {
p.stats = t.indexPlan.statsInfo()
newTask.p = p
} else {
splitCopAvg2CountAndSum(t.tablePlan)
p := PhysicalTableReader{tablePlan: t.tablePlan}.Init(ctx)
p.stats = t.tablePlan.statsInfo()
newTask.p = p
Expand Down

0 comments on commit 49b9bb0

Please sign in to comment.