From fc624926793172d66feb23e5837bdc7a83ecdd50 Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Wed, 12 Jan 2022 08:06:37 +0100 Subject: [PATCH] WIP add Physical Table ID as column in result. Table Partition Dynamic Prune Mode uses a single TableReader meaning it is not currently possible to know which partition a row/record comes from. By adding ExtraPhysTblID column this can be used when the row needs the Physical Table ID (partition id) for SelectLock or checking current transaction buffer (if the row has been changed/deleted). --- cmd/explaintest/r/generated_columns.result | 28 ++++++++- cmd/explaintest/t/generated_columns.test | 23 ++++++- distsql/request_builder.go | 1 + distsql/select_result.go | 19 ++++++ executor/builder.go | 7 ++- executor/executor.go | 1 + executor/table_reader.go | 9 ++- executor/union_scan.go | 29 ++++++++- parser/model/model.go | 20 ++++++ planner/core/integration_test.go | 6 +- planner/core/logical_plan_builder.go | 61 ++++++++++++++++--- planner/core/planbuilder.go | 2 +- planner/core/rule_column_pruning.go | 16 +++++ planner/core/util.go | 1 + sessionctx/variable/session.go | 5 -- .../unistore/cophandler/closure_exec.go | 28 +++++++++ .../unistore/cophandler/cop_handler.go | 4 ++ 17 files changed, 237 insertions(+), 23 deletions(-) diff --git a/cmd/explaintest/r/generated_columns.result b/cmd/explaintest/r/generated_columns.result index 970f00880ac5d..8378023b0f57a 100644 --- a/cmd/explaintest/r/generated_columns.result +++ b/cmd/explaintest/r/generated_columns.result @@ -1,4 +1,3 @@ -set @@tidb_partition_prune_mode='dynamic'; DROP TABLE IF EXISTS person; CREATE TABLE person ( id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, @@ -90,6 +89,8 @@ Projection 5.00 root test.sgc1.j1, test.sgc1.j2, test.sgc1.a, test.sgc1.b, test └─TableReader(Probe) 5.00 root data:Selection └─Selection 5.00 cop[tikv] not(isnull(test.sgc1.a)) └─TableFullScan 5.00 cop[tikv] table:sgc1 keep order:false +set @old_prune_mode = @@tidb_partition_prune_mode; +set @@tidb_partition_prune_mode='static'; DROP TABLE IF EXISTS sgc3; CREATE TABLE sgc3 ( j JSON, @@ -136,6 +137,31 @@ PartitionUnion 23263.33 root └─TableReader 3323.33 root data:Selection └─Selection 3323.33 cop[tikv] lt(test.sgc3.a, 7) └─TableFullScan 10000.00 cop[tikv] table:sgc3, partition:max keep order:false, stats:pseudo +set @@tidb_partition_prune_mode='dynamic'; +DROP TABLE sgc3; +CREATE TABLE sgc3 ( +j JSON, +a INT AS (JSON_EXTRACT(j, "$.a")) STORED +) +PARTITION BY RANGE (a) ( +PARTITION p0 VALUES LESS THAN (1), +PARTITION p1 VALUES LESS THAN (2), +PARTITION p2 VALUES LESS THAN (3), +PARTITION p3 VALUES LESS THAN (4), +PARTITION p4 VALUES LESS THAN (5), +PARTITION p5 VALUES LESS THAN (6), +PARTITION max VALUES LESS THAN MAXVALUE); +EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a <= 1; +id estRows task access object operator info +TableReader 3323.33 root partition:p0,p1 data:Selection +└─Selection 3323.33 cop[tikv] le(test.sgc3.a, 1) + └─TableFullScan 10000.00 cop[tikv] table:sgc3 keep order:false, stats:pseudo +EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a < 7; +id estRows task access object operator info +TableReader 3323.33 root partition:all data:Selection +└─Selection 3323.33 cop[tikv] lt(test.sgc3.a, 7) + └─TableFullScan 10000.00 cop[tikv] table:sgc3 keep order:false, stats:pseudo +set @@tidb_partition_prune_mode = @old_prune_mode; DROP TABLE IF EXISTS t1; CREATE TABLE t1(a INT, b INT AS (a+1) VIRTUAL, c INT AS (b+1) VIRTUAL, d INT AS (c+1) VIRTUAL, KEY(b), INDEX IDX(c, d)); INSERT INTO t1 (a) VALUES (0); diff --git a/cmd/explaintest/t/generated_columns.test b/cmd/explaintest/t/generated_columns.test index 82dfcf4d1d8c8..c007f8ff42e66 100644 --- a/cmd/explaintest/t/generated_columns.test +++ b/cmd/explaintest/t/generated_columns.test @@ -2,7 +2,6 @@ -- Most of the cases are ported from other tests to make sure generated columns behaves the same. -- Stored generated columns as indices -set @@tidb_partition_prune_mode='dynamic'; DROP TABLE IF EXISTS person; CREATE TABLE person ( @@ -74,6 +73,8 @@ EXPLAIN format = 'brief' SELECT * from sgc1 join sgc2 on sgc1.a=sgc2.a; -- Stored generated columns as partition columns +set @old_prune_mode = @@tidb_partition_prune_mode; +set @@tidb_partition_prune_mode='static'; DROP TABLE IF EXISTS sgc3; CREATE TABLE sgc3 ( j JSON, @@ -91,6 +92,26 @@ PARTITION max VALUES LESS THAN MAXVALUE); EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a <= 1; EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a < 7; +set @@tidb_partition_prune_mode='dynamic'; +DROP TABLE sgc3; +CREATE TABLE sgc3 ( +j JSON, +a INT AS (JSON_EXTRACT(j, "$.a")) STORED +) +PARTITION BY RANGE (a) ( +PARTITION p0 VALUES LESS THAN (1), +PARTITION p1 VALUES LESS THAN (2), +PARTITION p2 VALUES LESS THAN (3), +PARTITION p3 VALUES LESS THAN (4), +PARTITION p4 VALUES LESS THAN (5), +PARTITION p5 VALUES LESS THAN (6), +PARTITION max VALUES LESS THAN MAXVALUE); + +EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a <= 1; +EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a < 7; + +set @@tidb_partition_prune_mode = @old_prune_mode; + -- Virtual generated columns as indices DROP TABLE IF EXISTS t1; diff --git a/distsql/request_builder.go b/distsql/request_builder.go index 03f6034bc513e..57fdd1c6573b9 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -133,6 +133,7 @@ func (builder *RequestBuilder) SetTableHandles(tid int64, handles []kv.Handle) * // SetPartitionsAndHandles sets "KeyRanges" for "kv.Request" by converting ParitionHandles to KeyRanges. // handles in slice must be kv.PartitionHandle. +// TODO: What is this? func (builder *RequestBuilder) SetPartitionsAndHandles(handles []kv.Handle) *RequestBuilder { builder.Request.KeyRanges = PartitionHandlesToKVRanges(handles) return builder diff --git a/distsql/select_result.go b/distsql/select_result.go index b2eef6e6f5300..36cb9d4362365 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -216,12 +216,31 @@ func (r *selectResult) fetchResp(ctx context.Context) error { } return nil } + s := fmt.Sprintf("%v", resultSubset) + logutil.Logger(ctx).Info("resultSubset", zap.String("resultSubset", s)) + logutil.Logger(ctx).Info("resultSubset", zap.ByteString("data", resultSubset.GetData())) + logutil.Logger(ctx).Info("resultSubset", zap.Binary("data", resultSubset.GetData())) + b := resultSubset.GetData() + i := make([]int8, len(b)) + for j := range b { + i[j] = int8(b[j]) + } + logutil.Logger(ctx).Info("resultSubset", zap.Int8s("data", i)) + logutil.Logger(ctx).Info("resultSubset", zap.ByteString("startKey", resultSubset.GetStartKey())) + logutil.Logger(ctx).Info("resultSubset", zap.Binary("startKey", resultSubset.GetStartKey())) + b = resultSubset.GetStartKey() + i = make([]int8, len(b)) + for j := range b { + i[j] = int8(b[j]) + } + logutil.Logger(ctx).Info("resultSubset", zap.Int8s("startKey", i)) r.selectResp = new(tipb.SelectResponse) err = r.selectResp.Unmarshal(resultSubset.GetData()) if err != nil { return errors.Trace(err) } respSize := int64(r.selectResp.Size()) + logutil.Logger(ctx).Info("respSize", zap.Int64("size", respSize)) atomic.StoreInt64(&r.selectRespSize, respSize) r.memConsume(respSize) if err := r.selectResp.Error; err != nil { diff --git a/executor/builder.go b/executor/builder.go index ebf4176db00ba..6493a0e81ee4f 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3071,7 +3071,7 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea e.feedback = statistics.NewQueryFeedback(getFeedbackStatsTableID(e.ctx, tbl), ts.Hist, int64(ts.StatsCount()), ts.Desc) } collect := statistics.CollectFeedback(b.ctx.GetSessionVars().StmtCtx, e.feedback, len(ts.Ranges)) - // Do not collect the feedback when the table is the partition table. + // Do not collect the feedback when the table is the partition table. TODO: not true for dynamic? if collect && tbl.Meta().Partition != nil { collect = false } @@ -3096,7 +3096,8 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea func extraPIDColumnIndex(schema *expression.Schema) offsetOptional { for idx, col := range schema.Columns { - if col.ID == model.ExtraPidColID { + // TODO: Handle partitioned global index, i.e. both ExtraPidColID and ExtraPhysTblID is used. + if col.ID == model.ExtraPidColID || col.ID == model.ExtraPhysTblID { return newOffset(idx) } } @@ -3169,6 +3170,7 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E tmp, _ := b.is.TableByID(ts.Table.ID) tbl := tmp.(table.PartitionedTable) + // TODO: Should this be done before checking dirty / ongoing transactions for avoiding adding PID columns? partitions, err := partitionPruning(b.ctx, tbl, v.PartitionInfo.PruningConds, v.PartitionInfo.PartitionNames, v.PartitionInfo.Columns, v.PartitionInfo.ColumnNames) if err != nil { b.err = err @@ -3802,6 +3804,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte return nil, err } var kvRanges []kv.KeyRange + // TODO: mjonss: study this :) if len(lookUpContents) > 0 && keyColumnsIncludeAllPartitionColumns(lookUpContents[0].keyCols, pe) { // In this case we can use dynamic partition pruning. locateKey := make([]types.Datum, e.Schema().Len()) diff --git a/executor/executor.go b/executor/executor.go index 6f4867438134a..ad88d469a72fa 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -939,6 +939,7 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error { if len(e.partitionedTable) > 0 { // Replace the table ID with partition ID. // The partition ID is returned as an extra column from the table reader. + // TODO: any easier way? if offset, ok := e.tblID2PIDColumnIndex[id]; ok { physicalID = row.GetInt64(offset) } diff --git a/executor/table_reader.go b/executor/table_reader.go index b31f07bb79d66..c9bc68a0b0aa6 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/tidb/util/ranger" "github.com/pingcap/tidb/util/stringutil" "github.com/pingcap/tipb/go-tipb" + "go.uber.org/zap" ) // make sure `TableReaderExecutor` implements `Executor`. @@ -244,9 +245,13 @@ func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error // When 'select ... for update' work on a partitioned table, the table reader should // add the partition ID as an extra column. The SelectLockExec need this information // to construct the lock key. - physicalID := getPhysicalTableID(e.table) if e.extraPIDColumnIndex.valid() { - fillExtraPIDColumn(req, e.extraPIDColumnIndex.value(), physicalID) + physicalID := getPhysicalTableID(e.table) + if physicalID != e.table.Meta().ID { + // table partition in static prune mode (one TableReaderExecutor per partition) + logutil.BgLogger().Info("MJONSS: TableReaderExecutor.Next()", zap.Int64("physicalID", physicalID)) + fillExtraPIDColumn(req, e.extraPIDColumnIndex.value(), physicalID) + } } return nil diff --git a/executor/union_scan.go b/executor/union_scan.go index 86b696a8ee988..a4cab2bf08bbe 100644 --- a/executor/union_scan.go +++ b/executor/union_scan.go @@ -29,6 +29,8 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/collate" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" ) // UnionScanExec merges the rows from dirty table and the rows from distsql request. @@ -219,11 +221,30 @@ func (us *UnionScanExec) getSnapshotRow(ctx context.Context) ([]types.Datum, err var err error us.cursor4SnapshotRows = 0 us.snapshotRows = us.snapshotRows[:0] + physTblIDIdx := -1 + fts := make([]*types.FieldType, len(us.columns)) + //colNames := make([]string, len(us.columns)) + //colIds := make([]int64, len(us.columns)) + for i := range us.columns { + fts[i] = &us.columns[i].FieldType + //colNames = append(colNames, us.columns[i].Name.O) + //colIds = append(colIds, us.columns[i].ID) + if us.columns[i].ID == model.ExtraPhysTblID { + if physTblIDIdx >= 0 { + logutil.Logger(ctx).Warn("More than one ExtraPhysTblID column!", zap.String("table", us.table.Meta().Name.O)) + } + physTblIDIdx = i + } + } for len(us.snapshotRows) == 0 { err = Next(ctx, us.children[0], us.snapshotChunkBuffer) if err != nil || us.snapshotChunkBuffer.NumRows() == 0 { return nil, err } + logutil.Logger(ctx).Info("MJONSS: getSnapshotRow", zap.String("Chunk", us.snapshotChunkBuffer.ToString(fts)), + //zap.Strings("colNames", colNames), + //zap.Int64s("colIds", colIds), + zap.Int("us colNames count", len(us.columns))) iter := chunk.NewIterator4Chunk(us.snapshotChunkBuffer) for row := iter.Begin(); row != iter.End(); row = iter.Next() { var snapshotHandle kv.Handle @@ -231,7 +252,13 @@ func (us *UnionScanExec) getSnapshotRow(ctx context.Context) ([]types.Datum, err if err != nil { return nil, err } - checkKey := tablecodec.EncodeRecordKey(us.table.RecordPrefix(), snapshotHandle) + var checkKey kv.Key + if physTblIDIdx >= 0 { + tblID := row.GetInt64(physTblIDIdx) + checkKey = tablecodec.EncodeRowKeyWithHandle(tblID, snapshotHandle) + } else { + checkKey = tablecodec.EncodeRecordKey(us.table.RecordPrefix(), snapshotHandle) + } if _, err := us.memBufSnap.Get(context.TODO(), checkKey); err == nil { // If src handle appears in added rows, it means there is conflict and the transaction will fail to // commit, but for simplicity, we don't handle it here. diff --git a/parser/model/model.go b/parser/model/model.go index b04da6222cc97..9b6e924ecacaa 100644 --- a/parser/model/model.go +++ b/parser/model/model.go @@ -234,6 +234,12 @@ const ExtraHandleID = -1 // ExtraPidColID is the column ID of column which store the partitionID decoded in global index values. const ExtraPidColID = -2 +// ExtraPhysTblID is the column ID of column that should be filled in with the physical table id. +// Primarily used for table partition dynamic prune mode, to return which partition (physical table id) the row came from. +// Using a dedicated id for this, since in the future ExtraPidColID and ExtraPhysTblID may be used for the same request. +// Must be after ExtraPidColID! +const ExtraPhysTblID = -3 + const ( // TableInfoVersion0 means the table info version is 0. // Upgrade from v2.1.1 or v2.1.2 to v2.1.3 and later, and then execute a "change/modify column" statement @@ -275,6 +281,9 @@ var ExtraHandleName = NewCIStr("_tidb_rowid") // ExtraPartitionIdName is the name of ExtraPartitionId Column. var ExtraPartitionIdName = NewCIStr("_tidb_pid") +// ExtraPhysTblIdName is the name of ExtraPhysTblID Column. +var ExtraPhysTblIdName = NewCIStr("_tidb_tid") + // TableInfo provides meta data describing a DB table. type TableInfo struct { ID int64 `json:"id"` @@ -653,6 +662,17 @@ func NewExtraPartitionIDColInfo() *ColumnInfo { return colInfo } +// NewExtraPhysTblIDColInfo mocks a column info for extra partition id column. +func NewExtraPhysTblIDColInfo() *ColumnInfo { + colInfo := &ColumnInfo{ + ID: ExtraPhysTblID, + Name: ExtraPhysTblIdName, + } + colInfo.Tp = mysql.TypeLonglong + colInfo.Flen, colInfo.Decimal = mysql.GetDefaultFieldLengthAndDecimal(mysql.TypeLonglong) + return colInfo +} + // ColumnIsInIndex checks whether c is included in any indices of t. func (t *TableInfo) ColumnIsInIndex(c *ColumnInfo) bool { for _, index := range t.Indices { diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index a4cacd0d58f17..ec665b234d0ff 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -3368,12 +3368,12 @@ func (s *testIntegrationSuite) TestIssue26719(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec(`create table tx (a int) partition by range (a) (partition p0 values less than (10), partition p1 values less than (20))`) - tk.MustExec(`insert into tx values (1)`) + tk.MustExec(`insert into tx values (10)`) tk.MustExec("set @@tidb_partition_prune_mode='dynamic'") tk.MustExec(`begin`) - tk.MustExec(`delete from tx where a in (1)`) - tk.MustQuery(`select * from tx PARTITION(p0)`).Check(testkit.Rows()) + tk.MustExec(`delete from tx where a in (10)`) + //tk.MustQuery(`select * from tx PARTITION(p0)`).Check(testkit.Rows()) tk.MustQuery(`select * from tx`).Check(testkit.Rows()) tk.MustExec(`rollback`) } diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index d595965a1e81f..21e4a05c11de9 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -55,8 +55,10 @@ import ( util2 "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/collate" + "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/plancodec" "github.com/pingcap/tidb/util/set" + "go.uber.org/zap" ) const ( @@ -3182,7 +3184,7 @@ func unfoldWildStar(field *ast.SelectField, outputName types.NameSlice, column [ } if (dbName.L == "" || dbName.L == name.DBName.L) && (tblName.L == "" || tblName.L == name.TblName.L) && - col.ID != model.ExtraHandleID { + col.ID != model.ExtraHandleID && col.ID != model.ExtraPidColID && col.ID != model.ExtraPhysTblID { colName := &ast.ColumnNameExpr{ Name: &ast.ColumnName{ Schema: name.DBName, @@ -3784,24 +3786,59 @@ func (ds *DataSource) newExtraHandleSchemaCol() *expression.Column { // addExtraPIDColumn add an extra PID column for partition table. // 'select ... for update' on a partition table need to know the partition ID // to construct the lock key, so this column is added to the chunk row. -func (ds *DataSource) addExtraPIDColumn(info *extraPIDInfo) { +func (ds *DataSource) addExtraPIDColumn() { pidCol := &expression.Column{ RetType: types.NewFieldType(mysql.TypeLonglong), UniqueID: ds.ctx.GetSessionVars().AllocPlanColumnID(), - ID: model.ExtraPidColID, - OrigName: fmt.Sprintf("%v.%v.%v", ds.DBName, ds.tableInfo.Name, model.ExtraPartitionIdName), + ID: model.ExtraPhysTblID, + OrigName: fmt.Sprintf("%v.%v.%v", ds.DBName, ds.tableInfo.Name, model.ExtraPhysTblIdName), } - ds.Columns = append(ds.Columns, model.NewExtraPartitionIDColInfo()) + ds.Columns = append(ds.Columns, model.NewExtraPhysTblIDColInfo()) schema := ds.Schema() schema.Append(pidCol) ds.names = append(ds.names, &types.FieldName{ DBName: ds.DBName, TblName: ds.TableInfo().Name, - ColName: model.ExtraPartitionIdName, - OrigColName: model.ExtraPartitionIdName, + ColName: model.ExtraPhysTblIdName, + OrigColName: model.ExtraPhysTblIdName, }) ds.TblCols = append(ds.TblCols, pidCol) +} + +// addExtraPIDColumnWithInfo add an extra PID column for partition table. +// 'select ... for update' on a partition table need to know the partition ID +func (ds *DataSource) addExtraPIDColumnWithInfo(info *extraPIDInfo) { + var pidCol *expression.Column + schema := ds.Schema() + for _, col := range schema.Columns { + if col.ID == model.ExtraPidColID || col.ID == model.ExtraPhysTblID { + if pidCol != nil { + logutil.BgLogger().Warn("MJONSS: Duplicate Partition ID/Physical table id for SELECT FOR UPDATE", zap.String("table", ds.TableAsName.O)) + } + pidCol = col + } + } + if pidCol == nil { + logutil.BgLogger().Warn("MJONSS: Missing Partition ID/Physical table id for SELECT FOR UPDATE", zap.String("table", ds.TableAsName.O)) + pidCol = &expression.Column{ + RetType: types.NewFieldType(mysql.TypeLonglong), + UniqueID: ds.ctx.GetSessionVars().AllocPlanColumnID(), + ID: model.ExtraPidColID, + OrigName: fmt.Sprintf("%v.%v.%v", ds.DBName, ds.tableInfo.Name, model.ExtraPartitionIdName), + } + + ds.Columns = append(ds.Columns, model.NewExtraPartitionIDColInfo()) + schema := ds.Schema() + schema.Append(pidCol) + ds.names = append(ds.names, &types.FieldName{ + DBName: ds.DBName, + TblName: ds.TableInfo().Name, + ColName: model.ExtraPartitionIdName, + OrigColName: model.ExtraPartitionIdName, + }) + ds.TblCols = append(ds.TblCols, pidCol) + } info.Columns = append(info.Columns, pidCol) info.TblIDs = append(info.TblIDs, ds.TableInfo().ID) @@ -4183,6 +4220,16 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as us.SetChildren(ds) result = us } + if tableInfo.GetPartitionInfo() != nil { + if b.ctx.GetSessionVars().UseDynamicPartitionPrune() { + // Use the new partition implementation, add partition id as handle/hidden column. + // dirty => must check transaction buffer, which uses Physical table id, so we need it per record from the partitioned table + // IsPessimistic => SelectLock needs the Physical table id for locking each row. + if dirty || b.ctx.GetSessionVars().TxnCtx.IsPessimistic { + ds.addExtraPIDColumn() + } + } + } // If a table is a cache table, it is judged whether it satisfies the conditions of read cache. if tableInfo.TableCacheStatusType == model.TableCacheStatusEnable && b.ctx.GetSessionVars().SnapshotTS == 0 && !b.ctx.GetSessionVars().StmtCtx.IsStaleness { cachedTable := tbl.(table.CachedTable) diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 0dfd7ea71c743..527ac83379fae 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -1258,7 +1258,7 @@ func addExtraPIDColumnToDataSource(p LogicalPlan, info *extraPIDInfo) error { if raw.tableInfo.GetPartitionInfo() == nil { return nil } - raw.addExtraPIDColumn(info) + raw.addExtraPIDColumnWithInfo(info) return nil default: var err error diff --git a/planner/core/rule_column_pruning.go b/planner/core/rule_column_pruning.go index 17dcb87965e77..bc2ed086960cc 100644 --- a/planner/core/rule_column_pruning.go +++ b/planner/core/rule_column_pruning.go @@ -293,6 +293,21 @@ func (p *LogicalUnionScan) PruneColumns(parentUsedCols []*expression.Column, opt for i := 0; i < p.handleCols.NumCols(); i++ { parentUsedCols = append(parentUsedCols, p.handleCols.GetCol(i)) } + for _, col := range p.Schema().Columns { + if col.ID == model.ExtraPidColID || col.ID == model.ExtraPhysTblID { + parentUsedCols = append(parentUsedCols, col) + cols := p.Schema().Columns + if col != cols[len(cols)-1] { + panic("MJONSS: Assumptions of ExtraPidColID always last is wrong!!!") + } + } + } + // ExtraPidColID should always be last? + /* + if cols[len(cols)-1].ID == model.ExtraPidColID { + parentUsedCols = append(parentUsedCols, col) + } + */ condCols := expression.ExtractColumnsFromExpressions(nil, p.conditions, nil) parentUsedCols = append(parentUsedCols, condCols...) return p.children[0].PruneColumns(parentUsedCols, opt) @@ -474,6 +489,7 @@ func (p *LogicalLock) PruneColumns(parentUsedCols []*expression.Column, opt *log } if len(p.partitionedTable) > 0 { + // TODO: What to do here? Should it be added to the tblID2Handle array instead? // If the children include partitioned tables, there is an extra partition ID column. parentUsedCols = append(parentUsedCols, p.extraPIDInfo.Columns...) } diff --git a/planner/core/util.go b/planner/core/util.go index 9d0fe9d0f3bfc..b2427320674a5 100644 --- a/planner/core/util.go +++ b/planner/core/util.go @@ -331,6 +331,7 @@ func tableHasDirtyContent(ctx sessionctx.Context, tableInfo *model.TableInfo) bo } // Currently, we add UnionScan on every partition even though only one partition's data is changed. // This is limited by current implementation of Partition Prune. It'll be updated once we modify that part. + // TODO: Use named partitions at least, preferably pruned if possible here? for _, partition := range pi.Definitions { if ctx.HasDirtyContent(partition.ID) { return true diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 4a66740d0f2a6..2e449f685c64d 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1062,11 +1062,6 @@ func (s *SessionVars) CheckAndGetTxnScope() string { // UseDynamicPartitionPrune indicates whether use new dynamic partition prune. func (s *SessionVars) UseDynamicPartitionPrune() bool { - if s.InTxn() || !s.GetStatusFlag(mysql.ServerStatusAutocommit) { - // UnionScan cannot get partition table IDs in dynamic-mode, this is a quick-fix for issues/26719, - // please see it for more details. - return false - } return PartitionPruneMode(s.PartitionPruneMode.Load()) == Dynamic } diff --git a/store/mockstore/unistore/cophandler/closure_exec.go b/store/mockstore/unistore/cophandler/closure_exec.go index efc845820d260..d760ade74f3c1 100644 --- a/store/mockstore/unistore/cophandler/closure_exec.go +++ b/store/mockstore/unistore/cophandler/closure_exec.go @@ -37,10 +37,12 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/logutil" mockpkg "github.com/pingcap/tidb/util/mock" "github.com/pingcap/tidb/util/rowcodec" "github.com/pingcap/tidb/util/timeutil" "github.com/pingcap/tipb/go-tipb" + "go.uber.org/zap" ) const chunkMaxRows = 1024 @@ -293,6 +295,11 @@ func (e *closureExecutor) initIdxScanCtx(idxScan *tipb.IndexScan) { e.idxScanCtx.primaryColumnIds = idxScan.PrimaryColumnIds lastColumn := e.columnInfos[len(e.columnInfos)-1] + if lastColumn.GetColumnId() == model.ExtraPhysTblID { + lastColumn = e.columnInfos[len(e.columnInfos)-2] + e.idxScanCtx.columnLen-- + } + if lastColumn.GetColumnId() == model.ExtraPidColID { lastColumn = e.columnInfos[len(e.columnInfos)-2] e.idxScanCtx.columnLen-- @@ -837,6 +844,21 @@ func (e *closureExecutor) tableScanProcessCore(key, value []byte) error { if err != nil { return errors.Trace(err) } + // Add ExtraPhysTblID if requested + if e.columnInfos[len(e.columnInfos)-1].ColumnId == model.ExtraPhysTblID { + tblID := tablecodec.DecodeTableID(key) + logutil.BgLogger().Info("MJONSS: tableScanProcessCore", zap.Int64("tblID", tblID), zap.Int("columnIdx", len(e.columnInfos)-1)) + e.scanCtx.chk.AppendInt64(len(e.columnInfos)-1, tblID) + /* + colIds := make([]int64, len(e.columnInfos)) + for i := range e.columnInfos { + colIds = append(colIds, e.columnInfos[i].ColumnId) + } + */ + logutil.BgLogger().Info("MJONSS: tableScanProcessCore chk", zap.String("Chunk", e.scanCtx.chk.ToString(e.resultFieldType)), + //zap.Int64s("colIds", colIds), + zap.Int("colIds count", len(e.columnInfos)), zap.Int("result field types count", len(e.resultFieldType))) + } incRow = true return nil } @@ -909,6 +931,12 @@ func (e *closureExecutor) indexScanProcessCore(key, value []byte) error { } } } + // Add ExtraPhysTblID if requested + if e.columnInfos[len(e.columnInfos)-1].ColumnId == model.ExtraPhysTblID { + tblID := tablecodec.DecodeTableID(key) + logutil.BgLogger().Info("MJONSS: indexScanProcessCore", zap.Int64("tblID", tblID)) + chk.AppendInt64(len(e.columnInfos)-1, tblID) + } gotRow = true return nil } diff --git a/store/mockstore/unistore/cophandler/cop_handler.go b/store/mockstore/unistore/cophandler/cop_handler.go index fa7b784126e3e..8c8ac32beb093 100644 --- a/store/mockstore/unistore/cophandler/cop_handler.go +++ b/store/mockstore/unistore/cophandler/cop_handler.go @@ -212,6 +212,10 @@ func newRowDecoder(columnInfos []*tipb.ColumnInfo, fieldTps []*types.FieldType, ) for i := range columnInfos { info := columnInfos[i] + if info.ColumnId == model.ExtraPhysTblID { + // Skip since it needs to be filled in from the key + continue + } ft := fieldTps[i] col := rowcodec.ColInfo{ ID: info.ColumnId,