diff --git a/cmd/explaintest/r/generated_columns.result b/cmd/explaintest/r/generated_columns.result index fa5d4f79b5906..a937079129e11 100644 --- a/cmd/explaintest/r/generated_columns.result +++ b/cmd/explaintest/r/generated_columns.result @@ -1,4 +1,3 @@ -set @@tidb_partition_prune_mode='dynamic'; DROP TABLE IF EXISTS person; CREATE TABLE person ( id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, @@ -90,6 +89,8 @@ Projection 5.00 root test.sgc1.j1, test.sgc1.j2, test.sgc1.a, test.sgc1.b, test └─TableReader(Probe) 5.00 root data:Selection └─Selection 5.00 cop[tikv] not(isnull(test.sgc1.a)) └─TableFullScan 5.00 cop[tikv] table:sgc1 keep order:false +set @old_prune_mode = @@tidb_partition_prune_mode; +set @@tidb_partition_prune_mode='static'; DROP TABLE IF EXISTS sgc3; CREATE TABLE sgc3 ( j JSON, @@ -136,6 +137,31 @@ PartitionUnion 23263.33 root └─TableReader 3323.33 root data:Selection └─Selection 3323.33 cop[tikv] lt(test.sgc3.a, 7) └─TableFullScan 10000.00 cop[tikv] table:sgc3, partition:max keep order:false, stats:pseudo +set @@tidb_partition_prune_mode='dynamic'; +DROP TABLE sgc3; +CREATE TABLE sgc3 ( +j JSON, +a INT AS (JSON_EXTRACT(j, "$.a")) STORED +) +PARTITION BY RANGE (a) ( +PARTITION p0 VALUES LESS THAN (1), +PARTITION p1 VALUES LESS THAN (2), +PARTITION p2 VALUES LESS THAN (3), +PARTITION p3 VALUES LESS THAN (4), +PARTITION p4 VALUES LESS THAN (5), +PARTITION p5 VALUES LESS THAN (6), +PARTITION max VALUES LESS THAN MAXVALUE); +EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a <= 1; +id estRows task access object operator info +TableReader 3323.33 root partition:p0,p1 data:Selection +└─Selection 3323.33 cop[tikv] le(test.sgc3.a, 1) + └─TableFullScan 10000.00 cop[tikv] table:sgc3 keep order:false, stats:pseudo +EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a < 7; +id estRows task access object operator info +TableReader 3323.33 root partition:all data:Selection +└─Selection 3323.33 cop[tikv] lt(test.sgc3.a, 7) + └─TableFullScan 10000.00 cop[tikv] table:sgc3 keep order:false, stats:pseudo +set @@tidb_partition_prune_mode = @old_prune_mode; DROP TABLE IF EXISTS t1; CREATE TABLE t1(a INT, b INT AS (a+1) VIRTUAL, c INT AS (b+1) VIRTUAL, d INT AS (c+1) VIRTUAL, KEY(b), INDEX IDX(c, d)); INSERT INTO t1 (a) VALUES (0); diff --git a/cmd/explaintest/r/select.result b/cmd/explaintest/r/select.result index 63bdb29e7d39b..9fe5dbc5c9a0f 100644 --- a/cmd/explaintest/r/select.result +++ b/cmd/explaintest/r/select.result @@ -354,6 +354,7 @@ Projection_3 1.00 root sysdate()->Column#1, sleep(1)->Column#2, sysdate()->Colu └─TableDual_4 1.00 root rows:1 drop table if exists th; set @@session.tidb_enable_table_partition = '1'; +set @@session.tidb_partition_prune_mode = 'static'; create table th (a int, b int) partition by hash(a) partitions 3; insert into th values (0,0),(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8); insert into th values (-1,-1),(-2,-2),(-3,-3),(-4,-4),(-5,-5),(-6,-6),(-7,-7),(-8,-8); @@ -378,6 +379,21 @@ PartitionUnion_8 20000.00 root │ └─TableFullScan_9 10000.00 cop[tikv] table:th, partition:p1 keep order:false, stats:pseudo └─TableReader_12 10000.00 root data:TableFullScan_11 └─TableFullScan_11 10000.00 cop[tikv] table:th, partition:p2 keep order:false, stats:pseudo +set @@session.tidb_partition_prune_mode = 'dynamic'; +desc select * from th where a=-2; +id estRows task access object operator info +TableReader_7 10.00 root partition:p2 data:Selection_6 +└─Selection_6 10.00 cop[tikv] eq(test.th.a, -2) + └─TableFullScan_5 10000.00 cop[tikv] table:th keep order:false, stats:pseudo +desc select * from th; +id estRows task access object operator info +TableReader_5 10000.00 root partition:all data:TableFullScan_4 +└─TableFullScan_4 10000.00 cop[tikv] table:th keep order:false, stats:pseudo +desc select * from th partition (p2,p1); +id estRows task access object operator info +TableReader_5 10000.00 root partition:p1,p2 data:TableFullScan_4 +└─TableFullScan_4 10000.00 cop[tikv] table:th keep order:false, stats:pseudo +set @@session.tidb_partition_prune_mode = DEFAULT; drop table if exists t; create table t(a int, b int); explain format = 'brief' select a != any (select a from t t2) from t t1; diff --git a/cmd/explaintest/t/generated_columns.test b/cmd/explaintest/t/generated_columns.test index 82dfcf4d1d8c8..c007f8ff42e66 100644 --- a/cmd/explaintest/t/generated_columns.test +++ b/cmd/explaintest/t/generated_columns.test @@ -2,7 +2,6 @@ -- Most of the cases are ported from other tests to make sure generated columns behaves the same. -- Stored generated columns as indices -set @@tidb_partition_prune_mode='dynamic'; DROP TABLE IF EXISTS person; CREATE TABLE person ( @@ -74,6 +73,8 @@ EXPLAIN format = 'brief' SELECT * from sgc1 join sgc2 on sgc1.a=sgc2.a; -- Stored generated columns as partition columns +set @old_prune_mode = @@tidb_partition_prune_mode; +set @@tidb_partition_prune_mode='static'; DROP TABLE IF EXISTS sgc3; CREATE TABLE sgc3 ( j JSON, @@ -91,6 +92,26 @@ PARTITION max VALUES LESS THAN MAXVALUE); EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a <= 1; EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a < 7; +set @@tidb_partition_prune_mode='dynamic'; +DROP TABLE sgc3; +CREATE TABLE sgc3 ( +j JSON, +a INT AS (JSON_EXTRACT(j, "$.a")) STORED +) +PARTITION BY RANGE (a) ( +PARTITION p0 VALUES LESS THAN (1), +PARTITION p1 VALUES LESS THAN (2), +PARTITION p2 VALUES LESS THAN (3), +PARTITION p3 VALUES LESS THAN (4), +PARTITION p4 VALUES LESS THAN (5), +PARTITION p5 VALUES LESS THAN (6), +PARTITION max VALUES LESS THAN MAXVALUE); + +EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a <= 1; +EXPLAIN format = 'brief' SELECT * FROM sgc3 WHERE a < 7; + +set @@tidb_partition_prune_mode = @old_prune_mode; + -- Virtual generated columns as indices DROP TABLE IF EXISTS t1; diff --git a/cmd/explaintest/t/select.test b/cmd/explaintest/t/select.test index 62d1a49ce629b..5efbbd77a0364 100644 --- a/cmd/explaintest/t/select.test +++ b/cmd/explaintest/t/select.test @@ -181,12 +181,18 @@ desc select sysdate(), sleep(1), sysdate(); # test select partition table drop table if exists th; set @@session.tidb_enable_table_partition = '1'; +set @@session.tidb_partition_prune_mode = 'static'; create table th (a int, b int) partition by hash(a) partitions 3; insert into th values (0,0),(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8); insert into th values (-1,-1),(-2,-2),(-3,-3),(-4,-4),(-5,-5),(-6,-6),(-7,-7),(-8,-8); desc select * from th where a=-2; desc select * from th; desc select * from th partition (p2,p1); +set @@session.tidb_partition_prune_mode = 'dynamic'; +desc select * from th where a=-2; +desc select * from th; +desc select * from th partition (p2,p1); +set @@session.tidb_partition_prune_mode = DEFAULT; # test != any(subq) and = all(subq) drop table if exists t; diff --git a/executor/analyze_test.go b/executor/analyze_test.go index 1dd9b426c0ec1..ea52cfdded744 100644 --- a/executor/analyze_test.go +++ b/executor/analyze_test.go @@ -660,6 +660,7 @@ func TestFailedAnalyzeRequest(t *testing.T) { tk.MustExec("set @@tidb_analyze_version = 1") require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/buildStatsFromResult", `return(true)`)) _, err := tk.Exec("analyze table t") + require.NotNil(t, err) require.Equal(t, "mock buildStatsFromResult error", err.Error()) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/buildStatsFromResult")) } diff --git a/executor/builder.go b/executor/builder.go index 7ff7b3445bfff..30ecf028e9dae 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -717,10 +717,10 @@ func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor return src } e := &SelectLockExec{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), src), - Lock: v.Lock, - tblID2Handle: v.TblID2Handle, - partitionedTable: v.PartitionedTable, + baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), src), + Lock: v.Lock, + tblID2Handle: v.TblID2Handle, + tblID2PhysTblIDCol: v.TblID2PhysTblIDCol, } // filter out temporary tables because they do not store any record in tikv and should not write any lock @@ -736,16 +736,6 @@ func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor } } - if len(e.partitionedTable) > 0 { - schema := v.Schema() - e.tblID2PIDColumnIndex = make(map[int64]int) - for i := 0; i < len(v.ExtraPIDInfo.Columns); i++ { - col := v.ExtraPIDInfo.Columns[i] - tblID := v.ExtraPIDInfo.TblIDs[i] - offset := schema.ColumnIndex(col) - e.tblID2PIDColumnIndex[tblID] = offset - } - } return e } @@ -3170,9 +3160,6 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea storeType: v.StoreType, batchCop: v.BatchCop, } - if tbl.Meta().Partition != nil { - e.extraPIDColumnIndex = extraPIDColumnIndex(v.Schema()) - } e.buildVirtualColumnInfo() if containsLimit(dagReq.Executors) { e.feedback = statistics.NewQueryFeedback(0, nil, 0, ts.Desc) @@ -3203,15 +3190,6 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea return e, nil } -func extraPIDColumnIndex(schema *expression.Schema) offsetOptional { - for idx, col := range schema.Columns { - if col.ID == model.ExtraPidColID { - return newOffset(idx) - } - } - return 0 -} - func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Executor { startTs, err := b.getSnapshotTS() if err != nil { @@ -3618,9 +3596,6 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn tblPlans: v.TablePlans, PushedLimit: v.PushedLimit, } - if ok, _ := ts.IsPartition(); ok { - e.extraPIDColumnIndex = extraPIDColumnIndex(v.Schema()) - } if containsLimit(indexReq.Executors) { e.feedback = statistics.NewQueryFeedback(0, nil, 0, is.Desc) diff --git a/executor/delete.go b/executor/delete.go index 2d4425653c090..c859b41a83c75 100644 --- a/executor/delete.go +++ b/executor/delete.go @@ -122,7 +122,7 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error { datumRow := make([]types.Datum, 0, len(fields)) for i, field := range fields { - if columns[i].ID == model.ExtraPidColID { + if columns[i].ID == model.ExtraPidColID || columns[i].ID == model.ExtraPhysTblID { continue } diff --git a/executor/distsql.go b/executor/distsql.go index e0ac2cfb04063..4558066f6719c 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -386,9 +386,6 @@ type IndexLookUpExecutor struct { stats *IndexLookUpRunTimeStats - // extraPIDColumnIndex is used for partition reader to add an extra partition ID column, default -1 - extraPIDColumnIndex offsetOptional - // cancelFunc is called when close the executor cancelFunc context.CancelFunc @@ -676,18 +673,17 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup table = task.partitionTable } tableReaderExec := &TableReaderExecutor{ - baseExecutor: newBaseExecutor(e.ctx, e.schema, e.getTableRootPlanID()), - table: table, - dagPB: e.tableRequest, - startTS: e.startTS, - readReplicaScope: e.readReplicaScope, - isStaleness: e.isStaleness, - columns: e.columns, - streaming: e.tableStreaming, - feedback: statistics.NewQueryFeedback(0, nil, 0, false), - corColInFilter: e.corColInTblSide, - plans: e.tblPlans, - extraPIDColumnIndex: e.extraPIDColumnIndex, + baseExecutor: newBaseExecutor(e.ctx, e.schema, e.getTableRootPlanID()), + table: table, + dagPB: e.tableRequest, + startTS: e.startTS, + readReplicaScope: e.readReplicaScope, + isStaleness: e.isStaleness, + columns: e.columns, + streaming: e.tableStreaming, + feedback: statistics.NewQueryFeedback(0, nil, 0, false), + corColInFilter: e.corColInTblSide, + plans: e.tblPlans, } tableReaderExec.buildVirtualColumnInfo() tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, task.handles, true) diff --git a/executor/executor.go b/executor/executor.go index fa4be26da43ff..24d6328d5e6ad 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -930,21 +930,47 @@ type SelectLockExec struct { Lock *ast.SelectLockInfo keys []kv.Key + // The children may be a join of multiple tables, so we need a map. tblID2Handle map[int64][]plannercore.HandleCols - // All the partition tables in the children of this executor. - partitionedTable []table.PartitionedTable + // When SelectLock work on a partition table, we need the partition ID + // (Physical Table ID) instead of the 'logical' table ID to calculate + // the lock KV. In that case, the Physical Table ID is extracted + // from the row key in the store and as an extra column in the chunk row. - // When SelectLock work on the partition table, we need the partition ID - // instead of table ID to calculate the lock KV. In that case, partition ID is store as an - // extra column in the chunk row. - // tblID2PIDColumnIndex stores the column index in the chunk row. The children may be join - // of multiple tables, so the map struct is used. - tblID2PIDColumnIndex map[int64]int + // tblID2PhyTblIDCol is used for partitioned tables. + // The child executor need to return an extra column containing + // the Physical Table ID (i.e. from which partition the row came from) + // Used during building + tblID2PhysTblIDCol map[int64]*expression.Column + + // Used during execution + // Map from logic tableID to column index where the physical table id is stored + // For dynamic prune mode, model.ExtraPhysTblID columns are requested from + // storage and used for physical table id + // For static prune mode, model.ExtraPhysTblID is still sent to storage/Protobuf + // but could be filled in by the partitions TableReaderExecutor + // due to issues with chunk handling between the TableReaderExecutor and the + // SelectReader result. + tblID2PhysTblIDColIdx map[int64]int } // Open implements the Executor Open interface. func (e *SelectLockExec) Open(ctx context.Context) error { + if len(e.tblID2PhysTblIDCol) > 0 { + e.tblID2PhysTblIDColIdx = make(map[int64]int) + cols := e.Schema().Columns + for i := len(cols) - 1; i >= 0; i-- { + if cols[i].ID == model.ExtraPhysTblID { + for tblID, col := range e.tblID2PhysTblIDCol { + if cols[i].UniqueID == col.UniqueID { + e.tblID2PhysTblIDColIdx[tblID] = i + break + } + } + } + } + } return e.baseExecutor.Open(ctx) } @@ -963,23 +989,17 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error { if req.NumRows() > 0 { iter := chunk.NewIterator4Chunk(req) for row := iter.Begin(); row != iter.End(); row = iter.Next() { - - for id, cols := range e.tblID2Handle { - physicalID := id - if len(e.partitionedTable) > 0 { - // Replace the table ID with partition ID. - // The partition ID is returned as an extra column from the table reader. - if offset, ok := e.tblID2PIDColumnIndex[id]; ok { - physicalID = row.GetInt64(offset) - } - } - + for tblID, cols := range e.tblID2Handle { for _, col := range cols { handle, err := col.BuildHandle(row) if err != nil { return err } - e.keys = append(e.keys, tablecodec.EncodeRowKeyWithHandle(physicalID, handle)) + physTblID := tblID + if physTblColIdx, ok := e.tblID2PhysTblIDColIdx[tblID]; ok { + physTblID = row.GetInt64(physTblColIdx) + } + e.keys = append(e.keys, tablecodec.EncodeRowKeyWithHandle(physTblID, handle)) } } } @@ -992,16 +1012,8 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error { lockWaitTime = int64(e.Lock.WaitSec) * 1000 } - if len(e.tblID2Handle) > 0 { - for id := range e.tblID2Handle { - e.updateDeltaForTableID(id) - } - } - if len(e.partitionedTable) > 0 { - for _, p := range e.partitionedTable { - pid := p.Meta().ID - e.updateDeltaForTableID(pid) - } + for id := range e.tblID2Handle { + e.updateDeltaForTableID(id) } return doLockKeys(ctx, e.ctx, newLockCtx(e.ctx.GetSessionVars(), lockWaitTime, len(e.keys)), e.keys...) diff --git a/executor/executor_test.go b/executor/executor_test.go index 2bbc5653d14d4..eabbd42234d0c 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -9805,6 +9805,37 @@ func (s *testSerialSuite) TestFix31537(c *C) { tk.MustQuery(`trace plan SELECT T_ID, T_S_SYMB, T_QTY, ST_NAME, TH_DTS FROM ( SELECT T_ID AS ID FROM TRADE WHERE T_CA_ID = 43000014236 ORDER BY T_DTS DESC LIMIT 10 ) T, TRADE, TRADE_HISTORY, STATUS_TYPE WHERE TRADE.T_ID = ID AND TRADE_HISTORY.TH_T_ID = TRADE.T_ID AND STATUS_TYPE.ST_ID = TRADE_HISTORY.TH_ST_ID ORDER BY TH_DTS DESC LIMIT 30;`) } +func (s *testSuiteP1) TestIssue30382(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("set @@session.tidb_enable_list_partition = ON;") + tk.MustExec("drop table if exists t1, t2;") + tk.MustExec("create table t1 (c_int int, c_str varchar(40), c_decimal decimal(12, 6), primary key (c_int) , key(c_str(2)) , key(c_decimal) ) partition by list (c_int) ( partition p0 values IN (1, 5, 9, 13, 17, 21, 25, 29, 33, 37), partition p1 values IN (2, 6, 10, 14, 18, 22, 26, 30, 34, 38), partition p2 values IN (3, 7, 11, 15, 19, 23, 27, 31, 35, 39), partition p3 values IN (4, 8, 12, 16, 20, 24, 28, 32, 36, 40)) ;") + tk.MustExec("create table t2 (c_int int, c_str varchar(40), c_decimal decimal(12, 6), primary key (c_int) , key(c_str) , key(c_decimal) ) partition by hash (c_int) partitions 4;") + tk.MustExec("insert into t1 values (6, 'musing mayer', 1.280), (7, 'wizardly heisenberg', 6.589), (8, 'optimistic swirles', 9.633), (9, 'hungry haslett', 2.659), (10, 'stupefied wiles', 2.336);") + tk.MustExec("insert into t2 select * from t1 ;") + tk.MustExec("begin;") + tk.MustQuery("select * from t1 where c_str <> any (select c_str from t2 where c_decimal < 5) for update;").Sort().Check(testkit.Rows( + "10 stupefied wiles 2.336000", + "6 musing mayer 1.280000", + "7 wizardly heisenberg 6.589000", + "8 optimistic swirles 9.633000", + "9 hungry haslett 2.659000")) + tk.MustQuery("explain format = 'brief' select * from t1 where c_str <> any (select c_str from t2 where c_decimal < 5) for update;").Check(testkit.Rows( + "SelectLock 6400.00 root for update 0", + "└─HashJoin 6400.00 root CARTESIAN inner join, other cond:or(gt(Column#8, 1), or(ne(test.t1.c_str, Column#7), if(ne(Column#9, 0), NULL, 0)))", + " ├─Selection(Build) 0.80 root ne(Column#10, 0)", + " │ └─StreamAgg 1.00 root funcs:max(Column#17)->Column#7, funcs:count(distinct Column#18)->Column#8, funcs:sum(Column#19)->Column#9, funcs:count(1)->Column#10", + " │ └─Projection 3323.33 root test.t2.c_str, test.t2.c_str, cast(isnull(test.t2.c_str), decimal(20,0) BINARY)->Column#19", + " │ └─TableReader 3323.33 root partition:all data:Selection", + " │ └─Selection 3323.33 cop[tikv] lt(test.t2.c_decimal, 5)", + " │ └─TableFullScan 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo", + " └─TableReader(Probe) 8000.00 root partition:all data:Selection", + " └─Selection 8000.00 cop[tikv] if(isnull(test.t1.c_str), NULL, 1)", + " └─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo")) + tk.MustExec("commit") +} + func (s *testSerialSuite) TestEncodingSet(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go index ad57e3d050b40..fb484898ccdfa 100644 --- a/executor/index_merge_reader_test.go +++ b/executor/index_merge_reader_test.go @@ -21,6 +21,7 @@ import ( "strconv" "strings" "testing" + "time" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/util" @@ -500,3 +501,82 @@ func TestIndexMergeSplitTable(t *testing.T) { tk.MustExec("SPLIT TABLE tab2 BY (5);") tk.MustQuery("SELECT /*+ use_index_merge(tab2) */ pk FROM tab2 WHERE (col4 > 565.89 OR col0 > 68 ) and col0 > 10 order by 1;").Check(testkit.Rows("0", "1", "2", "3", "4", "5", "6", "7")) } + +func TestPessimisticLockOnPartitionForIndexMerge(t *testing.T) { + // Same purpose with TestPessimisticLockOnPartition, but test IndexMergeReader. + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + tk.MustExec("drop table if exists t1, t2") + tk.MustExec(`create table t1 (c_datetime datetime, c1 int, c2 int, primary key (c_datetime), key(c1), key(c2)) + partition by range (to_days(c_datetime)) ( + partition p0 values less than (to_days('2020-02-01')), + partition p1 values less than (to_days('2020-04-01')), + partition p2 values less than (to_days('2020-06-01')), + partition p3 values less than maxvalue)`) + tk.MustExec("create table t2 (c_datetime datetime, unique key(c_datetime))") + tk.MustExec("insert into t1 values ('2020-06-26 03:24:00', 1, 1), ('2020-02-21 07:15:33', 2, 2), ('2020-04-27 13:50:58', 3, 3)") + tk.MustExec("insert into t2 values ('2020-01-10 09:36:00'), ('2020-02-04 06:00:00'), ('2020-06-12 03:45:18')") + tk.MustExec("analyze table t1") + tk.MustExec("analyze table t2") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + tk1.MustExec("set @@tidb_partition_prune_mode = 'static'") + + tk.MustExec("set @@tidb_partition_prune_mode = 'static'") + tk.MustExec("begin pessimistic") + tk.MustQuery(`explain format='brief' select /*+ use_index_merge(t1) */ c1 from t1 join t2 + on t1.c_datetime >= t2.c_datetime + where t1.c1 < 10 or t1.c2 < 10 for update`).Check(testkit.Rows( + "Projection 16635.64 root test.t1.c1", + "└─SelectLock 16635.64 root for update 0", + " └─Projection 16635.64 root test.t1.c1, test.t1._tidb_rowid, test.t1._tidb_tid, test.t2._tidb_rowid", + " └─HashJoin 16635.64 root CARTESIAN inner join, other cond:ge(test.t1.c_datetime, test.t2.c_datetime)", + " ├─IndexReader(Build) 3.00 root index:IndexFullScan", + " │ └─IndexFullScan 3.00 cop[tikv] table:t2, index:c_datetime(c_datetime) keep order:false", + " └─PartitionUnion(Probe) 5545.21 root ", + " ├─IndexMerge 5542.21 root ", + " │ ├─IndexRangeScan(Build) 3323.33 cop[tikv] table:t1, partition:p0, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo", + " │ ├─IndexRangeScan(Build) 3323.33 cop[tikv] table:t1, partition:p0, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo", + " │ └─TableRowIDScan(Probe) 5542.21 cop[tikv] table:t1, partition:p0 keep order:false, stats:pseudo", + " ├─IndexMerge 1.00 root ", + " │ ├─IndexRangeScan(Build) 1.00 cop[tikv] table:t1, partition:p1, index:c1(c1) range:[-inf,10), keep order:false", + " │ ├─IndexRangeScan(Build) 1.00 cop[tikv] table:t1, partition:p1, index:c2(c2) range:[-inf,10), keep order:false", + " │ └─TableRowIDScan(Probe) 1.00 cop[tikv] table:t1, partition:p1 keep order:false", + " ├─IndexMerge 1.00 root ", + " │ ├─IndexRangeScan(Build) 1.00 cop[tikv] table:t1, partition:p2, index:c1(c1) range:[-inf,10), keep order:false", + " │ ├─IndexRangeScan(Build) 1.00 cop[tikv] table:t1, partition:p2, index:c2(c2) range:[-inf,10), keep order:false", + " │ └─TableRowIDScan(Probe) 1.00 cop[tikv] table:t1, partition:p2 keep order:false", + " └─IndexMerge 1.00 root ", + " ├─IndexRangeScan(Build) 1.00 cop[tikv] table:t1, partition:p3, index:c1(c1) range:[-inf,10), keep order:false", + " ├─IndexRangeScan(Build) 1.00 cop[tikv] table:t1, partition:p3, index:c2(c2) range:[-inf,10), keep order:false", + " └─TableRowIDScan(Probe) 1.00 cop[tikv] table:t1, partition:p3 keep order:false", + )) + tk.MustQuery(`select /*+ use_index_merge(t1) */ c1 from t1 join t2 + on t1.c_datetime >= t2.c_datetime + where t1.c1 < 10 or t1.c2 < 10 for update`).Sort().Check(testkit.Rows("1", "1", "1", "2", "2", "3", "3")) + tk1.MustExec("begin pessimistic") + + ch := make(chan int32, 5) + go func() { + tk1.MustExec("update t1 set c_datetime = '2020-06-26 03:24:00' where c1 = 1") + ch <- 0 + tk1.MustExec("rollback") + ch <- 0 + }() + + // Leave 50ms for tk1 to run, tk1 should be blocked at the update operation. + time.Sleep(50 * time.Millisecond) + ch <- 1 + + tk.MustExec("commit") + // tk1 should be blocked until tk commit, check the order. + require.Equal(t, <-ch, int32(1)) + require.Equal(t, <-ch, int32(0)) + <-ch // wait for goroutine to quit. + + // TODO: add support for index merge reader in dynamic tidb_partition_prune_mode +} diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index 7d4a50a369f8a..257199359370f 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -1828,13 +1828,20 @@ func TestPartitionPruningInTransaction(t *testing.T) { tk.MustExec("create database test_pruning_transaction") defer tk.MustExec(`drop database test_pruning_transaction`) tk.MustExec("use test_pruning_transaction") - tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'") tk.MustExec(`create table t(a int, b int) partition by range(a) (partition p0 values less than(3), partition p1 values less than (5), partition p2 values less than(11))`) + tk.MustExec("set @@tidb_partition_prune_mode = 'static'") tk.MustExec(`begin`) tk.MustPartitionByList(`select * from t`, []string{"p0", "p1", "p2"}) tk.MustPartitionByList(`select * from t where a > 3`, []string{"p1", "p2"}) // partition pruning can work in transactions tk.MustPartitionByList(`select * from t where a > 7`, []string{"p2"}) tk.MustExec(`rollback`) + tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'") + tk.MustExec(`begin`) + tk.MustPartition(`select * from t`, "all") + tk.MustPartition(`select * from t where a > 3`, "p1,p2") // partition pruning can work in transactions + tk.MustPartition(`select * from t where a > 7`, "p2") + tk.MustExec(`rollback`) + tk.MustExec("set @@tidb_partition_prune_mode = default") } func TestIssue25253(t *testing.T) { @@ -2983,8 +2990,8 @@ partition p2 values less than (11))`) } partitionModes := []string{ - "'dynamic-only'", - "'static-only'", + "'dynamic'", + "'static'", } testCases := []func(){ optimisticTableReader, @@ -3262,3 +3269,163 @@ func TestIssue26251(t *testing.T) { <-ch tk2.MustExec("rollback") } + +func TestLeftJoinForUpdate(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("create database TestLeftJoinForUpdate") + defer tk1.MustExec("drop database TestLeftJoinForUpdate") + tk1.MustExec("use TestLeftJoinForUpdate") + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use TestLeftJoinForUpdate") + tk3 := testkit.NewTestKit(t, store) + tk3.MustExec("use TestLeftJoinForUpdate") + + tk1.MustExec("drop table if exists nt, pt") + tk1.MustExec("create table nt (id int, col varchar(32), primary key (id))") + tk1.MustExec("create table pt (id int, col varchar(32), primary key (id)) partition by hash(id) partitions 4") + + resetData := func() { + tk1.MustExec("truncate table nt") + tk1.MustExec("truncate table pt") + tk1.MustExec("insert into nt values (1, 'hello')") + tk1.MustExec("insert into pt values (2, 'test')") + } + + // ========================== First round of test ================== + // partition table left join normal table. + // ================================================================= + resetData() + ch := make(chan int, 10) + tk1.MustExec("begin pessimistic") + // No union scan + tk1.MustQuery("select * from pt left join nt on pt.id = nt.id for update").Check(testkit.Rows("2 test ")) + go func() { + // Check the key is locked. + tk2.MustExec("update pt set col = 'xxx' where id = 2") + ch <- 2 + }() + + // Union scan + tk1.MustExec("insert into pt values (1, 'world')") + tk1.MustQuery("select * from pt left join nt on pt.id = nt.id for update").Sort().Check(testkit.Rows("1 world 1 hello", "2 test ")) + go func() { + // Check the key is locked. + tk3.MustExec("update nt set col = 'yyy' where id = 1") + ch <- 3 + }() + + // Give chance for the goroutines to run first. + time.Sleep(80 * time.Millisecond) + ch <- 1 + tk1.MustExec("rollback") + + checkOrder := func() { + require.Equal(t, <-ch, 1) + v1 := <-ch + v2 := <-ch + require.True(t, (v1 == 2 && v2 == 3) || (v1 == 3 && v2 == 2)) + } + checkOrder() + + // ========================== Another round of test ================== + // normal table left join partition table. + // =================================================================== + resetData() + tk1.MustExec("begin pessimistic") + // No union scan + tk1.MustQuery("select * from nt left join pt on pt.id = nt.id for update").Check(testkit.Rows("1 hello ")) + + // Union scan + tk1.MustExec("insert into pt values (1, 'world')") + tk1.MustQuery("select * from nt left join pt on pt.id = nt.id for update").Check(testkit.Rows("1 hello 1 world")) + go func() { + tk2.MustExec("replace into pt values (1, 'aaa')") + ch <- 2 + }() + go func() { + tk3.MustExec("update nt set col = 'bbb' where id = 1") + ch <- 3 + }() + time.Sleep(80 * time.Millisecond) + ch <- 1 + tk1.MustExec("rollback") + checkOrder() +} + +func TestIssue31024(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("create database TestIssue31024") + defer tk1.MustExec("drop database TestIssue31024") + tk1.MustExec("use TestIssue31024") + tk1.MustExec("create table t1 (c_datetime datetime, c1 int, c2 int, primary key (c_datetime), key(c1), key(c2))" + + " partition by range (to_days(c_datetime)) " + + "( partition p0 values less than (to_days('2020-02-01'))," + + " partition p1 values less than (to_days('2020-04-01'))," + + " partition p2 values less than (to_days('2020-06-01'))," + + " partition p3 values less than maxvalue)") + tk1.MustExec("create table t2 (c_datetime datetime, unique key(c_datetime))") + tk1.MustExec("insert into t1 values ('2020-06-26 03:24:00', 1, 1), ('2020-02-21 07:15:33', 2, 2), ('2020-04-27 13:50:58', 3, 3)") + tk1.MustExec("insert into t2 values ('2020-01-10 09:36:00'), ('2020-02-04 06:00:00'), ('2020-06-12 03:45:18')") + tk1.MustExec("SET GLOBAL tidb_txn_mode = 'pessimistic'") + + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use TestIssue31024") + + ch := make(chan int, 10) + tk1.MustExec("set @@tidb_partition_prune_mode='dynamic'") + tk1.MustExec("begin pessimistic") + tk1.MustQuery("select /*+ use_index_merge(t1) */ * from t1 join t2 on t1.c_datetime >= t2.c_datetime where t1.c1 < 10 or t1.c2 < 10 for update") + + go func() { + // Check the key is locked. + tk2.MustExec("set @@tidb_partition_prune_mode='dynamic'") + tk2.MustExec("begin pessimistic") + tk2.MustExec("update t1 set c_datetime = '2020-06-26 03:24:00' where c1 = 1") + ch <- 2 + }() + + // Give chance for the goroutines to run first. + time.Sleep(80 * time.Millisecond) + ch <- 1 + tk1.MustExec("rollback") + + require.Equal(t, <-ch, 1) + require.Equal(t, <-ch, 2) + + tk2.MustExec("rollback") +} + +func TestIssue27346(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("create database TestIssue27346") + defer tk1.MustExec("drop database TestIssue27346") + tk1.MustExec("use TestIssue27346") + + tk1.MustExec("set @@tidb_enable_index_merge=1,@@tidb_partition_prune_mode='dynamic'") + + tk1.MustExec("DROP TABLE IF EXISTS `tbl_18`") + tk1.MustExec("CREATE TABLE `tbl_18` (`col_119` binary(16) NOT NULL DEFAULT 'skPoKiwYUi',`col_120` int(10) unsigned NOT NULL,`col_121` timestamp NOT NULL,`col_122` double NOT NULL DEFAULT '3937.1887880628115',`col_123` bigint(20) NOT NULL DEFAULT '3550098074891542725',PRIMARY KEY (`col_123`,`col_121`,`col_122`,`col_120`) CLUSTERED,UNIQUE KEY `idx_103` (`col_123`,`col_119`,`col_120`),UNIQUE KEY `idx_104` (`col_122`,`col_120`),UNIQUE KEY `idx_105` (`col_119`,`col_120`),KEY `idx_106` (`col_121`,`col_120`,`col_122`,`col_119`),KEY `idx_107` (`col_121`)) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_general_ci PARTITION BY HASH( `col_120` ) PARTITIONS 3") + tk1.MustExec("INSERT INTO tbl_18 (`col_119`, `col_120`, `col_121`, `col_122`, `col_123`) VALUES (X'736b506f4b6977595569000000000000', 672436701, '1974-02-24 00:00:00', 3937.1887880628115e0, -7373106839136381229), (X'736b506f4b6977595569000000000000', 2637316689, '1993-10-29 00:00:00', 3937.1887880628115e0, -4522626077860026631), (X'736b506f4b6977595569000000000000', 831809724, '1995-11-20 00:00:00', 3937.1887880628115e0, -4426441253940231780), (X'736b506f4b6977595569000000000000', 1588592628, '2001-03-28 00:00:00', 3937.1887880628115e0, 1329207475772244999), (X'736b506f4b6977595569000000000000', 3908038471, '2031-06-06 00:00:00', 3937.1887880628115e0, -6562815696723135786), (X'736b506f4b6977595569000000000000', 1674237178, '2001-10-24 00:00:00', 3937.1887880628115e0, -6459065549188938772), (X'736b506f4b6977595569000000000000', 3507075493, '2010-03-25 00:00:00', 3937.1887880628115e0, -4329597025765326929), (X'736b506f4b6977595569000000000000', 1276461709, '2019-07-20 00:00:00', 3937.1887880628115e0, 3550098074891542725)") + + tk1.MustQuery("select col_120,col_122,col_123 from tbl_18 where tbl_18.col_122 = 4763.320888074281 and not( tbl_18.col_121 in ( '2032-11-01' , '1975-05-21' , '1994-05-16' , '1984-01-15' ) ) or not( tbl_18.col_121 >= '2008-10-24' ) order by tbl_18.col_119,tbl_18.col_120,tbl_18.col_121,tbl_18.col_122,tbl_18.col_123 limit 919 for update").Sort().Check(testkit.Rows( + "1588592628 3937.1887880628115 1329207475772244999", + "1674237178 3937.1887880628115 -6459065549188938772", + "2637316689 3937.1887880628115 -4522626077860026631", + "672436701 3937.1887880628115 -7373106839136381229", + "831809724 3937.1887880628115 -4426441253940231780")) + tk1.MustQuery("select /*+ use_index_merge( tbl_18 ) */ col_120,col_122,col_123 from tbl_18 where tbl_18.col_122 = 4763.320888074281 and not( tbl_18.col_121 in ( '2032-11-01' , '1975-05-21' , '1994-05-16' , '1984-01-15' ) ) or not( tbl_18.col_121 >= '2008-10-24' ) order by tbl_18.col_119,tbl_18.col_120,tbl_18.col_121,tbl_18.col_122,tbl_18.col_123 limit 919 for update").Sort().Check(testkit.Rows( + "1588592628 3937.1887880628115 1329207475772244999", + "1674237178 3937.1887880628115 -6459065549188938772", + "2637316689 3937.1887880628115 -4522626077860026631", + "672436701 3937.1887880628115 -7373106839136381229", + "831809724 3937.1887880628115 -4426441253940231780")) +} diff --git a/executor/table_reader.go b/executor/table_reader.go index 4b7678ebd961f..3057a3dfd1115 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -23,7 +23,6 @@ import ( "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/parser/mysql" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics" @@ -109,29 +108,11 @@ type TableReaderExecutor struct { // batchCop indicates whether use super batch coprocessor request, only works for TiFlash engine. batchCop bool - // extraPIDColumnIndex is used for partition reader to add an extra partition ID column. - extraPIDColumnIndex offsetOptional - // If dummy flag is set, this is not a real TableReader, it just provides the KV ranges for UnionScan. // Used by the temporary table, cached table. dummy bool } -// offsetOptional may be a positive integer, or invalid. -type offsetOptional int - -func newOffset(i int) offsetOptional { - return offsetOptional(i + 1) -} - -func (i offsetOptional) valid() bool { - return i != 0 -} - -func (i offsetOptional) value() int { - return int(i - 1) -} - // Table implements the dataSourceExecutor interface. func (e *TableReaderExecutor) Table() table.Table { return e.table @@ -250,26 +231,9 @@ func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error return err } - // When 'select ... for update' work on a partitioned table, the table reader should - // add the partition ID as an extra column. The SelectLockExec need this information - // to construct the lock key. - physicalID := getPhysicalTableID(e.table) - if e.extraPIDColumnIndex.valid() { - fillExtraPIDColumn(req, e.extraPIDColumnIndex.value(), physicalID) - } - return nil } -func fillExtraPIDColumn(req *chunk.Chunk, extraPIDColumnIndex int, physicalID int64) { - numRows := req.NumRows() - pidColumn := chunk.NewColumn(types.NewFieldType(mysql.TypeLonglong), numRows) - for i := 0; i < numRows; i++ { - pidColumn.AppendInt64(physicalID) - } - req.SetCol(extraPIDColumnIndex, pidColumn) -} - // Close implements the Executor Close interface. func (e *TableReaderExecutor) Close() error { if e.dummy { diff --git a/executor/union_scan.go b/executor/union_scan.go index 36af94bd07db8..d13505f3a84cf 100644 --- a/executor/union_scan.go +++ b/executor/union_scan.go @@ -62,6 +62,11 @@ type UnionScanExec struct { // cacheTable not nil means it's reading from cached table. cacheTable kv.MemBuffer collators []collate.Collator + + // If partitioned table and the physical table id is encoded in the chuck at this column index + // used with dynamic prune mode + // < 0 if not used. + physTblIDIdx int } // Open implements the Executor Open interface. @@ -93,6 +98,13 @@ func (us *UnionScanExec) open(ctx context.Context) error { return err } + us.physTblIDIdx = -1 + for i := len(us.columns) - 1; i >= 0; i-- { + if us.columns[i].ID == model.ExtraPhysTblID { + us.physTblIDIdx = i + break + } + } mb := txn.GetMemBuffer() mb.RLock() defer mb.RUnlock() @@ -237,7 +249,13 @@ func (us *UnionScanExec) getSnapshotRow(ctx context.Context) ([]types.Datum, err if err != nil { return nil, err } - checkKey := tablecodec.EncodeRecordKey(us.table.RecordPrefix(), snapshotHandle) + var checkKey kv.Key + if us.physTblIDIdx >= 0 { + tblID := row.GetInt64(us.physTblIDIdx) + checkKey = tablecodec.EncodeRowKeyWithHandle(tblID, snapshotHandle) + } else { + checkKey = tablecodec.EncodeRecordKey(us.table.RecordPrefix(), snapshotHandle) + } if _, err := us.memBufSnap.Get(context.TODO(), checkKey); err == nil { // If src handle appears in added rows, it means there is conflict and the transaction will fail to // commit, but for simplicity, we don't handle it here. diff --git a/executor/union_scan_test.go b/executor/union_scan_test.go index bdf521427dcda..41250fe0980bc 100644 --- a/executor/union_scan_test.go +++ b/executor/union_scan_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/pingcap/tidb/executor" + "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/util/benchdaily" "github.com/stretchr/testify/require" @@ -419,6 +420,43 @@ func TestForApplyAndUnionScan(t *testing.T) { tk.MustExec("rollback") } +func TestIssue28073(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1, t2") + tk.MustExec("create table t1 (c_int int, c_str varchar(40), primary key (c_int, c_str) , key(c_int)) partition by hash (c_int) partitions 4") + tk.MustExec("create table t2 like t1") + tk.MustExec("insert into t1 values (1, 'flamboyant mcclintock')") + tk.MustExec("insert into t2 select * from t1") + + tk.MustExec("begin") + tk.MustExec("insert into t2 (c_int, c_str) values (2, 'romantic grothendieck')") + tk.MustQuery("select * from t2 left join t1 on t1.c_int = t2.c_int for update").Sort().Check( + testkit.Rows( + "1 flamboyant mcclintock 1 flamboyant mcclintock", + "2 romantic grothendieck ", + )) + tk.MustExec("commit") + + // Check no key is written to table ID 0 + txn, err := store.Begin() + require.NoError(t, err) + start := tablecodec.EncodeTablePrefix(0) + end := tablecodec.EncodeTablePrefix(1) + iter, err := txn.Iter(start, end) + require.NoError(t, err) + + exist := false + for iter.Valid() { + require.Nil(t, iter.Next()) + exist = true + break + } + require.False(t, exist) +} + func BenchmarkUnionScanRead(b *testing.B) { store, clean := testkit.CreateMockStore(b) defer clean() diff --git a/parser/model/model.go b/parser/model/model.go index d5020d521b447..8ce7c6738082d 100644 --- a/parser/model/model.go +++ b/parser/model/model.go @@ -234,6 +234,12 @@ const ExtraHandleID = -1 // ExtraPidColID is the column ID of column which store the partitionID decoded in global index values. const ExtraPidColID = -2 +// ExtraPhysTblID is the column ID of column that should be filled in with the physical table id. +// Primarily used for table partition dynamic prune mode, to return which partition (physical table id) the row came from. +// Using a dedicated id for this, since in the future ExtraPidColID and ExtraPhysTblID may be used for the same request. +// Must be after ExtraPidColID! +const ExtraPhysTblID = -3 + const ( // TableInfoVersion0 means the table info version is 0. // Upgrade from v2.1.1 or v2.1.2 to v2.1.3 and later, and then execute a "change/modify column" statement @@ -275,6 +281,9 @@ var ExtraHandleName = NewCIStr("_tidb_rowid") // ExtraPartitionIdName is the name of ExtraPartitionId Column. var ExtraPartitionIdName = NewCIStr("_tidb_pid") +// ExtraPhysTblIdName is the name of ExtraPhysTblID Column. +var ExtraPhysTblIdName = NewCIStr("_tidb_tid") + // TableInfo provides meta data describing a DB table. type TableInfo struct { ID int64 `json:"id"` @@ -652,6 +661,17 @@ func NewExtraPartitionIDColInfo() *ColumnInfo { return colInfo } +// NewExtraPhysTblIDColInfo mocks a column info for extra partition id column. +func NewExtraPhysTblIDColInfo() *ColumnInfo { + colInfo := &ColumnInfo{ + ID: ExtraPhysTblID, + Name: ExtraPhysTblIdName, + } + colInfo.Tp = mysql.TypeLonglong + colInfo.Flen, colInfo.Decimal = mysql.GetDefaultFieldLengthAndDecimal(mysql.TypeLonglong) + return colInfo +} + // ColumnIsInIndex checks whether c is included in any indices of t. func (t *TableInfo) ColumnIsInIndex(c *ColumnInfo) bool { for _, index := range t.Indices { diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 3fe30d6b52c50..80c506051a735 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -2757,10 +2757,9 @@ func (p *LogicalLock) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P } childProp := prop.CloneEssentialFields() lock := PhysicalLock{ - Lock: p.Lock, - TblID2Handle: p.tblID2Handle, - PartitionedTable: p.partitionedTable, - ExtraPIDInfo: p.extraPIDInfo, + Lock: p.Lock, + TblID2Handle: p.tblID2Handle, + TblID2PhysTblIDCol: p.tblID2PhysTblIDCol, }.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), childProp) return []PhysicalPlan{lock}, true, nil } diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index a73759eb09961..21dcc26f95e15 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -3182,7 +3182,7 @@ func unfoldWildStar(field *ast.SelectField, outputName types.NameSlice, column [ } if (dbName.L == "" || dbName.L == name.DBName.L) && (tblName.L == "" || tblName.L == name.TblName.L) && - col.ID != model.ExtraHandleID { + col.ID != model.ExtraHandleID && col.ID != model.ExtraPidColID && col.ID != model.ExtraPhysTblID { colName := &ast.ColumnNameExpr{ Name: &ast.ColumnName{ Schema: name.DBName, @@ -3781,30 +3781,36 @@ func (ds *DataSource) newExtraHandleSchemaCol() *expression.Column { } } -// addExtraPIDColumn add an extra PID column for partition table. +// AddExtraPhysTblIDColumn for partition table. // 'select ... for update' on a partition table need to know the partition ID // to construct the lock key, so this column is added to the chunk row. -func (ds *DataSource) addExtraPIDColumn(info *extraPIDInfo) { +// Also needed for checking against the sessions transaction buffer +func (ds *DataSource) AddExtraPhysTblIDColumn() *expression.Column { + // Avoid adding multiple times (should never happen!) + cols := ds.TblCols + for i := len(cols) - 1; i >= 0; i-- { + if cols[i].ID == model.ExtraPhysTblID { + return cols[i] + } + } pidCol := &expression.Column{ RetType: types.NewFieldType(mysql.TypeLonglong), UniqueID: ds.ctx.GetSessionVars().AllocPlanColumnID(), - ID: model.ExtraPidColID, - OrigName: fmt.Sprintf("%v.%v.%v", ds.DBName, ds.tableInfo.Name, model.ExtraPartitionIdName), + ID: model.ExtraPhysTblID, + OrigName: fmt.Sprintf("%v.%v.%v", ds.DBName, ds.tableInfo.Name, model.ExtraPhysTblIdName), } - ds.Columns = append(ds.Columns, model.NewExtraPartitionIDColInfo()) + ds.Columns = append(ds.Columns, model.NewExtraPhysTblIDColInfo()) schema := ds.Schema() schema.Append(pidCol) ds.names = append(ds.names, &types.FieldName{ DBName: ds.DBName, TblName: ds.TableInfo().Name, - ColName: model.ExtraPartitionIdName, - OrigColName: model.ExtraPartitionIdName, + ColName: model.ExtraPhysTblIdName, + OrigColName: model.ExtraPhysTblIdName, }) ds.TblCols = append(ds.TblCols, pidCol) - - info.Columns = append(info.Columns, pidCol) - info.TblIDs = append(info.TblIDs, ds.TableInfo().ID) + return pidCol } var ( @@ -4210,9 +4216,17 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as if dirty || tableInfo.TempTableType == model.TempTableLocal { us := LogicalUnionScan{handleCols: handleCols}.Init(b.ctx, b.getSelectOffset()) us.SetChildren(ds) + if tableInfo.Partition != nil && b.optFlag&flagPartitionProcessor == 0 { + // Adding ExtraPhysTblIDCol for UnionScan (transaction buffer handling) + // Not using old static prune mode + // Single TableReader for all partitions, needs the PhysTblID from storage + _ = ds.AddExtraPhysTblIDColumn() + } result = us } + // Adding ExtraPhysTblIDCol for SelectLock (SELECT FOR UPDATE) is done when building SelectLock + if sessionVars.StmtCtx.TblInfo2UnionScan == nil { sessionVars.StmtCtx.TblInfo2UnionScan = make(map[*model.TableInfo]bool) } diff --git a/planner/core/logical_plans.go b/planner/core/logical_plans.go index e7c5dde77c110..af37fb030fbda 100644 --- a/planner/core/logical_plans.go +++ b/planner/core/logical_plans.go @@ -1055,26 +1055,17 @@ type LogicalLimit struct { limitHints limitHintInfo } -// extraPIDInfo is used by SelectLock on partitioned table, the TableReader need -// to return the partition id column. -// Because SelectLock has to used that partition id to encode the lock key. -// the child of SelectLock may be Join, so that table can be multiple extra PID columns. -// fields are for each of the table, and TblIDs are the corresponding table IDs. -type extraPIDInfo struct { - Columns []*expression.Column - TblIDs []int64 -} - // LogicalLock represents a select lock plan. type LogicalLock struct { baseLogicalPlan - Lock *ast.SelectLockInfo - tblID2Handle map[int64][]HandleCols - partitionedTable []table.PartitionedTable - // extraPIDInfo is used when it works on partition table, the child executor - // need to return an extra partition ID column in the chunk row. - extraPIDInfo + Lock *ast.SelectLockInfo + tblID2Handle map[int64][]HandleCols + + // tblID2phyTblIDCol is used for partitioned tables, + // the child executor need to return an extra column containing + // the Physical Table ID (i.e. from which partition the row came from) + tblID2PhysTblIDCol map[int64]*expression.Column } // WindowFrame represents a window function frame. diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index 876e71415b75e..ecb6812e322f2 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -959,9 +959,8 @@ type PhysicalLock struct { Lock *ast.SelectLockInfo - TblID2Handle map[int64][]HandleCols - PartitionedTable []table.PartitionedTable - ExtraPIDInfo extraPIDInfo + TblID2Handle map[int64][]HandleCols + TblID2PhysTblIDCol map[int64]*expression.Column } // PhysicalLimit is the physical operator of Limit. diff --git a/planner/core/plan_to_pb.go b/planner/core/plan_to_pb.go index eb0db3cd97273..cbd40d0c52b22 100644 --- a/planner/core/plan_to_pb.go +++ b/planner/core/plan_to_pb.go @@ -324,6 +324,8 @@ func (p *PhysicalIndexScan) ToPB(ctx sessionctx.Context, _ kv.StoreType) (*tipb. for _, col := range p.schema.Columns { if col.ID == model.ExtraHandleID { columns = append(columns, model.NewExtraHandleColInfo()) + } else if col.ID == model.ExtraPhysTblID { + columns = append(columns, model.NewExtraPhysTblIDColInfo()) } else if col.ID == model.ExtraPidColID { columns = append(columns, model.NewExtraPartitionIDColInfo()) } else { diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index cf8cb52959f78..2bfc387c397e4 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -1226,50 +1226,40 @@ func removeTiflashDuringStaleRead(paths []*util.AccessPath) []*util.AccessPath { } func (b *PlanBuilder) buildSelectLock(src LogicalPlan, lock *ast.SelectLockInfo) (*LogicalLock, error) { - selectLock := LogicalLock{ - Lock: lock, - tblID2Handle: b.handleHelper.tailMap(), - partitionedTable: b.partitionedTable, - }.Init(b.ctx) - selectLock.SetChildren(src) - + var tblID2PhysTblIDCol map[int64]*expression.Column if len(b.partitionedTable) > 0 { + tblID2PhysTblIDCol = make(map[int64]*expression.Column) // If a chunk row is read from a partitioned table, which partition the row // comes from is unknown. With the existence of Join, the situation could be // even worse: SelectLock have to know the `pid` to construct the lock key. - // To solve the problem, an extra `pid` column is add to the schema, and the + // To solve the problem, an extra `pid` column is added to the schema, and the // DataSource need to return the `pid` information in the chunk row. - err := addExtraPIDColumnToDataSource(src, &selectLock.extraPIDInfo) - if err != nil { - return nil, err - } - // TODO: Dynamic partition mode does not support adding extra pid column to the data source. - // (Because one table reader can read from multiple partitions, which partition a chunk row comes from is unknown) - // So we have to use the old "rewrite to union" way here, set `flagPartitionProcessor` flag for that. - b.optFlag = b.optFlag | flagPartitionProcessor + // For dynamic prune mode, it is filled in from the tableID in the key by storage. + // For static prune mode it is also filled in from the tableID in the key by storage. + // since it would otherwise be lost in the PartitionUnion executor. + setExtraPhysTblIDColsOnDataSource(src, tblID2PhysTblIDCol) } + selectLock := LogicalLock{ + Lock: lock, + tblID2Handle: b.handleHelper.tailMap(), + tblID2PhysTblIDCol: tblID2PhysTblIDCol, + }.Init(b.ctx) + selectLock.SetChildren(src) return selectLock, nil } -func addExtraPIDColumnToDataSource(p LogicalPlan, info *extraPIDInfo) error { - switch raw := p.(type) { +func setExtraPhysTblIDColsOnDataSource(p LogicalPlan, tblID2PhysTblIDCol map[int64]*expression.Column) { + switch ds := p.(type) { case *DataSource: - // Fix issue 26250, do not add extra pid column to normal table. - if raw.tableInfo.GetPartitionInfo() == nil { - return nil + if ds.tableInfo.GetPartitionInfo() == nil { + return } - raw.addExtraPIDColumn(info) - return nil + tblID2PhysTblIDCol[ds.tableInfo.ID] = ds.AddExtraPhysTblIDColumn() default: - var err error for _, child := range p.Children() { - err = addExtraPIDColumnToDataSource(child, info) - if err != nil { - return err - } + setExtraPhysTblIDColsOnDataSource(child, tblID2PhysTblIDCol) } } - return nil } func (b *PlanBuilder) buildPrepare(x *ast.PrepareStmt) Plan { diff --git a/planner/core/rule_column_pruning.go b/planner/core/rule_column_pruning.go index d79d7aa7760a3..98ca1e20190da 100644 --- a/planner/core/rule_column_pruning.go +++ b/planner/core/rule_column_pruning.go @@ -293,6 +293,11 @@ func (p *LogicalUnionScan) PruneColumns(parentUsedCols []*expression.Column, opt for i := 0; i < p.handleCols.NumCols(); i++ { parentUsedCols = append(parentUsedCols, p.handleCols.GetCol(i)) } + for _, col := range p.Schema().Columns { + if col.ID == model.ExtraPidColID || col.ID == model.ExtraPhysTblID { + parentUsedCols = append(parentUsedCols, col) + } + } condCols := expression.ExtractColumnsFromExpressions(nil, p.conditions, nil) parentUsedCols = append(parentUsedCols, condCols...) return p.children[0].PruneColumns(parentUsedCols, opt) @@ -479,17 +484,16 @@ func (p *LogicalLock) PruneColumns(parentUsedCols []*expression.Column, opt *log return p.baseLogicalPlan.PruneColumns(parentUsedCols, opt) } - if len(p.partitionedTable) > 0 { - // If the children include partitioned tables, there is an extra partition ID column. - parentUsedCols = append(parentUsedCols, p.extraPIDInfo.Columns...) - } - - for _, cols := range p.tblID2Handle { + for tblID, cols := range p.tblID2Handle { for _, col := range cols { for i := 0; i < col.NumCols(); i++ { parentUsedCols = append(parentUsedCols, col.GetCol(i)) } } + if physTblIDCol, ok := p.tblID2PhysTblIDCol[tblID]; ok { + // If the children include partitioned tables, there is an extra partition ID column. + parentUsedCols = append(parentUsedCols, physTblIDCol) + } } return p.children[0].PruneColumns(parentUsedCols, opt) } diff --git a/planner/core/rule_partition_processor.go b/planner/core/rule_partition_processor.go index 0597298fc2cc7..9a926db7bdf0f 100644 --- a/planner/core/rule_partition_processor.go +++ b/planner/core/rule_partition_processor.go @@ -43,6 +43,7 @@ import ( const FullRange = -1 // partitionProcessor rewrites the ast for table partition. +// Used by static partition prune mode. // // create table t (id int) partition by range (id) // (partition p1 values less than (10), @@ -310,6 +311,15 @@ func (s *partitionProcessor) reconstructTableColNames(ds *DataSource) ([]*types. }) continue } + if colExpr.ID == model.ExtraPhysTblID { + names = append(names, &types.FieldName{ + DBName: ds.DBName, + TblName: ds.tableInfo.Name, + ColName: model.ExtraPhysTblIdName, + OrigColName: model.ExtraPhysTblIdName, + }) + continue + } if colInfo, found := colsInfoMap[colExpr.ID]; found { names = append(names, &types.FieldName{ DBName: ds.DBName, @@ -640,7 +650,7 @@ func (s *partitionProcessor) prune(ds *DataSource, opt *logicalOptimizeOp) (Logi return s.processListPartition(ds, pi, opt) } - // We haven't implement partition by list and so on. + // We haven't implement partition by key and so on. return s.makeUnionAllChildren(ds, pi, fullRange(len(pi.Definitions)), opt) } diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index 14636177b7153..da706e59066cc 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -2465,6 +2465,7 @@ func (s *testPessimisticSuite) TestIssue21498(c *C) { tk.MustQuery("select * from t s, t t1 where s.v = 23 and s.id = t1.id").Check(testkit.Rows("2 23 200 2 23 200")) tk.MustQuery("select * from t s, t t1 where s.v = 24 and s.id = t1.id").Check(testkit.Rows()) tk.MustQuery("select * from t s, t t1 where s.v = 23 and s.id = t1.id for update").Check(testkit.Rows()) + // TODO: Do the same with Partitioned Table!!! Since this query leads to two columns in SelectLocExec.tblID2Handle!!! tk.MustQuery("select * from t s, t t1 where s.v = 24 and s.id = t1.id for update").Check(testkit.Rows("2 24 200 2 24 200")) tk.MustExec("delete from t where v = 24") tk.CheckExecResult(1, 0) diff --git a/session/session_test.go b/session/session_test.go index f2ced1e086851..1c4274ebf4705 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -2312,6 +2312,36 @@ func (s *testSchemaSerialSuite) TestSchemaCheckerSQL(c *C) { tk.MustQuery(`select * from t for update`) _, err = tk.Exec(`commit;`) c.Assert(err, NotNil) + + // Repeated tests for partitioned table + tk.MustExec(`create table pt (id int, c int) partition by hash (id) partitions 3`) + tk.MustExec(`insert into pt values(1, 1);`) + // The schema version is out of date in the first transaction, and the SQL can't be retried. + tk.MustExec(`begin;`) + tk1.MustExec(`alter table pt modify column c bigint;`) + tk.MustExec(`insert into pt values(3, 3);`) + _, err = tk.Exec(`commit;`) + c.Assert(terror.ErrorEqual(err, domain.ErrInfoSchemaChanged), IsTrue, Commentf("err %v", err)) + + // But the transaction related table IDs aren't in the updated table IDs. + tk.MustExec(`begin;`) + tk1.MustExec(`alter table pt add index idx2(c);`) + tk.MustExec(`insert into t1 values(4, 4);`) + tk.MustExec(`commit;`) + + // Test for "select for update". + tk.MustExec(`begin;`) + tk1.MustExec(`alter table pt add index idx3(c);`) + tk.MustQuery(`select * from pt for update`) + _, err = tk.Exec(`commit;`) + c.Assert(err, NotNil) + + // Test for "select for update". + tk.MustExec(`begin;`) + tk1.MustExec(`alter table pt add index idx4(c);`) + tk.MustQuery(`select * from pt partition (p1) for update`) + _, err = tk.Exec(`commit;`) + c.Assert(err, NotNil) } func (s *testSchemaSerialSuite) TestSchemaCheckerTempTable(c *C) { diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index f1111bbdbe433..322652f6da4f3 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1066,11 +1066,6 @@ func (s *SessionVars) CheckAndGetTxnScope() string { // UseDynamicPartitionPrune indicates whether use new dynamic partition prune. func (s *SessionVars) UseDynamicPartitionPrune() bool { - if s.InTxn() || !s.GetStatusFlag(mysql.ServerStatusAutocommit) { - // UnionScan cannot get partition table IDs in dynamic-mode, this is a quick-fix for issues/26719, - // please see it for more details. - return false - } return PartitionPruneMode(s.PartitionPruneMode.Load()) == Dynamic } diff --git a/store/mockstore/unistore/cophandler/closure_exec.go b/store/mockstore/unistore/cophandler/closure_exec.go index d1dee3b888538..03170cf4dfac7 100644 --- a/store/mockstore/unistore/cophandler/closure_exec.go +++ b/store/mockstore/unistore/cophandler/closure_exec.go @@ -294,9 +294,18 @@ func (e *closureExecutor) initIdxScanCtx(idxScan *tipb.IndexScan) { e.idxScanCtx.primaryColumnIds = idxScan.PrimaryColumnIds lastColumn := e.columnInfos[len(e.columnInfos)-1] + + // Here it is required that ExtraPhysTblID is last + if lastColumn.GetColumnId() == model.ExtraPhysTblID { + e.idxScanCtx.columnLen-- + lastColumn = e.columnInfos[e.idxScanCtx.columnLen-1] + } + + // Here it is required that ExtraPidColID + // is after all other columns except ExtraPhysTblID if lastColumn.GetColumnId() == model.ExtraPidColID { - lastColumn = e.columnInfos[len(e.columnInfos)-2] e.idxScanCtx.columnLen-- + lastColumn = e.columnInfos[e.idxScanCtx.columnLen-1] } if len(e.idxScanCtx.primaryColumnIds) == 0 { @@ -838,6 +847,12 @@ func (e *closureExecutor) tableScanProcessCore(key, value []byte) error { if err != nil { return errors.Trace(err) } + // Add ExtraPhysTblID if requested + // Assumes it is always last! + if e.columnInfos[len(e.columnInfos)-1].ColumnId == model.ExtraPhysTblID { + tblID := tablecodec.DecodeTableID(key) + e.scanCtx.chk.AppendInt64(len(e.columnInfos)-1, tblID) + } incRow = true return nil } @@ -910,6 +925,12 @@ func (e *closureExecutor) indexScanProcessCore(key, value []byte) error { } } } + // Add ExtraPhysTblID if requested + // Assumes it is always last! + if e.columnInfos[len(e.columnInfos)-1].ColumnId == model.ExtraPhysTblID { + tblID := tablecodec.DecodeTableID(key) + chk.AppendInt64(len(e.columnInfos)-1, tblID) + } gotRow = true return nil } diff --git a/store/mockstore/unistore/cophandler/cop_handler.go b/store/mockstore/unistore/cophandler/cop_handler.go index b39baccda1125..f1091a62b6e71 100644 --- a/store/mockstore/unistore/cophandler/cop_handler.go +++ b/store/mockstore/unistore/cophandler/cop_handler.go @@ -314,6 +314,10 @@ func newRowDecoder(columnInfos []*tipb.ColumnInfo, fieldTps []*types.FieldType, ) for i := range columnInfos { info := columnInfos[i] + if info.ColumnId == model.ExtraPhysTblID { + // Skip since it needs to be filled in from the key + continue + } ft := fieldTps[i] col := rowcodec.ColInfo{ ID: info.ColumnId, diff --git a/store/mockstore/unistore/cophandler/mpp.go b/store/mockstore/unistore/cophandler/mpp.go index 59a66d31f6f88..cc8954e025ce9 100644 --- a/store/mockstore/unistore/cophandler/mpp.go +++ b/store/mockstore/unistore/cophandler/mpp.go @@ -78,7 +78,11 @@ func (b *mppExecBuilder) buildMPPTableScan(pb *tipb.TableScan) (*tableScanExec, ts.lockStore = b.dagCtx.lockStore ts.resolvedLocks = b.dagCtx.resolvedLocks } - for _, col := range pb.Columns { + for i, col := range pb.Columns { + if col.ColumnId == model.ExtraPhysTblID { + ts.physTblIDColIdx = new(int) + *ts.physTblIDColIdx = i + } ft := fieldTypeFromPBColumn(col) ts.fieldTypes = append(ts.fieldTypes, ft) } @@ -98,9 +102,16 @@ func (b *mppExecBuilder) buildIdxScan(pb *tipb.IndexScan) (*indexScanExec, error primaryColIds := pb.GetPrimaryColumnIds() lastCol := pb.Columns[numCols-1] + var physTblIDColIdx *int + if lastCol.GetColumnId() == model.ExtraPhysTblID { + numIdxCols-- + physTblIDColIdx = new(int) + *physTblIDColIdx = numIdxCols + lastCol = pb.Columns[numIdxCols-1] + } if lastCol.GetColumnId() == model.ExtraPidColID { - lastCol = pb.Columns[numCols-2] numIdxCols-- + lastCol = pb.Columns[numIdxCols-1] } hdlStatus := tablecodec.HandleDefault @@ -132,19 +143,20 @@ func (b *mppExecBuilder) buildIdxScan(pb *tipb.IndexScan) (*indexScanExec, error prevVals = make([][]byte, numIdxCols) } idxScan := &indexScanExec{ - baseMPPExec: baseMPPExec{sc: b.sc, fieldTypes: fieldTypes}, - startTS: b.dagCtx.startTS, - kvRanges: ranges, - dbReader: b.dbReader, - lockStore: b.dagCtx.lockStore, - resolvedLocks: b.dagCtx.resolvedLocks, - counts: b.counts, - ndvs: b.ndvs, - prevVals: prevVals, - colInfos: colInfos, - numIdxCols: numIdxCols, - hdlStatus: hdlStatus, - desc: pb.Desc, + baseMPPExec: baseMPPExec{sc: b.sc, fieldTypes: fieldTypes}, + startTS: b.dagCtx.startTS, + kvRanges: ranges, + dbReader: b.dbReader, + lockStore: b.dagCtx.lockStore, + resolvedLocks: b.dagCtx.resolvedLocks, + counts: b.counts, + ndvs: b.ndvs, + prevVals: prevVals, + colInfos: colInfos, + numIdxCols: numIdxCols, + hdlStatus: hdlStatus, + desc: pb.Desc, + physTblIDColIdx: physTblIDColIdx, } return idxScan, nil } diff --git a/store/mockstore/unistore/cophandler/mpp_exec.go b/store/mockstore/unistore/cophandler/mpp_exec.go index 9752371d50c02..85a5983517e11 100644 --- a/store/mockstore/unistore/cophandler/mpp_exec.go +++ b/store/mockstore/unistore/cophandler/mpp_exec.go @@ -123,6 +123,9 @@ type tableScanExec struct { decoder *rowcodec.ChunkDecoder desc bool + + // if ExtraPhysTblIDCol is requested, fill in the physical table id in this column position + physTblIDColIdx *int } func (e *tableScanExec) SkipValue() bool { return false } @@ -137,6 +140,10 @@ func (e *tableScanExec) Process(key, value []byte) error { if err != nil { return errors.Trace(err) } + if e.physTblIDColIdx != nil { + tblID := tablecodec.DecodeTableID(key) + e.chk.AppendInt64(*e.physTblIDColIdx, tblID) + } e.rowCnt++ if e.chk.IsFull() { @@ -241,6 +248,9 @@ type indexScanExec struct { colInfos []rowcodec.ColInfo numIdxCols int hdlStatus tablecodec.HandleStatus + + // if ExtraPhysTblIDCol is requested, fill in the physical table id in this column position + physTblIDColIdx *int } func (e *indexScanExec) SkipValue() bool { return false } @@ -275,6 +285,10 @@ func (e *indexScanExec) Process(key, value []byte) error { } } } + if e.physTblIDColIdx != nil { + tblID := tablecodec.DecodeTableID(key) + e.chk.AppendInt64(*e.physTblIDColIdx, tblID) + } if e.chk.IsFull() { e.chunks = append(e.chunks, e.chk) e.chk = chunk.NewChunkWithCapacity(e.fieldTypes, DefaultBatchSize)