Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/executor: refine the Executor interface #49494

Merged
merged 1 commit into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,7 @@ func (a *recordSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk {
return exec.NewFirstChunk(a.executor)
}

base := a.executor.Base()
return alloc.Alloc(base.RetFieldTypes(), base.InitCap(), base.MaxChunkSize())
return alloc.Alloc(a.executor.RetFieldTypes(), a.executor.InitCap(), a.executor.MaxChunkSize())
}

func (a *recordSet) Finish() error {
Expand Down Expand Up @@ -886,8 +885,7 @@ func (c *chunkRowRecordSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk {
return exec.NewFirstChunk(c.e)
}

base := c.e.Base()
return alloc.Alloc(base.RetFieldTypes(), base.InitCap(), base.MaxChunkSize())
return alloc.Alloc(c.e.RetFieldTypes(), c.e.InitCap(), c.e.MaxChunkSize())
}

func (c *chunkRowRecordSet) Close() error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (e *RecoverIndexExec) columnsTypes() []*types.FieldType {

// Open implements the Executor Open interface.
func (e *RecoverIndexExec) Open(ctx context.Context) error {
if err := exec.Open(ctx, e.BaseExecutor.Base()); err != nil {
if err := exec.Open(ctx, &e.BaseExecutor); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (e *BatchPointGetExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
for !req.IsFull() && e.index < len(e.values) {
handle, val := e.handles[e.index], e.values[e.index]
err := DecodeRowValToChunk(e.Base().Ctx(), e.Schema(), e.tblInfo, handle, val, req, e.rowDecoder)
err := DecodeRowValToChunk(e.BaseExecutor.Ctx(), e.Schema(), e.tblInfo, handle, val, req, e.rowDecoder)
if err != nil {
return err
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3469,7 +3469,7 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) e
}

if len(partitions) == 0 {
return &TableDualExec{BaseExecutor: *ret.Base()}
return &TableDualExec{BaseExecutor: ret.BaseExecutor}
}

// Sort the partition is necessary to make the final multiple partition key ranges ordered.
Expand Down Expand Up @@ -4473,7 +4473,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte
}
return e, nil
}
ret := &TableDualExec{BaseExecutor: *e.Base()}
ret := &TableDualExec{BaseExecutor: e.BaseExecutor}
err = exec.Open(ctx, ret)
return ret, err
}
Expand Down Expand Up @@ -4550,7 +4550,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context
}
return e, err
}
ret := &TableDualExec{BaseExecutor: *e.Base()}
ret := &TableDualExec{BaseExecutor: e.BaseExecutor}
err = exec.Open(ctx, ret)
return ret, err
}
Expand Down Expand Up @@ -5120,8 +5120,8 @@ func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan
}
capacity = len(e.handles)
}
e.Base().SetInitCap(capacity)
e.Base().SetMaxChunkSize(capacity)
e.SetInitCap(capacity)
e.SetMaxChunkSize(capacity)
e.buildVirtualColumnInfo()
return e
}
Expand Down Expand Up @@ -5303,7 +5303,7 @@ func (b *executorBuilder) buildCTE(v *plannercore.PhysicalCTE) exec.Executor {
}

