Skip to content

Commit

Permalink
planner, executor: indexMerge support partition table with limit embe…
Browse files Browse the repository at this point in the history
…dded (#43173) (#43261)

ref #41028, close #43158, close #43160
  • Loading branch information
ti-chi-bot authored Apr 21, 2023
1 parent d3dce91 commit c65e6ac
Show file tree
Hide file tree
Showing 9 changed files with 334 additions and 150 deletions.
13 changes: 10 additions & 3 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,10 @@ func (h *chunkRowHeap) Pop() interface{} {

// NewSortedSelectResults is only for partition table
// When pids != nil, the pid will be set in the last column of each chunk.Rows.
func NewSortedSelectResults(selectResult []SelectResult, pids []int64, byitems []*util.ByItems, memTracker *memory.Tracker) SelectResult {
// When schema == nil, sort by first few columns.
func NewSortedSelectResults(selectResult []SelectResult, pids []int64, schema *expression.Schema, byitems []*util.ByItems, memTracker *memory.Tracker) SelectResult {
s := &sortedSelectResults{
schema: schema,
selectResult: selectResult,
byItems: byitems,
memTracker: memTracker,
Expand All @@ -124,6 +126,7 @@ func NewSortedSelectResults(selectResult []SelectResult, pids []int64, byitems [
}

type sortedSelectResults struct {
schema *expression.Schema
selectResult []SelectResult
compareFuncs []chunk.CompareFunc
byItems []*util.ByItems
Expand Down Expand Up @@ -160,9 +163,13 @@ func (ssr *sortedSelectResults) initCompareFuncs() {

func (ssr *sortedSelectResults) buildKeyColumns() {
ssr.keyColumns = make([]int, 0, len(ssr.byItems))
for _, by := range ssr.byItems {
for i, by := range ssr.byItems {
col := by.Expr.(*expression.Column)
ssr.keyColumns = append(ssr.keyColumns, col.Index)
if ssr.schema == nil {
ssr.keyColumns = append(ssr.keyColumns, i)
} else {
ssr.keyColumns = append(ssr.keyColumns, ssr.schema.ColumnIndex(col))
}
}
}

Expand Down
10 changes: 4 additions & 6 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
}
results = append(results, result)
}
e.result = distsql.NewSortedSelectResults(results, nil, e.byItems, e.memTracker)
e.result = distsql.NewSortedSelectResults(results, nil, e.Schema(), e.byItems, e.memTracker)
}
return nil
}
Expand Down Expand Up @@ -695,16 +695,14 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
break
}
results = append(results, result)
worker.batchSize = initBatchSize
if worker.batchSize > worker.maxBatchSize {
worker.batchSize = worker.maxBatchSize
}
if e.partitionTableMode {
pids = append(pids, e.prunedPartitions[partTblIdx].GetPhysicalID())
}
}
worker.batchSize = mathutil.Min(initBatchSize, worker.maxBatchSize)
if len(results) > 1 && len(e.byItems) != 0 {
ssr := distsql.NewSortedSelectResults(results, pids, e.byItems, e.memTracker)
// e.Schema() not the output schema for indexReader, and we put byItems related column at first in `buildIndexReq`, so use nil here.
ssr := distsql.NewSortedSelectResults(results, pids, nil, e.byItems, e.memTracker)
results = []distsql.SelectResult{ssr}
}
ctx1, cancel := context.WithCancel(ctx)
Expand Down
Loading

0 comments on commit c65e6ac

Please sign in to comment.