Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#41370
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
Dousir9 authored and ti-chi-bot committed Feb 16, 2023
1 parent e07fc7f commit 19f9ed7
Show file tree
Hide file tree
Showing 2 changed files with 225 additions and 11 deletions.
173 changes: 173 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8431,3 +8431,176 @@ func TestIssue40285(t *testing.T) {
tk.MustExec("CREATE TABLE t(col1 enum('p5', '9a33x') NOT NULL DEFAULT 'p5',col2 tinyblob DEFAULT NULL) ENGINE = InnoDB DEFAULT CHARSET = latin1 COLLATE = latin1_bin;")
tk.MustQuery("(select last_value(col1) over () as r0 from t) union all (select col2 as r0 from t);")
}
<<<<<<< HEAD
=======

// https://github.com/pingcap/tidb/issues/41273
func TestIssue41273(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec(`CREATE TABLE t (
a set('nwbk','r5','1ad3u','van','ir1z','y','9m','f1','z','e6yd','wfev') NOT NULL DEFAULT 'ir1z,f1,e6yd',
b enum('soo2','4s4j','qi9om','8ue','i71o','qon','3','3feh','6o1i','5yebx','d') NOT NULL DEFAULT '8ue',
c varchar(66) DEFAULT '13mdezixgcn',
PRIMARY KEY (a,b) /*T![clustered_index] CLUSTERED */,
UNIQUE KEY ib(b),
KEY ia(a)
)ENGINE=InnoDB DEFAULT CHARSET=ascii COLLATE=ascii_bin;`)
tk.MustExec("INSERT INTO t VALUES('ir1z,f1,e6yd','i71o','13mdezixgcn'),('ir1z,f1,e6yd','d','13mdezixgcn'),('nwbk','8ue','13mdezixgcn');")
expectedRes := []string{"ir1z,f1,e6yd d 13mdezixgcn", "ir1z,f1,e6yd i71o 13mdezixgcn", "nwbk 8ue 13mdezixgcn"}
tk.MustQuery("select * from t where a between 'e6yd' and 'z' or b <> '8ue';").Sort().Check(testkit.Rows(expectedRes...))
tk.MustQuery("select /*+ use_index_merge(t) */ * from t where a between 'e6yd' and 'z' or b <> '8ue';").Sort().Check(testkit.Rows(expectedRes...))
// For now tidb doesn't support push set type to TiKV, and column a is a set type, so we shouldn't generate a IndexMerge path.
require.False(t, tk.HasPlanForLastExecution("IndexMerge"))
}

