From d2d91b5d9694eb0154d78b0bb36754b7977e7584 Mon Sep 17 00:00:00 2001 From: Jk Xu <54522439+Dousir9@users.noreply.github.com> Date: Thu, 16 Feb 2023 01:38:01 +0800 Subject: [PATCH] planner: add more checks when pushing TopN down (#41370) close pingcap/tidb#41355 --- planner/core/integration_test.go | 79 ++++++++++++++++++++++++++++++++ planner/core/task.go | 63 ++++++++++++++++++++----- 2 files changed, 131 insertions(+), 11 deletions(-) diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index 30220864b20d1..d8e00e9fdabb3 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -5130,3 +5130,82 @@ func TestIsIPv6ToTiFlash(t *testing.T) { } tk.MustQuery("explain select is_ipv6(v6) from t;").CheckAt([]int{0, 2, 4}, rows) } + +// https://github.com/pingcap/tidb/issues/41355 +// The "virtual generated column" push down is not supported now. +// This test covers: TopN, Projection, Selection. +func TestVirtualExprPushDown(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("drop table if exists t;") + tk.MustExec("CREATE TABLE t (c1 int DEFAULT 0, c2 int GENERATED ALWAYS AS (abs(c1)) VIRTUAL);") + tk.MustExec("insert into t(c1) values(1), (-1), (2), (-2), (99), (-99);") + tk.MustExec("set @@tidb_isolation_read_engines = 'tikv'") + + // TopN to tikv. + rows := [][]interface{}{ + {"TopN_7", "root", "test.t.c2, offset:0, count:2"}, + {"└─TableReader_13", "root", "data:TableFullScan_12"}, + {" └─TableFullScan_12", "cop[tikv]", "keep order:false, stats:pseudo"}, + } + tk.MustQuery("explain select * from t order by c2 limit 2;").CheckAt([]int{0, 2, 4}, rows) + + // Projection to tikv. 
+ rows = [][]interface{}{ + {"Projection_3", "root", "plus(test.t.c1, test.t.c2)->Column#4"}, + {"└─TableReader_5", "root", "data:TableFullScan_4"}, + {" └─TableFullScan_4", "cop[tikv]", "keep order:false, stats:pseudo"}, + } + tk.MustExec("set session tidb_opt_projection_push_down='ON';") + tk.MustQuery("explain select c1 + c2 from t;").CheckAt([]int{0, 2, 4}, rows) + tk.MustExec("set session tidb_opt_projection_push_down='OFF';") + + // Selection to tikv. + rows = [][]interface{}{ + {"Selection_7", "root", "gt(test.t.c2, 1)"}, + {"└─TableReader_6", "root", "data:TableFullScan_5"}, + {" └─TableFullScan_5", "cop[tikv]", "keep order:false, stats:pseudo"}, + } + tk.MustQuery("explain select * from t where c2 > 1;").CheckAt([]int{0, 2, 4}, rows) + + tk.MustExec("set @@tidb_allow_mpp=1; set @@tidb_enforce_mpp=1") + tk.MustExec("set @@tidb_isolation_read_engines = 'tiflash'") + is := dom.InfoSchema() + db, exists := is.SchemaByName(model.NewCIStr("test")) + require.True(t, exists) + for _, tblInfo := range db.Tables { + if tblInfo.Name.L == "t" { + tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ + Count: 1, + Available: true, + } + } + } + + // TopN to tiflash. + rows = [][]interface{}{ + {"TopN_7", "root", "test.t.c2, offset:0, count:2"}, + {"└─TableReader_15", "root", "data:TableFullScan_14"}, + {" └─TableFullScan_14", "cop[tiflash]", "keep order:false, stats:pseudo"}, + } + tk.MustQuery("explain select * from t order by c2 limit 2;").CheckAt([]int{0, 2, 4}, rows) + + // Projection to tiflash. + rows = [][]interface{}{ + {"Projection_3", "root", "plus(test.t.c1, test.t.c2)->Column#4"}, + {"└─TableReader_6", "root", "data:TableFullScan_5"}, + {" └─TableFullScan_5", "cop[tiflash]", "keep order:false, stats:pseudo"}, + } + tk.MustExec("set session tidb_opt_projection_push_down='ON';") + tk.MustQuery("explain select c1 + c2 from t;").CheckAt([]int{0, 2, 4}, rows) + tk.MustExec("set session tidb_opt_projection_push_down='OFF';") + + // Selection to tiflash. 
+ rows = [][]interface{}{ + {"Selection_8", "root", "gt(test.t.c2, 1)"}, + {"└─TableReader_7", "root", "data:TableFullScan_6"}, + {" └─TableFullScan_6", "cop[tiflash]", "keep order:false, stats:pseudo"}, + } + tk.MustQuery("explain select * from t where c2 > 1;").CheckAt([]int{0, 2, 4}, rows) +} diff --git a/planner/core/task.go b/planner/core/task.go index 553bc64b9a6b9..e134ea13ce346 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -895,15 +895,6 @@ func (p *PhysicalLimit) sinkIntoIndexLookUp(t task) bool { return true } -// canPushDown checks if this topN can be pushed down. If each of the expression can be converted to pb, it can be pushed. -func (p *PhysicalTopN) canPushDown(storeTp kv.StoreType) bool { - exprs := make([]expression.Expression, 0, len(p.ByItems)) - for _, item := range p.ByItems { - exprs = append(exprs, item.Expr) - } - return expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, exprs, p.ctx.GetClient(), storeTp) -} - func (p *PhysicalSort) attach2Task(tasks ...task) task { t := tasks[0].copy() t = attachPlan2Task(p, t) @@ -955,6 +946,56 @@ func (p *PhysicalTopN) canPushToIndexPlan(indexPlan PhysicalPlan, byItemCols []* return true } +// canExpressionConvertedToPB checks whether each of the expressions in TopN can be converted to pb. +func (p *PhysicalTopN) canExpressionConvertedToPB(storeTp kv.StoreType) bool { + exprs := make([]expression.Expression, 0, len(p.ByItems)) + for _, item := range p.ByItems { + exprs = append(exprs, item.Expr) + } + return expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, exprs, p.ctx.GetClient(), storeTp) +} + +// containVirtualColumn checks whether TopN.ByItems contains virtual generated columns. 
+func (p *PhysicalTopN) containVirtualColumn(tCols []*expression.Column) bool { + for _, by := range p.ByItems { + cols := expression.ExtractColumns(by.Expr) + for _, col := range cols { + for _, tCol := range tCols { + // A column with ID > 0 indicates that the column can be resolved by data source. + if tCol.ID > 0 && tCol.ID == col.ID && tCol.VirtualExpr != nil { + return true + } + } + } + } + return false +} + +// canPushDownToTiKV checks whether this topN can be pushed down to TiKV. +func (p *PhysicalTopN) canPushDownToTiKV(copTask *copTask) bool { + if !p.canExpressionConvertedToPB(kv.TiKV) { + return false + } + if len(copTask.rootTaskConds) != 0 { + return false + } + if p.containVirtualColumn(copTask.plan().Schema().Columns) { + return false + } + return true +} + +// canPushDownToTiFlash checks whether this topN can be pushed down to TiFlash. +func (p *PhysicalTopN) canPushDownToTiFlash(mppTask *mppTask) bool { + if !p.canExpressionConvertedToPB(kv.TiFlash) { + return false + } + if p.containVirtualColumn(mppTask.plan().Schema().Columns) { + return false + } + return true +} + func (p *PhysicalTopN) attach2Task(tasks ...task) task { t := tasks[0].copy() cols := make([]*expression.Column, 0, len(p.ByItems)) @@ -962,7 +1003,7 @@ func (p *PhysicalTopN) attach2Task(tasks ...task) task { cols = append(cols, expression.ExtractColumns(item.Expr)...) 
} needPushDown := len(cols) > 0 - if copTask, ok := t.(*copTask); ok && needPushDown && p.canPushDown(copTask.getStoreType()) && len(copTask.rootTaskConds) == 0 { + if copTask, ok := t.(*copTask); ok && needPushDown && p.canPushDownToTiKV(copTask) { newTask, changed := p.pushTopNDownToDynamicPartition(copTask) if changed { return newTask @@ -978,7 +1019,7 @@ func (p *PhysicalTopN) attach2Task(tasks ...task) task { pushedDownTopN = p.getPushedDownTopN(copTask.tablePlan) copTask.tablePlan = pushedDownTopN } - } else if mppTask, ok := t.(*mppTask); ok && needPushDown && p.canPushDown(kv.TiFlash) { + } else if mppTask, ok := t.(*mppTask); ok && needPushDown && p.canPushDownToTiFlash(mppTask) { pushedDownTopN := p.getPushedDownTopN(mppTask.p) mppTask.p = pushedDownTopN }