diff --git a/config/config.go b/config/config.go index 3b4bda54436bd..90725ff40ed88 100644 --- a/config/config.go +++ b/config/config.go @@ -424,6 +424,7 @@ type Performance struct { MemProfileInterval string `toml:"mem-profile-interval" json:"mem-profile-interval"` IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"` GOGC int `toml:"gogc" json:"gogc"` + EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"` } // PlanCache is the PlanCache section of the config. @@ -623,6 +624,7 @@ var defaultConf = Config{ // TODO: set indexUsageSyncLease to 60s. IndexUsageSyncLease: "0s", GOGC: 100, + EnforceMPP: false, }, ProxyProtocol: ProxyProtocol{ Networks: "", diff --git a/executor/tiflash_test.go b/executor/tiflash_test.go index d4a4f873e6db5..282410eb06472 100644 --- a/executor/tiflash_test.go +++ b/executor/tiflash_test.go @@ -300,7 +300,7 @@ func (s *tiflashTestSuite) TestTiFlashPartitionTableShuffledHashJoin(c *C) { tk.MustExec(fmt.Sprintf("analyze table %v", tbl)) } - tk.MustExec("SET tidb_allow_mpp=2") + tk.MustExec("SET tidb_enforce_mpp=1") tk.MustExec("SET tidb_opt_broadcast_join=0") tk.MustExec("SET tidb_broadcast_join_threshold_count=0") tk.MustExec("SET tidb_broadcast_join_threshold_size=0") @@ -378,7 +378,7 @@ func (s *tiflashTestSuite) TestTiFlashPartitionTableReader(c *C) { tk.MustExec(fmt.Sprintf("insert into %v values %v", tbl, strings.Join(vals, ", "))) } - tk.MustExec("SET tidb_allow_mpp=2") + tk.MustExec("SET tidb_enforce_mpp=1") tk.MustExec("set @@session.tidb_isolation_read_engines='tiflash'") for i := 0; i < 100; i++ { l, r := rand.Intn(400), rand.Intn(400) @@ -742,7 +742,7 @@ func (s *tiflashTestSuite) TestTiFlashPartitionTableShuffledHashAggregation(c *C tk.MustExec(fmt.Sprintf("analyze table %v", tbl)) } tk.MustExec("set @@session.tidb_isolation_read_engines='tiflash'") - tk.MustExec("set @@session.tidb_allow_mpp=2") + tk.MustExec("set @@session.tidb_enforce_mpp=1") // mock executor does not support use outer table as build side for outer join, so need to // force the inner table as build side tk.MustExec("set tidb_opt_mpp_outer_join_fixed_build_side=1") @@ -814,7 +814,7 @@ func (s *tiflashTestSuite) TestTiFlashPartitionTableBroadcastJoin(c *C) { tk.MustExec(fmt.Sprintf("analyze table %v", tbl)) } tk.MustExec("set @@session.tidb_isolation_read_engines='tiflash'") - tk.MustExec("set @@session.tidb_allow_mpp=2") + tk.MustExec("set @@session.tidb_enforce_mpp=1") tk.MustExec("set @@session.tidb_opt_broadcast_join=ON") // mock executor does not support use outer table as build side for outer join, so need to // force the inner table as build side diff --git a/planner/core/enforce_mpp_test.go b/planner/core/enforce_mpp_test.go new file mode 100644 index 0000000000000..b2ba38cb515de --- /dev/null +++ b/planner/core/enforce_mpp_test.go @@ -0,0 +1,299 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package core_test + +import ( + "strings" + + . 
"github.com/pingcap/check" + "github.com/pingcap/parser/model" + "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/util/collate" + "github.com/pingcap/tidb/util/testkit" + "github.com/pingcap/tidb/util/testutil" +) + +var _ = SerialSuites(&testEnforceMPPSuite{}) + +type testEnforceMPPSuite struct { + testData testutil.TestData + store kv.Storage + dom *domain.Domain +} + +func (s *testEnforceMPPSuite) SetUpSuite(c *C) { + var err error + s.testData, err = testutil.LoadTestSuiteData("testdata", "enforce_mpp_suite") + c.Assert(err, IsNil) +} + +func (s *testEnforceMPPSuite) TearDownSuite(c *C) { + c.Assert(s.testData.GenerateOutputIfNeeded(), IsNil) +} + +func (s *testEnforceMPPSuite) SetUpTest(c *C) { + var err error + s.store, s.dom, err = newStoreWithBootstrap() + c.Assert(err, IsNil) +} + +func (s *testEnforceMPPSuite) TearDownTest(c *C) { + s.dom.Close() + err := s.store.Close() + c.Assert(err, IsNil) +} + +func (s *testEnforceMPPSuite) TestSetVariables(c *C) { + tk := testkit.NewTestKit(c, s.store) + + // test value limit of tidb_opt_tiflash_concurrency_factor + err := tk.ExecToErr("set @@tidb_opt_tiflash_concurrency_factor = 0") + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, `[variable:1231]Variable 'tidb_opt_tiflash_concurrency_factor' can't be set to the value of '0'`) + + // test set tidb_enforce_mpp when tidb_allow_mpp=false; + err = tk.ExecToErr("set @@tidb_allow_mpp = 0; set @@tidb_enforce_mpp = 1;") + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, `[variable:1231]Variable 'tidb_enforce_mpp' can't be set to the value of '1' but tidb_allow_mpp is 0, please activate tidb_allow_mpp at first.'`) + + err = tk.ExecToErr("set @@tidb_allow_mpp = 1; set @@tidb_enforce_mpp = 1;") + c.Assert(err, IsNil) + + err = tk.ExecToErr("set @@tidb_allow_mpp = 0;") + c.Assert(err, IsNil) +} + +func (s *testEnforceMPPSuite) TestEnforceMPP(c *C) { + tk := testkit.NewTestKit(c, s.store) + + // test query + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, b int)") + tk.MustExec("create index idx on t(a)") + + // Create virtual tiflash replica info. + dom := domain.GetDomain(tk.Se) + is := dom.InfoSchema() + db, exists := is.SchemaByName(model.NewCIStr("test")) + c.Assert(exists, IsTrue) + for _, tblInfo := range db.Tables { + if tblInfo.Name.L == "t" { + tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ + Count: 1, + Available: true, + } + } + } + + var input []string + var output []struct { + SQL string + Plan []string + Warn []string + } + s.testData.GetTestCases(c, &input, &output) + for i, tt := range input { + s.testData.OnRecord(func() { + output[i].SQL = tt + }) + if strings.HasPrefix(tt, "set") { + tk.MustExec(tt) + continue + } + s.testData.OnRecord(func() { + output[i].SQL = tt + output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + output[i].Warn = s.testData.ConvertSQLWarnToStrings(tk.Se.GetSessionVars().StmtCtx.GetWarnings()) + }) + res := tk.MustQuery(tt) + res.Check(testkit.Rows(output[i].Plan...)) + c.Assert(s.testData.ConvertSQLWarnToStrings(tk.Se.GetSessionVars().StmtCtx.GetWarnings()), DeepEquals, output[i].Warn) + } +} + +// general cases. 
+func (s *testEnforceMPPSuite) TestEnforceMPPWarning1(c *C) { + tk := testkit.NewTestKit(c, s.store) + + // test query + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, b int as (a+1), c time)") + tk.MustExec("create index idx on t(a)") + + var input []string + var output []struct { + SQL string + Plan []string + Warn []string + } + s.testData.GetTestCases(c, &input, &output) + for i, tt := range input { + s.testData.OnRecord(func() { + output[i].SQL = tt + }) + if strings.HasPrefix(tt, "set") { + tk.MustExec(tt) + continue + } + if strings.HasPrefix(tt, "cmd: create-replica") { + // Create virtual tiflash replica info. + dom := domain.GetDomain(tk.Se) + is := dom.InfoSchema() + db, exists := is.SchemaByName(model.NewCIStr("test")) + c.Assert(exists, IsTrue) + for _, tblInfo := range db.Tables { + if tblInfo.Name.L == "t" { + tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ + Count: 1, + Available: false, + } + } + } + continue + } + if strings.HasPrefix(tt, "cmd: enable-replica") { + // Create virtual tiflash replica info. + dom := domain.GetDomain(tk.Se) + is := dom.InfoSchema() + db, exists := is.SchemaByName(model.NewCIStr("test")) + c.Assert(exists, IsTrue) + for _, tblInfo := range db.Tables { + if tblInfo.Name.L == "t" { + tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ + Count: 1, + Available: true, + } + } + } + continue + } + s.testData.OnRecord(func() { + output[i].SQL = tt + output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + output[i].Warn = s.testData.ConvertSQLWarnToStrings(tk.Se.GetSessionVars().StmtCtx.GetWarnings()) + }) + res := tk.MustQuery(tt) + res.Check(testkit.Rows(output[i].Plan...)) + c.Assert(s.testData.ConvertSQLWarnToStrings(tk.Se.GetSessionVars().StmtCtx.GetWarnings()), DeepEquals, output[i].Warn) + } +} + +// partition table. +func (s *testEnforceMPPSuite) TestEnforceMPPWarning2(c *C) { + tk := testkit.NewTestKit(c, s.store) + + // test query + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("CREATE TABLE t (a int, b char(20)) PARTITION BY HASH(a)") + + // Create virtual tiflash replica info. + dom := domain.GetDomain(tk.Se) + is := dom.InfoSchema() + db, exists := is.SchemaByName(model.NewCIStr("test")) + c.Assert(exists, IsTrue) + for _, tblInfo := range db.Tables { + if tblInfo.Name.L == "t" { + tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ + Count: 1, + Available: true, + } + } + } + + var input []string + var output []struct { + SQL string + Plan []string + Warn []string + } + s.testData.GetTestCases(c, &input, &output) + for i, tt := range input { + s.testData.OnRecord(func() { + output[i].SQL = tt + }) + if strings.HasPrefix(tt, "set") { + tk.MustExec(tt) + continue + } + s.testData.OnRecord(func() { + output[i].SQL = tt + output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + output[i].Warn = s.testData.ConvertSQLWarnToStrings(tk.Se.GetSessionVars().StmtCtx.GetWarnings()) + }) + res := tk.MustQuery(tt) + res.Check(testkit.Rows(output[i].Plan...)) + c.Assert(s.testData.ConvertSQLWarnToStrings(tk.Se.GetSessionVars().StmtCtx.GetWarnings()), DeepEquals, output[i].Warn) + } +} + +// new collation. +func (s *testEnforceMPPSuite) TestEnforceMPPWarning3(c *C) { + tk := testkit.NewTestKit(c, s.store) + + // test query + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("CREATE TABLE t (a int, b char(20))") + + // Create virtual tiflash replica info. 
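+	// No real TiFlash store exists in this unit test, so a fake replica that is
+	// marked available is attached to the table; this is enough for the planner
+	// to consider the TiFlash access paths.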
+ dom := domain.GetDomain(tk.Se) + is := dom.InfoSchema() + db, exists := is.SchemaByName(model.NewCIStr("test")) + c.Assert(exists, IsTrue) + for _, tblInfo := range db.Tables { + if tblInfo.Name.L == "t" { + tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ + Count: 1, + Available: true, + } + } + } + + var input []string + var output []struct { + SQL string + Plan []string + Warn []string + } + s.testData.GetTestCases(c, &input, &output) + for i, tt := range input { + s.testData.OnRecord(func() { + output[i].SQL = tt + }) + if strings.HasPrefix(tt, "set") || strings.HasPrefix(tt, "UPDATE") { + tk.MustExec(tt) + continue + } + if strings.HasPrefix(tt, "cmd: enable-new-collation") { + collate.SetNewCollationEnabledForTest(true) + continue + } + if strings.HasPrefix(tt, "cmd: disable-new-collation") { + collate.SetNewCollationEnabledForTest(false) + continue + } + s.testData.OnRecord(func() { + output[i].SQL = tt + output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + output[i].Warn = s.testData.ConvertSQLWarnToStrings(tk.Se.GetSessionVars().StmtCtx.GetWarnings()) + }) + res := tk.MustQuery(tt) + res.Check(testkit.Rows(output[i].Plan...)) + c.Assert(s.testData.ConvertSQLWarnToStrings(tk.Se.GetSessionVars().StmtCtx.GetWarnings()), DeepEquals, output[i].Warn) + } +} diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index d7a17d3017221..eef51465a072d 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -41,6 +41,8 @@ import ( func (p *LogicalUnionScan) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool, error) { if prop.IsFlashProp() { + p.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced( + "MPP mode may be blocked because operator `UnionScan` is not supported now.") return nil, true, nil } childProp := prop.CloneEssentialFields() @@ -1687,7 +1689,7 @@ func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P if prop.IsFlashProp() && ((p.preferJoinType&preferBCJoin) == 0 && p.preferJoinType > 0) { return nil, false, nil } - if prop.PartitionTp == property.BroadcastType { + if prop.MPPPartitionTp == property.BroadcastType { return nil, false, nil } joins := make([]PhysicalPlan, 0, 8) @@ -1785,7 +1787,7 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC return nil } - if prop.PartitionTp == property.BroadcastType { + if prop.MPPPartitionTp == property.BroadcastType { return nil } if !canExprsInJoinPushdown(p, kv.TiFlash) { @@ -1828,27 +1830,27 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC baseJoin.InnerChildIdx = preferredBuildIndex childrenProps := make([]*property.PhysicalProperty, 2) if useBCJ { - childrenProps[preferredBuildIndex] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, PartitionTp: property.BroadcastType, CanAddEnforcer: true} + childrenProps[preferredBuildIndex] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.BroadcastType, CanAddEnforcer: true} expCnt := math.MaxFloat64 if prop.ExpectedCnt < p.stats.RowCount { expCntScale := prop.ExpectedCnt / p.stats.RowCount expCnt = p.children[1-preferredBuildIndex].statsInfo().RowCount * expCntScale } - if prop.PartitionTp == property.HashType { + if prop.MPPPartitionTp == property.HashType { hashKeys := rkeys if preferredBuildIndex == 1 { hashKeys = lkeys } if matches := prop.IsSubsetOf(hashKeys); 
len(matches) != 0 {
-				childrenProps[1-preferredBuildIndex] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: expCnt, PartitionTp: property.HashType, PartitionCols: prop.PartitionCols}
+				childrenProps[1-preferredBuildIndex] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: expCnt, MPPPartitionTp: property.HashType, MPPPartitionCols: prop.MPPPartitionCols}
 			} else {
 				return nil
 			}
 		} else {
-			childrenProps[1-preferredBuildIndex] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: expCnt, PartitionTp: property.AnyType}
+			childrenProps[1-preferredBuildIndex] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: expCnt, MPPPartitionTp: property.AnyType}
 		}
 	} else {
-		if prop.PartitionTp == property.HashType {
+		if prop.MPPPartitionTp == property.HashType {
 			var matches []int
 			if matches = prop.IsSubsetOf(lkeys); len(matches) == 0 {
 				matches = prop.IsSubsetOf(rkeys)
@@ -1859,8 +1861,8 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC
 			lkeys = chooseSubsetOfJoinKeys(lkeys, matches)
 			rkeys = chooseSubsetOfJoinKeys(rkeys, matches)
 		}
-		childrenProps[0] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, PartitionTp: property.HashType, PartitionCols: lkeys, CanAddEnforcer: true}
-		childrenProps[1] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, PartitionTp: property.HashType, PartitionCols: rkeys, CanAddEnforcer: true}
+		childrenProps[0] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.HashType, MPPPartitionCols: lkeys, CanAddEnforcer: true}
+		childrenProps[1] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.HashType, MPPPartitionCols: rkeys, CanAddEnforcer: true}
 	}
 	join := PhysicalHashJoin{
 		basePhysicalJoin: baseJoin,
@@ -2096,6 +2098,8 @@ func (la *LogicalApply) GetHashJoin(prop *property.PhysicalProperty) *PhysicalHa
 func (la *LogicalApply) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool, error) {
 	if !prop.AllColsFromSchema(la.children[0].Schema()) || prop.IsFlashProp() { // for convenience, we don't pass through any prop
+		la.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced(
+			"MPP mode may be blocked because operator `Apply` is not supported now.")
 		return nil, true, nil
 	}
 	disableAggPushDownToCop(la.children[0])
@@ -2142,6 +2146,8 @@ func disableAggPushDownToCop(p LogicalPlan) {
 }
 
 func (p *LogicalWindow) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool, error) {
+	p.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced(
+		"MPP mode may be blocked because operator `Window` is not supported now.")
 	if prop.IsFlashProp() {
 		return nil, true, nil
 	}
@@ -2205,7 +2211,12 @@ func (p *baseLogicalPlan) canPushToCopImpl(storeTp kv.StoreType, considerDual bo
 		} else {
 			return false
 		}
+		// These operators can be partially pushed down to TiFlash, so we don't raise warnings for them.
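+		// Limit and TopN return false without a warning: unlike the operators in
+		// the default branch, part of their work can still be pushed to TiFlash,
+		// so an "MPP mode may be blocked" message would be misleading here.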
+	case *LogicalLimit, *LogicalTopN:
+		return false
 	default:
+		p.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced(
+			"MPP mode may be blocked because operator `" + c.TP() + "` is not supported now.")
 		return false
 	}
 }
@@ -2363,13 +2374,13 @@ func (la *LogicalAggregation) tryToGetMppHashAggs(prop *property.PhysicalPropert
 	if prop.TaskTp != property.RootTaskType && prop.TaskTp != property.MppTaskType {
 		return nil
 	}
-	if prop.PartitionTp == property.BroadcastType {
+	if prop.MPPPartitionTp == property.BroadcastType {
 		return nil
 	}
 	if len(la.GroupByItems) > 0 {
 		partitionCols := la.GetGroupByCols()
 		// trying to match the required partitions.
-		if prop.PartitionTp == property.HashType {
+		if prop.MPPPartitionTp == property.HashType {
 			if matches := prop.IsSubsetOf(partitionCols); len(matches) != 0 {
 				partitionCols = chooseSubsetOfJoinKeys(partitionCols, matches)
 			} else {
@@ -2382,7 +2393,7 @@ func (la *LogicalAggregation) tryToGetMppHashAggs(prop *property.PhysicalPropert
 		// If there are no available partition cols, but still have group by items, that means group by items are all expressions or constants.
 		// To avoid mess, we don't do any one-phase aggregation in this case.
 		if len(partitionCols) != 0 {
-			childProp := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, PartitionTp: property.HashType, PartitionCols: partitionCols, CanAddEnforcer: true}
+			childProp := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.HashType, MPPPartitionCols: partitionCols, CanAddEnforcer: true}
 			agg := NewPhysicalHashAgg(la, la.stats.ScaleByExpectCnt(prop.ExpectedCnt), childProp)
 			agg.SetSchema(la.schema.Clone())
 			agg.MppRunMode = Mpp1Phase
@@ -2390,7 +2401,7 @@ func (la *LogicalAggregation) tryToGetMppHashAggs(prop *property.PhysicalPropert
 	}
 
 	// 2-phase agg
-	childProp := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, PartitionTp: property.AnyType}
+	childProp := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.AnyType}
 	agg := NewPhysicalHashAgg(la, la.stats.ScaleByExpectCnt(prop.ExpectedCnt), childProp)
 	agg.SetSchema(la.schema.Clone())
 	agg.MppRunMode = Mpp2Phase
@@ -2429,7 +2440,7 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy
 		taskTypes = append(taskTypes, property.CopTiFlashLocalReadTaskType)
 	}
 	canPushDownToTiFlash := la.canPushToCop(kv.TiFlash)
-	canPushDownToMPP := la.ctx.GetSessionVars().IsMPPAllowed() && la.checkCanPushDownToMPP() && canPushDownToTiFlash
+	canPushDownToMPP := canPushDownToTiFlash && la.ctx.GetSessionVars().IsMPPAllowed() && la.checkCanPushDownToMPP()
 	if la.HasDistinct() {
 		// TODO: remove after the cost estimation of distinct pushdown is implemented.
 		if !la.ctx.GetSessionVars().AllowDistinctAggPushDown {
@@ -2557,6 +2568,8 @@ func (p *LogicalLimit) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]
 
 func (p *LogicalLock) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool, error) {
 	if prop.IsFlashProp() {
+		p.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced(
+			"MPP mode may be blocked because operator `Lock` is not supported now.")
 		return nil, true, nil
 	}
 	childProp := prop.CloneEssentialFields()
@@ -2574,7 +2587,7 @@ func (p *LogicalUnionAll) exhaustPhysicalPlans(prop *property.PhysicalProperty)
 		return nil, true, nil
 	}
 	// TODO: UnionAll can pass partition info, but for brevity, we prevent it from pushing down.
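+	// Accepting a hash-partition requirement here would oblige every branch of
+	// the union to produce the same data distribution, so only AnyType is
+	// accepted for now.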
- if prop.TaskTp == property.MppTaskType && prop.PartitionTp != property.AnyType { + if prop.TaskTp == property.MppTaskType && prop.MPPPartitionTp != property.AnyType { return nil, true, nil } canUseMpp := p.ctx.GetSessionVars().IsMPPAllowed() && p.canPushToCopImpl(kv.TiFlash, true) @@ -2650,6 +2663,7 @@ func (ls *LogicalSort) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([] func (p *LogicalMaxOneRow) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool, error) { if !prop.IsEmpty() || prop.IsFlashProp() { + p.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because operator `MaxOneRow` is not supported now.") return nil, true, nil } mor := PhysicalMaxOneRow{}.Init(p.ctx, p.stats, p.blockOffset, &property.PhysicalProperty{ExpectedCnt: 2}) diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index 051695ad6988f..b0bb7aa8fa0ef 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -328,8 +328,8 @@ func (p *baseLogicalPlan) findBestTask(prop *property.PhysicalProperty, planCoun // try to get the task with an enforced sort. newProp.SortItems = []property.SortItem{} newProp.ExpectedCnt = math.MaxFloat64 - newProp.PartitionCols = nil - newProp.PartitionTp = property.AnyType + newProp.MPPPartitionCols = nil + newProp.MPPPartitionTp = property.AnyType var hintCanWork bool plansNeedEnforce, hintCanWork, err = p.self.exhaustPhysicalPlans(newProp) if err != nil { @@ -644,8 +644,8 @@ func (ds *DataSource) findBestTask(prop *property.PhysicalProperty, planCounter } // Next, get the bestTask with enforced prop prop.SortItems = []property.SortItem{} - prop.PartitionTp = property.AnyType - } else if prop.PartitionTp != property.AnyType { + prop.MPPPartitionTp = property.AnyType + } else if prop.MPPPartitionTp != property.AnyType { return invalidTask, 0, nil } defer func() { @@ -1546,12 +1546,14 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid if ts.KeepOrder { return &mppTask{}, nil } - if prop.PartitionTp != property.AnyType || ts.isPartition { + if prop.MPPPartitionTp != property.AnyType || ts.isPartition { // If ts is a single partition, then this partition table is in static-only prune, then we should not choose mpp execution. 
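+		// Dynamic prune mode scans the partitioned table as a whole instead, which
+		// is why the warning below points at `@@tidb_partition_prune_mode=static`.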
+		ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because table `" + ds.tableInfo.Name.O + "` is a partition table which is not supported when `@@tidb_partition_prune_mode=static`.")
 		return &mppTask{}, nil
 	}
 	for _, col := range ts.schema.Columns {
 		if col.VirtualExpr != nil {
+			ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because column `" + col.OrigName + "` is a virtual column which is not supported now.")
 			return &mppTask{}, nil
 		}
 	}
diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go
index 9c95d5026f79b..394d5c02b0c1e 100644
--- a/planner/core/integration_test.go
+++ b/planner/core/integration_test.go
@@ -3889,129 +3889,3 @@ func (s *testIntegrationSerialSuite) TestMergeContinuousSelections(c *C) {
 		res.Check(testkit.Rows(output[i].Plan...))
 	}
 }
-
-func (s *testIntegrationSerialSuite) TestEnforceMPP(c *C) {
-	tk := testkit.NewTestKit(c, s.store)
-
-	// test value limit of tidb_opt_tiflash_concurrency_factor
-	err := tk.ExecToErr("set @@tidb_opt_tiflash_concurrency_factor = 0")
-	c.Assert(err, NotNil)
-	c.Assert(err.Error(), Equals, `[variable:1231]Variable 'tidb_opt_tiflash_concurrency_factor' can't be set to the value of '0'`)
-
-	tk.MustExec("set @@tidb_opt_tiflash_concurrency_factor = 1")
-	tk.MustQuery("select @@tidb_opt_tiflash_concurrency_factor").Check(testkit.Rows("1"))
-	tk.MustExec("set @@tidb_opt_tiflash_concurrency_factor = 24")
-	tk.MustQuery("select @@tidb_opt_tiflash_concurrency_factor").Check(testkit.Rows("24"))
-
-	// test set tidb_allow_mpp
-	tk.MustExec("set @@session.tidb_allow_mpp = 0")
-	tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("OFF"))
-	tk.MustExec("set @@session.tidb_allow_mpp = 1")
-	tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ON"))
-	tk.MustExec("set @@session.tidb_allow_mpp = 2")
-	tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ENFORCE"))
-
-	tk.MustExec("set @@session.tidb_allow_mpp = off")
-	tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("OFF"))
-	tk.MustExec("set @@session.tidb_allow_mpp = oN")
-	tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ON"))
-	tk.MustExec("set @@session.tidb_allow_mpp = enForcE")
-	tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ENFORCE"))
-
-	tk.MustExec("set @@global.tidb_allow_mpp = faLsE")
-	tk.MustQuery("select @@global.tidb_allow_mpp").Check(testkit.Rows("OFF"))
-	tk.MustExec("set @@global.tidb_allow_mpp = True")
-	tk.MustQuery("select @@global.tidb_allow_mpp").Check(testkit.Rows("ON"))
-
-	err = tk.ExecToErr("set @@global.tidb_allow_mpp = enforceWithTypo")
-	c.Assert(err, NotNil)
-	c.Assert(err.Error(), Equals, `[variable:1231]Variable 'tidb_allow_mpp' can't be set to the value of 'enforceWithTypo'`)
-
-	// test query
-	tk.MustExec("use test")
-	tk.MustExec("drop table if exists t")
-	tk.MustExec("create table t(a int)")
-	tk.MustExec("create index idx on t(a)")
-
-	// Create virtual tiflash replica info.
-	dom := domain.GetDomain(tk.Se)
-	is := dom.InfoSchema()
-	db, exists := is.SchemaByName(model.NewCIStr("test"))
-	c.Assert(exists, IsTrue)
-	for _, tblInfo := range db.Tables {
-		if tblInfo.Name.L == "t" {
-			tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
-				Count:     1,
-				Available: true,
-			}
-		}
-	}
-
-	// ban mpp
-	tk.MustExec("set @@session.tidb_allow_mpp = 0")
-	tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("OFF"))
-
-	// read from tiflash, batch cop.
- tk.MustQuery("explain format='verbose' select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1").Check(testkit.Rows( - "StreamAgg_20 1.00 285050.00 root funcs:count(Column#5)->Column#3", - "└─TableReader_21 1.00 19003.88 root data:StreamAgg_9", - " └─StreamAgg_9 1.00 19006.88 batchCop[tiflash] funcs:count(1)->Column#5", - " └─Selection_19 10.00 285020.00 batchCop[tiflash] eq(test.t.a, 1)", - " └─TableFullScan_18 10000.00 255020.00 batchCop[tiflash] table:t keep order:false, stats:pseudo")) - - // open mpp - tk.MustExec("set @@session.tidb_allow_mpp = 1") - tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ON")) - - // should use tikv to index read - tk.MustQuery("explain format='verbose' select count(*) from t where a=1;").Check(testkit.Rows( - "StreamAgg_30 1.00 485.00 root funcs:count(Column#6)->Column#3", - "└─IndexReader_31 1.00 32.88 root index:StreamAgg_10", - " └─StreamAgg_10 1.00 35.88 cop[tikv] funcs:count(1)->Column#6", - " └─IndexRangeScan_29 10.00 455.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo")) - - // read from tikv, indexRead - tk.MustQuery("explain format='verbose' select /*+ read_from_storage(tikv[t]) */ count(*) from t where a=1;").Check(testkit.Rows( - "StreamAgg_18 1.00 485.00 root funcs:count(Column#5)->Column#3", - "└─IndexReader_19 1.00 32.88 root index:StreamAgg_10", - " └─StreamAgg_10 1.00 35.88 cop[tikv] funcs:count(1)->Column#5", - " └─IndexRangeScan_17 10.00 455.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo")) - - // read from tiflash, mpp with large cost - tk.MustQuery("explain format='verbose' select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1").Check(testkit.Rows( - "HashAgg_21 1.00 11910.73 root funcs:count(Column#5)->Column#3", - "└─TableReader_23 1.00 11877.13 root data:ExchangeSender_22", - " └─ExchangeSender_22 1.00 285050.00 batchCop[tiflash] ExchangeType: PassThrough", - " └─HashAgg_9 1.00 285050.00 batchCop[tiflash] funcs:count(1)->Column#5", - " └─Selection_20 10.00 285020.00 batchCop[tiflash] eq(test.t.a, 1)", - " └─TableFullScan_19 10000.00 255020.00 batchCop[tiflash] table:t keep order:false, stats:pseudo")) - - // enforce mpp - tk.MustExec("set @@session.tidb_allow_mpp = 2") - tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ENFORCE")) - - // should use mpp - tk.MustQuery("explain format='verbose' select count(*) from t where a=1;").Check(testkit.Rows( - "HashAgg_24 1.00 33.60 root funcs:count(Column#5)->Column#3", - "└─TableReader_26 1.00 0.00 root data:ExchangeSender_25", - " └─ExchangeSender_25 1.00 285050.00 batchCop[tiflash] ExchangeType: PassThrough", - " └─HashAgg_9 1.00 285050.00 batchCop[tiflash] funcs:count(1)->Column#5", - " └─Selection_23 10.00 285020.00 batchCop[tiflash] eq(test.t.a, 1)", - " └─TableFullScan_22 10000.00 255020.00 batchCop[tiflash] table:t keep order:false, stats:pseudo")) - - // read from tikv, indexRead - tk.MustQuery("explain format='verbose' select /*+ read_from_storage(tikv[t]) */ count(*) from t where a=1;").Check(testkit.Rows( - "StreamAgg_18 1.00 485.00 root funcs:count(Column#5)->Column#3", - "└─IndexReader_19 1.00 32.88 root index:StreamAgg_10", - " └─StreamAgg_10 1.00 35.88 cop[tikv] funcs:count(1)->Column#5", - " └─IndexRangeScan_17 10.00 455.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo")) - - // read from tiflash, mpp with little cost - tk.MustQuery("explain format='verbose' select /*+ read_from_storage(tiflash[t]) */ count(*) from t 
where a=1").Check(testkit.Rows( - "HashAgg_21 1.00 33.60 root funcs:count(Column#5)->Column#3", - "└─TableReader_23 1.00 0.00 root data:ExchangeSender_22", - " └─ExchangeSender_22 1.00 285050.00 batchCop[tiflash] ExchangeType: PassThrough", - " └─HashAgg_9 1.00 285050.00 batchCop[tiflash] funcs:count(1)->Column#5", - " └─Selection_20 10.00 285020.00 batchCop[tiflash] eq(test.t.a, 1)", - " └─TableFullScan_19 10000.00 255020.00 batchCop[tiflash] table:t keep order:false, stats:pseudo")) -} diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index e575bb79e7135..8389f0be4cade 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -619,6 +619,8 @@ func (ds *DataSource) setPreferredStoreType(hintInfo *tableHintInfo) { ds.DBName.O, ds.table.Meta().Name.O, kv.TiKV.Name(), ds.ctx.GetSessionVars().GetIsolationReadEngines()) warning := ErrInternal.GenWithStack(errMsg) ds.ctx.GetSessionVars().StmtCtx.AppendWarning(warning) + } else { + ds.ctx.GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because you have set a hint to read table `" + hintTbl.tblName.O + "` from TiKV.") } } if hintTbl := hintInfo.ifPreferTiFlash(alias); hintTbl != nil { diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 6b4d4118379f8..bb56843dfe8d9 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -973,10 +973,16 @@ func getPossibleAccessPaths(ctx sessionctx.Context, tableHints *tableHintInfo, i tablePath := &util.AccessPath{StoreType: tp} fillContentForTablePath(tablePath, tblInfo) publicPaths = append(publicPaths, tablePath) - if tblInfo.TiFlashReplica != nil && tblInfo.TiFlashReplica.Available { + + if tblInfo.TiFlashReplica == nil { + ctx.GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because there aren't tiflash replicas of table `" + tblInfo.Name.O + "`.") + } else if !tblInfo.TiFlashReplica.Available { + ctx.GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because tiflash replicas of table `" + tblInfo.Name.O + "` not ready.") + } else { publicPaths = append(publicPaths, genTiFlashPath(tblInfo, false)) publicPaths = append(publicPaths, genTiFlashPath(tblInfo, true)) } + optimizerUseInvisibleIndexes := ctx.GetSessionVars().OptimizerUseInvisibleIndexes check = check && ctx.GetSessionVars().ConnectionID > 0 @@ -1110,11 +1116,15 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath, } } var err error + engineVals, _ := ctx.GetSessionVars().GetSystemVar(variable.TiDBIsolationReadEngines) if len(paths) == 0 { - engineVals, _ := ctx.GetSessionVars().GetSystemVar(variable.TiDBIsolationReadEngines) err = ErrInternal.GenWithStackByArgs(fmt.Sprintf("Can not find access path matching '%v'(value: '%v'). 
Available values are '%v'.", variable.TiDBIsolationReadEngines, engineVals, availableEngineStr)) } + if _, ok := isolationReadEngines[kv.TiFlash]; !ok { + ctx.GetSessionVars().RaiseWarningWhenMPPEnforced( + fmt.Sprintf("MPP mode may be blocked because '%v'(value: '%v') not match, need 'tiflash'.", variable.TiDBIsolationReadEngines, engineVals)) + } return paths, err } diff --git a/planner/core/task.go b/planner/core/task.go index eacc5dbf73e6d..26261836f5e07 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -728,9 +728,9 @@ func (p *PhysicalHashJoin) convertPartitionKeysIfNeed(lTask, rTask *mppTask) (*m nlTask := lTask.copy().(*mppTask) nlTask.p = lProj nlTask = nlTask.enforceExchangerImpl(&property.PhysicalProperty{ - TaskTp: property.MppTaskType, - PartitionTp: property.HashType, - PartitionCols: lPartKeys, + TaskTp: property.MppTaskType, + MPPPartitionTp: property.HashType, + MPPPartitionCols: lPartKeys, }) nlTask.cst = lTask.cst lProj.cost = nlTask.cst @@ -740,9 +740,9 @@ func (p *PhysicalHashJoin) convertPartitionKeysIfNeed(lTask, rTask *mppTask) (*m nrTask := rTask.copy().(*mppTask) nrTask.p = rProj nrTask = nrTask.enforceExchangerImpl(&property.PhysicalProperty{ - TaskTp: property.MppTaskType, - PartitionTp: property.HashType, - PartitionCols: rPartKeys, + TaskTp: property.MppTaskType, + MPPPartitionTp: property.HashType, + MPPPartitionCols: rPartKeys, }) nrTask.cst = rTask.cst rProj.cost = nrTask.cst @@ -1404,10 +1404,13 @@ func CheckAggCanPushCop(sctx sessionctx.Context, aggFuncs []*aggregation.AggFunc for _, aggFunc := range aggFuncs { // if the aggFunc contain VirtualColumn or CorrelatedColumn, it can not be pushed down. if expression.ContainVirtualColumn(aggFunc.Args) || expression.ContainCorrelatedColumn(aggFunc.Args) { + sctx.GetSessionVars().RaiseWarningWhenMPPEnforced( + "MPP mode may be blocked because expressions of AggFunc `" + aggFunc.Name + "` contain virtual column or correlated column, which is not supported now.") return false } pb := aggregation.AggFuncToPBExpr(sc, client, aggFunc) if pb == nil { + sctx.GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because AggFunc `" + aggFunc.Name + "` is not supported now.") return false } if !aggregation.CheckAggPushDown(aggFunc, storeType) { @@ -1425,6 +1428,7 @@ func CheckAggCanPushCop(sctx sessionctx.Context, aggFuncs []*aggregation.AggFunc } } if expression.ContainVirtualColumn(groupByItems) { + sctx.GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because groupByItems contain virtual column, which is not supported now.") return false } return expression.CanExprsPushDown(sc, groupByItems, client, storeType) @@ -1911,7 +1915,7 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...task) task { } } partialAgg.SetCost(mpp.cost()) - prop := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, PartitionTp: property.HashType, PartitionCols: partitionCols} + prop := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.HashType, MPPPartitionCols: partitionCols} newMpp := mpp.enforceExchangerImpl(prop) if newMpp.invalid() { return newMpp @@ -2042,7 +2046,7 @@ type mppTask struct { p PhysicalPlan cst float64 - partTp property.PartitionType + partTp property.MPPPartitionType hashCols []*expression.Column } @@ -2091,7 +2095,7 @@ func (t *mppTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask { cst := t.cst + t.count()*ctx.GetSessionVars().GetNetworkFactor(nil) p.cost = cst / 
p.ctx.GetSessionVars().CopTiFlashConcurrencyFactor
 	if p.ctx.GetSessionVars().IsMPPEnforced() {
-		p.cost = 0
+		p.cost = cst / 1000000000
 	}
 	rt := &rootTask{
 		p:   p,
@@ -2101,7 +2105,7 @@ func (t *mppTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask {
 }
 
 func (t *mppTask) needEnforce(prop *property.PhysicalProperty) bool {
-	switch prop.PartitionTp {
+	switch prop.MPPPartitionTp {
 	case property.AnyType:
 		return false
 	case property.BroadcastType:
@@ -2111,10 +2115,10 @@ func (t *mppTask) needEnforce(prop *property.PhysicalProperty) bool {
 		return true
 	}
 	// TODO: consider equivalent class
-	if len(prop.PartitionCols) != len(t.hashCols) {
+	if len(prop.MPPPartitionCols) != len(t.hashCols) {
 		return true
 	}
-	for i, col := range prop.PartitionCols {
+	for i, col := range prop.MPPPartitionCols {
 		if !col.Equal(nil, t.hashCols[i]) {
 			return true
 		}
@@ -2125,6 +2129,7 @@ func (t *mppTask) needEnforce(prop *property.PhysicalProperty) bool {
 
 func (t *mppTask) enforceExchanger(prop *property.PhysicalProperty) *mppTask {
 	if len(prop.SortItems) != 0 {
+		t.p.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because operator `Sort` is not supported now.")
 		return &mppTask{}
 	}
 	if !t.needEnforce(prop) {
@@ -2134,17 +2139,18 @@ func (t *mppTask) enforceExchanger(prop *property.PhysicalProperty) *mppTask {
 }
 
 func (t *mppTask) enforceExchangerImpl(prop *property.PhysicalProperty) *mppTask {
-	if collate.NewCollationEnabled() && prop.PartitionTp == property.HashType {
-		for _, col := range prop.PartitionCols {
+	if collate.NewCollationEnabled() && prop.MPPPartitionTp == property.HashType {
+		for _, col := range prop.MPPPartitionCols {
 			if types.IsString(col.RetType.Tp) {
+				t.p.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because when `new_collation_enabled` is true, HashJoin or HashAgg with string key is not supported now.")
 				return &mppTask{cst: math.MaxFloat64}
 			}
 		}
 	}
 	ctx := t.p.SCtx()
 	sender := PhysicalExchangeSender{
-		ExchangeType: tipb.ExchangeType(prop.PartitionTp),
-		HashCols:     prop.PartitionCols,
+		ExchangeType: tipb.ExchangeType(prop.MPPPartitionTp),
+		HashCols:     prop.MPPPartitionCols,
 	}.Init(ctx, t.p.statsInfo())
 	sender.SetChildren(t.p)
 	receiver := PhysicalExchangeReceiver{}.Init(ctx, t.p.statsInfo())
@@ -2155,7 +2161,7 @@ func (t *mppTask) enforceExchangerImpl(prop *property.PhysicalProperty) *mppTask
 	return &mppTask{
 		p:        receiver,
 		cst:      cst,
-		partTp:   prop.PartitionTp,
-		hashCols: prop.PartitionCols,
+		partTp:   prop.MPPPartitionTp,
+		hashCols: prop.MPPPartitionCols,
 	}
 }
diff --git a/planner/core/testdata/enforce_mpp_suite_in.json b/planner/core/testdata/enforce_mpp_suite_in.json
new file mode 100644
index 0000000000000..8f80d928190cf
--- /dev/null
+++ b/planner/core/testdata/enforce_mpp_suite_in.json
@@ -0,0 +1,67 @@
+[
+  {
+    "name": "TestEnforceMPP",
+    "cases": [
+      "select @@tidb_allow_mpp",
+      "select @@tidb_enforce_mpp",
+      "select @@tidb_opt_tiflash_concurrency_factor",
+      "set @@tidb_allow_mpp=0",
+      "explain format='verbose' select count(*) from t where a=1",
+      "explain format='verbose' select /*+ read_from_storage(tikv[t]) */ count(*) from t where a=1",
+      "explain format='verbose' select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1",
+      "set @@tidb_allow_mpp=1;",
+      "set @@tidb_enforce_mpp=0;",
+      "explain format='verbose' select count(*) from t where a=1",
+      "explain format='verbose' select /*+ read_from_storage(tikv[t]) */ count(*) from t where a=1",
+      "explain format='verbose' select /*+ read_from_storage(tiflash[t]) */ 
count(*) from t where a=1", + "set @@tidb_opt_tiflash_concurrency_factor = 1000000", + "explain format='verbose' select count(*) from t where a=1", + "explain format='verbose' select /*+ read_from_storage(tikv[t]) */ count(*) from t where a=1", + "explain format='verbose' select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1", + "set @@tidb_enforce_mpp=1;", + "explain format='verbose' select count(*) from t where a=1", + "explain format='verbose' select /*+ read_from_storage(tikv[t]) */ count(*) from t where a=1", + "explain format='verbose' select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1" + ] + }, + { + "name": "TestEnforceMPPWarning1", + "cases": [ + "set @@tidb_allow_mpp=1;set @@tidb_enforce_mpp=1;", + "explain select count(*) from t where a=1 -- 1. no replica", + "cmd: create-replica", + "explain select count(*) from t where a=1 -- 2. replica not ready", + "cmd: enable-replica", + "set @@session.tidb_isolation_read_engines = 'tikv';", + "explain select count(*) from t where a=1 -- 3. isolation_engine not match", + "set @@session.tidb_isolation_read_engines = 'tikv, tiflash';", + "explain select /*+ read_from_storage(tikv[t]) */ count(*) from t where a=1 -- 4. hint use tikv", + "explain SELECT a, ROW_NUMBER() OVER (ORDER BY a) FROM t; -- 5. window unsupported", + "EXPLAIN SELECT t1.b FROM t t1 join t t2 where t1.a=t2.a; -- 6. virtual column", + "EXPLAIN SELECT count(b) from t where a=1; -- 7. agg func has virtual column", + "EXPLAIN SELECT count(*) from t group by b; -- 8. group by virtual column", + "EXPLAIN SELECT group_concat(a) from t; -- 9. agg func not supported", + "EXPLAIN SELECT count(a) from t group by md5(a); -- 10. scalar func not supported", + "EXPLAIN SELECT count(a) from t where c=1; -- 11. type not supported" + ] + }, + { + "name": "TestEnforceMPPWarning2", + "cases": [ + "set @@tidb_allow_mpp=1;set @@tidb_enforce_mpp=1;", + "set @@tidb_partition_prune_mode=static;", + "EXPLAIN SELECT count(*) from t where a=1; -- 12. static partition prune", + "set @@tidb_partition_prune_mode=dynamic;" + + ] + }, + { + "name": "TestEnforceMPPWarning3", + "cases": [ + "set @@tidb_allow_mpp=1;set @@tidb_enforce_mpp=1;", + "cmd: enable-new-collation", + "EXPLAIN SELECT count(*) from t group by b; -- 13. new collation FIXME", + "EXPLAIN SELECT * from t t1 join t t2 on t1.b=t2.b; -- 13. 
new collation FIXME" + ] + } +] diff --git a/planner/core/testdata/enforce_mpp_suite_out.json b/planner/core/testdata/enforce_mpp_suite_out.json new file mode 100644 index 0000000000000..372a69d73513f --- /dev/null +++ b/planner/core/testdata/enforce_mpp_suite_out.json @@ -0,0 +1,438 @@ +[ + { + "Name": "TestEnforceMPP", + "Cases": [ + { + "SQL": "select @@tidb_allow_mpp", + "Plan": [ + "1" + ], + "Warn": null + }, + { + "SQL": "select @@tidb_enforce_mpp", + "Plan": [ + "0" + ], + "Warn": null + }, + { + "SQL": "select @@tidb_opt_tiflash_concurrency_factor", + "Plan": [ + "24" + ], + "Warn": null + }, + { + "SQL": "set @@tidb_allow_mpp=0", + "Plan": null, + "Warn": null + }, + { + "SQL": "explain format='verbose' select count(*) from t where a=1", + "Plan": [ + "StreamAgg_24 1.00 485.00 root funcs:count(Column#6)->Column#4", + "└─IndexReader_25 1.00 32.88 root index:StreamAgg_9", + " └─StreamAgg_9 1.00 35.88 cop[tikv] funcs:count(1)->Column#6", + " └─IndexRangeScan_23 10.00 455.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "explain format='verbose' select /*+ read_from_storage(tikv[t]) */ count(*) from t where a=1", + "Plan": [ + "StreamAgg_17 1.00 485.00 root funcs:count(Column#6)->Column#4", + "└─IndexReader_18 1.00 32.88 root index:StreamAgg_9", + " └─StreamAgg_9 1.00 35.88 cop[tikv] funcs:count(1)->Column#6", + " └─IndexRangeScan_16 10.00 455.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "explain format='verbose' select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1", + "Plan": [ + "StreamAgg_20 1.00 285050.00 root funcs:count(Column#6)->Column#4", + "└─TableReader_21 1.00 19003.88 root data:StreamAgg_9", + " └─StreamAgg_9 1.00 19006.88 batchCop[tiflash] funcs:count(1)->Column#6", + " └─Selection_19 10.00 285020.00 batchCop[tiflash] eq(test.t.a, 1)", + " └─TableFullScan_18 10000.00 255020.00 batchCop[tiflash] table:t keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "set @@tidb_allow_mpp=1;", + "Plan": null, + "Warn": null + }, + { + "SQL": "set @@tidb_enforce_mpp=0;", + "Plan": null, + "Warn": null + }, + { + "SQL": "explain format='verbose' select count(*) from t where a=1", + "Plan": [ + "StreamAgg_30 1.00 485.00 root funcs:count(Column#7)->Column#4", + "└─IndexReader_31 1.00 32.88 root index:StreamAgg_10", + " └─StreamAgg_10 1.00 35.88 cop[tikv] funcs:count(1)->Column#7", + " └─IndexRangeScan_29 10.00 455.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "explain format='verbose' select /*+ read_from_storage(tikv[t]) */ count(*) from t where a=1", + "Plan": [ + "StreamAgg_18 1.00 485.00 root funcs:count(Column#6)->Column#4", + "└─IndexReader_19 1.00 32.88 root index:StreamAgg_10", + " └─StreamAgg_10 1.00 35.88 cop[tikv] funcs:count(1)->Column#6", + " └─IndexRangeScan_17 10.00 455.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "explain format='verbose' select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1", + "Plan": [ + "HashAgg_21 1.00 11910.73 root funcs:count(Column#6)->Column#4", + "└─TableReader_23 1.00 11877.13 root data:ExchangeSender_22", + " └─ExchangeSender_22 1.00 285050.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─HashAgg_9 1.00 285050.00 batchCop[tiflash] funcs:count(1)->Column#6", + " └─Selection_20 10.00 285020.00 
batchCop[tiflash] eq(test.t.a, 1)", + " └─TableFullScan_19 10000.00 255020.00 batchCop[tiflash] table:t keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "set @@tidb_opt_tiflash_concurrency_factor = 1000000", + "Plan": null, + "Warn": null + }, + { + "SQL": "explain format='verbose' select count(*) from t where a=1", + "Plan": [ + "HashAgg_24 1.00 33.89 root funcs:count(Column#6)->Column#4", + "└─TableReader_26 1.00 0.29 root data:ExchangeSender_25", + " └─ExchangeSender_25 1.00 285050.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─HashAgg_9 1.00 285050.00 batchCop[tiflash] funcs:count(1)->Column#6", + " └─Selection_23 10.00 285020.00 batchCop[tiflash] eq(test.t.a, 1)", + " └─TableFullScan_22 10000.00 255020.00 batchCop[tiflash] table:t keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "explain format='verbose' select /*+ read_from_storage(tikv[t]) */ count(*) from t where a=1", + "Plan": [ + "StreamAgg_18 1.00 485.00 root funcs:count(Column#6)->Column#4", + "└─IndexReader_19 1.00 32.88 root index:StreamAgg_10", + " └─StreamAgg_10 1.00 35.88 cop[tikv] funcs:count(1)->Column#6", + " └─IndexRangeScan_17 10.00 455.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "explain format='verbose' select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1", + "Plan": [ + "HashAgg_21 1.00 33.89 root funcs:count(Column#6)->Column#4", + "└─TableReader_23 1.00 0.29 root data:ExchangeSender_22", + " └─ExchangeSender_22 1.00 285050.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─HashAgg_9 1.00 285050.00 batchCop[tiflash] funcs:count(1)->Column#6", + " └─Selection_20 10.00 285020.00 batchCop[tiflash] eq(test.t.a, 1)", + " └─TableFullScan_19 10000.00 255020.00 batchCop[tiflash] table:t keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "set @@tidb_enforce_mpp=1;", + "Plan": null, + "Warn": null + }, + { + "SQL": "explain format='verbose' select count(*) from t where a=1", + "Plan": [ + "HashAgg_24 1.00 33.60 root funcs:count(Column#6)->Column#4", + "└─TableReader_26 1.00 0.00 root data:ExchangeSender_25", + " └─ExchangeSender_25 1.00 285050.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─HashAgg_9 1.00 285050.00 batchCop[tiflash] funcs:count(1)->Column#6", + " └─Selection_23 10.00 285020.00 batchCop[tiflash] eq(test.t.a, 1)", + " └─TableFullScan_22 10000.00 255020.00 batchCop[tiflash] table:t keep order:false, stats:pseudo" + ], + "Warn": null + }, + { + "SQL": "explain format='verbose' select /*+ read_from_storage(tikv[t]) */ count(*) from t where a=1", + "Plan": [ + "StreamAgg_18 1.00 485.00 root funcs:count(Column#6)->Column#4", + "└─IndexReader_19 1.00 32.88 root index:StreamAgg_10", + " └─StreamAgg_10 1.00 35.88 cop[tikv] funcs:count(1)->Column#6", + " └─IndexRangeScan_17 10.00 455.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo" + ], + "Warn": [ + "MPP mode may be blocked because you have set a hint to read table `t` from TiKV." 
+ ] + }, + { + "SQL": "explain format='verbose' select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1", + "Plan": [ + "HashAgg_21 1.00 33.60 root funcs:count(Column#6)->Column#4", + "└─TableReader_23 1.00 0.00 root data:ExchangeSender_22", + " └─ExchangeSender_22 1.00 285050.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─HashAgg_9 1.00 285050.00 batchCop[tiflash] funcs:count(1)->Column#6", + " └─Selection_20 10.00 285020.00 batchCop[tiflash] eq(test.t.a, 1)", + " └─TableFullScan_19 10000.00 255020.00 batchCop[tiflash] table:t keep order:false, stats:pseudo" + ], + "Warn": null + } + ] + }, + { + "Name": "TestEnforceMPPWarning1", + "Cases": [ + { + "SQL": "set @@tidb_allow_mpp=1;set @@tidb_enforce_mpp=1;", + "Plan": null, + "Warn": null + }, + { + "SQL": "explain select count(*) from t where a=1 -- 1. no replica", + "Plan": [ + "StreamAgg_17 1.00 root funcs:count(Column#7)->Column#5", + "└─IndexReader_18 1.00 root index:StreamAgg_9", + " └─StreamAgg_9 1.00 cop[tikv] funcs:count(1)->Column#7", + " └─IndexRangeScan_16 10.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo" + ], + "Warn": [ + "MPP mode may be blocked because there aren't tiflash replicas of table `t`." + ] + }, + { + "SQL": "cmd: create-replica", + "Plan": null, + "Warn": null + }, + { + "SQL": "explain select count(*) from t where a=1 -- 2. replica not ready", + "Plan": [ + "StreamAgg_17 1.00 root funcs:count(Column#7)->Column#5", + "└─IndexReader_18 1.00 root index:StreamAgg_9", + " └─StreamAgg_9 1.00 cop[tikv] funcs:count(1)->Column#7", + " └─IndexRangeScan_16 10.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo" + ], + "Warn": [ + "MPP mode may be blocked because tiflash replicas of table `t` not ready." + ] + }, + { + "SQL": "cmd: enable-replica", + "Plan": null, + "Warn": null + }, + { + "SQL": "set @@session.tidb_isolation_read_engines = 'tikv';", + "Plan": null, + "Warn": null + }, + { + "SQL": "explain select count(*) from t where a=1 -- 3. isolation_engine not match", + "Plan": [ + "StreamAgg_17 1.00 root funcs:count(Column#7)->Column#5", + "└─IndexReader_18 1.00 root index:StreamAgg_9", + " └─StreamAgg_9 1.00 cop[tikv] funcs:count(1)->Column#7", + " └─IndexRangeScan_16 10.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo" + ], + "Warn": [ + "MPP mode may be blocked because 'tidb_isolation_read_engines'(value: 'tikv') not match, need 'tiflash'." + ] + }, + { + "SQL": "set @@session.tidb_isolation_read_engines = 'tikv, tiflash';", + "Plan": null, + "Warn": null + }, + { + "SQL": "explain select /*+ read_from_storage(tikv[t]) */ count(*) from t where a=1 -- 4. hint use tikv", + "Plan": [ + "StreamAgg_18 1.00 root funcs:count(Column#7)->Column#5", + "└─IndexReader_19 1.00 root index:StreamAgg_10", + " └─StreamAgg_10 1.00 cop[tikv] funcs:count(1)->Column#7", + " └─IndexRangeScan_17 10.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo" + ], + "Warn": [ + "MPP mode may be blocked because you have set a hint to read table `t` from TiKV." + ] + }, + { + "SQL": "explain SELECT a, ROW_NUMBER() OVER (ORDER BY a) FROM t; -- 5. 
window unsupported", + "Plan": [ + "Window_7 10000.00 root row_number()->Column#6 over(order by test.t.a rows between current row and current row)", + "└─IndexReader_9 10000.00 root index:IndexFullScan_8", + " └─IndexFullScan_8 10000.00 cop[tikv] table:t, index:idx(a) keep order:true, stats:pseudo" + ], + "Warn": [ + "MPP mode may be blocked because operator `Window` is not supported now." + ] + }, + { + "SQL": "EXPLAIN SELECT t1.b FROM t t1 join t t2 where t1.a=t2.a; -- 6. virtual column", + "Plan": [ + "HashJoin_35 12487.50 root inner join, equal:[eq(test.t.a, test.t.a)]", + "├─TableReader_55(Build) 9990.00 root data:Selection_54", + "│ └─Selection_54 9990.00 cop[tiflash] not(isnull(test.t.a))", + "│ └─TableFullScan_53 10000.00 cop[tiflash] table:t2 keep order:false, stats:pseudo", + "└─TableReader_49(Probe) 9990.00 root data:Selection_48", + " └─Selection_48 9990.00 cop[tiflash] not(isnull(test.t.a))", + " └─TableFullScan_47 10000.00 cop[tiflash] table:t1 keep order:false, stats:pseudo" + ], + "Warn": [ + "MPP mode may be blocked because column `test.t.b` is a virtual column which is not supported now." + ] + }, + { + "SQL": "EXPLAIN SELECT count(b) from t where a=1; -- 7. agg func has virtual column", + "Plan": [ + "StreamAgg_10 1.00 root funcs:count(test.t.b)->Column#5", + "└─IndexLookUp_41 10.00 root ", + " ├─IndexRangeScan_39(Build) 10.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo", + " └─TableRowIDScan_40(Probe) 10.00 cop[tikv] table:t keep order:false, stats:pseudo" + ], + "Warn": [ + "MPP mode may be blocked because expressions of AggFunc `count` contain virtual column or correlated column, which is not supported now.", + "MPP mode may be blocked because expressions of AggFunc `count` contain virtual column or correlated column, which is not supported now.", + "MPP mode may be blocked because expressions of AggFunc `count` contain virtual column or correlated column, which is not supported now.", + "MPP mode may be blocked because expressions of AggFunc `count` contain virtual column or correlated column, which is not supported now." + ] + }, + { + "SQL": "EXPLAIN SELECT count(*) from t group by b; -- 8. group by virtual column", + "Plan": [ + "HashAgg_5 8000.00 root group by:test.t.b, funcs:count(1)->Column#5", + "└─Projection_11 10000.00 root test.t.b", + " └─TableReader_10 10000.00 root data:TableFullScan_9", + " └─TableFullScan_9 10000.00 cop[tiflash] table:t keep order:false, stats:pseudo" + ], + "Warn": [ + "MPP mode may be blocked because groupByItems contain virtual column, which is not supported now.", + "MPP mode may be blocked because groupByItems contain virtual column, which is not supported now." + ] + }, + { + "SQL": "EXPLAIN SELECT group_concat(a) from t; -- 9. agg func not supported", + "Plan": [ + "HashAgg_5 1.00 root funcs:group_concat(Column#6 separator \",\")->Column#5", + "└─Projection_30 10000.00 root cast(test.t.a, var_string(20))->Column#6", + " └─TableReader_13 10000.00 root data:TableFullScan_11", + " └─TableFullScan_11 10000.00 cop[tiflash] table:t keep order:false, stats:pseudo" + ], + "Warn": [ + "MPP mode may be blocked because AggFunc `group_concat` is not supported now.", + "MPP mode may be blocked because AggFunc `group_concat` is not supported now.", + "MPP mode may be blocked because AggFunc `group_concat` is not supported now." + ] + }, + { + "SQL": "EXPLAIN SELECT count(a) from t group by md5(a); -- 10. 
scalar func not supported",
+        "Plan": [
+          "HashAgg_5 8000.00 root group by:Column#7, funcs:count(Column#6)->Column#5",
+          "└─Projection_18 10000.00 root test.t.a, md5(cast(test.t.a, var_string(20)))->Column#7",
+          "  └─TableReader_11 10000.00 root data:TableFullScan_9",
+          "    └─TableFullScan_9 10000.00 cop[tiflash] table:t keep order:false, stats:pseudo"
+        ],
+        "Warn": [
+          "Scalar function 'md5'(signature: MD5) can not be pushed to tiflash",
+          "Scalar function 'md5'(signature: MD5) can not be pushed to tiflash"
+        ]
+      },
+      {
+        "SQL": "EXPLAIN SELECT count(a) from t where c=1; -- 11. type not supported",
+        "Plan": [
+          "HashAgg_6 1.00 root funcs:count(test.t.a)->Column#5",
+          "└─Selection_16 10000.00 root eq(test.t.c, 00:00:01.000000)",
+          "  └─TableReader_15 10000.00 root data:TableFullScan_14",
+          "    └─TableFullScan_14 10000.00 cop[tiflash] table:t keep order:false, stats:pseudo"
+        ],
+        "Warn": [
+          "Expr 'test.t.c' can not be pushed to TiFlash because it contains Duration type",
+          "Expr 'test.t.c' can not be pushed to TiFlash because it contains Duration type",
+          "Expr 'test.t.c' can not be pushed to TiFlash because it contains Duration type",
+          "Expr 'test.t.c' can not be pushed to TiFlash because it contains Duration type",
+          "Expr 'test.t.c' can not be pushed to TiFlash because it contains Duration type"
+        ]
+      }
+    ]
+  },
+  {
+    "Name": "TestEnforceMPPWarning2",
+    "Cases": [
+      {
+        "SQL": "set @@tidb_allow_mpp=1;set @@tidb_enforce_mpp=1;",
+        "Plan": null,
+        "Warn": null
+      },
+      {
+        "SQL": "set @@tidb_partition_prune_mode=static;",
+        "Plan": null,
+        "Warn": null
+      },
+      {
+        "SQL": "EXPLAIN SELECT count(*) from t where a=1; -- 12. static partition prune",
+        "Plan": [
+          "StreamAgg_31 1.00 root funcs:count(Column#6)->Column#4",
+          "└─TableReader_32 1.00 root data:StreamAgg_12",
+          "  └─StreamAgg_12 1.00 batchCop[tiflash] funcs:count(1)->Column#6",
+          "    └─Selection_30 10.00 batchCop[tiflash] eq(test.t.a, 1)",
+          "      └─TableFullScan_29 10000.00 batchCop[tiflash] table:t, partition:p0 keep order:false, stats:pseudo"
+        ],
+        "Warn": [
+          "MPP mode may be blocked because table `t` is a partition table which is not supported when `@@tidb_partition_prune_mode=static`."
+        ]
+      },
+      {
+        "SQL": "set @@tidb_partition_prune_mode=dynamic;",
+        "Plan": null,
+        "Warn": null
+      }
+    ]
+  },
+  {
+    "Name": "TestEnforceMPPWarning3",
+    "Cases": [
+      {
+        "SQL": "set @@tidb_allow_mpp=1;set @@tidb_enforce_mpp=1;",
+        "Plan": null,
+        "Warn": null
+      },
+      {
+        "SQL": "cmd: enable-new-collation",
+        "Plan": null,
+        "Warn": null
+      },
+      {
+        "SQL": "EXPLAIN SELECT count(*) from t group by b; -- 13. new collation FIXME",
+        "Plan": [
+          "HashAgg_23 8000.00 root group by:test.t.b, funcs:count(Column#7)->Column#4",
+          "└─TableReader_25 8000.00 root data:ExchangeSender_24",
+          "  └─ExchangeSender_24 8000.00 batchCop[tiflash] ExchangeType: PassThrough",
+          "    └─HashAgg_10 8000.00 batchCop[tiflash] group by:test.t.b, funcs:count(1)->Column#7",
+          "      └─TableFullScan_20 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"
+        ],
+        "Warn": [
+          "MPP mode may be blocked because when `new_collation_enabled` is true, HashJoin or HashAgg with string key is not supported now.",
+          "MPP mode may be blocked because when `new_collation_enabled` is true, HashJoin or HashAgg with string key is not supported now."
+        ]
+      },
+      {
+        "SQL": "EXPLAIN SELECT * from t t1 join t t2 on t1.b=t2.b; -- 13. 
new collation FIXME",
+      "Plan": [
+        "TableReader_18 12487.50 root data:ExchangeSender_17",
+        "└─ExchangeSender_17 12487.50 cop[tiflash] ExchangeType: PassThrough",
+        " └─HashJoin_8 12487.50 cop[tiflash] inner join, equal:[eq(test.t.b, test.t.b)]",
+        " ├─ExchangeReceiver_14(Build) 9990.00 cop[tiflash] ",
+        " │ └─ExchangeSender_13 9990.00 cop[tiflash] ExchangeType: Broadcast",
+        " │ └─Selection_12 9990.00 cop[tiflash] not(isnull(test.t.b))",
+        " │ └─TableFullScan_11 10000.00 cop[tiflash] table:t1 keep order:false, stats:pseudo",
+        " └─Selection_16(Probe) 9990.00 cop[tiflash] not(isnull(test.t.b))",
+        " └─TableFullScan_15 10000.00 cop[tiflash] table:t2 keep order:false, stats:pseudo"
+      ],
+      "Warn": null
+    }
+  ]
+ }
+]
diff --git a/planner/property/physical_property.go b/planner/property/physical_property.go
index 8ddb1a6212437..51cf72572fc9d 100644
--- a/planner/property/physical_property.go
+++ b/planner/property/physical_property.go
@@ -30,12 +30,12 @@ type SortItem struct {
 	Desc bool
 }
 
-// PartitionType is the way to partition during mpp data exchanging.
-type PartitionType int
+// MPPPartitionType is the way to partition during mpp data exchanging.
+type MPPPartitionType int
 
 const (
 	// AnyType will not require any special partition types.
-	AnyType PartitionType = iota
+	AnyType MPPPartitionType = iota
 	// BroadcastType requires current task to broadcast its data.
 	BroadcastType
 	// HashType requires current task to shuffle its data according to some columns.
@@ -70,10 +70,10 @@ type PhysicalProperty struct {
 	CanAddEnforcer bool
 
 	// If the partition type is hash, the data should be reshuffled by partition cols.
-	PartitionCols []*expression.Column
+	MPPPartitionCols []*expression.Column
 
 	// which types the exchange sender belongs to, only take effects when it's a mpp task.
-	PartitionTp PartitionType
+	MPPPartitionTp MPPPartitionType
 }
 
 // NewPhysicalProperty builds property from columns.
@@ -97,11 +97,11 @@ func SortItemsFromCols(cols []*expression.Column, desc bool) []SortItem {
 
 // IsSubsetOf check if the keys can match the needs of partition.
 func (p *PhysicalProperty) IsSubsetOf(keys []*expression.Column) []int {
-	if len(p.PartitionCols) > len(keys) {
+	if len(p.MPPPartitionCols) > len(keys) {
 		return nil
 	}
 	matches := make([]int, 0, len(keys))
-	for _, partCol := range p.PartitionCols {
+	for _, partCol := range p.MPPPartitionCols {
 		found := false
 		for i, key := range keys {
 			if partCol.Equal(nil, key) {
@@ -183,8 +183,8 @@ func (p *PhysicalProperty) HashCode() []byte {
 		}
 	}
 	if p.TaskTp == MppTaskType {
-		p.hashcode = codec.EncodeInt(p.hashcode, int64(p.PartitionTp))
-		for _, col := range p.PartitionCols {
+		p.hashcode = codec.EncodeInt(p.hashcode, int64(p.MPPPartitionTp))
+		for _, col := range p.MPPPartitionCols {
 			p.hashcode = append(p.hashcode, col.HashCode(nil)...)
 		}
 	}
@@ -200,11 +200,11 @@ func (p *PhysicalProperty) String() string {
 // property, specifically, `CanAddEnforcer` should not be included.
 func (p *PhysicalProperty) CloneEssentialFields() *PhysicalProperty {
 	prop := &PhysicalProperty{
-		SortItems:     p.SortItems,
-		TaskTp:        p.TaskTp,
-		ExpectedCnt:   p.ExpectedCnt,
-		PartitionTp:   p.PartitionTp,
-		PartitionCols: p.PartitionCols,
+		SortItems:        p.SortItems,
+		TaskTp:           p.TaskTp,
+		ExpectedCnt:      p.ExpectedCnt,
+		MPPPartitionTp:   p.MPPPartitionTp,
+		MPPPartitionCols: p.MPPPartitionCols,
 	}
 	return prop
 }
diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go
index cd076bc50f4b5..da9e1f5dbff77 100644
--- a/sessionctx/variable/session.go
+++ b/sessionctx/variable/session.go
@@ -28,6 +28,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/pingcap/errors"
 	"github.com/pingcap/parser"
 	"github.com/pingcap/parser/ast"
 	"github.com/pingcap/parser/auth"
@@ -508,9 +509,16 @@ type SessionVars struct {
 	// Value set to 2 means to force to send batch cop for any query. Value set to 0 means never use batch cop.
 	AllowBatchCop int
 
-	// AllowMPPExecution means if we should use mpp way to execute query. Default value is "ON", means to be determined by the optimizer.
-	// Value set to "ENFORCE" means to use mpp whenever possible. Value set to means never use mpp.
-	allowMPPExecution string
+	// allowMPPExecution indicates whether queries may be executed in MPP mode.
+	// The default value is `true`, which leaves the choice to the optimizer.
+	// Setting it to `false` means MPP is never used.
+	allowMPPExecution bool
+
+	// enforceMPPExecution indicates whether MPP execution is enforced whenever possible.
+	// The default value is `false`, which leaves the decision to `allowMPPExecution`.
+	// Setting it to `true` means MPP is used whenever possible.
+	// Note that `enforceMPPExecution` can only be set to `true` while `allowMPPExecution` is `true`.
+	enforceMPPExecution bool
 
 	// TiDBAllowAutoRandExplicitInsert indicates whether explicit insertion on auto_random column is allowed.
 	AllowAutoRandExplicitInsert bool
@@ -854,12 +862,21 @@ func (s *SessionVars) AllocMPPTaskID(startTS uint64) int64 {
 
 // IsMPPAllowed returns whether mpp execution is allowed.
 func (s *SessionVars) IsMPPAllowed() bool {
-	return s.allowMPPExecution != "OFF"
+	return s.allowMPPExecution
 }
 
 // IsMPPEnforced returns whether mpp execution is enforced.
 func (s *SessionVars) IsMPPEnforced() bool {
-	return s.allowMPPExecution == "ENFORCE"
+	return s.allowMPPExecution && s.enforceMPPExecution
+}
+
+// RaiseWarningWhenMPPEnforced appends the given warning to the statement context
+// when MPP mode is enforced and the current statement is EXPLAIN.
+// TODO: Confirm whether this function is inlined, so that the overhead of
+// constructing the warning string is avoided when the condition is false.
+func (s *SessionVars) RaiseWarningWhenMPPEnforced(warning string) {
+	if s.IsMPPEnforced() && s.StmtCtx.InExplainStmt {
+		s.StmtCtx.AppendWarning(errors.New(warning))
+	}
 }
 
 // CheckAndGetTxnScope will return the transaction scope we should use in the current session.
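// Reviewer note: the sketch below is illustrative only and not part of this
// patch. It shows how planner code is expected to consume
// RaiseWarningWhenMPPEnforced at an MPP fallback point (called via
// sctx.GetSessionVars() from the planner). The helper name rejectMPPForWindow
// is hypothetical; only RaiseWarningWhenMPPEnforced above is real.
func rejectMPPForWindow(vars *SessionVars) {
	// This call is a no-op unless tidb_enforce_mpp is on and the current
	// statement is EXPLAIN, so fallback sites can invoke it unconditionally.
	// The message matches the expectations recorded in enforce_mpp_suite.
	vars.RaiseWarningWhenMPPEnforced("MPP mode may be blocked because operator `Window` is not supported now.")
}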
@@ -1089,6 +1106,7 @@ func NewSessionVars() *SessionVars {
 	vars.AllowBatchCop = DefTiDBAllowBatchCop
 	vars.allowMPPExecution = DefTiDBAllowMPPExecution
+	vars.enforceMPPExecution = DefTiDBEnforceMPPExecution
 
 	var enableChunkRPC string
 	if config.GetGlobalConfig().TiKVClient.EnableChunkRPC {
diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go
index 9af9a9a9afe3c..1727d90be0761 100644
--- a/sessionctx/variable/sysvar.go
+++ b/sessionctx/variable/sysvar.go
@@ -836,8 +836,17 @@ var defaultSysVars = []*SysVar{
 		}
 		return normalizedValue, nil
 	}},
-	{Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowMPPExecution, Value: On, Type: TypeEnum, PossibleValues: []string{"OFF", "ON", "ENFORCE"}, SetSession: func(s *SessionVars, val string) error {
-		s.allowMPPExecution = val
+	{Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowMPPExecution, Type: TypeBool, Value: BoolToOnOff(DefTiDBAllowMPPExecution), SetSession: func(s *SessionVars, val string) error {
+		s.allowMPPExecution = TiDBOptOn(val)
+		return nil
+	}},
+	{Scope: ScopeSession, Name: TiDBEnforceMPPExecution, Type: TypeBool, Value: BoolToOnOff(config.GetGlobalConfig().Performance.EnforceMPP), Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) {
+		if TiDBOptOn(normalizedValue) && !vars.allowMPPExecution {
+			return normalizedValue, ErrWrongValueForVar.GenWithStackByArgs("tidb_enforce_mpp", "1' but tidb_allow_mpp is 0, please activate tidb_allow_mpp at first.")
+		}
+		return normalizedValue, nil
+	}, SetSession: func(s *SessionVars, val string) error {
+		s.enforceMPPExecution = TiDBOptOn(val)
 		return nil
 	}},
 	{Scope: ScopeGlobal | ScopeSession, Name: TiDBBCJThresholdCount, Value: strconv.Itoa(DefBroadcastJoinThresholdCount), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error {
diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go
index 62e05bc3a1c41..e8e4c075c81d8 100644
--- a/sessionctx/variable/tidb_vars.go
+++ b/sessionctx/variable/tidb_vars.go
@@ -299,10 +299,17 @@ const (
 	// The default value is 0
 	TiDBAllowBatchCop = "tidb_allow_batch_cop"
 
-	// TiDBAllowMPPExecution means if we should use mpp way to execute query. Default value is 1 (or 'ON'), means to be determined by the optimizer.
-	// Value set to 2 (or 'ENFORCE') which means to use mpp whenever possible. Value set to 2 (or 'OFF') means never use mpp.
+	// TiDBAllowMPPExecution indicates whether queries may be executed in MPP mode.
+	// The default value is `true`, which leaves the choice to the optimizer.
+	// Setting it to `false` means MPP is never used.
 	TiDBAllowMPPExecution = "tidb_allow_mpp"
 
+	// TiDBEnforceMPPExecution indicates whether MPP execution is enforced whenever possible.
+	// The default value is `false`, which leaves the decision to `tidb_allow_mpp`.
+	// Setting it to `true` means MPP is used whenever possible.
+	// Note that `tidb_enforce_mpp` can only be set to `true` while `tidb_allow_mpp` is `true`.
+	TiDBEnforceMPPExecution = "tidb_enforce_mpp"
+
 	// TiDBInitChunkSize is used to control the init chunk size during query execution.
 	TiDBInitChunkSize = "tidb_init_chunk_size"
@@ -642,7 +649,8 @@ const (
 	DefBroadcastJoinThresholdCount   = 10 * 1024
 	DefTiDBOptimizerSelectivityLevel = 0
 	DefTiDBAllowBatchCop             = 1
-	DefTiDBAllowMPPExecution         = "ON"
+	DefTiDBAllowMPPExecution         = true
+	DefTiDBEnforceMPPExecution       = false
 	DefTiDBTxnMode                   = ""
 	DefTiDBRowFormatV1               = 1
 	DefTiDBRowFormatV2               = 2
diff --git a/tidb-server/main.go b/tidb-server/main.go
index 0c32603190c01..c3f60a466a13e 100644
--- a/tidb-server/main.go
+++ b/tidb-server/main.go
@@ -537,6 +537,7 @@ func setGlobalVars() {
 	variable.SetSysVar(variable.DataDir, cfg.Path)
 	variable.SetSysVar(variable.TiDBSlowQueryFile, cfg.Log.SlowQueryFile)
 	variable.SetSysVar(variable.TiDBIsolationReadEngines, strings.Join(cfg.IsolationRead.Engines, ","))
+	variable.SetSysVar(variable.TiDBEnforceMPPExecution, variable.BoolToOnOff(config.GetGlobalConfig().Performance.EnforceMPP))
 	variable.MemoryUsageAlarmRatio.Store(cfg.Performance.MemoryUsageAlarmRatio)
 	if hostname, err := os.Hostname(); err != nil {
 		variable.SetSysVar(variable.Hostname, hostname)
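Reviewer note: a minimal end-to-end sketch of the new switch, mirroring the testkit conventions used in the tests above. The scaffolding (c, s.store) and a table t with an available TiFlash replica are assumed here, not part of the patch.

// tidb_enforce_mpp can only be turned on while tidb_allow_mpp is on; the
// Validation hook in sysvar.go rejects it otherwise.
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("set @@tidb_allow_mpp = 1")   // prerequisite for the next statement
tk.MustExec("set @@tidb_enforce_mpp = 1") // would be rejected if tidb_allow_mpp were 0
// Under EXPLAIN, any reason the optimizer abandons MPP is surfaced through
// RaiseWarningWhenMPPEnforced and shows up in the statement warnings.
tk.MustQuery("explain select count(*) from t")
tk.MustQuery("show warnings") // e.g. "MPP mode may be blocked because ..."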