Skip to content

Commit

Permalink
planner: fix stream agg pushed down to mpp plan unexpectedly (#32752)
Browse files Browse the repository at this point in the history
close #32632
  • Loading branch information
time-and-fate authored Mar 3, 2022
1 parent 7e8ca4c commit 41c1cc9
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 0 deletions.
57 changes: 57 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6091,3 +6091,60 @@ func TestIssue31240(t *testing.T) {
}
tk.MustExec("drop table if exists t31240")
}

func TestIssue32632(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("CREATE TABLE `partsupp` (" +
" `PS_PARTKEY` bigint(20) NOT NULL," +
"`PS_SUPPKEY` bigint(20) NOT NULL," +
"`PS_AVAILQTY` bigint(20) NOT NULL," +
"`PS_SUPPLYCOST` decimal(15,2) NOT NULL," +
"`PS_COMMENT` varchar(199) NOT NULL," +
"PRIMARY KEY (`PS_PARTKEY`,`PS_SUPPKEY`) /*T![clustered_index] NONCLUSTERED */)")
tk.MustExec("CREATE TABLE `supplier` (" +
"`S_SUPPKEY` bigint(20) NOT NULL," +
"`S_NAME` char(25) NOT NULL," +
"`S_ADDRESS` varchar(40) NOT NULL," +
"`S_NATIONKEY` bigint(20) NOT NULL," +
"`S_PHONE` char(15) NOT NULL," +
"`S_ACCTBAL` decimal(15,2) NOT NULL," +
"`S_COMMENT` varchar(101) NOT NULL," +
"PRIMARY KEY (`S_SUPPKEY`) /*T![clustered_index] CLUSTERED */)")
tk.MustExec("analyze table partsupp;")
tk.MustExec("analyze table supplier;")
tk.MustExec("set @@tidb_enforce_mpp = 1")

tbl1, err := dom.InfoSchema().TableByName(model.CIStr{O: "test", L: "test"}, model.CIStr{O: "partsupp", L: "partsupp"})
require.NoError(t, err)
tbl2, err := dom.InfoSchema().TableByName(model.CIStr{O: "test", L: "test"}, model.CIStr{O: "supplier", L: "supplier"})
require.NoError(t, err)
// Set the hacked TiFlash replica for explain tests.
tbl1.Meta().TiFlashReplica = &model.TiFlashReplicaInfo{Count: 1, Available: true}
tbl2.Meta().TiFlashReplica = &model.TiFlashReplicaInfo{Count: 1, Available: true}

h := dom.StatsHandle()
statsTbl1 := h.GetTableStats(tbl1.Meta())
statsTbl1.Count = 800000
statsTbl2 := h.GetTableStats(tbl2.Meta())
statsTbl2.Count = 10000
var input []string
var output []struct {
SQL string
Plan []string
}
integrationSuiteData := core.GetIntegrationSuiteData()
integrationSuiteData.GetTestCases(t, &input, &output)
for i, tt := range input {
testdata.OnRecord(func() {
output[i].SQL = tt
output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery(tt).Rows())
})
tk.MustQuery(tt).Check(testkit.Rows(output[i].Plan...))
}
tk.MustExec("drop table if exists partsupp")
tk.MustExec("drop table if exists supplier")
}
3 changes: 3 additions & 0 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2055,6 +2055,9 @@ func (p *PhysicalStreamAgg) attach2Task(tasks ...task) task {
attachPlan2Task(finalAgg, t)
finalAgg.SetCost(cop.cost())
}
} else if mpp, ok := t.(*mppTask); ok {
t = mpp.convertToRootTask(p.ctx)
attachPlan2Task(p, t)
} else {
attachPlan2Task(p, t)
}
Expand Down
6 changes: 6 additions & 0 deletions planner/core/testdata/integration_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -867,5 +867,11 @@
"desc format = 'brief' select * from ((select count(*) from (select id,name from t order by id)a group by name,id order by id) union all (select id+1 from t order by 1))c",
"desc format = 'brief' select * from (select * from t order by id)a order by name"
]
},
{
"name": "TestIssue32632",
"cases": [
"explain format = 'brief' select sum(ps_supplycost) from partsupp, supplier where ps_suppkey = s_suppkey;"
]
}
]
20 changes: 20 additions & 0 deletions planner/core/testdata/integration_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -6561,5 +6561,25 @@
]
}
]
},
{
"Name": "TestIssue32632",
"Cases": [
{
"SQL": "explain format = 'brief' select sum(ps_supplycost) from partsupp, supplier where ps_suppkey = s_suppkey;",
"Plan": [
"HashAgg 1.00 root funcs:sum(Column#15)->Column#14",
"└─TableReader 1.00 root data:ExchangeSender",
" └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─HashAgg 1.00 batchCop[tiflash] funcs:sum(test.partsupp.ps_supplycost)->Column#15",
" └─Projection 12500.00 batchCop[tiflash] test.partsupp.ps_supplycost",
" └─HashJoin 12500.00 batchCop[tiflash] inner join, equal:[eq(test.supplier.s_suppkey, test.partsupp.ps_suppkey)]",
" ├─ExchangeReceiver(Build) 10000.00 batchCop[tiflash] ",
" │ └─ExchangeSender 10000.00 batchCop[tiflash] ExchangeType: Broadcast",
" │ └─TableFullScan 10000.00 batchCop[tiflash] table:supplier keep order:false",
" └─TableFullScan(Probe) 800000.00 batchCop[tiflash] table:partsupp keep order:false"
]
}
]
}
]

0 comments on commit 41c1cc9

Please sign in to comment.