diff --git a/DEPS.bzl b/DEPS.bzl index 0333a2dc70184..fe7d110931a2c 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -2881,8 +2881,8 @@ def go_deps(): name = "com_github_pingcap_badger", build_file_proto_mode = "disable_global", importpath = "github.com/pingcap/badger", - sum = "h1:QB16qn8wx5X4SRn3/5axrjPMNS3WRt87+5Bfrnmt6IA=", - version = "v1.5.1-0.20221229114011-ddffaa0fff7a", + sum = "h1:AEcvKyVM8CUII3bYzgz8haFXtGiqcrtXW1csu/5UELY=", + version = "v1.5.1-0.20230103063557-828f39b09b6d", ) go_repository( name = "com_github_pingcap_check", diff --git a/domain/historical_stats.go b/domain/historical_stats.go index ca68319c31ba8..5d6d90feedef8 100644 --- a/domain/historical_stats.go +++ b/domain/historical_stats.go @@ -17,6 +17,7 @@ package domain import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/metrics" + "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics/handle" ) @@ -48,18 +49,27 @@ func (w *HistoricalStatsWorker) DumpHistoricalStats(tableID int64, statsHandle * } sctx := w.sctx is := GetDomain(sctx).InfoSchema() + isPartition := false + var tblInfo *model.TableInfo tbl, existed := is.TableByID(tableID) if !existed { - return errors.Errorf("cannot get table by id %d", tableID) + tbl, db, p := is.FindTableByPartitionID(tableID) + if tbl != nil && db != nil && p != nil { + isPartition = true + tblInfo = tbl.Meta() + } else { + return errors.Errorf("cannot get table by id %d", tableID) + } + } else { + tblInfo = tbl.Meta() } - tblInfo := tbl.Meta() dbInfo, existed := is.SchemaByTable(tblInfo) if !existed { return errors.Errorf("cannot get DBInfo by TableID %d", tableID) } - if _, err := statsHandle.RecordHistoricalStatsToStorage(dbInfo.Name.O, tblInfo); err != nil { + if _, err := statsHandle.RecordHistoricalStatsToStorage(dbInfo.Name.O, tblInfo, tableID, isPartition); err != nil { generateHistoricalStatsFailedCounter.Inc() - return errors.Errorf("record table %s.%s's historical stats failed", dbInfo.Name.O, tblInfo.Name.O) + return errors.Errorf("record table %s.%s's historical stats failed, err:%v", dbInfo.Name.O, tblInfo.Name.O, err) } generateHistoricalStatsSuccessCounter.Inc() return nil diff --git a/executor/analyze.go b/executor/analyze.go index af223b24dd4a8..705e6eed6c590 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -291,6 +291,8 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n } } + tableIDs := map[int64]struct{}{} + // save analyze results in single-thread. statsHandle := domain.GetDomain(e.ctx).StatsHandle() panicCnt := 0 @@ -311,6 +313,7 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n continue } handleGlobalStats(needGlobalStats, globalStatsMap, results) + tableIDs[results.TableID.GetStatisticsID()] = struct{}{} if err1 := statsHandle.SaveTableStatsToStorage(results, e.ctx.GetSessionVars().EnableAnalyzeSnapshot, handle.StatsMetaHistorySourceAnalyze); err1 != nil { tableID := results.TableID.TableID @@ -319,10 +322,6 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n finishJobWithLog(e.ctx, results.Job, err) } else { finishJobWithLog(e.ctx, results.Job, nil) - // Dump stats to historical storage. - if err := recordHistoricalStats(e.ctx, results.TableID.TableID); err != nil { - logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) - } } invalidInfoSchemaStatCache(results.TableID.GetStatisticsID()) if atomic.LoadUint32(&e.ctx.GetSessionVars().Killed) == 1 { @@ -330,6 +329,13 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n return errors.Trace(ErrQueryInterrupted) } } + // Dump stats to historical storage. + for tableID := range tableIDs { + if err := recordHistoricalStats(e.ctx, tableID); err != nil { + logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) + } + } + return err } @@ -348,6 +354,7 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, sta worker.run(ctx1, e.ctx.GetSessionVars().EnableAnalyzeSnapshot) }) } + tableIDs := map[int64]struct{}{} panicCnt := 0 var err error for panicCnt < statsConcurrency { @@ -370,6 +377,7 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, sta continue } handleGlobalStats(needGlobalStats, globalStatsMap, results) + tableIDs[results.TableID.GetStatisticsID()] = struct{}{} saveResultsCh <- results } close(saveResultsCh) @@ -382,6 +390,12 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, sta } err = errors.New(strings.Join(errMsg, ",")) } + for tableID := range tableIDs { + // Dump stats to historical storage. + if err := recordHistoricalStats(e.ctx, tableID); err != nil { + logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) + } + } return err } diff --git a/executor/analyze_global_stats.go b/executor/analyze_global_stats.go index 6b11e68a3e614..e8f8d53b8adbf 100644 --- a/executor/analyze_global_stats.go +++ b/executor/analyze_global_stats.go @@ -54,7 +54,9 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats boo globalStatsTableIDs[globalStatsID.tableID] = struct{}{} } statsHandle := domain.GetDomain(e.ctx).StatsHandle() + tableIDs := map[int64]struct{}{} for tableID := range globalStatsTableIDs { + tableIDs[tableID] = struct{}{} tableAllPartitionStats := make(map[int64]*statistics.Table) for globalStatsID, info := range globalStatsMap { if globalStatsID.tableID != tableID { @@ -101,16 +103,18 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats boo logutil.Logger(ctx).Error("save global-level stats to storage failed", zap.String("info", job.JobInfo), zap.Int64("histID", hg.ID), zap.Error(err), zap.Int64("tableID", tableID)) } - // Dump stats to historical storage. - if err1 := recordHistoricalStats(e.ctx, globalStatsID.tableID); err1 != nil { - logutil.BgLogger().Error("record historical stats failed", zap.String("info", job.JobInfo), zap.Int64("histID", hg.ID), zap.Error(err1)) - } } return err }() FinishAnalyzeMergeJob(e.ctx, job, mergeStatsErr) } } + for tableID := range tableIDs { + // Dump stats to historical storage. + if err := recordHistoricalStats(e.ctx, tableID); err != nil { + logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) + } + } return nil } diff --git a/executor/analyze_worker.go b/executor/analyze_worker.go index 18edc514c5d4d..688f89f5a120d 100644 --- a/executor/analyze_worker.go +++ b/executor/analyze_worker.go @@ -66,10 +66,6 @@ func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot b worker.errCh <- err } else { finishJobWithLog(worker.sctx, results.Job, nil) - // Dump stats to historical storage. - if err := recordHistoricalStats(worker.sctx, results.TableID.TableID); err != nil { - logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) - } } invalidInfoSchemaStatCache(results.TableID.GetStatisticsID()) if err != nil { diff --git a/executor/historical_stats_test.go b/executor/historical_stats_test.go index 809c2c862bf43..6ae23dcebb365 100644 --- a/executor/historical_stats_test.go +++ b/executor/historical_stats_test.go @@ -216,3 +216,30 @@ func TestGCOutdatedHistoryStats(t *testing.T) { tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_history where table_id = '%d'", tableInfo.Meta().ID)).Check(testkit.Rows("0")) } + +func TestPartitionTableHistoricalStats(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_historical_stats = 1") + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec(`CREATE TABLE t (a int, b int, index idx(b)) +PARTITION BY RANGE ( a ) ( +PARTITION p0 VALUES LESS THAN (6) +)`) + tk.MustExec("delete from mysql.stats_history") + + tk.MustExec("analyze table test.t") + // dump historical stats + h := dom.StatsHandle() + hsWorker := dom.GetHistoricalStatsWorker() + + // assert global table and partition table be dumped + tblID := hsWorker.GetOneHistoricalStatsTable() + err := hsWorker.DumpHistoricalStats(tblID, h) + require.NoError(t, err) + tblID = hsWorker.GetOneHistoricalStatsTable() + err = hsWorker.DumpHistoricalStats(tblID, h) + require.NoError(t, err) + tk.MustQuery("select count(*) from mysql.stats_history").Check(testkit.Rows("2")) +} diff --git a/expression/BUILD.bazel b/expression/BUILD.bazel index 5a201d906b5a3..17436f517a673 100644 --- a/expression/BUILD.bazel +++ b/expression/BUILD.bazel @@ -122,7 +122,7 @@ go_library( go_test( name = "expression_test", - timeout = "short", + timeout = "moderate", srcs = [ "bench_test.go", "builtin_arithmetic_test.go", diff --git a/go.mod b/go.mod index d87619f7dc67c..5bcda5c055148 100644 --- a/go.mod +++ b/go.mod @@ -66,7 +66,7 @@ require ( github.com/opentracing/basictracer-go v1.0.0 github.com/opentracing/opentracing-go v1.2.0 github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 - github.com/pingcap/badger v1.5.1-0.20221229114011-ddffaa0fff7a + github.com/pingcap/badger v1.5.1-0.20230103063557-828f39b09b6d github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3 github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 diff --git a/go.sum b/go.sum index efeb0a0fb5537..41e8406455de2 100644 --- a/go.sum +++ b/go.sum @@ -761,8 +761,8 @@ github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2/go.mod h1:iIss55rK github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= -github.com/pingcap/badger v1.5.1-0.20221229114011-ddffaa0fff7a h1:QB16qn8wx5X4SRn3/5axrjPMNS3WRt87+5Bfrnmt6IA= -github.com/pingcap/badger v1.5.1-0.20221229114011-ddffaa0fff7a/go.mod h1:p8QnkZnmyV8L/M/jzYb8rT7kv3bz9m7bn1Ju94wDifs= +github.com/pingcap/badger v1.5.1-0.20230103063557-828f39b09b6d h1:AEcvKyVM8CUII3bYzgz8haFXtGiqcrtXW1csu/5UELY= +github.com/pingcap/badger v1.5.1-0.20230103063557-828f39b09b6d/go.mod h1:p8QnkZnmyV8L/M/jzYb8rT7kv3bz9m7bn1Ju94wDifs= github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ= github.com/pingcap/check v0.0.0-20191107115940-caf2b9e6ccf4/go.mod h1:PYMCGwN0JHjoqGr3HrZoD+b8Tgx8bKnArhSq8YVzUMc= github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712 h1:R8gStypOBmpnHEx1qi//SaqxJVI4inOqljg/Aj5/390= diff --git a/planner/core/expression_rewriter.go b/planner/core/expression_rewriter.go index 5a8f8a2df9f35..a2d1f242a0ff6 100644 --- a/planner/core/expression_rewriter.go +++ b/planner/core/expression_rewriter.go @@ -1559,16 +1559,10 @@ func (er *expressionRewriter) inToExpression(lLen int, not bool, tp *types.Field if c, ok := args[i].(*expression.Constant); ok { var isExceptional bool if expression.MaybeOverOptimized4PlanCache(er.sctx, []expression.Expression{c}) { - if c.GetType().EvalType() == types.ETString { - // To keep the result be compatible with MySQL, refine `int non-constant str constant` - // here and skip this refine operation in all other cases for safety. - er.sctx.GetSessionVars().StmtCtx.SetSkipPlanCache(errors.Errorf("skip plan-cache: '%v' may be converted to INT", c.String())) - expression.RemoveMutableConst(er.sctx, []expression.Expression{c}) - } else { - continue + if c.GetType().EvalType() == types.ETInt { + continue // no need to refine it } - } else if !er.sctx.GetSessionVars().StmtCtx.UseCache { - // We should remove the mutable constant for correctness, because its value may be changed. + er.sctx.GetSessionVars().StmtCtx.SetSkipPlanCache(errors.Errorf("skip plan-cache: '%v' may be converted to INT", c.String())) expression.RemoveMutableConst(er.sctx, []expression.Expression{c}) } args[i], isExceptional = expression.RefineComparedConstant(er.sctx, *leftFt, c, opcode.EQ) diff --git a/planner/core/plan_cache_test.go b/planner/core/plan_cache_test.go index 0aa5c4e02d7cb..d7d47e55bc62e 100644 --- a/planner/core/plan_cache_test.go +++ b/planner/core/plan_cache_test.go @@ -385,6 +385,38 @@ func TestPlanCacheDiagInfo(t *testing.T) { tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 skip plan-cache: some parameters may be overwritten")) } +func TestIssue40224(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t (a int, key(a))") + tk.MustExec("prepare st from 'select a from t where a in (?, ?)'") + tk.MustExec("set @a=1.0, @b=2.0") + tk.MustExec("execute st using @a, @b") + tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 skip plan-cache: '1.0' may be converted to INT")) + tk.MustExec("execute st using @a, @b") + tkProcess := tk.Session().ShowProcess() + ps := []*util.ProcessInfo{tkProcess} + tk.Session().SetSessionManager(&testkit.MockSessionManager{PS: ps}) + tk.MustQuery(fmt.Sprintf("explain for connection %d", tkProcess.ID)).CheckAt([]int{0}, + [][]interface{}{ + {"IndexReader_6"}, + {"└─IndexRangeScan_5"}, // range scan not full scan + }) + + tk.MustExec("set @a=1, @b=2") + tk.MustExec("execute st using @a, @b") + tk.MustQuery("show warnings").Check(testkit.Rows()) // no warning for INT values + tk.MustExec("execute st using @a, @b") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) // cacheable for INT + tk.MustExec("execute st using @a, @b") + tk.MustQuery(fmt.Sprintf("explain for connection %d", tkProcess.ID)).CheckAt([]int{0}, + [][]interface{}{ + {"IndexReader_6"}, + {"└─IndexRangeScan_5"}, // range scan not full scan + }) +} + func TestIssue40225(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) diff --git a/resourcemanager/BUILD.bazel b/resourcemanager/BUILD.bazel index 2b97ccbd25bdd..968a73b4a0f95 100644 --- a/resourcemanager/BUILD.bazel +++ b/resourcemanager/BUILD.bazel @@ -1,12 +1,19 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( - name = "resourcemanage", - srcs = ["rm.go"], + name = "resourcemanager", + srcs = [ + "rm.go", + "schedule.go", + ], importpath = "github.com/pingcap/tidb/resourcemanager", visibility = ["//visibility:public"], deps = [ + "//resourcemanager/scheduler", + "//resourcemanager/util", "//util", "//util/cpu", + "@com_github_pingcap_log//:log", + "@org_uber_go_zap//:zap", ], ) diff --git a/resourcemanager/rm.go b/resourcemanager/rm.go index 1195176fbde95..35ac6afff960f 100644 --- a/resourcemanager/rm.go +++ b/resourcemanager/rm.go @@ -15,6 +15,10 @@ package resourcemanager import ( + "time" + + "github.com/pingcap/tidb/resourcemanager/scheduler" + "github.com/pingcap/tidb/resourcemanager/util" tidbutil "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/cpu" ) @@ -24,24 +28,55 @@ var GlobalResourceManager = NewResourceManger() // ResourceManager is a resource manager type ResourceManager struct { + poolMap *util.ShardPoolMap + scheduler []scheduler.Scheduler cpuObserver *cpu.Observer + exitCh chan struct{} wg tidbutil.WaitGroupWrapper } // NewResourceManger is to create a new resource manager func NewResourceManger() *ResourceManager { + sc := make([]scheduler.Scheduler, 0, 1) + sc = append(sc, scheduler.NewCPUScheduler()) return &ResourceManager{ cpuObserver: cpu.NewCPUObserver(), + exitCh: make(chan struct{}), + poolMap: util.NewShardPoolMap(), + scheduler: sc, } } // Start is to start resource manager func (r *ResourceManager) Start() { r.wg.Run(r.cpuObserver.Start) + r.wg.Run(func() { + tick := time.NewTicker(100 * time.Millisecond) + defer tick.Stop() + for { + select { + case <-tick.C: + r.schedule() + case <-r.exitCh: + return + } + } + }) } // Stop is to stop resource manager func (r *ResourceManager) Stop() { r.cpuObserver.Stop() + close(r.exitCh) r.wg.Wait() } + +// Register is to register pool into resource manager +func (r *ResourceManager) Register(pool util.GorotinuePool, name string, component util.Component) error { + p := util.PoolContainer{Pool: pool, Component: component} + return r.registerPool(name, &p) +} + +func (r *ResourceManager) registerPool(name string, pool *util.PoolContainer) error { + return r.poolMap.Add(name, pool) +} diff --git a/resourcemanager/schedule.go b/resourcemanager/schedule.go new file mode 100644 index 0000000000000..41560eed5c2a4 --- /dev/null +++ b/resourcemanager/schedule.go @@ -0,0 +1,69 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package resourcemanager + +import ( + "time" + + "github.com/pingcap/log" + "github.com/pingcap/tidb/resourcemanager/scheduler" + "github.com/pingcap/tidb/resourcemanager/util" + "go.uber.org/zap" +) + +func (r *ResourceManager) schedule() { + r.poolMap.Iter(func(pool *util.PoolContainer) { + cmd := r.schedulePool(pool) + r.exec(pool, cmd) + }) +} + +func (r *ResourceManager) schedulePool(pool *util.PoolContainer) scheduler.Command { + for _, sch := range r.scheduler { + cmd := sch.Tune(pool.Component, pool.Pool) + switch cmd { + case scheduler.Hold: + continue + default: + return cmd + } + } + return scheduler.Hold +} + +func (*ResourceManager) exec(pool *util.PoolContainer, cmd scheduler.Command) { + if cmd == scheduler.Hold { + return + } + if time.Since(pool.Pool.LastTunerTs()) > 200*time.Millisecond { + con := pool.Pool.Cap() + switch cmd { + case scheduler.Downclock: + concurrency := con - 1 + log.Info("downclock goroutine pool", + zap.Int("origin concurrency", con), + zap.Int("concurrency", concurrency), + zap.String("name", pool.Pool.Name())) + pool.Pool.Tune(concurrency) + case scheduler.Overclock: + concurrency := con + 1 + log.Info("overclock goroutine pool", + zap.Int("origin concurrency", con), + zap.Int("concurrency", concurrency), + zap.String("name", pool.Pool.Name())) + pool.Pool.Tune(concurrency) + } + } +} diff --git a/resourcemanager/scheduler/BUILD.bazel b/resourcemanager/scheduler/BUILD.bazel new file mode 100644 index 0000000000000..5dc17e8412d17 --- /dev/null +++ b/resourcemanager/scheduler/BUILD.bazel @@ -0,0 +1,16 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "scheduler", + srcs = [ + "cpu_scheduler.go", + "scheduler.go", + ], + importpath = "github.com/pingcap/tidb/resourcemanager/scheduler", + visibility = ["//visibility:public"], + deps = [ + "//resourcemanager/util", + "//util/cpu", + "@org_uber_go_atomic//:atomic", + ], +) diff --git a/resourcemanager/scheduler/cpu_scheduler.go b/resourcemanager/scheduler/cpu_scheduler.go new file mode 100644 index 0000000000000..7d0bdf1d31a07 --- /dev/null +++ b/resourcemanager/scheduler/cpu_scheduler.go @@ -0,0 +1,44 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduler + +import ( + "time" + + "github.com/pingcap/tidb/resourcemanager/util" + "github.com/pingcap/tidb/util/cpu" +) + +// CPUScheduler is a cpu scheduler +type CPUScheduler struct{} + +// NewCPUScheduler is to create a new cpu scheduler +func NewCPUScheduler() *CPUScheduler { + return &CPUScheduler{} +} + +// Tune is to tune the goroutine pool +func (*CPUScheduler) Tune(_ util.Component, pool util.GorotinuePool) Command { + if time.Since(pool.LastTunerTs()) < minCPUSchedulerInterval.Load() { + return Hold + } + if cpu.GetCPUUsage() < 0.5 { + return Downclock + } + if cpu.GetCPUUsage() > 0.7 { + return Overclock + } + return Hold +} diff --git a/resourcemanager/scheduler/scheduler.go b/resourcemanager/scheduler/scheduler.go new file mode 100644 index 0000000000000..6cba0e18923cc --- /dev/null +++ b/resourcemanager/scheduler/scheduler.go @@ -0,0 +1,43 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduler + +import ( + "time" + + "github.com/pingcap/tidb/resourcemanager/util" + "go.uber.org/atomic" +) + +var ( + minCPUSchedulerInterval = atomic.NewDuration(time.Minute) +) + +// Command is the command for scheduler +type Command int + +const ( + // Downclock is to reduce the number of concurrency. + Downclock Command = iota + // Hold is to hold the number of concurrency. + Hold + // Overclock is to increase the number of concurrency. + Overclock +) + +// Scheduler is a scheduler interface +type Scheduler interface { + Tune(component util.Component, p util.GorotinuePool) Command +} diff --git a/resourcemanager/util/BUILD.bazel b/resourcemanager/util/BUILD.bazel new file mode 100644 index 0000000000000..7688b26a93d93 --- /dev/null +++ b/resourcemanager/util/BUILD.bazel @@ -0,0 +1,20 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "util", + srcs = [ + "mock_gpool.go", + "shard_pool_map.go", + "util.go", + ], + importpath = "github.com/pingcap/tidb/resourcemanager/util", + visibility = ["//visibility:public"], + deps = ["@com_github_pingcap_errors//:errors"], +) + +go_test( + name = "util_test", + srcs = ["shard_pool_map_test.go"], + embed = [":util"], + deps = ["@com_github_stretchr_testify//require"], +) diff --git a/resourcemanager/util/mock_gpool.go b/resourcemanager/util/mock_gpool.go new file mode 100644 index 0000000000000..b9e66dd9afeab --- /dev/null +++ b/resourcemanager/util/mock_gpool.go @@ -0,0 +1,97 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import "time" + +// MockGPool is only for test +type MockGPool struct { + name string +} + +// NewMockGPool is only for test +func NewMockGPool(name string) *MockGPool { + return &MockGPool{name: name} +} + +// Release is only for test +func (*MockGPool) Release() { + panic("implement me") +} + +// Tune is only for test +func (*MockGPool) Tune(_ int) { + panic("implement me") +} + +// LastTunerTs is only for test +func (*MockGPool) LastTunerTs() time.Time { + panic("implement me") +} + +// MaxInFlight is only for test +func (*MockGPool) MaxInFlight() int64 { + panic("implement me") +} + +// InFlight is only for test +func (*MockGPool) InFlight() int64 { + panic("implement me") +} + +// MinRT is only for test +func (*MockGPool) MinRT() uint64 { + panic("implement me") +} + +// MaxPASS is only for test +func (*MockGPool) MaxPASS() uint64 { + panic("implement me") +} + +// Cap is only for test +func (*MockGPool) Cap() int { + panic("implement me") +} + +// LongRTT is to represent the baseline latency by tracking a measurement of the long term, less volatile RTT. +func (*MockGPool) LongRTT() float64 { + panic("implement me") +} + +// UpdateLongRTT is only for test +func (*MockGPool) UpdateLongRTT(_ func(float64) float64) { + panic("implement me") +} + +// ShortRTT is to represent the current system latency by tracking a measurement of the short time, and more volatile RTT. +func (*MockGPool) ShortRTT() uint64 { + panic("implement me") +} + +// GetQueueSize is only for test +func (*MockGPool) GetQueueSize() int64 { + panic("implement me") +} + +// Running is only for test +func (*MockGPool) Running() int { + panic("implement me") +} + +// Name is only for test +func (m *MockGPool) Name() string { + return m.name +} diff --git a/resourcemanager/util/shard_pool_map.go b/resourcemanager/util/shard_pool_map.go new file mode 100644 index 0000000000000..8819c56ed36fa --- /dev/null +++ b/resourcemanager/util/shard_pool_map.go @@ -0,0 +1,80 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "sync" + + "github.com/pingcap/errors" +) + +const shard = 8 + +func hash(key string) int { + return int(key[0]) % shard +} + +// ShardPoolMap is a map with shard +type ShardPoolMap struct { + pools [shard]poolMap +} + +// NewShardPoolMap creates a shard pool map +func NewShardPoolMap() *ShardPoolMap { + var result ShardPoolMap + for i := 0; i < shard; i++ { + result.pools[i] = newPoolMap() + } + return &result +} + +// Add adds a pool to the map +func (s *ShardPoolMap) Add(key string, pool *PoolContainer) error { + return s.pools[hash(key)].Add(key, pool) +} + +// Iter iterates the map +func (s *ShardPoolMap) Iter(fn func(pool *PoolContainer)) { + for i := 0; i < shard; i++ { + s.pools[i].Iter(fn) + } +} + +type poolMap struct { + mu sync.RWMutex + poolMap map[string]*PoolContainer +} + +func newPoolMap() poolMap { + return poolMap{poolMap: make(map[string]*PoolContainer)} +} + +func (p *poolMap) Add(key string, pool *PoolContainer) error { + p.mu.Lock() + defer p.mu.Unlock() + if _, contain := p.poolMap[key]; contain { + return errors.New("pool is already exist") + } + p.poolMap[key] = pool + return nil +} + +func (p *poolMap) Iter(fn func(pool *PoolContainer)) { + p.mu.RLock() + defer p.mu.RUnlock() + for _, pool := range p.poolMap { + fn(pool) + } +} diff --git a/resourcemanager/util/shard_pool_map_test.go b/resourcemanager/util/shard_pool_map_test.go new file mode 100644 index 0000000000000..34e0a11ca5976 --- /dev/null +++ b/resourcemanager/util/shard_pool_map_test.go @@ -0,0 +1,38 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "strconv" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestShardPoolMap(t *testing.T) { + rc := 10 + pm := NewShardPoolMap() + for i := 0; i < rc; i++ { + id := strconv.FormatInt(int64(i), 10) + require.NoError(t, pm.Add(id, &PoolContainer{Pool: NewMockGPool(id), Component: DDL})) + } + require.Error(t, pm.Add("1", &PoolContainer{Pool: NewMockGPool("1"), Component: DDL})) + var cnt atomic.Int32 + pm.Iter(func(pool *PoolContainer) { + cnt.Add(1) + }) + require.Equal(t, rc, int(cnt.Load())) +} diff --git a/resourcemanager/util/util.go b/resourcemanager/util/util.go new file mode 100644 index 0000000000000..d5b988c344295 --- /dev/null +++ b/resourcemanager/util/util.go @@ -0,0 +1,53 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import "time" + +// GorotinuePool is a pool interface +type GorotinuePool interface { + Release() + Tune(size int) + LastTunerTs() time.Time + MaxInFlight() int64 + InFlight() int64 + MinRT() uint64 + MaxPASS() uint64 + Cap() int + // LongRTT is to represent the baseline latency by tracking a measurement of the long term, less volatile RTT. + LongRTT() float64 + UpdateLongRTT(f func(float64) float64) + // ShortRTT is to represent the current system latency by tracking a measurement of the short time, and more volatile RTT. + ShortRTT() uint64 + GetQueueSize() int64 + Running() int + Name() string +} + +// PoolContainer is a pool container +type PoolContainer struct { + Pool GorotinuePool + Component Component +} + +// Component is ID for difference component +type Component int + +const ( + // UNKNOWN is for unknown component. It is only for test + UNKNOWN Component = iota + // DDL is for ddl component + DDL +) diff --git a/statistics/handle/dump.go b/statistics/handle/dump.go index 75f4ee9ea958a..a83c6e57ee3c7 100644 --- a/statistics/handle/dump.go +++ b/statistics/handle/dump.go @@ -46,6 +46,7 @@ type JSONTable struct { Count int64 `json:"count"` ModifyCount int64 `json:"modify_count"` Partitions map[string]*JSONTable `json:"partitions"` + Version uint64 `json:"version"` } type jsonExtendedStats struct { @@ -228,6 +229,7 @@ func GenJSONTableFromStats(dbName string, tableInfo *model.TableInfo, tbl *stati Indices: make(map[string]*jsonColumn, len(tbl.Indices)), Count: tbl.Count, ModifyCount: tbl.ModifyCount, + Version: tbl.Version, } for _, col := range tbl.Columns { sc := &stmtctx.StatementContext{TimeZone: time.UTC} diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index f53f075301acb..b3a9c99298a92 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -2565,17 +2565,27 @@ func (h *Handle) GetPredicateColumns(tableID int64) ([]int64, error) { const maxColumnSize = 6 << 20 // RecordHistoricalStatsToStorage records the given table's stats data to mysql.stats_history -func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model.TableInfo) (uint64, error) { +func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model.TableInfo, physicalID int64, isPartition bool) (uint64, error) { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - js, err := h.DumpStatsToJSON(dbName, tableInfo, nil, true) + var js *JSONTable + var err error + if isPartition { + js, err = h.tableStatsToJSON(dbName, tableInfo, physicalID, 0) + } else { + js, err = h.DumpStatsToJSON(dbName, tableInfo, nil, true) + } if err != nil { return 0, errors.Trace(err) } version := uint64(0) - for _, value := range js.Columns { - version = uint64(*value.StatsVer) - if version != 0 { - break + if len(js.Partitions) == 0 { + version = js.Version + } else { + for _, p := range js.Partitions { + version = p.Version + if version != 0 { + break + } } } blocks, err := JSONTableToBlocks(js, maxColumnSize) @@ -2596,7 +2606,7 @@ func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model. const sql = "INSERT INTO mysql.stats_history(table_id, stats_data, seq_no, version, create_time) VALUES (%?, %?, %?, %?, %?)" for i := 0; i < len(blocks); i++ { - if _, err := exec.ExecuteInternal(ctx, sql, tableInfo.ID, blocks[i], i, version, ts); err != nil { + if _, err := exec.ExecuteInternal(ctx, sql, physicalID, blocks[i], i, version, ts); err != nil { return version, errors.Trace(err) } } diff --git a/statistics/handle/handle_test.go b/statistics/handle/handle_test.go index 9bb80498bc90f..ac9936bed11fe 100644 --- a/statistics/handle/handle_test.go +++ b/statistics/handle/handle_test.go @@ -3299,7 +3299,7 @@ func TestRecordHistoricalStatsToStorage(t *testing.T) { tableInfo, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) require.NoError(t, err) - version, err := dom.StatsHandle().RecordHistoricalStatsToStorage("t", tableInfo.Meta()) + version, err := dom.StatsHandle().RecordHistoricalStatsToStorage("t", tableInfo.Meta(), tableInfo.Meta().ID, false) require.NoError(t, err) rows := tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_history where version = '%d'", version)).Rows() diff --git a/tidb-server/BUILD.bazel b/tidb-server/BUILD.bazel index 361a929351642..b918b7e1ca5f8 100644 --- a/tidb-server/BUILD.bazel +++ b/tidb-server/BUILD.bazel @@ -21,7 +21,7 @@ go_library( "//planner/core", "//plugin", "//privilege/privileges", - "//resourcemanager:resourcemanage", + "//resourcemanager", "//server", "//session", "//session/txninfo",