Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: fix bug when using IndexMerge in transaction #30719

Merged
merged 17 commits into from
Dec 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,21 +139,24 @@ func (e *IndexMergeReaderExecutor) Open(ctx context.Context) (err error) {
}

func (e *IndexMergeReaderExecutor) buildKeyRangesForTable(tbl table.Table) (ranges [][]kv.KeyRange, err error) {
sc := e.ctx.GetSessionVars().StmtCtx
for i, plan := range e.partialPlans {
_, ok := plan[0].(*plannercore.PhysicalIndexScan)
if !ok {
if tbl.Meta().IsCommonHandle {
keyRanges, err := distsql.CommonHandleRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, []int64{getPhysicalTableID(tbl)}, e.ranges[i])
if err != nil {
return nil, err
}
ranges = append(ranges, keyRanges)
} else {
ranges = append(ranges, nil)
firstPartRanges, secondPartRanges := distsql.SplitRangesAcrossInt64Boundary(e.ranges[i], false, e.descs[i], tbl.Meta().IsCommonHandle)
firstKeyRanges, err := distsql.TableHandleRangesToKVRanges(sc, []int64{getPhysicalTableID(tbl)}, tbl.Meta().IsCommonHandle, firstPartRanges, nil)
if err != nil {
return nil, err
}
secondKeyRanges, err := distsql.TableHandleRangesToKVRanges(sc, []int64{getPhysicalTableID(tbl)}, tbl.Meta().IsCommonHandle, secondPartRanges, nil)
if err != nil {
return nil, err
}
keyRanges := append(firstKeyRanges, secondKeyRanges...)
ranges = append(ranges, keyRanges)
continue
}
keyRange, err := distsql.IndexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(tbl), e.indexes[i].ID, e.ranges[i], e.feedbacks[i])
keyRange, err := distsql.IndexRangesToKVRanges(sc, getPhysicalTableID(tbl), e.indexes[i].ID, e.ranges[i], e.feedbacks[i])
if err != nil {
return nil, err
}
Expand Down
123 changes: 109 additions & 14 deletions executor/index_merge_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,22 +232,31 @@ func (s *testSuite1) TestIndexMergeInTransaction(c *C) {
" └─TableRowIDScan_7 3330.01 cop[tikv] table:t1 keep order:false, stats:pseudo"))

// Test with normal key.
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustExec("insert into t1 values(1, 1, 1, 1);")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustExec("update t1 set c3 = 100 where c3 = 1;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustExec("delete from t1;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows())

// Test with primary key, so the partialPlan is TableScan.
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows())
tk.MustExec("insert into t1 values(1, 1, 1, 1);")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustExec("update t1 set c3 = 100 where c3 = 1;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows())
tk.MustExec("delete from t1;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows())

tk.MustExec("commit;")
if i == 1 {
tk.MustExec("set tx_isolation = 'REPEATABLE-READ';")
Expand Down Expand Up @@ -306,22 +315,108 @@ func (s *testSuite1) TestIndexMergeInTransaction(c *C) {
tk.MustExec("insert into t1 values(11, 11, 11, 11, 11);")
tk.MustExec("insert into t1 values(21, 21, 21, 21, 21);")
tk.MustExec("insert into t1 values(31, 31, 31, 31, 31);")
res := tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 20 or c2 < 20) and c3 < 20;").Sort()

res := tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 20) and c3 < 20;").Sort()
res.Check(testkit.Rows("1 1 1 1 1", "11 11 11 11 11"))
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 20 or c2 < -1) and c3 < 20;").Sort()
res.Check(testkit.Rows("1 1 1 1 1", "11 11 11 11 11"))

res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 20) and c3 < 20;").Sort()
res.Check(testkit.Rows("1 1 1 1 1", "11 11 11 11 11"))
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 20 or c2 < 20) and c3 < 20;").Sort()
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 20 or c2 < -1) and c3 < 20;").Sort()
res.Check(testkit.Rows("1 1 1 1 1", "11 11 11 11 11"))

