Skip to content

Commit

Permalink
planner: support push down broadcast cartesian join to TiFlash (#25049)
Browse files Browse the repository at this point in the history
  • Loading branch information
windtalker authored Jun 3, 2021
1 parent 982dcec commit d3de547
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 20 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ require (
github.com/pingcap/parser v0.0.0-20210525032559-c37778aff307
github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible
github.com/pingcap/tipb v0.0.0-20210525032549-b80be13ddf6c
github.com/pingcap/tipb v0.0.0-20210601083426-79a378b6d1c4
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.9.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,8 @@ github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3/go.mod h1:tckvA041
github.com/pingcap/tidb-dashboard v0.0.0-20210312062513-eef5d6404638/go.mod h1:OzFN8H0EDMMqeulPhPMw2i2JaiZWOKFQ7zdRPhENNgo=
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible h1:ceznmu/lLseGHP/jKyOa/3u/5H3wtLLLqkH2V3ssSjg=
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tipb v0.0.0-20210525032549-b80be13ddf6c h1:El3pMBpJHuSkItkHsnBqsaaHzJwFBNDt3Aul98AhREY=
github.com/pingcap/tipb v0.0.0-20210525032549-b80be13ddf6c/go.mod h1:nsEhnMokcn7MRqd2J60yxpn/ac3ZH8A6GOJ9NslabUo=
github.com/pingcap/tipb v0.0.0-20210601083426-79a378b6d1c4 h1:n47+OwdI/uxKenfBT8Y2/be11MwbeLKNLdzOWnxNQKg=
github.com/pingcap/tipb v0.0.0-20210601083426-79a378b6d1c4/go.mod h1:nsEhnMokcn7MRqd2J60yxpn/ac3ZH8A6GOJ9NslabUo=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down
15 changes: 14 additions & 1 deletion planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1661,6 +1661,9 @@ func (p *LogicalJoin) shouldUseMPPBCJ() bool {
if p.ctx.GetSessionVars().BroadcastJoinThresholdSize == 0 || p.ctx.GetSessionVars().BroadcastJoinThresholdCount == 0 {
return p.ctx.GetSessionVars().AllowBCJ
}
if len(p.EqualConditions) == 0 && p.ctx.GetSessionVars().AllowCartesianBCJ == 2 {
return true
}
if p.JoinType == LeftOuterJoin || p.JoinType == SemiJoin || p.JoinType == AntiSemiJoin {
return checkChildFitBC(p.children[1])
} else if p.JoinType == RightOuterJoin {
Expand Down Expand Up @@ -1769,9 +1772,19 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC
return nil
}

if (p.JoinType != InnerJoin && p.JoinType != LeftOuterJoin && p.JoinType != RightOuterJoin && p.JoinType != SemiJoin && p.JoinType != AntiSemiJoin) || len(p.EqualConditions) == 0 {
if p.JoinType != InnerJoin && p.JoinType != LeftOuterJoin && p.JoinType != RightOuterJoin && p.JoinType != SemiJoin && p.JoinType != AntiSemiJoin {
return nil
}

if len(p.EqualConditions) == 0 {
if p.ctx.GetSessionVars().AllowCartesianBCJ == 0 || !useBCJ {
return nil
}
}
if (len(p.LeftConditions) != 0 && p.JoinType != LeftOuterJoin) || (len(p.RightConditions) != 0 && p.JoinType != RightOuterJoin) {
return nil
}

if prop.PartitionTp == property.BroadcastType {
return nil
}
Expand Down
43 changes: 31 additions & 12 deletions planner/core/plan_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,25 @@ func (p *PhysicalHashJoin) ToPB(ctx sessionctx.Context, storeType kv.StoreType)
if err != nil {
return nil, err
}
otherConditions, err := expression.ExpressionsToPBList(sc, p.OtherConditions, client)

var otherConditionsInJoin expression.CNFExprs
var otherEqConditionsFromIn expression.CNFExprs
if p.JoinType == AntiSemiJoin {
for _, condition := range p.OtherConditions {
if expression.IsEQCondFromIn(condition) {
otherEqConditionsFromIn = append(otherEqConditionsFromIn, condition)
} else {
otherConditionsInJoin = append(otherConditionsInJoin, condition)
}
}
} else {
otherConditionsInJoin = p.OtherConditions
}
otherConditions, err := expression.ExpressionsToPBList(sc, otherConditionsInJoin, client)
if err != nil {
return nil, err
}
otherEqConditions, err := expression.ExpressionsToPBList(sc, otherEqConditionsFromIn, client)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -397,17 +415,18 @@ func (p *PhysicalHashJoin) ToPB(ctx sessionctx.Context, storeType kv.StoreType)
buildFiledTypes = append(buildFiledTypes, expression.ToPBFieldType(retType))
}
join := &tipb.Join{
JoinType: pbJoinType,
JoinExecType: tipb.JoinExecType_TypeHashJoin,
InnerIdx: int64(p.InnerChildIdx),
LeftJoinKeys: left,
RightJoinKeys: right,
ProbeTypes: probeFiledTypes,
BuildTypes: buildFiledTypes,
LeftConditions: leftConditions,
RightConditions: rightConditions,
OtherConditions: otherConditions,
Children: []*tipb.Executor{lChildren, rChildren},
JoinType: pbJoinType,
JoinExecType: tipb.JoinExecType_TypeHashJoin,
InnerIdx: int64(p.InnerChildIdx),
LeftJoinKeys: left,
RightJoinKeys: right,
ProbeTypes: probeFiledTypes,
BuildTypes: buildFiledTypes,
LeftConditions: leftConditions,
RightConditions: rightConditions,
OtherConditions: otherConditions,
OtherEqConditionsFromIn: otherEqConditions,
Children: []*tipb.Executor{lChildren, rChildren},
}

executorID := p.ExplainID().String()
Expand Down
6 changes: 5 additions & 1 deletion planner/core/testdata/integration_serial_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@
"explain format = 'brief' select count(*) from fact_t where exists (select 1 from d1_t where d1_k = fact_t.d1_k)",
"explain format = 'brief' select count(*) from fact_t where exists (select 1 from d1_t where d1_k = fact_t.d1_k and value > fact_t.col1)",
"explain format = 'brief' select count(*) from fact_t where not exists (select 1 from d1_t where d1_k = fact_t.d1_k)",
"explain format = 'brief' select count(*) from fact_t where not exists (select 1 from d1_t where d1_k = fact_t.d1_k and value > fact_t.col1)"
"explain format = 'brief' select count(*) from fact_t where not exists (select 1 from d1_t where d1_k = fact_t.d1_k and value > fact_t.col1)",
"explain format = 'brief' select count(*) from fact_t join d1_t on fact_t.d1_k > d1_t.d1_k",
"explain format = 'brief' select count(*) from fact_t left join d1_t on fact_t.d1_k > d1_t.d1_k",
"explain format = 'brief' select count(*) from fact_t right join d1_t on fact_t.d1_k > d1_t.d1_k",
"explain format = 'brief' select count(*) from fact_t where d1_k not in (select d1_k from d1_t)"
]
},
{
Expand Down
65 changes: 62 additions & 3 deletions planner/core/testdata/integration_serial_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,65 @@
" │ └─TableFullScan 2.00 cop[tiflash] table:d1_t keep order:false",
" └─TableFullScan(Probe) 8.00 cop[tiflash] table:fact_t keep order:false"
]
},
{
"SQL": "explain format = 'brief' select count(*) from fact_t join d1_t on fact_t.d1_k > d1_t.d1_k",
"Plan": [
"HashAgg 1.00 root funcs:count(Column#12)->Column#11",
"└─TableReader 1.00 root data:ExchangeSender",
" └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─HashAgg 1.00 batchCop[tiflash] funcs:count(1)->Column#12",
" └─HashJoin 16.00 batchCop[tiflash] CARTESIAN inner join, other cond:gt(test.fact_t.d1_k, test.d1_t.d1_k)",
" ├─ExchangeReceiver(Build) 2.00 batchCop[tiflash] ",
" │ └─ExchangeSender 2.00 batchCop[tiflash] ExchangeType: Broadcast",
" │ └─Selection 2.00 batchCop[tiflash] not(isnull(test.d1_t.d1_k))",
" │ └─TableFullScan 2.00 batchCop[tiflash] table:d1_t keep order:false",
" └─Selection(Probe) 8.00 batchCop[tiflash] not(isnull(test.fact_t.d1_k))",
" └─TableFullScan 8.00 batchCop[tiflash] table:fact_t keep order:false"
]
},
{
"SQL": "explain format = 'brief' select count(*) from fact_t left join d1_t on fact_t.d1_k > d1_t.d1_k",
"Plan": [
"HashAgg 1.00 root funcs:count(Column#12)->Column#11",
"└─TableReader 1.00 root data:ExchangeSender",
" └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─HashAgg 1.00 batchCop[tiflash] funcs:count(1)->Column#12",
" └─HashJoin 16.00 batchCop[tiflash] CARTESIAN left outer join, other cond:gt(test.fact_t.d1_k, test.d1_t.d1_k)",
" ├─ExchangeReceiver(Build) 2.00 batchCop[tiflash] ",
" │ └─ExchangeSender 2.00 batchCop[tiflash] ExchangeType: Broadcast",
" │ └─Selection 2.00 batchCop[tiflash] not(isnull(test.d1_t.d1_k))",
" │ └─TableFullScan 2.00 batchCop[tiflash] table:d1_t keep order:false",
" └─TableFullScan(Probe) 8.00 batchCop[tiflash] table:fact_t keep order:false"
]
},
{
"SQL": "explain format = 'brief' select count(*) from fact_t right join d1_t on fact_t.d1_k > d1_t.d1_k",
"Plan": [
"HashAgg 1.00 root funcs:count(Column#12)->Column#11",
"└─TableReader 1.00 root data:ExchangeSender",
" └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─HashAgg 1.00 batchCop[tiflash] funcs:count(1)->Column#12",
" └─HashJoin 16.00 batchCop[tiflash] CARTESIAN right outer join, other cond:gt(test.fact_t.d1_k, test.d1_t.d1_k)",
" ├─ExchangeReceiver(Build) 8.00 batchCop[tiflash] ",
" │ └─ExchangeSender 8.00 batchCop[tiflash] ExchangeType: Broadcast",
" │ └─Selection 8.00 batchCop[tiflash] not(isnull(test.fact_t.d1_k))",
" │ └─TableFullScan 8.00 batchCop[tiflash] table:fact_t keep order:false",
" └─TableFullScan(Probe) 2.00 batchCop[tiflash] table:d1_t keep order:false"
]
},
{
"SQL": "explain format = 'brief' select count(*) from fact_t where d1_k not in (select d1_k from d1_t)",
"Plan": [
"StreamAgg 1.00 root funcs:count(1)->Column#11",
"└─TableReader 6.40 root data:ExchangeSender",
" └─ExchangeSender 6.40 cop[tiflash] ExchangeType: PassThrough",
" └─HashJoin 6.40 cop[tiflash] CARTESIAN anti semi join, other cond:eq(test.fact_t.d1_k, test.d1_t.d1_k)",
" ├─ExchangeReceiver(Build) 2.00 cop[tiflash] ",
" │ └─ExchangeSender 2.00 cop[tiflash] ExchangeType: Broadcast",
" │ └─TableFullScan 2.00 cop[tiflash] table:d1_t keep order:false",
" └─TableFullScan(Probe) 8.00 cop[tiflash] table:fact_t keep order:false"
]
}
]
},
Expand Down Expand Up @@ -2618,11 +2677,11 @@
"HashAgg 7992.00 root group by:test.ts.col_char_64, funcs:firstrow(test.ts.col_char_64)->test.ts.col_char_64",
"└─HashJoin 9990.00 root CARTESIAN inner join, other cond:or(ge(test.ts.col_char_64_not_null, Column#25), if(ne(Column#26, 0), NULL, 0))",
" ├─Selection(Build) 0.80 root ne(Column#27, 0)",
" │ └─HashAgg 1.00 root funcs:min(Column#33)->Column#25, funcs:sum(Column#34)->Column#26, funcs:count(Column#35)->Column#27",
" │ └─HashAgg 1.00 root funcs:min(Column#36)->Column#25, funcs:sum(Column#37)->Column#26, funcs:count(Column#38)->Column#27",
" │ └─TableReader 1.00 root data:ExchangeSender",
" │ └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
" │ └─HashAgg 1.00 batchCop[tiflash] funcs:min(Column#39)->Column#33, funcs:sum(Column#40)->Column#34, funcs:count(1)->Column#35",
" │ └─Projection 10000.00 batchCop[tiflash] test.ts.col_varchar_64, cast(isnull(test.ts.col_varchar_64), decimal(22,0) BINARY)->Column#40",
" │ └─HashAgg 1.00 batchCop[tiflash] funcs:min(Column#42)->Column#36, funcs:sum(Column#43)->Column#37, funcs:count(1)->Column#38",
" │ └─Projection 10000.00 batchCop[tiflash] test.ts.col_varchar_64, cast(isnull(test.ts.col_varchar_64), decimal(22,0) BINARY)->Column#43",
" │ └─TableFullScan 10000.00 batchCop[tiflash] table:SUBQUERY4_t1 keep order:false, stats:pseudo",
" └─TableReader(Probe) 12487.50 root data:ExchangeSender",
" └─ExchangeSender 12487.50 cop[tiflash] ExchangeType: PassThrough",
Expand Down
7 changes: 7 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,12 @@ type SessionVars struct {

// AllowBCJ means allow broadcast join.
AllowBCJ bool

// AllowCartesianBCJ controls whether broadcast CARTESIAN join is allowed: 0 means it is not allowed, 1 means it is
// allowed only when the table size is under the broadcast threshold, and 2 means it is allowed even if the table
// size exceeds the broadcast threshold.
AllowCartesianBCJ int

// AllowDistinctAggPushDown can be set true to allow agg with distinct push down to tikv/tiflash.
AllowDistinctAggPushDown bool

Expand Down Expand Up @@ -971,6 +977,7 @@ func NewSessionVars() *SessionVars {
StmtCtx: new(stmtctx.StatementContext),
AllowAggPushDown: false,
AllowBCJ: false,
AllowCartesianBCJ: DefOptCartesianBCJ,
BroadcastJoinThresholdSize: DefBroadcastJoinThresholdSize,
BroadcastJoinThresholdCount: DefBroadcastJoinThresholdSize,
OptimizerSelectivityLevel: DefTiDBOptimizerSelectivityLevel,
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,7 @@ var defaultSysVars = []*SysVar{
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBBuildStatsConcurrency, skipInit: true, Value: strconv.Itoa(DefBuildStatsConcurrency)},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptCartesianBCJ, Value: strconv.Itoa(DefOptCartesianBCJ), Type: TypeInt, MinValue: 0, MaxValue: 2},
{Scope: ScopeGlobal, Name: TiDBAutoAnalyzeRatio, Value: strconv.FormatFloat(DefAutoAnalyzeRatio, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64},
{Scope: ScopeGlobal, Name: TiDBAutoAnalyzeStartTime, Value: DefAutoAnalyzeStartTime, Type: TypeTime},
{Scope: ScopeGlobal, Name: TiDBAutoAnalyzeEndTime, Value: DefAutoAnalyzeEndTime, Type: TypeTime},
Expand Down
6 changes: 6 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ const (
// tidb_opt_agg_push_down is used to enable/disable the optimizer rule of aggregation push down.
TiDBOptAggPushDown = "tidb_opt_agg_push_down"

// TiDBOptBCJ is used to enable/disable broadcast join in MPP mode
TiDBOptBCJ = "tidb_opt_broadcast_join"

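// TiDBOptCartesianBCJ controls broadcast cartesian join in MPP mode: 0 disables it, 1 enables it only when the table size is under the broadcast threshold, and 2 enables it even if the table size exceeds the broadcast threshold.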
TiDBOptCartesianBCJ = "tidb_opt_broadcast_cartesian_join"

// tidb_opt_distinct_agg_push_down is used to decide whether agg with distinct should be pushed to tikv/tiflash.
TiDBOptDistinctAggPushDown = "tidb_opt_distinct_agg_push_down"

Expand Down Expand Up @@ -588,6 +593,7 @@ const (
DefSkipASCIICheck = false
DefOptAggPushDown = false
DefOptBCJ = false
DefOptCartesianBCJ = 1
DefOptWriteRowID = false
DefOptCorrelationThreshold = 0.9
DefOptCorrelationExpFactor = 1
Expand Down

0 comments on commit d3de547

Please sign in to comment.