Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: fix CTE predicate pushdown (#33627) #33630

Merged
merged 3 commits into from
Mar 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions cmd/explaintest/r/explain_cte.result
Original file line number Diff line number Diff line change
Expand Up @@ -444,3 +444,38 @@ CTE_0 50.00 root Non-Recursive CTE
└─IndexLookUp(Probe) 1.00 root
├─IndexRangeScan(Build) 1.00 cop[tikv] table:customer, index:PRIMARY(c_customer_sk) range: decided by [eq(test.customer.c_customer_sk, test.web_sales.ws_bill_customer_sk)], keep order:false, stats:pseudo
└─TableRowIDScan(Probe) 1.00 cop[tikv] table:customer keep order:false, stats:pseudo
drop table if exists t1;
create table t1 (id int, bench_type varchar(10),version varchar(10),tps int(20));
insert into t1 (id,bench_type,version,tps) values (1,'sysbench','5.4.0',1111111);
insert into t1 (id,bench_type,version,tps) values (2,'sysbench','6.0.0',222222);
with all_data as
(select * from t1
),version1 as (select * from all_data where version ='5.4.0'
),version2 as(select * from all_data where version ='6.0.0')
select v1.tps v1_tps,v2.tps v2_tps
from version1 v1, version2 v2
where v1.bench_type =v2.bench_type;
v1_tps v2_tps
1111111 222222
desc format='brief' with all_data as
(select * from t1
),version1 as (select * from all_data where version ='5.4.0'
),version2 as(select * from all_data where version ='6.0.0')
select v1.tps v1_tps,v2.tps v2_tps
from version1 v1, version2 v2
where v1.bench_type =v2.bench_type;
id estRows task access object operator info
HashJoin 8000.00 root inner join, equal:[eq(test.t1.bench_type, test.t1.bench_type)]
├─Selection(Build) 6400.00 root not(isnull(test.t1.bench_type))
│ └─CTEFullScan 8000.00 root CTE:v2 data:CTE_2
└─Selection(Probe) 6400.00 root not(isnull(test.t1.bench_type))
└─CTEFullScan 8000.00 root CTE:v1 data:CTE_1
CTE_2 8000.00 root Non-Recursive CTE
└─Selection(Seed Part) 8000.00 root eq(test.t1.version, "6.0.0"), not(isnull(test.t1.bench_type))
└─CTEFullScan 10000.00 root CTE:all_data data:CTE_0
CTE_1 8000.00 root Non-Recursive CTE
└─Selection(Seed Part) 8000.00 root eq(test.t1.version, "5.4.0"), not(isnull(test.t1.bench_type))
└─CTEFullScan 10000.00 root CTE:all_data data:CTE_0
CTE_0 10000.00 root Non-Recursive CTE
└─TableReader(Seed Part) 10000.00 root data:TableFullScan
└─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
20 changes: 20 additions & 0 deletions cmd/explaintest/t/explain_cte.test
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,23 @@ desc format='brief' with year_total as (
,t_s_secyear.customer_last_name
,t_s_secyear.customer_email_address
limit 100;

# predicate pushdown
drop table if exists t1;
create table t1 (id int, bench_type varchar(10),version varchar(10),tps int(20));
insert into t1 (id,bench_type,version,tps) values (1,'sysbench','5.4.0',1111111);
insert into t1 (id,bench_type,version,tps) values (2,'sysbench','6.0.0',222222);
with all_data as
(select * from t1
),version1 as (select * from all_data where version ='5.4.0'
),version2 as(select * from all_data where version ='6.0.0')
select v1.tps v1_tps,v2.tps v2_tps
from version1 v1, version2 v2
where v1.bench_type =v2.bench_type;
desc format='brief' with all_data as
(select * from t1
),version1 as (select * from all_data where version ='5.4.0'
),version2 as(select * from all_data where version ='6.0.0')
select v1.tps v1_tps,v2.tps v2_tps
from version1 v1, version2 v2
where v1.bench_type =v2.bench_type;
8 changes: 7 additions & 1 deletion planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3935,7 +3935,7 @@ func (b *PlanBuilder) tryBuildCTE(ctx context.Context, tn *ast.TableName, asName
LimitEnd: limitEnd, pushDownPredicates: make([]expression.Expression, 0), ColumnMap: make(map[string]*expression.Column)}
}
var p LogicalPlan
lp := LogicalCTE{cteAsName: tn.Name, cte: cte.cteClass, seedStat: cte.seedStat}.Init(b.ctx, b.getSelectOffset())
lp := LogicalCTE{cteAsName: tn.Name, cte: cte.cteClass, seedStat: cte.seedStat, isOuterMostCTE: !b.buildingCTE}.Init(b.ctx, b.getSelectOffset())
prevSchema := cte.seedLP.Schema().Clone()
lp.SetSchema(getResultCTESchema(cte.seedLP.Schema(), b.ctx.GetSessionVars()))
for i, col := range lp.schema.Columns {
Expand Down Expand Up @@ -6383,6 +6383,12 @@ func containDifferentJoinTypes(preferJoinType uint) bool {
}

func (b *PlanBuilder) buildCte(ctx context.Context, cte *ast.CommonTableExpression, isRecursive bool) (p LogicalPlan, err error) {
saveBuildingCTE := b.buildingCTE
b.buildingCTE = true
defer func() {
b.buildingCTE = saveBuildingCTE
}()

if isRecursive {
// buildingRecursivePartForCTE likes a stack. We save it before building a recursive CTE and restore it after building.
// We need a stack because we need to handle the nested recursive CTE. And buildingRecursivePartForCTE indicates the innermost CTE.
Expand Down
7 changes: 4 additions & 3 deletions planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1786,9 +1786,10 @@ type CTEClass struct {
type LogicalCTE struct {
logicalSchemaProducer

cte *CTEClass
cteAsName model.CIStr
seedStat *property.StatsInfo
cte *CTEClass
cteAsName model.CIStr
seedStat *property.StatsInfo
isOuterMostCTE bool
}

// LogicalCTETable is for CTE table
Expand Down
1 change: 1 addition & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,7 @@ type PlanBuilder struct {
isForUpdateRead bool
allocIDForCTEStorage int
buildingRecursivePartForCTE bool
buildingCTE bool
}

type handleColHelper struct {
Expand Down
3 changes: 3 additions & 0 deletions planner/core/rule_predicate_push_down.go
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,9 @@ func (p *LogicalCTE) PredicatePushDown(predicates []expression.Expression, opt *
// Doesn't support recursive CTE yet.
return predicates, p.self
}
if !p.isOuterMostCTE {
return predicates, p.self
}
if len(predicates) == 0 {
p.cte.pushDownPredicates = append(p.cte.pushDownPredicates, expression.NewOne())
return predicates, p.self
Expand Down