Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: table partition dynamic prune mode with Physical Table ID from unistore #31634

Merged
merged 39 commits into from
Mar 3, 2022
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
fc62492
WIP add Physical Table ID as column in result.
mjonss Jan 12, 2022
4821fae
Added support for more cases, like Delete
mjonss Jan 12, 2022
21dd902
Cleanup and added test from pr/31025
mjonss Jan 12, 2022
a26a334
Cleaned up some tests for passing CI and added warning for debug
mjonss Jan 13, 2022
efd64b0
Merge branch 'master' into phystlbid-unistore
mjonss Jan 17, 2022
9d9eb52
wip ExtraPhysTblIDCol
mjonss Jan 22, 2022
d6e062f
wip ExtraPhysTblIDCol
mjonss Jan 22, 2022
d5d7996
WIP fix SelectLock and UnionScan for both dynamic and static prune
mjonss Jan 24, 2022
670e074
added test for #30382
mjonss Feb 2, 2022
8e0a883
Added another test for SelectLock (from pr/30732)
mjonss Feb 2, 2022
5611b0e
added test for issue#28073
mjonss Feb 2, 2022
aaf8369
reverted hack in ut tool
mjonss Feb 3, 2022
8c6bfdf
Merge remote-tracking branch 'pingcap/master' into phystlbid-unistore
mjonss Feb 3, 2022
85d8f16
Fixed some comments and minor changes
mjonss Feb 10, 2022
150ab0c
Merge remote-tracking branch 'pingcap/master' into phystlbid-unistore
mjonss Feb 10, 2022
c1acc5b
Added testcase for issue #31024
mjonss Feb 14, 2022
aae672a
Added test case for #27346
mjonss Feb 14, 2022
77542a8
Merge remote-tracking branch 'pingcap/master' into phystlbid-unistore
mjonss Feb 14, 2022
254304c
Manual merge fix for tests upgraded to testify
mjonss Feb 14, 2022
756e5fd
Minor cleanup in SelectLock Next
mjonss Feb 14, 2022
c08ea52
WIP before cleanup.
mjonss Feb 16, 2022
9f219d7
Cleaned up partitionTable for SelectLock and dependents
mjonss Feb 16, 2022
e37f6e7
WIP more tests for avoiding sending requests for phys tbl id to store
mjonss Feb 16, 2022
86b70e0
Merge remote-tracking branch 'pingcap/master' into phystlbid-unistore
mjonss Feb 16, 2022
9d9dfab
WIP, removed investigation panic
mjonss Feb 16, 2022
495fe45
Fixed test TestPartitionPruningInTransaction for dynamic pruning
mjonss Feb 16, 2022
2c926b1
Reverted change for updating table delta, which broke checks
mjonss Feb 16, 2022
16445ad
Removed some debug only panic checks
mjonss Feb 16, 2022
1897ed5
cleanups
mjonss Feb 16, 2022
dbdcb36
more cleanups
mjonss Feb 16, 2022
95fb6a4
Further cleanups
mjonss Feb 16, 2022
9df6392
last cleanups
mjonss Feb 16, 2022
3ab4106
Merge remote-tracking branch 'pingcap/master' into phystlbid-unistore
mjonss Feb 20, 2022
a2daabb
After merge with master, also added PhysTblID handling in unistore/mpp
mjonss Feb 20, 2022
e459cf7
Addressing review comments
mjonss Feb 21, 2022
5c8c460
Merge branch 'master' into phystlbid-unistore
qw4990 Mar 3, 2022
718067c
Merge branch 'master' into phystlbid-unistore
qw4990 Mar 3, 2022
2666252
Merge branch 'master' into phystlbid-unistore
ti-chi-bot Mar 3, 2022
0700f74
Merge branch 'master' into phystlbid-unistore
ti-chi-bot Mar 3, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion cmd/explaintest/r/generated_columns.result
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
set @@tidb_partition_prune_mode='dynamic';
DROP TABLE IF EXISTS person;
CREATE TABLE person (
id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
Expand Down Expand Up @@ -90,6 +89,8 @@ Projection 5.00 root test.sgc1.j1, test.sgc1.j2, test.sgc1.a, test.sgc1.b, test
└─TableReader(Probe) 5.00 root data:Selection
└─Selection 5.00 cop[tikv] not(isnull(test.sgc1.a))
└─TableFullScan 5.00 cop[tikv] table:sgc1 keep order:false
set @old_prune_mode = @@tidb_partition_prune_mode;
set @@tidb_partition_prune_mode='static';
DROP TABLE IF EXISTS sgc3;
CREATE TABLE sgc3 (
j JSON,
Expand Down Expand Up @@ -136,6 +137,31 @@ PartitionUnion 23263.33 root
└─TableReader 3323.33 root data:Selection
└─Selection 3323.33 cop[tikv] lt(test.sgc3.a, 7)
└─TableFullScan 10000.00 cop[tikv] table:sgc3, partition:max keep order:false, stats:pseudo
set @@tidb_partition_prune_mode='dynamic';
DROP TABLE sgc3;
CREATE TABLE sgc3 (
j JSON,
a INT AS (JSON_EXTRACT(j, "$.a")) STORED
)
PARTITION BY RANGE (a) (
PARTITION p0 VALUES LESS THAN (1),
PARTITION p1 VALUES LESS THAN (2),
PARTITION p2 VALUES LESS THAN (3),
PARTITION p3 VALUES LESS THAN (4),
PARTITION p4 VALUES LESS THAN (5),
PARTITION p5 VALUES LESS THAN (6),
PARTITION max VALUES LESS THAN MAXVALUE);
EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a <= 1;
id estRows task access object operator info
TableReader 3323.33 root partition:p0,p1 data:Selection
└─Selection 3323.33 cop[tikv] le(test.sgc3.a, 1)
└─TableFullScan 10000.00 cop[tikv] table:sgc3 keep order:false, stats:pseudo
EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a < 7;
id estRows task access object operator info
TableReader 3323.33 root partition:all data:Selection
└─Selection 3323.33 cop[tikv] lt(test.sgc3.a, 7)
└─TableFullScan 10000.00 cop[tikv] table:sgc3 keep order:false, stats:pseudo
set @@tidb_partition_prune_mode = @old_prune_mode;
DROP TABLE IF EXISTS t1;
CREATE TABLE t1(a INT, b INT AS (a+1) VIRTUAL, c INT AS (b+1) VIRTUAL, d INT AS (c+1) VIRTUAL, KEY(b), INDEX IDX(c, d));
INSERT INTO t1 (a) VALUES (0);
Expand Down
16 changes: 16 additions & 0 deletions cmd/explaintest/r/select.result
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ Projection_3 1.00 root sysdate()->Column#1, sleep(1)->Column#2, sysdate()->Colu
└─TableDual_4 1.00 root rows:1
drop table if exists th;
set @@session.tidb_enable_table_partition = '1';
set @@session.tidb_partition_prune_mode = 'static';
create table th (a int, b int) partition by hash(a) partitions 3;
insert into th values (0,0),(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8);
insert into th values (-1,-1),(-2,-2),(-3,-3),(-4,-4),(-5,-5),(-6,-6),(-7,-7),(-8,-8);
Expand All @@ -378,6 +379,21 @@ PartitionUnion_8 20000.00 root
│ └─TableFullScan_9 10000.00 cop[tikv] table:th, partition:p1 keep order:false, stats:pseudo
└─TableReader_12 10000.00 root data:TableFullScan_11
└─TableFullScan_11 10000.00 cop[tikv] table:th, partition:p2 keep order:false, stats:pseudo
set @@session.tidb_partition_prune_mode = 'dynamic';
desc select * from th where a=-2;
id estRows task access object operator info
TableReader_7 10.00 root partition:p2 data:Selection_6
└─Selection_6 10.00 cop[tikv] eq(test.th.a, -2)
└─TableFullScan_5 10000.00 cop[tikv] table:th keep order:false, stats:pseudo
desc select * from th;
id estRows task access object operator info
TableReader_5 10000.00 root partition:all data:TableFullScan_4
└─TableFullScan_4 10000.00 cop[tikv] table:th keep order:false, stats:pseudo
desc select * from th partition (p2,p1);
id estRows task access object operator info
TableReader_5 10000.00 root partition:p1,p2 data:TableFullScan_4
└─TableFullScan_4 10000.00 cop[tikv] table:th keep order:false, stats:pseudo
set @@session.tidb_partition_prune_mode = DEFAULT;
drop table if exists t;
create table t(a int, b int);
explain format = 'brief' select a != any (select a from t t2) from t t1;
Expand Down
23 changes: 22 additions & 1 deletion cmd/explaintest/t/generated_columns.test
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
-- Most of the cases are ported from other tests to make sure generated columns behaves the same.

-- Stored generated columns as indices
set @@tidb_partition_prune_mode='dynamic';

DROP TABLE IF EXISTS person;
CREATE TABLE person (
Expand Down Expand Up @@ -74,6 +73,8 @@ EXPLAIN format = 'brief' SELECT * from sgc1 join sgc2 on sgc1.a=sgc2.a;

-- Stored generated columns as partition columns

set @old_prune_mode = @@tidb_partition_prune_mode;
set @@tidb_partition_prune_mode='static';
DROP TABLE IF EXISTS sgc3;
CREATE TABLE sgc3 (
j JSON,
Expand All @@ -91,6 +92,26 @@ PARTITION max VALUES LESS THAN MAXVALUE);
EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a <= 1;
EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a < 7;

set @@tidb_partition_prune_mode='dynamic';
DROP TABLE sgc3;
CREATE TABLE sgc3 (
j JSON,
a INT AS (JSON_EXTRACT(j, "$.a")) STORED
)
PARTITION BY RANGE (a) (
PARTITION p0 VALUES LESS THAN (1),
PARTITION p1 VALUES LESS THAN (2),
PARTITION p2 VALUES LESS THAN (3),
PARTITION p3 VALUES LESS THAN (4),
PARTITION p4 VALUES LESS THAN (5),
PARTITION p5 VALUES LESS THAN (6),
PARTITION max VALUES LESS THAN MAXVALUE);

EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a <= 1;
EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a < 7;

set @@tidb_partition_prune_mode = @old_prune_mode;

-- Virtual generated columns as indices

DROP TABLE IF EXISTS t1;
Expand Down
6 changes: 6 additions & 0 deletions cmd/explaintest/t/select.test
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,18 @@ desc select sysdate(), sleep(1), sysdate();
# test select partition table
drop table if exists th;
set @@session.tidb_enable_table_partition = '1';
set @@session.tidb_partition_prune_mode = 'static';
create table th (a int, b int) partition by hash(a) partitions 3;
insert into th values (0,0),(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8);
insert into th values (-1,-1),(-2,-2),(-3,-3),(-4,-4),(-5,-5),(-6,-6),(-7,-7),(-8,-8);
desc select * from th where a=-2;
desc select * from th;
desc select * from th partition (p2,p1);
set @@session.tidb_partition_prune_mode = 'dynamic';
desc select * from th where a=-2;
desc select * from th;
desc select * from th partition (p2,p1);
set @@session.tidb_partition_prune_mode = DEFAULT;

# test != any(subq) and = all(subq)
drop table if exists t;
Expand Down
1 change: 1 addition & 0 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ func TestFailedAnalyzeRequest(t *testing.T) {
tk.MustExec("set @@tidb_analyze_version = 1")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/buildStatsFromResult", `return(true)`))
_, err := tk.Exec("analyze table t")
require.NotNil(t, err)
require.Equal(t, "mock buildStatsFromResult error", err.Error())
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/buildStatsFromResult"))
}
Expand Down
33 changes: 4 additions & 29 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,10 +714,10 @@ func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor
return src
}
e := &SelectLockExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), src),
Lock: v.Lock,
tblID2Handle: v.TblID2Handle,
partitionedTable: v.PartitionedTable,
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), src),
Lock: v.Lock,
tblID2Handle: v.TblID2Handle,
tblID2PhysTblIDCol: v.TblID2PhysTblIDCol,
}