tk.MustExec("update t1 set c3 = 100 where c3 = 1;")
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 20 or c2 < 20) and c3 < 20;")
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 20) and c3 < 20;")
res.Check(testkit.Rows("11 11 11 11 11"))
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 20 or c2 < 20) and c3 < 20;")
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 20 or c2 < -1) and c3 < 20;")
res.Check(testkit.Rows("11 11 11 11 11"))

res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 20) and c3 < 20;")
res.Check(testkit.Rows("11 11 11 11 11"))
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 20 or c2 < -1) and c3 < 20;")
res.Check(testkit.Rows("11 11 11 11 11"))

tk.MustExec("delete from t1;")
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 20 or c2 < 20) and c3 < 20;")
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 20) and c3 < 20;")
res.Check(testkit.Rows())
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 20 or c2 < 20) and c3 < 20;")
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 20 or c2 < -1) and c3 < 20;")
res.Check(testkit.Rows())

res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 20) and c3 < 20;")
res.Check(testkit.Rows())
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 20 or c2 < -1) and c3 < 20;")
res.Check(testkit.Rows())
tk.MustExec("commit;")
}

func (s *testSuite1) TestIndexMergeReaderInTransIssue30685(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)

// This is a case generated by sqlgen to test if clustered index is ok.
// Detect the bugs in memIndexMergeReader.getMemRowsHandle().
tk.MustExec("drop table if exists t1;")
tk.MustExec(`create table t1 (col_30 decimal default 0 ,
col_31 char(99) collate utf8_bin default 'sVgzHblmYYtEjVg' not null ,
col_37 int unsigned default 377206828 ,
primary key idx_16 ( col_37 ) , key idx_19 ( col_31) ) collate utf8mb4_general_ci ;`)
tk.MustExec("begin;")
tk.MustExec("insert ignore into t1 values (388021, '', 416235653);")
tk.MustQuery("select /*+ use_index_merge( t1 ) */ 1 from t1 where ( t1.col_31 in ( 'OiOXzpCs' , 'oaVv' ) or t1.col_37 <= 4059907010 ) and t1.col_30 ;").Check(testkit.Rows("1"))
tk.MustExec("commit;")

tk.MustExec("drop table if exists tbl_3;")
tk.MustExec(`create table tbl_3 ( col_30 decimal , col_31 char(99) , col_32 smallint ,
col_33 tinyint unsigned not null , col_34 char(209) ,
col_35 char(110) , col_36 int unsigned , col_37 int unsigned ,
col_38 decimal(50,15) not null , col_39 char(104),
primary key ( col_37 ) , unique key ( col_33,col_30,col_36,col_39 ) ,
unique key ( col_32,col_35 ) , key ( col_31,col_38 ) ,
key ( col_31,col_33,col_32,col_35,col_36 ) ,
unique key ( col_38,col_34,col_33,col_31,col_30,col_36,col_35,col_37,col_39 ) ,
unique key ( col_39,col_32 ) , unique key ( col_30,col_35,col_31,col_38 ) ,
key ( col_38,col_32,col_33 ) )`)
tk.MustExec("begin;")
tk.MustExec("insert ignore into tbl_3 values ( 71,'Fipc',-6676,30,'','FgfK',2464927398,4084082400,5602.5868,'' );")
tk.MustQuery("select /*+ use_index_merge( tbl_3 ) */ 1 from tbl_3 where ( tbl_3.col_37 not in ( 1626615245 , 2433569159 ) or tbl_3.col_38 = 0.06 ) ;").Check(testkit.Rows("1"))
tk.MustExec("commit;")

// int + int compound type as clustered index pk.
tk.MustExec("drop table if exists t1;")
tk.MustExec("create table t1(c1 int, c2 int, c3 int, c4 int, primary key(c1, c2) /*T![clustered_index] CLUSTERED */, key(c3));")

