Skip to content

Commit

Permalink
planner: enforce projection push down (pingcap#25450)
Browse files Browse the repository at this point in the history
  • Loading branch information
fzhedu authored and LittleFall committed Jul 20, 2021
1 parent 05ec09f commit 68cdb73
Show file tree
Hide file tree
Showing 6 changed files with 289 additions and 108 deletions.
142 changes: 142 additions & 0 deletions executor/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,148 @@ func (s *tiflashTestSuite) TestInjectExtraProj(c *C) {
tk.MustQuery("select avg(a), a from t group by a").Check(testkit.Rows("9223372036854775807.0000 9223372036854775807"))
}

func (s *tiflashTestSuite) TestTiFlashPartitionTableShuffledHashJoin(c *C) {
if israce.RaceEnabled {
c.Skip("exhaustive types test, skip race test")
}

tk := testkit.NewTestKit(c, s.store)
tk.MustExec(`create database tiflash_partition_SHJ`)
tk.MustExec("use tiflash_partition_SHJ")
tk.MustExec(`create table thash (a int, b int) partition by hash(a) partitions 4`)
tk.MustExec(`create table trange (a int, b int) partition by range(a) (
partition p0 values less than (100), partition p1 values less than (200),
partition p2 values less than (300), partition p3 values less than (400))`)
listPartitions := make([]string, 4)
for i := 0; i < 400; i++ {
idx := i % 4
if listPartitions[idx] != "" {
listPartitions[idx] += ", "
}
listPartitions[idx] = listPartitions[idx] + fmt.Sprintf("%v", i)
}
tk.MustExec(`create table tlist (a int, b int) partition by list(a) (
partition p0 values in (` + listPartitions[0] + `), partition p1 values in (` + listPartitions[1] + `),
partition p2 values in (` + listPartitions[2] + `), partition p3 values in (` + listPartitions[3] + `))`)
tk.MustExec(`create table tnormal (a int, b int)`)

for _, tbl := range []string{`thash`, `trange`, `tlist`, `tnormal`} {
tk.MustExec("alter table " + tbl + " set tiflash replica 1")
tb := testGetTableByName(c, tk.Se, "tiflash_partition_SHJ", tbl)
err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)
}

vals := make([]string, 0, 100)
for i := 0; i < 100; i++ {
vals = append(vals, fmt.Sprintf("(%v, %v)", rand.Intn(400), rand.Intn(400)))
}
for _, tbl := range []string{`thash`, `trange`, `tlist`, `tnormal`} {
tk.MustExec(fmt.Sprintf("insert into %v values %v", tbl, strings.Join(vals, ", ")))
tk.MustExec(fmt.Sprintf("analyze table %v", tbl))
}

tk.MustExec("SET tidb_enforce_mpp=1")
tk.MustExec("SET tidb_opt_broadcast_join=0")
tk.MustExec("SET tidb_broadcast_join_threshold_count=0")
tk.MustExec("SET tidb_broadcast_join_threshold_size=0")
tk.MustExec("set @@session.tidb_isolation_read_engines='tiflash'")
// mock executor does not support use outer table as build side for outer join, so need to
// force the inner table as build side
tk.MustExec("set tidb_opt_mpp_outer_join_fixed_build_side=1")

lr := func() (int, int) {
l, r := rand.Intn(400), rand.Intn(400)
if l > r {
l, r = r, l
}
return l, r
}
for i := 0; i < 2; i++ {
l1, r1 := lr()
l2, r2 := lr()
cond := fmt.Sprintf("t1.b>=%v and t1.b<=%v and t2.b>=%v and t2.b<=%v", l1, r1, l2, r2)
var res [][]interface{}
for _, mode := range []string{"static", "dynamic"} {
tk.MustExec(fmt.Sprintf("set @@tidb_partition_prune_mode = '%v'", mode))
for _, tbl := range []string{`thash`, `trange`, `tlist`, `tnormal`} {
q := fmt.Sprintf("select count(*) from %v t1 join %v t2 on t1.a=t2.a where %v", tbl, tbl, cond)
if res == nil {
res = tk.MustQuery(q).Sort().Rows()
} else {
tk.MustQuery(q).Check(res)
}
}
}
}
}