// filter out temporary tables because they do not store any record in tikv and should not write any lock
Expand All @@ -733,16 +733,6 @@ func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor
}
}

if len(e.partitionedTable) > 0 {
schema := v.Schema()
e.tblID2PIDColumnIndex = make(map[int64]int)
for i := 0; i < len(v.ExtraPIDInfo.Columns); i++ {
col := v.ExtraPIDInfo.Columns[i]
tblID := v.ExtraPIDInfo.TblIDs[i]
offset := schema.ColumnIndex(col)
e.tblID2PIDColumnIndex[tblID] = offset
}
}
return e
}

Expand Down Expand Up @@ -3164,9 +3154,6 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
storeType: v.StoreType,
batchCop: v.BatchCop,
}
if tbl.Meta().Partition != nil {
e.extraPIDColumnIndex = extraPIDColumnIndex(v.Schema())
}
e.buildVirtualColumnInfo()
if containsLimit(dagReq.Executors) {
e.feedback = statistics.NewQueryFeedback(0, nil, 0, ts.Desc)
Expand Down Expand Up @@ -3197,15 +3184,6 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
return e, nil
}

// extraPIDColumnIndex scans schema.Columns for the column whose ID equals
// model.ExtraPidColID and returns its position wrapped by newOffset.
// If no such column is present it returns the zero offsetOptional.
// NOTE(review): presumably the zero value means "no offset set"; confirm
// against the offsetOptional/newOffset definitions, which are not shown here.
func extraPIDColumnIndex(schema *expression.Schema) offsetOptional {
for idx, col := range schema.Columns {
if col.ID == model.ExtraPidColID {
return newOffset(idx)
}
}
return 0
}

