From 896503044c197082b75dfef4757748a260041fcd Mon Sep 17 00:00:00 2001 From: Zhi Qi Date: Sat, 8 May 2021 16:45:29 +0800 Subject: [PATCH 1/9] expand var tidb_allow_mpp values from {true, false} to {0,1,2} --- executor/mpp_gather.go | 2 +- planner/core/exhaust_physical_plans.go | 6 +++--- planner/core/task.go | 8 ++++++-- sessionctx/variable/session.go | 7 ++++--- sessionctx/variable/sysvar.go | 4 ++-- sessionctx/variable/tidb_vars.go | 2 +- 6 files changed, 17 insertions(+), 12 deletions(-) diff --git a/executor/mpp_gather.go b/executor/mpp_gather.go index 64236558af94e..5bb2a3f7f9933 100644 --- a/executor/mpp_gather.go +++ b/executor/mpp_gather.go @@ -30,7 +30,7 @@ import ( ) func useMPPExecution(ctx sessionctx.Context, tr *plannercore.PhysicalTableReader) bool { - if !ctx.GetSessionVars().AllowMPPExecution { + if ctx.GetSessionVars().AllowMPPExecution == 0 { return false } _, ok := tr.GetTablePlan().(*plannercore.PhysicalExchangeSender) diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index d4f2923b6220a..eb980e792ed86 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -1662,7 +1662,7 @@ func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P } joins := make([]PhysicalPlan, 0, 8) canPushToTiFlash := p.canPushToCop(kv.TiFlash) - if p.ctx.GetSessionVars().AllowMPPExecution && canPushToTiFlash { + if p.ctx.GetSessionVars().AllowMPPExecution != 0 && canPushToTiFlash { if p.shouldUseMPPBCJ() { mppJoins := p.tryToGetMppHashJoin(prop, true) if (p.preferJoinType & preferBCJoin) > 0 { @@ -1965,7 +1965,7 @@ func (lt *LogicalTopN) getPhysTopN(prop *property.PhysicalProperty) []PhysicalPl if !lt.limitHints.preferLimitToCop { allTaskTypes = append(allTaskTypes, property.RootTaskType) } - if lt.ctx.GetSessionVars().AllowMPPExecution { + if lt.ctx.GetSessionVars().AllowMPPExecution != 0 { allTaskTypes = append(allTaskTypes, property.MppTaskType) } ret := 
make([]PhysicalPlan, 0, len(allTaskTypes)) @@ -2355,7 +2355,7 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy taskTypes = append(taskTypes, property.CopTiFlashLocalReadTaskType) } canPushDownToTiFlash := la.canPushToCop(kv.TiFlash) - canPushDownToMPP := la.ctx.GetSessionVars().AllowMPPExecution && la.checkCanPushDownToMPP() && canPushDownToTiFlash + canPushDownToMPP := la.ctx.GetSessionVars().AllowMPPExecution != 0 && la.checkCanPushDownToMPP() && canPushDownToTiFlash if la.HasDistinct() { // TODO: remove after the cost estimation of distinct pushdown is implemented. if !la.ctx.GetSessionVars().AllowDistinctAggPushDown { diff --git a/planner/core/task.go b/planner/core/task.go index 82045579be155..23d049f7aabe9 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -2028,11 +2028,15 @@ func (t *mppTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask { StoreType: kv.TiFlash, }.Init(ctx, t.p.SelectBlockOffset()) p.stats = t.p.statsInfo() + + p.cost = t.cst / p.ctx.GetSessionVars().CopTiFlashConcurrencyFactor + if p.ctx.GetSessionVars().AllowMPPExecution == 2 { + p.cost = 0 + } rt := &rootTask{ p: p, - cst: t.cst / p.ctx.GetSessionVars().CopTiFlashConcurrencyFactor, + cst: p.cost, } - p.cost = rt.cost() return rt } diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index a154d3d451b06..31f0a3f108073 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -491,11 +491,12 @@ type SessionVars struct { AllowWriteRowID bool // AllowBatchCop means if we should send batch coprocessor to TiFlash. Default value is 1, means to use batch cop in case of aggregation and join. - // If value is set to 2 , which means to force to send batch cop for any query. Value is set to 0 means never use batch cop. + // Value set to 2 means to force to send batch cop for any query. Value set to 0 means never use batch cop. 
AllowBatchCop int - // AllowMPPExecution will prefer using mpp way to execute a query. - AllowMPPExecution bool + // AllowMPPExecution means if we should use mpp way to execute query. Default value is 1, means to be determined by the optimizer. + // Value set to 2 which means to use mpp whenever possible. Value set to 0 means never use mpp. + AllowMPPExecution int // TiDBAllowAutoRandExplicitInsert indicates whether explicit insertion on auto_random column is allowed. AllowAutoRandExplicitInsert bool diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 751f0e70f0469..fa68cd6a378c6 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -665,8 +665,8 @@ var defaultSysVars = []*SysVar{ } return nil }}, - {Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowMPPExecution, Type: TypeBool, Value: BoolToOnOff(DefTiDBAllowMPPExecution), SetSession: func(s *SessionVars, val string) error { - s.AllowMPPExecution = TiDBOptOn(val) + {Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowMPPExecution, Value: strconv.Itoa(DefTiDBAllowMPPExecution), Type: TypeInt, MinValue: 0, MaxValue: 2, SetSession: func(s *SessionVars, val string) error { + s.AllowMPPExecution = int(tidbOptInt64(val, DefTiDBAllowBatchCop)) return nil }}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBBCJThresholdCount, Value: strconv.Itoa(DefBroadcastJoinThresholdCount), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error { diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 30d52ac54f386..f34b87dd25221 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -614,7 +614,7 @@ const ( DefBroadcastJoinThresholdCount = 10 * 1024 DefTiDBOptimizerSelectivityLevel = 0 DefTiDBAllowBatchCop = 1 - DefTiDBAllowMPPExecution = true + DefTiDBAllowMPPExecution = 1 DefTiDBTxnMode = "" DefTiDBRowFormatV1 = 1 DefTiDBRowFormatV2 = 2 From 
746785a4b17ead57ec3cf737ecc6eb1a3a040a0c Mon Sep 17 00:00:00 2001 From: Zhi Qi Date: Sat, 8 May 2021 17:45:53 +0800 Subject: [PATCH 2/9] use typeEnum instead of typeInt to keep compatible with old versions. --- executor/mpp_gather.go | 2 +- planner/core/exhaust_physical_plans.go | 6 +++--- planner/core/task.go | 2 +- sessionctx/variable/session.go | 6 +++--- sessionctx/variable/sysvar.go | 4 ++-- sessionctx/variable/tidb_vars.go | 4 +++- 6 files changed, 13 insertions(+), 11 deletions(-) diff --git a/executor/mpp_gather.go b/executor/mpp_gather.go index 5bb2a3f7f9933..086496758d024 100644 --- a/executor/mpp_gather.go +++ b/executor/mpp_gather.go @@ -30,7 +30,7 @@ import ( ) func useMPPExecution(ctx sessionctx.Context, tr *plannercore.PhysicalTableReader) bool { - if ctx.GetSessionVars().AllowMPPExecution == 0 { + if ctx.GetSessionVars().AllowMPPExecution == "OFF" { return false } _, ok := tr.GetTablePlan().(*plannercore.PhysicalExchangeSender) diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index eb980e792ed86..1f2823bb1d8a5 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -1662,7 +1662,7 @@ func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P } joins := make([]PhysicalPlan, 0, 8) canPushToTiFlash := p.canPushToCop(kv.TiFlash) - if p.ctx.GetSessionVars().AllowMPPExecution != 0 && canPushToTiFlash { + if p.ctx.GetSessionVars().AllowMPPExecution != "OFF" && canPushToTiFlash { if p.shouldUseMPPBCJ() { mppJoins := p.tryToGetMppHashJoin(prop, true) if (p.preferJoinType & preferBCJoin) > 0 { @@ -1965,7 +1965,7 @@ func (lt *LogicalTopN) getPhysTopN(prop *property.PhysicalProperty) []PhysicalPl if !lt.limitHints.preferLimitToCop { allTaskTypes = append(allTaskTypes, property.RootTaskType) } - if lt.ctx.GetSessionVars().AllowMPPExecution != 0 { + if lt.ctx.GetSessionVars().AllowMPPExecution != "OFF" { allTaskTypes = append(allTaskTypes, 
property.MppTaskType) } ret := make([]PhysicalPlan, 0, len(allTaskTypes)) @@ -2355,7 +2355,7 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy taskTypes = append(taskTypes, property.CopTiFlashLocalReadTaskType) } canPushDownToTiFlash := la.canPushToCop(kv.TiFlash) - canPushDownToMPP := la.ctx.GetSessionVars().AllowMPPExecution != 0 && la.checkCanPushDownToMPP() && canPushDownToTiFlash + canPushDownToMPP := la.ctx.GetSessionVars().AllowMPPExecution != "OFF" && la.checkCanPushDownToMPP() && canPushDownToTiFlash if la.HasDistinct() { // TODO: remove after the cost estimation of distinct pushdown is implemented. if !la.ctx.GetSessionVars().AllowDistinctAggPushDown { diff --git a/planner/core/task.go b/planner/core/task.go index 23d049f7aabe9..006d58b9a3dd9 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -2030,7 +2030,7 @@ func (t *mppTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask { p.stats = t.p.statsInfo() p.cost = t.cst / p.ctx.GetSessionVars().CopTiFlashConcurrencyFactor - if p.ctx.GetSessionVars().AllowMPPExecution == 2 { + if p.ctx.GetSessionVars().AllowMPPExecution == "ENFORCE" { p.cost = 0 } rt := &rootTask{ diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 31f0a3f108073..423a6e46e2fa4 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -494,9 +494,9 @@ type SessionVars struct { // Value set to 2 means to force to send batch cop for any query. Value set to 0 means never use batch cop. AllowBatchCop int - // AllowMPPExecution means if we should use mpp way to execute query. Default value is 1, means to be determined by the optimizer. - // Value set to 2 which means to use mpp whenever possible. Value set to 0 means never use mpp. - AllowMPPExecution int + // AllowMPPExecution means if we should use mpp way to execute query. Default value is 1 (or 'ON'), means to be determined by the optimizer. 
+ // Value set to 2 (or 'ENFORCE') which means to use mpp whenever possible. Value set to 2 (or 'OFF') means never use mpp. + AllowMPPExecution string // TiDBAllowAutoRandExplicitInsert indicates whether explicit insertion on auto_random column is allowed. AllowAutoRandExplicitInsert bool diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index fa68cd6a378c6..d749dcb994f30 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -665,8 +665,8 @@ var defaultSysVars = []*SysVar{ } return nil }}, - {Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowMPPExecution, Value: strconv.Itoa(DefTiDBAllowMPPExecution), Type: TypeInt, MinValue: 0, MaxValue: 2, SetSession: func(s *SessionVars, val string) error { - s.AllowMPPExecution = int(tidbOptInt64(val, DefTiDBAllowBatchCop)) + {Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowMPPExecution, Value: On, Type: TypeEnum, PossibleValues: []string{Off, On, "ENFORCE"}, SetSession: func(s *SessionVars, val string) error { + s.AllowMPPExecution = val return nil }}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBBCJThresholdCount, Value: strconv.Itoa(DefBroadcastJoinThresholdCount), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error { diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index f34b87dd25221..0c21d330e8d7a 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -291,6 +291,8 @@ const ( // The default value is 0 TiDBAllowBatchCop = "tidb_allow_batch_cop" + // TiDBAllowMPPExecution means if we should use mpp way to execute query. Default value is 1 (or 'ON'), means to be determined by the optimizer. + // Value set to 2 (or 'ENFORCE') which means to use mpp whenever possible. Value set to 0 (or 'OFF') means never use mpp. TiDBAllowMPPExecution = "tidb_allow_mpp" // TiDBInitChunkSize is used to control the init chunk size during query execution. 
@@ -614,7 +616,7 @@ const ( DefBroadcastJoinThresholdCount = 10 * 1024 DefTiDBOptimizerSelectivityLevel = 0 DefTiDBAllowBatchCop = 1 - DefTiDBAllowMPPExecution = 1 + DefTiDBAllowMPPExecution = "ON" DefTiDBTxnMode = "" DefTiDBRowFormatV1 = 1 DefTiDBRowFormatV2 = 2 From 6ed580a776cb5556bba3cafa32addd89b5f4e424 Mon Sep 17 00:00:00 2001 From: Zhi Qi Date: Mon, 10 May 2021 15:00:04 +0800 Subject: [PATCH 3/9] add test --- planner/core/integration_test.go | 90 ++++++++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index 0a6a560802240..df5863adca62b 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -3608,3 +3608,93 @@ func (s *testIntegrationSuite) TestSequenceAsDataSource(c *C) { tk.MustQuery("explain format = 'brief' " + tt).Check(testkit.Rows(output[i].Plan...)) } } + +func (s *testIntegrationSerialSuite) TestEnforceMPP(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int)") + tk.MustExec("create index idx on t(a)") + + // Create virtual tiflash replica info. + dom := domain.GetDomain(tk.Se) + is := dom.InfoSchema() + db, exists := is.SchemaByName(model.NewCIStr("test")) + c.Assert(exists, IsTrue) + for _, tblInfo := range db.Tables { + if tblInfo.Name.L == "t" { + tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ + Count: 1, + Available: true, + } + } + } + + // ban mpp + tk.MustExec("set @@session.tidb_allow_mpp = 0") + tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("OFF")) + + // read from tiflash, batch cop. 
+ tk.MustQuery("explain format='verbose' select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1").Check(testkit.Rows( + "StreamAgg_20 1.00 285050.00 root funcs:count(Column#5)->Column#3", + "└─TableReader_21 1.00 19003.88 root data:StreamAgg_9", + " └─StreamAgg_9 1.00 19006.88 batchCop[tiflash] funcs:count(1)->Column#5", + " └─Selection_19 10.00 285020.00 batchCop[tiflash] eq(test.t.a, 1)", + " └─TableFullScan_18 10000.00 255020.00 batchCop[tiflash] table:t keep order:false, stats:pseudo")) + + // open mpp + tk.MustExec("set @@session.tidb_allow_mpp = 1") + tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ON")) + + // should use tikv to index read + tk.MustQuery("explain format='verbose' select count(*) from t where a=1;").Check(testkit.Rows( + "StreamAgg_30 1.00 485.00 root funcs:count(Column#6)->Column#3", + "└─IndexReader_31 1.00 32.88 root index:StreamAgg_10", + " └─StreamAgg_10 1.00 35.88 cop[tikv] funcs:count(1)->Column#6", + " └─IndexRangeScan_29 10.00 455.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo")) + + // read from tikv, indexRead + tk.MustQuery("explain format='verbose' select /*+ read_from_storage(tikv[t]) */ count(*) from t where a=1;").Check(testkit.Rows( + "StreamAgg_18 1.00 485.00 root funcs:count(Column#5)->Column#3", + "└─IndexReader_19 1.00 32.88 root index:StreamAgg_10", + " └─StreamAgg_10 1.00 35.88 cop[tikv] funcs:count(1)->Column#5", + " └─IndexRangeScan_17 10.00 455.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo")) + + // read from tiflash, mpp with large cost + tk.MustQuery("explain format='verbose' select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1").Check(testkit.Rows( + "HashAgg_21 1.00 11910.68 root funcs:count(Column#5)->Column#3", + "└─TableReader_23 1.00 11877.08 root data:ExchangeSender_22", + " └─ExchangeSender_22 1.00 285050.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─HashAgg_9 1.00 285050.00 
batchCop[tiflash] funcs:count(1)->Column#5", + " └─Selection_20 10.00 285020.00 batchCop[tiflash] eq(test.t.a, 1)", + " └─TableFullScan_19 10000.00 255020.00 batchCop[tiflash] table:t keep order:false, stats:pseudo")) + + // enforce mpp + tk.MustExec("set @@session.tidb_allow_mpp = 2") + tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ENFORCE")) + + // should use mpp + tk.MustQuery("explain format='verbose' select count(*) from t where a=1;").Check(testkit.Rows( + "HashAgg_24 1.00 33.60 root funcs:count(Column#5)->Column#3", + "└─TableReader_26 1.00 0.00 root data:ExchangeSender_25", + " └─ExchangeSender_25 1.00 285050.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─HashAgg_9 1.00 285050.00 batchCop[tiflash] funcs:count(1)->Column#5", + " └─Selection_23 10.00 285020.00 batchCop[tiflash] eq(test.t.a, 1)", + " └─TableFullScan_22 10000.00 255020.00 batchCop[tiflash] table:t keep order:false, stats:pseudo")) + + // read from tikv, indexRead + tk.MustQuery("explain format='verbose' select /*+ read_from_storage(tikv[t]) */ count(*) from t where a=1;").Check(testkit.Rows( + "StreamAgg_18 1.00 485.00 root funcs:count(Column#5)->Column#3", + "└─IndexReader_19 1.00 32.88 root index:StreamAgg_10", + " └─StreamAgg_10 1.00 35.88 cop[tikv] funcs:count(1)->Column#5", + " └─IndexRangeScan_17 10.00 455.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo")) + + // read from tiflash, mpp with little cost + tk.MustQuery("explain format='verbose' select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1").Check(testkit.Rows( + "HashAgg_21 1.00 33.60 root funcs:count(Column#5)->Column#3", + "└─TableReader_23 1.00 0.00 root data:ExchangeSender_22", + " └─ExchangeSender_22 1.00 285050.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─HashAgg_9 1.00 285050.00 batchCop[tiflash] funcs:count(1)->Column#5", + " └─Selection_20 10.00 285020.00 batchCop[tiflash] eq(test.t.a, 1)", + " └─TableFullScan_19 10000.00 255020.00 
batchCop[tiflash] table:t keep order:false, stats:pseudo")) +} From cca06862642d31dcf8cb6163d3a3c5dc8530be51 Mon Sep 17 00:00:00 2001 From: Zhi Qi Date: Mon, 10 May 2021 16:30:14 +0800 Subject: [PATCH 4/9] add test for set variable --- planner/core/integration_test.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index df5863adca62b..3d48b3836b39a 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -3611,6 +3611,32 @@ func (s *testIntegrationSuite) TestSequenceAsDataSource(c *C) { func (s *testIntegrationSerialSuite) TestEnforceMPP(c *C) { tk := testkit.NewTestKit(c, s.store) + + // test set variable + tk.MustExec("set @@session.tidb_allow_mpp = 0") + tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("OFF")) + tk.MustExec("set @@session.tidb_allow_mpp = 1") + tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ON")) + tk.MustExec("set @@session.tidb_allow_mpp = 2") + tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ENFORCE")) + + tk.MustExec("set @@session.tidb_allow_mpp = off") + tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("OFF")) + tk.MustExec("set @@session.tidb_allow_mpp = oN") + tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ON")) + tk.MustExec("set @@session.tidb_allow_mpp = enForcE") + tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ENFORCE")) + + tk.MustExec("set @@global.tidb_allow_mpp = faLsE") + tk.MustQuery("select @@global.tidb_allow_mpp").Check(testkit.Rows("OFF")) + tk.MustExec("set @@global.tidb_allow_mpp = True") + tk.MustQuery("select @@global.tidb_allow_mpp").Check(testkit.Rows("ON")) + + err := tk.ExecToErr("set @@global.tidb_allow_mpp = enforceWithTypo") + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, `[variable:1231]Variable 'tidb_allow_mpp' can't be set to the value of 
'enforceWithTypo'`) + + // test query tk.MustExec("use test") tk.MustExec("drop table if exists t") tk.MustExec("create table t(a int)") From c9269254fc27b0a29bc6c5ce56ef6199971dbc9e Mon Sep 17 00:00:00 2001 From: Zhi Qi Date: Tue, 11 May 2021 05:16:36 +0800 Subject: [PATCH 5/9] tidb_opt_tiflash_concurrency_factor can't be less than 1. --- planner/core/integration_test.go | 12 +++++++++++- sessionctx/variable/sysvar.go | 2 +- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index 949ff6f1ad7c8..70e978ba8a7dd 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -3615,7 +3615,17 @@ func (s *testIntegrationSuite) TestSequenceAsDataSource(c *C) { func (s *testIntegrationSerialSuite) TestEnforceMPP(c *C) { tk := testkit.NewTestKit(c, s.store) - // test set variable + // test value limit of tidb_opt_tiflash_concurrency_factor + err := tk.ExecToErr("set @@tidb_opt_tiflash_concurrency_factor = 0") + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, `[variable:1231]Variable 'tidb_opt_tiflash_concurrency_factor' can't be set to the value of '0'`) + + tk.MustExec("set @@tidb_opt_tiflash_concurrency_factor = 1") + tk.MustQuery("select @@tidb_opt_tiflash_concurrency_factor").Check(testkit.Rows("1")) + tk.MustExec("set @@tidb_opt_tiflash_concurrency_factor = 24") + tk.MustQuery("select @@tidb_opt_tiflash_concurrency_factor").Check(testkit.Rows("24")) + + // test set tidb_allow_mpp tk.MustExec("set @@session.tidb_allow_mpp = 0") tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("OFF")) tk.MustExec("set @@session.tidb_allow_mpp = 1") diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 592ca52366924..a201b45220030 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -788,7 +788,7 @@ var defaultSysVars = []*SysVar{ s.CPUFactor = tidbOptFloat64(val, DefOptCPUFactor) return nil }}, - {Scope: 
ScopeGlobal | ScopeSession, Name: TiDBOptTiFlashConcurrencyFactor, Value: strconv.FormatFloat(DefOptTiFlashConcurrencyFactor, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64, SetSession: func(s *SessionVars, val string) error { + {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptTiFlashConcurrencyFactor, Value: strconv.FormatFloat(DefOptTiFlashConcurrencyFactor, 'f', -1, 64), Type: TypeFloat, MinValue: 1, MaxValue: math.MaxUint64, SetSession: func(s *SessionVars, val string) error { s.CopTiFlashConcurrencyFactor = tidbOptFloat64(val, DefOptTiFlashConcurrencyFactor) return nil }}, From ded8fbd38d1b9ae9799c05140822ed378bdf944e Mon Sep 17 00:00:00 2001 From: Zhi Qi Date: Tue, 11 May 2021 05:21:32 +0800 Subject: [PATCH 6/9] fix typo. --- planner/core/integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index 2e956053e05aa..d1f14e538c4fe 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -3724,7 +3724,7 @@ func (s *testIntegrationSerialSuite) TestEnforceMPP(c *C) { tk.MustExec("set @@global.tidb_allow_mpp = True") tk.MustQuery("select @@global.tidb_allow_mpp").Check(testkit.Rows("ON")) - err := tk.ExecToErr("set @@global.tidb_allow_mpp = enforceWithTypo") + err = tk.ExecToErr("set @@global.tidb_allow_mpp = enforceWithTypo") c.Assert(err, NotNil) c.Assert(err.Error(), Equals, `[variable:1231]Variable 'tidb_allow_mpp' can't be set to the value of 'enforceWithTypo'`) From 73d7ff90aad5bb6233aa86c0de400d24960732d9 Mon Sep 17 00:00:00 2001 From: Zhi Qi Date: Wed, 12 May 2021 13:10:25 +0800 Subject: [PATCH 7/9] fmt --- planner/core/integration_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index 8edf474c92375..98ab7b7898370 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -3707,7 +3707,6 @@ func (s 
*testIntegrationSerialSuite) TestMergeContinuousSelections(c *C) { } } - func (s *testIntegrationSerialSuite) TestEnforceMPP(c *C) { tk := testkit.NewTestKit(c, s.store) From 07d2797dd5294ea8eb339fdf3b19e56b836c4c5d Mon Sep 17 00:00:00 2001 From: Zhi Qi Date: Wed, 12 May 2021 16:43:15 +0800 Subject: [PATCH 8/9] refine the internal variable logic. --- executor/mpp_gather.go | 2 +- planner/core/exhaust_physical_plans.go | 6 +++--- planner/core/task.go | 2 +- sessionctx/variable/session.go | 18 ++++++++++++++---- sessionctx/variable/sysvar.go | 4 ++-- 5 files changed, 21 insertions(+), 11 deletions(-) diff --git a/executor/mpp_gather.go b/executor/mpp_gather.go index 086496758d024..7cfeb613c40f6 100644 --- a/executor/mpp_gather.go +++ b/executor/mpp_gather.go @@ -30,7 +30,7 @@ import ( ) func useMPPExecution(ctx sessionctx.Context, tr *plannercore.PhysicalTableReader) bool { - if ctx.GetSessionVars().AllowMPPExecution == "OFF" { + if !ctx.GetSessionVars().IsMPPAllowed() { return false } _, ok := tr.GetTablePlan().(*plannercore.PhysicalExchangeSender) diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 1f2823bb1d8a5..cd227657a75d9 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -1662,7 +1662,7 @@ func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P } joins := make([]PhysicalPlan, 0, 8) canPushToTiFlash := p.canPushToCop(kv.TiFlash) - if p.ctx.GetSessionVars().AllowMPPExecution != "OFF" && canPushToTiFlash { + if p.ctx.GetSessionVars().IsMPPAllowed() && canPushToTiFlash { if p.shouldUseMPPBCJ() { mppJoins := p.tryToGetMppHashJoin(prop, true) if (p.preferJoinType & preferBCJoin) > 0 { @@ -1965,7 +1965,7 @@ func (lt *LogicalTopN) getPhysTopN(prop *property.PhysicalProperty) []PhysicalPl if !lt.limitHints.preferLimitToCop { allTaskTypes = append(allTaskTypes, property.RootTaskType) } - if lt.ctx.GetSessionVars().AllowMPPExecution != "OFF" { + if 
lt.ctx.GetSessionVars().IsMPPAllowed() { allTaskTypes = append(allTaskTypes, property.MppTaskType) } ret := make([]PhysicalPlan, 0, len(allTaskTypes)) @@ -2355,7 +2355,7 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy taskTypes = append(taskTypes, property.CopTiFlashLocalReadTaskType) } canPushDownToTiFlash := la.canPushToCop(kv.TiFlash) - canPushDownToMPP := la.ctx.GetSessionVars().AllowMPPExecution != "OFF" && la.checkCanPushDownToMPP() && canPushDownToTiFlash + canPushDownToMPP := la.ctx.GetSessionVars().IsMPPAllowed() && la.checkCanPushDownToMPP() && canPushDownToTiFlash if la.HasDistinct() { // TODO: remove after the cost estimation of distinct pushdown is implemented. if !la.ctx.GetSessionVars().AllowDistinctAggPushDown { diff --git a/planner/core/task.go b/planner/core/task.go index 62199a2d04244..fa6855503dd0e 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -2028,7 +2028,7 @@ func (t *mppTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask { p.stats = t.p.statsInfo() p.cost = t.cst / p.ctx.GetSessionVars().CopTiFlashConcurrencyFactor - if p.ctx.GetSessionVars().AllowMPPExecution == "ENFORCE" { + if p.ctx.GetSessionVars().IsMPPEnforced() { p.cost = 0 } rt := &rootTask{ diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 2539ac349c62f..10bd66d77d7a1 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -495,9 +495,9 @@ type SessionVars struct { // Value set to 2 means to force to send batch cop for any query. Value set to 0 means never use batch cop. AllowBatchCop int - // AllowMPPExecution means if we should use mpp way to execute query. Default value is 1 (or 'ON'), means to be determined by the optimizer. - // Value set to 2 (or 'ENFORCE') which means to use mpp whenever possible. Value set to 2 (or 'OFF') means never use mpp. - AllowMPPExecution string + // AllowMPPExecution means if we should use mpp way to execute query. 
Default value is "ON", means to be determined by the optimizer. + // Value set to "ENFORCE" means to use mpp whenever possible. Value set to "OFF" means never use mpp. + allowMPPExecution string // TiDBAllowAutoRandExplicitInsert indicates whether explicit insertion on auto_random column is allowed. AllowAutoRandExplicitInsert bool @@ -846,6 +846,16 @@ func (s *SessionVars) AllocMPPTaskID(startTS uint64) int64 { return 1 } +// IsMPPAllowed returns whether mpp execution is allowed. +func (s *SessionVars) IsMPPAllowed() bool { + return s.allowMPPExecution != "OFF" +} + +// IsMPPAllowed returns whether mpp execution is enforced. +func (s *SessionVars) IsMPPEnforced() bool { + return s.allowMPPExecution == "ENFORCE" +} + // CheckAndGetTxnScope will return the transaction scope we should use in the current session. func (s *SessionVars) CheckAndGetTxnScope() string { if s.InRestrictedSQL { @@ -1095,7 +1105,7 @@ func NewSessionVars() *SessionVars { terror.Log(vars.SetSystemVar(TiDBEnableStreaming, enableStreaming)) vars.AllowBatchCop = DefTiDBAllowBatchCop - vars.AllowMPPExecution = DefTiDBAllowMPPExecution + vars.allowMPPExecution = DefTiDBAllowMPPExecution var enableChunkRPC string if config.GetGlobalConfig().TiKVClient.EnableChunkRPC { diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index c1adfe7fdc6e9..73a8ca0066450 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -720,8 +720,8 @@ var defaultSysVars = []*SysVar{ } return nil }}, - {Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowMPPExecution, Value: On, Type: TypeEnum, PossibleValues: []string{Off, On, "ENFORCE"}, SetSession: func(s *SessionVars, val string) error { - s.AllowMPPExecution = val + {Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowMPPExecution, Value: On, Type: TypeEnum, PossibleValues: []string{"OFF", "ON", "ENFORCE"}, SetSession: func(s *SessionVars, val string) error { + s.allowMPPExecution = val return nil }}, {Scope: ScopeGlobal | 
ScopeSession, Name: TiDBBCJThresholdCount, Value: strconv.Itoa(DefBroadcastJoinThresholdCount), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error { From 930de0bb781c713e14fdcce8e2b265932048602f Mon Sep 17 00:00:00 2001 From: Zhi Qi Date: Wed, 12 May 2021 16:50:42 +0800 Subject: [PATCH 9/9] fix lint --- sessionctx/variable/session.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 10bd66d77d7a1..d6bb5763e67d8 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -851,7 +851,7 @@ func (s *SessionVars) IsMPPAllowed() bool { return s.allowMPPExecution != "OFF" } -// IsMPPAllowed returns whether mpp execution is enforced. +// IsMPPEnforced returns whether mpp execution is enforced. func (s *SessionVars) IsMPPEnforced() bool { return s.allowMPPExecution == "ENFORCE" }