Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: Use the BaseExecutorV2 to refactor the SelectionExec and remove the session context inside #54617

Merged
merged 1 commit into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1963,8 +1963,9 @@ func (b *executorBuilder) buildSelection(v *plannercore.PhysicalSelection) exec.
return nil
}
e := &SelectionExec{
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID(), childExec),
filters: v.Conditions,
selectionExecutorContext: newSelectionExecutorContext(b.ctx),
BaseExecutorV2: exec.NewBaseExecutorV2(b.ctx.GetSessionVars(), v.Schema(), v.ID(), childExec),
filters: v.Conditions,
}
return e
}
Expand Down Expand Up @@ -4265,8 +4266,9 @@ func (builder *dataReaderBuilder) buildExecutorForIndexJoinInternal(ctx context.
return nil, err
}
exec := &SelectionExec{
BaseExecutor: exec.NewBaseExecutor(builder.ctx, v.Schema(), v.ID(), childExec),
filters: v.Conditions,
selectionExecutorContext: newSelectionExecutorContext(builder.ctx),
BaseExecutorV2: exec.NewBaseExecutorV2(builder.ctx.GetSessionVars(), v.Schema(), v.ID(), childExec),
filters: v.Conditions,
}
err = exec.open(ctx)
return exec, err
Expand Down
29 changes: 22 additions & 7 deletions pkg/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1526,9 +1526,24 @@ func (e *TableDualExec) Next(_ context.Context, req *chunk.Chunk) error {
return nil
}

type selectionExecutorContext struct {
stmtMemTracker *memory.Tracker
evalCtx expression.EvalContext
enableVectorizedExpression bool
}

func newSelectionExecutorContext(sctx sessionctx.Context) selectionExecutorContext {
return selectionExecutorContext{
stmtMemTracker: sctx.GetSessionVars().StmtCtx.MemTracker,
evalCtx: sctx.GetExprCtx().GetEvalCtx(),
enableVectorizedExpression: sctx.GetSessionVars().EnableVectorizedExpression,
}
}

// SelectionExec represents a filter executor.
type SelectionExec struct {
exec.BaseExecutor
selectionExecutorContext
exec.BaseExecutorV2

batched bool
filters []expression.Expression
Expand All @@ -1542,7 +1557,7 @@ type SelectionExec struct {

// Open implements the Executor Open interface.
func (e *SelectionExec) Open(ctx context.Context) error {
if err := e.BaseExecutor.Open(ctx); err != nil {
if err := e.BaseExecutorV2.Open(ctx); err != nil {
return err
}
failpoint.Inject("mockSelectionExecBaseExecutorOpenReturnedError", func(val failpoint.Value) {
Expand All @@ -1559,7 +1574,7 @@ func (e *SelectionExec) open(context.Context) error {
} else {
e.memTracker = memory.NewTracker(e.ID(), -1)
}
e.memTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.MemTracker)
e.memTracker.AttachTo(e.stmtMemTracker)
e.childResult = exec.TryNewCacheChunk(e.Children(0))
e.memTracker.Consume(e.childResult.MemoryUsage())
e.batched = expression.Vectorizable(e.filters)
Expand All @@ -1578,7 +1593,7 @@ func (e *SelectionExec) Close() error {
e.childResult = nil
}
e.selected = nil
return e.BaseExecutor.Close()
return e.BaseExecutorV2.Close()
}

// Next implements the Executor Next interface.
Expand Down Expand Up @@ -1611,7 +1626,7 @@ func (e *SelectionExec) Next(ctx context.Context, req *chunk.Chunk) error {
if e.childResult.NumRows() == 0 {
return nil
}
e.selected, err = expression.VectorizedFilter(e.Ctx().GetExprCtx().GetEvalCtx(), e.Ctx().GetSessionVars().EnableVectorizedExpression, e.filters, e.inputIter, e.selected)
e.selected, err = expression.VectorizedFilter(e.evalCtx, e.enableVectorizedExpression, e.filters, e.inputIter, e.selected)
if err != nil {
return err
}
Expand All @@ -1623,10 +1638,10 @@ func (e *SelectionExec) Next(ctx context.Context, req *chunk.Chunk) error {
// For sql with "SETVAR" in filter and "GETVAR" in projection, for example: "SELECT @a FROM t WHERE (@a := 2) > 0",
// we have to set batch size to 1 to do the evaluation of filter and projection.
func (e *SelectionExec) unBatchedNext(ctx context.Context, chk *chunk.Chunk) error {
exprCtx := e.Ctx().GetExprCtx()
evalCtx := e.evalCtx
for {
for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() {
selected, _, err := expression.EvalBool(exprCtx.GetEvalCtx(), e.filters, e.inputRow)
selected, _, err := expression.EvalBool(evalCtx, e.filters, e.inputRow)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/executor/executor_required_rows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,8 +482,9 @@ func TestSelectionRequiredRows(t *testing.T) {

func buildSelectionExec(ctx sessionctx.Context, filters []expression.Expression, src exec.Executor) exec.Executor {
return &SelectionExec{
BaseExecutor: exec.NewBaseExecutor(ctx, src.Schema(), 0, src),
filters: filters,
selectionExecutorContext: newSelectionExecutorContext(ctx),
BaseExecutorV2: exec.NewBaseExecutorV2(ctx.GetSessionVars(), src.Schema(), 0, src),
filters: filters,
}
}

Expand Down