func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Executor {
startTs, err := b.getSnapshotTS()
if err != nil {
Expand Down Expand Up @@ -3612,9 +3590,6 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
tblPlans: v.TablePlans,
PushedLimit: v.PushedLimit,
}
if ok, _ := ts.IsPartition(); ok {
e.extraPIDColumnIndex = extraPIDColumnIndex(v.Schema())
}

if containsLimit(indexReq.Executors) {
e.feedback = statistics.NewQueryFeedback(0, nil, 0, is.Desc)
Expand Down
2 changes: 1 addition & 1 deletion executor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {

datumRow := make([]types.Datum, 0, len(fields))
for i, field := range fields {
if columns[i].ID == model.ExtraPidColID {
if columns[i].ID == model.ExtraPidColID || columns[i].ID == model.ExtraPhysTblID {
continue
}

Expand Down
26 changes: 11 additions & 15 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,9 +386,6 @@ type IndexLookUpExecutor struct {

stats *IndexLookUpRunTimeStats

// extraPIDColumnIndex is used for partition reader to add an extra partition ID column, default -1
extraPIDColumnIndex offsetOptional

// cancelFunc is called when close the executor
cancelFunc context.CancelFunc

Expand Down Expand Up @@ -673,18 +670,17 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup
table = task.partitionTable
}
tableReaderExec := &TableReaderExecutor{
baseExecutor: newBaseExecutor(e.ctx, e.schema, e.getTableRootPlanID()),
table: table,
dagPB: e.tableRequest,
startTS: e.startTS,
readReplicaScope: e.readReplicaScope,
isStaleness: e.isStaleness,
columns: e.columns,
streaming: e.tableStreaming,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
corColInFilter: e.corColInTblSide,
plans: e.tblPlans,
extraPIDColumnIndex: e.extraPIDColumnIndex,
baseExecutor: newBaseExecutor(e.ctx, e.schema, e.getTableRootPlanID()),
table: table,
dagPB: e.tableRequest,
startTS: e.startTS,
readReplicaScope: e.readReplicaScope,
isStaleness: e.isStaleness,
columns: e.columns,
streaming: e.tableStreaming,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
corColInFilter: e.corColInTblSide,
plans: e.tblPlans,
}
tableReaderExec.buildVirtualColumnInfo()
tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, task.handles, true)
Expand Down
72 changes: 42 additions & 30 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -918,21 +918,47 @@ type SelectLockExec struct {
Lock *ast.SelectLockInfo
keys []kv.Key

// The children may be a join of multiple tables, so we need a map.
tblID2Handle map[int64][]plannercore.HandleCols

// All the partition tables in the children of this executor.
partitionedTable []table.PartitionedTable
// When SelectLock works on a partitioned table, we need the partition ID
// (Physical Table ID) instead of the 'logical' table ID to calculate
// the lock KV. In that case, the Physical Table ID is extracted
// from the row key in the store and returned as an extra column in the chunk row.

// When SelectLock work on the partition table, we need the partition ID
// instead of table ID to calculate the lock KV. In that case, partition ID is store as an
// extra column in the chunk row.
// tblID2PIDColumnIndex stores the column index in the chunk row. The children may be join
// of multiple tables, so the map struct is used.
tblID2PIDColumnIndex map[int64]int
// tblID2PhysTblIDCol is used for partitioned tables.
// The child executor needs to return an extra column containing
// the Physical Table ID (i.e. which partition the row came from).
// Used during building.
tblID2PhysTblIDCol map[int64]*expression.Column

// Used during execution
// Map from logical table ID to the column index where the physical table ID is stored.
// For dynamic prune mode, model.ExtraPhysTblID columns are requested from
// storage and used for the physical table ID.
// For static prune mode, model.ExtraPhysTblID is still sent to storage/Protobuf
// but could be filled in by the partition's TableReaderExecutor
// due to issues with chunk handling between the TableReaderExecutor and the
// SelectReader result.
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
tblID2PhysTblIDColIdx map[int64]int
}

// Open implements the Executor Open interface.
//
// When tblID2PhysTblIDCol is non-empty, Open first builds
// tblID2PhysTblIDColIdx: for every schema column whose ID is
// model.ExtraPhysTblID, it looks up the table whose recorded column has the
// same UniqueID and stores that column's index under the table's ID.
// It then delegates to baseExecutor.Open and returns its result.
func (e *SelectLockExec) Open(ctx context.Context) error {
if len(e.tblID2PhysTblIDCol) > 0 {
e.tblID2PhysTblIDColIdx = make(map[int64]int)
cols := e.Schema().Columns
// Walk the output schema from the last column to the first.
for i := len(cols) - 1; i >= 0; i-- {
if cols[i].ID == model.ExtraPhysTblID {
// Match on UniqueID so that each table gets the index of its own
// extra column when several such columns are present in the schema.
for tblID, col := range e.tblID2PhysTblIDCol {
if cols[i].UniqueID == col.UniqueID {
e.tblID2PhysTblIDColIdx[tblID] = i
// Only leaves the inner map loop; the column scan continues.
break
}
}
}
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
}
}
return e.baseExecutor.Open(ctx)
}

Expand All @@ -951,23 +977,17 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
if req.NumRows() > 0 {
iter := chunk.NewIterator4Chunk(req)
for row := iter.Begin(); row != iter.End(); row = iter.Next() {

for id, cols := range e.tblID2Handle {
physicalID := id
if len(e.partitionedTable) > 0 {
// Replace the table ID with partition ID.
// The partition ID is returned as an extra column from the table reader.
if offset, ok := e.tblID2PIDColumnIndex[id]; ok {
physicalID = row.GetInt64(offset)
}
}

for tblID, cols := range e.tblID2Handle {
for _, col := range cols {
handle, err := col.BuildHandle(row)
if err != nil {
return err
}
e.keys = append(e.keys, tablecodec.EncodeRowKeyWithHandle(physicalID, handle))
physTblID := tblID
if physTblColIdx, ok := e.tblID2PhysTblIDColIdx[tblID]; ok {
physTblID = row.GetInt64(physTblColIdx)
}
e.keys = append(e.keys, tablecodec.EncodeRowKeyWithHandle(physTblID, handle))
}
}
}
Expand All @@ -980,16 +1000,8 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
lockWaitTime = int64(e.Lock.WaitSec) * 1000
}

if len(e.tblID2Handle) > 0 {
for id := range e.tblID2Handle {
e.updateDeltaForTableID(id)
}
}
if len(e.partitionedTable) > 0 {
for _, p := range e.partitionedTable {
pid := p.Meta().ID
e.updateDeltaForTableID(pid)
}
for id := range e.tblID2Handle {
e.updateDeltaForTableID(id)
}

return doLockKeys(ctx, e.ctx, newLockCtx(e.ctx.GetSessionVars(), lockWaitTime, len(e.keys)), e.keys...)
Expand Down
Loading