From bd5aec2ae9bc720ed9978603bce7585c7b86daf0 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Thu, 9 Feb 2023 13:02:35 +0800 Subject: [PATCH 01/10] planner: fix tiflash cannot find generated column Signed-off-by: guo-shaoge --- parser/mysql/type.go | 1 + planner/core/find_best_task.go | 12 +++--------- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/parser/mysql/type.go b/parser/mysql/type.go index f79be8ab30d96..580fb6b4184f9 100644 --- a/parser/mysql/type.go +++ b/parser/mysql/type.go @@ -74,6 +74,7 @@ const ( PreventNullInsertFlag uint = 1 << 20 /* Prevent this Field from inserting NULL values */ EnumSetAsIntFlag uint = 1 << 21 /* Internal: Used for inferring enum eval type. */ DropColumnIndexFlag uint = 1 << 22 /* Internal: Used for indicate the column is being dropped with index */ + GeneratedColumnFlag uint = 1 << 23 ) // TypeInt24 bounds. diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index 1ecc9f995243c..adb539b53a592 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -2001,15 +2001,9 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid return invalidTask, nil } if ts.StoreType == kv.TiFlash { - for _, col := range ts.schema.Columns { - // In theory, TiFlash does not support virtual expr, but in non-mpp mode, if the cop request only contain table scan, then - // TiDB will fill the virtual column after decoding the cop response(executor.FillVirtualColumnValue), that is to say, the virtual - // columns in Cop request is just a placeholder, so TiFlash can support virtual column in cop request mode. However, virtual column - // with TiDBShard is special, it can be added using create index statement, TiFlash's ddl does not handle create index statement, so - // there is a chance that the TiDBShard's virtual column is not seen by TiFlash, in this case, TiFlash will throw column not found error - if ds.containExprPrefixUk && expression.GcColumnExprIsTidbShard(col.VirtualExpr) { - ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because column `" + col.OrigName + "` is a virtual column which is not supported now.") - return invalidTask, nil + for _, col := range ts.Columns { + if col.IsGenerated() && !col.GeneratedStored { + col.AddFlag(mysql.GeneratedColumnFlag) } } } From 7cf54227a779641433c7a4c17a02406f6c574511 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Sun, 12 Feb 2023 14:38:42 +0800 Subject: [PATCH 02/10] fix lint Signed-off-by: guo-shaoge --- parser/mysql/type.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parser/mysql/type.go b/parser/mysql/type.go index 580fb6b4184f9..8a2531d870d3e 100644 --- a/parser/mysql/type.go +++ b/parser/mysql/type.go @@ -74,7 +74,7 @@ const ( PreventNullInsertFlag uint = 1 << 20 /* Prevent this Field from inserting NULL values */ EnumSetAsIntFlag uint = 1 << 21 /* Internal: Used for inferring enum eval type. */ DropColumnIndexFlag uint = 1 << 22 /* Internal: Used for indicate the column is being dropped with index */ - GeneratedColumnFlag uint = 1 << 23 + GeneratedColumnFlag uint = 1 << 23 /* Internal: TiFlash will check this flag and add a placeholder for this column */ ) // TypeInt24 bounds. From 363730d18c32ec13b734d9734b96aef09c59ef16 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Sun, 12 Feb 2023 21:02:58 +0800 Subject: [PATCH 03/10] fill virtual column in mpp_gather Signed-off-by: guo-shaoge --- executor/builder.go | 8 ++++++++ executor/mpp_gather.go | 16 +++++++++++++++- executor/table_reader.go | 17 +++++++++++------ planner/core/find_best_task.go | 2 +- 4 files changed, 35 insertions(+), 8 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 3ebb0e4f3a1e9..74d2c2490476b 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3401,6 +3401,11 @@ func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Exe b.err = err return nil } + ts, err := v.GetTableScan() + if err != nil { + b.err = err + return nil + } gather := &MPPGather{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), @@ -3409,8 +3414,11 @@ func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Exe startTS: startTs, mppQueryID: kv.MPPQueryID{QueryTs: getMPPQueryTS(b.ctx), LocalQueryID: getMPPQueryID(b.ctx), ServerID: domain.GetDomain(b.ctx).ServerID()}, memTracker: memory.NewTracker(v.ID(), -1), + columns: ts.Columns, } + gather.virtualColumnIndex, gather.virtualColumnRetFieldTypes = buildVirtualColumnInfo(gather.Schema(), gather.columns) gather.memTracker.AttachTo(b.ctx.GetSessionVars().StmtCtx.MemTracker) + return gather } diff --git a/executor/mpp_gather.go b/executor/mpp_gather.go index 6460e7e3de267..700150b79bb89 100644 --- a/executor/mpp_gather.go +++ b/executor/mpp_gather.go @@ -23,8 +23,11 @@ import ( "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser/model" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" @@ -66,6 +69,10 @@ type MPPGather struct { respIter distsql.SelectResult memTracker *memory.Tracker + columns []*model.ColumnInfo + + virtualColumnIndex []int + virtualColumnRetFieldTypes []*types.FieldType } func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error { @@ -159,7 +166,14 @@ func (e *MPPGather) Open(ctx context.Context) (err error) { // Next fills data into the chunk passed by its caller. func (e *MPPGather) Next(ctx context.Context, chk *chunk.Chunk) error { err := e.respIter.Next(ctx, chk) - return errors.Trace(err) + if err != nil { + return err + } + err = table.FillVirtualColumnValue(e.virtualColumnRetFieldTypes, e.virtualColumnIndex, e.schema.Columns, e.columns, e.ctx, chk) + if err != nil { + return err + } + return nil } // Close and release the used resources. diff --git a/executor/table_reader.go b/executor/table_reader.go index 3e29dfe27b053..b7769880473e6 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -457,15 +457,20 @@ func buildVirtualColumnIndex(schema *expression.Schema, columns []*model.ColumnI return virtualColumnIndex } -// buildVirtualColumnInfo saves virtual column indices and sort them in definition order func (e *TableReaderExecutor) buildVirtualColumnInfo() { - e.virtualColumnIndex = buildVirtualColumnIndex(e.Schema(), e.columns) - if len(e.virtualColumnIndex) > 0 { - e.virtualColumnRetFieldTypes = make([]*types.FieldType, len(e.virtualColumnIndex)) - for i, idx := range e.virtualColumnIndex { - e.virtualColumnRetFieldTypes[i] = e.schema.Columns[idx].RetType + e.virtualColumnIndex, e.virtualColumnRetFieldTypes = buildVirtualColumnInfo(e.Schema(), e.columns) +} + +// buildVirtualColumnInfo saves virtual column indices and sort them in definition order +func buildVirtualColumnInfo(schema *expression.Schema, columns []*model.ColumnInfo) (colIndexs []int, retTypes []*types.FieldType) { + colIndexs = buildVirtualColumnIndex(schema, columns) + if len(colIndexs) > 0 { + retTypes = make([]*types.FieldType, len(colIndexs)) + for i, idx := range colIndexs { + retTypes[i] = schema.Columns[idx].RetType } } + return colIndexs, retTypes } type tableResultHandler struct { diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index adb539b53a592..4e38324e78d9e 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -2023,7 +2023,7 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid return invalidTask, nil } for _, col := range ts.schema.Columns { - if col.VirtualExpr != nil { + if col.VirtualExpr != nil && !isDisaggregatedTiFlash { ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because column `" + col.OrigName + "` is a virtual column which is not supported now.") return invalidTask, nil } From 1c3a915b78611594e33749a36bd6fef639a82f37 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Sun, 12 Feb 2023 22:15:01 +0800 Subject: [PATCH 04/10] update ts.Columns when got virtual column Signed-off-by: guo-shaoge --- planner/core/find_best_task.go | 12 ++++++++---- planner/core/task.go | 11 +++++++++++ 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index 4e38324e78d9e..bb509a88a3a47 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -2022,10 +2022,14 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because table `" + ds.tableInfo.Name.O + "`is a partition table which is not supported when `@@tidb_partition_prune_mode=static`.") return invalidTask, nil } - for _, col := range ts.schema.Columns { - if col.VirtualExpr != nil && !isDisaggregatedTiFlash { - ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because column `" + col.OrigName + "` is a virtual column which is not supported now.") - return invalidTask, nil + if !isDisaggregatedTiFlash || !canMppConvertToRootForDisaggregatedTiFlash { + // Normally, cannot generate mppTask when got virtual column. + // But in disaggregated tiflash mode, we can convert mppTask of TableScan to rootTask directly. + for _, col := range ts.schema.Columns { + if col.VirtualExpr != nil { + ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because column `" + col.OrigName + "` is a virtual column which is not supported now.") + return invalidTask, nil + } } } mppTask := &mppTask{ diff --git a/planner/core/task.go b/planner/core/task.go index c0fc7a9bfaad7..89273c07bcbce 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -2151,7 +2151,18 @@ func accumulateNetSeekCost4MPP(p PhysicalPlan) (cost float64) { return } +func tryExpandVirtualColumn(p PhysicalPlan) { + if ts, ok := p.(*PhysicalTableScan); ok { + ts.Columns = ExpandVirtualColumn(ts.Columns, ts.schema, ts.Table.Columns) + return + } + for _, child := range p.Children() { + tryExpandVirtualColumn(child) + } +} + func (t *mppTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask { + tryExpandVirtualColumn(t.p) sender := PhysicalExchangeSender{ ExchangeType: tipb.ExchangeType_PassThrough, }.Init(ctx, t.p.statsInfo()) From d6c5552f521d16d256ff8d20f31bf7074c57960a Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Sun, 12 Feb 2023 22:33:37 +0800 Subject: [PATCH 05/10] add case Signed-off-by: guo-shaoge --- executor/tiflashtest/tiflash_test.go | 35 ++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/executor/tiflashtest/tiflash_test.go b/executor/tiflashtest/tiflash_test.go index 0fd41eb4872cc..0bd8d0d0f93eb 100644 --- a/executor/tiflashtest/tiflash_test.go +++ b/executor/tiflashtest/tiflash_test.go @@ -1425,3 +1425,38 @@ func TestMPPMemoryTracker(t *testing.T) { require.NotNil(t, err) require.True(t, strings.Contains(err.Error(), "Out Of Memory Quota!")) } + +func TestDisaggregatedTiFlashGenColumn(t *testing.T) { + config.UpdateGlobal(func(conf *config.Config) { + conf.DisaggregatedTiFlash = true + }) + defer config.UpdateGlobal(func(conf *config.Config) { + conf.DisaggregatedTiFlash = false + }) + + store := testkit.CreateMockStore(t, withMockTiFlash(2)) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists tt1;") + tk.MustExec("create table tt1(c0 int, c1 varchar(100), c2 varchar(100) AS (lower(c1))) partition by hash(c0) partitions 2;") + tk.MustExec("insert into tt1(c0, c1) values(1, 'ABC'), (2, 'DEF');") + tk.MustExec("alter table tt1 set tiflash replica 1;") + tb := external.GetTableByName(t, tk, "test", "tt1") + err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true) + require.NoError(t, err) + tk.MustQuery("explain select sum(c0) from tt1;").Check(testkit.Rows( + "HashAgg_15 1.00 root funcs:sum(Column#6)->Column#5", + "└─PartitionUnion_16 2.00 root ", + " ├─HashAgg_34 1.00 root funcs:sum(Column#8)->Column#6", + " │ └─TableReader_36 1.00 root MppVersion: 1, data:ExchangeSender_35", + " │ └─ExchangeSender_35 1.00 mpp[tiflash] ExchangeType: PassThrough", + " │ └─HashAgg_21 1.00 mpp[tiflash] funcs:sum(Column#15)->Column#8", + " │ └─Projection_82 10000.00 mpp[tiflash] cast(test.tt1.c0, decimal(10,0) BINARY)->Column#15", + " │ └─TableFullScan_33 10000.00 mpp[tiflash] table:tt1, partition:p0 keep order:false, stats:pseudo", + " └─HashAgg_62 1.00 root funcs:sum(Column#11)->Column#6", + " └─TableReader_64 1.00 root MppVersion: 1, data:ExchangeSender_63", + " └─ExchangeSender_63 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─HashAgg_49 1.00 mpp[tiflash] funcs:sum(Column#16)->Column#11", + " └─Projection_83 10000.00 mpp[tiflash] cast(test.tt1.c0, decimal(10,0) BINARY)->Column#16", + " └─TableFullScan_61 10000.00 mpp[tiflash] table:tt1, partition:p1 keep order:false, stats:pseudo")) +} From 77921e7f77911bf27b18ab7928aab5abfedabd61 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Sun, 12 Feb 2023 23:03:47 +0800 Subject: [PATCH 06/10] fix case Signed-off-by: guo-shaoge --- executor/builder.go | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 74d2c2490476b..eed40236e6c15 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3401,11 +3401,6 @@ func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Exe b.err = err return nil } - ts, err := v.GetTableScan() - if err != nil { - b.err = err - return nil - } gather := &MPPGather{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), @@ -3414,9 +3409,22 @@ func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Exe startTS: startTs, mppQueryID: kv.MPPQueryID{QueryTs: getMPPQueryTS(b.ctx), LocalQueryID: getMPPQueryID(b.ctx), ServerID: domain.GetDomain(b.ctx).ServerID()}, memTracker: memory.NewTracker(v.ID(), -1), - columns: ts.Columns, } - gather.virtualColumnIndex, gather.virtualColumnRetFieldTypes = buildVirtualColumnInfo(gather.Schema(), gather.columns) + var hasVirtualCol bool + for _, col := range v.Schema().Columns { + if col.VirtualExpr != nil { + hasVirtualCol = true + } + } + if hasVirtualCol { + ts, err := v.GetTableScan() + if err != nil { + b.err = err + return nil + } + gather.columns = ts.Columns + gather.virtualColumnIndex, gather.virtualColumnRetFieldTypes = buildVirtualColumnInfo(gather.Schema(), gather.columns) + } gather.memTracker.AttachTo(b.ctx.GetSessionVars().StmtCtx.MemTracker) return gather From eab659ef860910ee16d756631ee07fc767b794f9 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Sun, 12 Feb 2023 23:08:10 +0800 Subject: [PATCH 07/10] fix fmt Signed-off-by: guo-shaoge --- executor/builder.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/executor/builder.go b/executor/builder.go index eed40236e6c15..2599fcf246b26 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3409,6 +3409,11 @@ func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Exe startTS: startTs, mppQueryID: kv.MPPQueryID{QueryTs: getMPPQueryTS(b.ctx), LocalQueryID: getMPPQueryID(b.ctx), ServerID: domain.GetDomain(b.ctx).ServerID()}, memTracker: memory.NewTracker(v.ID(), -1), + + // To fill virtual column. + columns: []*model.ColumnInfo{}, + virtualColumnIndex: []int{}, + virtualColumnRetFieldTypes: []*types.FieldType{}, } var hasVirtualCol bool for _, col := range v.Schema().Columns { From fb2bc871063e41e95e029c08761c7a8d342a30b2 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Sun, 12 Feb 2023 23:50:25 +0800 Subject: [PATCH 08/10] fix case Signed-off-by: guo-shaoge --- expression/integration_test.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/expression/integration_test.go b/expression/integration_test.go index 5555de7e0aa62..f503a8ab16d06 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -3771,19 +3771,31 @@ func TestShardIndexOnTiFlash(t *testing.T) { } } } + tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'") tk.MustExec("set @@session.tidb_enforce_mpp = 1") rows := tk.MustQuery("explain select max(b) from t").Rows() + var hasTableFullScan bool for _, row := range rows { line := fmt.Sprintf("%v", row) - require.NotContains(t, line, "tiflash") + if strings.Contains(line, "TableFullScan") { + hasTableFullScan = true + require.Contains(t, line, "mpp[tiflash]") + } } + require.True(t, hasTableFullScan) + tk.MustExec("set @@session.tidb_enforce_mpp = 0") tk.MustExec("set @@session.tidb_allow_mpp = 0") rows = tk.MustQuery("explain select max(b) from t").Rows() + hasTableFullScan = false for _, row := range rows { line := fmt.Sprintf("%v", row) - require.NotContains(t, line, "tiflash") + if strings.Contains(line, "TableFullScan") { + hasTableFullScan = true + require.Contains(t, line, "tiflash") + } } + require.True(t, hasTableFullScan) } func TestExprPushdownBlacklist(t *testing.T) { From 3a8afe4bed9bbc9c1b762d4f636a410113295ff9 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 13 Feb 2023 11:00:02 +0800 Subject: [PATCH 09/10] revert mpp_gather related code Signed-off-by: guo-shaoge --- executor/builder.go | 21 ----------------- executor/mpp_gather.go | 16 +------------ executor/table_reader.go | 17 +++++--------- executor/tiflashtest/tiflash_test.go | 35 ---------------------------- expression/integration_test.go | 16 ++----------- planner/core/find_best_task.go | 12 ++++------ planner/core/task.go | 11 --------- 7 files changed, 13 insertions(+), 115 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index ba8390bfb5871..132ae66272c69 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3443,29 +3443,8 @@ func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Exe startTS: startTs, mppQueryID: kv.MPPQueryID{QueryTs: getMPPQueryTS(b.ctx), LocalQueryID: getMPPQueryID(b.ctx), ServerID: domain.GetDomain(b.ctx).ServerID()}, memTracker: memory.NewTracker(v.ID(), -1), - - // To fill virtual column. - columns: []*model.ColumnInfo{}, - virtualColumnIndex: []int{}, - virtualColumnRetFieldTypes: []*types.FieldType{}, - } - var hasVirtualCol bool - for _, col := range v.Schema().Columns { - if col.VirtualExpr != nil { - hasVirtualCol = true - } - } - if hasVirtualCol { - ts, err := v.GetTableScan() - if err != nil { - b.err = err - return nil - } - gather.columns = ts.Columns - gather.virtualColumnIndex, gather.virtualColumnRetFieldTypes = buildVirtualColumnInfo(gather.Schema(), gather.columns) } gather.memTracker.AttachTo(b.ctx.GetSessionVars().StmtCtx.MemTracker) - return gather } diff --git a/executor/mpp_gather.go b/executor/mpp_gather.go index 700150b79bb89..6460e7e3de267 100644 --- a/executor/mpp_gather.go +++ b/executor/mpp_gather.go @@ -23,11 +23,8 @@ import ( "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/parser/model" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/table" - "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" @@ -69,10 +66,6 @@ type MPPGather struct { respIter distsql.SelectResult memTracker *memory.Tracker - columns []*model.ColumnInfo - - virtualColumnIndex []int - virtualColumnRetFieldTypes []*types.FieldType } func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error { @@ -166,14 +159,7 @@ func (e *MPPGather) Open(ctx context.Context) (err error) { // Next fills data into the chunk passed by its caller. func (e *MPPGather) Next(ctx context.Context, chk *chunk.Chunk) error { err := e.respIter.Next(ctx, chk) - if err != nil { - return err - } - err = table.FillVirtualColumnValue(e.virtualColumnRetFieldTypes, e.virtualColumnIndex, e.schema.Columns, e.columns, e.ctx, chk) - if err != nil { - return err - } - return nil + return errors.Trace(err) } // Close and release the used resources. diff --git a/executor/table_reader.go b/executor/table_reader.go index b7769880473e6..3e29dfe27b053 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -457,20 +457,15 @@ func buildVirtualColumnIndex(schema *expression.Schema, columns []*model.ColumnI return virtualColumnIndex } -func (e *TableReaderExecutor) buildVirtualColumnInfo() { - e.virtualColumnIndex, e.virtualColumnRetFieldTypes = buildVirtualColumnInfo(e.Schema(), e.columns) -} - // buildVirtualColumnInfo saves virtual column indices and sort them in definition order -func buildVirtualColumnInfo(schema *expression.Schema, columns []*model.ColumnInfo) (colIndexs []int, retTypes []*types.FieldType) { - colIndexs = buildVirtualColumnIndex(schema, columns) - if len(colIndexs) > 0 { - retTypes = make([]*types.FieldType, len(colIndexs)) - for i, idx := range colIndexs { - retTypes[i] = schema.Columns[idx].RetType +func (e *TableReaderExecutor) buildVirtualColumnInfo() { + e.virtualColumnIndex = buildVirtualColumnIndex(e.Schema(), e.columns) + if len(e.virtualColumnIndex) > 0 { + e.virtualColumnRetFieldTypes = make([]*types.FieldType, len(e.virtualColumnIndex)) + for i, idx := range e.virtualColumnIndex { + e.virtualColumnRetFieldTypes[i] = e.schema.Columns[idx].RetType } } - return colIndexs, retTypes } type tableResultHandler struct { diff --git a/executor/tiflashtest/tiflash_test.go b/executor/tiflashtest/tiflash_test.go index ee604c81d2aa4..e703d66ed33cc 100644 --- a/executor/tiflashtest/tiflash_test.go +++ b/executor/tiflashtest/tiflash_test.go @@ -1452,38 +1452,3 @@ func TestMPPMemoryTracker(t *testing.T) { require.NotNil(t, err) require.True(t, strings.Contains(err.Error(), "Out Of Memory Quota!")) } - -func TestDisaggregatedTiFlashGenColumn(t *testing.T) { - config.UpdateGlobal(func(conf *config.Config) { - conf.DisaggregatedTiFlash = true - }) - defer config.UpdateGlobal(func(conf *config.Config) { - conf.DisaggregatedTiFlash = false - }) - - store := testkit.CreateMockStore(t, withMockTiFlash(2)) - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("drop table if exists tt1;") - tk.MustExec("create table tt1(c0 int, c1 varchar(100), c2 varchar(100) AS (lower(c1))) partition by hash(c0) partitions 2;") - tk.MustExec("insert into tt1(c0, c1) values(1, 'ABC'), (2, 'DEF');") - tk.MustExec("alter table tt1 set tiflash replica 1;") - tb := external.GetTableByName(t, tk, "test", "tt1") - err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true) - require.NoError(t, err) - tk.MustQuery("explain select sum(c0) from tt1;").Check(testkit.Rows( - "HashAgg_15 1.00 root funcs:sum(Column#6)->Column#5", - "└─PartitionUnion_16 2.00 root ", - " ├─HashAgg_34 1.00 root funcs:sum(Column#8)->Column#6", - " │ └─TableReader_36 1.00 root MppVersion: 1, data:ExchangeSender_35", - " │ └─ExchangeSender_35 1.00 mpp[tiflash] ExchangeType: PassThrough", - " │ └─HashAgg_21 1.00 mpp[tiflash] funcs:sum(Column#15)->Column#8", - " │ └─Projection_82 10000.00 mpp[tiflash] cast(test.tt1.c0, decimal(10,0) BINARY)->Column#15", - " │ └─TableFullScan_33 10000.00 mpp[tiflash] table:tt1, partition:p0 keep order:false, stats:pseudo", - " └─HashAgg_62 1.00 root funcs:sum(Column#11)->Column#6", - " └─TableReader_64 1.00 root MppVersion: 1, data:ExchangeSender_63", - " └─ExchangeSender_63 1.00 mpp[tiflash] ExchangeType: PassThrough", - " └─HashAgg_49 1.00 mpp[tiflash] funcs:sum(Column#16)->Column#11", - " └─Projection_83 10000.00 mpp[tiflash] cast(test.tt1.c0, decimal(10,0) BINARY)->Column#16", - " └─TableFullScan_61 10000.00 mpp[tiflash] table:tt1, partition:p1 keep order:false, stats:pseudo")) -} diff --git a/expression/integration_test.go b/expression/integration_test.go index f503a8ab16d06..5555de7e0aa62 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -3771,31 +3771,19 @@ func TestShardIndexOnTiFlash(t *testing.T) { } } } - tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'") tk.MustExec("set @@session.tidb_enforce_mpp = 1") rows := tk.MustQuery("explain select max(b) from t").Rows() - var hasTableFullScan bool for _, row := range rows { line := fmt.Sprintf("%v", row) - if strings.Contains(line, "TableFullScan") { - hasTableFullScan = true - require.Contains(t, line, "mpp[tiflash]") - } + require.NotContains(t, line, "tiflash") } - require.True(t, hasTableFullScan) - tk.MustExec("set @@session.tidb_enforce_mpp = 0") tk.MustExec("set @@session.tidb_allow_mpp = 0") rows = tk.MustQuery("explain select max(b) from t").Rows() - hasTableFullScan = false for _, row := range rows { line := fmt.Sprintf("%v", row) - if strings.Contains(line, "TableFullScan") { - hasTableFullScan = true - require.Contains(t, line, "tiflash") - } + require.NotContains(t, line, "tiflash") } - require.True(t, hasTableFullScan) } func TestExprPushdownBlacklist(t *testing.T) { diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index 9dce9f74b6f74..f9a4c6e094a81 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -2022,14 +2022,10 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because table `" + ds.tableInfo.Name.O + "`is a partition table which is not supported when `@@tidb_partition_prune_mode=static`.") return invalidTask, nil } - if !isDisaggregatedTiFlash || !canMppConvertToRootForDisaggregatedTiFlash { - // Normally, cannot generate mppTask when got virtual column. - // But in disaggregated tiflash mode, we can convert mppTask of TableScan to rootTask directly. - for _, col := range ts.schema.Columns { - if col.VirtualExpr != nil { - ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because column `" + col.OrigName + "` is a virtual column which is not supported now.") - return invalidTask, nil - } + for _, col := range ts.schema.Columns { + if col.VirtualExpr != nil { + ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because column `" + col.OrigName + "` is a virtual column which is not supported now.") + return invalidTask, nil } } mppTask := &mppTask{ diff --git a/planner/core/task.go b/planner/core/task.go index d9f57b56d62b1..553bc64b9a6b9 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -2187,18 +2187,7 @@ func accumulateNetSeekCost4MPP(p PhysicalPlan) (cost float64) { return } -func tryExpandVirtualColumn(p PhysicalPlan) { - if ts, ok := p.(*PhysicalTableScan); ok { - ts.Columns = ExpandVirtualColumn(ts.Columns, ts.schema, ts.Table.Columns) - return - } - for _, child := range p.Children() { - tryExpandVirtualColumn(child) - } -} - func (t *mppTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask { - tryExpandVirtualColumn(t.p) sender := PhysicalExchangeSender{ ExchangeType: tipb.ExchangeType_PassThrough, }.Init(ctx, t.p.statsInfo()) From b76c9c242038ab246060002a1b20bc84c5091185 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 13 Feb 2023 15:36:42 +0800 Subject: [PATCH 10/10] fix case Signed-off-by: guo-shaoge --- expression/integration_test.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/expression/integration_test.go b/expression/integration_test.go index 5555de7e0aa62..bb8a0d622bf66 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -3771,18 +3771,23 @@ func TestShardIndexOnTiFlash(t *testing.T) { } } } + tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'") tk.MustExec("set @@session.tidb_enforce_mpp = 1") rows := tk.MustQuery("explain select max(b) from t").Rows() for _, row := range rows { line := fmt.Sprintf("%v", row) - require.NotContains(t, line, "tiflash") + if strings.Contains(line, "TableFullScan") { + require.Contains(t, line, "tiflash") + } } tk.MustExec("set @@session.tidb_enforce_mpp = 0") tk.MustExec("set @@session.tidb_allow_mpp = 0") rows = tk.MustQuery("explain select max(b) from t").Rows() for _, row := range rows { line := fmt.Sprintf("%v", row) - require.NotContains(t, line, "tiflash") + if strings.Contains(line, "TableFullScan") { + require.NotContains(t, line, "mpp[tiflash]") + } } }