From 6b3c4bbf1a529ac99bee60163979ce59c0063d30 Mon Sep 17 00:00:00 2001 From: Yang Keao Date: Mon, 15 Jul 2024 14:58:19 +0800 Subject: [PATCH] use BaseExecutorV2 to refactor IndexReaderExecutor and IndexLookUpExecutor Signed-off-by: Yang Keao --- pkg/executor/builder.go | 118 +++++++------- pkg/executor/distsql.go | 150 ++++++++++++------ pkg/executor/index_merge_reader.go | 2 +- pkg/executor/internal/exec/executor.go | 24 +++ .../table_readers_required_rows_test.go | 13 +- pkg/util/logutil/consistency/BUILD.bazel | 1 - 6 files changed, 195 insertions(+), 113 deletions(-) diff --git a/pkg/executor/builder.go b/pkg/executor/builder.go index e14e1500a0d2b..e17e16a03da45 100644 --- a/pkg/executor/builder.go +++ b/pkg/executor/builder.go @@ -3635,7 +3635,7 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) e return ret } -func buildIndexRangeForEachPartition(ctx sessionctx.Context, usedPartitions []table.PhysicalTable, contentPos []int64, +func buildIndexRangeForEachPartition(rctx *rangerctx.RangerContext, usedPartitions []table.PhysicalTable, contentPos []int64, lookUpContent []*join.IndexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (map[int64][]*ranger.Range, error) { contentBucket := make(map[int64][]*join.IndexJoinLookUpContent) for _, p := range usedPartitions { @@ -3648,7 +3648,7 @@ func buildIndexRangeForEachPartition(ctx sessionctx.Context, usedPartitions []ta } nextRange := make(map[int64][]*ranger.Range) for _, p := range usedPartitions { - ranges, err := buildRangesForIndexJoin(ctx, contentBucket[p.GetPhysicalID()], indexRanges, keyOff2IdxOff, cwc) + ranges, err := buildRangesForIndexJoin(rctx, contentBucket[p.GetPhysicalID()], indexRanges, keyOff2IdxOff, cwc) if err != nil { return nil, err } @@ -3776,28 +3776,29 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea paging := b.ctx.GetSessionVars().EnablePaging e := &IndexReaderExecutor{ - BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()), - indexUsageReporter: b.buildIndexUsageReporter(v), - dagPB: dagReq, - startTS: startTS, - txnScope: b.txnScope, - readReplicaScope: b.readReplicaScope, - isStaleness: b.isStaleness, - netDataSize: v.GetNetDataSize(), - physicalTableID: physicalTableID, - table: tbl, - index: is.Index, - keepOrder: is.KeepOrder, - desc: is.Desc, - columns: is.Columns, - byItems: is.ByItems, - paging: paging, - corColInFilter: b.corColInDistPlan(v.IndexPlans), - corColInAccess: b.corColInAccess(v.IndexPlans[0]), - idxCols: is.IdxCols, - colLens: is.IdxColLens, - plans: v.IndexPlans, - outputColumns: v.OutputColumns, + indexReaderExecutorContext: newIndexReaderExecutorContext(b.ctx), + BaseExecutorV2: exec.NewBaseExecutorV2(b.ctx.GetSessionVars(), v.Schema(), v.ID()), + indexUsageReporter: b.buildIndexUsageReporter(v), + dagPB: dagReq, + startTS: startTS, + txnScope: b.txnScope, + readReplicaScope: b.readReplicaScope, + isStaleness: b.isStaleness, + netDataSize: v.GetNetDataSize(), + physicalTableID: physicalTableID, + table: tbl, + index: is.Index, + keepOrder: is.KeepOrder, + desc: is.Desc, + columns: is.Columns, + byItems: is.ByItems, + paging: paging, + corColInFilter: b.corColInDistPlan(v.IndexPlans), + corColInAccess: b.corColInAccess(v.IndexPlans[0]), + idxCols: is.IdxCols, + colLens: is.IdxColLens, + plans: v.IndexPlans, + outputColumns: v.OutputColumns, } for _, col := range v.OutputColumns { @@ -3954,29 +3955,30 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn } e := &IndexLookUpExecutor{ - BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()), - indexUsageReporter: b.buildIndexUsageReporter(v), - dagPB: indexReq, - startTS: startTS, - table: tbl, - index: is.Index, - keepOrder: is.KeepOrder, - byItems: is.ByItems, - desc: is.Desc, - tableRequest: tableReq, - columns: ts.Columns, - indexPaging: indexPaging, - dataReaderBuilder: readerBuilder, - corColInIdxSide: b.corColInDistPlan(v.IndexPlans), - corColInTblSide: b.corColInDistPlan(v.TablePlans), - corColInAccess: b.corColInAccess(v.IndexPlans[0]), - idxCols: is.IdxCols, - colLens: is.IdxColLens, - idxPlans: v.IndexPlans, - tblPlans: v.TablePlans, - PushedLimit: v.PushedLimit, - idxNetDataSize: v.GetAvgTableRowSize(), - avgRowSize: v.GetAvgTableRowSize(), + indexLookUpExecutorContext: newIndexLookUpExecutorContext(b.ctx), + BaseExecutorV2: exec.NewBaseExecutorV2(b.ctx.GetSessionVars(), v.Schema(), v.ID()), + indexUsageReporter: b.buildIndexUsageReporter(v), + dagPB: indexReq, + startTS: startTS, + table: tbl, + index: is.Index, + keepOrder: is.KeepOrder, + byItems: is.ByItems, + desc: is.Desc, + tableRequest: tableReq, + columns: ts.Columns, + indexPaging: indexPaging, + dataReaderBuilder: readerBuilder, + corColInIdxSide: b.corColInDistPlan(v.IndexPlans), + corColInTblSide: b.corColInDistPlan(v.TablePlans), + corColInAccess: b.corColInAccess(v.IndexPlans[0]), + idxCols: is.IdxCols, + colLens: is.IdxColLens, + idxPlans: v.IndexPlans, + tblPlans: v.TablePlans, + PushedLimit: v.PushedLimit, + idxNetDataSize: v.GetAvgTableRowSize(), + avgRowSize: v.GetAvgTableRowSize(), } if v.ExtraHandleCol != nil { @@ -4591,7 +4593,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte } tbInfo := e.table.Meta() if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().StmtCtx.UseDynamicPartitionPrune() { - kvRanges, err := buildKvRangesForIndexJoin(e.Ctx().GetDistSQLCtx(), e.Ctx().GetRangerCtx(), e.physicalTableID, e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memoryTracker, interruptSignal) + kvRanges, err := buildKvRangesForIndexJoin(e.dctx, e.rctx, e.physicalTableID, e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memoryTracker, interruptSignal) if err != nil { return nil, err } @@ -4605,7 +4607,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte if err != nil { return nil, err } - if e.ranges, err = buildRangesForIndexJoin(e.Ctx(), lookUpContents, indexRanges, keyOff2IdxOff, cwc); err != nil { + if e.ranges, err = buildRangesForIndexJoin(e.rctx, lookUpContents, indexRanges, keyOff2IdxOff, cwc); err != nil { return nil, err } if err := exec.Open(ctx, e); err != nil { @@ -4621,7 +4623,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte } if len(usedPartition) != 0 { if canPrune { - rangeMap, err := buildIndexRangeForEachPartition(e.Ctx(), usedPartition, contentPos, lookUpContents, indexRanges, keyOff2IdxOff, cwc) + rangeMap, err := buildIndexRangeForEachPartition(e.rctx, usedPartition, contentPos, lookUpContents, indexRanges, keyOff2IdxOff, cwc) if err != nil { return nil, err } @@ -4630,7 +4632,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte e.partRangeMap = rangeMap } else { e.partitions = usedPartition - if e.ranges, err = buildRangesForIndexJoin(e.Ctx(), lookUpContents, indexRanges, keyOff2IdxOff, cwc); err != nil { + if e.ranges, err = buildRangesForIndexJoin(e.rctx, lookUpContents, indexRanges, keyOff2IdxOff, cwc); err != nil { return nil, err } } @@ -4653,7 +4655,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context tbInfo := e.table.Meta() if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().StmtCtx.UseDynamicPartitionPrune() { - e.kvRanges, err = buildKvRangesForIndexJoin(e.Ctx().GetDistSQLCtx(), e.Ctx().GetRangerCtx(), getPhysicalTableID(e.table), e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal) + e.kvRanges, err = buildKvRangesForIndexJoin(e.dctx, e.rctx, getPhysicalTableID(e.table), e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal) if err != nil { return nil, err } @@ -4667,7 +4669,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context if err != nil { return nil, err } - e.ranges, err = buildRangesForIndexJoin(e.Ctx(), lookUpContents, indexRanges, keyOff2IdxOff, cwc) + e.ranges, err = buildRangesForIndexJoin(e.rctx, lookUpContents, indexRanges, keyOff2IdxOff, cwc) if err != nil { return nil, err } @@ -4684,7 +4686,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context } if len(usedPartition) != 0 { if canPrune { - rangeMap, err := buildIndexRangeForEachPartition(e.Ctx(), usedPartition, contentPos, lookUpContents, indexRanges, keyOff2IdxOff, cwc) + rangeMap, err := buildIndexRangeForEachPartition(e.rctx, usedPartition, contentPos, lookUpContents, indexRanges, keyOff2IdxOff, cwc) if err != nil { return nil, err } @@ -4693,7 +4695,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context e.partitionRangeMap = rangeMap } else { e.prunedPartitions = usedPartition - e.ranges, err = buildRangesForIndexJoin(e.Ctx(), lookUpContents, indexRanges, keyOff2IdxOff, cwc) + e.ranges, err = buildRangesForIndexJoin(e.rctx, lookUpContents, indexRanges, keyOff2IdxOff, cwc) if err != nil { return nil, err } @@ -4761,7 +4763,7 @@ func (builder *dataReaderBuilder) buildProjectionForIndexJoin( } // buildRangesForIndexJoin builds kv ranges for index join when the inner plan is index scan plan. -func buildRangesForIndexJoin(ctx sessionctx.Context, lookUpContents []*join.IndexJoinLookUpContent, +func buildRangesForIndexJoin(rctx *rangerctx.RangerContext, lookUpContents []*join.IndexJoinLookUpContent, ranges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) ([]*ranger.Range, error) { retRanges := make([]*ranger.Range, 0, len(ranges)*len(lookUpContents)) lastPos := len(ranges[0].LowVal) - 1 @@ -4780,7 +4782,7 @@ func buildRangesForIndexJoin(ctx sessionctx.Context, lookUpContents []*join.Inde } continue } - nextColRanges, err := cwc.BuildRangesByRow(ctx.GetRangerCtx(), content.Row) + nextColRanges, err := cwc.BuildRangesByRow(rctx, content.Row) if err != nil { return nil, err } @@ -4800,7 +4802,7 @@ func buildRangesForIndexJoin(ctx sessionctx.Context, lookUpContents []*join.Inde return retRanges, nil } - return ranger.UnionRanges(ctx.GetRangerCtx(), tmpDatumRanges, true) + return ranger.UnionRanges(rctx, tmpDatumRanges, true) } // buildKvRangesForIndexJoin builds kv ranges for index join when the inner plan is index scan plan. diff --git a/pkg/executor/distsql.go b/pkg/executor/distsql.go index 7349166b48dc8..19d1b14801725 100644 --- a/pkg/executor/distsql.go +++ b/pkg/executor/distsql.go @@ -33,11 +33,13 @@ import ( "github.com/pingcap/tidb/pkg/executor/internal/builder" "github.com/pingcap/tidb/pkg/executor/internal/exec" "github.com/pingcap/tidb/pkg/expression" + isctx "github.com/pingcap/tidb/pkg/infoschema/context" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/charset" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/terror" + planctx "github.com/pingcap/tidb/pkg/planner/context" plannercore "github.com/pingcap/tidb/pkg/planner/core" "github.com/pingcap/tidb/pkg/planner/core/base" plannerutil "github.com/pingcap/tidb/pkg/planner/util" @@ -56,6 +58,7 @@ import ( "github.com/pingcap/tidb/pkg/util/logutil/consistency" "github.com/pingcap/tidb/pkg/util/memory" "github.com/pingcap/tidb/pkg/util/ranger" + rangerctx "github.com/pingcap/tidb/pkg/util/ranger/context" "github.com/pingcap/tidb/pkg/util/size" "github.com/pingcap/tipb/go-tipb" "go.uber.org/zap" @@ -149,23 +152,47 @@ func closeAll(objs ...Closeable) error { // rebuildIndexRanges will be called if there's correlated column in access conditions. We will rebuild the range // by substituting correlated column with the constant. -func rebuildIndexRanges(ctx sessionctx.Context, is *plannercore.PhysicalIndexScan, idxCols []*expression.Column, colLens []int) (ranges []*ranger.Range, err error) { +func rebuildIndexRanges(ectx expression.BuildContext, rctx *rangerctx.RangerContext, is *plannercore.PhysicalIndexScan, idxCols []*expression.Column, colLens []int) (ranges []*ranger.Range, err error) { access := make([]expression.Expression, 0, len(is.AccessCondition)) for _, cond := range is.AccessCondition { - newCond, err1 := expression.SubstituteCorCol2Constant(ctx.GetExprCtx(), cond) + newCond, err1 := expression.SubstituteCorCol2Constant(ectx, cond) if err1 != nil { return nil, err1 } access = append(access, newCond) } // All of access conditions must be used to build ranges, so we don't limit range memory usage. - ranges, _, err = ranger.DetachSimpleCondAndBuildRangeForIndex(ctx.GetRangerCtx(), access, idxCols, colLens, 0) + ranges, _, err = ranger.DetachSimpleCondAndBuildRangeForIndex(rctx, access, idxCols, colLens, 0) return ranges, err } +type indexReaderExecutorContext struct { + rctx *rangerctx.RangerContext + dctx *distsqlctx.DistSQLContext + ectx expression.BuildContext + infoSchema isctx.MetaOnlyInfoSchema + buildPBCtx *planctx.BuildPBContext + + stmtMemTracker *memory.Tracker +} + +func newIndexReaderExecutorContext(sctx sessionctx.Context) indexReaderExecutorContext { + pctx := sctx.GetPlanCtx() + + return indexReaderExecutorContext{ + rctx: pctx.GetRangerCtx(), + dctx: sctx.GetDistSQLCtx(), + ectx: sctx.GetExprCtx(), + infoSchema: pctx.GetInfoSchema(), + buildPBCtx: pctx.GetBuildPBCtx(), + stmtMemTracker: sctx.GetSessionVars().StmtCtx.MemTracker, + } +} + // IndexReaderExecutor sends dag request and reads index data from kv layer. type IndexReaderExecutor struct { - exec.BaseExecutor + indexReaderExecutorContext + exec.BaseExecutorV2 indexUsageReporter *exec.IndexUsageReporter // For a partitioned table, the IndexReaderExecutor works on a partition, so @@ -270,7 +297,7 @@ func (e *IndexReaderExecutor) buildKeyRanges(dctx *distsqlctx.DistSQLContext, ra func (e *IndexReaderExecutor) Open(ctx context.Context) error { var err error if e.corColInAccess { - e.ranges, err = rebuildIndexRanges(e.Ctx(), e.plans[0].(*plannercore.PhysicalIndexScan), e.idxCols, e.colLens) + e.ranges, err = rebuildIndexRanges(e.ectx, e.rctx, e.plans[0].(*plannercore.PhysicalIndexScan), e.idxCols, e.colLens) if err != nil { return err } @@ -283,14 +310,14 @@ func (e *IndexReaderExecutor) Open(ctx context.Context) error { if pRange, ok := e.partRangeMap[p.GetPhysicalID()]; ok { partRange = pRange } - kvRange, err := e.buildKeyRanges(e.Ctx().GetDistSQLCtx(), partRange, p.GetPhysicalID()) + kvRange, err := e.buildKeyRanges(e.dctx, partRange, p.GetPhysicalID()) if err != nil { return err } kvRanges = append(kvRanges, kvRange...) } } else { - kvRanges, err = e.buildKeyRanges(e.Ctx().GetDistSQLCtx(), e.ranges, e.physicalTableID) + kvRanges, err = e.buildKeyRanges(e.dctx, e.ranges, e.physicalTableID) } if err != nil { return err @@ -309,11 +336,11 @@ func (e *IndexReaderExecutor) buildKVReq(r []kv.KeyRange) (*kv.Request, error) { SetTxnScope(e.txnScope). SetReadReplicaScope(e.readReplicaScope). SetIsStaleness(e.isStaleness). - SetFromSessionVars(e.Ctx().GetDistSQLCtx()). - SetFromInfoSchema(e.Ctx().GetInfoSchema()). + SetFromSessionVars(e.dctx). + SetFromInfoSchema(e.infoSchema). SetMemTracker(e.memTracker). - SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.Ctx().GetDistSQLCtx(), &builder.Request, e.netDataSize)). - SetConnIDAndConnAlias(e.Ctx().GetSessionVars().ConnectionID, e.Ctx().GetSessionVars().SessionAlias) + SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.dctx, &builder.Request, e.netDataSize)). + SetConnIDAndConnAlias(e.dctx.ConnectionID, e.dctx.SessionAlias) kvReq, err := builder.Build() return kvReq, err } @@ -321,7 +348,7 @@ func (e *IndexReaderExecutor) buildKVReq(r []kv.KeyRange) (*kv.Request, error) { func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error { var err error if e.corColInFilter { - e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetBuildPBCtx(), e.plans) + e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.buildPBCtx, e.plans) if err != nil { return err } @@ -344,7 +371,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) } else { e.memTracker = memory.NewTracker(e.ID(), -1) } - e.memTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.MemTracker) + e.memTracker.AttachTo(e.stmtMemTracker) slices.SortFunc(kvRanges, func(i, j kv.KeyRange) int { return bytes.Compare(i.StartKey, j.StartKey) }) @@ -354,7 +381,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) if err != nil { return err } - e.result, err = e.SelectResult(ctx, e.Ctx().GetDistSQLCtx(), kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID()) + e.result, err = e.SelectResult(ctx, e.dctx, kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID()) if err != nil { return err } @@ -369,20 +396,47 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) } var results []distsql.SelectResult for _, kvReq := range kvReqs { - result, err := e.SelectResult(ctx, e.Ctx().GetDistSQLCtx(), kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID()) + result, err := e.SelectResult(ctx, e.dctx, kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID()) if err != nil { return err } results = append(results, result) } - e.result = distsql.NewSortedSelectResults(e.Ctx().GetExprCtx().GetEvalCtx(), results, e.Schema(), e.byItems, e.memTracker) + e.result = distsql.NewSortedSelectResults(e.ectx.GetEvalCtx(), results, e.Schema(), e.byItems, e.memTracker) } return nil } +type indexLookUpExecutorContext struct { + tableReaderExecutorContext + + stmtRuntimeStatsColl *execdetails.RuntimeStatsColl + + indexLookupSize int + indexLookupConcurrency int + enableRedactLog string + storage kv.Storage + weakConsistency bool +} + +func newIndexLookUpExecutorContext(sctx sessionctx.Context) indexLookUpExecutorContext { + return indexLookUpExecutorContext{ + tableReaderExecutorContext: newTableReaderExecutorContext(sctx), + + stmtRuntimeStatsColl: sctx.GetSessionVars().StmtCtx.RuntimeStatsColl, + + indexLookupSize: sctx.GetSessionVars().IndexLookupSize, + indexLookupConcurrency: sctx.GetSessionVars().IndexLookupConcurrency(), + enableRedactLog: sctx.GetSessionVars().EnableRedactLog, + storage: sctx.GetStore(), + weakConsistency: sctx.GetSessionVars().StmtCtx.WeakConsistency, + } +} + // IndexLookUpExecutor implements double read for index scan. type IndexLookUpExecutor struct { - exec.BaseExecutor + indexLookUpExecutorContext + exec.BaseExecutorV2 indexUsageReporter *exec.IndexUsageReporter table table.Table @@ -481,7 +535,7 @@ func (e *IndexLookUpExecutor) setDummy() { func (e *IndexLookUpExecutor) Open(ctx context.Context) error { var err error if e.corColInAccess { - e.ranges, err = rebuildIndexRanges(e.Ctx(), e.idxPlans[0].(*plannercore.PhysicalIndexScan), e.idxCols, e.colLens) + e.ranges, err = rebuildIndexRanges(e.ectx, e.rctx, e.idxPlans[0].(*plannercore.PhysicalIndexScan), e.idxCols, e.colLens) if err != nil { return err } @@ -500,7 +554,7 @@ func (e *IndexLookUpExecutor) Open(ctx context.Context) error { } func (e *IndexLookUpExecutor) buildTableKeyRanges() (err error) { - dctx := e.Ctx().GetDistSQLCtx() + dctx := e.dctx if e.partitionTableMode { e.partitionKVRanges = make([][]kv.KeyRange, 0, len(e.prunedPartitions)) for _, p := range e.prunedPartitions { @@ -547,21 +601,21 @@ func (e *IndexLookUpExecutor) open(_ context.Context) error { } else { e.memTracker = memory.NewTracker(e.ID(), -1) } - e.memTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.MemTracker) + e.memTracker.AttachTo(e.stmtMemTracker) e.finished = make(chan struct{}) e.resultCh = make(chan *lookupTableTask, atomic.LoadInt32(&LookupTableTaskChannelSize)) var err error if e.corColInIdxSide { - e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetBuildPBCtx(), e.idxPlans) + e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.buildPBCtx, e.idxPlans) if err != nil { return err } } if e.corColInTblSide { - e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetBuildPBCtx(), e.tblPlans) + e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(e.buildPBCtx, e.tblPlans) if err != nil { return err } @@ -622,7 +676,7 @@ func (e *IndexLookUpExecutor) getRetTpsForIndexReader() []*types.FieldType { var tps []*types.FieldType if len(e.byItems) != 0 { for _, item := range e.byItems { - tps = append(tps, item.Expr.GetType(e.Ctx().GetExprCtx().GetEvalCtx())) + tps = append(tps, item.Expr.GetType(e.ectx.GetEvalCtx())) } } if e.isCommonHandle() { @@ -669,7 +723,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan< resultCh: e.resultCh, keepOrder: e.keepOrder, checkIndexValue: e.checkIndexValue, - maxBatchSize: e.Ctx().GetSessionVars().IndexLookupSize, + maxBatchSize: e.indexLookupSize, maxChunkSize: e.MaxChunkSize(), PushedLimit: e.PushedLimit, } @@ -681,11 +735,11 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan< SetTxnScope(e.txnScope). SetReadReplicaScope(e.readReplicaScope). SetIsStaleness(e.isStaleness). - SetFromSessionVars(e.Ctx().GetDistSQLCtx()). - SetFromInfoSchema(e.Ctx().GetInfoSchema()). - SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.Ctx().GetDistSQLCtx(), &builder.Request, e.idxNetDataSize/float64(len(kvRanges)))). + SetFromSessionVars(e.dctx). + SetFromInfoSchema(e.infoSchema). + SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.dctx, &builder.Request, e.idxNetDataSize/float64(len(kvRanges)))). SetMemTracker(tracker). - SetConnIDAndConnAlias(e.Ctx().GetSessionVars().ConnectionID, e.Ctx().GetSessionVars().SessionAlias) + SetConnIDAndConnAlias(e.dctx.ConnectionID, e.dctx.SessionAlias) worker.batchSize = e.calculateBatchSize(initBatchSize, worker.maxBatchSize) if builder.Request.Paging.Enable && builder.Request.Paging.MinPagingSize < uint64(worker.batchSize) { @@ -719,7 +773,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan< worker.syncErr(err) break } - result, err := distsql.SelectWithRuntimeStats(ctx, e.Ctx().GetDistSQLCtx(), kvReq, tps, getPhysicalPlanIDs(e.idxPlans), idxID) + result, err := distsql.SelectWithRuntimeStats(ctx, e.dctx, kvReq, tps, getPhysicalPlanIDs(e.idxPlans), idxID) if err != nil { worker.syncErr(err) break @@ -728,7 +782,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan< } if len(results) > 1 && len(e.byItems) != 0 { // e.Schema() not the output schema for indexReader, and we put byItems related column at first in `buildIndexReq`, so use nil here. - ssr := distsql.NewSortedSelectResults(e.Ctx().GetExprCtx().GetEvalCtx(), results, nil, e.byItems, e.memTracker) + ssr := distsql.NewSortedSelectResults(e.ectx.GetEvalCtx(), results, nil, e.byItems, e.memTracker) results = []distsql.SelectResult{ssr} } ctx1, cancel := context.WithCancel(ctx) @@ -778,7 +832,7 @@ func CalculateBatchSize(estRows, initBatchSize, maxBatchSize int) int { // startTableWorker launches some background goroutines which pick tasks from workCh and execute the task. func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-chan *lookupTableTask) { - lookupConcurrencyLimit := e.Ctx().GetSessionVars().IndexLookupConcurrency() + lookupConcurrencyLimit := e.indexLookupConcurrency e.tblWorkerWg.Add(lookupConcurrencyLimit) for i := 0; i < lookupConcurrencyLimit; i++ { workerID := i @@ -808,8 +862,8 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup table = task.partitionTable } tableReaderExec := &TableReaderExecutor{ - BaseExecutorV2: exec.NewBaseExecutorV2(e.Ctx().GetSessionVars(), e.Schema(), e.getTableRootPlanID()), - tableReaderExecutorContext: newTableReaderExecutorContext(e.Ctx()), + BaseExecutorV2: e.BuildNewBaseExecutorV2(e.stmtRuntimeStatsColl, e.Schema(), e.getTableRootPlanID()), + tableReaderExecutorContext: e.tableReaderExecutorContext, table: table, dagPB: e.tableRequest, startTS: e.startTS, @@ -834,7 +888,7 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup // Close implements Exec Close interface. func (e *IndexLookUpExecutor) Close() error { if e.stats != nil { - defer e.Ctx().GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.ID(), e.stats) + defer e.stmtRuntimeStatsColl.RegisterStats(e.ID(), e.stats) } if e.indexUsageReporter != nil { e.indexUsageReporter.ReportCopIndexUsageForTable( @@ -942,7 +996,7 @@ func (e *IndexLookUpExecutor) initRuntimeStats() { if e.RuntimeStats() != nil { e.stats = &IndexLookUpRunTimeStats{ indexScanBasicStats: &execdetails.BasicRuntimeStats{}, - Concurrency: e.Ctx().GetSessionVars().IndexLookupConcurrency(), + Concurrency: e.indexLookupConcurrency, } } } @@ -1006,9 +1060,9 @@ func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.Select }() chk := w.idxLookup.AllocPool.Alloc(w.idxLookup.getRetTpsForIndexReader(), w.idxLookup.MaxChunkSize(), w.idxLookup.MaxChunkSize()) idxID := w.idxLookup.getIndexPlanRootID() - if w.idxLookup.Ctx().GetSessionVars().StmtCtx.RuntimeStatsColl != nil { + if w.idxLookup.stmtRuntimeStatsColl != nil { if idxID != w.idxLookup.ID() && w.idxLookup.stats != nil { - w.idxLookup.stats.indexScanBasicStats = w.idxLookup.Ctx().GetSessionVars().StmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(idxID) + w.idxLookup.stats.indexScanBasicStats = w.idxLookup.stmtRuntimeStatsColl.GetBasicRuntimeStats(idxID) } } for i := 0; i < len(results); { @@ -1228,8 +1282,10 @@ func (e *IndexLookUpExecutor) getHandle(row chunk.Row, handleIdx []int, datums = append(datums, row.GetDatum(idx, e.handleCols[i].RetType)) } tablecodec.TruncateIndexValues(e.table.Meta(), e.primaryKeyIndex, datums) - handleEncoded, err = codec.EncodeKey(e.Ctx().GetSessionVars().StmtCtx.TimeZone(), nil, datums...) - err = e.Ctx().GetSessionVars().StmtCtx.HandleError(err) + ectx := e.ectx.GetEvalCtx() + handleEncoded, err = codec.EncodeKey(ectx.Location(), nil, datums...) + errCtx := ectx.ErrCtx() + err = errCtx.HandleError(err) if err != nil { return nil, err } @@ -1358,8 +1414,8 @@ func (w *tableWorker) compareData(ctx context.Context, task *lookupTableTask, ta if idx == nil { return nil } - sc := w.idxLookup.Ctx().GetSessionVars().StmtCtx - k, _, err := idx.GenIndexKey(sc.ErrCtx(), sc.TimeZone(), idxRow.Values[:len(idx.Meta().Columns)], idxRow.Handle, nil) + ectx := w.idxLookup.ectx.GetEvalCtx() + k, _, err := idx.GenIndexKey(ectx.ErrCtx(), ectx.Location(), idxRow.Values[:len(idx.Meta().Columns)], idxRow.Handle, nil) if err != nil { return nil } @@ -1367,8 +1423,8 @@ func (w *tableWorker) compareData(ctx context.Context, task *lookupTableTask, ta }, Tbl: tblInfo, Idx: w.idxLookup.index, - EnableRedactLog: w.idxLookup.Ctx().GetSessionVars().EnableRedactLog, - Storage: w.idxLookup.Ctx().GetStore(), + EnableRedactLog: w.idxLookup.enableRedactLog, + Storage: w.idxLookup.storage, } } @@ -1416,7 +1472,7 @@ func (w *tableWorker) compareData(ctx context.Context, task *lookupTableTask, ta vals = append(vals, row.GetDatum(i, &col.FieldType)) } tablecodec.TruncateIndexValues(tblInfo, w.idxLookup.index, vals) - tc := w.idxLookup.Ctx().GetSessionVars().StmtCtx.TypeCtx() + tc := w.idxLookup.ectx.GetEvalCtx().TypeCtx() for i := range vals { col := w.idxTblCols[i] idxVal := idxRow.GetDatum(i, w.idxColTps[i]) @@ -1539,7 +1595,7 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er } if handleCnt != len(task.rows) && !util.HasCancelled(ctx) && - !w.idxLookup.Ctx().GetSessionVars().StmtCtx.WeakConsistency { + !w.idxLookup.weakConsistency { if len(w.idxLookup.tblPlans) == 1 { obtainedHandlesMap := kv.NewHandleMap() for _, row := range task.rows { @@ -1556,8 +1612,8 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er }, Tbl: w.idxLookup.table.Meta(), Idx: w.idxLookup.index, - EnableRedactLog: w.idxLookup.Ctx().GetSessionVars().EnableRedactLog, - Storage: w.idxLookup.Ctx().GetStore(), + EnableRedactLog: w.idxLookup.enableRedactLog, + Storage: w.idxLookup.storage, }).ReportLookupInconsistent(ctx, handleCnt, len(task.rows), diff --git a/pkg/executor/index_merge_reader.go b/pkg/executor/index_merge_reader.go index 0a1017463dc24..84261164a5925 100644 --- a/pkg/executor/index_merge_reader.go +++ b/pkg/executor/index_merge_reader.go @@ -221,7 +221,7 @@ func (e *IndexMergeReaderExecutor) rebuildRangeForCorCol() (err error) { if e.isCorColInPartialAccess[i] { switch x := plan[0].(type) { case *plannercore.PhysicalIndexScan: - e.ranges[i], err = rebuildIndexRanges(e.Ctx(), x, x.IdxCols, x.IdxColLens) + e.ranges[i], err = rebuildIndexRanges(e.Ctx().GetExprCtx(), e.Ctx().GetRangerCtx(), x, x.IdxCols, x.IdxColLens) case *plannercore.PhysicalTableScan: e.ranges[i], err = x.ResolveCorrelatedColumns() default: diff --git a/pkg/executor/internal/exec/executor.go b/pkg/executor/internal/exec/executor.go index ccd4cd4d5ff30..32c6fa29a99df 100644 --- a/pkg/executor/internal/exec/executor.go +++ b/pkg/executor/internal/exec/executor.go @@ -309,6 +309,30 @@ func (*BaseExecutorV2) Next(_ context.Context, _ *chunk.Chunk) error { return nil } +// BuildNewBaseExecutorV2 builds a new `BaseExecutorV2` based on the configuration of the current base executor. +// It's used to build a new sub-executor from an existing executor. For example, the `IndexLookUpExecutor` will use +// this function to build `TableReaderExecutor` +func (e *BaseExecutorV2) BuildNewBaseExecutorV2(stmtRuntimeStatsColl *execdetails.RuntimeStatsColl, schema *expression.Schema, id int, children ...Executor) BaseExecutorV2 { + newExecutorMeta := newExecutorMeta(schema, id, children...) + + newExecutorStats := e.executorStats + if stmtRuntimeStatsColl != nil { + if id > 0 { + newExecutorStats.runtimeStats = stmtRuntimeStatsColl.GetBasicRuntimeStats(id) + } + } + + newChunkAllocator := e.executorChunkAllocator + newChunkAllocator.retFieldTypes = newExecutorMeta.RetFieldTypes() + newE := BaseExecutorV2{ + executorMeta: newExecutorMeta, + executorStats: newExecutorStats, + executorChunkAllocator: newChunkAllocator, + executorKillerHandler: e.executorKillerHandler, + } + return newE +} + // BaseExecutor holds common information for executors. type BaseExecutor struct { ctx sessionctx.Context diff --git a/pkg/executor/table_readers_required_rows_test.go b/pkg/executor/table_readers_required_rows_test.go index 6be0105c8a097..04ada4e91da20 100644 --- a/pkg/executor/table_readers_required_rows_test.go +++ b/pkg/executor/table_readers_required_rows_test.go @@ -153,14 +153,14 @@ func buildMockDAGRequest(sctx sessionctx.Context) *tipb.DAGRequest { return req } -func buildMockBaseExec(sctx sessionctx.Context) exec.BaseExecutor { +func buildMockBaseExec(sctx sessionctx.Context) exec.BaseExecutorV2 { retTypes := []*types.FieldType{types.NewFieldType(mysql.TypeDouble), types.NewFieldType(mysql.TypeLonglong)} cols := make([]*expression.Column, len(retTypes)) for i := range retTypes { cols[i] = &expression.Column{Index: i, RetType: retTypes[i]} } schema := expression.NewSchema(cols...) - baseExec := exec.NewBaseExecutor(sctx, schema, 0) + baseExec := exec.NewBaseExecutorV2(sctx.GetSessionVars(), schema, 0) return baseExec } @@ -208,10 +208,11 @@ func TestTableReaderRequiredRows(t *testing.T) { func buildIndexReader(sctx sessionctx.Context) exec.Executor { e := &IndexReaderExecutor{ - BaseExecutor: buildMockBaseExec(sctx), - dagPB: buildMockDAGRequest(sctx), - index: &model.IndexInfo{}, - selectResultHook: selectResultHook{mockSelectResult}, + indexReaderExecutorContext: newIndexReaderExecutorContext(sctx), + BaseExecutorV2: buildMockBaseExec(sctx), + dagPB: buildMockDAGRequest(sctx), + index: &model.IndexInfo{}, + selectResultHook: selectResultHook{mockSelectResult}, } return e } diff --git a/pkg/util/logutil/consistency/BUILD.bazel b/pkg/util/logutil/consistency/BUILD.bazel index dc321bba7ee9d..f251a22a8c198 100644 --- a/pkg/util/logutil/consistency/BUILD.bazel +++ b/pkg/util/logutil/consistency/BUILD.bazel @@ -9,7 +9,6 @@ go_library( "//pkg/errno", "//pkg/kv", "//pkg/parser/model", - "//pkg/sessionctx", "//pkg/store/helper", "//pkg/tablecodec", "//pkg/types",