Skip to content

Commit

Permalink
cherry pick pingcap#29068 to release-5.3
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <[email protected]>
  • Loading branch information
XuHuaiyu authored and ti-srebot committed Nov 24, 2021
1 parent 79e237d commit 86db918
Show file tree
Hide file tree
Showing 9 changed files with 214 additions and 37 deletions.
40 changes: 36 additions & 4 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"math"
"sort"
"sync/atomic"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -551,13 +552,25 @@ func PartitionHandlesToKVRanges(handles []kv.Handle) []kv.KeyRange {

// IndexRangesToKVRanges converts index ranges to "KeyRange".
func IndexRangesToKVRanges(sc *stmtctx.StatementContext, tid, idxID int64, ranges []*ranger.Range, fb *statistics.QueryFeedback) ([]kv.KeyRange, error) {
return IndexRangesToKVRangesForTables(sc, []int64{tid}, idxID, ranges, fb)
return IndexRangesToKVRangesWithInterruptSignal(sc, tid, idxID, ranges, fb, nil, nil)
}

// IndexRangesToKVRangesWithInterruptSignal converts index ranges to "KeyRange".
// The process can be interrupted by set `interruptSignal` to true.
func IndexRangesToKVRangesWithInterruptSignal(sc *stmtctx.StatementContext, tid, idxID int64, ranges []*ranger.Range, fb *statistics.QueryFeedback, memTracker *memory.Tracker, interruptSignal *atomic.Value) ([]kv.KeyRange, error) {
return indexRangesToKVRangesForTablesWithInterruptSignal(sc, []int64{tid}, idxID, ranges, fb, memTracker, interruptSignal)
}

// IndexRangesToKVRangesForTables converts indexes ranges to "KeyRange".
func IndexRangesToKVRangesForTables(sc *stmtctx.StatementContext, tids []int64, idxID int64, ranges []*ranger.Range, fb *statistics.QueryFeedback) ([]kv.KeyRange, error) {
return indexRangesToKVRangesForTablesWithInterruptSignal(sc, tids, idxID, ranges, fb, nil, nil)
}

// IndexRangesToKVRangesForTablesWithInterruptSignal converts indexes ranges to "KeyRange".
// The process can be interrupted by set `interruptSignal` to true.
func indexRangesToKVRangesForTablesWithInterruptSignal(sc *stmtctx.StatementContext, tids []int64, idxID int64, ranges []*ranger.Range, fb *statistics.QueryFeedback, memTracker *memory.Tracker, interruptSignal *atomic.Value) ([]kv.KeyRange, error) {
if fb == nil || fb.Hist == nil {
return indexRangesToKVWithoutSplit(sc, tids, idxID, ranges)
return indexRangesToKVWithoutSplit(sc, tids, idxID, ranges, memTracker, interruptSignal)
}
feedbackRanges := make([]*ranger.Range, 0, len(ranges))
for _, ran := range ranges {
Expand Down Expand Up @@ -642,18 +655,37 @@ func VerifyTxnScope(txnScope string, physicalTableID int64, is infoschema.InfoSc
return true
}

func indexRangesToKVWithoutSplit(sc *stmtctx.StatementContext, tids []int64, idxID int64, ranges []*ranger.Range) ([]kv.KeyRange, error) {
func indexRangesToKVWithoutSplit(sc *stmtctx.StatementContext, tids []int64, idxID int64, ranges []*ranger.Range, memTracker *memory.Tracker, interruptSignal *atomic.Value) ([]kv.KeyRange, error) {
krs := make([]kv.KeyRange, 0, len(ranges))
for _, ran := range ranges {
const CheckSignalStep = 8
var estimatedMemUsage int64
// encodeIndexKey and EncodeIndexSeekKey is time-consuming, thus we need to
// check the interrupt signal periodically.
for i, ran := range ranges {
low, high, err := encodeIndexKey(sc, ran)
if err != nil {
return nil, err
}
if i == 0 {
estimatedMemUsage += int64(cap(low) + cap(high))
}
for _, tid := range tids {
startKey := tablecodec.EncodeIndexSeekKey(tid, idxID, low)
endKey := tablecodec.EncodeIndexSeekKey(tid, idxID, high)
if i == 0 {
estimatedMemUsage += int64(cap(startKey)) + int64(cap(endKey))
}
krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey})
}
if i%CheckSignalStep == 0 {
if i == 0 && memTracker != nil {
estimatedMemUsage *= int64(len(ranges))
memTracker.Consume(estimatedMemUsage)
}
if interruptSignal != nil && interruptSignal.Load().(bool) {
return nil, nil
}
}
}
return krs, nil
}
Expand Down
60 changes: 34 additions & 26 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"

Expand Down Expand Up @@ -57,6 +58,7 @@ import (
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tidb/util/timeutil"
Expand Down Expand Up @@ -2765,6 +2767,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin)
indexRanges: v.Ranges,
keyOff2IdxOff: v.KeyOff2IdxOff,
lastColHelper: v.CompareFilters,
finished: &atomic.Value{},
}
childrenUsedSchema := markChildrenUsedCols(v.Schema(), v.Children()[0].Schema(), v.Children()[1].Schema())
e.joiner = newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues, v.OtherConditions, leftTypes, rightTypes, childrenUsedSchema)
Expand Down Expand Up @@ -3594,33 +3597,33 @@ type mockPhysicalIndexReader struct {
}

func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context, lookUpContents []*indexJoinLookUpContent,
IndexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool) (Executor, error) {
return builder.buildExecutorForIndexJoinInternal(ctx, builder.Plan, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles)
IndexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool, memTracker *memory.Tracker, interruptSignal *atomic.Value) (Executor, error) {
return builder.buildExecutorForIndexJoinInternal(ctx, builder.Plan, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles, memTracker, interruptSignal)
}

