Skip to content

Commit

Permalink
planner: disable tidb_prefer_broadcast_join_by_exchange_data_size b…
Browse files Browse the repository at this point in the history
…y default; set scale factor to optimize estimating broadcast join; (#42915)

ref #40494, ref #42961
  • Loading branch information
solotzg authored Apr 12, 2023
1 parent 5109e41 commit 65e7457
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 5 deletions.
56 changes: 56 additions & 0 deletions planner/core/casetest/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,62 @@ func TestMPPBCJModel(t *testing.T) {
}
}

// TestMPPPreferBCJ replays the "TestMPPPreferBCJ" cases from the plan suite
// test data against two small tables (t1 with one row, t2 with eight rows)
// that both have an available TiFlash replica, with MPP allowed and enforced.
//
// Cases starting with "set" or "insert" are executed only for their side
// effects; every other case is run as a query whose rows and session warnings
// are compared with (or, in record mode, written to) the expected output.
func TestMPPPreferBCJ(t *testing.T) {
	// The mock store is created with internal.WithMockTiFlash(3).
	store := testkit.CreateMockStore(t, internal.WithMockTiFlash(3))
	tk := testkit.NewTestKit(t, store)

	tk.MustExec("use test")
	tk.MustExec("drop table if exists t1")
	tk.MustExec("create table t1 (a int)")
	tk.MustExec("drop table if exists t2")
	tk.MustExec("create table t2 (b int)")

	tk.MustExec("insert into t1 values (1);")
	tk.MustExec("insert into t2 values (1), (2), (3), (4), (5), (6), (7), (8);")

	// enableReplica runs the given ALTER statement and then flags the table's
	// replica info as available through the DDL interface.
	enableReplica := func(alterSQL, tableName string) {
		tk.MustExec(alterSQL)
		tbl := external.GetTableByName(t, tk, "test", tableName)
		updateErr := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tbl.Meta().ID, true)
		require.NoError(t, updateErr)
	}
	enableReplica("alter table t1 set tiflash replica 1", "t1")
	enableReplica("alter table t2 set tiflash replica 1", "t2")

	tk.MustExec("analyze table t1")
	tk.MustExec("analyze table t2")
	tk.MustExec("set @@tidb_allow_mpp=1; set @@tidb_enforce_mpp=1;")

	var (
		input  []string
		output []struct {
			SQL  string
			Plan []string
			Warn []string
		}
	)
	suite := GetPlanSuiteData()
	suite.LoadTestCases(t, &input, &output)

	for idx, stmt := range input {
		testdata.OnRecord(func() {
			output[idx].SQL = stmt
		})

		// Session-variable changes and data loads have no plan to verify.
		if isSideEffectOnly := strings.HasPrefix(stmt, "set") || strings.HasPrefix(stmt, "insert"); isSideEffectOnly {
			tk.MustExec(stmt)
			continue
		}

		testdata.OnRecord(func() {
			expected := &output[idx]
			expected.SQL = stmt
			expected.Plan = testdata.ConvertRowsToStrings(tk.MustQuery(stmt).Rows())
			expected.Warn = testdata.ConvertSQLWarnToStrings(tk.Session().GetSessionVars().StmtCtx.GetWarnings())
		})

		rows := tk.MustQuery(stmt)
		rows.Check(testkit.Rows(output[idx].Plan...))
		gotWarn := testdata.ConvertSQLWarnToStrings(tk.Session().GetSessionVars().StmtCtx.GetWarnings())
		require.Equal(t, output[idx].Warn, gotWarn)
	}
}

