Skip to content

Commit

Permalink
use BaseExecutorV2 to refactor IndexReaderExecutor and IndexLookUpExe…
Browse files Browse the repository at this point in the history
…cutor

Signed-off-by: Yang Keao <[email protected]>
  • Loading branch information
YangKeao committed Jul 19, 2024
1 parent 9a2686e commit 6b3c4bb
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 113 deletions.
118 changes: 60 additions & 58 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3635,7 +3635,7 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) e
return ret
}

func buildIndexRangeForEachPartition(ctx sessionctx.Context, usedPartitions []table.PhysicalTable, contentPos []int64,
func buildIndexRangeForEachPartition(rctx *rangerctx.RangerContext, usedPartitions []table.PhysicalTable, contentPos []int64,
lookUpContent []*join.IndexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (map[int64][]*ranger.Range, error) {
contentBucket := make(map[int64][]*join.IndexJoinLookUpContent)
for _, p := range usedPartitions {
Expand All @@ -3648,7 +3648,7 @@ func buildIndexRangeForEachPartition(ctx sessionctx.Context, usedPartitions []ta
}
nextRange := make(map[int64][]*ranger.Range)
for _, p := range usedPartitions {
ranges, err := buildRangesForIndexJoin(ctx, contentBucket[p.GetPhysicalID()], indexRanges, keyOff2IdxOff, cwc)
ranges, err := buildRangesForIndexJoin(rctx, contentBucket[p.GetPhysicalID()], indexRanges, keyOff2IdxOff, cwc)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -3776,28 +3776,29 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea
paging := b.ctx.GetSessionVars().EnablePaging

e := &IndexReaderExecutor{
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()),
indexUsageReporter: b.buildIndexUsageReporter(v),
dagPB: dagReq,
startTS: startTS,
txnScope: b.txnScope,
readReplicaScope: b.readReplicaScope,
isStaleness: b.isStaleness,
netDataSize: v.GetNetDataSize(),
physicalTableID: physicalTableID,
table: tbl,
index: is.Index,
keepOrder: is.KeepOrder,
desc: is.Desc,
columns: is.Columns,
byItems: is.ByItems,
paging: paging,
corColInFilter: b.corColInDistPlan(v.IndexPlans),
corColInAccess: b.corColInAccess(v.IndexPlans[0]),
idxCols: is.IdxCols,
colLens: is.IdxColLens,
plans: v.IndexPlans,
outputColumns: v.OutputColumns,
indexReaderExecutorContext: newIndexReaderExecutorContext(b.ctx),
BaseExecutorV2: exec.NewBaseExecutorV2(b.ctx.GetSessionVars(), v.Schema(), v.ID()),
indexUsageReporter: b.buildIndexUsageReporter(v),
dagPB: dagReq,
startTS: startTS,
txnScope: b.txnScope,
readReplicaScope: b.readReplicaScope,
isStaleness: b.isStaleness,
netDataSize: v.GetNetDataSize(),
physicalTableID: physicalTableID,
table: tbl,
index: is.Index,
keepOrder: is.KeepOrder,
desc: is.Desc,
columns: is.Columns,
byItems: is.ByItems,
paging: paging,
corColInFilter: b.corColInDistPlan(v.IndexPlans),
corColInAccess: b.corColInAccess(v.IndexPlans[0]),
idxCols: is.IdxCols,
colLens: is.IdxColLens,
plans: v.IndexPlans,
outputColumns: v.OutputColumns,
}

for _, col := range v.OutputColumns {
Expand Down Expand Up @@ -3954,29 +3955,30 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
}

e := &IndexLookUpExecutor{
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()),
indexUsageReporter: b.buildIndexUsageReporter(v),
dagPB: indexReq,
startTS: startTS,
table: tbl,
index: is.Index,
keepOrder: is.KeepOrder,
byItems: is.ByItems,
desc: is.Desc,
tableRequest: tableReq,
columns: ts.Columns,
indexPaging: indexPaging,
dataReaderBuilder: readerBuilder,
corColInIdxSide: b.corColInDistPlan(v.IndexPlans),
corColInTblSide: b.corColInDistPlan(v.TablePlans),
corColInAccess: b.corColInAccess(v.IndexPlans[0]),
idxCols: is.IdxCols,
colLens: is.IdxColLens,
idxPlans: v.IndexPlans,
tblPlans: v.TablePlans,
PushedLimit: v.PushedLimit,
idxNetDataSize: v.GetAvgTableRowSize(),
avgRowSize: v.GetAvgTableRowSize(),
indexLookUpExecutorContext: newIndexLookUpExecutorContext(b.ctx),
BaseExecutorV2: exec.NewBaseExecutorV2(b.ctx.GetSessionVars(), v.Schema(), v.ID()),
indexUsageReporter: b.buildIndexUsageReporter(v),
dagPB: indexReq,
startTS: startTS,
table: tbl,
index: is.Index,
keepOrder: is.KeepOrder,
byItems: is.ByItems,
desc: is.Desc,
tableRequest: tableReq,
columns: ts.Columns,
indexPaging: indexPaging,
dataReaderBuilder: readerBuilder,
corColInIdxSide: b.corColInDistPlan(v.IndexPlans),
corColInTblSide: b.corColInDistPlan(v.TablePlans),
corColInAccess: b.corColInAccess(v.IndexPlans[0]),
idxCols: is.IdxCols,
colLens: is.IdxColLens,
idxPlans: v.IndexPlans,
tblPlans: v.TablePlans,
PushedLimit: v.PushedLimit,
idxNetDataSize: v.GetAvgTableRowSize(),
avgRowSize: v.GetAvgTableRowSize(),
}

if v.ExtraHandleCol != nil {
Expand Down Expand Up @@ -4591,7 +4593,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte
}
tbInfo := e.table.Meta()
if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().StmtCtx.UseDynamicPartitionPrune() {
kvRanges, err := buildKvRangesForIndexJoin(e.Ctx().GetDistSQLCtx(), e.Ctx().GetRangerCtx(), e.physicalTableID, e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memoryTracker, interruptSignal)
kvRanges, err := buildKvRangesForIndexJoin(e.dctx, e.rctx, e.physicalTableID, e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memoryTracker, interruptSignal)
if err != nil {
return nil, err
}
Expand All @@ -4605,7 +4607,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte
if err != nil {
return nil, err
}
if e.ranges, err = buildRangesForIndexJoin(e.Ctx(), lookUpContents, indexRanges, keyOff2IdxOff, cwc); err != nil {
if e.ranges, err = buildRangesForIndexJoin(e.rctx, lookUpContents, indexRanges, keyOff2IdxOff, cwc); err != nil {
return nil, err
}
if err := exec.Open(ctx, e); err != nil {
Expand All @@ -4621,7 +4623,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte
}
if len(usedPartition) != 0 {
if canPrune {
rangeMap, err := buildIndexRangeForEachPartition(e.Ctx(), usedPartition, contentPos, lookUpContents, indexRanges, keyOff2IdxOff, cwc)
rangeMap, err := buildIndexRangeForEachPartition(e.rctx, usedPartition, contentPos, lookUpContents, indexRanges, keyOff2IdxOff, cwc)
if err != nil {
return nil, err
}
Expand All @@ -4630,7 +4632,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte
e.partRangeMap = rangeMap
} else {
e.partitions = usedPartition
if e.ranges, err = buildRangesForIndexJoin(e.Ctx(), lookUpContents, indexRanges, keyOff2IdxOff, cwc); err != nil {
if e.ranges, err = buildRangesForIndexJoin(e.rctx, lookUpContents, indexRanges, keyOff2IdxOff, cwc); err != nil {
return nil, err
}
}
Expand All @@ -4653,7 +4655,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context

tbInfo := e.table.Meta()
if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().StmtCtx.UseDynamicPartitionPrune() {
e.kvRanges, err = buildKvRangesForIndexJoin(e.Ctx().GetDistSQLCtx(), e.Ctx().GetRangerCtx(), getPhysicalTableID(e.table), e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal)
e.kvRanges, err = buildKvRangesForIndexJoin(e.dctx, e.rctx, getPhysicalTableID(e.table), e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal)
if err != nil {
return nil, err
}
Expand All @@ -4667,7 +4669,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context
if err != nil {
return nil, err
}
e.ranges, err = buildRangesForIndexJoin(e.Ctx(), lookUpContents, indexRanges, keyOff2IdxOff, cwc)
e.ranges, err = buildRangesForIndexJoin(e.rctx, lookUpContents, indexRanges, keyOff2IdxOff, cwc)
if err != nil {
return nil, err
}
Expand All @@ -4684,7 +4686,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context
}
if len(usedPartition) != 0 {
if canPrune {
rangeMap, err := buildIndexRangeForEachPartition(e.Ctx(), usedPartition, contentPos, lookUpContents, indexRanges, keyOff2IdxOff, cwc)
rangeMap, err := buildIndexRangeForEachPartition(e.rctx, usedPartition, contentPos, lookUpContents, indexRanges, keyOff2IdxOff, cwc)
if err != nil {
return nil, err
}
Expand All @@ -4693,7 +4695,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context
e.partitionRangeMap = rangeMap
} else {
e.prunedPartitions = usedPartition
e.ranges, err = buildRangesForIndexJoin(e.Ctx(), lookUpContents, indexRanges, keyOff2IdxOff, cwc)
e.ranges, err = buildRangesForIndexJoin(e.rctx, lookUpContents, indexRanges, keyOff2IdxOff, cwc)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -4761,7 +4763,7 @@ func (builder *dataReaderBuilder) buildProjectionForIndexJoin(
}

// buildRangesForIndexJoin builds kv ranges for index join when the inner plan is index scan plan.
func buildRangesForIndexJoin(ctx sessionctx.Context, lookUpContents []*join.IndexJoinLookUpContent,
func buildRangesForIndexJoin(rctx *rangerctx.RangerContext, lookUpContents []*join.IndexJoinLookUpContent,
ranges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) ([]*ranger.Range, error) {
retRanges := make([]*ranger.Range, 0, len(ranges)*len(lookUpContents))
lastPos := len(ranges[0].LowVal) - 1
Expand All @@ -4780,7 +4782,7 @@ func buildRangesForIndexJoin(ctx sessionctx.Context, lookUpContents []*join.Inde
}
continue
}
nextColRanges, err := cwc.BuildRangesByRow(ctx.GetRangerCtx(), content.Row)
nextColRanges, err := cwc.BuildRangesByRow(rctx, content.Row)
if err != nil {
return nil, err
}
Expand All @@ -4800,7 +4802,7 @@ func buildRangesForIndexJoin(ctx sessionctx.Context, lookUpContents []*join.Inde
return retRanges, nil
}

return ranger.UnionRanges(ctx.GetRangerCtx(), tmpDatumRanges, true)
return ranger.UnionRanges(rctx, tmpDatumRanges, true)
}

// buildKvRangesForIndexJoin builds kv ranges for index join when the inner plan is index scan plan.
Expand Down
Loading

0 comments on commit 6b3c4bb

Please sign in to comment.