func (builder *dataReaderBuilder) buildExecutorForIndexJoinInternal(ctx context.Context, plan plannercore.Plan, lookUpContents []*indexJoinLookUpContent,
IndexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool) (Executor, error) {
IndexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool, memTracker *memory.Tracker, interruptSignal *atomic.Value) (Executor, error) {
switch v := plan.(type) {
case *plannercore.PhysicalTableReader:
return builder.buildTableReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles)
return builder.buildTableReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles, memTracker, interruptSignal)
case *plannercore.PhysicalIndexReader:
return builder.buildIndexReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc)
return builder.buildIndexReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal)
case *plannercore.PhysicalIndexLookUpReader:
return builder.buildIndexLookUpReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc)
return builder.buildIndexLookUpReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal)
case *plannercore.PhysicalUnionScan:
return builder.buildUnionScanForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles)
return builder.buildUnionScanForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles, memTracker, interruptSignal)
// The inner child of IndexJoin might be Projection when a combination of the following conditions is true:
// 1. The inner child fetch data using indexLookupReader
// 2. PK is not handle
// 3. The inner child needs to keep order
// In this case, an extra column tidb_rowid will be appended in the output result of IndexLookupReader(see copTask.doubleReadNeedProj).
// Then we need a Projection upon IndexLookupReader to prune the redundant column.
case *plannercore.PhysicalProjection:
return builder.buildProjectionForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc)
return builder.buildProjectionForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal)
// Need to support physical selection because after PR 16389, TiDB will push down all the expr supported by TiKV or TiFlash
// in predicate push down stage, so if there is an expr which only supported by TiFlash, a physical selection will be added after index read
case *plannercore.PhysicalSelection:
childExec, err := builder.buildExecutorForIndexJoinInternal(ctx, v.Children()[0], lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles)
childExec, err := builder.buildExecutorForIndexJoinInternal(ctx, v.Children()[0], lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles, memTracker, interruptSignal)
if err != nil {
return nil, err
}
Expand All @@ -3638,9 +3641,9 @@ func (builder *dataReaderBuilder) buildExecutorForIndexJoinInternal(ctx context.

func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context, v *plannercore.PhysicalUnionScan,
values []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int,
cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool) (Executor, error) {
cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool, memTracker *memory.Tracker, interruptSignal *atomic.Value) (Executor, error) {
childBuilder := &dataReaderBuilder{Plan: v.Children()[0], executorBuilder: builder.executorBuilder}
reader, err := childBuilder.buildExecutorForIndexJoin(ctx, values, indexRanges, keyOff2IdxOff, cwc, canReorderHandles)
reader, err := childBuilder.buildExecutorForIndexJoin(ctx, values, indexRanges, keyOff2IdxOff, cwc, canReorderHandles, memTracker, interruptSignal)
if err != nil {
return nil, err
}
Expand All @@ -3654,15 +3657,15 @@ func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context

func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalTableReader,
lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int,
cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool) (Executor, error) {
cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool, memTracker *memory.Tracker, interruptSignal *atomic.Value) (Executor, error) {
e, err := buildNoRangeTableReader(builder.executorBuilder, v)
if err != nil {
return nil, err
}
tbInfo := e.table.Meta()
if v.IsCommonHandle {
if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().UseDynamicPartitionPrune() {
kvRanges, err := buildKvRangesForIndexJoin(e.ctx, getPhysicalTableID(e.table), -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc)
kvRanges, err := buildKvRangesForIndexJoin(e.ctx, getPhysicalTableID(e.table), -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -3691,7 +3694,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
return nil, err
}
pid := p.GetPhysicalID()
tmp, err := buildKvRangesForIndexJoin(e.ctx, pid, -1, []*indexJoinLookUpContent{content}, indexRanges, keyOff2IdxOff, cwc)
tmp, err := buildKvRangesForIndexJoin(e.ctx, pid, -1, []*indexJoinLookUpContent{content}, indexRanges, keyOff2IdxOff, cwc, nil, interruptSignal)
if err != nil {
return nil, err
}
Expand All @@ -3706,7 +3709,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
kvRanges = make([]kv.KeyRange, 0, len(partitions)*len(lookUpContents))
for _, p := range partitions {
pid := p.GetPhysicalID()
tmp, err := buildKvRangesForIndexJoin(e.ctx, pid, -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc)
tmp, err := buildKvRangesForIndexJoin(e.ctx, pid, -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -3872,14 +3875,14 @@ func (builder *dataReaderBuilder) buildTableReaderFromKvRanges(ctx context.Conte
}

func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalIndexReader,
lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (Executor, error) {
lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, memoryTracker *memory.Tracker, interruptSignal *atomic.Value) (Executor, error) {
e, err := buildNoRangeIndexReader(builder.executorBuilder, v)
if err != nil {
return nil, err
}
tbInfo := e.table.Meta()
if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().UseDynamicPartitionPrune() {
kvRanges, err := buildKvRangesForIndexJoin(e.ctx, e.physicalTableID, e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc)
kvRanges, err := buildKvRangesForIndexJoin(e.ctx, e.physicalTableID, e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memoryTracker, interruptSignal)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -3918,15 +3921,15 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte
}

func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalIndexLookUpReader,
lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (Executor, error) {
lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, memTracker *memory.Tracker, interruptSignal *atomic.Value) (Executor, error) {
e, err := buildNoRangeIndexLookUpReader(builder.executorBuilder, v)
if err != nil {
return nil, err
}

tbInfo := e.table.Meta()
if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().UseDynamicPartitionPrune() {
e.kvRanges, err = buildKvRangesForIndexJoin(e.ctx, getPhysicalTableID(e.table), e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc)
e.kvRanges, err = buildKvRangesForIndexJoin(e.ctx, getPhysicalTableID(e.table), e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -3966,18 +3969,18 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context
}

func (builder *dataReaderBuilder) buildProjectionForIndexJoin(ctx context.Context, v *plannercore.PhysicalProjection,
lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (Executor, error) {
lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, memTracker *memory.Tracker, interruptSignal *atomic.Value) (Executor, error) {
var (
childExec Executor
err error
)
switch op := v.Children()[0].(type) {
case *plannercore.PhysicalIndexLookUpReader:
if childExec, err = builder.buildIndexLookUpReaderForIndexJoin(ctx, op, lookUpContents, indexRanges, keyOff2IdxOff, cwc); err != nil {
if childExec, err = builder.buildIndexLookUpReaderForIndexJoin(ctx, op, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal); err != nil {
return nil, err
}
case *plannercore.PhysicalTableReader:
if childExec, err = builder.buildTableReaderForIndexJoin(ctx, op, lookUpContents, indexRanges, keyOff2IdxOff, cwc, true); err != nil {
if childExec, err = builder.buildTableReaderForIndexJoin(ctx, op, lookUpContents, indexRanges, keyOff2IdxOff, cwc, true, memTracker, interruptSignal); err != nil {
return nil, err
}
default:
Expand Down Expand Up @@ -4046,7 +4049,7 @@ func buildRangesForIndexJoin(ctx sessionctx.Context, lookUpContents []*indexJoin

// buildKvRangesForIndexJoin builds kv ranges for index join when the inner plan is index scan plan.
func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, lookUpContents []*indexJoinLookUpContent,
ranges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (_ []kv.KeyRange, err error) {
ranges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, memTracker *memory.Tracker, interruptSignal *atomic.Value) (_ []kv.KeyRange, err error) {
kvRanges := make([]kv.KeyRange, 0, len(ranges)*len(lookUpContents))
lastPos := len(ranges[0].LowVal) - 1
sc := ctx.GetSessionVars().StmtCtx
Expand All @@ -4065,7 +4068,7 @@ func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, l
if indexID == -1 {
tmpKvRanges, err = distsql.CommonHandleRangesToKVRanges(sc, []int64{tableID}, ranges)
} else {
tmpKvRanges, err = distsql.IndexRangesToKVRanges(sc, tableID, indexID, ranges, nil)
tmpKvRanges, err = distsql.IndexRangesToKVRangesWithInterruptSignal(sc, tableID, indexID, ranges, nil, memTracker, interruptSignal)
}
if err != nil {
return nil, err
Expand All @@ -4087,7 +4090,12 @@ func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, l
}
}
}

if len(kvRanges) != 0 && memTracker != nil {
memTracker.Consume(int64(2 * cap(kvRanges[0].StartKey) * len(kvRanges)))
}
if len(tmpDatumRanges) != 0 && memTracker != nil {
memTracker.Consume(2 * int64(len(tmpDatumRanges)) * types.EstimatedMemUsage(tmpDatumRanges[0].LowVal, len(tmpDatumRanges)))
}
if cwc == nil {
sort.Slice(kvRanges, func(i, j int) bool {
return bytes.Compare(kvRanges[i].StartKey, kvRanges[j].StartKey) < 0
Expand All @@ -4103,7 +4111,7 @@ func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, l
if indexID == -1 {
return distsql.CommonHandleRangesToKVRanges(ctx.GetSessionVars().StmtCtx, []int64{tableID}, tmpDatumRanges)
}
return distsql.IndexRangesToKVRanges(ctx.GetSessionVars().StmtCtx, tableID, indexID, tmpDatumRanges, nil)
return distsql.IndexRangesToKVRangesWithInterruptSignal(ctx.GetSessionVars().StmtCtx, tableID, indexID, tmpDatumRanges, nil, memTracker, interruptSignal)
}

func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) Executor {
Expand Down
2 changes: 1 addition & 1 deletion executor/executor_pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func SubTestBuildKvRangesForIndexJoinWithoutCwc(t *testing.T) {

keyOff2IdxOff := []int{1, 3}
ctx := mock.NewContext()
kvRanges, err := buildKvRangesForIndexJoin(ctx, 0, 0, joinKeyRows, indexRanges, keyOff2IdxOff, nil)
kvRanges, err := buildKvRangesForIndexJoin(ctx, 0, 0, joinKeyRows, indexRanges, keyOff2IdxOff, nil, nil, nil)
require.NoError(t, err)
// Check the kvRanges is in order.
for i, kvRange := range kvRanges {
Expand Down
Loading

0 comments on commit 86db918

Please sign in to comment.