Skip to content

Commit

Permalink
support push down broadcast cartesian join to TiFlash
Browse files Browse the repository at this point in the history
  • Loading branch information
windtalker committed Jun 2, 2021
1 parent 4896c8b commit b7b4a73
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 12 deletions.
4 changes: 2 additions & 2 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1635,7 +1635,7 @@ func (p *LogicalJoin) shouldUseMPPBCJ() bool {
if p.ctx.GetSessionVars().BroadcastJoinThresholdSize == 0 || p.ctx.GetSessionVars().BroadcastJoinThresholdCount == 0 {
return p.ctx.GetSessionVars().AllowBCJ
}
if len(p.EqualConditions) == 0 && p.ctx.GetSessionVars().AllowCARTESIANBCJ == 2 {
if len(p.EqualConditions) == 0 && p.ctx.GetSessionVars().AllowCartesianBCJ == 2 {
return true
}
if p.JoinType == LeftOuterJoin || p.JoinType == SemiJoin || p.JoinType == AntiSemiJoin {
Expand Down Expand Up @@ -1751,7 +1751,7 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC
}

if len(p.EqualConditions) == 0 {
if p.ctx.GetSessionVars().AllowCARTESIANBCJ < 1 || !useBCJ {
if p.ctx.GetSessionVars().AllowCartesianBCJ < 1 || !useBCJ {
return nil
}
}
Expand Down
6 changes: 5 additions & 1 deletion planner/core/testdata/integration_serial_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@
"explain format = 'brief' select count(*) from fact_t where exists (select 1 from d1_t where d1_k = fact_t.d1_k)",
"explain format = 'brief' select count(*) from fact_t where exists (select 1 from d1_t where d1_k = fact_t.d1_k and value > fact_t.col1)",
"explain format = 'brief' select count(*) from fact_t where not exists (select 1 from d1_t where d1_k = fact_t.d1_k)",
"explain format = 'brief' select count(*) from fact_t where not exists (select 1 from d1_t where d1_k = fact_t.d1_k and value > fact_t.col1)"
"explain format = 'brief' select count(*) from fact_t where not exists (select 1 from d1_t where d1_k = fact_t.d1_k and value > fact_t.col1)",
"explain format = 'brief' select count(*) from fact_t join d1_t on fact_t.d1_k > d1_t.d1_k",
"explain format = 'brief' select count(*) from fact_t left join d1_t on fact_t.d1_k > d1_t.d1_k",
"explain format = 'brief' select count(*) from fact_t right join d1_t on fact_t.d1_k > d1_t.d1_k",
"explain format = 'brief' select count(*) from fact_t where d1_k not in (select d1_k from d1_t)"
]
},
{
Expand Down
65 changes: 62 additions & 3 deletions planner/core/testdata/integration_serial_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,65 @@
" │ └─TableFullScan 2.00 cop[tiflash] table:d1_t keep order:false",
" └─TableFullScan(Probe) 8.00 cop[tiflash] table:fact_t keep order:false"
]
},
{
"SQL": "explain format = 'brief' select count(*) from fact_t join d1_t on fact_t.d1_k > d1_t.d1_k",
"Plan": [
"HashAgg 1.00 root funcs:count(Column#12)->Column#11",
"└─TableReader 1.00 root data:ExchangeSender",
" └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─HashAgg 1.00 batchCop[tiflash] funcs:count(1)->Column#12",
" └─HashJoin 16.00 batchCop[tiflash] CARTESIAN inner join, other cond:gt(test.fact_t.d1_k, test.d1_t.d1_k)",
" ├─ExchangeReceiver(Build) 2.00 batchCop[tiflash] ",
" │ └─ExchangeSender 2.00 batchCop[tiflash] ExchangeType: Broadcast",
" │ └─Selection 2.00 batchCop[tiflash] not(isnull(test.d1_t.d1_k))",
" │ └─TableFullScan 2.00 batchCop[tiflash] table:d1_t keep order:false",
" └─Selection(Probe) 8.00 batchCop[tiflash] not(isnull(test.fact_t.d1_k))",
" └─TableFullScan 8.00 batchCop[tiflash] table:fact_t keep order:false"
]
},
{
"SQL": "explain format = 'brief' select count(*) from fact_t left join d1_t on fact_t.d1_k > d1_t.d1_k",
"Plan": [
"HashAgg 1.00 root funcs:count(Column#12)->Column#11",
"└─TableReader 1.00 root data:ExchangeSender",
" └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─HashAgg 1.00 batchCop[tiflash] funcs:count(1)->Column#12",
" └─HashJoin 16.00 batchCop[tiflash] CARTESIAN left outer join, other cond:gt(test.fact_t.d1_k, test.d1_t.d1_k)",
" ├─ExchangeReceiver(Build) 2.00 batchCop[tiflash] ",
" │ └─ExchangeSender 2.00 batchCop[tiflash] ExchangeType: Broadcast",
" │ └─Selection 2.00 batchCop[tiflash] not(isnull(test.d1_t.d1_k))",
" │ └─TableFullScan 2.00 batchCop[tiflash] table:d1_t keep order:false",
" └─TableFullScan(Probe) 8.00 batchCop[tiflash] table:fact_t keep order:false"
]
},
{
"SQL": "explain format = 'brief' select count(*) from fact_t right join d1_t on fact_t.d1_k > d1_t.d1_k",
"Plan": [
"HashAgg 1.00 root funcs:count(Column#12)->Column#11",
"└─TableReader 1.00 root data:ExchangeSender",
" └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─HashAgg 1.00 batchCop[tiflash] funcs:count(1)->Column#12",
" └─HashJoin 16.00 batchCop[tiflash] CARTESIAN right outer join, other cond:gt(test.fact_t.d1_k, test.d1_t.d1_k)",
" ├─ExchangeReceiver(Build) 8.00 batchCop[tiflash] ",
" │ └─ExchangeSender 8.00 batchCop[tiflash] ExchangeType: Broadcast",
" │ └─Selection 8.00 batchCop[tiflash] not(isnull(test.fact_t.d1_k))",
" │ └─TableFullScan 8.00 batchCop[tiflash] table:fact_t keep order:false",
" └─TableFullScan(Probe) 2.00 batchCop[tiflash] table:d1_t keep order:false"
]
},
{
"SQL": "explain format = 'brief' select count(*) from fact_t where d1_k not in (select d1_k from d1_t)",
"Plan": [
"StreamAgg 1.00 root funcs:count(1)->Column#11",
"└─TableReader 6.40 root data:ExchangeSender",
" └─ExchangeSender 6.40 cop[tiflash] ExchangeType: PassThrough",
" └─HashJoin 6.40 cop[tiflash] CARTESIAN anti semi join, other cond:eq(test.fact_t.d1_k, test.d1_t.d1_k)",
" ├─ExchangeReceiver(Build) 2.00 cop[tiflash] ",
" │ └─ExchangeSender 2.00 cop[tiflash] ExchangeType: Broadcast",
" │ └─TableFullScan 2.00 cop[tiflash] table:d1_t keep order:false",
" └─TableFullScan(Probe) 8.00 cop[tiflash] table:fact_t keep order:false"
]
}
]
},
Expand Down Expand Up @@ -2618,11 +2677,11 @@
"HashAgg 7992.00 root group by:test.ts.col_char_64, funcs:firstrow(test.ts.col_char_64)->test.ts.col_char_64",
"└─HashJoin 9990.00 root CARTESIAN inner join, other cond:or(ge(test.ts.col_char_64_not_null, Column#25), if(ne(Column#26, 0), NULL, 0))",
" ├─Selection(Build) 0.80 root ne(Column#27, 0)",
" │ └─HashAgg 1.00 root funcs:min(Column#33)->Column#25, funcs:sum(Column#34)->Column#26, funcs:count(Column#35)->Column#27",
" │ └─HashAgg 1.00 root funcs:min(Column#36)->Column#25, funcs:sum(Column#37)->Column#26, funcs:count(Column#38)->Column#27",
" │ └─TableReader 1.00 root data:ExchangeSender",
" │ └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
" │ └─HashAgg 1.00 batchCop[tiflash] funcs:min(Column#39)->Column#33, funcs:sum(Column#40)->Column#34, funcs:count(1)->Column#35",
" │ └─Projection 10000.00 batchCop[tiflash] test.ts.col_varchar_64, cast(isnull(test.ts.col_varchar_64), decimal(22,0) BINARY)->Column#40",
" │ └─HashAgg 1.00 batchCop[tiflash] funcs:min(Column#42)->Column#36, funcs:sum(Column#43)->Column#37, funcs:count(1)->Column#38",
" │ └─Projection 10000.00 batchCop[tiflash] test.ts.col_varchar_64, cast(isnull(test.ts.col_varchar_64), decimal(22,0) BINARY)->Column#43",
" │ └─TableFullScan 10000.00 batchCop[tiflash] table:SUBQUERY4_t1 keep order:false, stats:pseudo",
" └─TableReader(Probe) 12487.50 root data:ExchangeSender",
" └─ExchangeSender 12487.50 cop[tiflash] ExchangeType: PassThrough",
Expand Down
8 changes: 5 additions & 3 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,10 @@ type SessionVars struct {
// AllowBCJ means allow broadcast join.
AllowBCJ bool

// AllowCARTESIANBCJ means allow broadcast join for CARTESIAN join
AllowCARTESIANBCJ int
// AllowCartesianBCJ means allow broadcast CARTESIAN join, 0 means not allow, 1 means allow broadcast CARTESIAN join
// but the table size should under the broadcast threshold, 2 means allow broadcast CARTESIAN join even if the table
// size exceeds the broadcast threshold
AllowCartesianBCJ int

// AllowDistinctAggPushDown can be set true to allow agg with distinct push down to tikv/tiflash.
AllowDistinctAggPushDown bool
Expand Down Expand Up @@ -969,7 +971,7 @@ func NewSessionVars() *SessionVars {
StmtCtx: new(stmtctx.StatementContext),
AllowAggPushDown: false,
AllowBCJ: false,
AllowCARTESIANBCJ: DefOptCARTESIANBCJ,
AllowCartesianBCJ: DefOptCartesianBCJ,
BroadcastJoinThresholdSize: DefBroadcastJoinThresholdSize,
BroadcastJoinThresholdCount: DefBroadcastJoinThresholdSize,
OptimizerSelectivityLevel: DefTiDBOptimizerSelectivityLevel,
Expand Down
2 changes: 1 addition & 1 deletion sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,7 @@ var defaultSysVars = []*SysVar{
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBBuildStatsConcurrency, skipInit: true, Value: strconv.Itoa(DefBuildStatsConcurrency)},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptCARTESIANBCJ, Value: strconv.Itoa(DefOptCARTESIANBCJ), Type: TypeInt, MinValue: 0, MaxValue: 2},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptCartesianBCJ, Value: strconv.Itoa(DefOptCartesianBCJ), Type: TypeInt, MinValue: 0, MaxValue: 2},
{Scope: ScopeGlobal, Name: TiDBAutoAnalyzeRatio, Value: strconv.FormatFloat(DefAutoAnalyzeRatio, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64},
{Scope: ScopeGlobal, Name: TiDBAutoAnalyzeStartTime, Value: DefAutoAnalyzeStartTime, Type: TypeTime},
{Scope: ScopeGlobal, Name: TiDBAutoAnalyzeEndTime, Value: DefAutoAnalyzeEndTime, Type: TypeTime},
Expand Down
6 changes: 4 additions & 2 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ const (
// tidb_opt_agg_push_down is used to enable/disable the optimizer rule of aggregation push down.
TiDBOptAggPushDown = "tidb_opt_agg_push_down"

// TiDBOptBCJ is used to enable/disable broadcast join in MPP mode
TiDBOptBCJ = "tidb_opt_broadcast_join"

TiDBOptCARTESIANBCJ = "tidb_opt_cartesian_bcj"
// TiDBOptCartesianBCJ is used to disable/enable broadcast cartesian join in MPP mode
TiDBOptCartesianBCJ = "tidb_opt_broadcast_cartesian_join"

// tidb_opt_distinct_agg_push_down is used to decide whether agg with distinct should be pushed to tikv/tiflash.
TiDBOptDistinctAggPushDown = "tidb_opt_distinct_agg_push_down"
Expand Down Expand Up @@ -585,7 +587,7 @@ const (
DefSkipASCIICheck = false
DefOptAggPushDown = false
DefOptBCJ = false
DefOptCARTESIANBCJ = 1
DefOptCartesianBCJ = 1
DefOptWriteRowID = false
DefOptCorrelationThreshold = 0.9
DefOptCorrelationExpFactor = 1
Expand Down

0 comments on commit b7b4a73

Please sign in to comment.