func TestIsIPv4ToTiFlash(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(v4 varchar(100), v6 varchar(100))")
tk.MustExec("insert into t values('123.123.123.123', 'F746:C349:48E3:22F2:81E0:0EA8:E7B6:8286')")
tk.MustExec("insert into t values('0.0.0.0', '0000:0000:0000:0000:0000:0000:0000:0000')")
tk.MustExec("insert into t values('127.0.0.1', '2001:0:2851:b9f0:6d:2326:9036:f37a')")

tk.MustExec("set @@tidb_allow_mpp=1; set @@tidb_enforce_mpp=1")
tk.MustExec("set @@tidb_isolation_read_engines = 'tiflash'")

// Create virtual tiflash replica info.
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
require.True(t, exists)
for _, tblInfo := range db.Tables {
if tblInfo.Name.L == "t" {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}

rows := [][]interface{}{
{"TableReader_9", "root", "MppVersion: 1, data:ExchangeSender_8"},
{"└─ExchangeSender_8", "mpp[tiflash]", "ExchangeType: PassThrough"},
{" └─Projection_4", "mpp[tiflash]", "is_ipv4(test.t.v4)->Column#4"},
{" └─TableFullScan_7", "mpp[tiflash]", "keep order:false, stats:pseudo"},
}
tk.MustQuery("explain select is_ipv4(v4) from t;").CheckAt([]int{0, 2, 4}, rows)
}

func TestIsIPv6ToTiFlash(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(v4 varchar(100), v6 varchar(100))")
tk.MustExec("insert into t values('123.123.123.123', 'F746:C349:48E3:22F2:81E0:0EA8:E7B6:8286')")
tk.MustExec("insert into t values('0.0.0.0', '0000:0000:0000:0000:0000:0000:0000:0000')")
tk.MustExec("insert into t values('127.0.0.1', '2001:0:2851:b9f0:6d:2326:9036:f37a')")

tk.MustExec("set @@tidb_allow_mpp=1; set @@tidb_enforce_mpp=1")
tk.MustExec("set @@tidb_isolation_read_engines = 'tiflash'")

// Create virtual tiflash replica info.
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
require.True(t, exists)
for _, tblInfo := range db.Tables {
if tblInfo.Name.L == "t" {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}

rows := [][]interface{}{
{"TableReader_9", "root", "MppVersion: 1, data:ExchangeSender_8"},
{"└─ExchangeSender_8", "mpp[tiflash]", "ExchangeType: PassThrough"},
{" └─Projection_4", "mpp[tiflash]", "is_ipv6(test.t.v6)->Column#4"},
{" └─TableFullScan_7", "mpp[tiflash]", "keep order:false, stats:pseudo"},
}
tk.MustQuery("explain select is_ipv6(v6) from t;").CheckAt([]int{0, 2, 4}, rows)
}

// https://github.com/pingcap/tidb/issues/41355
// The "virtual generated column" push down is not supported now.
// This test covers: TopN, Projection, Selection.
func TestVirtualExprPushDown(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
tk.MustExec("drop table if exists t;")
tk.MustExec("CREATE TABLE t (c1 int DEFAULT 0, c2 int GENERATED ALWAYS AS (abs(c1)) VIRTUAL);")
tk.MustExec("insert into t(c1) values(1), (-1), (2), (-2), (99), (-99);")
tk.MustExec("set @@tidb_isolation_read_engines = 'tikv'")

// TopN to tikv.
rows := [][]interface{}{
{"TopN_7", "root", "test.t.c2, offset:0, count:2"},
{"└─TableReader_13", "root", "data:TableFullScan_12"},
{" └─TableFullScan_12", "cop[tikv]", "keep order:false, stats:pseudo"},
}
tk.MustQuery("explain select * from t order by c2 limit 2;").CheckAt([]int{0, 2, 4}, rows)

// Projection to tikv.
rows = [][]interface{}{
{"Projection_3", "root", "plus(test.t.c1, test.t.c2)->Column#4"},
{"└─TableReader_5", "root", "data:TableFullScan_4"},
{" └─TableFullScan_4", "cop[tikv]", "keep order:false, stats:pseudo"},
}
tk.MustExec("set session tidb_opt_projection_push_down='ON';")
tk.MustQuery("explain select c1 + c2 from t;").CheckAt([]int{0, 2, 4}, rows)
tk.MustExec("set session tidb_opt_projection_push_down='OFF';")

// Selection to tikv.
rows = [][]interface{}{
{"Selection_7", "root", "gt(test.t.c2, 1)"},
{"└─TableReader_6", "root", "data:TableFullScan_5"},
{" └─TableFullScan_5", "cop[tikv]", "keep order:false, stats:pseudo"},
}
tk.MustQuery("explain select * from t where c2 > 1;").CheckAt([]int{0, 2, 4}, rows)

tk.MustExec("set @@tidb_allow_mpp=1; set @@tidb_enforce_mpp=1")
tk.MustExec("set @@tidb_isolation_read_engines = 'tiflash'")
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
require.True(t, exists)
for _, tblInfo := range db.Tables {
if tblInfo.Name.L == "t" {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}

// TopN to tiflash.
rows = [][]interface{}{
{"TopN_7", "root", "test.t.c2, offset:0, count:2"},
{"└─TableReader_15", "root", "data:TableFullScan_14"},
{" └─TableFullScan_14", "cop[tiflash]", "keep order:false, stats:pseudo"},
}
tk.MustQuery("explain select * from t order by c2 limit 2;").CheckAt([]int{0, 2, 4}, rows)

// Projection to tiflash.
rows = [][]interface{}{
{"Projection_3", "root", "plus(test.t.c1, test.t.c2)->Column#4"},
{"└─TableReader_6", "root", "data:TableFullScan_5"},
{" └─TableFullScan_5", "cop[tiflash]", "keep order:false, stats:pseudo"},
}
tk.MustExec("set session tidb_opt_projection_push_down='ON';")
tk.MustQuery("explain select c1 + c2 from t;").CheckAt([]int{0, 2, 4}, rows)
tk.MustExec("set session tidb_opt_projection_push_down='OFF';")

// Selection to tiflash.
rows = [][]interface{}{
{"Selection_8", "root", "gt(test.t.c2, 1)"},
{"└─TableReader_7", "root", "data:TableFullScan_6"},
{" └─TableFullScan_6", "cop[tiflash]", "keep order:false, stats:pseudo"},
}
tk.MustQuery("explain select * from t where c2 > 1;").CheckAt([]int{0, 2, 4}, rows)
}
>>>>>>> d2d91b5d969 (planner: add more checks when pushing TopN down (#41370))
63 changes: 52 additions & 11 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,15 +895,6 @@ func (p *PhysicalLimit) sinkIntoIndexLookUp(t task) bool {
return true
}

// canPushDown checks if this topN can be pushed down. If each of the expression can be converted to pb, it can be pushed.
func (p *PhysicalTopN) canPushDown(storeTp kv.StoreType) bool {
exprs := make([]expression.Expression, 0, len(p.ByItems))
for _, item := range p.ByItems {
exprs = append(exprs, item.Expr)
}
return expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, exprs, p.ctx.GetClient(), storeTp)
}

func (p *PhysicalSort) attach2Task(tasks ...task) task {
t := tasks[0].copy()
t = attachPlan2Task(p, t)
Expand Down Expand Up @@ -955,14 +946,64 @@ func (p *PhysicalTopN) canPushToIndexPlan(indexPlan PhysicalPlan, byItemCols []*
return true
}

// canExpressionConvertedToPB checks whether each of the the expression in TopN can be converted to pb.
func (p *PhysicalTopN) canExpressionConvertedToPB(storeTp kv.StoreType) bool {
exprs := make([]expression.Expression, 0, len(p.ByItems))
for _, item := range p.ByItems {
exprs = append(exprs, item.Expr)
}
return expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, exprs, p.ctx.GetClient(), storeTp)
}

// containVirtualColumn checks whether TopN.ByItems contains virtual generated columns.
func (p *PhysicalTopN) containVirtualColumn(tCols []*expression.Column) bool {
for _, by := range p.ByItems {
cols := expression.ExtractColumns(by.Expr)
for _, col := range cols {
for _, tCol := range tCols {
// A column with ID > 0 indicates that the column can be resolved by data source.
if tCol.ID > 0 && tCol.ID == col.ID && tCol.VirtualExpr != nil {
return true
}
}
}
}
return false
}

// canPushDownToTiKV checks whether this topN can be pushed down to TiKV.
func (p *PhysicalTopN) canPushDownToTiKV(copTask *copTask) bool {
if !p.canExpressionConvertedToPB(kv.TiKV) {
return false
}
if len(copTask.rootTaskConds) != 0 {
return false
}
if p.containVirtualColumn(copTask.plan().Schema().Columns) {
return false
}
return true
}

// canPushDownToTiFlash checks whether this topN can be pushed down to TiFlash.
func (p *PhysicalTopN) canPushDownToTiFlash(mppTask *mppTask) bool {
if !p.canExpressionConvertedToPB(kv.TiFlash) {
return false
}
if p.containVirtualColumn(mppTask.plan().Schema().Columns) {
return false
}
return true
}

func (p *PhysicalTopN) attach2Task(tasks ...task) task {
t := tasks[0].copy()
cols := make([]*expression.Column, 0, len(p.ByItems))
for _, item := range p.ByItems {
cols = append(cols, expression.ExtractColumns(item.Expr)...)
}
needPushDown := len(cols) > 0
if copTask, ok := t.(*copTask); ok && needPushDown && p.canPushDown(copTask.getStoreType()) && len(copTask.rootTaskConds) == 0 {
if copTask, ok := t.(*copTask); ok && needPushDown && p.canPushDownToTiKV(copTask) {
newTask, changed := p.pushTopNDownToDynamicPartition(copTask)
if changed {
return newTask
Expand All @@ -978,7 +1019,7 @@ func (p *PhysicalTopN) attach2Task(tasks ...task) task {
pushedDownTopN = p.getPushedDownTopN(copTask.tablePlan)
copTask.tablePlan = pushedDownTopN
}
} else if mppTask, ok := t.(*mppTask); ok && needPushDown && p.canPushDown(kv.TiFlash) {
} else if mppTask, ok := t.(*mppTask); ok && needPushDown && p.canPushDownToTiFlash(mppTask) {
pushedDownTopN := p.getPushedDownTopN(mppTask.p)
mppTask.p = pushedDownTopN
}
Expand Down

0 comments on commit 19f9ed7

Please sign in to comment.