Skip to content

Commit

Permalink
pkg/executor: refine the Executor interface (#49494)
Browse files Browse the repository at this point in the history
close #49490
  • Loading branch information
YangKeao authored Dec 19, 2023
1 parent abc54d0 commit b850d26
Show file tree
Hide file tree
Showing 15 changed files with 77 additions and 56 deletions.
6 changes: 2 additions & 4 deletions pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,7 @@ func (a *recordSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk {
return exec.NewFirstChunk(a.executor)
}

base := a.executor.Base()
return alloc.Alloc(base.RetFieldTypes(), base.InitCap(), base.MaxChunkSize())
return alloc.Alloc(a.executor.RetFieldTypes(), a.executor.InitCap(), a.executor.MaxChunkSize())
}

func (a *recordSet) Finish() error {
Expand Down Expand Up @@ -886,8 +885,7 @@ func (c *chunkRowRecordSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk {
return exec.NewFirstChunk(c.e)
}

base := c.e.Base()
return alloc.Alloc(base.RetFieldTypes(), base.InitCap(), base.MaxChunkSize())
return alloc.Alloc(c.e.RetFieldTypes(), c.e.InitCap(), c.e.MaxChunkSize())
}

func (c *chunkRowRecordSet) Close() error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (e *RecoverIndexExec) columnsTypes() []*types.FieldType {

// Open implements the Executor Open interface.
func (e *RecoverIndexExec) Open(ctx context.Context) error {
if err := exec.Open(ctx, e.BaseExecutor.Base()); err != nil {
if err := exec.Open(ctx, &e.BaseExecutor); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (e *BatchPointGetExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
for !req.IsFull() && e.index < len(e.values) {
handle, val := e.handles[e.index], e.values[e.index]
err := DecodeRowValToChunk(e.Base().Ctx(), e.Schema(), e.tblInfo, handle, val, req, e.rowDecoder)
err := DecodeRowValToChunk(e.BaseExecutor.Ctx(), e.Schema(), e.tblInfo, handle, val, req, e.rowDecoder)
if err != nil {
return err
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3463,7 +3463,7 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) e
}

if len(partitions) == 0 {
return &TableDualExec{BaseExecutor: *ret.Base()}
return &TableDualExec{BaseExecutor: ret.BaseExecutor}
}

// Sort the partition is necessary to make the final multiple partition key ranges ordered.
Expand Down Expand Up @@ -4467,7 +4467,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte
}
return e, nil
}
ret := &TableDualExec{BaseExecutor: *e.Base()}
ret := &TableDualExec{BaseExecutor: e.BaseExecutor}
err = exec.Open(ctx, ret)
return ret, err
}
Expand Down Expand Up @@ -4544,7 +4544,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context
}
return e, err
}
ret := &TableDualExec{BaseExecutor: *e.Base()}
ret := &TableDualExec{BaseExecutor: e.BaseExecutor}
err = exec.Open(ctx, ret)
return ret, err
}
Expand Down Expand Up @@ -5114,8 +5114,8 @@ func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan
}
capacity = len(e.handles)
}
e.Base().SetInitCap(capacity)
e.Base().SetMaxChunkSize(capacity)
e.SetInitCap(capacity)
e.SetMaxChunkSize(capacity)
e.buildVirtualColumnInfo()
return e
}
Expand Down Expand Up @@ -5297,7 +5297,7 @@ func (b *executorBuilder) buildCTE(v *plannercore.PhysicalCTE) exec.Executor {
}

