From be112dc31d548db050aae1f6d0ebde1fd564e8e1 Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Thu, 5 Jan 2023 11:20:20 +0800 Subject: [PATCH 1/6] planner: skip plan-cache for prepared queries with `INT in (Decimals...)` (#40312) close pingcap/tidb#40224 --- planner/core/expression_rewriter.go | 12 +++-------- planner/core/plan_cache_test.go | 32 +++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 9 deletions(-) diff --git a/planner/core/expression_rewriter.go b/planner/core/expression_rewriter.go index 5a8f8a2df9f35..a2d1f242a0ff6 100644 --- a/planner/core/expression_rewriter.go +++ b/planner/core/expression_rewriter.go @@ -1559,16 +1559,10 @@ func (er *expressionRewriter) inToExpression(lLen int, not bool, tp *types.Field if c, ok := args[i].(*expression.Constant); ok { var isExceptional bool if expression.MaybeOverOptimized4PlanCache(er.sctx, []expression.Expression{c}) { - if c.GetType().EvalType() == types.ETString { - // To keep the result be compatible with MySQL, refine `int non-constant str constant` - // here and skip this refine operation in all other cases for safety. - er.sctx.GetSessionVars().StmtCtx.SetSkipPlanCache(errors.Errorf("skip plan-cache: '%v' may be converted to INT", c.String())) - expression.RemoveMutableConst(er.sctx, []expression.Expression{c}) - } else { - continue + if c.GetType().EvalType() == types.ETInt { + continue // no need to refine it } - } else if !er.sctx.GetSessionVars().StmtCtx.UseCache { - // We should remove the mutable constant for correctness, because its value may be changed. + er.sctx.GetSessionVars().StmtCtx.SetSkipPlanCache(errors.Errorf("skip plan-cache: '%v' may be converted to INT", c.String())) expression.RemoveMutableConst(er.sctx, []expression.Expression{c}) } args[i], isExceptional = expression.RefineComparedConstant(er.sctx, *leftFt, c, opcode.EQ) diff --git a/planner/core/plan_cache_test.go b/planner/core/plan_cache_test.go index 0aa5c4e02d7cb..d7d47e55bc62e 100644 --- a/planner/core/plan_cache_test.go +++ b/planner/core/plan_cache_test.go @@ -385,6 +385,38 @@ func TestPlanCacheDiagInfo(t *testing.T) { tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 skip plan-cache: some parameters may be overwritten")) } +func TestIssue40224(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t (a int, key(a))") + tk.MustExec("prepare st from 'select a from t where a in (?, ?)'") + tk.MustExec("set @a=1.0, @b=2.0") + tk.MustExec("execute st using @a, @b") + tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 skip plan-cache: '1.0' may be converted to INT")) + tk.MustExec("execute st using @a, @b") + tkProcess := tk.Session().ShowProcess() + ps := []*util.ProcessInfo{tkProcess} + tk.Session().SetSessionManager(&testkit.MockSessionManager{PS: ps}) + tk.MustQuery(fmt.Sprintf("explain for connection %d", tkProcess.ID)).CheckAt([]int{0}, + [][]interface{}{ + {"IndexReader_6"}, + {"└─IndexRangeScan_5"}, // range scan not full scan + }) + + tk.MustExec("set @a=1, @b=2") + tk.MustExec("execute st using @a, @b") + tk.MustQuery("show warnings").Check(testkit.Rows()) // no warning for INT values + tk.MustExec("execute st using @a, @b") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) // cacheable for INT + tk.MustExec("execute st using @a, @b") + tk.MustQuery(fmt.Sprintf("explain for connection %d", tkProcess.ID)).CheckAt([]int{0}, + [][]interface{}{ + {"IndexReader_6"}, + {"└─IndexRangeScan_5"}, // range scan not full scan + }) +} + func TestIssue40225(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) From d0272703f3e9570ecbd9a8df25f96f9a8837fcec Mon Sep 17 00:00:00 2001 From: Song Gao Date: Thu, 5 Jan 2023 11:52:20 +0800 Subject: [PATCH 2/6] statistics: support historical stats dump partition table (#40310) --- domain/historical_stats.go | 18 ++++++++++++++---- executor/analyze.go | 22 ++++++++++++++++++---- executor/analyze_global_stats.go | 12 ++++++++---- executor/analyze_worker.go | 4 ---- executor/historical_stats_test.go | 27 +++++++++++++++++++++++++++ statistics/handle/dump.go | 2 ++ statistics/handle/handle.go | 24 +++++++++++++++++------- statistics/handle/handle_test.go | 2 +- 8 files changed, 87 insertions(+), 24 deletions(-) diff --git a/domain/historical_stats.go b/domain/historical_stats.go index ca68319c31ba8..5d6d90feedef8 100644 --- a/domain/historical_stats.go +++ b/domain/historical_stats.go @@ -17,6 +17,7 @@ package domain import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/metrics" + "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics/handle" ) @@ -48,18 +49,27 @@ func (w *HistoricalStatsWorker) DumpHistoricalStats(tableID int64, statsHandle * } sctx := w.sctx is := GetDomain(sctx).InfoSchema() + isPartition := false + var tblInfo *model.TableInfo tbl, existed := is.TableByID(tableID) if !existed { - return errors.Errorf("cannot get table by id %d", tableID) + tbl, db, p := is.FindTableByPartitionID(tableID) + if tbl != nil && db != nil && p != nil { + isPartition = true + tblInfo = tbl.Meta() + } else { + return errors.Errorf("cannot get table by id %d", tableID) + } + } else { + tblInfo = tbl.Meta() } - tblInfo := tbl.Meta() dbInfo, existed := is.SchemaByTable(tblInfo) if !existed { return errors.Errorf("cannot get DBInfo by TableID %d", tableID) } - if _, err := statsHandle.RecordHistoricalStatsToStorage(dbInfo.Name.O, tblInfo); err != nil { + if _, err := statsHandle.RecordHistoricalStatsToStorage(dbInfo.Name.O, tblInfo, tableID, isPartition); err != nil { generateHistoricalStatsFailedCounter.Inc() - return errors.Errorf("record table %s.%s's historical stats failed", dbInfo.Name.O, tblInfo.Name.O) + return errors.Errorf("record table %s.%s's historical stats failed, err:%v", dbInfo.Name.O, tblInfo.Name.O, err) } generateHistoricalStatsSuccessCounter.Inc() return nil diff --git a/executor/analyze.go b/executor/analyze.go index af223b24dd4a8..705e6eed6c590 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -291,6 +291,8 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n } } + tableIDs := map[int64]struct{}{} + // save analyze results in single-thread. statsHandle := domain.GetDomain(e.ctx).StatsHandle() panicCnt := 0 @@ -311,6 +313,7 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n continue } handleGlobalStats(needGlobalStats, globalStatsMap, results) + tableIDs[results.TableID.GetStatisticsID()] = struct{}{} if err1 := statsHandle.SaveTableStatsToStorage(results, e.ctx.GetSessionVars().EnableAnalyzeSnapshot, handle.StatsMetaHistorySourceAnalyze); err1 != nil { tableID := results.TableID.TableID @@ -319,10 +322,6 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n finishJobWithLog(e.ctx, results.Job, err) } else { finishJobWithLog(e.ctx, results.Job, nil) - // Dump stats to historical storage. - if err := recordHistoricalStats(e.ctx, results.TableID.TableID); err != nil { - logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) - } } invalidInfoSchemaStatCache(results.TableID.GetStatisticsID()) if atomic.LoadUint32(&e.ctx.GetSessionVars().Killed) == 1 { @@ -330,6 +329,13 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n return errors.Trace(ErrQueryInterrupted) } } + // Dump stats to historical storage. + for tableID := range tableIDs { + if err := recordHistoricalStats(e.ctx, tableID); err != nil { + logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) + } + } + return err } @@ -348,6 +354,7 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, sta worker.run(ctx1, e.ctx.GetSessionVars().EnableAnalyzeSnapshot) }) } + tableIDs := map[int64]struct{}{} panicCnt := 0 var err error for panicCnt < statsConcurrency { @@ -370,6 +377,7 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, sta continue } handleGlobalStats(needGlobalStats, globalStatsMap, results) + tableIDs[results.TableID.GetStatisticsID()] = struct{}{} saveResultsCh <- results } close(saveResultsCh) @@ -382,6 +390,12 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, sta } err = errors.New(strings.Join(errMsg, ",")) } + for tableID := range tableIDs { + // Dump stats to historical storage. + if err := recordHistoricalStats(e.ctx, tableID); err != nil { + logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) + } + } return err } diff --git a/executor/analyze_global_stats.go b/executor/analyze_global_stats.go index 6b11e68a3e614..e8f8d53b8adbf 100644 --- a/executor/analyze_global_stats.go +++ b/executor/analyze_global_stats.go @@ -54,7 +54,9 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats boo globalStatsTableIDs[globalStatsID.tableID] = struct{}{} } statsHandle := domain.GetDomain(e.ctx).StatsHandle() + tableIDs := map[int64]struct{}{} for tableID := range globalStatsTableIDs { + tableIDs[tableID] = struct{}{} tableAllPartitionStats := make(map[int64]*statistics.Table) for globalStatsID, info := range globalStatsMap { if globalStatsID.tableID != tableID { @@ -101,16 +103,18 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats boo logutil.Logger(ctx).Error("save global-level stats to storage failed", zap.String("info", job.JobInfo), zap.Int64("histID", hg.ID), zap.Error(err), zap.Int64("tableID", tableID)) } - // Dump stats to historical storage. - if err1 := recordHistoricalStats(e.ctx, globalStatsID.tableID); err1 != nil { - logutil.BgLogger().Error("record historical stats failed", zap.String("info", job.JobInfo), zap.Int64("histID", hg.ID), zap.Error(err1)) - } } return err }() FinishAnalyzeMergeJob(e.ctx, job, mergeStatsErr) } } + for tableID := range tableIDs { + // Dump stats to historical storage. + if err := recordHistoricalStats(e.ctx, tableID); err != nil { + logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) + } + } return nil } diff --git a/executor/analyze_worker.go b/executor/analyze_worker.go index 18edc514c5d4d..688f89f5a120d 100644 --- a/executor/analyze_worker.go +++ b/executor/analyze_worker.go @@ -66,10 +66,6 @@ func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot b worker.errCh <- err } else { finishJobWithLog(worker.sctx, results.Job, nil) - // Dump stats to historical storage. - if err := recordHistoricalStats(worker.sctx, results.TableID.TableID); err != nil { - logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) - } } invalidInfoSchemaStatCache(results.TableID.GetStatisticsID()) if err != nil { diff --git a/executor/historical_stats_test.go b/executor/historical_stats_test.go index 809c2c862bf43..6ae23dcebb365 100644 --- a/executor/historical_stats_test.go +++ b/executor/historical_stats_test.go @@ -216,3 +216,30 @@ func TestGCOutdatedHistoryStats(t *testing.T) { tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_history where table_id = '%d'", tableInfo.Meta().ID)).Check(testkit.Rows("0")) } + +func TestPartitionTableHistoricalStats(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_historical_stats = 1") + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec(`CREATE TABLE t (a int, b int, index idx(b)) +PARTITION BY RANGE ( a ) ( +PARTITION p0 VALUES LESS THAN (6) +)`) + tk.MustExec("delete from mysql.stats_history") + + tk.MustExec("analyze table test.t") + // dump historical stats + h := dom.StatsHandle() + hsWorker := dom.GetHistoricalStatsWorker() + + // assert global table and partition table be dumped + tblID := hsWorker.GetOneHistoricalStatsTable() + err := hsWorker.DumpHistoricalStats(tblID, h) + require.NoError(t, err) + tblID = hsWorker.GetOneHistoricalStatsTable() + err = hsWorker.DumpHistoricalStats(tblID, h) + require.NoError(t, err) + tk.MustQuery("select count(*) from mysql.stats_history").Check(testkit.Rows("2")) +} diff --git a/statistics/handle/dump.go b/statistics/handle/dump.go index 75f4ee9ea958a..a83c6e57ee3c7 100644 --- a/statistics/handle/dump.go +++ b/statistics/handle/dump.go @@ -46,6 +46,7 @@ type JSONTable struct { Count int64 `json:"count"` ModifyCount int64 `json:"modify_count"` Partitions map[string]*JSONTable `json:"partitions"` + Version uint64 `json:"version"` } type jsonExtendedStats struct { @@ -228,6 +229,7 @@ func GenJSONTableFromStats(dbName string, tableInfo *model.TableInfo, tbl *stati Indices: make(map[string]*jsonColumn, len(tbl.Indices)), Count: tbl.Count, ModifyCount: tbl.ModifyCount, + Version: tbl.Version, } for _, col := range tbl.Columns { sc := &stmtctx.StatementContext{TimeZone: time.UTC} diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index f53f075301acb..b3a9c99298a92 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -2565,17 +2565,27 @@ func (h *Handle) GetPredicateColumns(tableID int64) ([]int64, error) { const maxColumnSize = 6 << 20 // RecordHistoricalStatsToStorage records the given table's stats data to mysql.stats_history -func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model.TableInfo) (uint64, error) { +func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model.TableInfo, physicalID int64, isPartition bool) (uint64, error) { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - js, err := h.DumpStatsToJSON(dbName, tableInfo, nil, true) + var js *JSONTable + var err error + if isPartition { + js, err = h.tableStatsToJSON(dbName, tableInfo, physicalID, 0) + } else { + js, err = h.DumpStatsToJSON(dbName, tableInfo, nil, true) + } if err != nil { return 0, errors.Trace(err) } version := uint64(0) - for _, value := range js.Columns { - version = uint64(*value.StatsVer) - if version != 0 { - break + if len(js.Partitions) == 0 { + version = js.Version + } else { + for _, p := range js.Partitions { + version = p.Version + if version != 0 { + break + } } } blocks, err := JSONTableToBlocks(js, maxColumnSize) @@ -2596,7 +2606,7 @@ func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model. const sql = "INSERT INTO mysql.stats_history(table_id, stats_data, seq_no, version, create_time) VALUES (%?, %?, %?, %?, %?)" for i := 0; i < len(blocks); i++ { - if _, err := exec.ExecuteInternal(ctx, sql, tableInfo.ID, blocks[i], i, version, ts); err != nil { + if _, err := exec.ExecuteInternal(ctx, sql, physicalID, blocks[i], i, version, ts); err != nil { return version, errors.Trace(err) } } diff --git a/statistics/handle/handle_test.go b/statistics/handle/handle_test.go index 9bb80498bc90f..ac9936bed11fe 100644 --- a/statistics/handle/handle_test.go +++ b/statistics/handle/handle_test.go @@ -3299,7 +3299,7 @@ func TestRecordHistoricalStatsToStorage(t *testing.T) { tableInfo, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) require.NoError(t, err) - version, err := dom.StatsHandle().RecordHistoricalStatsToStorage("t", tableInfo.Meta()) + version, err := dom.StatsHandle().RecordHistoricalStatsToStorage("t", tableInfo.Meta(), tableInfo.Meta().ID, false) require.NoError(t, err) rows := tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_history where version = '%d'", version)).Rows() From 37f045c57533d979cd64754da5d9534f9891d74a Mon Sep 17 00:00:00 2001 From: xiongjiwei Date: Thu, 5 Jan 2023 12:54:19 +0800 Subject: [PATCH 3/6] *: update badger (#40331) --- DEPS.bzl | 4 ++-- go.mod | 2 +- go.sum | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index 0333a2dc70184..fe7d110931a2c 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -2881,8 +2881,8 @@ def go_deps(): name = "com_github_pingcap_badger", build_file_proto_mode = "disable_global", importpath = "github.com/pingcap/badger", - sum = "h1:QB16qn8wx5X4SRn3/5axrjPMNS3WRt87+5Bfrnmt6IA=", - version = "v1.5.1-0.20221229114011-ddffaa0fff7a", + sum = "h1:AEcvKyVM8CUII3bYzgz8haFXtGiqcrtXW1csu/5UELY=", + version = "v1.5.1-0.20230103063557-828f39b09b6d", ) go_repository( name = "com_github_pingcap_check", diff --git a/go.mod b/go.mod index d87619f7dc67c..5bcda5c055148 100644 --- a/go.mod +++ b/go.mod @@ -66,7 +66,7 @@ require ( github.com/opentracing/basictracer-go v1.0.0 github.com/opentracing/opentracing-go v1.2.0 github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 - github.com/pingcap/badger v1.5.1-0.20221229114011-ddffaa0fff7a + github.com/pingcap/badger v1.5.1-0.20230103063557-828f39b09b6d github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3 github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 diff --git a/go.sum b/go.sum index efeb0a0fb5537..41e8406455de2 100644 --- a/go.sum +++ b/go.sum @@ -761,8 +761,8 @@ github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2/go.mod h1:iIss55rK github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= -github.com/pingcap/badger v1.5.1-0.20221229114011-ddffaa0fff7a h1:QB16qn8wx5X4SRn3/5axrjPMNS3WRt87+5Bfrnmt6IA= -github.com/pingcap/badger v1.5.1-0.20221229114011-ddffaa0fff7a/go.mod h1:p8QnkZnmyV8L/M/jzYb8rT7kv3bz9m7bn1Ju94wDifs= +github.com/pingcap/badger v1.5.1-0.20230103063557-828f39b09b6d h1:AEcvKyVM8CUII3bYzgz8haFXtGiqcrtXW1csu/5UELY= +github.com/pingcap/badger v1.5.1-0.20230103063557-828f39b09b6d/go.mod h1:p8QnkZnmyV8L/M/jzYb8rT7kv3bz9m7bn1Ju94wDifs= github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ= github.com/pingcap/check v0.0.0-20191107115940-caf2b9e6ccf4/go.mod h1:PYMCGwN0JHjoqGr3HrZoD+b8Tgx8bKnArhSq8YVzUMc= github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712 h1:R8gStypOBmpnHEx1qi//SaqxJVI4inOqljg/Aj5/390= From 508b601529690c06e47b570b43796165dbd5c538 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 5 Jan 2023 13:50:19 +0800 Subject: [PATCH 4/6] expression: enlarge timeout for test (#40332) --- expression/BUILD.bazel | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/expression/BUILD.bazel b/expression/BUILD.bazel index 5a201d906b5a3..17436f517a673 100644 --- a/expression/BUILD.bazel +++ b/expression/BUILD.bazel @@ -122,7 +122,7 @@ go_library( go_test( name = "expression_test", - timeout = "short", + timeout = "moderate", srcs = [ "bench_test.go", "builtin_arithmetic_test.go", From 4a5a44733421455bb9929ca14f91e36db657b1bd Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 5 Jan 2023 14:44:21 +0800 Subject: [PATCH 5/6] resourcemanger: add cpu scheduler (#39886) close pingcap/tidb#39657 --- resourcemanager/BUILD.bazel | 11 ++- resourcemanager/rm.go | 35 ++++++++ resourcemanager/schedule.go | 69 +++++++++++++++ resourcemanager/scheduler/BUILD.bazel | 16 ++++ resourcemanager/scheduler/cpu_scheduler.go | 44 ++++++++++ resourcemanager/scheduler/scheduler.go | 43 +++++++++ resourcemanager/util/BUILD.bazel | 20 +++++ resourcemanager/util/mock_gpool.go | 97 +++++++++++++++++++++ resourcemanager/util/shard_pool_map.go | 80 +++++++++++++++++ resourcemanager/util/shard_pool_map_test.go | 38 ++++++++ resourcemanager/util/util.go | 53 +++++++++++ tidb-server/BUILD.bazel | 2 +- 12 files changed, 505 insertions(+), 3 deletions(-) create mode 100644 resourcemanager/schedule.go create mode 100644 resourcemanager/scheduler/BUILD.bazel create mode 100644 resourcemanager/scheduler/cpu_scheduler.go create mode 100644 resourcemanager/scheduler/scheduler.go create mode 100644 resourcemanager/util/BUILD.bazel create mode 100644 resourcemanager/util/mock_gpool.go create mode 100644 resourcemanager/util/shard_pool_map.go create mode 100644 resourcemanager/util/shard_pool_map_test.go create mode 100644 resourcemanager/util/util.go diff --git a/resourcemanager/BUILD.bazel b/resourcemanager/BUILD.bazel index 2b97ccbd25bdd..968a73b4a0f95 100644 --- a/resourcemanager/BUILD.bazel +++ b/resourcemanager/BUILD.bazel @@ -1,12 +1,19 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( - name = "resourcemanage", - srcs = ["rm.go"], + name = "resourcemanager", + srcs = [ + "rm.go", + "schedule.go", + ], importpath = "github.com/pingcap/tidb/resourcemanager", visibility = ["//visibility:public"], deps = [ + "//resourcemanager/scheduler", + "//resourcemanager/util", "//util", "//util/cpu", + "@com_github_pingcap_log//:log", + "@org_uber_go_zap//:zap", ], ) diff --git a/resourcemanager/rm.go b/resourcemanager/rm.go index 1195176fbde95..35ac6afff960f 100644 --- a/resourcemanager/rm.go +++ b/resourcemanager/rm.go @@ -15,6 +15,10 @@ package resourcemanager import ( + "time" + + "github.com/pingcap/tidb/resourcemanager/scheduler" + "github.com/pingcap/tidb/resourcemanager/util" tidbutil "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/cpu" ) @@ -24,24 +28,55 @@ var GlobalResourceManager = NewResourceManger() // ResourceManager is a resource manager type ResourceManager struct { + poolMap *util.ShardPoolMap + scheduler []scheduler.Scheduler cpuObserver *cpu.Observer + exitCh chan struct{} wg tidbutil.WaitGroupWrapper } // NewResourceManger is to create a new resource manager func NewResourceManger() *ResourceManager { + sc := make([]scheduler.Scheduler, 0, 1) + sc = append(sc, scheduler.NewCPUScheduler()) return &ResourceManager{ cpuObserver: cpu.NewCPUObserver(), + exitCh: make(chan struct{}), + poolMap: util.NewShardPoolMap(), + scheduler: sc, } } // Start is to start resource manager func (r *ResourceManager) Start() { r.wg.Run(r.cpuObserver.Start) + r.wg.Run(func() { + tick := time.NewTicker(100 * time.Millisecond) + defer tick.Stop() + for { + select { + case <-tick.C: + r.schedule() + case <-r.exitCh: + return + } + } + }) } // Stop is to stop resource manager func (r *ResourceManager) Stop() { r.cpuObserver.Stop() + close(r.exitCh) r.wg.Wait() } + +// Register is to register pool into resource manager +func (r *ResourceManager) Register(pool util.GorotinuePool, name string, component util.Component) error { + p := util.PoolContainer{Pool: pool, Component: component} + return r.registerPool(name, &p) +} + +func (r *ResourceManager) registerPool(name string, pool *util.PoolContainer) error { + return r.poolMap.Add(name, pool) +} diff --git a/resourcemanager/schedule.go b/resourcemanager/schedule.go new file mode 100644 index 0000000000000..41560eed5c2a4 --- /dev/null +++ b/resourcemanager/schedule.go @@ -0,0 +1,69 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package resourcemanager + +import ( + "time" + + "github.com/pingcap/log" + "github.com/pingcap/tidb/resourcemanager/scheduler" + "github.com/pingcap/tidb/resourcemanager/util" + "go.uber.org/zap" +) + +func (r *ResourceManager) schedule() { + r.poolMap.Iter(func(pool *util.PoolContainer) { + cmd := r.schedulePool(pool) + r.exec(pool, cmd) + }) +} + +func (r *ResourceManager) schedulePool(pool *util.PoolContainer) scheduler.Command { + for _, sch := range r.scheduler { + cmd := sch.Tune(pool.Component, pool.Pool) + switch cmd { + case scheduler.Hold: + continue + default: + return cmd + } + } + return scheduler.Hold +} + +func (*ResourceManager) exec(pool *util.PoolContainer, cmd scheduler.Command) { + if cmd == scheduler.Hold { + return + } + if time.Since(pool.Pool.LastTunerTs()) > 200*time.Millisecond { + con := pool.Pool.Cap() + switch cmd { + case scheduler.Downclock: + concurrency := con - 1 + log.Info("downclock goroutine pool", + zap.Int("origin concurrency", con), + zap.Int("concurrency", concurrency), + zap.String("name", pool.Pool.Name())) + pool.Pool.Tune(concurrency) + case scheduler.Overclock: + concurrency := con + 1 + log.Info("overclock goroutine pool", + zap.Int("origin concurrency", con), + zap.Int("concurrency", concurrency), + zap.String("name", pool.Pool.Name())) + pool.Pool.Tune(concurrency) + } + } +} diff --git a/resourcemanager/scheduler/BUILD.bazel b/resourcemanager/scheduler/BUILD.bazel new file mode 100644 index 0000000000000..5dc17e8412d17 --- /dev/null +++ b/resourcemanager/scheduler/BUILD.bazel @@ -0,0 +1,16 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "scheduler", + srcs = [ + "cpu_scheduler.go", + "scheduler.go", + ], + importpath = "github.com/pingcap/tidb/resourcemanager/scheduler", + visibility = ["//visibility:public"], + deps = [ + "//resourcemanager/util", + "//util/cpu", + "@org_uber_go_atomic//:atomic", + ], +) diff --git a/resourcemanager/scheduler/cpu_scheduler.go b/resourcemanager/scheduler/cpu_scheduler.go new file mode 100644 index 0000000000000..7d0bdf1d31a07 --- /dev/null +++ b/resourcemanager/scheduler/cpu_scheduler.go @@ -0,0 +1,44 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduler + +import ( + "time" + + "github.com/pingcap/tidb/resourcemanager/util" + "github.com/pingcap/tidb/util/cpu" +) + +// CPUScheduler is a cpu scheduler +type CPUScheduler struct{} + +// NewCPUScheduler is to create a new cpu scheduler +func NewCPUScheduler() *CPUScheduler { + return &CPUScheduler{} +} + +// Tune is to tune the goroutine pool +func (*CPUScheduler) Tune(_ util.Component, pool util.GorotinuePool) Command { + if time.Since(pool.LastTunerTs()) < minCPUSchedulerInterval.Load() { + return Hold + } + if cpu.GetCPUUsage() < 0.5 { + return Downclock + } + if cpu.GetCPUUsage() > 0.7 { + return Overclock + } + return Hold +} diff --git a/resourcemanager/scheduler/scheduler.go b/resourcemanager/scheduler/scheduler.go new file mode 100644 index 0000000000000..6cba0e18923cc --- /dev/null +++ b/resourcemanager/scheduler/scheduler.go @@ -0,0 +1,43 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduler + +import ( + "time" + + "github.com/pingcap/tidb/resourcemanager/util" + "go.uber.org/atomic" +) + +var ( + minCPUSchedulerInterval = atomic.NewDuration(time.Minute) +) + +// Command is the command for scheduler +type Command int + +const ( + // Downclock is to reduce the number of concurrency. + Downclock Command = iota + // Hold is to hold the number of concurrency. + Hold + // Overclock is to increase the number of concurrency. + Overclock +) + +// Scheduler is a scheduler interface +type Scheduler interface { + Tune(component util.Component, p util.GorotinuePool) Command +} diff --git a/resourcemanager/util/BUILD.bazel b/resourcemanager/util/BUILD.bazel new file mode 100644 index 0000000000000..7688b26a93d93 --- /dev/null +++ b/resourcemanager/util/BUILD.bazel @@ -0,0 +1,20 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "util", + srcs = [ + "mock_gpool.go", + "shard_pool_map.go", + "util.go", + ], + importpath = "github.com/pingcap/tidb/resourcemanager/util", + visibility = ["//visibility:public"], + deps = ["@com_github_pingcap_errors//:errors"], +) + +go_test( + name = "util_test", + srcs = ["shard_pool_map_test.go"], + embed = [":util"], + deps = ["@com_github_stretchr_testify//require"], +) diff --git a/resourcemanager/util/mock_gpool.go b/resourcemanager/util/mock_gpool.go new file mode 100644 index 0000000000000..b9e66dd9afeab --- /dev/null +++ b/resourcemanager/util/mock_gpool.go @@ -0,0 +1,97 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import "time" + +// MockGPool is only for test +type MockGPool struct { + name string +} + +// NewMockGPool is only for test +func NewMockGPool(name string) *MockGPool { + return &MockGPool{name: name} +} + +// Release is only for test +func (*MockGPool) Release() { + panic("implement me") +} + +// Tune is only for test +func (*MockGPool) Tune(_ int) { + panic("implement me") +} + +// LastTunerTs is only for test +func (*MockGPool) LastTunerTs() time.Time { + panic("implement me") +} + +// MaxInFlight is only for test +func (*MockGPool) MaxInFlight() int64 { + panic("implement me") +} + +// InFlight is only for test +func (*MockGPool) InFlight() int64 { + panic("implement me") +} + +// MinRT is only for test +func (*MockGPool) MinRT() uint64 { + panic("implement me") +} + +// MaxPASS is only for test +func (*MockGPool) MaxPASS() uint64 { + panic("implement me") +} + +// Cap is only for test +func (*MockGPool) Cap() int { + panic("implement me") +} + +// LongRTT is to represent the baseline latency by tracking a measurement of the long term, less volatile RTT. +func (*MockGPool) LongRTT() float64 { + panic("implement me") +} + +// UpdateLongRTT is only for test +func (*MockGPool) UpdateLongRTT(_ func(float64) float64) { + panic("implement me") +} + +// ShortRTT is to represent the current system latency by tracking a measurement of the short time, and more volatile RTT. +func (*MockGPool) ShortRTT() uint64 { + panic("implement me") +} + +// GetQueueSize is only for test +func (*MockGPool) GetQueueSize() int64 { + panic("implement me") +} + +// Running is only for test +func (*MockGPool) Running() int { + panic("implement me") +} + +// Name is only for test +func (m *MockGPool) Name() string { + return m.name +} diff --git a/resourcemanager/util/shard_pool_map.go b/resourcemanager/util/shard_pool_map.go new file mode 100644 index 0000000000000..8819c56ed36fa --- /dev/null +++ b/resourcemanager/util/shard_pool_map.go @@ -0,0 +1,80 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "sync" + + "github.com/pingcap/errors" +) + +const shard = 8 + +func hash(key string) int { + return int(key[0]) % shard +} + +// ShardPoolMap is a map with shard +type ShardPoolMap struct { + pools [shard]poolMap +} + +// NewShardPoolMap creates a shard pool map +func NewShardPoolMap() *ShardPoolMap { + var result ShardPoolMap + for i := 0; i < shard; i++ { + result.pools[i] = newPoolMap() + } + return &result +} + +// Add adds a pool to the map +func (s *ShardPoolMap) Add(key string, pool *PoolContainer) error { + return s.pools[hash(key)].Add(key, pool) +} + +// Iter iterates the map +func (s *ShardPoolMap) Iter(fn func(pool *PoolContainer)) { + for i := 0; i < shard; i++ { + s.pools[i].Iter(fn) + } +} + +type poolMap struct { + mu sync.RWMutex + poolMap map[string]*PoolContainer +} + +func newPoolMap() poolMap { + return poolMap{poolMap: make(map[string]*PoolContainer)} +} + +func (p *poolMap) Add(key string, pool *PoolContainer) error { + p.mu.Lock() + defer p.mu.Unlock() + if _, contain := p.poolMap[key]; contain { + return errors.New("pool is already exist") + } + p.poolMap[key] = pool + return nil +} + +func (p *poolMap) Iter(fn func(pool *PoolContainer)) { + p.mu.RLock() + defer p.mu.RUnlock() + for _, pool := range p.poolMap { + fn(pool) + } +} diff --git a/resourcemanager/util/shard_pool_map_test.go b/resourcemanager/util/shard_pool_map_test.go new file mode 100644 index 0000000000000..34e0a11ca5976 --- /dev/null +++ b/resourcemanager/util/shard_pool_map_test.go @@ -0,0 +1,38 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "strconv" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestShardPoolMap(t *testing.T) { + rc := 10 + pm := NewShardPoolMap() + for i := 0; i < rc; i++ { + id := strconv.FormatInt(int64(i), 10) + require.NoError(t, pm.Add(id, &PoolContainer{Pool: NewMockGPool(id), Component: DDL})) + } + require.Error(t, pm.Add("1", &PoolContainer{Pool: NewMockGPool("1"), Component: DDL})) + var cnt atomic.Int32 + pm.Iter(func(pool *PoolContainer) { + cnt.Add(1) + }) + require.Equal(t, rc, int(cnt.Load())) +} diff --git a/resourcemanager/util/util.go b/resourcemanager/util/util.go new file mode 100644 index 0000000000000..d5b988c344295 --- /dev/null +++ b/resourcemanager/util/util.go @@ -0,0 +1,53 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import "time" + +// GorotinuePool is a pool interface +type GorotinuePool interface { + Release() + Tune(size int) + LastTunerTs() time.Time + MaxInFlight() int64 + InFlight() int64 + MinRT() uint64 + MaxPASS() uint64 + Cap() int + // LongRTT is to represent the baseline latency by tracking a measurement of the long term, less volatile RTT. + LongRTT() float64 + UpdateLongRTT(f func(float64) float64) + // ShortRTT is to represent the current system latency by tracking a measurement of the short time, and more volatile RTT. + ShortRTT() uint64 + GetQueueSize() int64 + Running() int + Name() string +} + +// PoolContainer is a pool container +type PoolContainer struct { + Pool GorotinuePool + Component Component +} + +// Component is ID for difference component +type Component int + +const ( + // UNKNOWN is for unknown component. It is only for test + UNKNOWN Component = iota + // DDL is for ddl component + DDL +) diff --git a/tidb-server/BUILD.bazel b/tidb-server/BUILD.bazel index 361a929351642..b918b7e1ca5f8 100644 --- a/tidb-server/BUILD.bazel +++ b/tidb-server/BUILD.bazel @@ -21,7 +21,7 @@ go_library( "//planner/core", "//plugin", "//privilege/privileges", - "//resourcemanager:resourcemanage", + "//resourcemanager", "//server", "//session", "//session/txninfo", From 5eea731db0712f1c8a90a3a81e7881da4e400f95 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 5 Jan 2023 15:10:20 +0800 Subject: [PATCH 6/6] util: use go-deadlock to find deadlock (#40288) close pingcap/tidb#40293 --- Makefile | 2 +- domain/BUILD.bazel | 1 + domain/sysvar_cache.go | 10 ++++----- executor/autoidtest/BUILD.bazel | 1 + go.mod | 2 ++ go.sum | 2 ++ server/BUILD.bazel | 1 + server/tidb_library_test.go | 7 +++++- session/BUILD.bazel | 1 + session/session.go | 3 ++- util/syncutil/BUILD.bazel | 12 ++++++++++ util/syncutil/mutex_deadlock.go | 40 +++++++++++++++++++++++++++++++++ util/syncutil/mutex_sync.go | 32 ++++++++++++++++++++++++++ 13 files changed, 106 insertions(+), 8 deletions(-) create mode 100644 util/syncutil/BUILD.bazel create mode 100644 util/syncutil/mutex_deadlock.go create mode 100644 util/syncutil/mutex_sync.go diff --git a/Makefile b/Makefile index ba8c742782900..35223c2269acf 100644 --- a/Makefile +++ b/Makefile @@ -405,7 +405,7 @@ bazel_test: failpoint-enable bazel_ci_prepare bazel_coverage_test: failpoint-enable bazel_ci_prepare bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) \ - --build_event_json_file=bazel_1.json --@io_bazel_rules_go//go/config:cover_format=go_cover \ + --build_event_json_file=bazel_1.json --@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock \ -- //... -//cmd/... -//tests/graceshutdown/... \ -//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/... diff --git a/domain/BUILD.bazel b/domain/BUILD.bazel index eafaa6b4cd8aa..f04948ddfe709 100644 --- a/domain/BUILD.bazel +++ b/domain/BUILD.bazel @@ -64,6 +64,7 @@ go_library( "//util/replayer", "//util/servermemorylimit", "//util/sqlexec", + "//util/syncutil", "@com_github_burntsushi_toml//:toml", "@com_github_ngaut_pools//:pools", "@com_github_pingcap_errors//:errors", diff --git a/domain/sysvar_cache.go b/domain/sysvar_cache.go index 370260a67c02a..1611231d42ad5 100644 --- a/domain/sysvar_cache.go +++ b/domain/sysvar_cache.go @@ -17,13 +17,13 @@ package domain import ( "context" "fmt" - "sync" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sqlexec" + "github.com/pingcap/tidb/util/syncutil" "go.uber.org/zap" "golang.org/x/exp/maps" ) @@ -36,10 +36,10 @@ import ( // sysVarCache represents the cache of system variables broken up into session and global scope. type sysVarCache struct { - sync.RWMutex // protects global and session maps - global map[string]string - session map[string]string - rebuildLock sync.Mutex // protects concurrent rebuild + syncutil.RWMutex // protects global and session maps + global map[string]string + session map[string]string + rebuildLock syncutil.Mutex // protects concurrent rebuild } func (do *Domain) rebuildSysVarCacheIfNeeded() (err error) { diff --git a/executor/autoidtest/BUILD.bazel b/executor/autoidtest/BUILD.bazel index cd04b266fa2e3..a59514bef3bd6 100644 --- a/executor/autoidtest/BUILD.bazel +++ b/executor/autoidtest/BUILD.bazel @@ -8,6 +8,7 @@ go_test( ], flaky = True, race = "on", + shard_count = 5, deps = [ "//autoid_service", "//config", diff --git a/go.mod b/go.mod index 5bcda5c055148..46b0536b128bd 100644 --- a/go.mod +++ b/go.mod @@ -80,6 +80,7 @@ require ( github.com/prometheus/client_model v0.3.0 github.com/prometheus/common v0.39.0 github.com/prometheus/prometheus v0.0.0-20190525122359-d20e84d0fb64 + github.com/sasha-s/go-deadlock v0.0.0-20161201235124-341000892f3d github.com/shirou/gopsutil/v3 v3.22.9 github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0 github.com/soheilhy/cmux v0.1.5 @@ -203,6 +204,7 @@ require ( github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef // indirect github.com/oklog/ulid v1.3.1 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect + github.com/petermattis/goid v0.0.0-20170504144140-0ded85884ba5 // indirect github.com/pierrec/lz4 v2.6.1+incompatible // indirect github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712 // indirect github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 // indirect diff --git a/go.sum b/go.sum index 41e8406455de2..4a6a635b40ebc 100644 --- a/go.sum +++ b/go.sum @@ -755,6 +755,7 @@ github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FI github.com/pborman/getopt v0.0.0-20180729010549-6fdd0a2c7117/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/peterbourgon/g2s v0.0.0-20170223122336-d4e7ad98afea/go.mod h1:1VcHEd3ro4QMoHfiNl/j7Jkln9+KQuorp0PItHMJYNg= +github.com/petermattis/goid v0.0.0-20170504144140-0ded85884ba5 h1:rUMC+oZ89Om6l9wvUNjzI0ZrKrSnXzV+opsgAohYUNc= github.com/petermattis/goid v0.0.0-20170504144140-0ded85884ba5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o= github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 h1:JhzVVoYvbOACxoUmOs6V/G4D5nPVUW73rKvXxP4XUJc= github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2/go.mod h1:iIss55rKnNBTvrwdmkUpLnDpZoAHvWaiq5+iMmen4AE= @@ -861,6 +862,7 @@ github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/samuel/go-zookeeper v0.0.0-20161028232340-1d7be4effb13/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= +github.com/sasha-s/go-deadlock v0.0.0-20161201235124-341000892f3d h1:yVBZEAirqhDYAc7xftf/swe8eHcg63jqfwdqN8KSoR8= github.com/sasha-s/go-deadlock v0.0.0-20161201235124-341000892f3d/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw= diff --git a/server/BUILD.bazel b/server/BUILD.bazel index afcb983fad76e..0d1303bf53993 100644 --- a/server/BUILD.bazel +++ b/server/BUILD.bazel @@ -198,6 +198,7 @@ go_test( "//util/resourcegrouptag", "//util/rowcodec", "//util/sqlexec", + "//util/syncutil", "//util/topsql", "//util/topsql/collector", "//util/topsql/collector/mock", diff --git a/server/tidb_library_test.go b/server/tidb_library_test.go index 8d895aa692e95..9c9e9b3d8110e 100644 --- a/server/tidb_library_test.go +++ b/server/tidb_library_test.go @@ -21,6 +21,7 @@ import ( "github.com/docker/go-units" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/store/mockstore" + "github.com/pingcap/tidb/util/syncutil" "github.com/stretchr/testify/require" ) @@ -47,5 +48,9 @@ func TestMemoryLeak(t *testing.T) { runtime.GC() runtime.ReadMemStats(&memStat) // before the fix, initAndCloseTiDB for 20 times will cost 900 MB memory, so we test for a quite loose upper bound. - require.Less(t, memStat.HeapInuse-oldHeapInUse, uint64(300*units.MiB)) + if syncutil.EnableDeadlock { + require.Less(t, memStat.HeapInuse-oldHeapInUse, uint64(1100*units.MiB)) + } else { + require.Less(t, memStat.HeapInuse-oldHeapInUse, uint64(300*units.MiB)) + } } diff --git a/session/BUILD.bazel b/session/BUILD.bazel index 63118a3ea701a..5939684d51549 100644 --- a/session/BUILD.bazel +++ b/session/BUILD.bazel @@ -83,6 +83,7 @@ go_library( "//util/sem", "//util/sli", "//util/sqlexec", + "//util/syncutil", "//util/tableutil", "//util/timeutil", "//util/topsql", diff --git a/session/session.go b/session/session.go index ec6ba2bbb29fb..1b341805682a4 100644 --- a/session/session.go +++ b/session/session.go @@ -99,6 +99,7 @@ import ( "github.com/pingcap/tidb/util/sem" "github.com/pingcap/tidb/util/sli" "github.com/pingcap/tidb/util/sqlexec" + "github.com/pingcap/tidb/util/syncutil" "github.com/pingcap/tidb/util/tableutil" "github.com/pingcap/tidb/util/timeutil" "github.com/pingcap/tidb/util/topsql" @@ -274,7 +275,7 @@ type session struct { idxUsageCollector *handle.SessionIndexUsageCollector functionUsageMu struct { - sync.RWMutex + syncutil.RWMutex builtinFunctionUsage telemetry.BuiltinFunctionsUsage } // allowed when tikv disk full happened. diff --git a/util/syncutil/BUILD.bazel b/util/syncutil/BUILD.bazel new file mode 100644 index 0000000000000..919301546f69c --- /dev/null +++ b/util/syncutil/BUILD.bazel @@ -0,0 +1,12 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "syncutil", + srcs = [ + "mutex_deadlock.go", + "mutex_sync.go", + ], + importpath = "github.com/pingcap/tidb/util/syncutil", + visibility = ["//visibility:public"], + deps = ["@com_github_sasha_s_go_deadlock//:go-deadlock"], +) diff --git a/util/syncutil/mutex_deadlock.go b/util/syncutil/mutex_deadlock.go new file mode 100644 index 0000000000000..8879ff5138104 --- /dev/null +++ b/util/syncutil/mutex_deadlock.go @@ -0,0 +1,40 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build deadlock + +package syncutil + +import ( + "time" + + deadlock "github.com/sasha-s/go-deadlock" +) + +// EnableDeadlock is a flag to enable deadlock detection. +const EnableDeadlock = true + +func init() { + deadlock.Opts.DeadlockTimeout = 20 * time.Second +} + +// A Mutex is a mutual exclusion lock. +type Mutex struct { + deadlock.Mutex +} + +// An RWMutex is a reader/writer mutual exclusion lock. +type RWMutex struct { + deadlock.RWMutex +} diff --git a/util/syncutil/mutex_sync.go b/util/syncutil/mutex_sync.go new file mode 100644 index 0000000000000..c6bdc55ccf87b --- /dev/null +++ b/util/syncutil/mutex_sync.go @@ -0,0 +1,32 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !deadlock + +package syncutil + +import "sync" + +// EnableDeadlock is a flag to enable deadlock detection. +const EnableDeadlock = false + +// A Mutex is a mutual exclusion lock. +type Mutex struct { + sync.Mutex +} + +// An RWMutex is a reader/writer mutual exclusion lock. +type RWMutex struct { + sync.RWMutex +}