Skip to content

Commit

Permalink
WIP add Physical Table ID as column in result.
Browse files Browse the repository at this point in the history
Table Partition Dynamic Prune Mode uses a single TableReader,
which means it is currently not possible to know which partition
a row/record comes from.

Adding an ExtraPhysTblID column makes the Physical Table ID (partition id)
available whenever a row needs it, for SelectLock or for checking the
current transaction buffer (if the row has been changed/deleted).
  • Loading branch information
mjonss committed Jan 12, 2022
1 parent 28c5074 commit fc62492
Show file tree
Hide file tree
Showing 17 changed files with 237 additions and 23 deletions.
28 changes: 27 additions & 1 deletion cmd/explaintest/r/generated_columns.result
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
set @@tidb_partition_prune_mode='dynamic';
DROP TABLE IF EXISTS person;
CREATE TABLE person (
id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
Expand Down Expand Up @@ -90,6 +89,8 @@ Projection 5.00 root test.sgc1.j1, test.sgc1.j2, test.sgc1.a, test.sgc1.b, test
└─TableReader(Probe) 5.00 root data:Selection
└─Selection 5.00 cop[tikv] not(isnull(test.sgc1.a))
└─TableFullScan 5.00 cop[tikv] table:sgc1 keep order:false
set @old_prune_mode = @@tidb_partition_prune_mode;
set @@tidb_partition_prune_mode='static';
DROP TABLE IF EXISTS sgc3;
CREATE TABLE sgc3 (
j JSON,
Expand Down Expand Up @@ -136,6 +137,31 @@ PartitionUnion 23263.33 root
└─TableReader 3323.33 root data:Selection
└─Selection 3323.33 cop[tikv] lt(test.sgc3.a, 7)
└─TableFullScan 10000.00 cop[tikv] table:sgc3, partition:max keep order:false, stats:pseudo
set @@tidb_partition_prune_mode='dynamic';
DROP TABLE sgc3;
CREATE TABLE sgc3 (
j JSON,
a INT AS (JSON_EXTRACT(j, "$.a")) STORED
)
PARTITION BY RANGE (a) (
PARTITION p0 VALUES LESS THAN (1),
PARTITION p1 VALUES LESS THAN (2),
PARTITION p2 VALUES LESS THAN (3),
PARTITION p3 VALUES LESS THAN (4),
PARTITION p4 VALUES LESS THAN (5),
PARTITION p5 VALUES LESS THAN (6),
PARTITION max VALUES LESS THAN MAXVALUE);
EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a <= 1;
id estRows task access object operator info
TableReader 3323.33 root partition:p0,p1 data:Selection
└─Selection 3323.33 cop[tikv] le(test.sgc3.a, 1)
└─TableFullScan 10000.00 cop[tikv] table:sgc3 keep order:false, stats:pseudo
EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a < 7;
id estRows task access object operator info
TableReader 3323.33 root partition:all data:Selection
└─Selection 3323.33 cop[tikv] lt(test.sgc3.a, 7)
└─TableFullScan 10000.00 cop[tikv] table:sgc3 keep order:false, stats:pseudo
set @@tidb_partition_prune_mode = @old_prune_mode;
DROP TABLE IF EXISTS t1;
CREATE TABLE t1(a INT, b INT AS (a+1) VIRTUAL, c INT AS (b+1) VIRTUAL, d INT AS (c+1) VIRTUAL, KEY(b), INDEX IDX(c, d));
INSERT INTO t1 (a) VALUES (0);
Expand Down
23 changes: 22 additions & 1 deletion cmd/explaintest/t/generated_columns.test
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
-- Most of the cases are ported from other tests to make sure generated columns behaves the same.

-- Stored generated columns as indices
set @@tidb_partition_prune_mode='dynamic';

DROP TABLE IF EXISTS person;
CREATE TABLE person (
Expand Down Expand Up @@ -74,6 +73,8 @@ EXPLAIN format = 'brief' SELECT * from sgc1 join sgc2 on sgc1.a=sgc2.a;

-- Stored generated columns as partition columns

set @old_prune_mode = @@tidb_partition_prune_mode;
set @@tidb_partition_prune_mode='static';
DROP TABLE IF EXISTS sgc3;
CREATE TABLE sgc3 (
j JSON,
Expand All @@ -91,6 +92,26 @@ PARTITION max VALUES LESS THAN MAXVALUE);
EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a <= 1;
EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a < 7;

set @@tidb_partition_prune_mode='dynamic';
DROP TABLE sgc3;
CREATE TABLE sgc3 (
j JSON,
a INT AS (JSON_EXTRACT(j, "$.a")) STORED
)
PARTITION BY RANGE (a) (
PARTITION p0 VALUES LESS THAN (1),
PARTITION p1 VALUES LESS THAN (2),
PARTITION p2 VALUES LESS THAN (3),
PARTITION p3 VALUES LESS THAN (4),
PARTITION p4 VALUES LESS THAN (5),
PARTITION p5 VALUES LESS THAN (6),
PARTITION max VALUES LESS THAN MAXVALUE);

EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a <= 1;
EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a < 7;

set @@tidb_partition_prune_mode = @old_prune_mode;

-- Virtual generated columns as indices

DROP TABLE IF EXISTS t1;
Expand Down
1 change: 1 addition & 0 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func (builder *RequestBuilder) SetTableHandles(tid int64, handles []kv.Handle) *

// SetPartitionsAndHandles sets "KeyRanges" for "kv.Request" by converting PartitionHandles to KeyRanges.
// handles in slice must be kv.PartitionHandle.
// TODO: What is this?
func (builder *RequestBuilder) SetPartitionsAndHandles(handles []kv.Handle) *RequestBuilder {
builder.Request.KeyRanges = PartitionHandlesToKVRanges(handles)
return builder
Expand Down
19 changes: 19 additions & 0 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,31 @@ func (r *selectResult) fetchResp(ctx context.Context) error {
}
return nil
}
s := fmt.Sprintf("%v", resultSubset)
logutil.Logger(ctx).Info("resultSubset", zap.String("resultSubset", s))
logutil.Logger(ctx).Info("resultSubset", zap.ByteString("data", resultSubset.GetData()))
logutil.Logger(ctx).Info("resultSubset", zap.Binary("data", resultSubset.GetData()))
b := resultSubset.GetData()
i := make([]int8, len(b))
for j := range b {
i[j] = int8(b[j])
}
logutil.Logger(ctx).Info("resultSubset", zap.Int8s("data", i))
logutil.Logger(ctx).Info("resultSubset", zap.ByteString("startKey", resultSubset.GetStartKey()))
logutil.Logger(ctx).Info("resultSubset", zap.Binary("startKey", resultSubset.GetStartKey()))
b = resultSubset.GetStartKey()
i = make([]int8, len(b))
for j := range b {
i[j] = int8(b[j])
}
logutil.Logger(ctx).Info("resultSubset", zap.Int8s("startKey", i))
r.selectResp = new(tipb.SelectResponse)
err = r.selectResp.Unmarshal(resultSubset.GetData())
if err != nil {
return errors.Trace(err)
}
respSize := int64(r.selectResp.Size())
logutil.Logger(ctx).Info("respSize", zap.Int64("size", respSize))
atomic.StoreInt64(&r.selectRespSize, respSize)
r.memConsume(respSize)
if err := r.selectResp.Error; err != nil {
Expand Down
7 changes: 5 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3071,7 +3071,7 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
e.feedback = statistics.NewQueryFeedback(getFeedbackStatsTableID(e.ctx, tbl), ts.Hist, int64(ts.StatsCount()), ts.Desc)
}
collect := statistics.CollectFeedback(b.ctx.GetSessionVars().StmtCtx, e.feedback, len(ts.Ranges))
// Do not collect the feedback when the table is the partition table.
// Do not collect the feedback when the table is the partition table. TODO: not true for dynamic?
if collect && tbl.Meta().Partition != nil {
collect = false
}
Expand All @@ -3096,7 +3096,8 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea

func extraPIDColumnIndex(schema *expression.Schema) offsetOptional {
for idx, col := range schema.Columns {
if col.ID == model.ExtraPidColID {
// TODO: Handle partitioned global index, i.e. both ExtraPidColID and ExtraPhysTblID is used.
if col.ID == model.ExtraPidColID || col.ID == model.ExtraPhysTblID {
return newOffset(idx)
}
}
Expand Down Expand Up @@ -3169,6 +3170,7 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E

tmp, _ := b.is.TableByID(ts.Table.ID)
tbl := tmp.(table.PartitionedTable)
// TODO: Should this be done before checking dirty / ongoing transactions for avoiding adding PID columns?
partitions, err := partitionPruning(b.ctx, tbl, v.PartitionInfo.PruningConds, v.PartitionInfo.PartitionNames, v.PartitionInfo.Columns, v.PartitionInfo.ColumnNames)
if err != nil {
b.err = err
Expand Down Expand Up @@ -3802,6 +3804,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
return nil, err
}
var kvRanges []kv.KeyRange
// TODO: mjonss: study this :)
if len(lookUpContents) > 0 && keyColumnsIncludeAllPartitionColumns(lookUpContents[0].keyCols, pe) {
// In this case we can use dynamic partition pruning.
locateKey := make([]types.Datum, e.Schema().Len())
Expand Down
1 change: 1 addition & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -939,6 +939,7 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
if len(e.partitionedTable) > 0 {
// Replace the table ID with partition ID.
// The partition ID is returned as an extra column from the table reader.
// TODO: any easier way?
if offset, ok := e.tblID2PIDColumnIndex[id]; ok {
physicalID = row.GetInt64(offset)
}
Expand Down
9 changes: 7 additions & 2 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
)

// make sure `TableReaderExecutor` implements `Executor`.
Expand Down Expand Up @@ -244,9 +245,13 @@ func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error
// When 'select ... for update' work on a partitioned table, the table reader should
// add the partition ID as an extra column. The SelectLockExec need this information
// to construct the lock key.
physicalID := getPhysicalTableID(e.table)
if e.extraPIDColumnIndex.valid() {
fillExtraPIDColumn(req, e.extraPIDColumnIndex.value(), physicalID)
physicalID := getPhysicalTableID(e.table)
if physicalID != e.table.Meta().ID {
// table partition in static prune mode (one TableReaderExecutor per partition)
logutil.BgLogger().Info("MJONSS: TableReaderExecutor.Next()", zap.Int64("physicalID", physicalID))
fillExtraPIDColumn(req, e.extraPIDColumnIndex.value(), physicalID)
}
}

return nil
Expand Down
29 changes: 28 additions & 1 deletion executor/union_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

// UnionScanExec merges the rows from dirty table and the rows from distsql request.
Expand Down Expand Up @@ -219,19 +221,44 @@ func (us *UnionScanExec) getSnapshotRow(ctx context.Context) ([]types.Datum, err
var err error
us.cursor4SnapshotRows = 0
us.snapshotRows = us.snapshotRows[:0]
physTblIDIdx := -1
fts := make([]*types.FieldType, len(us.columns))
//colNames := make([]string, len(us.columns))
//colIds := make([]int64, len(us.columns))
for i := range us.columns {
fts[i] = &us.columns[i].FieldType
//colNames = append(colNames, us.columns[i].Name.O)
//colIds = append(colIds, us.columns[i].ID)
if us.columns[i].ID == model.ExtraPhysTblID {
if physTblIDIdx >= 0 {
logutil.Logger(ctx).Warn("More than one ExtraPhysTblID column!", zap.String("table", us.table.Meta().Name.O))
}
physTblIDIdx = i
}
}
for len(us.snapshotRows) == 0 {
err = Next(ctx, us.children[0], us.snapshotChunkBuffer)
if err != nil || us.snapshotChunkBuffer.NumRows() == 0 {
return nil, err
}
logutil.Logger(ctx).Info("MJONSS: getSnapshotRow", zap.String("Chunk", us.snapshotChunkBuffer.ToString(fts)),
//zap.Strings("colNames", colNames),
//zap.Int64s("colIds", colIds),
zap.Int("us colNames count", len(us.columns)))
iter := chunk.NewIterator4Chunk(us.snapshotChunkBuffer)
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
var snapshotHandle kv.Handle
snapshotHandle, err = us.belowHandleCols.BuildHandle(row)
if err != nil {
return nil, err
}
checkKey := tablecodec.EncodeRecordKey(us.table.RecordPrefix(), snapshotHandle)
var checkKey kv.Key
if physTblIDIdx >= 0 {
tblID := row.GetInt64(physTblIDIdx)
checkKey = tablecodec.EncodeRowKeyWithHandle(tblID, snapshotHandle)
} else {
checkKey = tablecodec.EncodeRecordKey(us.table.RecordPrefix(), snapshotHandle)
}
if _, err := us.memBufSnap.Get(context.TODO(), checkKey); err == nil {
// If src handle appears in added rows, it means there is conflict and the transaction will fail to
// commit, but for simplicity, we don't handle it here.
Expand Down
20 changes: 20 additions & 0 deletions parser/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,12 @@ const ExtraHandleID = -1
// ExtraPidColID is the column ID of column which store the partitionID decoded in global index values.
const ExtraPidColID = -2

// ExtraPhysTblID is the column ID of column that should be filled in with the physical table id.
// Primarily used for table partition dynamic prune mode, to return which partition (physical table id) the row came from.
// Using a dedicated id for this, since in the future ExtraPidColID and ExtraPhysTblID may be used for the same request.
// Must be after ExtraPidColID!
const ExtraPhysTblID = -3

const (
// TableInfoVersion0 means the table info version is 0.
// Upgrade from v2.1.1 or v2.1.2 to v2.1.3 and later, and then execute a "change/modify column" statement
Expand Down Expand Up @@ -275,6 +281,9 @@ var ExtraHandleName = NewCIStr("_tidb_rowid")
// ExtraPartitionIdName is the name of ExtraPartitionId Column.
var ExtraPartitionIdName = NewCIStr("_tidb_pid")

// ExtraPhysTblIdName is the name of ExtraPhysTblID Column.
var ExtraPhysTblIdName = NewCIStr("_tidb_tid")

// TableInfo provides meta data describing a DB table.
type TableInfo struct {
ID int64 `json:"id"`
Expand Down Expand Up @@ -653,6 +662,17 @@ func NewExtraPartitionIDColInfo() *ColumnInfo {
return colInfo
}

// NewExtraPhysTblIDColInfo returns a mocked ColumnInfo for the extra physical
// table ID column. The column carries the ID ExtraPhysTblID and the name
// ExtraPhysTblIdName, and is typed as mysql.TypeLonglong with that type's
// default field length and decimal.
func NewExtraPhysTblIDColInfo() *ColumnInfo {
	// Look up the default display length and decimal for the column type first.
	flen, decimal := mysql.GetDefaultFieldLengthAndDecimal(mysql.TypeLonglong)

	info := &ColumnInfo{
		ID:   ExtraPhysTblID,
		Name: ExtraPhysTblIdName,
	}
	// The field-type attributes are assigned after construction, not in the literal.
	info.Tp = mysql.TypeLonglong
	info.Flen = flen
	info.Decimal = decimal
	return info
}

// ColumnIsInIndex checks whether c is included in any indices of t.
func (t *TableInfo) ColumnIsInIndex(c *ColumnInfo) bool {
for _, index := range t.Indices {
Expand Down
6 changes: 3 additions & 3 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3368,12 +3368,12 @@ func (s *testIntegrationSuite) TestIssue26719(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec(`create table tx (a int) partition by range (a) (partition p0 values less than (10), partition p1 values less than (20))`)
tk.MustExec(`insert into tx values (1)`)
tk.MustExec(`insert into tx values (10)`)
tk.MustExec("set @@tidb_partition_prune_mode='dynamic'")

tk.MustExec(`begin`)
tk.MustExec(`delete from tx where a in (1)`)
tk.MustQuery(`select * from tx PARTITION(p0)`).Check(testkit.Rows())
tk.MustExec(`delete from tx where a in (10)`)
//tk.MustQuery(`select * from tx PARTITION(p0)`).Check(testkit.Rows())
tk.MustQuery(`select * from tx`).Check(testkit.Rows())
tk.MustExec(`rollback`)
}
Expand Down
Loading

0 comments on commit fc62492

Please sign in to comment.