Skip to content

Commit

Permalink
Use the BaseExecutorV2 to refactor the selection executor
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Keao <[email protected]>
  • Loading branch information
YangKeao committed Jul 15, 2024
1 parent ac786cb commit d93f2c2
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 13 deletions.
10 changes: 6 additions & 4 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1963,8 +1963,9 @@ func (b *executorBuilder) buildSelection(v *plannercore.PhysicalSelection) exec.
return nil
}
e := &SelectionExec{
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID(), childExec),
filters: v.Conditions,
selectionExecutorContext: newSelectionExecutorContext(b.ctx),
BaseExecutorV2: exec.NewBaseExecutorV2(b.ctx.GetSessionVars(), v.Schema(), v.ID(), childExec),
filters: v.Conditions,
}
return e
}
Expand Down Expand Up @@ -4264,8 +4265,9 @@ func (builder *dataReaderBuilder) buildExecutorForIndexJoinInternal(ctx context.
return nil, err
}
exec := &SelectionExec{
BaseExecutor: exec.NewBaseExecutor(builder.ctx, v.Schema(), v.ID(), childExec),
filters: v.Conditions,
selectionExecutorContext: newSelectionExecutorContext(builder.ctx),
BaseExecutorV2: exec.NewBaseExecutorV2(builder.ctx.GetSessionVars(), v.Schema(), v.ID(), childExec),
filters: v.Conditions,
}
err = exec.open(ctx)
return exec, err
Expand Down
29 changes: 22 additions & 7 deletions pkg/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1526,9 +1526,24 @@ func (e *TableDualExec) Next(_ context.Context, req *chunk.Chunk) error {
return nil
}

type selectionExecutorContext struct {
stmtMemTracker *memory.Tracker
evalCtx expression.EvalContext
enableVectorizedExpression bool
}

func newSelectionExecutorContext(sctx sessionctx.Context) selectionExecutorContext {
return selectionExecutorContext{
stmtMemTracker: sctx.GetSessionVars().StmtCtx.MemTracker,
evalCtx: sctx.GetExprCtx().GetEvalCtx(),
enableVectorizedExpression: sctx.GetSessionVars().EnableVectorizedExpression,
}
}

// SelectionExec represents a filter executor.
type SelectionExec struct {
exec.BaseExecutor
selectionExecutorContext
exec.BaseExecutorV2

batched bool
filters []expression.Expression
Expand All @@ -1542,7 +1557,7 @@ type SelectionExec struct {

// Open implements the Executor Open interface.
func (e *SelectionExec) Open(ctx context.Context) error {
if err := e.BaseExecutor.Open(ctx); err != nil {
if err := e.BaseExecutorV2.Open(ctx); err != nil {
return err
}
failpoint.Inject("mockSelectionExecBaseExecutorOpenReturnedError", func(val failpoint.Value) {
Expand All @@ -1559,7 +1574,7 @@ func (e *SelectionExec) open(context.Context) error {
} else {
e.memTracker = memory.NewTracker(e.ID(), -1)
}
e.memTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.MemTracker)
e.memTracker.AttachTo(e.stmtMemTracker)
e.childResult = exec.TryNewCacheChunk(e.Children(0))
e.memTracker.Consume(e.childResult.MemoryUsage())
e.batched = expression.Vectorizable(e.filters)
Expand All @@ -1578,7 +1593,7 @@ func (e *SelectionExec) Close() error {
e.childResult = nil
}
e.selected = nil
return e.BaseExecutor.Close()
return e.BaseExecutorV2.Close()
}

// Next implements the Executor Next interface.
Expand Down Expand Up @@ -1611,7 +1626,7 @@ func (e *SelectionExec) Next(ctx context.Context, req *chunk.Chunk) error {
if e.childResult.NumRows() == 0 {
return nil
}
e.selected, err = expression.VectorizedFilter(e.Ctx().GetExprCtx().GetEvalCtx(), e.Ctx().GetSessionVars().EnableVectorizedExpression, e.filters, e.inputIter, e.selected)
e.selected, err = expression.VectorizedFilter(e.evalCtx, e.enableVectorizedExpression, e.filters, e.inputIter, e.selected)
if err != nil {
return err
}
Expand All @@ -1623,10 +1638,10 @@ func (e *SelectionExec) Next(ctx context.Context, req *chunk.Chunk) error {
// For sql with "SETVAR" in filter and "GETVAR" in projection, for example: "SELECT @a FROM t WHERE (@a := 2) > 0",
// we have to set batch size to 1 to do the evaluation of filter and projection.
func (e *SelectionExec) unBatchedNext(ctx context.Context, chk *chunk.Chunk) error {
exprCtx := e.Ctx().GetExprCtx()
evalCtx := e.evalCtx
for {
for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() {
selected, _, err := expression.EvalBool(exprCtx.GetEvalCtx(), e.filters, e.inputRow)
selected, _, err := expression.EvalBool(evalCtx, e.filters, e.inputRow)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/executor/executor_required_rows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,8 +482,9 @@ func TestSelectionRequiredRows(t *testing.T) {

func buildSelectionExec(ctx sessionctx.Context, filters []expression.Expression, src exec.Executor) exec.Executor {
return &SelectionExec{
BaseExecutor: exec.NewBaseExecutor(ctx, src.Schema(), 0, src),
filters: filters,
selectionExecutorContext: newSelectionExecutorContext(ctx),
BaseExecutorV2: exec.NewBaseExecutorV2(ctx.GetSessionVars(), src.Schema(), 0, src),
filters: filters,
}
}

Expand Down

0 comments on commit d93f2c2

Please sign in to comment.