From b7b4a73ef84868b7e2dff22eb25e34a1759341eb Mon Sep 17 00:00:00 2001 From: xufei Date: Wed, 2 Jun 2021 13:36:33 +0800 Subject: [PATCH] support push down broadcast cartesian join to TiFlash --- planner/core/exhaust_physical_plans.go | 4 +- .../testdata/integration_serial_suite_in.json | 6 +- .../integration_serial_suite_out.json | 65 ++++++++++++++++++- sessionctx/variable/session.go | 8 ++- sessionctx/variable/sysvar.go | 2 +- sessionctx/variable/tidb_vars.go | 6 +- 6 files changed, 79 insertions(+), 12 deletions(-) diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 62f5122d34129..8d3e84f63b8fc 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -1635,7 +1635,7 @@ func (p *LogicalJoin) shouldUseMPPBCJ() bool { if p.ctx.GetSessionVars().BroadcastJoinThresholdSize == 0 || p.ctx.GetSessionVars().BroadcastJoinThresholdCount == 0 { return p.ctx.GetSessionVars().AllowBCJ } - if len(p.EqualConditions) == 0 && p.ctx.GetSessionVars().AllowCARTESIANBCJ == 2 { + if len(p.EqualConditions) == 0 && p.ctx.GetSessionVars().AllowCartesianBCJ == 2 { return true } if p.JoinType == LeftOuterJoin || p.JoinType == SemiJoin || p.JoinType == AntiSemiJoin { @@ -1751,7 +1751,7 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC } if len(p.EqualConditions) == 0 { - if p.ctx.GetSessionVars().AllowCARTESIANBCJ < 1 || !useBCJ { + if p.ctx.GetSessionVars().AllowCartesianBCJ < 1 || !useBCJ { return nil } } diff --git a/planner/core/testdata/integration_serial_suite_in.json b/planner/core/testdata/integration_serial_suite_in.json index 34e50df03661b..8d6f3dee0bc53 100644 --- a/planner/core/testdata/integration_serial_suite_in.json +++ b/planner/core/testdata/integration_serial_suite_in.json @@ -52,7 +52,11 @@ "explain format = 'brief' select count(*) from fact_t where exists (select 1 from d1_t where d1_k = fact_t.d1_k)", "explain format = 'brief' select count(*) from fact_t where exists (select 1 from d1_t where d1_k = fact_t.d1_k and value > fact_t.col1)", "explain format = 'brief' select count(*) from fact_t where not exists (select 1 from d1_t where d1_k = fact_t.d1_k)", - "explain format = 'brief' select count(*) from fact_t where not exists (select 1 from d1_t where d1_k = fact_t.d1_k and value > fact_t.col1)" + "explain format = 'brief' select count(*) from fact_t where not exists (select 1 from d1_t where d1_k = fact_t.d1_k and value > fact_t.col1)", + "explain format = 'brief' select count(*) from fact_t join d1_t on fact_t.d1_k > d1_t.d1_k", + "explain format = 'brief' select count(*) from fact_t left join d1_t on fact_t.d1_k > d1_t.d1_k", + "explain format = 'brief' select count(*) from fact_t right join d1_t on fact_t.d1_k > d1_t.d1_k", + "explain format = 'brief' select count(*) from fact_t where d1_k not in (select d1_k from d1_t)" ] }, { diff --git a/planner/core/testdata/integration_serial_suite_out.json b/planner/core/testdata/integration_serial_suite_out.json index 026ef6ea1bce7..46f04a2af0340 100644 --- a/planner/core/testdata/integration_serial_suite_out.json +++ b/planner/core/testdata/integration_serial_suite_out.json @@ -460,6 +460,65 @@ " │ └─TableFullScan 2.00 cop[tiflash] table:d1_t keep order:false", " └─TableFullScan(Probe) 8.00 cop[tiflash] table:fact_t keep order:false" ] + }, + { + "SQL": "explain format = 'brief' select count(*) from fact_t join d1_t on fact_t.d1_k > d1_t.d1_k", + "Plan": [ + "HashAgg 1.00 root funcs:count(Column#12)->Column#11", + "└─TableReader 1.00 root data:ExchangeSender", + " └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─HashAgg 1.00 batchCop[tiflash] funcs:count(1)->Column#12", + " └─HashJoin 16.00 batchCop[tiflash] CARTESIAN inner join, other cond:gt(test.fact_t.d1_k, test.d1_t.d1_k)", + " ├─ExchangeReceiver(Build) 2.00 batchCop[tiflash] ", + " │ └─ExchangeSender 2.00 batchCop[tiflash] ExchangeType: Broadcast", + " │ └─Selection 2.00 batchCop[tiflash] not(isnull(test.d1_t.d1_k))", + " │ └─TableFullScan 2.00 batchCop[tiflash] table:d1_t keep order:false", + " └─Selection(Probe) 8.00 batchCop[tiflash] not(isnull(test.fact_t.d1_k))", + " └─TableFullScan 8.00 batchCop[tiflash] table:fact_t keep order:false" + ] + }, + { + "SQL": "explain format = 'brief' select count(*) from fact_t left join d1_t on fact_t.d1_k > d1_t.d1_k", + "Plan": [ + "HashAgg 1.00 root funcs:count(Column#12)->Column#11", + "└─TableReader 1.00 root data:ExchangeSender", + " └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─HashAgg 1.00 batchCop[tiflash] funcs:count(1)->Column#12", + " └─HashJoin 16.00 batchCop[tiflash] CARTESIAN left outer join, other cond:gt(test.fact_t.d1_k, test.d1_t.d1_k)", + " ├─ExchangeReceiver(Build) 2.00 batchCop[tiflash] ", + " │ └─ExchangeSender 2.00 batchCop[tiflash] ExchangeType: Broadcast", + " │ └─Selection 2.00 batchCop[tiflash] not(isnull(test.d1_t.d1_k))", + " │ └─TableFullScan 2.00 batchCop[tiflash] table:d1_t keep order:false", + " └─TableFullScan(Probe) 8.00 batchCop[tiflash] table:fact_t keep order:false" + ] + }, + { + "SQL": "explain format = 'brief' select count(*) from fact_t right join d1_t on fact_t.d1_k > d1_t.d1_k", + "Plan": [ + "HashAgg 1.00 root funcs:count(Column#12)->Column#11", + "└─TableReader 1.00 root data:ExchangeSender", + " └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─HashAgg 1.00 batchCop[tiflash] funcs:count(1)->Column#12", + " └─HashJoin 16.00 batchCop[tiflash] CARTESIAN right outer join, other cond:gt(test.fact_t.d1_k, test.d1_t.d1_k)", + " ├─ExchangeReceiver(Build) 8.00 batchCop[tiflash] ", + " │ └─ExchangeSender 8.00 batchCop[tiflash] ExchangeType: Broadcast", + " │ └─Selection 8.00 batchCop[tiflash] not(isnull(test.fact_t.d1_k))", + " │ └─TableFullScan 8.00 batchCop[tiflash] table:fact_t keep order:false", + " └─TableFullScan(Probe) 2.00 batchCop[tiflash] table:d1_t keep order:false" + ] + }, + { + "SQL": "explain format = 'brief' select count(*) from fact_t where d1_k not in (select d1_k from d1_t)", + "Plan": [ + "StreamAgg 1.00 root funcs:count(1)->Column#11", + "└─TableReader 6.40 root data:ExchangeSender", + " └─ExchangeSender 6.40 cop[tiflash] ExchangeType: PassThrough", + " └─HashJoin 6.40 cop[tiflash] CARTESIAN anti semi join, other cond:eq(test.fact_t.d1_k, test.d1_t.d1_k)", + " ├─ExchangeReceiver(Build) 2.00 cop[tiflash] ", + " │ └─ExchangeSender 2.00 cop[tiflash] ExchangeType: Broadcast", + " │ └─TableFullScan 2.00 cop[tiflash] table:d1_t keep order:false", + " └─TableFullScan(Probe) 8.00 cop[tiflash] table:fact_t keep order:false" + ] } ] }, @@ -2618,11 +2677,11 @@ "HashAgg 7992.00 root group by:test.ts.col_char_64, funcs:firstrow(test.ts.col_char_64)->test.ts.col_char_64", "└─HashJoin 9990.00 root CARTESIAN inner join, other cond:or(ge(test.ts.col_char_64_not_null, Column#25), if(ne(Column#26, 0), NULL, 0))", " ├─Selection(Build) 0.80 root ne(Column#27, 0)", - " │ └─HashAgg 1.00 root funcs:min(Column#33)->Column#25, funcs:sum(Column#34)->Column#26, funcs:count(Column#35)->Column#27", + " │ └─HashAgg 1.00 root funcs:min(Column#36)->Column#25, funcs:sum(Column#37)->Column#26, funcs:count(Column#38)->Column#27", " │ └─TableReader 1.00 root data:ExchangeSender", " │ └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough", - " │ └─HashAgg 1.00 batchCop[tiflash] funcs:min(Column#39)->Column#33, funcs:sum(Column#40)->Column#34, funcs:count(1)->Column#35", - " │ └─Projection 10000.00 batchCop[tiflash] test.ts.col_varchar_64, cast(isnull(test.ts.col_varchar_64), decimal(22,0) BINARY)->Column#40", + " │ └─HashAgg 1.00 batchCop[tiflash] funcs:min(Column#42)->Column#36, funcs:sum(Column#43)->Column#37, funcs:count(1)->Column#38", + " │ └─Projection 10000.00 batchCop[tiflash] test.ts.col_varchar_64, cast(isnull(test.ts.col_varchar_64), decimal(22,0) BINARY)->Column#43", " │ └─TableFullScan 10000.00 batchCop[tiflash] table:SUBQUERY4_t1 keep order:false, stats:pseudo", " └─TableReader(Probe) 12487.50 root data:ExchangeSender", " └─ExchangeSender 12487.50 cop[tiflash] ExchangeType: PassThrough", diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 2b561aec4888d..2851f29e3af22 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -487,8 +487,10 @@ type SessionVars struct { // AllowBCJ means allow broadcast join. AllowBCJ bool - // AllowCARTESIANBCJ means allow broadcast join for CARTESIAN join - AllowCARTESIANBCJ int + // AllowCartesianBCJ means allow broadcast CARTESIAN join, 0 means not allow, 1 means allow broadcast CARTESIAN join + // but the table size should under the broadcast threshold, 2 means allow broadcast CARTESIAN join even if the table + // size exceeds the broadcast threshold + AllowCartesianBCJ int // AllowDistinctAggPushDown can be set true to allow agg with distinct push down to tikv/tiflash. AllowDistinctAggPushDown bool @@ -969,7 +971,7 @@ func NewSessionVars() *SessionVars { StmtCtx: new(stmtctx.StatementContext), AllowAggPushDown: false, AllowBCJ: false, - AllowCARTESIANBCJ: DefOptCARTESIANBCJ, + AllowCartesianBCJ: DefOptCartesianBCJ, BroadcastJoinThresholdSize: DefBroadcastJoinThresholdSize, BroadcastJoinThresholdCount: DefBroadcastJoinThresholdSize, OptimizerSelectivityLevel: DefTiDBOptimizerSelectivityLevel, diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index bcca69bc1cb29..eff597839a3e2 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -871,7 +871,7 @@ var defaultSysVars = []*SysVar{ return nil }}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBBuildStatsConcurrency, skipInit: true, Value: strconv.Itoa(DefBuildStatsConcurrency)}, - {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptCARTESIANBCJ, Value: strconv.Itoa(DefOptCARTESIANBCJ), Type: TypeInt, MinValue: 0, MaxValue: 2}, + {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptCartesianBCJ, Value: strconv.Itoa(DefOptCartesianBCJ), Type: TypeInt, MinValue: 0, MaxValue: 2}, {Scope: ScopeGlobal, Name: TiDBAutoAnalyzeRatio, Value: strconv.FormatFloat(DefAutoAnalyzeRatio, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64}, {Scope: ScopeGlobal, Name: TiDBAutoAnalyzeStartTime, Value: DefAutoAnalyzeStartTime, Type: TypeTime}, {Scope: ScopeGlobal, Name: TiDBAutoAnalyzeEndTime, Value: DefAutoAnalyzeEndTime, Type: TypeTime}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 2c6254b8cdbfa..290cf7d3bc33e 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -46,9 +46,11 @@ const ( // tidb_opt_agg_push_down is used to enable/disable the optimizer rule of aggregation push down. TiDBOptAggPushDown = "tidb_opt_agg_push_down" + // TiDBOptBCJ is used to enable/disable broadcast join in MPP mode TiDBOptBCJ = "tidb_opt_broadcast_join" - TiDBOptCARTESIANBCJ = "tidb_opt_cartesian_bcj" + // TiDBOptCartesianBCJ is used to disable/enable broadcast cartesian join in MPP mode + TiDBOptCartesianBCJ = "tidb_opt_broadcast_cartesian_join" // tidb_opt_distinct_agg_push_down is used to decide whether agg with distinct should be pushed to tikv/tiflash. TiDBOptDistinctAggPushDown = "tidb_opt_distinct_agg_push_down" @@ -585,7 +587,7 @@ const ( DefSkipASCIICheck = false DefOptAggPushDown = false DefOptBCJ = false - DefOptCARTESIANBCJ = 1 + DefOptCartesianBCJ = 1 DefOptWriteRowID = false DefOptCorrelationThreshold = 0.9 DefOptCorrelationExpFactor = 1