From d93f2c2954367d55532acd636b61d6ddb876b396 Mon Sep 17 00:00:00 2001 From: Yang Keao Date: Mon, 15 Jul 2024 14:07:36 +0800 Subject: [PATCH] Use the BaseExecutorV2 to refactor the selection executor Signed-off-by: Yang Keao --- pkg/executor/builder.go | 10 ++++--- pkg/executor/executor.go | 29 ++++++++++++++++----- pkg/executor/executor_required_rows_test.go | 5 ++-- 3 files changed, 31 insertions(+), 13 deletions(-) diff --git a/pkg/executor/builder.go b/pkg/executor/builder.go index e9351171f72438..a6d7fbe031f943 100644 --- a/pkg/executor/builder.go +++ b/pkg/executor/builder.go @@ -1963,8 +1963,9 @@ func (b *executorBuilder) buildSelection(v *plannercore.PhysicalSelection) exec. return nil } e := &SelectionExec{ - BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID(), childExec), - filters: v.Conditions, + selectionExecutorContext: newSelectionExecutorContext(b.ctx), + BaseExecutorV2: exec.NewBaseExecutorV2(b.ctx.GetSessionVars(), v.Schema(), v.ID(), childExec), + filters: v.Conditions, } return e } @@ -4264,8 +4265,9 @@ func (builder *dataReaderBuilder) buildExecutorForIndexJoinInternal(ctx context. return nil, err } exec := &SelectionExec{ - BaseExecutor: exec.NewBaseExecutor(builder.ctx, v.Schema(), v.ID(), childExec), - filters: v.Conditions, + selectionExecutorContext: newSelectionExecutorContext(builder.ctx), + BaseExecutorV2: exec.NewBaseExecutorV2(builder.ctx.GetSessionVars(), v.Schema(), v.ID(), childExec), + filters: v.Conditions, } err = exec.open(ctx) return exec, err diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go index 8f48bcbc94d573..929ff23884ce14 100644 --- a/pkg/executor/executor.go +++ b/pkg/executor/executor.go @@ -1526,9 +1526,24 @@ func (e *TableDualExec) Next(_ context.Context, req *chunk.Chunk) error { return nil } +type selectionExecutorContext struct { + stmtMemTracker *memory.Tracker + evalCtx expression.EvalContext + enableVectorizedExpression bool +} + +func newSelectionExecutorContext(sctx sessionctx.Context) selectionExecutorContext { + return selectionExecutorContext{ + stmtMemTracker: sctx.GetSessionVars().StmtCtx.MemTracker, + evalCtx: sctx.GetExprCtx().GetEvalCtx(), + enableVectorizedExpression: sctx.GetSessionVars().EnableVectorizedExpression, + } +} + // SelectionExec represents a filter executor. type SelectionExec struct { - exec.BaseExecutor + selectionExecutorContext + exec.BaseExecutorV2 batched bool filters []expression.Expression @@ -1542,7 +1557,7 @@ type SelectionExec struct { // Open implements the Executor Open interface. func (e *SelectionExec) Open(ctx context.Context) error { - if err := e.BaseExecutor.Open(ctx); err != nil { + if err := e.BaseExecutorV2.Open(ctx); err != nil { return err } failpoint.Inject("mockSelectionExecBaseExecutorOpenReturnedError", func(val failpoint.Value) { @@ -1559,7 +1574,7 @@ func (e *SelectionExec) open(context.Context) error { } else { e.memTracker = memory.NewTracker(e.ID(), -1) } - e.memTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.MemTracker) + e.memTracker.AttachTo(e.stmtMemTracker) e.childResult = exec.TryNewCacheChunk(e.Children(0)) e.memTracker.Consume(e.childResult.MemoryUsage()) e.batched = expression.Vectorizable(e.filters) @@ -1578,7 +1593,7 @@ func (e *SelectionExec) Close() error { e.childResult = nil } e.selected = nil - return e.BaseExecutor.Close() + return e.BaseExecutorV2.Close() } // Next implements the Executor Next interface. @@ -1611,7 +1626,7 @@ func (e *SelectionExec) Next(ctx context.Context, req *chunk.Chunk) error { if e.childResult.NumRows() == 0 { return nil } - e.selected, err = expression.VectorizedFilter(e.Ctx().GetExprCtx().GetEvalCtx(), e.Ctx().GetSessionVars().EnableVectorizedExpression, e.filters, e.inputIter, e.selected) + e.selected, err = expression.VectorizedFilter(e.evalCtx, e.enableVectorizedExpression, e.filters, e.inputIter, e.selected) if err != nil { return err } @@ -1623,10 +1638,10 @@ func (e *SelectionExec) Next(ctx context.Context, req *chunk.Chunk) error { // For sql with "SETVAR" in filter and "GETVAR" in projection, for example: "SELECT @a FROM t WHERE (@a := 2) > 0", // we have to set batch size to 1 to do the evaluation of filter and projection. func (e *SelectionExec) unBatchedNext(ctx context.Context, chk *chunk.Chunk) error { - exprCtx := e.Ctx().GetExprCtx() + evalCtx := e.evalCtx for { for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() { - selected, _, err := expression.EvalBool(exprCtx.GetEvalCtx(), e.filters, e.inputRow) + selected, _, err := expression.EvalBool(evalCtx, e.filters, e.inputRow) if err != nil { return err } diff --git a/pkg/executor/executor_required_rows_test.go b/pkg/executor/executor_required_rows_test.go index b2782935b5cdff..820222853d0110 100644 --- a/pkg/executor/executor_required_rows_test.go +++ b/pkg/executor/executor_required_rows_test.go @@ -482,8 +482,9 @@ func TestSelectionRequiredRows(t *testing.T) { func buildSelectionExec(ctx sessionctx.Context, filters []expression.Expression, src exec.Executor) exec.Executor { return &SelectionExec{ - BaseExecutor: exec.NewBaseExecutor(ctx, src.Schema(), 0, src), - filters: filters, + selectionExecutorContext: newSelectionExecutorContext(ctx), + BaseExecutorV2: exec.NewBaseExecutorV2(ctx.GetSessionVars(), src.Schema(), 0, src), + filters: filters, } }