Skip to content

Commit

Permalink
move BaseExecutorV2 outside
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Keao <[email protected]>
  • Loading branch information
YangKeao committed Feb 29, 2024
1 parent 8c4f2ed commit 5527ff3
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 13 deletions.
3 changes: 2 additions & 1 deletion pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3209,7 +3209,8 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
paging := b.ctx.GetSessionVars().EnablePaging

e := &TableReaderExecutor{
tableReaderExecutorContext: newTableReaderExecutorContext(b.ctx, v.Schema(), v.ID()),
BaseExecutorV2: exec.NewBaseExecutorV2(b.ctx.GetSessionVars(), v.Schema(), v.ID()),
tableReaderExecutorContext: newTableReaderExecutorContext(b.ctx),
dagPB: dagReq,
startTS: startTS,
txnScope: b.txnScope,
Expand Down
3 changes: 2 additions & 1 deletion pkg/executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,8 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup
table = task.partitionTable
}
tableReaderExec := &TableReaderExecutor{
tableReaderExecutorContext: newTableReaderExecutorContext(e.Ctx(), e.Schema(), e.getTableRootPlanID()),
BaseExecutorV2: exec.NewBaseExecutorV2(e.Ctx().GetSessionVars(), e.Schema(), e.getTableRootPlanID()),
tableReaderExecutorContext: newTableReaderExecutorContext(e.Ctx()),
table: table,
dagPB: e.tableRequest,
startTS: e.startTS,
Expand Down
6 changes: 4 additions & 2 deletions pkg/executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,8 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
failpoint.Inject("testIndexMergePanicPartialTableWorker", nil)
var err error
partialTableReader := &TableReaderExecutor{
tableReaderExecutorContext: newTableReaderExecutorContext(e.Ctx(), ts.Schema(), e.getPartitalPlanID(workID)),
BaseExecutorV2: exec.NewBaseExecutorV2(e.Ctx().GetSessionVars(), ts.Schema(), e.getPartitalPlanID(workID)),
tableReaderExecutorContext: newTableReaderExecutorContext(e.Ctx()),
dagPB: e.dagPBs[workID],
startTS: e.startTS,
txnScope: e.txnScope,
Expand Down Expand Up @@ -785,7 +786,8 @@ func (e *IndexMergeReaderExecutor) startIndexMergeTableScanWorker(ctx context.Co

func (e *IndexMergeReaderExecutor) buildFinalTableReader(ctx context.Context, tbl table.Table, handles []kv.Handle) (_ exec.Executor, err error) {
tableReaderExec := &TableReaderExecutor{
tableReaderExecutorContext: newTableReaderExecutorContext(e.Ctx(), e.Schema(), e.getTablePlanRootID()),
BaseExecutorV2: exec.NewBaseExecutorV2(e.Ctx().GetSessionVars(), e.Schema(), e.getTablePlanRootID()),
tableReaderExecutorContext: newTableReaderExecutorContext(e.Ctx()),
table: tbl,
dagPB: e.tableRequest,
startTS: e.startTS,
Expand Down
14 changes: 6 additions & 8 deletions pkg/executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ type kvRangeBuilder interface {

// tableReaderExecutorContext is the execution context for the `TableReaderExecutor`
type tableReaderExecutorContext struct {
exec.BaseExecutorV2

dctx distsqlctx.DistSQLContext
pctx planctx.PlanContext
ectx exprctx.BuildContext
Expand All @@ -102,7 +100,7 @@ func (treCtx *tableReaderExecutorContext) GetDDLOwner(ctx context.Context) (*inf
return nil, errors.New("GetDDLOwner in a context without DDL")
}

func newTableReaderExecutorContext(sctx sessionctx.Context, schema *expression.Schema, id int) tableReaderExecutorContext {
func newTableReaderExecutorContext(sctx sessionctx.Context) tableReaderExecutorContext {
// Explicitly get `ownerManager` out of the closure to show that the `tableReaderExecutorContext` itself doesn't
// depend on `sctx` directly.
// The context of some tests don't have `DDL`, so make it optional
Expand All @@ -120,17 +118,17 @@ func newTableReaderExecutorContext(sctx sessionctx.Context, schema *expression.S
}

return tableReaderExecutorContext{
BaseExecutorV2: exec.NewBaseExecutorV2(sctx.GetSessionVars(), schema, id),
dctx: sctx.GetDistSQLCtx(),
pctx: sctx.GetPlanCtx(),
ectx: sctx.GetExprCtx(),
getDDLOwner: getDDLOwner,
dctx: sctx.GetDistSQLCtx(),
pctx: sctx.GetPlanCtx(),
ectx: sctx.GetExprCtx(),
getDDLOwner: getDDLOwner,
}
}

// TableReaderExecutor sends DAG request and reads table data from kv layer.
type TableReaderExecutor struct {
tableReaderExecutorContext
exec.BaseExecutorV2

table table.Table

Expand Down
3 changes: 2 additions & 1 deletion pkg/executor/table_readers_required_rows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ func buildTableReader(sctx sessionctx.Context) exec.Executor {
schema := expression.NewSchema(cols...)

e := &TableReaderExecutor{
tableReaderExecutorContext: newTableReaderExecutorContext(sctx, schema, 0),
BaseExecutorV2: exec.NewBaseExecutorV2(sctx.GetSessionVars(), schema, 0),
tableReaderExecutorContext: newTableReaderExecutorContext(sctx),
table: &tables.TableCommon{},
dagPB: buildMockDAGRequest(sctx),
selectResultHook: selectResultHook{mockSelectResult},
Expand Down

0 comments on commit 5527ff3

Please sign in to comment.