Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[reverted]planner: support set tidb_allow_mpp to 2 or ENFORCE to enforce use mpp mode. #24516

Merged
merged 14 commits into from
May 12, 2021
Merged
2 changes: 1 addition & 1 deletion executor/mpp_gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
)

func useMPPExecution(ctx sessionctx.Context, tr *plannercore.PhysicalTableReader) bool {
if !ctx.GetSessionVars().AllowMPPExecution {
if !ctx.GetSessionVars().IsMPPAllowed() {
return false
}
_, ok := tr.GetTablePlan().(*plannercore.PhysicalExchangeSender)
Expand Down
6 changes: 3 additions & 3 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1662,7 +1662,7 @@ func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P
}
joins := make([]PhysicalPlan, 0, 8)
canPushToTiFlash := p.canPushToCop(kv.TiFlash)
if p.ctx.GetSessionVars().AllowMPPExecution && canPushToTiFlash {
if p.ctx.GetSessionVars().IsMPPAllowed() && canPushToTiFlash {
if p.shouldUseMPPBCJ() {
mppJoins := p.tryToGetMppHashJoin(prop, true)
if (p.preferJoinType & preferBCJoin) > 0 {
Expand Down Expand Up @@ -1965,7 +1965,7 @@ func (lt *LogicalTopN) getPhysTopN(prop *property.PhysicalProperty) []PhysicalPl
if !lt.limitHints.preferLimitToCop {
allTaskTypes = append(allTaskTypes, property.RootTaskType)
}
if lt.ctx.GetSessionVars().AllowMPPExecution {
if lt.ctx.GetSessionVars().IsMPPAllowed() {
allTaskTypes = append(allTaskTypes, property.MppTaskType)
}
ret := make([]PhysicalPlan, 0, len(allTaskTypes))
Expand Down Expand Up @@ -2355,7 +2355,7 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy
taskTypes = append(taskTypes, property.CopTiFlashLocalReadTaskType)
}
canPushDownToTiFlash := la.canPushToCop(kv.TiFlash)
canPushDownToMPP := la.ctx.GetSessionVars().AllowMPPExecution && la.checkCanPushDownToMPP() && canPushDownToTiFlash
canPushDownToMPP := la.ctx.GetSessionVars().IsMPPAllowed() && la.checkCanPushDownToMPP() && canPushDownToTiFlash
if la.HasDistinct() {
// TODO: remove after the cost estimation of distinct pushdown is implemented.
if !la.ctx.GetSessionVars().AllowDistinctAggPushDown {
Expand Down
126 changes: 126 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3706,3 +3706,129 @@ func (s *testIntegrationSerialSuite) TestMergeContinuousSelections(c *C) {
res.Check(testkit.Rows(output[i].Plan...))
}
}

// TestEnforceMPP covers the tidb_allow_mpp system variable end to end:
// it first checks which values tidb_opt_tiflash_concurrency_factor and
// tidb_allow_mpp accept (and how they are normalized when read back), then
// uses EXPLAIN FORMAT='verbose' on a table with a mocked TiFlash replica to
// verify which plan is expected under OFF, ON and ENFORCE.
func (s *testIntegrationSerialSuite) TestEnforceMPP(c *C) {
tk := testkit.NewTestKit(c, s.store)

// tidb_opt_tiflash_concurrency_factor must reject 0; the MPP root-task cost
// is divided by this factor, so a zero value is not allowed.
err := tk.ExecToErr("set @@tidb_opt_tiflash_concurrency_factor = 0")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, `[variable:1231]Variable 'tidb_opt_tiflash_concurrency_factor' can't be set to the value of '0'`)

// Values of 1 and above are accepted and read back unchanged.
tk.MustExec("set @@tidb_opt_tiflash_concurrency_factor = 1")
tk.MustQuery("select @@tidb_opt_tiflash_concurrency_factor").Check(testkit.Rows("1"))
tk.MustExec("set @@tidb_opt_tiflash_concurrency_factor = 24")
tk.MustQuery("select @@tidb_opt_tiflash_concurrency_factor").Check(testkit.Rows("24"))

// Setting tidb_allow_mpp by number: 0, 1 and 2 read back as OFF, ON and ENFORCE.
tk.MustExec("set @@session.tidb_allow_mpp = 0")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("OFF"))
tk.MustExec("set @@session.tidb_allow_mpp = 1")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ON"))
tk.MustExec("set @@session.tidb_allow_mpp = 2")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ENFORCE"))

// Setting tidb_allow_mpp by name is case-insensitive and reads back in upper case.
tk.MustExec("set @@session.tidb_allow_mpp = off")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("OFF"))
tk.MustExec("set @@session.tidb_allow_mpp = oN")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ON"))
tk.MustExec("set @@session.tidb_allow_mpp = enForcE")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ENFORCE"))

// Boolean-style spellings are accepted at global scope and map to OFF / ON.
tk.MustExec("set @@global.tidb_allow_mpp = faLsE")
tk.MustQuery("select @@global.tidb_allow_mpp").Check(testkit.Rows("OFF"))
tk.MustExec("set @@global.tidb_allow_mpp = True")
tk.MustQuery("select @@global.tidb_allow_mpp").Check(testkit.Rows("ON"))

// Any other value is rejected with error 1231.
err = tk.ExecToErr("set @@global.tidb_allow_mpp = enforceWithTypo")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, `[variable:1231]Variable 'tidb_allow_mpp' can't be set to the value of 'enforceWithTypo'`)

// Plan checks: table t has a secondary index, so an index read is available as an alternative plan.
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
tk.MustExec("create index idx on t(a)")

// Mark table t as having an available TiFlash replica by patching the
// in-memory table info directly; no real replica is created.
dom := domain.GetDomain(tk.Se)
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
c.Assert(exists, IsTrue)
for _, tblInfo := range db.Tables {
if tblInfo.Name.L == "t" {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}

// Phase 1: tidb_allow_mpp = OFF (MPP banned).
tk.MustExec("set @@session.tidb_allow_mpp = 0")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("OFF"))

// Hinted to TiFlash: the expected plan has no ExchangeSender, only batchCop tasks.
tk.MustQuery("explain format='verbose' select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1").Check(testkit.Rows(
"StreamAgg_20 1.00 285050.00 root funcs:count(Column#5)->Column#3",
"└─TableReader_21 1.00 19003.88 root data:StreamAgg_9",
" └─StreamAgg_9 1.00 19006.88 batchCop[tiflash] funcs:count(1)->Column#5",
" └─Selection_19 10.00 285020.00 batchCop[tiflash] eq(test.t.a, 1)",
" └─TableFullScan_18 10000.00 255020.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"))

// Phase 2: tidb_allow_mpp = ON (MPP allowed).
tk.MustExec("set @@session.tidb_allow_mpp = 1")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ON"))

// No hint: the expected plan is the TiKV index range scan.
tk.MustQuery("explain format='verbose' select count(*) from t where a=1;").Check(testkit.Rows(
"StreamAgg_30 1.00 485.00 root funcs:count(Column#6)->Column#3",
"└─IndexReader_31 1.00 32.88 root index:StreamAgg_10",
" └─StreamAgg_10 1.00 35.88 cop[tikv] funcs:count(1)->Column#6",
" └─IndexRangeScan_29 10.00 455.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo"))

// Hinted to TiKV: the same index read is expected.
tk.MustQuery("explain format='verbose' select /*+ read_from_storage(tikv[t]) */ count(*) from t where a=1;").Check(testkit.Rows(
"StreamAgg_18 1.00 485.00 root funcs:count(Column#5)->Column#3",
"└─IndexReader_19 1.00 32.88 root index:StreamAgg_10",
" └─StreamAgg_10 1.00 35.88 cop[tikv] funcs:count(1)->Column#5",
" └─IndexRangeScan_17 10.00 455.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo"))

// Hinted to TiFlash: an MPP plan (ExchangeSender) is expected, and the
// root operators keep a non-zero estimated cost (11910.68 / 11877.08).
tk.MustQuery("explain format='verbose' select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1").Check(testkit.Rows(
"HashAgg_21 1.00 11910.68 root funcs:count(Column#5)->Column#3",
"└─TableReader_23 1.00 11877.08 root data:ExchangeSender_22",
" └─ExchangeSender_22 1.00 285050.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─HashAgg_9 1.00 285050.00 batchCop[tiflash] funcs:count(1)->Column#5",
" └─Selection_20 10.00 285020.00 batchCop[tiflash] eq(test.t.a, 1)",
" └─TableFullScan_19 10000.00 255020.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"))

// Phase 3: tidb_allow_mpp = ENFORCE.
tk.MustExec("set @@session.tidb_allow_mpp = 2")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ENFORCE"))

// No hint: the MPP plan is now expected instead of the TiKV index read,
// with the TableReader reported at cost 0.00.
tk.MustQuery("explain format='verbose' select count(*) from t where a=1;").Check(testkit.Rows(
"HashAgg_24 1.00 33.60 root funcs:count(Column#5)->Column#3",
"└─TableReader_26 1.00 0.00 root data:ExchangeSender_25",
" └─ExchangeSender_25 1.00 285050.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─HashAgg_9 1.00 285050.00 batchCop[tiflash] funcs:count(1)->Column#5",
" └─Selection_23 10.00 285020.00 batchCop[tiflash] eq(test.t.a, 1)",
" └─TableFullScan_22 10000.00 255020.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"))

// Hinted to TiKV: the storage hint still yields the TiKV index read under ENFORCE.
tk.MustQuery("explain format='verbose' select /*+ read_from_storage(tikv[t]) */ count(*) from t where a=1;").Check(testkit.Rows(
"StreamAgg_18 1.00 485.00 root funcs:count(Column#5)->Column#3",
"└─IndexReader_19 1.00 32.88 root index:StreamAgg_10",
" └─StreamAgg_10 1.00 35.88 cop[tikv] funcs:count(1)->Column#5",
" └─IndexRangeScan_17 10.00 455.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo"))

// Hinted to TiFlash: same MPP plan shape as under ON, but the root costs
// drop to 33.60 / 0.00.
tk.MustQuery("explain format='verbose' select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1").Check(testkit.Rows(
"HashAgg_21 1.00 33.60 root funcs:count(Column#5)->Column#3",
"└─TableReader_23 1.00 0.00 root data:ExchangeSender_22",
" └─ExchangeSender_22 1.00 285050.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─HashAgg_9 1.00 285050.00 batchCop[tiflash] funcs:count(1)->Column#5",
" └─Selection_20 10.00 285020.00 batchCop[tiflash] eq(test.t.a, 1)",
" └─TableFullScan_19 10000.00 255020.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"))
}
8 changes: 6 additions & 2 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2026,11 +2026,15 @@ func (t *mppTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask {
StoreType: kv.TiFlash,
}.Init(ctx, t.p.SelectBlockOffset())
p.stats = t.p.statsInfo()

p.cost = t.cst / p.ctx.GetSessionVars().CopTiFlashConcurrencyFactor
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should check if CopTiFlashConcurrencyFactor is 0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested and found that if tidb_opt_tiflash_concurrency_factor is set to 0, p.cost will be +inf, which seems a little reasonable……

I will limit the value range of this variable so that it can't be less than 1, cc @hanfei1991

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setting it to zero will not compare the costs of different mpp plans...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can do some meddling during comparing the cost: If the task is oriented from mpp Task, it will be superior to the non-mpp ones.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can do some meddling during comparing the cost: If the task is oriented from mpp Task, it will be superior to the non-mpp ones.

I think this PR is mainly about the user interface plus a simple but reliable implementation. How about merging this first and then tracking "the estCost of mpp task will be 0 when tidb_allow_mpp is set to enforce" as a separate issue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should check if CopTiFlashConcurrencyFactor is 0?

Yes, now this restriction is added in the variable framework and is tested.

if p.ctx.GetSessionVars().IsMPPEnforced() {
p.cost = 0
}
rt := &rootTask{
p: p,
cst: t.cst / p.ctx.GetSessionVars().CopTiFlashConcurrencyFactor,
cst: p.cost,
}
p.cost = rt.cost()
return rt
}

Expand Down
19 changes: 15 additions & 4 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,11 +492,12 @@ type SessionVars struct {
AllowWriteRowID bool

// AllowBatchCop means if we should send batch coprocessor to TiFlash. Default value is 1, means to use batch cop in case of aggregation and join.
// If value is set to 2 , which means to force to send batch cop for any query. Value is set to 0 means never use batch cop.
// Value set to 2 means to force to send batch cop for any query. Value set to 0 means never use batch cop.
AllowBatchCop int

// AllowMPPExecution will prefer using mpp way to execute a query.
AllowMPPExecution bool
// allowMPPExecution means if we should use mpp way to execute query. Default value is "ON", means to be determined by the optimizer.
// Value set to "ENFORCE" means to use mpp whenever possible. Value set to "OFF" means never use mpp.
allowMPPExecution string

// TiDBAllowAutoRandExplicitInsert indicates whether explicit insertion on auto_random column is allowed.
AllowAutoRandExplicitInsert bool
Expand Down Expand Up @@ -845,6 +846,16 @@ func (s *SessionVars) AllocMPPTaskID(startTS uint64) int64 {
return 1
}

// IsMPPAllowed reports whether the session may use MPP execution, i.e. the
// stored tidb_allow_mpp value is anything other than "OFF".
func (s *SessionVars) IsMPPAllowed() bool {
	switch s.allowMPPExecution {
	case "OFF":
		return false
	default:
		return true
	}
}

// IsMPPEnforced reports whether the session's stored tidb_allow_mpp value is
// exactly "ENFORCE".
func (s *SessionVars) IsMPPEnforced() bool {
	mode := s.allowMPPExecution
	if mode == "ENFORCE" {
		return true
	}
	return false
}

// CheckAndGetTxnScope will return the transaction scope we should use in the current session.
func (s *SessionVars) CheckAndGetTxnScope() string {
if s.InRestrictedSQL {
Expand Down Expand Up @@ -1094,7 +1105,7 @@ func NewSessionVars() *SessionVars {
terror.Log(vars.SetSystemVar(TiDBEnableStreaming, enableStreaming))

vars.AllowBatchCop = DefTiDBAllowBatchCop
vars.AllowMPPExecution = DefTiDBAllowMPPExecution
vars.allowMPPExecution = DefTiDBAllowMPPExecution

var enableChunkRPC string
if config.GetGlobalConfig().TiKVClient.EnableChunkRPC {
Expand Down
6 changes: 3 additions & 3 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -720,8 +720,8 @@ var defaultSysVars = []*SysVar{
}
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowMPPExecution, Type: TypeBool, Value: BoolToOnOff(DefTiDBAllowMPPExecution), SetSession: func(s *SessionVars, val string) error {
s.AllowMPPExecution = TiDBOptOn(val)
{Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowMPPExecution, Value: On, Type: TypeEnum, PossibleValues: []string{"OFF", "ON", "ENFORCE"}, SetSession: func(s *SessionVars, val string) error {
s.allowMPPExecution = val
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBBCJThresholdCount, Value: strconv.Itoa(DefBroadcastJoinThresholdCount), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error {
Expand Down Expand Up @@ -793,7 +793,7 @@ var defaultSysVars = []*SysVar{
s.CPUFactor = tidbOptFloat64(val, DefOptCPUFactor)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptTiFlashConcurrencyFactor, Value: strconv.FormatFloat(DefOptTiFlashConcurrencyFactor, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64, SetSession: func(s *SessionVars, val string) error {
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptTiFlashConcurrencyFactor, Value: strconv.FormatFloat(DefOptTiFlashConcurrencyFactor, 'f', -1, 64), Type: TypeFloat, MinValue: 1, MaxValue: math.MaxUint64, SetSession: func(s *SessionVars, val string) error {
s.CopTiFlashConcurrencyFactor = tidbOptFloat64(val, DefOptTiFlashConcurrencyFactor)
return nil
}},
Expand Down
4 changes: 3 additions & 1 deletion sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ const (
// The default value is 1
TiDBAllowBatchCop = "tidb_allow_batch_cop"

// TiDBAllowMPPExecution means if we should use mpp way to execute query. Default value is 1 (or 'ON'), means to be determined by the optimizer.
// Value set to 2 (or 'ENFORCE') means to use mpp whenever possible. Value set to 0 (or 'OFF') means never use mpp.
TiDBAllowMPPExecution = "tidb_allow_mpp"

// TiDBInitChunkSize is used to control the init chunk size during query execution.
Expand Down Expand Up @@ -613,7 +615,7 @@ const (
DefBroadcastJoinThresholdCount = 10 * 1024
DefTiDBOptimizerSelectivityLevel = 0
DefTiDBAllowBatchCop = 1
DefTiDBAllowMPPExecution = true
DefTiDBAllowMPPExecution = "ON"
DefTiDBTxnMode = ""
DefTiDBRowFormatV1 = 1
DefTiDBRowFormatV2 = 2
Expand Down