func (s *tiflashTestSuite) TestTiFlashPartitionTableReader(c *C) {
if israce.RaceEnabled {
c.Skip("exhaustive types test, skip race test")
}

tk := testkit.NewTestKit(c, s.store)
tk.MustExec(`create database tiflash_partition_tablereader`)
tk.MustExec("use tiflash_partition_tablereader")
tk.MustExec(`create table thash (a int, b int) partition by hash(a) partitions 4`)
tk.MustExec(`create table trange (a int, b int) partition by range(a) (
partition p0 values less than (100), partition p1 values less than (200),
partition p2 values less than (300), partition p3 values less than (400))`)
listPartitions := make([]string, 4)
for i := 0; i < 400; i++ {
idx := i % 4
if listPartitions[idx] != "" {
listPartitions[idx] += ", "
}
listPartitions[idx] = listPartitions[idx] + fmt.Sprintf("%v", i)
}
tk.MustExec(`create table tlist (a int, b int) partition by list(a) (
partition p0 values in (` + listPartitions[0] + `), partition p1 values in (` + listPartitions[1] + `),
partition p2 values in (` + listPartitions[2] + `), partition p3 values in (` + listPartitions[3] + `))`)
tk.MustExec(`create table tnormal (a int, b int)`)

for _, tbl := range []string{`thash`, `trange`, `tlist`, `tnormal`} {
tk.MustExec("alter table " + tbl + " set tiflash replica 1")
tb := testGetTableByName(c, tk.Se, "tiflash_partition_tablereader", tbl)
err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)
}
// mock executor does not support use outer table as build side for outer join, so need to
// force the inner table as build side
tk.MustExec("set tidb_opt_mpp_outer_join_fixed_build_side=1")

vals := make([]string, 0, 500)
for i := 0; i < 500; i++ {
vals = append(vals, fmt.Sprintf("(%v, %v)", rand.Intn(400), rand.Intn(400)))
}
for _, tbl := range []string{`thash`, `trange`, `tlist`, `tnormal`} {
tk.MustExec(fmt.Sprintf("insert into %v values %v", tbl, strings.Join(vals, ", ")))
}

tk.MustExec("SET tidb_enforce_mpp=1")
tk.MustExec("set @@session.tidb_isolation_read_engines='tiflash'")
for i := 0; i < 10; i++ {
l, r := rand.Intn(400), rand.Intn(400)
if l > r {
l, r = r, l
}
cond := fmt.Sprintf("a>=%v and a<=%v", l, r)
var res [][]interface{}
for _, mode := range []string{"static", "dynamic"} {
tk.MustExec(fmt.Sprintf("set @@tidb_partition_prune_mode = '%v'", mode))
for _, tbl := range []string{"thash", "trange", "tlist", "tnormal"} {
q := fmt.Sprintf("select * from %v where %v", tbl, cond)
if res == nil {
res = tk.MustQuery(q).Sort().Rows()
} else {
tk.MustQuery(q).Sort().Check(res)
}
}
}
}
}

func (s *tiflashTestSuite) TestPartitionTable(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
26 changes: 19 additions & 7 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -2008,13 +2008,25 @@ func (p *LogicalProjection) exhaustPhysicalPlans(prop *property.PhysicalProperty
if !ok {
return nil, true
}
proj := PhysicalProjection{
Exprs: p.Exprs,
CalculateNoDelay: p.CalculateNoDelay,
AvoidColumnEvaluator: p.AvoidColumnEvaluator,
}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset, newProp)
proj.SetSchema(p.schema)
return []PhysicalPlan{proj}, true
newProps := []*property.PhysicalProperty{newProp}
// generate a mpp task candidate if enforced mpp
if newProp.TaskTp != property.MppTaskType && p.SCtx().GetSessionVars().IsMPPEnforced() && p.canPushToCop(kv.TiFlash) &&
expression.CanExprsPushDown(p.SCtx().GetSessionVars().StmtCtx, p.Exprs, p.SCtx().GetClient(), kv.TiFlash) {
mppProp := newProp.CloneEssentialFields()
mppProp.TaskTp = property.MppTaskType
newProps = append(newProps, mppProp)
}
ret := make([]PhysicalPlan, 0, len(newProps))
for _, newProp := range newProps {
proj := PhysicalProjection{
Exprs: p.Exprs,
CalculateNoDelay: p.CalculateNoDelay,
AvoidColumnEvaluator: p.AvoidColumnEvaluator,
}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset, newProp)
proj.SetSchema(p.schema)
ret = append(ret, proj)
}
return ret, true, nil
}

func (lt *LogicalTopN) getPhysTopN(prop *property.PhysicalProperty) []PhysicalPlan {
Expand Down
2 changes: 1 addition & 1 deletion planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3147,7 +3147,7 @@ func (s *testIntegrationSerialSuite) TestPushDownProjectionForMPP(c *C) {
}
}

tk.MustExec("set @@tidb_allow_mpp=1; set @@tidb_opt_broadcast_join=0;")
tk.MustExec("set @@tidb_allow_mpp=1; set @@tidb_opt_broadcast_join=0; set @@tidb_enforce_mpp=1;")

var input []string
var output []struct {
Expand Down
Loading

0 comments on commit 68cdb73

Please sign in to comment.