diff --git a/pkg/executor/adapter.go b/pkg/executor/adapter.go index 4bb4e5c2d483c..ae239e1a81cac 100644 --- a/pkg/executor/adapter.go +++ b/pkg/executor/adapter.go @@ -177,8 +177,7 @@ func (a *recordSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk { return exec.NewFirstChunk(a.executor) } - base := a.executor.Base() - return alloc.Alloc(base.RetFieldTypes(), base.InitCap(), base.MaxChunkSize()) + return alloc.Alloc(a.executor.RetFieldTypes(), a.executor.InitCap(), a.executor.MaxChunkSize()) } func (a *recordSet) Finish() error { @@ -886,8 +885,7 @@ func (c *chunkRowRecordSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk { return exec.NewFirstChunk(c.e) } - base := c.e.Base() - return alloc.Alloc(base.RetFieldTypes(), base.InitCap(), base.MaxChunkSize()) + return alloc.Alloc(c.e.RetFieldTypes(), c.e.InitCap(), c.e.MaxChunkSize()) } func (c *chunkRowRecordSet) Close() error { diff --git a/pkg/executor/admin.go b/pkg/executor/admin.go index a340de24058de..2ae551c1eced8 100644 --- a/pkg/executor/admin.go +++ b/pkg/executor/admin.go @@ -216,7 +216,7 @@ func (e *RecoverIndexExec) columnsTypes() []*types.FieldType { // Open implements the Executor Open interface. func (e *RecoverIndexExec) Open(ctx context.Context) error { - if err := exec.Open(ctx, e.BaseExecutor.Base()); err != nil { + if err := exec.Open(ctx, &e.BaseExecutor); err != nil { return err } diff --git a/pkg/executor/batch_point_get.go b/pkg/executor/batch_point_get.go index 06afa7285fa8f..d1bd094e29b6d 100644 --- a/pkg/executor/batch_point_get.go +++ b/pkg/executor/batch_point_get.go @@ -186,7 +186,7 @@ func (e *BatchPointGetExec) Next(ctx context.Context, req *chunk.Chunk) error { } for !req.IsFull() && e.index < len(e.values) { handle, val := e.handles[e.index], e.values[e.index] - err := DecodeRowValToChunk(e.Base().Ctx(), e.Schema(), e.tblInfo, handle, val, req, e.rowDecoder) + err := DecodeRowValToChunk(e.BaseExecutor.Ctx(), e.Schema(), e.tblInfo, handle, val, req, e.rowDecoder) if err != nil { return err } diff --git a/pkg/executor/builder.go b/pkg/executor/builder.go index 7436f05e0ad1c..5b230445c63fc 100644 --- a/pkg/executor/builder.go +++ b/pkg/executor/builder.go @@ -3469,7 +3469,7 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) e } if len(partitions) == 0 { - return &TableDualExec{BaseExecutor: *ret.Base()} + return &TableDualExec{BaseExecutor: ret.BaseExecutor} } // Sort the partition is necessary to make the final multiple partition key ranges ordered. @@ -4473,7 +4473,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte } return e, nil } - ret := &TableDualExec{BaseExecutor: *e.Base()} + ret := &TableDualExec{BaseExecutor: e.BaseExecutor} err = exec.Open(ctx, ret) return ret, err } @@ -4550,7 +4550,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context } return e, err } - ret := &TableDualExec{BaseExecutor: *e.Base()} + ret := &TableDualExec{BaseExecutor: e.BaseExecutor} err = exec.Open(ctx, ret) return ret, err } @@ -5120,8 +5120,8 @@ func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan } capacity = len(e.handles) } - e.Base().SetInitCap(capacity) - e.Base().SetMaxChunkSize(capacity) + e.SetInitCap(capacity) + e.SetMaxChunkSize(capacity) e.buildVirtualColumnInfo() return e } @@ -5303,7 +5303,7 @@ func (b *executorBuilder) buildCTE(v *plannercore.PhysicalCTE) exec.Executor { } // Setup storages. - tps := seedExec.Base().RetFieldTypes() + tps := seedExec.RetFieldTypes() resTbl = cteutil.NewStorageRowContainer(tps, chkSize) if err := resTbl.OpenAndRef(); err != nil { b.err = err diff --git a/pkg/executor/coprocessor.go b/pkg/executor/coprocessor.go index fd563965eb461..c77565aa35181 100644 --- a/pkg/executor/coprocessor.go +++ b/pkg/executor/coprocessor.go @@ -82,7 +82,7 @@ func (h *CoprocessorDAGHandler) HandleRequest(ctx context.Context, req *coproces } chk := exec.TryNewCacheChunk(e) - tps := e.Base().RetFieldTypes() + tps := e.RetFieldTypes() var totalChunks, partChunks []tipb.Chunk memTracker := h.sctx.GetSessionVars().StmtCtx.MemTracker for { @@ -125,7 +125,7 @@ func (h *CoprocessorDAGHandler) HandleStreamRequest(ctx context.Context, req *co } chk := exec.TryNewCacheChunk(e) - tps := e.Base().RetFieldTypes() + tps := e.RetFieldTypes() for { chk.Reset() if err = exec.Next(ctx, e, chk); err != nil { diff --git a/pkg/executor/cte.go b/pkg/executor/cte.go index 97fe2bb2c49cd..f820ea391b509 100644 --- a/pkg/executor/cte.go +++ b/pkg/executor/cte.go @@ -204,7 +204,7 @@ func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err e // For non-recursive CTE, the result will be put into resTbl directly. // So no need to build iterOutTbl. // Construct iterOutTbl in Open() instead of buildCTE(), because its destruct is in Close(). - recursiveTypes := p.recursiveExec.Base().RetFieldTypes() + recursiveTypes := p.recursiveExec.RetFieldTypes() p.iterOutTbl = cteutil.NewStorageRowContainer(recursiveTypes, cteExec.MaxChunkSize()) if err = p.iterOutTbl.OpenAndRef(); err != nil { return err @@ -214,7 +214,7 @@ func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err e if p.isDistinct { p.hashTbl = newConcurrentMapHashTable() p.hCtx = &hashContext{ - allTypes: cteExec.Base().RetFieldTypes(), + allTypes: cteExec.RetFieldTypes(), } // We use all columns to compute hash. p.hCtx.keyColIdx = make([]int, len(p.hCtx.allTypes)) diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go index 885193f9d2a04..16eba4e2447d3 100644 --- a/pkg/executor/executor.go +++ b/pkg/executor/executor.go @@ -212,8 +212,7 @@ func (*globalPanicOnExceed) GetPriority() int64 { // newList creates a new List to buffer current executor's result. func newList(e exec.Executor) *chunk.List { - base := e.Base() - return chunk.NewList(base.RetFieldTypes(), base.InitCap(), base.MaxChunkSize()) + return chunk.NewList(e.RetFieldTypes(), e.InitCap(), e.MaxChunkSize()) } // CommandDDLJobsExec is the general struct for Cancel/Pause/Resume commands on @@ -2377,7 +2376,7 @@ func (w *checkIndexWorker) HandleTask(task checkIndexTask, _ func(workerpool.Non w.e.err.CompareAndSwap(nil, &err) } - se, err := w.e.Base().GetSysSession() + se, err := w.e.BaseExecutor.GetSysSession() if err != nil { trySaveErr(err) return @@ -2385,7 +2384,7 @@ func (w *checkIndexWorker) HandleTask(task checkIndexTask, _ func(workerpool.Non restoreCtx := w.initSessCtx(se) defer func() { restoreCtx() - w.e.Base().ReleaseSysSession(ctx, se) + w.e.BaseExecutor.ReleaseSysSession(ctx, se) }() var pkCols []string diff --git a/pkg/executor/index_lookup_join.go b/pkg/executor/index_lookup_join.go index f178e99f88080..afd9fbbf7e7e2 100644 --- a/pkg/executor/index_lookup_join.go +++ b/pkg/executor/index_lookup_join.go @@ -437,7 +437,7 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) { } maxChunkSize := ow.ctx.GetSessionVars().MaxChunkSize for requiredRows > task.outerResult.Len() { - chk := ow.ctx.GetSessionVars().GetNewChunkWithCapacity(ow.outerCtx.rowTypes, maxChunkSize, maxChunkSize, ow.executor.Base().AllocPool) + chk := ow.executor.NewChunkWithCapacity(ow.outerCtx.rowTypes, maxChunkSize, maxChunkSize) chk = chk.SetRequiredRows(requiredRows, maxChunkSize) err := exec.Next(ctx, ow.executor, chk) if err != nil { @@ -468,7 +468,11 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) { } task.encodedLookUpKeys = make([]*chunk.Chunk, task.outerResult.NumChunks()) for i := range task.encodedLookUpKeys { - task.encodedLookUpKeys[i] = ow.ctx.GetSessionVars().GetNewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeBlob)}, task.outerResult.GetChunk(i).NumRows(), task.outerResult.GetChunk(i).NumRows(), ow.executor.Base().AllocPool) + task.encodedLookUpKeys[i] = ow.executor.NewChunkWithCapacity( + []*types.FieldType{types.NewFieldType(mysql.TypeBlob)}, + task.outerResult.GetChunk(i).NumRows(), + task.outerResult.GetChunk(i).NumRows(), + ) } return task, nil } diff --git a/pkg/executor/index_lookup_merge_join.go b/pkg/executor/index_lookup_merge_join.go index c28a61eddb1a2..f526058962998 100644 --- a/pkg/executor/index_lookup_merge_join.go +++ b/pkg/executor/index_lookup_merge_join.go @@ -343,7 +343,7 @@ func (*outerMergeWorker) pushToChan(ctx context.Context, task *lookUpMergeJoinTa func (omw *outerMergeWorker) buildTask(ctx context.Context) (*lookUpMergeJoinTask, error) { task := &lookUpMergeJoinTask{ results: make(chan *indexMergeJoinResult, numResChkHold), - outerResult: chunk.NewList(omw.rowTypes, omw.executor.Base().InitCap(), omw.executor.Base().MaxChunkSize()), + outerResult: chunk.NewList(omw.rowTypes, omw.executor.InitCap(), omw.executor.MaxChunkSize()), } task.memTracker = memory.NewTracker(memory.LabelForSimpleTask, -1) task.memTracker.AttachTo(omw.parentMemTracker) @@ -712,7 +712,7 @@ func (imw *innerMergeWorker) dedupDatumLookUpKeys(lookUpContents []*indexJoinLoo // fetchNextInnerResult collects a chunk of inner results from inner child executor. func (imw *innerMergeWorker) fetchNextInnerResult(ctx context.Context, task *lookUpMergeJoinTask) (beginRow chunk.Row, err error) { - task.innerResult = imw.ctx.GetSessionVars().GetNewChunkWithCapacity(exec.RetTypes(imw.innerExec), imw.ctx.GetSessionVars().MaxChunkSize, imw.ctx.GetSessionVars().MaxChunkSize, imw.innerExec.Base().AllocPool) + task.innerResult = imw.innerExec.NewChunkWithCapacity(imw.innerExec.RetFieldTypes(), imw.innerExec.MaxChunkSize(), imw.innerExec.MaxChunkSize()) err = exec.Next(ctx, imw.innerExec, task.innerResult) task.innerIter = chunk.NewIterator4Chunk(task.innerResult) beginRow = task.innerIter.Begin() diff --git a/pkg/executor/index_merge_reader.go b/pkg/executor/index_merge_reader.go index 2c22a983d7b5e..64077e7068c45 100644 --- a/pkg/executor/index_merge_reader.go +++ b/pkg/executor/index_merge_reader.go @@ -631,7 +631,7 @@ func (w *partialTableWorker) needPartitionHandle() (bool, error) { func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *indexMergeTableTask, finished <-chan struct{}, handleCols plannercore.HandleCols, parTblIdx int, partialPlanIndex int) (count int64, err error) { - chk := w.sc.GetSessionVars().GetNewChunkWithCapacity(w.getRetTpsForTableScan(), w.maxChunkSize, w.maxChunkSize, w.tableReader.Base().AllocPool) + chk := w.tableReader.NewChunkWithCapacity(w.getRetTpsForTableScan(), w.maxChunkSize, w.maxBatchSize) for { start := time.Now() handles, retChunk, err := w.extractTaskHandles(ctx, chk, handleCols) @@ -686,8 +686,8 @@ func (w *partialTableWorker) extractTaskHandles(ctx context.Context, chk *chunk. if err != nil { return nil, nil, err } - if be := w.tableReader.Base(); be != nil && be.RuntimeStats() != nil { - be.RuntimeStats().Record(time.Since(start), chk.NumRows()) + if w.tableReader != nil && w.tableReader.RuntimeStats() != nil { + w.tableReader.RuntimeStats().Record(time.Since(start), chk.NumRows()) } if chk.NumRows() == 0 { failpoint.Inject("testIndexMergeErrorPartialTableWorker", func(v failpoint.Value) { diff --git a/pkg/executor/insert.go b/pkg/executor/insert.go index f3e304b3324c7..1c4903891090b 100644 --- a/pkg/executor/insert.go +++ b/pkg/executor/insert.go @@ -367,7 +367,7 @@ func (e *InsertExec) initEvalBuffer4Dup() { evalBufferTypes = append(evalBufferTypes, &(col.FieldType)) } if extraLen > 0 { - evalBufferTypes = append(evalBufferTypes, e.SelectExec.Base().RetFieldTypes()[e.rowLen:]...) + evalBufferTypes = append(evalBufferTypes, e.SelectExec.RetFieldTypes()[e.rowLen:]...) } for _, col := range e.Table.Cols() { evalBufferTypes = append(evalBufferTypes, &(col.FieldType)) diff --git a/pkg/executor/internal/exec/executor.go b/pkg/executor/internal/exec/executor.go index 6295523263e4a..c3440bed6f670 100644 --- a/pkg/executor/internal/exec/executor.go +++ b/pkg/executor/internal/exec/executor.go @@ -45,7 +45,15 @@ import ( // return a batch of rows, other than a single row in Volcano. // NOTE: Executors must call "chk.Reset()" before appending their results to it. type Executor interface { - Base() *BaseExecutor + NewChunk() *chunk.Chunk + NewChunkWithCapacity(fields []*types.FieldType, capacity int, maxCachesize int) *chunk.Chunk + + RuntimeStats() *execdetails.BasicRuntimeStats + + HandleSQLKillerSignal() error + RegisterSQLAndPlanInExecForTopSQL() + + AllChildren() []Executor Open(context.Context) error Next(ctx context.Context, req *chunk.Chunk) error Close() error @@ -156,11 +164,6 @@ func (e *BaseExecutor) SetMaxChunkSize(size int) { e.maxChunkSize = size } -// Base returns the BaseExecutor of an executor, don't override this method! -func (e *BaseExecutor) Base() *BaseExecutor { - return e -} - // Open initializes children recursively and "childrenResults" according to children's schemas. func (e *BaseExecutor) Open(ctx context.Context) error { for _, child := range e.children { @@ -239,23 +242,43 @@ func (e *BaseExecutor) ReleaseSysSession(ctx context.Context, sctx sessionctx.Co sysSessionPool.Put(sctx.(pools.Resource)) } +// NewChunk creates a new chunk according to the executor configuration +func (e *BaseExecutor) NewChunk() *chunk.Chunk { + return e.NewChunkWithCapacity(e.RetFieldTypes(), e.InitCap(), e.MaxChunkSize()) +} + +// NewChunkWithCapacity allows the caller to allocate the chunk with any types, capacity and max size in the pool +func (e *BaseExecutor) NewChunkWithCapacity(fields []*types.FieldType, capacity int, maxCachesize int) *chunk.Chunk { + return e.ctx.GetSessionVars().GetNewChunkWithCapacity(fields, capacity, maxCachesize, e.AllocPool) +} + +// HandleSQLKillerSignal handles the signal sent by SQLKiller +func (e *BaseExecutor) HandleSQLKillerSignal() error { + return e.ctx.GetSessionVars().SQLKiller.HandleSignal() +} + +// RegisterSQLAndPlanInExecForTopSQL registers the current SQL and Plan on top sql +// TODO: consider whether it's appropriate to have this on executor +func (e *BaseExecutor) RegisterSQLAndPlanInExecForTopSQL() { + sessVars := e.ctx.GetSessionVars() + if topsqlstate.TopSQLEnabled() && sessVars.StmtCtx.IsSQLAndPlanRegistered.CompareAndSwap(false, true) { + RegisterSQLAndPlanInExecForTopSQL(sessVars) + } +} + // TryNewCacheChunk tries to get a cached chunk func TryNewCacheChunk(e Executor) *chunk.Chunk { - base := e.Base() - s := base.Ctx().GetSessionVars() - return s.GetNewChunkWithCapacity(base.RetFieldTypes(), base.InitCap(), base.MaxChunkSize(), base.AllocPool) + return e.NewChunk() } // RetTypes returns all output column types. func RetTypes(e Executor) []*types.FieldType { - base := e.Base() - return base.RetFieldTypes() + return e.RetFieldTypes() } // NewFirstChunk creates a new chunk to buffer current executor's result. func NewFirstChunk(e Executor) *chunk.Chunk { - base := e.Base() - return chunk.New(base.RetFieldTypes(), base.InitCap(), base.MaxChunkSize()) + return chunk.New(e.RetFieldTypes(), e.InitCap(), e.MaxChunkSize()) } // Open is a wrapper function on e.Open(), it handles some common codes. @@ -270,29 +293,26 @@ func Open(ctx context.Context, e Executor) (err error) { // Next is a wrapper function on e.Next(), it handles some common codes. func Next(ctx context.Context, e Executor, req *chunk.Chunk) error { - base := e.Base() - if base.RuntimeStats() != nil { + if e.RuntimeStats() != nil { start := time.Now() - defer func() { base.RuntimeStats().Record(time.Since(start), req.NumRows()) }() + defer func() { e.RuntimeStats().Record(time.Since(start), req.NumRows()) }() } - sessVars := base.Ctx().GetSessionVars() - if err := sessVars.SQLKiller.HandleSignal(); err != nil { + + if err := e.HandleSQLKillerSignal(); err != nil { return err } r, ctx := tracing.StartRegionEx(ctx, fmt.Sprintf("%T.Next", e)) defer r.End() - if topsqlstate.TopSQLEnabled() && sessVars.StmtCtx.IsSQLAndPlanRegistered.CompareAndSwap(false, true) { - RegisterSQLAndPlanInExecForTopSQL(sessVars) - } + e.RegisterSQLAndPlanInExecForTopSQL() err := e.Next(ctx, req) if err != nil { return err } // recheck whether the session/query is killed during the Next() - return sessVars.SQLKiller.HandleSignal() + return e.HandleSQLKillerSignal() } // Close is a wrapper function on e.Close(), it handles some common codes. diff --git a/pkg/executor/join.go b/pkg/executor/join.go index 0495e22d6b5f0..e06db25ac1815 100644 --- a/pkg/executor/join.go +++ b/pkg/executor/join.go @@ -332,7 +332,7 @@ func (w *buildWorker) fetchBuildSideRows(ctx context.Context, chkCh chan<- *chun if w.hashJoinCtx.finished.Load() { return } - chk := sessVars.GetNewChunkWithCapacity(w.buildSideExec.Base().RetFieldTypes(), sessVars.MaxChunkSize, sessVars.MaxChunkSize, w.hashJoinCtx.allocPool) + chk := sessVars.GetNewChunkWithCapacity(w.buildSideExec.RetFieldTypes(), sessVars.MaxChunkSize, sessVars.MaxChunkSize, w.hashJoinCtx.allocPool) err = exec.Next(ctx, w.buildSideExec, chk) if err != nil { errCh <- errors.Trace(err) @@ -1355,7 +1355,7 @@ func (e *NestedLoopApplyExec) Open(ctx context.Context) error { // aggExecutorTreeInputEmpty checks whether the executor tree returns empty if without aggregate operators. // Note that, the prerequisite is that this executor tree has been executed already and it returns one row. func aggExecutorTreeInputEmpty(e exec.Executor) bool { - children := e.Base().AllChildren() + children := e.AllChildren() if len(children) == 0 { return false } diff --git a/pkg/executor/merge_join.go b/pkg/executor/merge_join.go index ef38c2486ee2e..2f129b1babeb0 100644 --- a/pkg/executor/merge_join.go +++ b/pkg/executor/merge_join.go @@ -90,7 +90,7 @@ func (t *mergeJoinTable) init(executor *MergeJoinExec) { t.groupRowsIter = chunk.NewIterator4Chunk(t.childChunk) if t.isInner { - t.rowContainer = chunk.NewRowContainer(child.Base().RetFieldTypes(), t.childChunk.Capacity()) + t.rowContainer = chunk.NewRowContainer(child.RetFieldTypes(), t.childChunk.Capacity()) t.rowContainer.GetMemTracker().AttachTo(executor.memTracker) t.rowContainer.GetMemTracker().SetLabel(memory.LabelForInnerTable) t.rowContainer.GetDiskTracker().AttachTo(executor.diskTracker) diff --git a/pkg/executor/point_get.go b/pkg/executor/point_get.go index 42b1d8930c68b..450012b2df1f0 100644 --- a/pkg/executor/point_get.go +++ b/pkg/executor/point_get.go @@ -63,8 +63,8 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) exec.Execut isStaleness: b.isStaleness, } - e.Base().SetInitCap(1) - e.Base().SetMaxChunkSize(1) + e.SetInitCap(1) + e.SetMaxChunkSize(1) e.Init(p) e.snapshot, err = b.getSnapshot() @@ -333,7 +333,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { } return nil } - err = DecodeRowValToChunk(e.Base().Ctx(), e.Schema(), e.tblInfo, e.handle, val, req, e.rowDecoder) + err = DecodeRowValToChunk(e.BaseExecutor.Ctx(), e.Schema(), e.tblInfo, e.handle, val, req, e.rowDecoder) if err != nil { return err }