From d4758efaf440be7190e28612fca354fd78b95ade Mon Sep 17 00:00:00 2001
From: Arenatlx <314806019@qq.com>
Date: Thu, 11 Apr 2024 19:02:00 +0800
Subject: [PATCH] planner: refine cop task as a capitalized one for later pkg
 move (#52506)

ref pingcap/tidb#52181
---
 pkg/planner/core/exhaust_physical_plans.go |   4 +-
 pkg/planner/core/find_best_task.go         |  16 +-
 pkg/planner/core/planbuilder.go            |   2 +-
 pkg/planner/core/task.go                   | 246 ++------------------
 pkg/planner/core/task_base.go              | 225 +++++++++++++++++++
 5 files changed, 251 insertions(+), 242 deletions(-)

diff --git a/pkg/planner/core/exhaust_physical_plans.go b/pkg/planner/core/exhaust_physical_plans.go
index dac10422cd1c9..c93c9ae618d35 100644
--- a/pkg/planner/core/exhaust_physical_plans.go
+++ b/pkg/planner/core/exhaust_physical_plans.go
@@ -1075,7 +1075,7 @@ func (p *LogicalJoin) constructInnerTableScanTask(
 	if usedStats != nil && usedStats.GetUsedInfo(ts.physicalTableID) != nil {
 		ts.usedStatsInfo = usedStats.GetUsedInfo(ts.physicalTableID)
 	}
-	copTask := &copTask{
+	copTask := &CopTask{
 		tablePlan:         ts,
 		indexPlanFinished: true,
 		tblColHists:       ds.TblColHists,
@@ -1237,7 +1237,7 @@ func (p *LogicalJoin) constructInnerIndexScanTask(
 		tblColHists:   ds.TblColHists,
 		pkIsHandleCol: ds.getPKIsHandleCol(),
 	}.Init(ds.SCtx(), ds.QueryBlockOffset())
-	cop := &copTask{
+	cop := &CopTask{
 		indexPlan:   is,
 		tblColHists: ds.TblColHists,
 		tblCols:     ds.TblCols,
diff --git a/pkg/planner/core/find_best_task.go b/pkg/planner/core/find_best_task.go
index 0f9d7abb26b56..ed50d9cc1b8b3 100644
--- a/pkg/planner/core/find_best_task.go
+++ b/pkg/planner/core/find_best_task.go
@@ -425,8 +425,8 @@ func getTaskPlanCost(t Task, pop *coreusage.PhysicalOptimizeOp) (float64, bool,
 	switch t.(type) {
 	case *RootTask:
 		taskType = property.RootTaskType
-	case *copTask: // no need to know whether the task is single-read or double-read, so both CopSingleReadTaskType and CopDoubleReadTaskType are OK
-		cop := t.(*copTask)
+	case *CopTask: // no need to know whether the task is single-read or double-read, so both CopSingleReadTaskType and CopDoubleReadTaskType are OK
+		cop := t.(*CopTask)
 		if cop.indexPlan != nil && cop.tablePlan != nil { // handle IndexLookup specially
 			taskType = property.CopMultiReadTaskType
 			// keep compatible with the old cost interface, for CopMultiReadTask, the cost is idxCost + tblCost.
@@ -481,7 +481,7 @@ func getTaskPlanCost(t Task, pop *coreusage.PhysicalOptimizeOp) (float64, bool,
 		// It's a very special case for index merge case.
 		// t.plan() == nil in index merge COP case, it means indexPlanFinished is false in other words.
cost := 0.0 - copTsk := t.(*copTask) + copTsk := t.(*CopTask) for _, partialScan := range copTsk.idxMergePartPlans { partialCost, err := getPlanCost(partialScan, taskType, coreusage.NewDefaultPlanCostOption().WithOptimizeTracer(pop)) if err != nil { @@ -1610,7 +1610,7 @@ func (ds *DataSource) convertToIndexMergeScan(prop *property.PhysicalProperty, c }) path := candidate.path scans := make([]PhysicalPlan, 0, len(path.PartialIndexPaths)) - cop := &copTask{ + cop := &CopTask{ indexPlanFinished: false, tblColHists: ds.TblColHists, } @@ -2019,7 +2019,7 @@ func (ds *DataSource) convertToIndexScan(prop *property.PhysicalProperty, } path := candidate.path is := ds.getOriginalPhysicalIndexScan(prop, path, candidate.isMatchProp, candidate.path.IsSingleScan) - cop := &copTask{ + cop := &CopTask{ indexPlan: is, tblColHists: ds.TblColHists, tblCols: ds.TblCols, @@ -2204,7 +2204,7 @@ func (is *PhysicalIndexScan) initSchema(idxExprCols []*expression.Column, isDoub is.SetSchema(expression.NewSchema(indexCols...)) } -func (is *PhysicalIndexScan) addPushedDownSelection(copTask *copTask, p *DataSource, path *util.AccessPath, finalStats *property.StatsInfo) { +func (is *PhysicalIndexScan) addPushedDownSelection(copTask *CopTask, p *DataSource, path *util.AccessPath, finalStats *property.StatsInfo) { // Add filter condition to table plan now. indexConds, tableConds := path.IndexFilters, path.TableFilters tableConds, copTask.rootTaskConds = SplitSelCondsWithVirtualColumn(tableConds) @@ -2477,7 +2477,7 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid // prop.TaskTp is cop related, just return invalidTask. return invalidTask, nil } - copTask := &copTask{ + copTask := &CopTask{ tablePlan: ts, indexPlanFinished: true, tblColHists: ds.TblColHists, @@ -2711,7 +2711,7 @@ func (ts *PhysicalTableScan) addPushedDownSelectionToMppTask(mpp *MppTask, stats return mpp } -func (ts *PhysicalTableScan) addPushedDownSelection(copTask *copTask, stats *property.StatsInfo) { +func (ts *PhysicalTableScan) addPushedDownSelection(copTask *CopTask, stats *property.StatsInfo) { ts.filterCondition, copTask.rootTaskConds = SplitSelCondsWithVirtualColumn(ts.filterCondition) var newRootConds []expression.Expression ts.filterCondition, newRootConds = expression.PushDownExprs(GetPushDownCtx(ts.SCtx()), ts.filterCondition, ts.StoreType) diff --git a/pkg/planner/core/planbuilder.go b/pkg/planner/core/planbuilder.go index 7fdb02e1e6ab7..fc2240147a45d 100644 --- a/pkg/planner/core/planbuilder.go +++ b/pkg/planner/core/planbuilder.go @@ -1555,7 +1555,7 @@ func (b *PlanBuilder) buildPhysicalIndexLookUpReader(_ context.Context, dbName m } } - cop := &copTask{ + cop := &CopTask{ indexPlan: is, tablePlan: ts, tblColHists: is.StatsInfo().HistColl, diff --git a/pkg/planner/core/task.go b/pkg/planner/core/task.go index f1a8f07642fa9..1967f16f271c4 100644 --- a/pkg/planner/core/task.go +++ b/pkg/planner/core/task.go @@ -29,91 +29,18 @@ import ( "github.com/pingcap/tidb/pkg/planner/core/internal/base" "github.com/pingcap/tidb/pkg/planner/property" "github.com/pingcap/tidb/pkg/planner/util" - "github.com/pingcap/tidb/pkg/statistics" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/collate" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/paging" "github.com/pingcap/tidb/pkg/util/plancodec" - "github.com/pingcap/tidb/pkg/util/size" "go.uber.org/zap" ) -var _ Task = &copTask{} - -// copTask is a task that runs in a distributed kv 
store. -// TODO: In future, we should split copTask to indexTask and tableTask. -type copTask struct { - indexPlan PhysicalPlan - tablePlan PhysicalPlan - // indexPlanFinished means we have finished index plan. - indexPlanFinished bool - // keepOrder indicates if the plan scans data by order. - keepOrder bool - // needExtraProj means an extra prune is needed because - // in double read / index merge cases, they may output one more column for handle(row id). - needExtraProj bool - // originSchema is the target schema to be projected to when needExtraProj is true. - originSchema *expression.Schema - - extraHandleCol *expression.Column - commonHandleCols []*expression.Column - // tblColHists stores the original stats of DataSource, it is used to get - // average row width when computing network cost. - tblColHists *statistics.HistColl - // tblCols stores the original columns of DataSource before being pruned, it - // is used to compute average row width when computing scan cost. - tblCols []*expression.Column - - idxMergePartPlans []PhysicalPlan - idxMergeIsIntersection bool - idxMergeAccessMVIndex bool - - // rootTaskConds stores select conditions containing virtual columns. - // These conditions can't push to TiKV, so we have to add a selection for rootTask - rootTaskConds []expression.Expression - - // For table partition. - physPlanPartInfo PhysPlanPartInfo - - // expectCnt is the expected row count of upper task, 0 for unlimited. - // It's used for deciding whether using paging distsql. - expectCnt uint64 -} - -// Invalid implements Task interface. -func (t *copTask) Invalid() bool { - return t.tablePlan == nil && t.indexPlan == nil && len(t.idxMergePartPlans) == 0 -} - -// Count implements Task interface. -func (t *copTask) Count() float64 { - if t.indexPlanFinished { - return t.tablePlan.StatsInfo().RowCount - } - return t.indexPlan.StatsInfo().RowCount -} - -// Copy implements Task interface. -func (t *copTask) Copy() Task { - nt := *t - return &nt -} - -// Plan implements Task interface. -// copTask plan should be careful with indexMergeReader, whose real plan is stored in -// idxMergePartPlans, when its indexPlanFinished is marked with false. -func (t *copTask) Plan() PhysicalPlan { - if t.indexPlanFinished { - return t.tablePlan - } - return t.indexPlan -} - func attachPlan2Task(p PhysicalPlan, t Task) Task { switch v := t.(type) { - case *copTask: + case *CopTask: if v.indexPlanFinished { p.SetChildren(v.tablePlan) v.tablePlan = p @@ -132,7 +59,7 @@ func attachPlan2Task(p PhysicalPlan, t Task) Task { } // finishIndexPlan means we no longer add plan to index plan, and compute the network cost for it. 
-func (t *copTask) finishIndexPlan() { +func (t *CopTask) finishIndexPlan() { if t.indexPlanFinished { return } @@ -150,7 +77,7 @@ func (t *copTask) finishIndexPlan() { } } -func (t *copTask) getStoreType() kv.StoreType { +func (t *CopTask) getStoreType() kv.StoreType { if t.tablePlan == nil { return kv.TiKV } @@ -167,42 +94,6 @@ func (t *copTask) getStoreType() kv.StoreType { return kv.TiKV } -// MemoryUsage return the memory usage of copTask -func (t *copTask) MemoryUsage() (sum int64) { - if t == nil { - return - } - - sum = size.SizeOfInterface*(2+int64(cap(t.idxMergePartPlans)+cap(t.rootTaskConds))) + size.SizeOfBool*3 + size.SizeOfUint64 + - size.SizeOfPointer*(3+int64(cap(t.commonHandleCols)+cap(t.tblCols))) + size.SizeOfSlice*4 + t.physPlanPartInfo.MemoryUsage() - if t.indexPlan != nil { - sum += t.indexPlan.MemoryUsage() - } - if t.tablePlan != nil { - sum += t.tablePlan.MemoryUsage() - } - if t.originSchema != nil { - sum += t.originSchema.MemoryUsage() - } - if t.extraHandleCol != nil { - sum += t.extraHandleCol.MemoryUsage() - } - - for _, col := range t.commonHandleCols { - sum += col.MemoryUsage() - } - for _, col := range t.tblCols { - sum += col.MemoryUsage() - } - for _, p := range t.idxMergePartPlans { - sum += p.MemoryUsage() - } - for _, expr := range t.rootTaskConds { - sum += expr.MemoryUsage() - } - return -} - // Attach2Task implements PhysicalPlan interface. func (p *basePhysicalPlan) Attach2Task(tasks ...Task) Task { t := tasks[0].ConvertToRootTask(p.SCtx()) @@ -563,8 +454,8 @@ func (p *PhysicalHashJoin) attach2TaskForMpp(tasks ...Task) Task { } func (p *PhysicalHashJoin) attach2TaskForTiFlash(tasks ...Task) Task { - lTask, lok := tasks[0].(*copTask) - rTask, rok := tasks[1].(*copTask) + lTask, lok := tasks[0].(*CopTask) + rTask, rok := tasks[1].(*CopTask) if !lok || !rok { return p.attach2TaskForMpp(tasks...) } @@ -577,7 +468,7 @@ func (p *PhysicalHashJoin) attach2TaskForTiFlash(tasks ...Task) Task { rTask.finishIndexPlan() } - task := &copTask{ + task := &CopTask{ tblColHists: rTask.tblColHists, indexPlanFinished: true, tablePlan: p, @@ -595,7 +486,7 @@ func (p *PhysicalMergeJoin) Attach2Task(tasks ...Task) Task { return t } -func buildIndexLookUpTask(ctx PlanContext, t *copTask) *RootTask { +func buildIndexLookUpTask(ctx PlanContext, t *CopTask) *RootTask { newTask := &RootTask{} p := PhysicalIndexLookUpReader{ tablePlan: t.tablePlan, @@ -664,114 +555,7 @@ func calcPagingCost(ctx PlanContext, indexPlan PhysicalPlan, expectCnt uint64) f return math.Max(pagingCst-sessVars.GetSeekFactor(nil), 0) } -// ConvertToRootTask implements Task interface. -func (t *copTask) ConvertToRootTask(ctx PlanContext) *RootTask { - // copy one to avoid changing itself. - return t.Copy().(*copTask).convertToRootTaskImpl(ctx) -} - -func (t *copTask) convertToRootTaskImpl(ctx PlanContext) *RootTask { - // copTasks are run in parallel, to make the estimated cost closer to execution time, we amortize - // the cost to cop iterator workers. According to `CopClient::Send`, the concurrency - // is Min(DistSQLScanConcurrency, numRegionsInvolvedInScan), since we cannot infer - // the number of regions involved, we simply use DistSQLScanConcurrency. - t.finishIndexPlan() - // Network cost of transferring rows of table scan to TiDB. 
- if t.tablePlan != nil { - tp := t.tablePlan - for len(tp.Children()) > 0 { - if len(tp.Children()) == 1 { - tp = tp.Children()[0] - } else { - join := tp.(*PhysicalHashJoin) - tp = join.children[1-join.InnerChildIdx] - } - } - ts := tp.(*PhysicalTableScan) - prevColumnLen := len(ts.Columns) - prevSchema := ts.schema.Clone() - ts.Columns = ExpandVirtualColumn(ts.Columns, ts.schema, ts.Table.Columns) - if !t.needExtraProj && len(ts.Columns) > prevColumnLen { - // Add an projection to make sure not to output extract columns. - t.needExtraProj = true - t.originSchema = prevSchema - } - } - newTask := &RootTask{} - if t.idxMergePartPlans != nil { - p := PhysicalIndexMergeReader{ - partialPlans: t.idxMergePartPlans, - tablePlan: t.tablePlan, - IsIntersectionType: t.idxMergeIsIntersection, - AccessMVIndex: t.idxMergeAccessMVIndex, - KeepOrder: t.keepOrder, - }.Init(ctx, t.idxMergePartPlans[0].QueryBlockOffset()) - p.PlanPartInfo = t.physPlanPartInfo - setTableScanToTableRowIDScan(p.tablePlan) - newTask.SetPlan(p) - t.handleRootTaskConds(ctx, newTask) - if t.needExtraProj { - schema := t.originSchema - proj := PhysicalProjection{Exprs: expression.Column2Exprs(schema.Columns)}.Init(ctx, p.StatsInfo(), t.idxMergePartPlans[0].QueryBlockOffset(), nil) - proj.SetSchema(schema) - proj.SetChildren(p) - newTask.SetPlan(proj) - } - return newTask - } - if t.indexPlan != nil && t.tablePlan != nil { - newTask = buildIndexLookUpTask(ctx, t) - } else if t.indexPlan != nil { - p := PhysicalIndexReader{indexPlan: t.indexPlan}.Init(ctx, t.indexPlan.QueryBlockOffset()) - p.PlanPartInfo = t.physPlanPartInfo - p.SetStats(t.indexPlan.StatsInfo()) - newTask.SetPlan(p) - } else { - tp := t.tablePlan - for len(tp.Children()) > 0 { - if len(tp.Children()) == 1 { - tp = tp.Children()[0] - } else { - join := tp.(*PhysicalHashJoin) - tp = join.children[1-join.InnerChildIdx] - } - } - ts := tp.(*PhysicalTableScan) - p := PhysicalTableReader{ - tablePlan: t.tablePlan, - StoreType: ts.StoreType, - IsCommonHandle: ts.Table.IsCommonHandle, - }.Init(ctx, t.tablePlan.QueryBlockOffset()) - p.PlanPartInfo = t.physPlanPartInfo - p.SetStats(t.tablePlan.StatsInfo()) - - // If agg was pushed down in Attach2Task(), the partial agg was placed on the top of tablePlan, the final agg was - // placed above the PhysicalTableReader, and the schema should have been set correctly for them, the schema of - // partial agg contains the columns needed by the final agg. - // If we add the projection here, the projection will be between the final agg and the partial agg, then the - // schema will be broken, the final agg will fail to find needed columns in ResolveIndices(). - // Besides, the agg would only be pushed down if it doesn't contain virtual columns, so virtual column should not be affected. 
- aggPushedDown := false - switch p.tablePlan.(type) { - case *PhysicalHashAgg, *PhysicalStreamAgg: - aggPushedDown = true - } - - if t.needExtraProj && !aggPushedDown { - proj := PhysicalProjection{Exprs: expression.Column2Exprs(t.originSchema.Columns)}.Init(ts.SCtx(), ts.StatsInfo(), ts.QueryBlockOffset(), nil) - proj.SetSchema(t.originSchema) - proj.SetChildren(p) - newTask.SetPlan(proj) - } else { - newTask.SetPlan(p) - } - } - - t.handleRootTaskConds(ctx, newTask) - return newTask -} - -func (t *copTask) handleRootTaskConds(ctx PlanContext, newTask *RootTask) { +func (t *CopTask) handleRootTaskConds(ctx PlanContext, newTask *RootTask) { if len(t.rootTaskConds) > 0 { selectivity, _, err := cardinality.Selectivity(ctx, t.tblColHists, t.rootTaskConds, nil) if err != nil { @@ -820,7 +604,7 @@ func (p *PhysicalLimit) Attach2Task(tasks ...Task) Task { } sunk := false - if cop, ok := t.(*copTask); ok { + if cop, ok := t.(*CopTask); ok { suspendLimitAboveTablePlan := func() { newCount := p.Offset + p.Count childProfile := cop.tablePlan.StatsInfo() @@ -844,7 +628,7 @@ func (p *PhysicalLimit) Attach2Task(tasks ...Task) Task { // but "regionNum" is unknown since the copTask can be a double read, so we ignore it now. stats := deriveLimitStats(childProfile, float64(newCount)) pushedDownLimit := PhysicalLimit{PartitionBy: newPartitionBy, Count: newCount}.Init(p.SCtx(), stats, p.QueryBlockOffset()) - cop = attachPlan2Task(pushedDownLimit, cop).(*copTask) + cop = attachPlan2Task(pushedDownLimit, cop).(*CopTask) // Don't use clone() so that Limit and its children share the same schema. Otherwise the virtual generated column may not be resolved right. pushedDownLimit.SetSchema(pushedDownLimit.children[0].Schema()) } @@ -1116,7 +900,7 @@ func (p *PhysicalTopN) containVirtualColumn(tCols []*expression.Column) bool { } // canPushDownToTiKV checks whether this topN can be pushed down to TiKV. -func (p *PhysicalTopN) canPushDownToTiKV(copTask *copTask) bool { +func (p *PhysicalTopN) canPushDownToTiKV(copTask *CopTask) bool { if !p.canExpressionConvertedToPB(kv.TiKV) { return false } @@ -1154,7 +938,7 @@ func (p *PhysicalTopN) Attach2Task(tasks ...Task) Task { cols = append(cols, expression.ExtractColumns(item.Expr)...) } needPushDown := len(cols) > 0 - if copTask, ok := t.(*copTask); ok && needPushDown && p.canPushDownToTiKV(copTask) && len(copTask.rootTaskConds) == 0 { + if copTask, ok := t.(*CopTask); ok && needPushDown && p.canPushDownToTiKV(copTask) && len(copTask.rootTaskConds) == 0 { // If all columns in topN are from index plan, we push it to index plan, otherwise we finish the index plan and // push it to table plan. var pushedDownTopN *PhysicalTopN @@ -1195,7 +979,7 @@ func (p *PhysicalExpand) Attach2Task(tasks ...Task) Task { // Attach2Task implements PhysicalPlan interface. func (p *PhysicalProjection) Attach2Task(tasks ...Task) Task { t := tasks[0].Copy() - if cop, ok := t.(*copTask); ok { + if cop, ok := t.(*CopTask); ok { if (len(cop.rootTaskConds) == 0 && len(cop.idxMergePartPlans) == 0) && expression.CanExprsPushDown(GetPushDownCtx(p.SCtx()), p.Exprs, cop.getStoreType()) { copTask := attachPlan2Task(p, cop) return copTask @@ -1916,7 +1700,7 @@ func computePartialCursorOffset(name string) int { // Attach2Task implements PhysicalPlan interface. func (p *PhysicalStreamAgg) Attach2Task(tasks ...Task) Task { t := tasks[0].Copy() - if cop, ok := t.(*copTask); ok { + if cop, ok := t.(*CopTask); ok { // We should not push agg down across // 1. 
double read, since the data of second read is ordered by handle instead of index. The `extraHandleCol` is added
 	//    if the double read needs to keep order. So we just use it to decided
@@ -2420,7 +2204,7 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...Task) Task {
 // Attach2Task implements the PhysicalPlan interface.
 func (p *PhysicalHashAgg) Attach2Task(tasks ...Task) Task {
 	t := tasks[0].Copy()
-	if cop, ok := t.(*copTask); ok {
+	if cop, ok := t.(*CopTask); ok {
 		if len(cop.rootTaskConds) == 0 && len(cop.idxMergePartPlans) == 0 {
 			copTaskType := cop.getStoreType()
 			partialAgg, finalAgg := p.newPartialAggregate(copTaskType, false)
diff --git a/pkg/planner/core/task_base.go b/pkg/planner/core/task_base.go
index afe70e001fe7b..d4c172132409b 100644
--- a/pkg/planner/core/task_base.go
+++ b/pkg/planner/core/task_base.go
@@ -29,6 +29,7 @@ import (
 var (
 	_ Task = &RootTask{}
 	_ Task = &MppTask{}
+	_ Task = &CopTask{}
 )

 // Task is a new version of `PhysicalPlanInfo`. It stores cost information for a task.
@@ -48,6 +49,8 @@ type Task interface {
 	MemoryUsage() int64
 }

+// ************************************* RootTask Start ******************************************
+
 // RootTask is the final sink node of a plan graph. It should be a single goroutine on tidb.
 type RootTask struct {
 	p PhysicalPlan
@@ -114,6 +117,10 @@ func (t *RootTask) MemoryUsage() (sum int64) {
 	return sum
 }

+// ************************************* RootTask End ******************************************
+
+// ************************************* MPPTask Start ******************************************
+
 // MppTask can not :
 // 1. keep order
 // 2. support double read
@@ -223,3 +230,221 @@ func (t *MppTask) ConvertToRootTaskImpl(ctx PlanContext) *RootTask {
 	}
 	return rt
 }
+
+// ************************************* MPPTask End ******************************************
+
+// ************************************* CopTask Start ******************************************
+
+// CopTask is a task that runs in a distributed kv store.
+// TODO: In the future, we should split CopTask into indexTask and tableTask.
+type CopTask struct {
+	indexPlan PhysicalPlan
+	tablePlan PhysicalPlan
+	// indexPlanFinished means we have finished the index plan.
+	indexPlanFinished bool
+	// keepOrder indicates if the plan scans data in order.
+	keepOrder bool
+	// needExtraProj means an extra projection is needed because,
+	// in double read / index merge cases, the plan may output one more column for the handle (row id).
+	needExtraProj bool
+	// originSchema is the target schema to be projected to when needExtraProj is true.
+	originSchema *expression.Schema
+
+	extraHandleCol   *expression.Column
+	commonHandleCols []*expression.Column
+	// tblColHists stores the original stats of DataSource; it is used to get
+	// the average row width when computing network cost.
+	tblColHists *statistics.HistColl
+	// tblCols stores the original columns of DataSource before being pruned; it
+	// is used to compute the average row width when computing scan cost.
+	tblCols []*expression.Column
+
+	idxMergePartPlans      []PhysicalPlan
+	idxMergeIsIntersection bool
+	idxMergeAccessMVIndex  bool
+
+	// rootTaskConds stores select conditions containing virtual columns.
+	// These conditions can't be pushed to TiKV, so we have to add a selection for the root task.
+	rootTaskConds []expression.Expression
+
+	// For table partition.
+	physPlanPartInfo PhysPlanPartInfo
+
+	// expectCnt is the expected row count of the upper task, 0 for unlimited.
+	// It's used to decide whether to use paging distsql.
+	expectCnt uint64
+}
+
+// Invalid implements Task interface.
+func (t *CopTask) Invalid() bool {
+	return t.tablePlan == nil && t.indexPlan == nil && len(t.idxMergePartPlans) == 0
+}
+
+// Count implements Task interface.
+func (t *CopTask) Count() float64 {
+	if t.indexPlanFinished {
+		return t.tablePlan.StatsInfo().RowCount
+	}
+	return t.indexPlan.StatsInfo().RowCount
+}
+
+// Copy implements Task interface.
+func (t *CopTask) Copy() Task {
+	nt := *t
+	return &nt
+}
+
+// Plan implements Task interface.
+// CopTask plan should be careful with indexMergeReader, whose real plan is stored in
+// idxMergePartPlans when its indexPlanFinished is marked false.
+func (t *CopTask) Plan() PhysicalPlan {
+	if t.indexPlanFinished {
+		return t.tablePlan
+	}
+	return t.indexPlan
+}
+
+// MemoryUsage returns the memory usage of CopTask.
+func (t *CopTask) MemoryUsage() (sum int64) {
+	if t == nil {
+		return
+	}
+
+	sum = size.SizeOfInterface*(2+int64(cap(t.idxMergePartPlans)+cap(t.rootTaskConds))) + size.SizeOfBool*3 + size.SizeOfUint64 +
+		size.SizeOfPointer*(3+int64(cap(t.commonHandleCols)+cap(t.tblCols))) + size.SizeOfSlice*4 + t.physPlanPartInfo.MemoryUsage()
+	if t.indexPlan != nil {
+		sum += t.indexPlan.MemoryUsage()
+	}
+	if t.tablePlan != nil {
+		sum += t.tablePlan.MemoryUsage()
+	}
+	if t.originSchema != nil {
+		sum += t.originSchema.MemoryUsage()
+	}
+	if t.extraHandleCol != nil {
+		sum += t.extraHandleCol.MemoryUsage()
+	}
+
+	for _, col := range t.commonHandleCols {
+		sum += col.MemoryUsage()
+	}
+	for _, col := range t.tblCols {
+		sum += col.MemoryUsage()
+	}
+	for _, p := range t.idxMergePartPlans {
+		sum += p.MemoryUsage()
+	}
+	for _, expr := range t.rootTaskConds {
+		sum += expr.MemoryUsage()
+	}
+	return
+}
+
+// ConvertToRootTask implements Task interface.
+func (t *CopTask) ConvertToRootTask(ctx PlanContext) *RootTask {
+	// copy one to avoid changing itself.
+	return t.Copy().(*CopTask).convertToRootTaskImpl(ctx)
+}
+
+func (t *CopTask) convertToRootTaskImpl(ctx PlanContext) *RootTask {
+	// copTasks are run in parallel; to make the estimated cost closer to execution time, we amortize
+	// the cost to cop iterator workers. According to `CopClient::Send`, the concurrency
+	// is Min(DistSQLScanConcurrency, numRegionsInvolvedInScan). Since we cannot infer
+	// the number of regions involved, we simply use DistSQLScanConcurrency.
+	t.finishIndexPlan()
+	// Network cost of transferring rows of table scan to TiDB.
+	if t.tablePlan != nil {
+		tp := t.tablePlan
+		for len(tp.Children()) > 0 {
+			if len(tp.Children()) == 1 {
+				tp = tp.Children()[0]
+			} else {
+				join := tp.(*PhysicalHashJoin)
+				tp = join.children[1-join.InnerChildIdx]
+			}
+		}
+		ts := tp.(*PhysicalTableScan)
+		prevColumnLen := len(ts.Columns)
+		prevSchema := ts.schema.Clone()
+		ts.Columns = ExpandVirtualColumn(ts.Columns, ts.schema, ts.Table.Columns)
+		if !t.needExtraProj && len(ts.Columns) > prevColumnLen {
+			// Add a projection to make sure not to output extra columns.
+			t.needExtraProj = true
+			t.originSchema = prevSchema
+		}
+	}
+	newTask := &RootTask{}
+	if t.idxMergePartPlans != nil {
+		p := PhysicalIndexMergeReader{
+			partialPlans:       t.idxMergePartPlans,
+			tablePlan:          t.tablePlan,
+			IsIntersectionType: t.idxMergeIsIntersection,
+			AccessMVIndex:      t.idxMergeAccessMVIndex,
+			KeepOrder:          t.keepOrder,
+		}.Init(ctx, t.idxMergePartPlans[0].QueryBlockOffset())
+		p.PlanPartInfo = t.physPlanPartInfo
+		setTableScanToTableRowIDScan(p.tablePlan)
+		newTask.SetPlan(p)
+		t.handleRootTaskConds(ctx, newTask)
+		if t.needExtraProj {
+			schema := t.originSchema
+			proj := PhysicalProjection{Exprs: expression.Column2Exprs(schema.Columns)}.Init(ctx, p.StatsInfo(), t.idxMergePartPlans[0].QueryBlockOffset(), nil)
+			proj.SetSchema(schema)
+			proj.SetChildren(p)
+			newTask.SetPlan(proj)
+		}
+		return newTask
+	}
+	if t.indexPlan != nil && t.tablePlan != nil {
+		newTask = buildIndexLookUpTask(ctx, t)
+	} else if t.indexPlan != nil {
+		p := PhysicalIndexReader{indexPlan: t.indexPlan}.Init(ctx, t.indexPlan.QueryBlockOffset())
+		p.PlanPartInfo = t.physPlanPartInfo
+		p.SetStats(t.indexPlan.StatsInfo())
+		newTask.SetPlan(p)
+	} else {
+		tp := t.tablePlan
+		for len(tp.Children()) > 0 {
+			if len(tp.Children()) == 1 {
+				tp = tp.Children()[0]
+			} else {
+				join := tp.(*PhysicalHashJoin)
+				tp = join.children[1-join.InnerChildIdx]
+			}
+		}
+		ts := tp.(*PhysicalTableScan)
+		p := PhysicalTableReader{
+			tablePlan:      t.tablePlan,
+			StoreType:      ts.StoreType,
+			IsCommonHandle: ts.Table.IsCommonHandle,
+		}.Init(ctx, t.tablePlan.QueryBlockOffset())
+		p.PlanPartInfo = t.physPlanPartInfo
+		p.SetStats(t.tablePlan.StatsInfo())
+
+		// If the agg was pushed down in Attach2Task(), the partial agg was placed on top of tablePlan, the final agg was
+		// placed above the PhysicalTableReader, and the schema should have been set correctly for them: the schema of the
+		// partial agg contains the columns needed by the final agg.
+		// If we add the projection here, the projection will sit between the final agg and the partial agg, the
+		// schema will be broken, and the final agg will fail to find the needed columns in ResolveIndices().
+		// Besides, the agg would only be pushed down if it doesn't contain virtual columns, so virtual columns should not be affected.
+		aggPushedDown := false
+		switch p.tablePlan.(type) {
+		case *PhysicalHashAgg, *PhysicalStreamAgg:
+			aggPushedDown = true
+		}
+
+		if t.needExtraProj && !aggPushedDown {
+			proj := PhysicalProjection{Exprs: expression.Column2Exprs(t.originSchema.Columns)}.Init(ts.SCtx(), ts.StatsInfo(), ts.QueryBlockOffset(), nil)
+			proj.SetSchema(t.originSchema)
+			proj.SetChildren(p)
+			newTask.SetPlan(proj)
+		} else {
+			newTask.SetPlan(p)
+		}
+	}
+
+	t.handleRootTaskConds(ctx, newTask)
+	return newTask
+}
+
+// ************************************* CopTask End ******************************************
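
The patch leans on the Task abstraction consolidated in pkg/planner/core/task_base.go: one small interface, several concrete task types (RootTask, MppTask, and now the exported CopTask), and type switches such as the one in getTaskPlanCost to pick per-type behavior. For readers outside the codebase, here is a minimal, self-contained Go sketch of that shape; the names mirror the real ones, but every body is a toy stand-in, not the planner's implementation.

package main

import "fmt"

// Task is a trimmed-down stand-in for the planner's Task interface
// (the real one also has Plan, ConvertToRootTask, MemoryUsage, and more).
type Task interface {
	Count() float64
	Copy() Task
	Invalid() bool
}

// CopTask mimics the exported CopTask: row counts stand in for the
// index/table plans, and indexPlanFinished selects which one Count reports.
type CopTask struct {
	indexRows, tableRows       float64
	hasIndexPlan, hasTablePlan bool
	indexPlanFinished          bool
}

func (t *CopTask) Count() float64 {
	if t.indexPlanFinished {
		return t.tableRows
	}
	return t.indexRows
}

// Copy returns a shallow copy, the same `nt := *t; return &nt` idiom the
// patch keeps for CopTask.Copy.
func (t *CopTask) Copy() Task {
	nt := *t
	return &nt
}

func (t *CopTask) Invalid() bool { return !t.hasIndexPlan && !t.hasTablePlan }

// RootTask is the final sink-side task.
type RootTask struct{ rows float64 }

func (t *RootTask) Count() float64 { return t.rows }
func (t *RootTask) Copy() Task     { nt := *t; return &nt }
func (t *RootTask) Invalid() bool  { return false }

// describe dispatches on the concrete task type, the same shape as the
// switch in getTaskPlanCost.
func describe(t Task) string {
	switch v := t.(type) {
	case *RootTask:
		return fmt.Sprintf("root task, %.0f rows", v.Count())
	case *CopTask:
		return fmt.Sprintf("cop task, %.0f rows", v.Count())
	default:
		return "unknown task"
	}
}

func main() {
	cop := &CopTask{indexRows: 1000, tableRows: 10, hasIndexPlan: true, hasTablePlan: true, indexPlanFinished: true}
	fmt.Println(describe(cop))                 // cop task, 10 rows
	fmt.Println(describe(&RootTask{rows: 10})) // root task, 10 rows
}

The exported spelling matters for the planned package move: an unexported copTask could not be referenced from a sibling package after the split, while CopTask can.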
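CopTask.ConvertToRootTask first clones the task (t.Copy().(*CopTask).convertToRootTaskImpl(ctx)) so the conversion can flip flags such as indexPlanFinished and needExtraProj without disturbing the original. Copy is a plain shallow struct copy, so scalar fields become independent while slice and pointer fields still alias the original. A small sketch with hypothetical toy types (not the planner's) showing both behaviors:

package main

import "fmt"

// task is a toy stand-in for CopTask: one scalar field and one slice field.
type task struct {
	indexPlanFinished bool
	rootTaskConds     []string
}

// copyTask uses the same shallow-copy idiom as CopTask.Copy: the struct is
// copied by value, so the slice header is duplicated but its backing array
// is shared with the original.
func copyTask(t *task) *task {
	nt := *t
	return &nt
}

func main() {
	orig := &task{rootTaskConds: []string{"cond-on-virtual-col"}}

	cp := copyTask(orig)
	cp.indexPlanFinished = true       // scalar field: the original is unaffected
	cp.rootTaskConds[0] = "rewritten" // shared backing array: the original sees it

	fmt.Println(orig.indexPlanFinished) // false
	fmt.Println(orig.rootTaskConds[0])  // rewritten
}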
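convertToRootTaskImpl locates the PhysicalTableScan at the bottom of tablePlan by descending through single-child operators and, at a pushed-down hash join, following the probe side via 1-join.InnerChildIdx. The sketch below reproduces that walk with hypothetical stub node types in place of the planner's:

package main

import "fmt"

// plan is a minimal stand-in for PhysicalPlan.
type plan interface {
	Children() []plan
	Name() string
}

type node struct {
	name     string
	children []plan
}

func (n *node) Children() []plan { return n.children }
func (n *node) Name() string     { return n.name }

// hashJoin adds InnerChildIdx, the index of the build/inner child,
// mirroring PhysicalHashJoin.
type hashJoin struct {
	node
	InnerChildIdx int
}

// findTableScan mirrors the loop in convertToRootTaskImpl: descend through
// single-child operators; at a hash join (the only multi-child operator a
// cop-side table plan can contain here), keep to the probe side.
func findTableScan(tp plan) plan {
	for len(tp.Children()) > 0 {
		if len(tp.Children()) == 1 {
			tp = tp.Children()[0]
		} else {
			join := tp.(*hashJoin)
			tp = join.Children()[1-join.InnerChildIdx]
		}
	}
	return tp
}

func main() {
	scan := &node{name: "TableScan"}
	build := &node{name: "BuildSideScan"}
	join := &hashJoin{node: node{name: "HashJoin", children: []plan{build, scan}}, InnerChildIdx: 0}
	sel := &node{name: "Selection", children: []plan{join}}
	fmt.Println(findTableScan(sel).Name()) // TableScan
}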
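CopTask.MemoryUsage follows the accounting pattern used throughout the planner: a flat size for the struct header (built from the size package's constants) plus the recursive usage of every owned plan, column, and expression. A simplified sketch of the same pattern, using unsafe.Sizeof where the real code uses those constants:

package main

import (
	"fmt"
	"unsafe"
)

// cond is a toy expression; its MemoryUsage is its own footprint plus the
// capacity of what it owns.
type cond struct{ buf []byte }

func (c *cond) MemoryUsage() int64 {
	return int64(unsafe.Sizeof(*c)) + int64(cap(c.buf))
}

// task mirrors the shape of CopTask.MemoryUsage: count the struct's own
// flat size, then add the usage of every owned object, guarding the nil
// receiver the same way the real method does.
type task struct {
	keepOrder     bool
	rootTaskConds []*cond
}

func (t *task) MemoryUsage() (sum int64) {
	if t == nil {
		return
	}
	sum = int64(unsafe.Sizeof(*t))
	for _, c := range t.rootTaskConds {
		sum += c.MemoryUsage()
	}
	return
}

func main() {
	t := &task{keepOrder: true, rootTaskConds: []*cond{{buf: make([]byte, 64)}}}
	fmt.Println(t.MemoryUsage() > 64) // true
}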