// Setup storages.
tps := seedExec.Base().RetFieldTypes()
tps := seedExec.RetFieldTypes()
resTbl = cteutil.NewStorageRowContainer(tps, chkSize)
if err := resTbl.OpenAndRef(); err != nil {
b.err = err
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (h *CoprocessorDAGHandler) HandleRequest(ctx context.Context, req *coproces
}

chk := exec.TryNewCacheChunk(e)
tps := e.Base().RetFieldTypes()
tps := e.RetFieldTypes()
var totalChunks, partChunks []tipb.Chunk
memTracker := h.sctx.GetSessionVars().StmtCtx.MemTracker
for {
Expand Down Expand Up @@ -125,7 +125,7 @@ func (h *CoprocessorDAGHandler) HandleStreamRequest(ctx context.Context, req *co
}

chk := exec.TryNewCacheChunk(e)
tps := e.Base().RetFieldTypes()
tps := e.RetFieldTypes()
for {
chk.Reset()
if err = exec.Next(ctx, e, chk); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err e
// For non-recursive CTE, the result will be put into resTbl directly.
// So no need to build iterOutTbl.
// Construct iterOutTbl in Open() instead of buildCTE(), because its destruct is in Close().
recursiveTypes := p.recursiveExec.Base().RetFieldTypes()
recursiveTypes := p.recursiveExec.RetFieldTypes()
p.iterOutTbl = cteutil.NewStorageRowContainer(recursiveTypes, cteExec.MaxChunkSize())
if err = p.iterOutTbl.OpenAndRef(); err != nil {
return err
Expand All @@ -214,7 +214,7 @@ func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err e
if p.isDistinct {
p.hashTbl = newConcurrentMapHashTable()
p.hCtx = &hashContext{
allTypes: cteExec.Base().RetFieldTypes(),
allTypes: cteExec.RetFieldTypes(),
}
// We use all columns to compute hash.
p.hCtx.keyColIdx = make([]int, len(p.hCtx.allTypes))
Expand Down
7 changes: 3 additions & 4 deletions pkg/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,7 @@ func (*globalPanicOnExceed) GetPriority() int64 {

// newList creates a new List to buffer current executor's result.
func newList(e exec.Executor) *chunk.List {
base := e.Base()
return chunk.NewList(base.RetFieldTypes(), base.InitCap(), base.MaxChunkSize())
return chunk.NewList(e.RetFieldTypes(), e.InitCap(), e.MaxChunkSize())
}

// CommandDDLJobsExec is the general struct for Cancel/Pause/Resume commands on
Expand Down Expand Up @@ -2377,15 +2376,15 @@ func (w *checkIndexWorker) HandleTask(task checkIndexTask, _ func(workerpool.Non
w.e.err.CompareAndSwap(nil, &err)
}

se, err := w.e.Base().GetSysSession()
se, err := w.e.BaseExecutor.GetSysSession()
if err != nil {
trySaveErr(err)
return
}
restoreCtx := w.initSessCtx(se)
defer func() {
restoreCtx()
w.e.Base().ReleaseSysSession(ctx, se)
w.e.BaseExecutor.ReleaseSysSession(ctx, se)
}()

var pkCols []string
Expand Down
8 changes: 6 additions & 2 deletions pkg/executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {
}
maxChunkSize := ow.ctx.GetSessionVars().MaxChunkSize
for requiredRows > task.outerResult.Len() {
chk := ow.ctx.GetSessionVars().GetNewChunkWithCapacity(ow.outerCtx.rowTypes, maxChunkSize, maxChunkSize, ow.executor.Base().AllocPool)
chk := ow.executor.NewChunkWithCapacity(ow.outerCtx.rowTypes, maxChunkSize, maxChunkSize)
chk = chk.SetRequiredRows(requiredRows, maxChunkSize)
err := exec.Next(ctx, ow.executor, chk)
if err != nil {
Expand Down Expand Up @@ -468,7 +468,11 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {
}
task.encodedLookUpKeys = make([]*chunk.Chunk, task.outerResult.NumChunks())
for i := range task.encodedLookUpKeys {
task.encodedLookUpKeys[i] = ow.ctx.GetSessionVars().GetNewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeBlob)}, task.outerResult.GetChunk(i).NumRows(), task.outerResult.GetChunk(i).NumRows(), ow.executor.Base().AllocPool)
task.encodedLookUpKeys[i] = ow.executor.NewChunkWithCapacity(
[]*types.FieldType{types.NewFieldType(mysql.TypeBlob)},
task.outerResult.GetChunk(i).NumRows(),
task.outerResult.GetChunk(i).NumRows(),
)
}
return task, nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/index_lookup_merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func (*outerMergeWorker) pushToChan(ctx context.Context, task *lookUpMergeJoinTa
func (omw *outerMergeWorker) buildTask(ctx context.Context) (*lookUpMergeJoinTask, error) {
task := &lookUpMergeJoinTask{
results: make(chan *indexMergeJoinResult, numResChkHold),
outerResult: chunk.NewList(omw.rowTypes, omw.executor.Base().InitCap(), omw.executor.Base().MaxChunkSize()),
outerResult: chunk.NewList(omw.rowTypes, omw.executor.InitCap(), omw.executor.MaxChunkSize()),
}
task.memTracker = memory.NewTracker(memory.LabelForSimpleTask, -1)
task.memTracker.AttachTo(omw.parentMemTracker)
Expand Down Expand Up @@ -712,7 +712,7 @@ func (imw *innerMergeWorker) dedupDatumLookUpKeys(lookUpContents []*indexJoinLoo

// fetchNextInnerResult collects a chunk of inner results from inner child executor.
func (imw *innerMergeWorker) fetchNextInnerResult(ctx context.Context, task *lookUpMergeJoinTask) (beginRow chunk.Row, err error) {
task.innerResult = imw.ctx.GetSessionVars().GetNewChunkWithCapacity(exec.RetTypes(imw.innerExec), imw.ctx.GetSessionVars().MaxChunkSize, imw.ctx.GetSessionVars().MaxChunkSize, imw.innerExec.Base().AllocPool)
task.innerResult = imw.innerExec.NewChunkWithCapacity(imw.innerExec.RetFieldTypes(), imw.innerExec.MaxChunkSize(), imw.innerExec.MaxChunkSize())
err = exec.Next(ctx, imw.innerExec, task.innerResult)
task.innerIter = chunk.NewIterator4Chunk(task.innerResult)
beginRow = task.innerIter.Begin()
Expand Down
6 changes: 3 additions & 3 deletions pkg/executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ func (w *partialTableWorker) needPartitionHandle() (bool, error) {

func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *indexMergeTableTask,
finished <-chan struct{}, handleCols plannercore.HandleCols, parTblIdx int, partialPlanIndex int) (count int64, err error) {
chk := w.sc.GetSessionVars().GetNewChunkWithCapacity(w.getRetTpsForTableScan(), w.maxChunkSize, w.maxChunkSize, w.tableReader.Base().AllocPool)
chk := w.tableReader.NewChunkWithCapacity(w.getRetTpsForTableScan(), w.maxChunkSize, w.maxBatchSize)
for {
start := time.Now()
handles, retChunk, err := w.extractTaskHandles(ctx, chk, handleCols)
Expand Down Expand Up @@ -686,8 +686,8 @@ func (w *partialTableWorker) extractTaskHandles(ctx context.Context, chk *chunk.
if err != nil {
return nil, nil, err
}
if be := w.tableReader.Base(); be != nil && be.RuntimeStats() != nil {
be.RuntimeStats().Record(time.Since(start), chk.NumRows())
if w.tableReader != nil && w.tableReader.RuntimeStats() != nil {
w.tableReader.RuntimeStats().Record(time.Since(start), chk.NumRows())
}
if chk.NumRows() == 0 {
failpoint.Inject("testIndexMergeErrorPartialTableWorker", func(v failpoint.Value) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func (e *InsertExec) initEvalBuffer4Dup() {
evalBufferTypes = append(evalBufferTypes, &(col.FieldType))
}
if extraLen > 0 {
evalBufferTypes = append(evalBufferTypes, e.SelectExec.Base().RetFieldTypes()[e.rowLen:]...)
evalBufferTypes = append(evalBufferTypes, e.SelectExec.RetFieldTypes()[e.rowLen:]...)
}
for _, col := range e.Table.Cols() {
evalBufferTypes = append(evalBufferTypes, &(col.FieldType))
Expand Down
64 changes: 42 additions & 22 deletions pkg/executor/internal/exec/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,15 @@ import (
// return a batch of rows, other than a single row in Volcano.
// NOTE: Executors must call "chk.Reset()" before appending their results to it.
type Executor interface {
Base() *BaseExecutor
NewChunk() *chunk.Chunk
NewChunkWithCapacity(fields []*types.FieldType, capacity int, maxCachesize int) *chunk.Chunk

RuntimeStats() *execdetails.BasicRuntimeStats

HandleSQLKillerSignal() error
RegisterSQLAndPlanInExecForTopSQL()

AllChildren() []Executor
Open(context.Context) error
Next(ctx context.Context, req *chunk.Chunk) error
Close() error
Expand Down Expand Up @@ -156,11 +164,6 @@ func (e *BaseExecutor) SetMaxChunkSize(size int) {
e.maxChunkSize = size
}

// Base returns the BaseExecutor of an executor, don't override this method!
func (e *BaseExecutor) Base() *BaseExecutor {
return e
}

// Open initializes children recursively and "childrenResults" according to children's schemas.
func (e *BaseExecutor) Open(ctx context.Context) error {
for _, child := range e.children {
Expand Down Expand Up @@ -239,23 +242,43 @@ func (e *BaseExecutor) ReleaseSysSession(ctx context.Context, sctx sessionctx.Co
sysSessionPool.Put(sctx.(pools.Resource))
}

// NewChunk creates a new chunk according to the executor configuration
func (e *BaseExecutor) NewChunk() *chunk.Chunk {
return e.NewChunkWithCapacity(e.RetFieldTypes(), e.InitCap(), e.MaxChunkSize())
}

// NewChunkWithCapacity allows the caller to allocate the chunk with any types, capacity and max size in the pool
func (e *BaseExecutor) NewChunkWithCapacity(fields []*types.FieldType, capacity int, maxCachesize int) *chunk.Chunk {
return e.ctx.GetSessionVars().GetNewChunkWithCapacity(fields, capacity, maxCachesize, e.AllocPool)
}

// HandleSQLKillerSignal handles the signal sent by SQLKiller
func (e *BaseExecutor) HandleSQLKillerSignal() error {
return e.ctx.GetSessionVars().SQLKiller.HandleSignal()
}

// RegisterSQLAndPlanInExecForTopSQL registers the current SQL and Plan on top sql
// TODO: consider whether it's appropriate to have this on executor
func (e *BaseExecutor) RegisterSQLAndPlanInExecForTopSQL() {
sessVars := e.ctx.GetSessionVars()
if topsqlstate.TopSQLEnabled() && sessVars.StmtCtx.IsSQLAndPlanRegistered.CompareAndSwap(false, true) {
RegisterSQLAndPlanInExecForTopSQL(sessVars)
}
}

// TryNewCacheChunk tries to get a cached chunk
func TryNewCacheChunk(e Executor) *chunk.Chunk {
base := e.Base()
s := base.Ctx().GetSessionVars()
return s.GetNewChunkWithCapacity(base.RetFieldTypes(), base.InitCap(), base.MaxChunkSize(), base.AllocPool)
return e.NewChunk()
}

// RetTypes returns all output column types.
func RetTypes(e Executor) []*types.FieldType {
base := e.Base()
return base.RetFieldTypes()
return e.RetFieldTypes()
}

// NewFirstChunk creates a new chunk to buffer current executor's result.
func NewFirstChunk(e Executor) *chunk.Chunk {
base := e.Base()
return chunk.New(base.RetFieldTypes(), base.InitCap(), base.MaxChunkSize())
return chunk.New(e.RetFieldTypes(), e.InitCap(), e.MaxChunkSize())
}

// Open is a wrapper function on e.Open(), it handles some common codes.
Expand All @@ -270,29 +293,26 @@ func Open(ctx context.Context, e Executor) (err error) {

// Next is a wrapper function on e.Next(), it handles some common codes.
func Next(ctx context.Context, e Executor, req *chunk.Chunk) error {
base := e.Base()
if base.RuntimeStats() != nil {
if e.RuntimeStats() != nil {
start := time.Now()
defer func() { base.RuntimeStats().Record(time.Since(start), req.NumRows()) }()
defer func() { e.RuntimeStats().Record(time.Since(start), req.NumRows()) }()
}
sessVars := base.Ctx().GetSessionVars()
if err := sessVars.SQLKiller.HandleSignal(); err != nil {

if err := e.HandleSQLKillerSignal(); err != nil {
return err
}

r, ctx := tracing.StartRegionEx(ctx, fmt.Sprintf("%T.Next", e))
defer r.End()

if topsqlstate.TopSQLEnabled() && sessVars.StmtCtx.IsSQLAndPlanRegistered.CompareAndSwap(false, true) {
RegisterSQLAndPlanInExecForTopSQL(sessVars)
}
e.RegisterSQLAndPlanInExecForTopSQL()
err := e.Next(ctx, req)

if err != nil {
return err
}
// recheck whether the session/query is killed during the Next()
return sessVars.SQLKiller.HandleSignal()
return e.HandleSQLKillerSignal()
}

// Close is a wrapper function on e.Close(), it handles some common codes.
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func (w *buildWorker) fetchBuildSideRows(ctx context.Context, chkCh chan<- *chun
if w.hashJoinCtx.finished.Load() {
return
}
chk := sessVars.GetNewChunkWithCapacity(w.buildSideExec.Base().RetFieldTypes(), sessVars.MaxChunkSize, sessVars.MaxChunkSize, w.hashJoinCtx.allocPool)
chk := sessVars.GetNewChunkWithCapacity(w.buildSideExec.RetFieldTypes(), sessVars.MaxChunkSize, sessVars.MaxChunkSize, w.hashJoinCtx.allocPool)
err = exec.Next(ctx, w.buildSideExec, chk)
if err != nil {
errCh <- errors.Trace(err)
Expand Down Expand Up @@ -1355,7 +1355,7 @@ func (e *NestedLoopApplyExec) Open(ctx context.Context) error {
// aggExecutorTreeInputEmpty checks whether the executor tree returns empty if without aggregate operators.
// Note that, the prerequisite is that this executor tree has been executed already and it returns one row.
func aggExecutorTreeInputEmpty(e exec.Executor) bool {
children := e.Base().AllChildren()
children := e.AllChildren()
if len(children) == 0 {
return false
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (t *mergeJoinTable) init(executor *MergeJoinExec) {
t.groupRowsIter = chunk.NewIterator4Chunk(t.childChunk)

if t.isInner {
t.rowContainer = chunk.NewRowContainer(child.Base().RetFieldTypes(), t.childChunk.Capacity())
t.rowContainer = chunk.NewRowContainer(child.RetFieldTypes(), t.childChunk.Capacity())
t.rowContainer.GetMemTracker().AttachTo(executor.memTracker)
t.rowContainer.GetMemTracker().SetLabel(memory.LabelForInnerTable)
t.rowContainer.GetDiskTracker().AttachTo(executor.diskTracker)
Expand Down
6 changes: 3 additions & 3 deletions pkg/executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) exec.Execut
isStaleness: b.isStaleness,
}

e.Base().SetInitCap(1)
e.Base().SetMaxChunkSize(1)
e.SetInitCap(1)
e.SetMaxChunkSize(1)
e.Init(p)

e.snapshot, err = b.getSnapshot()
Expand Down Expand Up @@ -333,7 +333,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
}
return nil
}
err = DecodeRowValToChunk(e.Base().Ctx(), e.Schema(), e.tblInfo, e.handle, val, req, e.rowDecoder)
err = DecodeRowValToChunk(e.BaseExecutor.Ctx(), e.Schema(), e.tblInfo, e.handle, val, req, e.rowDecoder)
if err != nil {
return err
}
Expand Down

0 comments on commit b850d26

Please sign in to comment.