From d3de5479a06a45336d2eab7bd88c50e89f4b8373 Mon Sep 17 00:00:00 2001 From: xufei Date: Thu, 3 Jun 2021 17:38:27 +0800 Subject: [PATCH] planner: support push down broadcast cartesian join to TiFlash (#25049) --- go.mod | 2 +- go.sum | 4 +- planner/core/exhaust_physical_plans.go | 15 ++++- planner/core/plan_to_pb.go | 43 ++++++++---- .../testdata/integration_serial_suite_in.json | 6 +- .../integration_serial_suite_out.json | 65 ++++++++++++++++++- sessionctx/variable/session.go | 7 ++ sessionctx/variable/sysvar.go | 1 + sessionctx/variable/tidb_vars.go | 6 ++ 9 files changed, 129 insertions(+), 20 deletions(-) diff --git a/go.mod b/go.mod index ac4ef88fd215d..ad115a92b4be3 100644 --- a/go.mod +++ b/go.mod @@ -48,7 +48,7 @@ require ( github.com/pingcap/parser v0.0.0-20210525032559-c37778aff307 github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3 github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible - github.com/pingcap/tipb v0.0.0-20210525032549-b80be13ddf6c + github.com/pingcap/tipb v0.0.0-20210601083426-79a378b6d1c4 github.com/prometheus/client_golang v1.5.1 github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.9.1 diff --git a/go.sum b/go.sum index 6336c1749649d..db9370b27cdac 100644 --- a/go.sum +++ b/go.sum @@ -448,8 +448,8 @@ github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3/go.mod h1:tckvA041 github.com/pingcap/tidb-dashboard v0.0.0-20210312062513-eef5d6404638/go.mod h1:OzFN8H0EDMMqeulPhPMw2i2JaiZWOKFQ7zdRPhENNgo= github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible h1:ceznmu/lLseGHP/jKyOa/3u/5H3wtLLLqkH2V3ssSjg= github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= -github.com/pingcap/tipb v0.0.0-20210525032549-b80be13ddf6c h1:El3pMBpJHuSkItkHsnBqsaaHzJwFBNDt3Aul98AhREY= -github.com/pingcap/tipb v0.0.0-20210525032549-b80be13ddf6c/go.mod h1:nsEhnMokcn7MRqd2J60yxpn/ac3ZH8A6GOJ9NslabUo= 
+github.com/pingcap/tipb v0.0.0-20210601083426-79a378b6d1c4 h1:n47+OwdI/uxKenfBT8Y2/be11MwbeLKNLdzOWnxNQKg= +github.com/pingcap/tipb v0.0.0-20210601083426-79a378b6d1c4/go.mod h1:nsEhnMokcn7MRqd2J60yxpn/ac3ZH8A6GOJ9NslabUo= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 1937ad6b09a26..6ad716c93cac2 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -1661,6 +1661,9 @@ func (p *LogicalJoin) shouldUseMPPBCJ() bool { if p.ctx.GetSessionVars().BroadcastJoinThresholdSize == 0 || p.ctx.GetSessionVars().BroadcastJoinThresholdCount == 0 { return p.ctx.GetSessionVars().AllowBCJ } + if len(p.EqualConditions) == 0 && p.ctx.GetSessionVars().AllowCartesianBCJ == 2 { + return true + } if p.JoinType == LeftOuterJoin || p.JoinType == SemiJoin || p.JoinType == AntiSemiJoin { return checkChildFitBC(p.children[1]) } else if p.JoinType == RightOuterJoin { @@ -1769,9 +1772,19 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC return nil } - if (p.JoinType != InnerJoin && p.JoinType != LeftOuterJoin && p.JoinType != RightOuterJoin && p.JoinType != SemiJoin && p.JoinType != AntiSemiJoin) || len(p.EqualConditions) == 0 { + if p.JoinType != InnerJoin && p.JoinType != LeftOuterJoin && p.JoinType != RightOuterJoin && p.JoinType != SemiJoin && p.JoinType != AntiSemiJoin { return nil } + + if len(p.EqualConditions) == 0 { + if p.ctx.GetSessionVars().AllowCartesianBCJ == 0 || !useBCJ { + return nil + } + } + if (len(p.LeftConditions) != 0 && p.JoinType != LeftOuterJoin) || (len(p.RightConditions) != 0 && p.JoinType != RightOuterJoin) { + return nil + } + if prop.PartitionTp == property.BroadcastType { return nil } diff --git 
a/planner/core/plan_to_pb.go b/planner/core/plan_to_pb.go index e095f78dff960..1b6ef79bbc6bc 100644 --- a/planner/core/plan_to_pb.go +++ b/planner/core/plan_to_pb.go @@ -366,7 +366,25 @@ func (p *PhysicalHashJoin) ToPB(ctx sessionctx.Context, storeType kv.StoreType) if err != nil { return nil, err } - otherConditions, err := expression.ExpressionsToPBList(sc, p.OtherConditions, client) + + var otherConditionsInJoin expression.CNFExprs + var otherEqConditionsFromIn expression.CNFExprs + if p.JoinType == AntiSemiJoin { + for _, condition := range p.OtherConditions { + if expression.IsEQCondFromIn(condition) { + otherEqConditionsFromIn = append(otherEqConditionsFromIn, condition) + } else { + otherConditionsInJoin = append(otherConditionsInJoin, condition) + } + } + } else { + otherConditionsInJoin = p.OtherConditions + } + otherConditions, err := expression.ExpressionsToPBList(sc, otherConditionsInJoin, client) + if err != nil { + return nil, err + } + otherEqConditions, err := expression.ExpressionsToPBList(sc, otherEqConditionsFromIn, client) if err != nil { return nil, err } @@ -397,17 +415,18 @@ func (p *PhysicalHashJoin) ToPB(ctx sessionctx.Context, storeType kv.StoreType) buildFiledTypes = append(buildFiledTypes, expression.ToPBFieldType(retType)) } join := &tipb.Join{ - JoinType: pbJoinType, - JoinExecType: tipb.JoinExecType_TypeHashJoin, - InnerIdx: int64(p.InnerChildIdx), - LeftJoinKeys: left, - RightJoinKeys: right, - ProbeTypes: probeFiledTypes, - BuildTypes: buildFiledTypes, - LeftConditions: leftConditions, - RightConditions: rightConditions, - OtherConditions: otherConditions, - Children: []*tipb.Executor{lChildren, rChildren}, + JoinType: pbJoinType, + JoinExecType: tipb.JoinExecType_TypeHashJoin, + InnerIdx: int64(p.InnerChildIdx), + LeftJoinKeys: left, + RightJoinKeys: right, + ProbeTypes: probeFiledTypes, + BuildTypes: buildFiledTypes, + LeftConditions: leftConditions, + RightConditions: rightConditions, + OtherConditions: otherConditions, + 
OtherEqConditionsFromIn: otherEqConditions, + Children: []*tipb.Executor{lChildren, rChildren}, } executorID := p.ExplainID().String() diff --git a/planner/core/testdata/integration_serial_suite_in.json b/planner/core/testdata/integration_serial_suite_in.json index 34e50df03661b..8d6f3dee0bc53 100644 --- a/planner/core/testdata/integration_serial_suite_in.json +++ b/planner/core/testdata/integration_serial_suite_in.json @@ -52,7 +52,11 @@ "explain format = 'brief' select count(*) from fact_t where exists (select 1 from d1_t where d1_k = fact_t.d1_k)", "explain format = 'brief' select count(*) from fact_t where exists (select 1 from d1_t where d1_k = fact_t.d1_k and value > fact_t.col1)", "explain format = 'brief' select count(*) from fact_t where not exists (select 1 from d1_t where d1_k = fact_t.d1_k)", - "explain format = 'brief' select count(*) from fact_t where not exists (select 1 from d1_t where d1_k = fact_t.d1_k and value > fact_t.col1)" + "explain format = 'brief' select count(*) from fact_t where not exists (select 1 from d1_t where d1_k = fact_t.d1_k and value > fact_t.col1)", + "explain format = 'brief' select count(*) from fact_t join d1_t on fact_t.d1_k > d1_t.d1_k", + "explain format = 'brief' select count(*) from fact_t left join d1_t on fact_t.d1_k > d1_t.d1_k", + "explain format = 'brief' select count(*) from fact_t right join d1_t on fact_t.d1_k > d1_t.d1_k", + "explain format = 'brief' select count(*) from fact_t where d1_k not in (select d1_k from d1_t)" ] }, { diff --git a/planner/core/testdata/integration_serial_suite_out.json b/planner/core/testdata/integration_serial_suite_out.json index 026ef6ea1bce7..46f04a2af0340 100644 --- a/planner/core/testdata/integration_serial_suite_out.json +++ b/planner/core/testdata/integration_serial_suite_out.json @@ -460,6 +460,65 @@ " │ └─TableFullScan 2.00 cop[tiflash] table:d1_t keep order:false", " └─TableFullScan(Probe) 8.00 cop[tiflash] table:fact_t keep order:false" ] + }, + { + "SQL": "explain format 
= 'brief' select count(*) from fact_t join d1_t on fact_t.d1_k > d1_t.d1_k", + "Plan": [ + "HashAgg 1.00 root funcs:count(Column#12)->Column#11", + "└─TableReader 1.00 root data:ExchangeSender", + " └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─HashAgg 1.00 batchCop[tiflash] funcs:count(1)->Column#12", + " └─HashJoin 16.00 batchCop[tiflash] CARTESIAN inner join, other cond:gt(test.fact_t.d1_k, test.d1_t.d1_k)", + " ├─ExchangeReceiver(Build) 2.00 batchCop[tiflash] ", + " │ └─ExchangeSender 2.00 batchCop[tiflash] ExchangeType: Broadcast", + " │ └─Selection 2.00 batchCop[tiflash] not(isnull(test.d1_t.d1_k))", + " │ └─TableFullScan 2.00 batchCop[tiflash] table:d1_t keep order:false", + " └─Selection(Probe) 8.00 batchCop[tiflash] not(isnull(test.fact_t.d1_k))", + " └─TableFullScan 8.00 batchCop[tiflash] table:fact_t keep order:false" + ] + }, + { + "SQL": "explain format = 'brief' select count(*) from fact_t left join d1_t on fact_t.d1_k > d1_t.d1_k", + "Plan": [ + "HashAgg 1.00 root funcs:count(Column#12)->Column#11", + "└─TableReader 1.00 root data:ExchangeSender", + " └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─HashAgg 1.00 batchCop[tiflash] funcs:count(1)->Column#12", + " └─HashJoin 16.00 batchCop[tiflash] CARTESIAN left outer join, other cond:gt(test.fact_t.d1_k, test.d1_t.d1_k)", + " ├─ExchangeReceiver(Build) 2.00 batchCop[tiflash] ", + " │ └─ExchangeSender 2.00 batchCop[tiflash] ExchangeType: Broadcast", + " │ └─Selection 2.00 batchCop[tiflash] not(isnull(test.d1_t.d1_k))", + " │ └─TableFullScan 2.00 batchCop[tiflash] table:d1_t keep order:false", + " └─TableFullScan(Probe) 8.00 batchCop[tiflash] table:fact_t keep order:false" + ] + }, + { + "SQL": "explain format = 'brief' select count(*) from fact_t right join d1_t on fact_t.d1_k > d1_t.d1_k", + "Plan": [ + "HashAgg 1.00 root funcs:count(Column#12)->Column#11", + "└─TableReader 1.00 root data:ExchangeSender", + " └─ExchangeSender 1.00 batchCop[tiflash] 
ExchangeType: PassThrough", + " └─HashAgg 1.00 batchCop[tiflash] funcs:count(1)->Column#12", + " └─HashJoin 16.00 batchCop[tiflash] CARTESIAN right outer join, other cond:gt(test.fact_t.d1_k, test.d1_t.d1_k)", + " ├─ExchangeReceiver(Build) 8.00 batchCop[tiflash] ", + " │ └─ExchangeSender 8.00 batchCop[tiflash] ExchangeType: Broadcast", + " │ └─Selection 8.00 batchCop[tiflash] not(isnull(test.fact_t.d1_k))", + " │ └─TableFullScan 8.00 batchCop[tiflash] table:fact_t keep order:false", + " └─TableFullScan(Probe) 2.00 batchCop[tiflash] table:d1_t keep order:false" + ] + }, + { + "SQL": "explain format = 'brief' select count(*) from fact_t where d1_k not in (select d1_k from d1_t)", + "Plan": [ + "StreamAgg 1.00 root funcs:count(1)->Column#11", + "└─TableReader 6.40 root data:ExchangeSender", + " └─ExchangeSender 6.40 cop[tiflash] ExchangeType: PassThrough", + " └─HashJoin 6.40 cop[tiflash] CARTESIAN anti semi join, other cond:eq(test.fact_t.d1_k, test.d1_t.d1_k)", + " ├─ExchangeReceiver(Build) 2.00 cop[tiflash] ", + " │ └─ExchangeSender 2.00 cop[tiflash] ExchangeType: Broadcast", + " │ └─TableFullScan 2.00 cop[tiflash] table:d1_t keep order:false", + " └─TableFullScan(Probe) 8.00 cop[tiflash] table:fact_t keep order:false" + ] } ] }, @@ -2618,11 +2677,11 @@ "HashAgg 7992.00 root group by:test.ts.col_char_64, funcs:firstrow(test.ts.col_char_64)->test.ts.col_char_64", "└─HashJoin 9990.00 root CARTESIAN inner join, other cond:or(ge(test.ts.col_char_64_not_null, Column#25), if(ne(Column#26, 0), NULL, 0))", " ├─Selection(Build) 0.80 root ne(Column#27, 0)", - " │ └─HashAgg 1.00 root funcs:min(Column#33)->Column#25, funcs:sum(Column#34)->Column#26, funcs:count(Column#35)->Column#27", + " │ └─HashAgg 1.00 root funcs:min(Column#36)->Column#25, funcs:sum(Column#37)->Column#26, funcs:count(Column#38)->Column#27", " │ └─TableReader 1.00 root data:ExchangeSender", " │ └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough", - " │ └─HashAgg 1.00 batchCop[tiflash] 
funcs:min(Column#39)->Column#33, funcs:sum(Column#40)->Column#34, funcs:count(1)->Column#35", - " │ └─Projection 10000.00 batchCop[tiflash] test.ts.col_varchar_64, cast(isnull(test.ts.col_varchar_64), decimal(22,0) BINARY)->Column#40", + " │ └─HashAgg 1.00 batchCop[tiflash] funcs:min(Column#42)->Column#36, funcs:sum(Column#43)->Column#37, funcs:count(1)->Column#38", + " │ └─Projection 10000.00 batchCop[tiflash] test.ts.col_varchar_64, cast(isnull(test.ts.col_varchar_64), decimal(22,0) BINARY)->Column#43", " │ └─TableFullScan 10000.00 batchCop[tiflash] table:SUBQUERY4_t1 keep order:false, stats:pseudo", " └─TableReader(Probe) 12487.50 root data:ExchangeSender", " └─ExchangeSender 12487.50 cop[tiflash] ExchangeType: PassThrough", diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index ea8ad75390a52..16172ca1ed213 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -486,6 +486,12 @@ type SessionVars struct { // AllowBCJ means allow broadcast join. AllowBCJ bool + + // AllowCartesianBCJ means allow broadcast CARTESIAN join, 0 means not allowed, 1 means allow broadcast CARTESIAN join + // only when the table size is under the broadcast threshold, 2 means allow broadcast CARTESIAN join even if the table + // size exceeds the broadcast threshold. + AllowCartesianBCJ int + // AllowDistinctAggPushDown can be set true to allow agg with distinct push down to tikv/tiflash. 
AllowDistinctAggPushDown bool @@ -971,6 +977,7 @@ func NewSessionVars() *SessionVars { StmtCtx: new(stmtctx.StatementContext), AllowAggPushDown: false, AllowBCJ: false, + AllowCartesianBCJ: DefOptCartesianBCJ, BroadcastJoinThresholdSize: DefBroadcastJoinThresholdSize, BroadcastJoinThresholdCount: DefBroadcastJoinThresholdSize, OptimizerSelectivityLevel: DefTiDBOptimizerSelectivityLevel, diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 3763229fb47ca..98e8ca0c94ed3 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -877,6 +877,7 @@ var defaultSysVars = []*SysVar{ return nil }}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBBuildStatsConcurrency, skipInit: true, Value: strconv.Itoa(DefBuildStatsConcurrency)}, + {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptCartesianBCJ, Value: strconv.Itoa(DefOptCartesianBCJ), Type: TypeInt, MinValue: 0, MaxValue: 2}, {Scope: ScopeGlobal, Name: TiDBAutoAnalyzeRatio, Value: strconv.FormatFloat(DefAutoAnalyzeRatio, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64}, {Scope: ScopeGlobal, Name: TiDBAutoAnalyzeStartTime, Value: DefAutoAnalyzeStartTime, Type: TypeTime}, {Scope: ScopeGlobal, Name: TiDBAutoAnalyzeEndTime, Value: DefAutoAnalyzeEndTime, Type: TypeTime}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 1c1145c2ad36a..29f10bda8cdb8 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -46,7 +46,12 @@ const ( // tidb_opt_agg_push_down is used to enable/disable the optimizer rule of aggregation push down. 
TiDBOptAggPushDown = "tidb_opt_agg_push_down" + // TiDBOptBCJ is used to enable/disable broadcast join in MPP mode TiDBOptBCJ = "tidb_opt_broadcast_join" + + // TiDBOptCartesianBCJ controls broadcast cartesian join in MPP mode: 0 disables it, 1 allows it only under the broadcast threshold, 2 allows it even above the threshold + TiDBOptCartesianBCJ = "tidb_opt_broadcast_cartesian_join" + // tidb_opt_distinct_agg_push_down is used to decide whether agg with distinct should be pushed to tikv/tiflash. TiDBOptDistinctAggPushDown = "tidb_opt_distinct_agg_push_down" @@ -588,6 +593,7 @@ const ( DefSkipASCIICheck = false DefOptAggPushDown = false DefOptBCJ = false + DefOptCartesianBCJ = 1 DefOptWriteRowID = false DefOptCorrelationThreshold = 0.9 DefOptCorrelationExpFactor = 1