From 1f6c883c37bc40ba7510cba42538ddab24043630 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 15 Aug 2023 10:18:30 +0800 Subject: [PATCH] planner, executor: fix the logic for inline projection of executors (#45158) (#45332) close pingcap/tidb#45112 --- executor/benchmark_test.go | 27 +++++++- executor/builder.go | 45 ++++++++++--- planner/cascades/implementation_rules.go | 2 + planner/core/pb_to_plan.go | 18 +++-- planner/core/resolve_indices.go | 83 +++++++++++++++++++++++- 5 files changed, 159 insertions(+), 16 deletions(-) diff --git a/executor/benchmark_test.go b/executor/benchmark_test.go index 3f64164332ce7..5bff882e426f6 100644 --- a/executor/benchmark_test.go +++ b/executor/benchmark_test.go @@ -931,7 +931,7 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor) }, } - childrenUsedSchema := markChildrenUsedCols(e.Schema(), e.children[0].Schema(), e.children[1].Schema()) + childrenUsedSchema := markChildrenUsedColsForTest(e.Schema(), e.children[0].Schema(), e.children[0].Schema()) defaultValues := make([]types.Datum, e.buildWorker.buildSideExec.Schema().Len()) lhsTypes, rhsTypes := retTypes(innerExec), retTypes(outerExec) for i := uint(0); i < e.concurrency; i++ { @@ -958,6 +958,31 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor) return e } +// markChildrenUsedColsForTest compares each child with the output schema, and mark +// each column of the child is used by output or not. +func markChildrenUsedColsForTest(outputSchema *expression.Schema, childSchemas ...*expression.Schema) (childrenUsed [][]bool) { + childrenUsed = make([][]bool, 0, len(childSchemas)) + markedOffsets := make(map[int]struct{}) + for _, col := range outputSchema.Columns { + markedOffsets[col.Index] = struct{}{} + } + prefixLen := 0 + for _, childSchema := range childSchemas { + used := make([]bool, len(childSchema.Columns)) + for i := range childSchema.Columns { + if _, ok := markedOffsets[prefixLen+i]; ok { + used[i] = true + } + } + childrenUsed = append(childrenUsed, used) + } + for _, child := range childSchemas { + used := expression.GetUsedList(outputSchema.Columns, child) + childrenUsed = append(childrenUsed, used) + } + return +} + func benchmarkHashJoinExecWithCase(b *testing.B, casTest *hashJoinTestCase) { opt1 := mockDataSourceParameters{ rows: casTest.rows, diff --git a/executor/builder.go b/executor/builder.go index 711af3c8318c6..946d25a12d100 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -716,7 +716,7 @@ func (b *executorBuilder) buildLimit(v *plannercore.PhysicalLimit) Executor { end: v.Offset + v.Count, } - childUsedSchema := markChildrenUsedCols(v.Schema(), v.Children()[0].Schema())[0] + childUsedSchema := markChildrenUsedCols(v.Schema().Columns, v.Children()[0].Schema())[0] e.columnIdxsUsedByChild = make([]int, 0, len(childUsedSchema)) for i, used := range childUsedSchema { if used { @@ -1351,6 +1351,11 @@ func (b *executorBuilder) buildMergeJoin(v *plannercore.PhysicalMergeJoin) Execu } } + colsFromChildren := v.Schema().Columns + if v.JoinType == plannercore.LeftOuterSemiJoin || v.JoinType == plannercore.AntiLeftOuterSemiJoin { + colsFromChildren = colsFromChildren[:len(colsFromChildren)-1] + } + e := &MergeJoinExec{ stmtCtx: b.ctx.GetSessionVars().StmtCtx, baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), leftExec, rightExec), @@ -1363,7 +1368,7 @@ func (b *executorBuilder) buildMergeJoin(v *plannercore.PhysicalMergeJoin) Execu v.OtherConditions, retTypes(leftExec), retTypes(rightExec), - markChildrenUsedCols(v.Schema(), v.Children()[0].Schema(), v.Children()[1].Schema()), + markChildrenUsedCols(colsFromChildren, v.Children()[0].Schema(), v.Children()[1].Schema()), false, ), isOuterJoin: v.JoinType.IsOuterJoin(), @@ -1492,7 +1497,11 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo probeNAKeColIdx[i] = probeNAKeys[i].Index } isNAJoin := len(v.LeftNAJoinKeys) > 0 - childrenUsedSchema := markChildrenUsedCols(v.Schema(), v.Children()[0].Schema(), v.Children()[1].Schema()) + colsFromChildren := v.Schema().Columns + if v.JoinType == plannercore.LeftOuterSemiJoin || v.JoinType == plannercore.AntiLeftOuterSemiJoin { + colsFromChildren = colsFromChildren[:len(colsFromChildren)-1] + } + childrenUsedSchema := markChildrenUsedCols(colsFromChildren, v.Children()[0].Schema(), v.Children()[1].Schema()) for i := uint(0); i < e.concurrency; i++ { e.probeWorkers[i] = &probeWorker{ hashJoinCtx: e.hashJoinCtx, @@ -2973,10 +2982,22 @@ func constructDistExec(sctx sessionctx.Context, plans []plannercore.PhysicalPlan // markChildrenUsedCols compares each child with the output schema, and mark // each column of the child is used by output or not. -func markChildrenUsedCols(outputSchema *expression.Schema, childSchema ...*expression.Schema) (childrenUsed [][]bool) { - for _, child := range childSchema { - used := expression.GetUsedList(outputSchema.Columns, child) +func markChildrenUsedCols(outputCols []*expression.Column, childSchemas ...*expression.Schema) (childrenUsed [][]bool) { + childrenUsed = make([][]bool, 0, len(childSchemas)) + markedOffsets := make(map[int]struct{}) + for _, col := range outputCols { + markedOffsets[col.Index] = struct{}{} + } + prefixLen := 0 + for _, childSchema := range childSchemas { + used := make([]bool, len(childSchema.Columns)) + for i := range childSchema.Columns { + if _, ok := markedOffsets[prefixLen+i]; ok { + used[i] = true + } + } childrenUsed = append(childrenUsed, used) + prefixLen += childSchema.Len() } return } @@ -3154,7 +3175,11 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin) lastColHelper: v.CompareFilters, finished: &atomic.Value{}, } - childrenUsedSchema := markChildrenUsedCols(v.Schema(), v.Children()[0].Schema(), v.Children()[1].Schema()) + colsFromChildren := v.Schema().Columns + if v.JoinType == plannercore.LeftOuterSemiJoin || v.JoinType == plannercore.AntiLeftOuterSemiJoin { + colsFromChildren = colsFromChildren[:len(colsFromChildren)-1] + } + childrenUsedSchema := markChildrenUsedCols(colsFromChildren, v.Children()[0].Schema(), v.Children()[1].Schema()) e.joiner = newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues, v.OtherConditions, leftTypes, rightTypes, childrenUsedSchema, false) outerKeyCols := make([]int, len(v.OuterJoinKeys)) for i := 0; i < len(v.OuterJoinKeys); i++ { @@ -3276,7 +3301,11 @@ func (b *executorBuilder) buildIndexLookUpMergeJoin(v *plannercore.PhysicalIndex keyOff2IdxOff: v.KeyOff2IdxOff, lastColHelper: v.CompareFilters, } - childrenUsedSchema := markChildrenUsedCols(v.Schema(), v.Children()[0].Schema(), v.Children()[1].Schema()) + colsFromChildren := v.Schema().Columns + if v.JoinType == plannercore.LeftOuterSemiJoin || v.JoinType == plannercore.AntiLeftOuterSemiJoin { + colsFromChildren = colsFromChildren[:len(colsFromChildren)-1] + } + childrenUsedSchema := markChildrenUsedCols(colsFromChildren, v.Children()[0].Schema(), v.Children()[1].Schema()) joiners := make([]joiner, e.ctx.GetSessionVars().IndexLookupJoinConcurrency()) for i := 0; i < len(joiners); i++ { joiners[i] = newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues, v.OtherConditions, leftTypes, rightTypes, childrenUsedSchema, false) diff --git a/planner/cascades/implementation_rules.go b/planner/cascades/implementation_rules.go index a1d8ff0ab4c1c..dbf5d1c716782 100644 --- a/planner/cascades/implementation_rules.go +++ b/planner/cascades/implementation_rules.go @@ -359,6 +359,7 @@ func (*ImplLimit) OnImplement(expr *memo.GroupExpr, _ *property.PhysicalProperty Offset: logicalLimit.Offset, Count: logicalLimit.Count, }.Init(logicalLimit.SCtx(), expr.Group.Prop.Stats, logicalLimit.SelectBlockOffset(), newProp) + physicalLimit.SetSchema(expr.Group.Prop.Schema.Clone()) return []memo.Implementation{impl.NewLimitImpl(physicalLimit)}, nil } @@ -420,6 +421,7 @@ func (*ImplTopNAsLimit) OnImplement(expr *memo.GroupExpr, _ *property.PhysicalPr Offset: lt.Offset, Count: lt.Count, }.Init(lt.SCtx(), expr.Group.Prop.Stats, lt.SelectBlockOffset(), newProp) + physicalLimit.SetSchema(expr.Group.Prop.Schema.Clone()) return []memo.Implementation{impl.NewLimitImpl(physicalLimit)}, nil } diff --git a/planner/core/pb_to_plan.go b/planner/core/pb_to_plan.go index 5c8e47918a60f..9b00624374fd2 100644 --- a/planner/core/pb_to_plan.go +++ b/planner/core/pb_to_plan.go @@ -48,20 +48,17 @@ func NewPBPlanBuilder(sctx sessionctx.Context, is infoschema.InfoSchema, ranges func (b *PBPlanBuilder) Build(executors []*tipb.Executor) (p PhysicalPlan, err error) { var src PhysicalPlan for i := 0; i < len(executors); i++ { - curr, err := b.pbToPhysicalPlan(executors[i]) + curr, err := b.pbToPhysicalPlan(executors[i], src) if err != nil { return nil, errors.Trace(err) } - if src != nil { - curr.SetChildren(src) - } src = curr } _, src = b.predicatePushDown(src, nil) return src, nil } -func (b *PBPlanBuilder) pbToPhysicalPlan(e *tipb.Executor) (p PhysicalPlan, err error) { +func (b *PBPlanBuilder) pbToPhysicalPlan(e *tipb.Executor, subPlan PhysicalPlan) (p PhysicalPlan, err error) { switch e.Tp { case tipb.ExecType_TypeTableScan: p, err = b.pbToTableScan(e) @@ -81,6 +78,17 @@ func (b *PBPlanBuilder) pbToPhysicalPlan(e *tipb.Executor) (p PhysicalPlan, err // TODO: Support other types. err = errors.Errorf("this exec type %v doesn't support yet", e.GetTp()) } + if subPlan != nil { + p.SetChildren(subPlan) + } + // The limit missed its output cols via the protobuf. + // We need to add it back and do a ResolveIndicies for the later inline projection. + if limit, ok := p.(*PhysicalLimit); ok { + limit.SetSchema(p.Children()[0].Schema().Clone()) + for i, col := range limit.Schema().Columns { + col.Index = i + } + } return p, err } diff --git a/planner/core/resolve_indices.go b/planner/core/resolve_indices.go index b602c46a78bd2..3d6efcf4d40d1 100644 --- a/planner/core/resolve_indices.go +++ b/planner/core/resolve_indices.go @@ -121,12 +121,33 @@ func (p *PhysicalHashJoin) ResolveIndicesItself() (err error) { return err } } + + mergedSchema := expression.MergeSchema(lSchema, rSchema) + for i, expr := range p.OtherConditions { - p.OtherConditions[i], err = expr.ResolveIndices(expression.MergeSchema(lSchema, rSchema)) + p.OtherConditions[i], err = expr.ResolveIndices(mergedSchema) if err != nil { return err } } + + colsNeedResolving := p.schema.Len() + // The last output column of this two join is the generated column to indicate whether the row is matched or not. + if p.JoinType == LeftOuterSemiJoin || p.JoinType == AntiLeftOuterSemiJoin { + colsNeedResolving-- + } + // To avoid that two plan shares the same column slice. + shallowColSlice := make([]*expression.Column, p.schema.Len()) + copy(shallowColSlice, p.schema.Columns) + p.schema = expression.NewSchema(shallowColSlice...) + for i := 0; i < colsNeedResolving; i++ { + newCol, err := p.schema.Columns[i].ResolveIndices(mergedSchema) + if err != nil { + return err + } + p.schema.Columns[i] = newCol.(*expression.Column) + } + return } @@ -173,11 +194,31 @@ func (p *PhysicalMergeJoin) ResolveIndices() (err error) { return err } } + + mergedSchema := expression.MergeSchema(lSchema, rSchema) + for i, expr := range p.OtherConditions { - p.OtherConditions[i], err = expr.ResolveIndices(expression.MergeSchema(lSchema, rSchema)) + p.OtherConditions[i], err = expr.ResolveIndices(mergedSchema) + if err != nil { + return err + } + } + + colsNeedResolving := p.schema.Len() + // The last output column of this two join is the generated column to indicate whether the row is matched or not. + if p.JoinType == LeftOuterSemiJoin || p.JoinType == AntiLeftOuterSemiJoin { + colsNeedResolving-- + } + // To avoid that two plan shares the same column slice. + shallowColSlice := make([]*expression.Column, p.schema.Len()) + copy(shallowColSlice, p.schema.Columns) + p.schema = expression.NewSchema(shallowColSlice...) + for i := 0; i < colsNeedResolving; i++ { + newCol, err := p.schema.Columns[i].ResolveIndices(mergedSchema) if err != nil { return err } + p.schema.Columns[i] = newCol.(*expression.Column) } return } @@ -245,6 +286,24 @@ func (p *PhysicalIndexJoin) ResolveIndices() (err error) { } p.OuterHashKeys[i], p.InnerHashKeys[i] = outerKey.(*expression.Column), innerKey.(*expression.Column) } + + colsNeedResolving := p.schema.Len() + // The last output column of this two join is the generated column to indicate whether the row is matched or not. + if p.JoinType == LeftOuterSemiJoin || p.JoinType == AntiLeftOuterSemiJoin { + colsNeedResolving-- + } + // To avoid that two plan shares the same column slice. + shallowColSlice := make([]*expression.Column, p.schema.Len()) + copy(shallowColSlice, p.schema.Columns) + p.schema = expression.NewSchema(shallowColSlice...) + for i := 0; i < colsNeedResolving; i++ { + newCol, err := p.schema.Columns[i].ResolveIndices(mergedSchema) + if err != nil { + return err + } + p.schema.Columns[i] = newCol.(*expression.Column) + } + return } @@ -559,6 +618,26 @@ func (p *PhysicalTopN) ResolveIndices() (err error) { return err } } + return nil +} + +// ResolveIndices implements Plan interface. +func (p *PhysicalLimit) ResolveIndices() (err error) { + err = p.basePhysicalPlan.ResolveIndices() + if err != nil { + return err + } + // To avoid that two plan shares the same column slice. + shallowColSlice := make([]*expression.Column, p.schema.Len()) + copy(shallowColSlice, p.schema.Columns) + p.schema = expression.NewSchema(shallowColSlice...) + for i, col := range p.schema.Columns { + newCol, err := col.ResolveIndices(p.children[0].Schema()) + if err != nil { + return err + } + p.schema.Columns[i] = newCol.(*expression.Column) + } return }