// Setup storages.
tps := seedExec.Base().RetFieldTypes()
tps := seedExec.RetFieldTypes()
resTbl = cteutil.NewStorageRowContainer(tps, chkSize)
if err := resTbl.OpenAndRef(); err != nil {
b.err = err
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (h *CoprocessorDAGHandler) HandleRequest(ctx context.Context, req *coproces
}

chk := exec.TryNewCacheChunk(e)
tps := e.Base().RetFieldTypes()
tps := e.RetFieldTypes()
var totalChunks, partChunks []tipb.Chunk
memTracker := h.sctx.GetSessionVars().StmtCtx.MemTracker
for {
Expand Down Expand Up @@ -125,7 +125,7 @@ func (h *CoprocessorDAGHandler) HandleStreamRequest(ctx context.Context, req *co
}

chk := exec.TryNewCacheChunk(e)
tps := e.Base().RetFieldTypes()
tps := e.RetFieldTypes()
for {
chk.Reset()
if err = exec.Next(ctx, e, chk); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err e
// For non-recursive CTE, the result will be put into resTbl directly.
// So no need to build iterOutTbl.
// Construct iterOutTbl in Open() instead of buildCTE(), because its destruct is in Close().
recursiveTypes := p.recursiveExec.Base().RetFieldTypes()
recursiveTypes := p.recursiveExec.RetFieldTypes()
p.iterOutTbl = cteutil.NewStorageRowContainer(recursiveTypes, cteExec.MaxChunkSize())
if err = p.iterOutTbl.OpenAndRef(); err != nil {
return err
Expand All @@ -214,7 +214,7 @@ func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err e
if p.isDistinct {
p.hashTbl = newConcurrentMapHashTable()
p.hCtx = &hashContext{
allTypes: cteExec.Base().RetFieldTypes(),
allTypes: cteExec.RetFieldTypes(),
}
// We use all columns to compute hash.
p.hCtx.keyColIdx = make([]int, len(p.hCtx.allTypes))
Expand Down
7 changes: 3 additions & 4 deletions pkg/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,7 @@ func (*globalPanicOnExceed) GetPriority() int64 {

// newList creates a new List to buffer current executor's result.
func newList(e exec.Executor) *chunk.List {
base := e.Base()
return chunk.NewList(base.RetFieldTypes(), base.InitCap(), base.MaxChunkSize())
return chunk.NewList(e.RetFieldTypes(), e.InitCap(), e.MaxChunkSize())
}

// CommandDDLJobsExec is the general struct for Cancel/Pause/Resume commands on
Expand Down Expand Up @@ -2377,15 +2376,15 @@ func (w *checkIndexWorker) HandleTask(task checkIndexTask, _ func(workerpool.Non
w.e.err.CompareAndSwap(nil, &err)
}

se, err := w.e.Base().GetSysSession()
se, err := w.e.BaseExecutor.GetSysSession()
if err != nil {
trySaveErr(err)
return
}
restoreCtx := w.initSessCtx(se)
defer func() {
restoreCtx()
w.e.Base().ReleaseSysSession(ctx, se)
w.e.BaseExecutor.ReleaseSysSession(ctx, se)
}()

var pkCols []string
Expand Down
8 changes: 6 additions & 2 deletions pkg/executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {
}
maxChunkSize := ow.ctx.GetSessionVars().MaxChunkSize
for requiredRows > task.outerResult.Len() {
chk := ow.ctx.GetSessionVars().GetNewChunkWithCapacity(ow.outerCtx.rowTypes, maxChunkSize, maxChunkSize, ow.executor.Base().AllocPool)
chk := ow.executor.NewChunkWithCapacity(ow.outerCtx.rowTypes, maxChunkSize, maxChunkSize)
chk = chk.SetRequiredRows(requiredRows, maxChunkSize)
err := exec.Next(ctx, ow.executor, chk)
if err != nil {
Expand Down Expand Up @@ -468,7 +468,11 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {
}
task.encodedLookUpKeys = make([]*chunk.Chunk, task.outerResult.NumChunks())
for i := range task.encodedLookUpKeys {
task.encodedLookUpKeys[i] = ow.ctx.GetSessionVars().GetNewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeBlob)}, task.outerResult.GetChunk(i).NumRows(), task.outerResult.GetChunk(i).NumRows(), ow.executor.Base().AllocPool)
task.encodedLookUpKeys[i] = ow.executor.NewChunkWithCapacity(
[]*types.FieldType{types.NewFieldType(mysql.TypeBlob)},
task.outerResult.GetChunk(i).NumRows(),
task.outerResult.GetChunk(i).NumRows(),
)
}
return task, nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/index_lookup_merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func (*outerMergeWorker) pushToChan(ctx context.Context, task *lookUpMergeJoinTa
func (omw *outerMergeWorker) buildTask(ctx context.Context) (*lookUpMergeJoinTask, error) {
task := &lookUpMergeJoinTask{
results: make(chan *indexMergeJoinResult, numResChkHold),
outerResult: chunk.NewList(omw.rowTypes, omw.executor.Base().InitCap(), omw.executor.Base().MaxChunkSize()),
outerResult: chunk.NewList(omw.rowTypes, omw.executor.InitCap(), omw.executor.MaxChunkSize()),
}
task.memTracker = memory.NewTracker(memory.LabelForSimpleTask, -1)
task.memTracker.AttachTo(omw.parentMemTracker)
Expand Down Expand Up @@ -712,7 +712,7 @@ func (imw *innerMergeWorker) dedupDatumLookUpKeys(lookUpContents []*indexJoinLoo

// fetchNextInnerResult collects a chunk of inner results from inner child executor.
func (imw *innerMergeWorker) fetchNextInnerResult(ctx context.Context, task *lookUpMergeJoinTask) (beginRow chunk.Row, err error) {
task.innerResult = imw.ctx.GetSessionVars().GetNewChunkWithCapacity(exec.RetTypes(imw.innerExec), imw.ctx.GetSessionVars().MaxChunkSize, imw.ctx.GetSessionVars().MaxChunkSize, imw.innerExec.Base().AllocPool)
task.innerResult = imw.innerExec.NewChunkWithCapacity(imw.innerExec.RetFieldTypes(), imw.innerExec.MaxChunkSize(), imw.innerExec.MaxChunkSize())
err = exec.Next(ctx, imw.innerExec, task.innerResult)
task.innerIter = chunk.NewIterator4Chunk(task.innerResult)
beginRow = task.innerIter.Begin()
Expand Down
6 changes: 3 additions & 3 deletions pkg/executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ func (w *partialTableWorker) needPartitionHandle() (bool, error) {

func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *indexMergeTableTask,
finished <-chan struct{}, handleCols plannercore.HandleCols, parTblIdx int, partialPlanIndex int) (count int64, err error) {
chk := w.sc.GetSessionVars().GetNewChunkWithCapacity(w.getRetTpsForTableScan(), w.maxChunkSize, w.maxChunkSize, w.tableReader.Base().AllocPool)
chk := w.tableReader.NewChunkWithCapacity(w.getRetTpsForTableScan(), w.maxChunkSize, w.maxBatchSize)
for {
start := time.Now()
handles, retChunk, err := w.extractTaskHandles(ctx, chk, handleCols)
Expand Down Expand Up @@ -686,8 +686,8 @@ func (w *partialTableWorker) extractTaskHandles(ctx context.Context, chk *chunk.
if err != nil {
return nil, nil, err
}
if be := w.tableReader.Base(); be != nil && be.RuntimeStats() != nil {
be.RuntimeStats().Record(time.Since(start), chk.NumRows())
if w.tableReader != nil && w.tableReader.RuntimeStats() != nil {
w.tableReader.RuntimeStats().Record(time.Since(start), chk.NumRows())
}
if chk.NumRows() == 0 {
failpoint.Inject("testIndexMergeErrorPartialTableWorker", func(v failpoint.Value) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func (e *InsertExec) initEvalBuffer4Dup() {
evalBufferTypes = append(evalBufferTypes, &(col.FieldType))
}
if extraLen > 0 {
evalBufferTypes = append(evalBufferTypes, e.SelectExec.Base().RetFieldTypes()[e.rowLen:]...)
evalBufferTypes = append(evalBufferTypes, e.SelectExec.RetFieldTypes()[e.rowLen:]...)
}
for _, col := range e.Table.Cols() {
evalBufferTypes = append(evalBufferTypes, &(col.FieldType))
Expand Down
64 changes: 42 additions & 22 deletions pkg/executor/internal/exec/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,15 @@ import (
// return a batch of rows, other than a single row in Volcano.
// NOTE: Executors must call "chk.Reset()" before appending their results to it.
type Executor interface {
Base() *BaseExecutor
NewChunk() *chunk.Chunk
NewChunkWithCapacity(fields []*types.FieldType, capacity int, maxCachesize int) *chunk.Chunk

RuntimeStats() *execdetails.BasicRuntimeStats

HandleSQLKillerSignal() error
RegisterSQLAndPlanInExecForTopSQL()

AllChildren() []Executor
Open(context.Context) error
Next(ctx context.Context, req *chunk.Chunk) error
Close() error
Expand Down Expand Up @@ -156,11 +164,6 @@ func (e *BaseExecutor) SetMaxChunkSize(size int) {
e.maxChunkSize = size
}

// Base returns the BaseExecutor of an executor, don't override this method!
func (e *BaseExecutor) Base() *BaseExecutor {
return e
}

// Open initializes children recursively and "childrenResults" according to children's schemas.
func (e *BaseExecutor) Open(ctx context.Context) error {
for _, child := range e.children {
Expand Down Expand Up @@ -239,23 +242,43 @@ func (e *BaseExecutor) ReleaseSysSession(ctx context.Context, sctx sessionctx.Co
sysSessionPool.Put(sctx.(pools.Resource))
}

// NewChunk creates a new chunk according to the executor configuration
func (e *BaseExecutor) NewChunk() *chunk.Chunk {
return e.NewChunkWithCapacity(e.RetFieldTypes(), e.InitCap(), e.MaxChunkSize())
}

// NewChunkWithCapacity allows the caller to allocate the chunk with any types, capacity and max size in the pool
func (e *BaseExecutor) NewChunkWithCapacity(fields []*types.FieldType, capacity int, maxCachesize int) *chunk.Chunk {
return e.ctx.GetSessionVars().GetNewChunkWithCapacity(fields, capacity, maxCachesize, e.AllocPool)
}

// HandleSQLKillerSignal handles the signal sent by SQLKiller
func (e *BaseExecutor) HandleSQLKillerSignal() error {
return e.ctx.GetSessionVars().SQLKiller.HandleSignal()
}

// RegisterSQLAndPlanInExecForTopSQL registers the current SQL and Plan on top sql
// TODO: consider whether it's appropriate to have this on executor
func (e *BaseExecutor) RegisterSQLAndPlanInExecForTopSQL() {
sessVars := e.ctx.GetSessionVars()
if topsqlstate.TopSQLEnabled() && sessVars.StmtCtx.IsSQLAndPlanRegistered.CompareAndSwap(false, true) {
RegisterSQLAndPlanInExecForTopSQL(sessVars)
}
}

// TryNewCacheChunk tries to get a cached chunk
func TryNewCacheChunk(e Executor) *chunk.Chunk {
base := e.Base()
s := base.Ctx().GetSessionVars()
return s.GetNewChunkWithCapacity(base.RetFieldTypes(), base.InitCap(), base.MaxChunkSize(), base.AllocPool)
return e.NewChunk()
}

// RetTypes returns all output column types.
func RetTypes(e Executor) []*types.FieldType {
base := e.Base()
return base.RetFieldTypes()
return e.RetFieldTypes()
}

// NewFirstChunk creates a new chunk to buffer current executor's result.
func NewFirstChunk(e Executor) *chunk.Chunk {
base := e.Base()
return chunk.New(base.RetFieldTypes(), base.InitCap(), base.MaxChunkSize())
return chunk.New(e.RetFieldTypes(), e.InitCap(), e.MaxChunkSize())
}

// Open is a wrapper function on e.Open(), it handles some common codes.
Expand All @@ -270,29 +293,26 @@ func Open(ctx context.Context, e Executor) (err error) {

// Next is a wrapper function on e.Next(), it handles some common codes.
func Next(ctx context.Context, e Executor, req *chunk.Chunk) error {
base := e.Base()
if base.RuntimeStats() != nil {
if e.RuntimeStats() != nil {
start := time.Now()
defer func() { base.RuntimeStats().Record(time.Since(start), req.NumRows()) }()
defer func() { e.RuntimeStats().Record(time.Since(start), req.NumRows()) }()
}
sessVars := base.Ctx().GetSessionVars()
if err := sessVars.SQLKiller.HandleSignal(); err != nil {

if err := e.HandleSQLKillerSignal(); err != nil {
return err
}

r, ctx := tracing.StartRegionEx(ctx, fmt.Sprintf("%T.Next", e))
defer r.End()

if topsqlstate.TopSQLEnabled() && sessVars.StmtCtx.IsSQLAndPlanRegistered.CompareAndSwap(false, true) {
RegisterSQLAndPlanInExecForTopSQL(sessVars)
}
e.RegisterSQLAndPlanInExecForTopSQL()
err := e.Next(ctx, req)

if err != nil {
return err
}
// recheck whether the session/query is killed during the Next()
return sessVars.SQLKiller.HandleSignal()
return e.HandleSQLKillerSignal()
}

// Close is a wrapper function on e.Close(), it handles some common codes.
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func (w *buildWorker) fetchBuildSideRows(ctx context.Context, chkCh chan<- *chun
if w.hashJoinCtx.finished.Load() {
return
}
chk := sessVars.GetNewChunkWithCapacity(w.buildSideExec.Base().RetFieldTypes(), sessVars.MaxChunkSize, sessVars.MaxChunkSize, w.hashJoinCtx.allocPool)
chk := sessVars.GetNewChunkWithCapacity(w.buildSideExec.RetFieldTypes(), sessVars.MaxChunkSize, sessVars.MaxChunkSize, w.hashJoinCtx.allocPool)
err = exec.Next(ctx, w.buildSideExec, chk)
if err != nil {
errCh <- errors.Trace(err)
Expand Down Expand Up @@ -1355,7 +1355,7 @@ func (e *NestedLoopApplyExec) Open(ctx context.Context) error {
// aggExecutorTreeInputEmpty checks whether the executor tree returns empty if without aggregate operators.
// Note that, the prerequisite is that this executor tree has been executed already and it returns one row.
func aggExecutorTreeInputEmpty(e exec.Executor) bool {
children := e.Base().AllChildren()
children := e.AllChildren()
if len(children) == 0 {
return false
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (t *mergeJoinTable) init(executor *MergeJoinExec) {
t.groupRowsIter = chunk.NewIterator4Chunk(t.childChunk)

if t.isInner {
t.rowContainer = chunk.NewRowContainer(child.Base().RetFieldTypes(), t.childChunk.Capacity())
t.rowContainer = chunk.NewRowContainer(child.RetFieldTypes(), t.childChunk.Capacity())
t.rowContainer.GetMemTracker().AttachTo(executor.memTracker)
t.rowContainer.GetMemTracker().SetLabel(memory.LabelForInnerTable)
t.rowContainer.GetDiskTracker().AttachTo(executor.diskTracker)
Expand Down
6 changes: 3 additions & 3 deletions pkg/executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) exec.Execut
isStaleness: b.isStaleness,
}

e.Base().SetInitCap(1)
e.Base().SetMaxChunkSize(1)
e.SetInitCap(1)
e.SetMaxChunkSize(1)
e.Init(p)

e.snapshot, err = b.getSnapshot()
Expand Down Expand Up @@ -333,7 +333,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
}
return nil
}
err = DecodeRowValToChunk(e.Base().Ctx(), e.Schema(), e.tblInfo, e.handle, val, req, e.rowDecoder)
err = DecodeRowValToChunk(e.BaseExecutor.Ctx(), e.Schema(), e.tblInfo, e.handle, val, req, e.rowDecoder)
if err != nil {
return err
}
Expand Down