diff --git a/planner/core/casetest/physical_plan_test.go b/planner/core/casetest/physical_plan_test.go index e441794ae064f..3adea443c5a07 100644 --- a/planner/core/casetest/physical_plan_test.go +++ b/planner/core/casetest/physical_plan_test.go @@ -823,6 +823,62 @@ func TestMPPBCJModel(t *testing.T) { } } +func TestMPPPreferBCJ(t *testing.T) { + store := testkit.CreateMockStore(t, internal.WithMockTiFlash(3)) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1 (a int)") + tk.MustExec("drop table if exists t2") + tk.MustExec("create table t2 (b int)") + + tk.MustExec("insert into t1 values (1);") + tk.MustExec("insert into t2 values (1), (2), (3), (4), (5), (6), (7), (8);") + + { + tk.MustExec("alter table t1 set tiflash replica 1") + tb := external.GetTableByName(t, tk, "test", "t1") + err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true) + require.NoError(t, err) + } + { + tk.MustExec("alter table t2 set tiflash replica 1") + tb := external.GetTableByName(t, tk, "test", "t2") + err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true) + require.NoError(t, err) + } + tk.MustExec("analyze table t1") + tk.MustExec("analyze table t2") + tk.MustExec("set @@tidb_allow_mpp=1; set @@tidb_enforce_mpp=1;") + { + var input []string + var output []struct { + SQL string + Plan []string + Warn []string + } + planSuiteData := GetPlanSuiteData() + planSuiteData.LoadTestCases(t, &input, &output) + for i, tt := range input { + testdata.OnRecord(func() { + output[i].SQL = tt + }) + if strings.HasPrefix(tt, "set") || strings.HasPrefix(tt, "insert") { + tk.MustExec(tt) + continue + } + testdata.OnRecord(func() { + output[i].SQL = tt + output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + output[i].Warn = testdata.ConvertSQLWarnToStrings(tk.Session().GetSessionVars().StmtCtx.GetWarnings()) + }) + res := tk.MustQuery(tt) + res.Check(testkit.Rows(output[i].Plan...)) + require.Equal(t, output[i].Warn, testdata.ConvertSQLWarnToStrings(tk.Session().GetSessionVars().StmtCtx.GetWarnings())) + } + } +} + func TestMPPBCJModelOneTiFlash(t *testing.T) { /* if there are 1 mpp stores, planner should choose broadcast join if `tidb_prefer_broadcast_join_by_exchange_data_size` is ON diff --git a/planner/core/casetest/testdata/plan_suite_in.json b/planner/core/casetest/testdata/plan_suite_in.json index 5b5b459536fda..891b618fcf944 100644 --- a/planner/core/casetest/testdata/plan_suite_in.json +++ b/planner/core/casetest/testdata/plan_suite_in.json @@ -135,6 +135,16 @@ "explain select * from t t1, t t2 where t1.a=t2.a" ] }, + { + "name": "TestMPPPreferBCJ", + "cases": [ + "explain select * from t1, t2 where t1.a=t2.b", + "set @@session.tidb_prefer_broadcast_join_by_exchange_data_size=1", + "explain select * from t1, t2 where t1.a=t2.b", + "insert into t2 values (9); analyze table t2;", + "explain select * from t1, t2 where t1.a=t2.b" + ] + }, { "name": "TestMPPBCJModelOneTiFlash", "cases": [ diff --git a/planner/core/casetest/testdata/plan_suite_out.json b/planner/core/casetest/testdata/plan_suite_out.json index 059adad98c288..f9f784a4d21ac 100644 --- a/planner/core/casetest/testdata/plan_suite_out.json +++ b/planner/core/casetest/testdata/plan_suite_out.json @@ -1450,6 +1450,68 @@ } ] }, + { + "Name": "TestMPPPreferBCJ", + "Cases": [ + { + "SQL": "explain select * from t1, t2 where t1.a=t2.b", + "Plan": [ + "TableReader_34 1.00 root MppVersion: 1, data:ExchangeSender_33", + "└─ExchangeSender_33 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─HashJoin_32 1.00 mpp[tiflash] inner join, equal:[eq(test.t1.a, test.t2.b)]", + " ├─ExchangeReceiver_15(Build) 1.00 mpp[tiflash] ", + " │ └─ExchangeSender_14 1.00 mpp[tiflash] ExchangeType: Broadcast, Compression: FAST", + " │ └─Selection_13 1.00 mpp[tiflash] not(isnull(test.t1.a))", + " │ └─TableFullScan_12 1.00 mpp[tiflash] table:t1 pushed down filter:empty, keep order:false", + " └─Selection_17(Probe) 8.00 mpp[tiflash] not(isnull(test.t2.b))", + " └─TableFullScan_16 8.00 mpp[tiflash] table:t2 pushed down filter:empty, keep order:false" + ], + "Warn": null + }, + { + "SQL": "set @@session.tidb_prefer_broadcast_join_by_exchange_data_size=1", + "Plan": null, + "Warn": null + }, + { + "SQL": "explain select * from t1, t2 where t1.a=t2.b", + "Plan": [ + "TableReader_36 1.00 root MppVersion: 1, data:ExchangeSender_35", + "└─ExchangeSender_35 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─HashJoin_34 1.00 mpp[tiflash] inner join, equal:[eq(test.t1.a, test.t2.b)]", + " ├─ExchangeReceiver_15(Build) 1.00 mpp[tiflash] ", + " │ └─ExchangeSender_14 1.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.t1.a, collate: binary]", + " │ └─Selection_13 1.00 mpp[tiflash] not(isnull(test.t1.a))", + " │ └─TableFullScan_12 1.00 mpp[tiflash] table:t1 pushed down filter:empty, keep order:false", + " └─ExchangeReceiver_19(Probe) 8.00 mpp[tiflash] ", + " └─ExchangeSender_18 8.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.t2.b, collate: binary]", + " └─Selection_17 8.00 mpp[tiflash] not(isnull(test.t2.b))", + " └─TableFullScan_16 8.00 mpp[tiflash] table:t2 pushed down filter:empty, keep order:false" + ], + "Warn": null + }, + { + "SQL": "insert into t2 values (9); analyze table t2;", + "Plan": null, + "Warn": null + }, + { + "SQL": "explain select * from t1, t2 where t1.a=t2.b", + "Plan": [ + "TableReader_34 1.00 root MppVersion: 1, data:ExchangeSender_33", + "└─ExchangeSender_33 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─HashJoin_32 1.00 mpp[tiflash] inner join, equal:[eq(test.t1.a, test.t2.b)]", + " ├─ExchangeReceiver_15(Build) 1.00 mpp[tiflash] ", + " │ └─ExchangeSender_14 1.00 mpp[tiflash] ExchangeType: Broadcast, Compression: FAST", + " │ └─Selection_13 1.00 mpp[tiflash] not(isnull(test.t1.a))", + " │ └─TableFullScan_12 1.00 mpp[tiflash] table:t1 pushed down filter:empty, keep order:false", + " └─Selection_17(Probe) 9.00 mpp[tiflash] not(isnull(test.t2.b))", + " └─TableFullScan_16 9.00 mpp[tiflash] table:t2 pushed down filter:empty, keep order:false" + ], + "Warn": null + } + ] + }, { "Name": "TestMPPBCJModelOneTiFlash", "Cases": [ diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 60c0286a7b863..933978eb513ee 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -2129,13 +2129,18 @@ func calcHashExchangeSizeByChild(p1 Plan, p2 Plan, mppStoreCnt int) (float64, fl return row1 + row2, 0, false } +// The size of `Build` hash table when using broadcast join is about `X`. +// The size of `Build` hash table when using shuffle join is about `X / (mppStoreCnt)`. +// It will cost more time to construct `Build` hash table and search `Probe` while using broadcast join. +// Set a scale factor (`mppStoreCnt^*`) when estimating broadcast join in `isJoinFitMPPBCJ` and `isJoinChildFitMPPBCJ` (based on TPCH benchmark, it has been verified in Q9). + func isJoinFitMPPBCJ(p *LogicalJoin, mppStoreCnt int) bool { rowBC, szBC, hasSizeBC := calcBroadcastExchangeSizeByChild(p.children[0], p.children[1], mppStoreCnt) rowHash, szHash, hasSizeHash := calcHashExchangeSizeByChild(p.children[0], p.children[1], mppStoreCnt) if hasSizeBC && hasSizeHash { - return szBC <= szHash + return szBC*float64(mppStoreCnt) <= szHash } - return rowBC <= rowHash + return rowBC*float64(mppStoreCnt) <= rowHash } func isJoinChildFitMPPBCJ(p *LogicalJoin, childIndexToBC int, mppStoreCnt int) bool { @@ -2143,9 +2148,9 @@ func isJoinChildFitMPPBCJ(p *LogicalJoin, childIndexToBC int, mppStoreCnt int) b rowHash, szHash, hasSizeHash := calcHashExchangeSizeByChild(p.children[0], p.children[1], mppStoreCnt) if hasSizeBC && hasSizeHash { - return szBC <= szHash + return szBC*float64(mppStoreCnt) <= szHash } - return rowBC <= rowHash + return rowBC*float64(mppStoreCnt) <= rowHash } // If we can use mpp broadcast join, that's our first choice. diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 3564211e58210..10917f4f8b751 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -1045,7 +1045,7 @@ const ( DefTiDBProjectionConcurrency = ConcurrencyUnset DefBroadcastJoinThresholdSize = 100 * 1024 * 1024 DefBroadcastJoinThresholdCount = 10 * 1024 - DefPreferBCJByExchangeDataSize = true + DefPreferBCJByExchangeDataSize = false DefTiDBOptimizerSelectivityLevel = 0 DefTiDBOptimizerEnableNewOFGB = false DefTiDBEnableOuterJoinReorder = true