Skip to content

Commit

Permalink
planner, executor: support index merge's order prop push down at the …
Browse files Browse the repository at this point in the history
…normal way (#43881)

ref #41028, close #43577, close #45387
  • Loading branch information
winoros authored Aug 1, 2023
1 parent a53fd2d commit 0e068ed
Show file tree
Hide file tree
Showing 20 changed files with 596 additions and 443 deletions.
63 changes: 32 additions & 31 deletions cmd/explaintest/r/index_merge.result
Original file line number Diff line number Diff line change
Expand Up @@ -390,15 +390,15 @@ Delete_11 N/A root N/A
└─Sort_15 4056.68 root test.t1.c1
└─SelectLock_17 4056.68 root for update 0
└─HashJoin_33 4056.68 root inner join, equal:[eq(test.t1.c1, test.t1.c1)]
├─HashAgg_36(Build) 3245.34 root group by:test.t1.c1, funcs:firstrow(test.t1.c1)->test.t1.c1
│ └─IndexMerge_45 2248.30 root type: union
│ ├─IndexRangeScan_41(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo
│ ├─IndexRangeScan_42(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo
│ └─Selection_44(Probe) 2248.30 cop[tikv] not(isnull(test.t1.c1)), or(lt(test.t1.c1, 10), and(lt(test.t1.c2, 10), lt(test.t1.c3, 10)))
│ └─TableRowIDScan_43 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo
└─TableReader_48(Probe) 9990.00 root data:Selection_47
└─Selection_47 9990.00 cop[tikv] not(isnull(test.t1.c1))
└─TableFullScan_46 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
├─HashAgg_35(Build) 3245.34 root group by:test.t1.c1, funcs:firstrow(test.t1.c1)->test.t1.c1
│ └─IndexMerge_41 2248.30 root type: union
│ ├─IndexRangeScan_37(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo
│ ├─IndexRangeScan_38(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo
│ └─Selection_40(Probe) 2248.30 cop[tikv] not(isnull(test.t1.c1)), or(lt(test.t1.c1, 10), and(lt(test.t1.c2, 10), lt(test.t1.c3, 10)))
│ └─TableRowIDScan_39 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo
└─TableReader_49(Probe) 9990.00 root data:Selection_48
└─Selection_48 9990.00 cop[tikv] not(isnull(test.t1.c1))
└─TableFullScan_47 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
delete from t1 where c1 in (select /*+ use_index_merge(t1) */ c1 from t1 where c1 < 10 or c2 < 10 and c3 < 10) order by 1;
select * from t1;
c1 c2 c3
Expand All @@ -408,15 +408,15 @@ id estRows task access object operator info
Update_10 N/A root N/A
└─SelectLock_14 4056.68 root for update 0
└─HashJoin_30 4056.68 root inner join, equal:[eq(test.t1.c1, test.t1.c1)]
├─HashAgg_33(Build) 3245.34 root group by:test.t1.c1, funcs:firstrow(test.t1.c1)->test.t1.c1
│ └─IndexMerge_42 2248.30 root type: union
│ ├─IndexRangeScan_38(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo
│ ├─IndexRangeScan_39(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo
│ └─Selection_41(Probe) 2248.30 cop[tikv] not(isnull(test.t1.c1)), or(lt(test.t1.c1, 10), and(lt(test.t1.c2, 10), lt(test.t1.c3, 10)))
│ └─TableRowIDScan_40 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo
└─TableReader_45(Probe) 9990.00 root data:Selection_44
└─Selection_44 9990.00 cop[tikv] not(isnull(test.t1.c1))
└─TableFullScan_43 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
├─HashAgg_32(Build) 3245.34 root group by:test.t1.c1, funcs:firstrow(test.t1.c1)->test.t1.c1
│ └─IndexMerge_38 2248.30 root type: union
│ ├─IndexRangeScan_34(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo
│ ├─IndexRangeScan_35(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo
│ └─Selection_37(Probe) 2248.30 cop[tikv] not(isnull(test.t1.c1)), or(lt(test.t1.c1, 10), and(lt(test.t1.c2, 10), lt(test.t1.c3, 10)))
│ └─TableRowIDScan_36 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo
└─TableReader_46(Probe) 9990.00 root data:Selection_45
└─Selection_45 9990.00 cop[tikv] not(isnull(test.t1.c1))
└─TableFullScan_44 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
update t1 set c1 = 100, c2 = 100, c3 = 100 where c1 in (select /*+ use_index_merge(t1) */ c1 from t1 where c1 < 10 or c2 < 10 and c3 < 10);
select * from t1;
c1 c2 c3
Expand Down Expand Up @@ -469,26 +469,27 @@ create table t1(c1 int, c2 int, c3 int, key(c1), key(c2));
insert into t1 values(1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4), (5, 5, 5);
explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10 order by 1 limit 1 offset 2;
id estRows task access object operator info
TopN_10 1.00 root test.t1.c1, offset:2, count:1
└─IndexMerge_23 1841.86 root type: union
├─IndexRangeScan_19(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo
├─IndexRangeScan_20(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo
└─Selection_22(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10)
└─TableRowIDScan_21 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo
TopN_9 1.00 root test.t1.c1, offset:2, count:1
└─IndexMerge_18 3.00 root type: union
├─IndexRangeScan_13(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo
├─IndexRangeScan_14(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo
└─TopN_17(Probe) 3.00 cop[tikv] test.t1.c1, offset:0, count:3
└─Selection_16 1841.86 cop[tikv] lt(test.t1.c3, 10)
└─TableRowIDScan_15 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo
select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10 order by 1 limit 1 offset 2;
c1 c2 c3
3 3 3
///// GROUP BY
explain select /*+ use_index_merge(t1) */ sum(c1) from t1 where (c1 < 10 or c2 < 10) and c3 < 10 group by c1 order by 1;
id estRows task access object operator info
Sort_6 1473.49 root Column#5
└─HashAgg_11 1473.49 root group by:Column#13, funcs:sum(Column#12)->Column#5
└─Projection_22 1841.86 root cast(test.t1.c1, decimal(10,0) BINARY)->Column#12, test.t1.c1->Column#13
└─IndexMerge_20 1841.86 root type: union
├─IndexRangeScan_16(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo
├─IndexRangeScan_17(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo
└─Selection_19(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10)
└─TableRowIDScan_18 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo
└─HashAgg_10 1473.49 root group by:Column#13, funcs:sum(Column#12)->Column#5
└─Projection_23 1841.86 root cast(test.t1.c1, decimal(10,0) BINARY)->Column#12, test.t1.c1->Column#13
└─IndexMerge_16 1841.86 root type: union
├─IndexRangeScan_12(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo
├─IndexRangeScan_13(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo
└─Selection_15(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10)
└─TableRowIDScan_14 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo
select /*+ use_index_merge(t1) */ sum(c1) from t1 where (c1 < 10 or c2 < 10) and c3 < 10 group by c1 order by 1;
sum(c1)
1
Expand Down
20 changes: 18 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1445,7 +1445,23 @@ func (b *executorBuilder) buildUnionScanFromReader(reader exec.Executor, v *plan
us.virtualColumnIndex = buildVirtualColumnIndex(us.Schema(), us.columns)
us.handleCachedTable(b, x, sessionVars, startTS)
case *IndexMergeReaderExecutor:
// IndexMergeReader doesn't care order for now. So we will not set desc, useIndex and keepOrder.
if len(x.byItems) != 0 {
us.keepOrder = x.keepOrder
us.desc = x.byItems[0].Desc
for _, item := range x.byItems {
c, ok := item.Expr.(*expression.Column)
if !ok {
b.err = errors.Errorf("Not support non-column in orderBy pushed down")
return nil
}
for i, col := range x.columns {
if col.ID == c.ID {
us.usedIndex = append(us.usedIndex, i)
break
}
}
}
}
us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
us.columns = x.columns
us.table = x.table
Expand Down Expand Up @@ -4201,7 +4217,7 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd
dataReaderBuilder: readerBuilder,
feedbacks: feedbacks,
paging: paging,
handleCols: ts.HandleCols,
handleCols: v.HandleCols,
isCorColInPartialFilters: isCorColInPartialFilters,
isCorColInTableFilter: isCorColInTableFilter,
isCorColInPartialAccess: isCorColInPartialAccess,
Expand Down
74 changes: 58 additions & 16 deletions executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ type indexMergeTableTask struct {
// parTblIdx are only used in indexMergeProcessWorker.fetchLoopIntersection.
parTblIdx int

// partialPlanID are only used for indexMergeProcessWorker.fetchLoopUnionWithOrderByAndPushedLimit.
// partialPlanID are only used for indexMergeProcessWorker.fetchLoopUnionWithOrderBy.
partialPlanID int
}

Expand Down Expand Up @@ -297,9 +297,12 @@ func (e *IndexMergeReaderExecutor) startIndexMergeProcessWorker(ctx context.Cont
util.WithRecovery(
func() {
if e.isIntersection {
if e.pushedLimit != nil || e.keepOrder {
panic("Not support intersection with pushedLimit or keepOrder = true")
}
idxMergeProcessWorker.fetchLoopIntersection(ctx, fetch, workCh, e.resultCh, e.finished)
} else if e.pushedLimit != nil && len(e.byItems) != 0 {
idxMergeProcessWorker.fetchLoopUnionWithOrderByAndPushedLimit(ctx, fetch, workCh, e.resultCh, e.finished)
} else if len(e.byItems) != 0 {
idxMergeProcessWorker.fetchLoopUnionWithOrderBy(ctx, fetch, workCh, e.resultCh, e.finished)
} else {
idxMergeProcessWorker.fetchLoopUnion(ctx, fetch, workCh, e.resultCh, e.finished)
}
Expand Down Expand Up @@ -341,6 +344,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
util.WithRecovery(
func() {
failpoint.Inject("testIndexMergePanicPartialIndexWorker", nil)
is := e.partialPlans[workID][0].(*plannercore.PhysicalIndexScan)
worker := &partialIndexWorker{
stats: e.stats,
idxID: e.getPartitalPlanID(workID),
Expand All @@ -353,10 +357,9 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
memTracker: e.memTracker,
partitionTableMode: e.partitionTableMode,
prunedPartitions: e.prunedPartitions,
byItems: e.byItems,
byItems: is.ByItems,
pushedLimit: e.pushedLimit,
}

if e.isCorColInPartialFilters[workID] {
// We got correlated column, so need to refresh Selection operator.
var err error
Expand Down Expand Up @@ -471,7 +474,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
ranges: e.ranges[workID],
netDataSize: e.partialNetDataSizes[workID],
keepOrder: ts.KeepOrder,
byItems: e.byItems,
byItems: ts.ByItems,
}

worker := &partialTableWorker{
Expand All @@ -484,7 +487,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
memTracker: e.memTracker,
partitionTableMode: e.partitionTableMode,
prunedPartitions: e.prunedPartitions,
byItems: e.byItems,
byItems: ts.ByItems,
pushedLimit: e.pushedLimit,
}

Expand Down Expand Up @@ -924,7 +927,9 @@ type rowIdx struct {
}

type handleHeap struct {
// requiredCnt == 0 means need all handles
requiredCnt uint64
tracker *memory.Tracker
taskMap map[int][]*indexMergeTableTask

idx []rowIdx
Expand Down Expand Up @@ -961,23 +966,34 @@ func (h handleHeap) Swap(i, j int) {
func (h *handleHeap) Push(x interface{}) {
idx := x.(rowIdx)
h.idx = append(h.idx, idx)
if h.tracker != nil {
h.tracker.Consume(int64(unsafe.Sizeof(h.idx)))
}
}

func (h *handleHeap) Pop() interface{} {
idxRet := h.idx[len(h.idx)-1]
h.idx = h.idx[:len(h.idx)-1]
if h.tracker != nil {
h.tracker.Consume(-int64(unsafe.Sizeof(h.idx)))
}
return idxRet
}

func (w *indexMergeProcessWorker) NewHandleHeap(taskMap map[int][]*indexMergeTableTask) *handleHeap {
func (w *indexMergeProcessWorker) NewHandleHeap(taskMap map[int][]*indexMergeTableTask, memTracker *memory.Tracker) *handleHeap {
compareFuncs := make([]chunk.CompareFunc, 0, len(w.indexMerge.byItems))
for _, item := range w.indexMerge.byItems {
keyType := item.Expr.GetType()
compareFuncs = append(compareFuncs, chunk.GetCompareFunc(keyType))
}
requiredCnt := w.indexMerge.pushedLimit.Count + w.indexMerge.pushedLimit.Offset

requiredCnt := uint64(0)
if w.indexMerge.pushedLimit != nil {
requiredCnt = mathutil.Max(requiredCnt, w.indexMerge.pushedLimit.Count+w.indexMerge.pushedLimit.Offset)
}
return &handleHeap{
requiredCnt: requiredCnt,
tracker: memTracker,
taskMap: taskMap,
idx: make([]rowIdx, 0, requiredCnt),
compareFunc: compareFuncs,
Expand All @@ -1004,7 +1020,7 @@ func (w *indexMergeProcessWorker) pruneTableWorkerTaskIdxRows(task *indexMergeTa
}
}

func (w *indexMergeProcessWorker) fetchLoopUnionWithOrderByAndPushedLimit(ctx context.Context, fetchCh <-chan *indexMergeTableTask,
func (w *indexMergeProcessWorker) fetchLoopUnionWithOrderBy(ctx context.Context, fetchCh <-chan *indexMergeTableTask,
workCh chan<- *indexMergeTableTask, resultCh chan<- *indexMergeTableTask, finished <-chan struct{}) {
memTracker := memory.NewTracker(w.indexMerge.ID(), -1)
memTracker.AttachTo(w.indexMerge.memTracker)
Expand All @@ -1021,8 +1037,7 @@ func (w *indexMergeProcessWorker) fetchLoopUnionWithOrderByAndPushedLimit(ctx co
distinctHandles := kv.NewHandleMap()
taskMap := make(map[int][]*indexMergeTableTask)
uselessMap := make(map[int]struct{})
taskHeap := w.NewHandleHeap(taskMap)
memTracker.Consume(int64(taskHeap.requiredCnt) * int64(unsafe.Sizeof(rowIdx{0, 0, 0})))
taskHeap := w.NewHandleHeap(taskMap, memTracker)

for task := range fetchCh {
select {
Expand All @@ -1038,15 +1053,15 @@ func (w *indexMergeProcessWorker) fetchLoopUnionWithOrderByAndPushedLimit(ctx co
continue
}
if _, ok := taskMap[task.partialPlanID]; !ok {
taskMap[task.partialPlanID] = make([]*indexMergeTableTask, 0)
taskMap[task.partialPlanID] = make([]*indexMergeTableTask, 0, 1)
}
w.pruneTableWorkerTaskIdxRows(task)
taskMap[task.partialPlanID] = append(taskMap[task.partialPlanID], task)
for i, h := range task.handles {
if _, ok := distinctHandles.Get(h); !ok {
distinctHandles.Set(h, true)
heap.Push(taskHeap, rowIdx{task.partialPlanID, len(taskMap[task.partialPlanID]) - 1, i})
if taskHeap.Len() > int(taskHeap.requiredCnt) {
if int(taskHeap.requiredCnt) != 0 && taskHeap.Len() > int(taskHeap.requiredCnt) {
top := heap.Pop(taskHeap).(rowIdx)
if top.partialID == task.partialPlanID && top.taskID == len(taskMap[task.partialPlanID])-1 && top.rowID == i {
uselessMap[task.partialPlanID] = struct{}{}
Expand All @@ -1067,7 +1082,10 @@ func (w *indexMergeProcessWorker) fetchLoopUnionWithOrderByAndPushedLimit(ctx co
}
}

needCount := mathutil.Max(0, taskHeap.Len()-int(w.indexMerge.pushedLimit.Offset))
needCount := taskHeap.Len()
if w.indexMerge.pushedLimit != nil {
needCount = mathutil.Max(0, taskHeap.Len()-int(w.indexMerge.pushedLimit.Offset))
}
if needCount == 0 {
return
}
Expand Down Expand Up @@ -1125,10 +1143,17 @@ func (w *indexMergeProcessWorker) fetchLoopUnion(ctx context.Context, fetchCh <-
defer close(workCh)
failpoint.Inject("testIndexMergePanicProcessWorkerUnion", nil)

var pushedLimit *plannercore.PushedDownLimit
if w.indexMerge.pushedLimit != nil {
pushedLimit = w.indexMerge.pushedLimit.Clone()
}
distinctHandles := make(map[int64]*kv.HandleMap)
for {
var ok bool
var task *indexMergeTableTask
if pushedLimit != nil && pushedLimit.Count == 0 {
return
}
select {
case <-ctx.Done():
return
Expand Down Expand Up @@ -1175,6 +1200,23 @@ func (w *indexMergeProcessWorker) fetchLoopUnion(ctx context.Context, fetchCh <-
if len(fhs) == 0 {
continue
}
if pushedLimit != nil {
fhsLen := uint64(len(fhs))
// The number of handles is less than the offset, discard all handles.
if fhsLen <= pushedLimit.Offset {
pushedLimit.Offset -= fhsLen
continue
}
fhs = fhs[pushedLimit.Offset:]
pushedLimit.Offset = 0

fhsLen = uint64(len(fhs))
// The number of handles is greater than the limit, only keep limit count.
if fhsLen > pushedLimit.Count {
fhs = fhs[:pushedLimit.Count]
}
pushedLimit.Count -= mathutil.Min(pushedLimit.Count, fhsLen)
}
task = &indexMergeTableTask{
lookupTableTask: lookupTableTask{
handles: fhs,
Expand Down Expand Up @@ -1748,7 +1790,7 @@ func (w *indexMergeTableScanWorker) executeTask(ctx context.Context, task *index
if err != nil {
return err
}
if physicalTableIDIdx != -1 {
if w.indexMergeExec.partitionTableMode && physicalTableIDIdx != -1 {
handle = kv.NewPartitionHandle(row.GetInt64(physicalTableIDIdx), handle)
}
rowIdx, _ := task.indexOrder.Get(handle)
Expand Down
25 changes: 24 additions & 1 deletion executor/mem_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,9 @@ type memIndexMergeReader struct {
partitionMode bool // if it is accessing a partition table
partitionTables []table.PhysicalTable // partition tables to access
partitionKVRanges [][][]kv.KeyRange // kv ranges for these partition tables

keepOrder bool
compareExec
}

func buildMemIndexMergeReader(ctx context.Context, us *UnionScanExec, indexMergeReader *IndexMergeReaderExecutor) *memIndexMergeReader {
Expand Down Expand Up @@ -831,6 +834,9 @@ func buildMemIndexMergeReader(ctx context.Context, us *UnionScanExec, indexMerge
partitionMode: indexMergeReader.partitionTableMode,
partitionTables: indexMergeReader.prunedPartitions,
partitionKVRanges: indexMergeReader.partitionKeyRanges,

keepOrder: us.keepOrder,
compareExec: us.compareExec,
}
}

Expand Down Expand Up @@ -921,7 +927,24 @@ func (m *memIndexMergeReader) getMemRows(ctx context.Context) ([][]types.Datum,
},
}

return memTblReader.getMemRows(ctx)
rows, err := memTblReader.getMemRows(ctx)
if err != nil {
return nil, err
}

// Didn't set keepOrder = true for memTblReader,
// In indexMerge, non-partitioned tables are also need reordered.
if m.keepOrder {
slices.SortFunc(rows, func(a, b []types.Datum) bool {
ret, err1 := m.compare(m.ctx.GetSessionVars().StmtCtx, a, b)
if err1 != nil {
err = err1
}
return ret
})
}

return rows, err
}

// Union all handles of all partial paths.
Expand Down
2 changes: 1 addition & 1 deletion executor/test/indexmergereadtest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 30,
shard_count = 33,
deps = [
"//config",
"//meta/autoid",
Expand Down
Loading

0 comments on commit 0e068ed

Please sign in to comment.