Skip to content

Commit

Permalink
use BaseExecutorV2 to refactor IndexReaderExecutor and IndexLookUpExe…
Browse files Browse the repository at this point in the history
…cutor

Signed-off-by: Yang Keao <[email protected]>
  • Loading branch information
YangKeao committed Jul 19, 2024
1 parent 98fa22f commit fc3fa3f
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 118 deletions.
118 changes: 60 additions & 58 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3634,7 +3634,7 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) e
return ret
}

func buildIndexRangeForEachPartition(ctx sessionctx.Context, usedPartitions []table.PhysicalTable, contentPos []int64,
func buildIndexRangeForEachPartition(rctx *rangerctx.RangerContext, usedPartitions []table.PhysicalTable, contentPos []int64,
lookUpContent []*join.IndexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (map[int64][]*ranger.Range, error) {
contentBucket := make(map[int64][]*join.IndexJoinLookUpContent)
for _, p := range usedPartitions {
Expand All @@ -3647,7 +3647,7 @@ func buildIndexRangeForEachPartition(ctx sessionctx.Context, usedPartitions []ta
}
nextRange := make(map[int64][]*ranger.Range)
for _, p := range usedPartitions {
ranges, err := buildRangesForIndexJoin(ctx, contentBucket[p.GetPhysicalID()], indexRanges, keyOff2IdxOff, cwc)
ranges, err := buildRangesForIndexJoin(rctx, contentBucket[p.GetPhysicalID()], indexRanges, keyOff2IdxOff, cwc)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -3775,28 +3775,29 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea
paging := b.ctx.GetSessionVars().EnablePaging

e := &IndexReaderExecutor{
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()),
indexUsageReporter: b.buildIndexUsageReporter(v),
dagPB: dagReq,
startTS: startTS,
txnScope: b.txnScope,
readReplicaScope: b.readReplicaScope,
isStaleness: b.isStaleness,
netDataSize: v.GetNetDataSize(),
physicalTableID: physicalTableID,
table: tbl,
index: is.Index,
keepOrder: is.KeepOrder,
desc: is.Desc,
columns: is.Columns,
byItems: is.ByItems,
paging: paging,
corColInFilter: b.corColInDistPlan(v.IndexPlans),
corColInAccess: b.corColInAccess(v.IndexPlans[0]),
idxCols: is.IdxCols,
colLens: is.IdxColLens,
plans: v.IndexPlans,
outputColumns: v.OutputColumns,
indexReaderExecutorContext: newIndexReaderExecutorContext(b.ctx),
BaseExecutorV2: exec.NewBaseExecutorV2(b.ctx.GetSessionVars(), v.Schema(), v.ID()),
indexUsageReporter: b.buildIndexUsageReporter(v),
dagPB: dagReq,
startTS: startTS,
txnScope: b.txnScope,
readReplicaScope: b.readReplicaScope,
isStaleness: b.isStaleness,
netDataSize: v.GetNetDataSize(),
physicalTableID: physicalTableID,
table: tbl,
index: is.Index,
keepOrder: is.KeepOrder,
desc: is.Desc,
columns: is.Columns,
byItems: is.ByItems,
paging: paging,
corColInFilter: b.corColInDistPlan(v.IndexPlans),
corColInAccess: b.corColInAccess(v.IndexPlans[0]),
idxCols: is.IdxCols,
colLens: is.IdxColLens,
plans: v.IndexPlans,
outputColumns: v.OutputColumns,
}

for _, col := range v.OutputColumns {
Expand Down Expand Up @@ -3953,29 +3954,30 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
}

e := &IndexLookUpExecutor{
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()),
indexUsageReporter: b.buildIndexUsageReporter(v),
dagPB: indexReq,
startTS: startTS,
table: tbl,
index: is.Index,
keepOrder: is.KeepOrder,
byItems: is.ByItems,
desc: is.Desc,
tableRequest: tableReq,
columns: ts.Columns,
indexPaging: indexPaging,
dataReaderBuilder: readerBuilder,
corColInIdxSide: b.corColInDistPlan(v.IndexPlans),
corColInTblSide: b.corColInDistPlan(v.TablePlans),
corColInAccess: b.corColInAccess(v.IndexPlans[0]),
idxCols: is.IdxCols,
colLens: is.IdxColLens,
idxPlans: v.IndexPlans,
tblPlans: v.TablePlans,
PushedLimit: v.PushedLimit,
idxNetDataSize: v.GetAvgTableRowSize(),
avgRowSize: v.GetAvgTableRowSize(),
indexLookUpExecutorContext: newIndexLookUpExecutorContext(b.ctx),
BaseExecutorV2: exec.NewBaseExecutorV2(b.ctx.GetSessionVars(), v.Schema(), v.ID()),
indexUsageReporter: b.buildIndexUsageReporter(v),
dagPB: indexReq,
startTS: startTS,
table: tbl,
index: is.Index,
keepOrder: is.KeepOrder,
byItems: is.ByItems,
desc: is.Desc,
tableRequest: tableReq,
columns: ts.Columns,
indexPaging: indexPaging,
dataReaderBuilder: readerBuilder,
corColInIdxSide: b.corColInDistPlan(v.IndexPlans),
corColInTblSide: b.corColInDistPlan(v.TablePlans),
corColInAccess: b.corColInAccess(v.IndexPlans[0]),
idxCols: is.IdxCols,
colLens: is.IdxColLens,
idxPlans: v.IndexPlans,
tblPlans: v.TablePlans,
PushedLimit: v.PushedLimit,
idxNetDataSize: v.GetAvgTableRowSize(),
avgRowSize: v.GetAvgTableRowSize(),
}

if v.ExtraHandleCol != nil {
Expand Down Expand Up @@ -4589,7 +4591,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte
}
tbInfo := e.table.Meta()
if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().StmtCtx.UseDynamicPartitionPrune() {
kvRanges, err := buildKvRangesForIndexJoin(e.Ctx().GetDistSQLCtx(), e.Ctx().GetRangerCtx(), e.physicalTableID, e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memoryTracker, interruptSignal)
kvRanges, err := buildKvRangesForIndexJoin(e.dctx, e.rctx, e.physicalTableID, e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memoryTracker, interruptSignal)
if err != nil {
return nil, err
}
Expand All @@ -4603,7 +4605,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte
if err != nil {
return nil, err
}
if e.ranges, err = buildRangesForIndexJoin(e.Ctx(), lookUpContents, indexRanges, keyOff2IdxOff, cwc); err != nil {
if e.ranges, err = buildRangesForIndexJoin(e.rctx, lookUpContents, indexRanges, keyOff2IdxOff, cwc); err != nil {
return nil, err
}
if err := exec.Open(ctx, e); err != nil {
Expand All @@ -4619,7 +4621,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte
}
if len(usedPartition) != 0 {
if canPrune {
rangeMap, err := buildIndexRangeForEachPartition(e.Ctx(), usedPartition, contentPos, lookUpContents, indexRanges, keyOff2IdxOff, cwc)
rangeMap, err := buildIndexRangeForEachPartition(e.rctx, usedPartition, contentPos, lookUpContents, indexRanges, keyOff2IdxOff, cwc)
if err != nil {
return nil, err
}
Expand All @@ -4628,7 +4630,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte
e.partRangeMap = rangeMap
} else {
e.partitions = usedPartition
if e.ranges, err = buildRangesForIndexJoin(e.Ctx(), lookUpContents, indexRanges, keyOff2IdxOff, cwc); err != nil {
if e.ranges, err = buildRangesForIndexJoin(e.rctx, lookUpContents, indexRanges, keyOff2IdxOff, cwc); err != nil {
return nil, err
}
}
Expand All @@ -4651,7 +4653,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context

tbInfo := e.table.Meta()
if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().StmtCtx.UseDynamicPartitionPrune() {
e.kvRanges, err = buildKvRangesForIndexJoin(e.Ctx().GetDistSQLCtx(), e.Ctx().GetRangerCtx(), getPhysicalTableID(e.table), e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal)
e.kvRanges, err = buildKvRangesForIndexJoin(e.dctx, e.rctx, getPhysicalTableID(e.table), e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal)
if err != nil {
return nil, err
}
Expand All @@ -4665,7 +4667,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context
if err != nil {
return nil, err
}
e.ranges, err = buildRangesForIndexJoin(e.Ctx(), lookUpContents, indexRanges, keyOff2IdxOff, cwc)
e.ranges, err = buildRangesForIndexJoin(e.rctx, lookUpContents, indexRanges, keyOff2IdxOff, cwc)
if err != nil {
return nil, err
}
Expand All @@ -4682,7 +4684,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context
}
if len(usedPartition) != 0 {
if canPrune {
rangeMap, err := buildIndexRangeForEachPartition(e.Ctx(), usedPartition, contentPos, lookUpContents, indexRanges, keyOff2IdxOff, cwc)
rangeMap, err := buildIndexRangeForEachPartition(e.rctx, usedPartition, contentPos, lookUpContents, indexRanges, keyOff2IdxOff, cwc)
if err != nil {
return nil, err
}
Expand All @@ -4691,7 +4693,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context
e.partitionRangeMap = rangeMap
} else {
e.prunedPartitions = usedPartition
e.ranges, err = buildRangesForIndexJoin(e.Ctx(), lookUpContents, indexRanges, keyOff2IdxOff, cwc)
e.ranges, err = buildRangesForIndexJoin(e.rctx, lookUpContents, indexRanges, keyOff2IdxOff, cwc)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -4759,7 +4761,7 @@ func (builder *dataReaderBuilder) buildProjectionForIndexJoin(
}

// buildRangesForIndexJoin builds kv ranges for index join when the inner plan is index scan plan.
func buildRangesForIndexJoin(ctx sessionctx.Context, lookUpContents []*join.IndexJoinLookUpContent,
func buildRangesForIndexJoin(rctx *rangerctx.RangerContext, lookUpContents []*join.IndexJoinLookUpContent,
ranges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) ([]*ranger.Range, error) {
retRanges := make([]*ranger.Range, 0, len(ranges)*len(lookUpContents))
lastPos := len(ranges[0].LowVal) - 1
Expand All @@ -4778,7 +4780,7 @@ func buildRangesForIndexJoin(ctx sessionctx.Context, lookUpContents []*join.Inde
}
continue
}
nextColRanges, err := cwc.BuildRangesByRow(ctx.GetRangerCtx(), content.Row)
nextColRanges, err := cwc.BuildRangesByRow(rctx, content.Row)
if err != nil {
return nil, err
}
Expand All @@ -4798,7 +4800,7 @@ func buildRangesForIndexJoin(ctx sessionctx.Context, lookUpContents []*join.Inde
return retRanges, nil
}

return ranger.UnionRanges(ctx.GetRangerCtx(), tmpDatumRanges, true)
return ranger.UnionRanges(rctx, tmpDatumRanges, true)
}

// buildKvRangesForIndexJoin builds kv ranges for index join when the inner plan is index scan plan.
Expand Down
Loading

0 comments on commit fc3fa3f

Please sign in to comment.