tk.MustExec("begin;")
tk.MustExec("insert into t1 values(1, 1, 1, 1);")
tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c3 < 10) and c4 < 10;").Check(testkit.Rows(
"UnionScan_6 1841.86 root lt(test.t1.c4, 10), or(lt(test.t1.c1, -1), lt(test.t1.c3, 10))",
"└─IndexMerge_11 1841.86 root ",
" ├─TableRangeScan_7(Build) 3323.33 cop[tikv] table:t1 range:[-inf,-1), keep order:false, stats:pseudo",
" ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c3(c3) range:[-inf,10), keep order:false, stats:pseudo",
" └─Selection_10(Probe) 1841.86 cop[tikv] lt(test.t1.c4, 10)",
" └─TableRowIDScan_9 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo"))

tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c3 < 10) and c4 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c3 < -1) and c4 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c3 < -1) and c4 < 10;").Check(testkit.Rows())
tk.MustExec("commit;")

// Single int type as clustered index pk.
tk.MustExec("drop table if exists t1;")
tk.MustExec("create table t1(c1 varchar(100), c2 int, c3 int, c4 int, primary key(c1) /*T![clustered_index] CLUSTERED */, key(c3));")

tk.MustExec("begin;")
tk.MustExec("insert into t1 values('b', 1, 1, 1);")
tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 'a' or c3 < 10) and c4 < 10;").Check(testkit.Rows(
"UnionScan_6 1841.86 root lt(test.t1.c4, 10), or(lt(test.t1.c1, \"a\"), lt(test.t1.c3, 10))",
"└─IndexMerge_11 1841.86 root ",
" ├─TableRangeScan_7(Build) 3323.33 cop[tikv] table:t1 range:[-inf,\"a\"), keep order:false, stats:pseudo",
" ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c3(c3) range:[-inf,10), keep order:false, stats:pseudo",
" └─Selection_10(Probe) 1841.86 cop[tikv] lt(test.t1.c4, 10)",
" └─TableRowIDScan_9 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo"))

tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 'a' or c3 < 10) and c4 < 10;").Check(testkit.Rows("b 1 1 1"))
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 <= 'b' or c3 < -1) and c4 < 10;").Check(testkit.Rows("b 1 1 1"))
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 'a' or c3 < -1) and c4 < 10;").Check(testkit.Rows())
tk.MustExec("commit;")
}

Expand Down
23 changes: 13 additions & 10 deletions executor/mem_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,6 @@ type memTableReader struct {
buffer allocBuf
pkColIDs []int64
cacheTable kv.MemBuffer
// Used when extracting handles from row in memTableReader.getMemRowsHandle.
handleCols plannercore.HandleCols
}

type allocBuf struct {
Expand Down Expand Up @@ -329,17 +327,23 @@ func (m *memTableReader) getRowData(handle kv.Handle, value []byte) ([][]byte, e

// getMemRowsHandle is called when memIndexMergeReader.partialPlans[i] is TableScan.
func (m *memTableReader) getMemRowsHandle() ([]kv.Handle, error) {
rows, err := m.getMemRows()
handles := make([]kv.Handle, 0, 16)
err := iterTxnMemBuffer(m.ctx, m.cacheTable, m.kvRanges, func(key, value []byte) error {
handle, err := tablecodec.DecodeRowKey(key)
if err != nil {
return err
}
handles = append(handles, handle)
return nil
})
if err != nil {
return nil, err
}
handles := make([]kv.Handle, 0, len(rows))
for _, row := range rows {
handle, err := m.handleCols.BuildHandleByDatums(row)
if err != nil {
return nil, err

if m.desc {
for i, j := 0, len(handles)-1; i < j; i, j = i+1, j-1 {
handles[i], handles[j] = handles[j], handles[i]
}
handles = append(handles, handle)
}
return handles, nil
}
Expand Down Expand Up @@ -577,7 +581,6 @@ func buildMemIndexMergeReader(us *UnionScanExec, indexMergeReader *IndexMergeRea
handleBytes: make([]byte, 0, 16),
rd: rd,
},
handleCols: indexMergeReader.handleCols,
})
} else {
outputOffset := []int{len(indexMergeReader.indexes[i].Columns)}
Expand Down