Skip to content

Commit

Permalink
planner: add 2pb logic and fix some bugs for partitionTopN (#42334)
Browse files Browse the repository at this point in the history
ref #39792, close #42321
  • Loading branch information
windtalker authored Mar 17, 2023
1 parent 17ba09d commit 4ae5be1
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@
" └─Window 1.00 root row_number()->Column#4 over(partition by test.t.b rows between current row and current row)",
" └─Sort 1.00 root test.t.b",
" └─TableReader 1.00 root data:Limit",
" └─Limit 1.00 cop[tikv] offset:0, count:1",
" └─Limit 1.00 cop[tikv] partition by test.t.b, offset:0, count:1",
" └─TableFullScan 1.00 cop[tikv] table:t keep order:false, stats:pseudo"
],
"Res": [
Expand Down Expand Up @@ -403,7 +403,7 @@
" └─Window 3.00 root row_number()->Column#4 over(partition by test.t.b rows between current row and current row)",
" └─Sort 3.00 root test.t.b",
" └─TableReader 3.00 root data:Limit",
" └─Limit 3.00 cop[tikv] offset:0, count:3",
" └─Limit 3.00 cop[tikv] partition by test.t.b, offset:0, count:3",
" └─Selection 3.00 cop[tikv] ge(test.t.a, 2)",
" └─TableFullScan 9.00 cop[tikv] table:t keep order:false, stats:pseudo"
],
Expand Down Expand Up @@ -452,7 +452,7 @@
" └─Window 1.00 root row_number()->Column#4 over(partition by test.td.b rows between current row and current row)",
" └─Sort 1.00 root test.td.b",
" └─TableReader 1.00 root data:Limit",
" └─Limit 1.00 cop[tikv] offset:0, count:1",
" └─Limit 1.00 cop[tikv] partition by test.td.b, offset:0, count:1",
" └─TableFullScan 1.00 cop[tikv] table:td keep order:false, stats:pseudo"
],
"Res": [
Expand Down
28 changes: 16 additions & 12 deletions planner/core/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,12 +364,14 @@ func (p *PhysicalSort) ExplainInfo() string {

// ExplainInfo implements Plan interface.
func (p *PhysicalLimit) ExplainInfo() string {
var str strings.Builder
str.WriteString("offset:")
str.WriteString(strconv.FormatUint(p.Offset, 10))
str.WriteString(", count:")
str.WriteString(strconv.FormatUint(p.Count, 10))
return str.String()
buffer := bytes.NewBufferString("")
if len(p.GetPartitionBy()) > 0 {
buffer = explainPartitionBy(buffer, p.GetPartitionBy(), false)
fmt.Fprintf(buffer, ", offset:%v, count:%v", p.Offset, p.Count)
} else {
fmt.Fprintf(buffer, "offset:%v, count:%v", p.Offset, p.Count)
}
return buffer.String()
}

// ExplainInfo implements Plan interface.
Expand Down Expand Up @@ -960,12 +962,14 @@ func (lt *LogicalTopN) ExplainInfo() string {

// ExplainInfo implements Plan interface.
func (p *LogicalLimit) ExplainInfo() string {
var str strings.Builder
str.WriteString("offset:")
str.WriteString(strconv.FormatUint(p.Offset, 10))
str.WriteString(", count:")
str.WriteString(strconv.FormatUint(p.Count, 10))
return str.String()
buffer := bytes.NewBufferString("")
if len(p.GetPartitionBy()) > 0 {
buffer = explainPartitionBy(buffer, p.GetPartitionBy(), false)
fmt.Fprintf(buffer, ", offset:%v, count:%v", p.Offset, p.Count)
} else {
fmt.Fprintf(buffer, "offset:%v, count:%v", p.Offset, p.Count)
}
return buffer.String()
}

// ExplainInfo implements Plan interface.
Expand Down
1 change: 1 addition & 0 deletions planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ type LogicalJoin struct {
rightPreferJoinType uint

EqualConditions []*expression.ScalarFunction
// NAEQConditions means null aware equal conditions, which is used for null aware semi joins.
NAEQConditions []*expression.ScalarFunction
LeftConditions expression.CNFExprs
RightConditions expression.CNFExprs
Expand Down
8 changes: 8 additions & 0 deletions planner/core/plan_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ func (p *PhysicalTopN) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*ti
for _, item := range p.ByItems {
topNExec.OrderBy = append(topNExec.OrderBy, expression.SortByItemToPB(sc, client, item.Expr, item.Desc))
}
for _, item := range p.PartitionBy {
topNExec.PartitionBy = append(topNExec.PartitionBy, expression.SortByItemToPB(sc, client, item.Col.Clone(), item.Desc))
}
executorID := ""
if storeType == kv.TiFlash {
var err error
Expand All @@ -194,10 +197,15 @@ func (p *PhysicalTopN) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*ti

// ToPB implements PhysicalPlan ToPB interface.
func (p *PhysicalLimit) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*tipb.Executor, error) {
sc := ctx.GetSessionVars().StmtCtx
client := ctx.GetClient()
limitExec := &tipb.Limit{
Limit: p.Count,
}
executorID := ""
for _, item := range p.PartitionBy {
limitExec.PartitionBy = append(limitExec.PartitionBy, expression.SortByItemToPB(sc, client, item.Col.Clone(), item.Desc))
}
if storeType == kv.TiFlash {
var err error
limitExec.Child, err = p.children[0].ToPB(ctx, storeType)
Expand Down
23 changes: 23 additions & 0 deletions planner/core/resolve_indices.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,29 @@ func (p *PhysicalTopN) ResolveIndices() (err error) {
return err
}
}
for i, item := range p.PartitionBy {
newCol, err := item.Col.ResolveIndices(p.children[0].Schema())
if err != nil {
return err
}
p.PartitionBy[i].Col = newCol.(*expression.Column)
}
return
}

// ResolveIndices implements Plan interface.
func (p *PhysicalLimit) ResolveIndices() (err error) {
err = p.basePhysicalPlan.ResolveIndices()
if err != nil {
return err
}
for i, item := range p.PartitionBy {
newCol, err := item.Col.ResolveIndices(p.children[0].Schema())
if err != nil {
return err
}
p.PartitionBy[i].Col = newCol.(*expression.Column)
}
return
}

Expand Down
4 changes: 2 additions & 2 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ func (p *PhysicalLimit) attach2Task(tasks ...task) task {
// Strictly speaking, for the row count of stats, we should multiply newCount with "regionNum",
// but "regionNum" is unknown since the copTask can be a double read, so we ignore it now.
stats := deriveLimitStats(childProfile, float64(newCount))
pushedDownLimit := PhysicalLimit{Count: newCount}.Init(p.ctx, stats, p.blockOffset)
pushedDownLimit := PhysicalLimit{PartitionBy: newPartitionBy, Count: newCount}.Init(p.ctx, stats, p.blockOffset)
cop = attachPlan2Task(pushedDownLimit, cop).(*copTask)
// Don't use clone() so that Limit and its children share the same schema. Otherwise the virtual generated column may not be resolved right.
pushedDownLimit.SetSchema(pushedDownLimit.children[0].Schema())
Expand All @@ -851,7 +851,7 @@ func (p *PhysicalLimit) attach2Task(tasks ...task) task {
for _, partialScan := range cop.idxMergePartPlans {
childProfile := partialScan.statsInfo()
stats := deriveLimitStats(childProfile, float64(newCount))
pushedDownLimit := PhysicalLimit{Count: newCount}.Init(p.ctx, stats, p.blockOffset)
pushedDownLimit := PhysicalLimit{PartitionBy: newPartitionBy, Count: newCount}.Init(p.ctx, stats, p.blockOffset)
pushedDownLimit.SetChildren(partialScan)
pushedDownLimit.SetSchema(pushedDownLimit.children[0].Schema())
limitChildren = append(limitChildren, pushedDownLimit)
Expand Down

0 comments on commit 4ae5be1

Please sign in to comment.