Skip to content

Commit

Permalink
executor: make IndexLookUps in the inner side of IndexJoins support d…
Browse files Browse the repository at this point in the history
…irect reading (#24261)
  • Loading branch information
qw4990 authored Apr 27, 2021
1 parent e580641 commit 5c83f0b
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 31 deletions.
17 changes: 8 additions & 9 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3512,7 +3512,6 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context
err = e.open(ctx)
return e, err
}
nextPartition := nextPartitionForIndexLookUp{exec: e, innerPartitionInfo: &innerPartitionInfo{isFullPartition: true}}
tbl, _ := builder.executorBuilder.is.TableByID(tbInfo.ID)
usedPartition, canPrune, contentPos, err := prunePartitionForInnerExecutor(builder.executorBuilder.ctx, tbl, e.Schema(), &v.PartitionInfo, lookUpContents)
if err != nil {
Expand All @@ -3524,21 +3523,21 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context
if err != nil {
return nil, err
}
nextPartition.isFullPartition = false
nextPartition.nextRange = rangeMap
e.prunedPartitions = usedPartition
e.ranges = indexRanges
e.partitionRangeMap = rangeMap
} else {
e.prunedPartitions = usedPartition
e.ranges, err = buildRangesForIndexJoin(e.ctx, lookUpContents, indexRanges, keyOff2IdxOff, cwc)
if err != nil {
return nil, err
}
}
partitionExec := &PartitionTableExecutor{
baseExecutor: *e.base(),
partitions: usedPartition,
nextPartition: nextPartition,
e.partitionTableMode = true
if err := e.Open(ctx); err != nil {
return nil, err
}
err = partitionExec.Open(ctx)
return partitionExec, err
return e, err
}
ret := &TableDualExec{baseExecutor: *e.base()}
err = ret.Open(ctx)
Expand Down
11 changes: 8 additions & 3 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,8 @@ type IndexLookUpExecutor struct {
// fields about accessing partition tables
partitionTableMode bool // if this executor is accessing a partition table
prunedPartitions []table.PhysicalTable // partition tables need to access
partitionKVRanges [][]kv.KeyRange // kvRanges of each partition table
partitionRangeMap map[int64][]*ranger.Range
partitionKVRanges [][]kv.KeyRange // kvRanges of each partition table

// All fields above are immutable.

Expand Down Expand Up @@ -401,11 +402,15 @@ func (e *IndexLookUpExecutor) buildTableKeyRanges() (err error) {
// For example, a table partitioned by range(a), and p0=(1, 10), p1=(11, 20), for the condition "(a>1 and a<10) or (a>11 and a<20)",
// the first range is only suitable to p0 and the second is to p1, but now we'll also build kvRange for range0+p1 and range1+p0.
physicalID := p.GetPhysicalID()
ranges := e.ranges
if e.partitionRangeMap != nil && e.partitionRangeMap[physicalID] != nil {
ranges = e.partitionRangeMap[physicalID]
}
var kvRange []kv.KeyRange
if e.index.ID == -1 {
kvRange, err = distsql.CommonHandleRangesToKVRanges(sc, []int64{physicalID}, e.ranges)
kvRange, err = distsql.CommonHandleRangesToKVRanges(sc, []int64{physicalID}, ranges)
} else {
kvRange, err = distsql.IndexRangesToKVRanges(sc, physicalID, e.index.ID, e.ranges, e.feedback)
kvRange, err = distsql.IndexRangesToKVRanges(sc, physicalID, e.index.ID, ranges, e.feedback)
}
if err != nil {
return err
Expand Down
32 changes: 32 additions & 0 deletions executor/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,3 +350,35 @@ func (s *testSuite3) TestIndexLookUpGetResultChunk(c *C) {
tk.MustQuery("select * from tbl use index(idx_a) where a > 99 order by a asc limit 1").Check(testkit.Rows("100 100 100"))
tk.MustQuery("select * from tbl use index(idx_a) where a > 10 order by a asc limit 4,1").Check(testkit.Rows("15 15 15"))
}

func (s *testSuite3) TestPartitionTableIndexJoinIndexLookUp(c *C) {
if israce.RaceEnabled {
c.Skip("exhaustive types test, skip race test")
}
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("set @@tidb_partition_prune_mode='dynamic'")
tk.MustExec(`create table t (a int, b int, key(a)) partition by hash(a) partitions 4`)
tk.MustExec("create table tnormal (a int, b int, key(a), key(b))")
nRows := 64
values := make([]string, 0, nRows)
for i := 0; i < nRows; i++ {
values = append(values, fmt.Sprintf("(%v, %v)", rand.Intn(nRows), rand.Intn(nRows)))
}
tk.MustExec(fmt.Sprintf("insert into t values %v", strings.Join(values, ", ")))
tk.MustExec(fmt.Sprintf("insert into tnormal values %v", strings.Join(values, ", ")))

randRange := func() (int, int) {
a, b := rand.Intn(nRows), rand.Intn(nRows)
if a > b {
return b, a
}
return a, b
}
for i := 0; i < nRows; i++ {
lb, rb := randRange()
cond := fmt.Sprintf("(t2.b between %v and %v)", lb, rb)
result := tk.MustQuery("select t1.* from tnormal t1, tnormal t2 use index(a) where t1.a=t2.b and " + cond).Sort().Rows()
tk.MustQuery("select /*+ TIDB_INLJ(t1, t2) */ t1.* from t t1, t t2 use index(a) where t1.a=t2.b and " + cond).Sort().Check(result)
}
}
19 changes: 0 additions & 19 deletions executor/partition_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tipb/go-tipb"
)

Expand All @@ -44,7 +43,6 @@ type nextPartition interface {
// nolint:structcheck
type innerPartitionInfo struct {
isFullPartition bool
nextRange map[int64][]*ranger.Range
}

type nextPartitionForTableReader struct {
Expand All @@ -69,23 +67,6 @@ func (n nextPartitionForTableReader) nextPartition(ctx context.Context, tbl tabl
return n.exec, nil
}

type nextPartitionForIndexLookUp struct {
*innerPartitionInfo
exec *IndexLookUpExecutor
}

func (n nextPartitionForIndexLookUp) GetInnerPartitionInfo() *innerPartitionInfo {
return n.innerPartitionInfo
}

func (n nextPartitionForIndexLookUp) nextPartition(ctx context.Context, tbl table.PhysicalTable) (Executor, error) {
n.exec.table = tbl
if n.innerPartitionInfo != nil && !n.isFullPartition {
n.exec.ranges = n.nextRange[tbl.GetPhysicalID()]
}
return n.exec, nil
}

type nextPartitionForUnionScan struct {
b *executorBuilder
us *plannercore.PhysicalUnionScan
Expand Down

0 comments on commit 5c83f0b

Please sign in to comment.