From 3a8afe4bed9bbc9c1b762d4f636a410113295ff9 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 13 Feb 2023 11:00:02 +0800 Subject: [PATCH] revert mpp_gather related code Signed-off-by: guo-shaoge --- executor/builder.go | 21 ----------------- executor/mpp_gather.go | 16 +------------ executor/table_reader.go | 17 +++++--------- executor/tiflashtest/tiflash_test.go | 35 ---------------------------- expression/integration_test.go | 16 ++----------- planner/core/find_best_task.go | 12 ++++------ planner/core/task.go | 11 --------- 7 files changed, 13 insertions(+), 115 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index ba8390bfb5871..132ae66272c69 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3443,29 +3443,8 @@ func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Exe startTS: startTs, mppQueryID: kv.MPPQueryID{QueryTs: getMPPQueryTS(b.ctx), LocalQueryID: getMPPQueryID(b.ctx), ServerID: domain.GetDomain(b.ctx).ServerID()}, memTracker: memory.NewTracker(v.ID(), -1), - - // To fill virtual column. - columns: []*model.ColumnInfo{}, - virtualColumnIndex: []int{}, - virtualColumnRetFieldTypes: []*types.FieldType{}, - } - var hasVirtualCol bool - for _, col := range v.Schema().Columns { - if col.VirtualExpr != nil { - hasVirtualCol = true - } - } - if hasVirtualCol { - ts, err := v.GetTableScan() - if err != nil { - b.err = err - return nil - } - gather.columns = ts.Columns - gather.virtualColumnIndex, gather.virtualColumnRetFieldTypes = buildVirtualColumnInfo(gather.Schema(), gather.columns) } gather.memTracker.AttachTo(b.ctx.GetSessionVars().StmtCtx.MemTracker) - return gather } diff --git a/executor/mpp_gather.go b/executor/mpp_gather.go index 700150b79bb89..6460e7e3de267 100644 --- a/executor/mpp_gather.go +++ b/executor/mpp_gather.go @@ -23,11 +23,8 @@ import ( "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/parser/model" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/table" - "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" @@ -69,10 +66,6 @@ type MPPGather struct { respIter distsql.SelectResult memTracker *memory.Tracker - columns []*model.ColumnInfo - - virtualColumnIndex []int - virtualColumnRetFieldTypes []*types.FieldType } func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error { @@ -166,14 +159,7 @@ func (e *MPPGather) Open(ctx context.Context) (err error) { // Next fills data into the chunk passed by its caller. func (e *MPPGather) Next(ctx context.Context, chk *chunk.Chunk) error { err := e.respIter.Next(ctx, chk) - if err != nil { - return err - } - err = table.FillVirtualColumnValue(e.virtualColumnRetFieldTypes, e.virtualColumnIndex, e.schema.Columns, e.columns, e.ctx, chk) - if err != nil { - return err - } - return nil + return errors.Trace(err) } // Close and release the used resources. diff --git a/executor/table_reader.go b/executor/table_reader.go index b7769880473e6..3e29dfe27b053 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -457,20 +457,15 @@ func buildVirtualColumnIndex(schema *expression.Schema, columns []*model.ColumnI return virtualColumnIndex } -func (e *TableReaderExecutor) buildVirtualColumnInfo() { - e.virtualColumnIndex, e.virtualColumnRetFieldTypes = buildVirtualColumnInfo(e.Schema(), e.columns) -} - // buildVirtualColumnInfo saves virtual column indices and sort them in definition order -func buildVirtualColumnInfo(schema *expression.Schema, columns []*model.ColumnInfo) (colIndexs []int, retTypes []*types.FieldType) { - colIndexs = buildVirtualColumnIndex(schema, columns) - if len(colIndexs) > 0 { - retTypes = make([]*types.FieldType, len(colIndexs)) - for i, idx := range colIndexs { - retTypes[i] = schema.Columns[idx].RetType +func (e *TableReaderExecutor) buildVirtualColumnInfo() { + e.virtualColumnIndex = buildVirtualColumnIndex(e.Schema(), e.columns) + if len(e.virtualColumnIndex) > 0 { + e.virtualColumnRetFieldTypes = make([]*types.FieldType, len(e.virtualColumnIndex)) + for i, idx := range e.virtualColumnIndex { + e.virtualColumnRetFieldTypes[i] = e.schema.Columns[idx].RetType } } - return colIndexs, retTypes } type tableResultHandler struct { diff --git a/executor/tiflashtest/tiflash_test.go b/executor/tiflashtest/tiflash_test.go index ee604c81d2aa4..e703d66ed33cc 100644 --- a/executor/tiflashtest/tiflash_test.go +++ b/executor/tiflashtest/tiflash_test.go @@ -1452,38 +1452,3 @@ func TestMPPMemoryTracker(t *testing.T) { require.NotNil(t, err) require.True(t, strings.Contains(err.Error(), "Out Of Memory Quota!")) } - -func TestDisaggregatedTiFlashGenColumn(t *testing.T) { - config.UpdateGlobal(func(conf *config.Config) { - conf.DisaggregatedTiFlash = true - }) - defer config.UpdateGlobal(func(conf *config.Config) { - conf.DisaggregatedTiFlash = false - }) - - store := testkit.CreateMockStore(t, withMockTiFlash(2)) - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("drop table if exists tt1;") - tk.MustExec("create table tt1(c0 int, c1 varchar(100), c2 varchar(100) AS (lower(c1))) partition by hash(c0) partitions 2;") - tk.MustExec("insert into tt1(c0, c1) values(1, 'ABC'), (2, 'DEF');") - tk.MustExec("alter table tt1 set tiflash replica 1;") - tb := external.GetTableByName(t, tk, "test", "tt1") - err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true) - require.NoError(t, err) - tk.MustQuery("explain select sum(c0) from tt1;").Check(testkit.Rows( - "HashAgg_15 1.00 root funcs:sum(Column#6)->Column#5", - "└─PartitionUnion_16 2.00 root ", - " ├─HashAgg_34 1.00 root funcs:sum(Column#8)->Column#6", - " │ └─TableReader_36 1.00 root MppVersion: 1, data:ExchangeSender_35", - " │ └─ExchangeSender_35 1.00 mpp[tiflash] ExchangeType: PassThrough", - " │ └─HashAgg_21 1.00 mpp[tiflash] funcs:sum(Column#15)->Column#8", - " │ └─Projection_82 10000.00 mpp[tiflash] cast(test.tt1.c0, decimal(10,0) BINARY)->Column#15", - " │ └─TableFullScan_33 10000.00 mpp[tiflash] table:tt1, partition:p0 keep order:false, stats:pseudo", - " └─HashAgg_62 1.00 root funcs:sum(Column#11)->Column#6", - " └─TableReader_64 1.00 root MppVersion: 1, data:ExchangeSender_63", - " └─ExchangeSender_63 1.00 mpp[tiflash] ExchangeType: PassThrough", - " └─HashAgg_49 1.00 mpp[tiflash] funcs:sum(Column#16)->Column#11", - " └─Projection_83 10000.00 mpp[tiflash] cast(test.tt1.c0, decimal(10,0) BINARY)->Column#16", - " └─TableFullScan_61 10000.00 mpp[tiflash] table:tt1, partition:p1 keep order:false, stats:pseudo")) -} diff --git a/expression/integration_test.go b/expression/integration_test.go index f503a8ab16d06..5555de7e0aa62 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -3771,31 +3771,19 @@ func TestShardIndexOnTiFlash(t *testing.T) { } } } - tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'") tk.MustExec("set @@session.tidb_enforce_mpp = 1") rows := tk.MustQuery("explain select max(b) from t").Rows() - var hasTableFullScan bool for _, row := range rows { line := fmt.Sprintf("%v", row) - if strings.Contains(line, "TableFullScan") { - hasTableFullScan = true - require.Contains(t, line, "mpp[tiflash]") - } + require.NotContains(t, line, "tiflash") } - require.True(t, hasTableFullScan) - tk.MustExec("set @@session.tidb_enforce_mpp = 0") tk.MustExec("set @@session.tidb_allow_mpp = 0") rows = tk.MustQuery("explain select max(b) from t").Rows() - hasTableFullScan = false for _, row := range rows { line := fmt.Sprintf("%v", row) - if strings.Contains(line, "TableFullScan") { - hasTableFullScan = true - require.Contains(t, line, "tiflash") - } + require.NotContains(t, line, "tiflash") } - require.True(t, hasTableFullScan) } func TestExprPushdownBlacklist(t *testing.T) { diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index 9dce9f74b6f74..f9a4c6e094a81 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -2022,14 +2022,10 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because table `" + ds.tableInfo.Name.O + "`is a partition table which is not supported when `@@tidb_partition_prune_mode=static`.") return invalidTask, nil } - if !isDisaggregatedTiFlash || !canMppConvertToRootForDisaggregatedTiFlash { - // Normally, cannot generate mppTask when got virtual column. - // But in disaggregated tiflash mode, we can convert mppTask of TableScan to rootTask directly. - for _, col := range ts.schema.Columns { - if col.VirtualExpr != nil { - ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because column `" + col.OrigName + "` is a virtual column which is not supported now.") - return invalidTask, nil - } + for _, col := range ts.schema.Columns { + if col.VirtualExpr != nil { + ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because column `" + col.OrigName + "` is a virtual column which is not supported now.") + return invalidTask, nil } } mppTask := &mppTask{ diff --git a/planner/core/task.go b/planner/core/task.go index d9f57b56d62b1..553bc64b9a6b9 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -2187,18 +2187,7 @@ func accumulateNetSeekCost4MPP(p PhysicalPlan) (cost float64) { return } -func tryExpandVirtualColumn(p PhysicalPlan) { - if ts, ok := p.(*PhysicalTableScan); ok { - ts.Columns = ExpandVirtualColumn(ts.Columns, ts.schema, ts.Table.Columns) - return - } - for _, child := range p.Children() { - tryExpandVirtualColumn(child) - } -} - func (t *mppTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask { - tryExpandVirtualColumn(t.p) sender := PhysicalExchangeSender{ ExchangeType: tipb.ExchangeType_PassThrough, }.Init(ctx, t.p.statsInfo())