Skip to content

Commit

Permalink
planner: add more checks when pushing TopN down (#41370) (#41467)
Browse files Browse the repository at this point in the history
close #41355
  • Loading branch information
ti-chi-bot authored Mar 29, 2023
1 parent aac228a commit 38b33e5
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 11 deletions.
81 changes: 81 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6714,6 +6714,87 @@ func TestAutoIncrementCheckWithCheckConstraint(t *testing.T) {
)`)
}

// https://github.com/pingcap/tidb/issues/41355
// The "virtual generated column" push down is not supported now.
// This test covers: TopN, Projection, Selection.
func TestVirtualExprPushDown(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
tk.MustExec("drop table if exists t;")
tk.MustExec("CREATE TABLE t (c1 int DEFAULT 0, c2 int GENERATED ALWAYS AS (abs(c1)) VIRTUAL);")
tk.MustExec("insert into t(c1) values(1), (-1), (2), (-2), (99), (-99);")
tk.MustExec("set @@tidb_isolation_read_engines = 'tikv'")

// TopN to tikv.
rows := [][]interface{}{
{"TopN_7", "root", "test.t.c2, offset:0, count:2"},
{"└─TableReader_13", "root", "data:TableFullScan_12"},
{" └─TableFullScan_12", "cop[tikv]", "keep order:false, stats:pseudo"},
}
tk.MustQuery("explain select * from t order by c2 limit 2;").CheckAt([]int{0, 2, 4}, rows)

// Projection to tikv.
rows = [][]interface{}{
{"Projection_3", "root", "plus(test.t.c1, test.t.c2)->Column#4"},
{"└─TableReader_5", "root", "data:TableFullScan_4"},
{" └─TableFullScan_4", "cop[tikv]", "keep order:false, stats:pseudo"},
}
tk.MustExec("set session tidb_opt_projection_push_down='ON';")
tk.MustQuery("explain select c1 + c2 from t;").CheckAt([]int{0, 2, 4}, rows)
tk.MustExec("set session tidb_opt_projection_push_down='OFF';")

// Selection to tikv.
rows = [][]interface{}{
{"Selection_7", "root", "gt(test.t.c2, 1)"},
{"└─TableReader_6", "root", "data:TableFullScan_5"},
{" └─TableFullScan_5", "cop[tikv]", "keep order:false, stats:pseudo"},
}
tk.MustQuery("explain select * from t where c2 > 1;").CheckAt([]int{0, 2, 4}, rows)

tk.MustExec("set @@tidb_allow_mpp=1; set @@tidb_enforce_mpp=1")
tk.MustExec("set @@tidb_isolation_read_engines = 'tiflash'")
dom := domain.GetDomain(tk.Session())
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
require.True(t, exists)
for _, tblInfo := range db.Tables {
if tblInfo.Name.L == "t" {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}

// TopN to tiflash.
rows = [][]interface{}{
{"TopN_7", "root", "test.t.c2, offset:0, count:2"},
{"└─TableReader_15", "root", "data:TableFullScan_14"},
{" └─TableFullScan_14", "cop[tiflash]", "keep order:false, stats:pseudo"},
}
tk.MustQuery("explain select * from t order by c2 limit 2;").CheckAt([]int{0, 2, 4}, rows)

// Projection to tiflash.
rows = [][]interface{}{
{"Projection_3", "root", "plus(test.t.c1, test.t.c2)->Column#4"},
{"└─TableReader_6", "root", "data:TableFullScan_5"},
{" └─TableFullScan_5", "cop[tiflash]", "keep order:false, stats:pseudo"},
}
tk.MustExec("set session tidb_opt_projection_push_down='ON';")
tk.MustQuery("explain select c1 + c2 from t;").CheckAt([]int{0, 2, 4}, rows)
tk.MustExec("set session tidb_opt_projection_push_down='OFF';")

// Selection to tiflash.
rows = [][]interface{}{
{"Selection_8", "root", "gt(test.t.c2, 1)"},
{"└─TableReader_7", "root", "data:TableFullScan_6"},
{" └─TableFullScan_6", "cop[tiflash]", "keep order:false, stats:pseudo"},
}
tk.MustQuery("explain select * from t where c2 > 1;").CheckAt([]int{0, 2, 4}, rows)
}

// https://github.com/pingcap/tidb/issues/41273
func TestIssue41273(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
Expand Down
63 changes: 52 additions & 11 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,15 +941,6 @@ func (p *PhysicalLimit) sinkIntoIndexLookUp(t task) bool {
return true
}

// canPushDown checks if this topN can be pushed down. If each of the expression can be converted to pb, it can be pushed.
func (p *PhysicalTopN) canPushDown(storeTp kv.StoreType) bool {
exprs := make([]expression.Expression, 0, len(p.ByItems))
for _, item := range p.ByItems {
exprs = append(exprs, item.Expr)
}
return expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, exprs, p.ctx.GetClient(), storeTp)
}

func (p *PhysicalSort) attach2Task(tasks ...task) task {
t := tasks[0].copy()
t = attachPlan2Task(p, t)
Expand Down Expand Up @@ -1003,6 +994,56 @@ func (p *PhysicalTopN) canPushToIndexPlan(indexPlan PhysicalPlan, byItemCols []*
return true
}

// canExpressionConvertedToPB checks whether each of the the expression in TopN can be converted to pb.
func (p *PhysicalTopN) canExpressionConvertedToPB(storeTp kv.StoreType) bool {
exprs := make([]expression.Expression, 0, len(p.ByItems))
for _, item := range p.ByItems {
exprs = append(exprs, item.Expr)
}
return expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, exprs, p.ctx.GetClient(), storeTp)
}

// containVirtualColumn checks whether TopN.ByItems contains virtual generated columns.
func (p *PhysicalTopN) containVirtualColumn(tCols []*expression.Column) bool {
for _, by := range p.ByItems {
cols := expression.ExtractColumns(by.Expr)
for _, col := range cols {
for _, tCol := range tCols {
// A column with ID > 0 indicates that the column can be resolved by data source.
if tCol.ID > 0 && tCol.ID == col.ID && tCol.VirtualExpr != nil {
return true
}
}
}
}
return false
}

// canPushDownToTiKV checks whether this topN can be pushed down to TiKV.
func (p *PhysicalTopN) canPushDownToTiKV(copTask *copTask) bool {
if !p.canExpressionConvertedToPB(kv.TiKV) {
return false
}
if len(copTask.rootTaskConds) != 0 {
return false
}
if p.containVirtualColumn(copTask.plan().Schema().Columns) {
return false
}
return true
}

// canPushDownToTiFlash checks whether this topN can be pushed down to TiFlash.
func (p *PhysicalTopN) canPushDownToTiFlash(mppTask *mppTask) bool {
if !p.canExpressionConvertedToPB(kv.TiFlash) {
return false
}
if p.containVirtualColumn(mppTask.plan().Schema().Columns) {
return false
}
return true
}

func (p *PhysicalTopN) attach2Task(tasks ...task) task {
t := tasks[0].copy()
inputCount := t.count()
Expand All @@ -1011,7 +1052,7 @@ func (p *PhysicalTopN) attach2Task(tasks ...task) task {
cols = append(cols, expression.ExtractColumns(item.Expr)...)
}
needPushDown := len(cols) > 0
if copTask, ok := t.(*copTask); ok && needPushDown && p.canPushDown(copTask.getStoreType()) && len(copTask.rootTaskConds) == 0 {
if copTask, ok := t.(*copTask); ok && needPushDown && p.canPushDownToTiKV(copTask) && len(copTask.rootTaskConds) == 0 {
// If all columns in topN are from index plan, we push it to index plan, otherwise we finish the index plan and
// push it to table plan.
var pushedDownTopN *PhysicalTopN
Expand All @@ -1024,7 +1065,7 @@ func (p *PhysicalTopN) attach2Task(tasks ...task) task {
copTask.tablePlan = pushedDownTopN
}
copTask.addCost(pushedDownTopN.GetCost(inputCount, false))
} else if mppTask, ok := t.(*mppTask); ok && needPushDown && p.canPushDown(kv.TiFlash) {
} else if mppTask, ok := t.(*mppTask); ok && needPushDown && p.canPushDownToTiFlash(mppTask) {
pushedDownTopN := p.getPushedDownTopN(mppTask.p)
mppTask.addCost(pushedDownTopN.GetCost(mppTask.p.StatsCount(), false))
pushedDownTopN.SetCost(mppTask.cst)
Expand Down

0 comments on commit 38b33e5

Please sign in to comment.