func TestMPPBCJModelOneTiFlash(t *testing.T) {
/*
if there are 1 mpp stores, planner should choose broadcast join if `tidb_prefer_broadcast_join_by_exchange_data_size` is ON
Expand Down
10 changes: 10 additions & 0 deletions planner/core/casetest/testdata/plan_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,16 @@
"explain select * from t t1, t t2 where t1.a=t2.a"
]
},
{
"name": "TestMPPPreferBCJ",
"cases": [
"explain select * from t1, t2 where t1.a=t2.b",
"set @@session.tidb_prefer_broadcast_join_by_exchange_data_size=1",
"explain select * from t1, t2 where t1.a=t2.b",
"insert into t2 values (9); analyze table t2;",
"explain select * from t1, t2 where t1.a=t2.b"
]
},
{
"name": "TestMPPBCJModelOneTiFlash",
"cases": [
Expand Down
62 changes: 62 additions & 0 deletions planner/core/casetest/testdata/plan_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -1450,6 +1450,68 @@
}
]
},
{
"Name": "TestMPPPreferBCJ",
"Cases": [
{
"SQL": "explain select * from t1, t2 where t1.a=t2.b",
"Plan": [
"TableReader_34 1.00 root MppVersion: 1, data:ExchangeSender_33",
"└─ExchangeSender_33 1.00 mpp[tiflash] ExchangeType: PassThrough",
" └─HashJoin_32 1.00 mpp[tiflash] inner join, equal:[eq(test.t1.a, test.t2.b)]",
" ├─ExchangeReceiver_15(Build) 1.00 mpp[tiflash] ",
" │ └─ExchangeSender_14 1.00 mpp[tiflash] ExchangeType: Broadcast, Compression: FAST",
" │ └─Selection_13 1.00 mpp[tiflash] not(isnull(test.t1.a))",
" │ └─TableFullScan_12 1.00 mpp[tiflash] table:t1 pushed down filter:empty, keep order:false",
" └─Selection_17(Probe) 8.00 mpp[tiflash] not(isnull(test.t2.b))",
" └─TableFullScan_16 8.00 mpp[tiflash] table:t2 pushed down filter:empty, keep order:false"
],
"Warn": null
},
{
"SQL": "set @@session.tidb_prefer_broadcast_join_by_exchange_data_size=1",
"Plan": null,
"Warn": null
},
{
"SQL": "explain select * from t1, t2 where t1.a=t2.b",
"Plan": [
"TableReader_36 1.00 root MppVersion: 1, data:ExchangeSender_35",
"└─ExchangeSender_35 1.00 mpp[tiflash] ExchangeType: PassThrough",
" └─HashJoin_34 1.00 mpp[tiflash] inner join, equal:[eq(test.t1.a, test.t2.b)]",
" ├─ExchangeReceiver_15(Build) 1.00 mpp[tiflash] ",
" │ └─ExchangeSender_14 1.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.t1.a, collate: binary]",
" │ └─Selection_13 1.00 mpp[tiflash] not(isnull(test.t1.a))",
" │ └─TableFullScan_12 1.00 mpp[tiflash] table:t1 pushed down filter:empty, keep order:false",
" └─ExchangeReceiver_19(Probe) 8.00 mpp[tiflash] ",
" └─ExchangeSender_18 8.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.t2.b, collate: binary]",
" └─Selection_17 8.00 mpp[tiflash] not(isnull(test.t2.b))",
" └─TableFullScan_16 8.00 mpp[tiflash] table:t2 pushed down filter:empty, keep order:false"
],
"Warn": null
},
{
"SQL": "insert into t2 values (9); analyze table t2;",
"Plan": null,
"Warn": null
},
{
"SQL": "explain select * from t1, t2 where t1.a=t2.b",
"Plan": [
"TableReader_34 1.00 root MppVersion: 1, data:ExchangeSender_33",
"└─ExchangeSender_33 1.00 mpp[tiflash] ExchangeType: PassThrough",
" └─HashJoin_32 1.00 mpp[tiflash] inner join, equal:[eq(test.t1.a, test.t2.b)]",
" ├─ExchangeReceiver_15(Build) 1.00 mpp[tiflash] ",
" │ └─ExchangeSender_14 1.00 mpp[tiflash] ExchangeType: Broadcast, Compression: FAST",
" │ └─Selection_13 1.00 mpp[tiflash] not(isnull(test.t1.a))",
" │ └─TableFullScan_12 1.00 mpp[tiflash] table:t1 pushed down filter:empty, keep order:false",
" └─Selection_17(Probe) 9.00 mpp[tiflash] not(isnull(test.t2.b))",
" └─TableFullScan_16 9.00 mpp[tiflash] table:t2 pushed down filter:empty, keep order:false"
],
"Warn": null
}
]
},
{
"Name": "TestMPPBCJModelOneTiFlash",
"Cases": [
Expand Down
13 changes: 9 additions & 4 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -2129,23 +2129,28 @@ func calcHashExchangeSizeByChild(p1 Plan, p2 Plan, mppStoreCnt int) (float64, fl
return row1 + row2, 0, false
}

// When using broadcast join, the size of the `Build` hash table is about `X`.
// When using shuffle join, the size of the `Build` hash table is about `X / mppStoreCnt`.
// Broadcast join therefore spends more time constructing the `Build` hash table and probing it with the `Probe` side.
// To compensate, a scale factor (`mppStoreCnt^*`) is applied when estimating broadcast join in `isJoinFitMPPBCJ` and `isJoinChildFitMPPBCJ` (based on the TPC-H benchmark; verified with Q9).

// isJoinFitMPPBCJ reports whether the estimated exchange cost of a broadcast
// join, scaled by mppStoreCnt, does not exceed that of a hash-partitioned
// (shuffle) join over the two children of p.
//
// The broadcast estimate is multiplied by mppStoreCnt before the comparison so
// that broadcast join is only chosen when it is cheaper by at least that
// factor. When both estimators report a size (hasSizeBC && hasSizeHash), the
// size estimates are compared; otherwise the row estimates are used.
func isJoinFitMPPBCJ(p *LogicalJoin, mppStoreCnt int) bool {
	rowBC, szBC, hasSizeBC := calcBroadcastExchangeSizeByChild(p.children[0], p.children[1], mppStoreCnt)
	rowHash, szHash, hasSizeHash := calcHashExchangeSizeByChild(p.children[0], p.children[1], mppStoreCnt)

	// Penalty applied to the broadcast side of the comparison.
	scale := float64(mppStoreCnt)
	if hasSizeBC && hasSizeHash {
		return szBC*scale <= szHash
	}
	return rowBC*scale <= rowHash
}

// isJoinChildFitMPPBCJ reports whether broadcasting the child at index
// childIndexToBC is estimated, after scaling by mppStoreCnt, to cost no more
// than hash-partitioning both children of p.
//
// The broadcast estimate is multiplied by mppStoreCnt before the comparison so
// that broadcast join is only chosen when it is cheaper by at least that
// factor. When both estimators report a size (hasSizeBC && hasSizeHash), the
// size estimates are compared; otherwise the row estimates are used.
func isJoinChildFitMPPBCJ(p *LogicalJoin, childIndexToBC int, mppStoreCnt int) bool {
	rowBC, szBC, hasSizeBC := calcBroadcastExchangeSize(p.children[childIndexToBC], mppStoreCnt)
	rowHash, szHash, hasSizeHash := calcHashExchangeSizeByChild(p.children[0], p.children[1], mppStoreCnt)

	// Penalty applied to the broadcast side of the comparison.
	scale := float64(mppStoreCnt)
	if hasSizeBC && hasSizeHash {
		return szBC*scale <= szHash
	}
	return rowBC*scale <= rowHash
}

// If we can use mpp broadcast join, that's our first choice.
Expand Down
2 changes: 1 addition & 1 deletion sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -1045,7 +1045,7 @@ const (
DefTiDBProjectionConcurrency = ConcurrencyUnset
DefBroadcastJoinThresholdSize = 100 * 1024 * 1024
DefBroadcastJoinThresholdCount = 10 * 1024
DefPreferBCJByExchangeDataSize = true
DefPreferBCJByExchangeDataSize = false
DefTiDBOptimizerSelectivityLevel = 0
DefTiDBOptimizerEnableNewOFGB = false
DefTiDBEnableOuterJoinReorder = true
Expand Down

0 comments on commit 65e7457

Please sign in to comment.