Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: make IndexReaders in the inner side of IndexJoins support direct reading #24260

Merged
merged 5 commits into from
Apr 27, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3474,7 +3474,6 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte
return e, err
}

nextPartition := nextPartitionForIndexReader{exec: e, innerPartitionInfo: &innerPartitionInfo{isFullPartition: true}}
tbl, _ := builder.executorBuilder.is.TableByID(tbInfo.ID)
usedPartition, canPrune, contentPos, err := prunePartitionForInnerExecutor(builder.executorBuilder.ctx, tbl, e.Schema(), &v.PartitionInfo, lookUpContents)
if err != nil {
Expand All @@ -3486,21 +3485,19 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte
if err != nil {
return nil, err
}
nextPartition.isFullPartition = false
nextPartition.nextRange = rangeMap
e.partitions = usedPartition
e.ranges = indexRanges
e.partRangeMap = rangeMap
} else {
e.ranges, err = buildRangesForIndexJoin(e.ctx, lookUpContents, indexRanges, keyOff2IdxOff, cwc)
if err != nil {
e.partitions = usedPartition
if e.ranges, err = buildRangesForIndexJoin(e.ctx, lookUpContents, indexRanges, keyOff2IdxOff, cwc); err != nil {
return nil, err
}
}
partitionExec := &PartitionTableExecutor{
baseExecutor: *e.base(),
partitions: usedPartition,
nextPartition: nextPartition,
if err := e.Open(ctx); err != nil {
return nil, err
}
err = partitionExec.Open(ctx)
return partitionExec, err
return e, nil
}
ret := &TableDualExec{baseExecutor: *e.base()}
err = ret.Open(ctx)
Expand Down
22 changes: 14 additions & 8 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ type IndexReaderExecutor struct {
physicalTableID int64
ranges []*ranger.Range
partitions []table.PhysicalTable
partRangeMap map[int64][]*ranger.Range // each partition may have different ranges

// kvRanges are only used for union scan.
kvRanges []kv.KeyRange
Expand Down Expand Up @@ -208,11 +209,11 @@ func (e *IndexReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error
return err
}

func (e *IndexReaderExecutor) buildKeyRanges(sc *stmtctx.StatementContext, physicalID int64) ([]kv.KeyRange, error) {
func (e *IndexReaderExecutor) buildKeyRanges(sc *stmtctx.StatementContext, ranges []*ranger.Range, physicalID int64) ([]kv.KeyRange, error) {
if e.index.ID == -1 {
return distsql.CommonHandleRangesToKVRanges(sc, []int64{physicalID}, e.ranges)
return distsql.CommonHandleRangesToKVRanges(sc, []int64{physicalID}, ranges)
}
return distsql.IndexRangesToKVRanges(sc, physicalID, e.index.ID, e.ranges, e.feedback)
return distsql.IndexRangesToKVRanges(sc, physicalID, e.index.ID, ranges, e.feedback)
}

func (e *IndexReaderExecutor) buildPartitionTableKeyRanges(sc *stmtctx.StatementContext, physicalIDs []int64) ([]kv.KeyRange, error) {
Expand All @@ -235,14 +236,19 @@ func (e *IndexReaderExecutor) Open(ctx context.Context) error {
sc := e.ctx.GetSessionVars().StmtCtx
var kvRanges []kv.KeyRange
if len(e.partitions) > 0 {
physicalIDs := make([]int64, 0, len(e.partitions))
for _, p := range e.partitions {
pid := p.GetPhysicalID()
physicalIDs = append(physicalIDs, pid)
partRange := e.ranges
if pRange, ok := e.partRangeMap[p.GetPhysicalID()]; ok {
partRange = pRange
}
kvRange, err := e.buildKeyRanges(sc, partRange, p.GetPhysicalID())
if err != nil {
return err
}
kvRanges = append(kvRanges, kvRange...)
}
kvRanges, err = e.buildPartitionTableKeyRanges(sc, physicalIDs)
} else {
kvRanges, err = e.buildKeyRanges(sc, e.physicalTableID)
kvRanges, err = e.buildKeyRanges(sc, e.ranges, e.physicalTableID)
}
if err != nil {
return err
Expand Down
35 changes: 35 additions & 0 deletions executor/index_lookup_join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ package executor_test
import (
"context"
"fmt"
"math/rand"
"strings"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/util/israce"
"github.com/pingcap/tidb/util/testkit"
)

Expand Down Expand Up @@ -331,3 +334,35 @@ func (s *testSuite5) TestIssue23722(c *C) {
"( select col_19 from t where t.col_18 <> 'David' and t.col_19 >= 'jDzNn' ) " +
"order by col_15 , col_16 , col_17 , col_18 , col_19;").Check(testkit.Rows("38799.400 20301 KETeFZhkoxnwMAhA Charlie zyhXEppZdqyqNV"))
}

func (s *testSuite5) TestPartitionTableIndexJoinAndIndexReader(c *C) {
if israce.RaceEnabled {
c.Skip("exhaustive types test, skip race test")
}
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("set @@tidb_partition_prune_mode='dynamic'")
tk.MustExec(`create table t (a int, b int, key(a)) partition by hash(a) partitions 4`)
tk.MustExec("create table tnormal (a int, b int, key(a), key(b))")
nRows := 64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I try to increase nRows to larger data, for example nRows=640.. and test again. It wil panic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

index_lookup_join_test.go:366:
    tk.MustQuery("select /*+ TIDB_INLJ(t1, t2) */ t1.a from t t1, t t2 where t1.a=t2.b and " + cond).Sort().Check(result)
/home/wshwsh12/project/tidb/util/testkit/testkit.go:346:
    tk.c.Check(errors.ErrorStack(err), check.Equals, "", comment)
... obtained string = "" +
...     "runtime error: index out of range [-1]\n" +
...     "github.com/pingcap/tidb/executor.(*innerWorker).run.func1\n" +
...     "\t/home/wshwsh12/project/tidb/executor/index_lookup_join.go:454\n" +
...     "runtime.gopanic\n" +
...     "\t/usr/lib/go/src/runtime/panic.go:965\n" +
...     "runtime.goPanicIndex\n" +
...     "\t/usr/lib/go/src/runtime/panic.go:88\n" +
...     "github.com/pingcap/parser.(*Scanner).stmtText\n" +
...     "\t/home/wshwsh12/go/pkg/mod/github.com/pingcap/[email protected]/lexer.go:96\n" +
...     "github.com/pingcap/parser.yyParse\n" +
...     "\t/home/wshwsh12/go/pkg/mod/github.com/pingcap/[email protected]/parser.go:17874\n" +
...     "github.com/pingcap/parser.(*Parser).Parse\n" +
...     "\t/home/wshwsh12/go/pkg/mod/github.com/pingcap/[email protected]/yy_parser.go:152\n" +
...     "github.com/pingcap/tidb/session.(*session).ParseSQL\n" +
...     "\t/home/wshwsh12/project/tidb/session/session.go:1091\n" +
...     "github.com/pingcap/tidb/expression.ParseSimpleExprsWithNames\n" +
...     "\t/home/wshwsh12/project/tidb/expression/simple_rewriter.go:88\n" +
...     "github.com/pingcap/tidb/planner/core.generateHashPartitionExpr\n" +
...     "\t/home/wshwsh12/project/tidb/planner/core/rule_partition_processor.go:110\n" +
...     "github.com/pingcap/tidb/planner/core.(*partitionProcessor).findUsedPartitions\n" +
...     "\t/home/wshwsh12/project/tidb/planner/core/rule_partition_processor.go:121\n" +
...     "github.com/pingcap/tidb/planner/core.(*partitionProcessor).pruneHashPartition\n" +
...     "\t/home/wshwsh12/project/tidb/planner/core/rule_partition_processor.go:207\n" +
...     "github.com/pingcap/tidb/planner/core.PartitionPruning\n" +
...     "\t/home/wshwsh12/project/tidb/planner/core/partition_prune.go:38\n" +
...     "github.com/pingcap/tidb/executor.partitionPruning\n" +
...     "\t/home/wshwsh12/project/tidb/executor/builder.go:3986\n" +
...     "github.com/pingcap/tidb/executor.prunePartitionForInnerExecutor\n" +
...     "\t/home/wshwsh12/project/tidb/executor/builder.go:2780\n" +
...     "github.com/pingcap/tidb/executor.(*dataReaderBuilder).buildIndexReaderForIndexJoin\n" +
...     "\t/home/wshwsh12/project/tidb/executor/builder.go:3478\n" +
...     "github.com/pingcap/tidb/executor.(*dataReaderBuilder).buildExecutorForIndexJoinInternal\n" +
...     "\t/home/wshwsh12/project/tidb/executor/builder.go:3211\n" +
...     "github.com/pingcap/tidb/executor.(*dataReaderBuilder).buildExecutorForIndexJoin\n" +
...     "\t/home/wshwsh12/project/tidb/executor/builder.go:3202\n" +
...     "github.com/pingcap/tidb/executor.(*innerWorker).fetchInnerResults\n" +
...     "\t/home/wshwsh12/project/tidb/executor/index_lookup_join.go:647\n" +
...     "github.com/pingcap/tidb/executor.(*innerWorker).handleTask\n" +
...     "\t/home/wshwsh12/project/tidb/executor/index_lookup_join.go:491\n" +
...     "github.com/pingcap/tidb/executor.(*innerWorker).run\n" +
...     "\t/home/wshwsh12/project/tidb/executor/index_lookup_join.go:469\n" +
...     "runtime.goexit\n" +
...     "\t/usr/lib/go/src/runtime/asm_amd64.s:1371"
... expected string = ""
... sql:select /*+ TIDB_INLJ(t1, t2) */ t1.a from t t1, t t2 where t1.a=t2.b and (t2.b between 304 and 559), args:[]

Copy link
Contributor Author

@qw4990 qw4990 Apr 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is a known problem, and actually it is irrelevant to this PR.
When it is in static mode (set @@tidb_partition_prune_mode='dynamic'), this problem still exists.
I have already created an issue for it #24259, and we decide to solve it later in our testing and bugfix phase.
PTAL and thanks for your review @wshwsh12

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems a concurrency bug.
Partition pruning use a parser shared in the session.

values := make([]string, 0, nRows)
for i := 0; i < nRows; i++ {
values = append(values, fmt.Sprintf("(%v, %v)", rand.Intn(nRows), rand.Intn(nRows)))
}
tk.MustExec(fmt.Sprintf("insert into t values %v", strings.Join(values, ", ")))
tk.MustExec(fmt.Sprintf("insert into tnormal values %v", strings.Join(values, ", ")))

randRange := func() (int, int) {
a, b := rand.Intn(nRows), rand.Intn(nRows)
if a > b {
return b, a
}
return a, b
}
for i := 0; i < nRows; i++ {
lb, rb := randRange()
cond := fmt.Sprintf("(t2.b between %v and %v)", lb, rb)
result := tk.MustQuery("select t1.a from tnormal t1, tnormal t2 where t1.a=t2.b and " + cond).Sort().Rows()
tk.MustQuery("select /*+ TIDB_INLJ(t1, t2) */ t1.a from t t1, t t2 where t1.a=t2.b and " + cond).Sort().Check(result)
}
}
19 changes: 0 additions & 19 deletions executor/partition_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,25 +86,6 @@ func (n nextPartitionForIndexLookUp) nextPartition(ctx context.Context, tbl tabl
return n.exec, nil
}

type nextPartitionForIndexReader struct {
*innerPartitionInfo
exec *IndexReaderExecutor
}

func (n nextPartitionForIndexReader) GetInnerPartitionInfo() *innerPartitionInfo {
return n.innerPartitionInfo
}

func (n nextPartitionForIndexReader) nextPartition(ctx context.Context, tbl table.PhysicalTable) (Executor, error) {
exec := n.exec
exec.table = tbl
exec.physicalTableID = tbl.GetPhysicalID()
if n.innerPartitionInfo != nil && !n.isFullPartition {
exec.ranges = n.nextRange[tbl.GetPhysicalID()]
}
return exec, nil
}

type nextPartitionForIndexMerge struct {
exec *IndexMergeReaderExecutor
}
Expand Down