Skip to content

Commit

Permalink
planner: fix wrong request data type when pushing down avg aggfuncs (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
wshwsh12 authored and sre-bot committed Sep 6, 2019
1 parent bde7f44 commit b0a6481
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 2 deletions.
70 changes: 70 additions & 0 deletions planner/core/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1864,3 +1864,73 @@ func (s *testPlanSuite) TestQueryBlockHint(c *C) {
c.Assert(core.ToString(p), Equals, tt.plan, comment)
}
}

func (s *testPlanSuite) TestDAGPlanBuilderSplitAvg(c *C) {
defer testleak.AfterTest(c)()
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
defer func() {
dom.Close()
store.Close()
}()
se, err := session.CreateSession4Test(store)
c.Assert(err, IsNil)
_, err = se.Execute(context.Background(), "use test")
c.Assert(err, IsNil)
tests := []struct {
sql string
plan string
}{
{
sql: "select avg(a),avg(b),avg(c) from t",
plan: "TableReader(Table(t)->StreamAgg)->StreamAgg",
},
{
sql: "select /*+ HASH_AGG() */ avg(a),avg(b),avg(c) from t",
plan: "TableReader(Table(t)->HashAgg)->HashAgg",
},
}

for _, tt := range tests {
comment := Commentf("for %s", tt.sql)
stmt, err := s.ParseOneStmt(tt.sql, "", "")
c.Assert(err, IsNil, comment)

core.Preprocess(se, stmt, s.is)
p, err := planner.Optimize(context.TODO(), se, stmt, s.is)
c.Assert(err, IsNil, comment)

c.Assert(core.ToString(p), Equals, tt.plan, comment)
root, ok := p.(core.PhysicalPlan)
if !ok {
continue
}
testDAGPlanBuilderSplitAvg(c, root)
}
}

func testDAGPlanBuilderSplitAvg(c *C, root core.PhysicalPlan) {
if p, ok := root.(*core.PhysicalTableReader); ok {
if p.TablePlans != nil {
baseAgg := p.TablePlans[len(p.TablePlans)-1]
if agg, ok := baseAgg.(*core.PhysicalHashAgg); ok {
for i, aggfunc := range agg.AggFuncs {
c.Assert(agg.Schema().Columns[i].RetType, Equals, aggfunc.RetTp)
}
}
if agg, ok := baseAgg.(*core.PhysicalStreamAgg); ok {
for i, aggfunc := range agg.AggFuncs {
c.Assert(agg.Schema().Columns[i].RetType, Equals, aggfunc.RetTp)
}
}
}
}

childs := root.Children()
if childs == nil {
return
}
for _, son := range childs {
testDAGPlanBuilderSplitAvg(c, son)
}
}
8 changes: 6 additions & 2 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,15 +434,19 @@ func splitCopAvg2CountAndSum(p PhysicalPlan) {
if baseAgg == nil {
return
}

schemaCursor := len(baseAgg.Schema().Columns) - len(baseAgg.GroupByItems)
for i := len(baseAgg.AggFuncs) - 1; i >= 0; i-- {
f := baseAgg.AggFuncs[i]
schemaCursor--
if f.Name == ast.AggFuncAvg {
schemaCursor--
sumAgg := *f
sumAgg.Name = ast.AggFuncSum
sumAgg.RetTp = baseAgg.Schema().Columns[i+1].RetType
sumAgg.RetTp = baseAgg.Schema().Columns[schemaCursor+1].RetType
cntAgg := *f
cntAgg.Name = ast.AggFuncCount
cntAgg.RetTp = baseAgg.Schema().Columns[i].RetType
cntAgg.RetTp = baseAgg.Schema().Columns[schemaCursor].RetType
cntAgg.RetTp.Flag = f.RetTp.Flag
baseAgg.AggFuncs = append(baseAgg.AggFuncs[:i], append([]*aggregation.AggFuncDesc{&cntAgg, &sumAgg}, baseAgg.AggFuncs[i+1:]...)...)
}
Expand Down

0 comments on commit b0a6481

Please sign in to comment.