From 3199444fe16f031a9605f2921dd26c0b4bbf9d05 Mon Sep 17 00:00:00 2001 From: yisaer Date: Mon, 16 Jan 2023 19:02:45 +0800 Subject: [PATCH 01/12] use enhanced wg --- domain/domain.go | 39 ++++++++++++++++---------------- statistics/handle/handle_hist.go | 2 +- util/wait_group_wrapper.go | 28 ++++++++++------------- util/wait_group_wrapper_test.go | 12 ++++------ 4 files changed, 38 insertions(+), 43 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 977694d773998..75ce985d95ed3 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -116,16 +116,17 @@ type Domain struct { expensiveQueryHandle *expensivequery.Handle memoryUsageAlarmHandle *memoryusagealarm.Handle serverMemoryLimitHandle *servermemorylimit.Handle - wg util.WaitGroupWrapper - statsUpdating atomicutil.Int32 - cancel context.CancelFunc - indexUsageSyncLease time.Duration - dumpFileGcChecker *dumpFileGcChecker - planReplayerHandle *planReplayerHandle - expiredTimeStamp4PC types.Time - logBackupAdvancer *daemon.OwnerDaemon - historicalStatsWorker *HistoricalStatsWorker - ttlJobManager *ttlworker.JobManager + // TODO: use Run for each process in future pr + wg *util.WaitGroupEnhancedWrapper + statsUpdating atomicutil.Int32 + cancel context.CancelFunc + indexUsageSyncLease time.Duration + dumpFileGcChecker *dumpFileGcChecker + planReplayerHandle *planReplayerHandle + expiredTimeStamp4PC types.Time + logBackupAdvancer *daemon.OwnerDaemon + historicalStatsWorker *HistoricalStatsWorker + ttlJobManager *ttlworker.JobManager serverID uint64 serverIDSession *concurrency.Session @@ -918,7 +919,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio jobsIdsMap: make(map[int64]string), }, } - + do.wg = util.NewWaitGroupEnhancedWrapper("domain", do.exit) do.SchemaValidator = NewSchemaValidator(ddlLease, do) do.expensiveQueryHandle = expensivequery.NewExpensiveQueryHandle(do.exit) do.memoryUsageAlarmHandle = memoryusagealarm.NewMemoryUsageAlarmHandle(do.exit) @@ -1065,7 +1066,7 @@ func (do *Domain) Init( // Local store needs to get the change information for every DDL state in each session. go do.loadSchemaInLoop(ctx, ddlLease) } - do.wg.Run(do.mdlCheckLoop) + do.wg.Run(do.mdlCheckLoop, "mdlCheckLoop") do.wg.Add(3) go do.topNSlowQueryLoop() go do.infoSyncerKeeper() @@ -1107,7 +1108,7 @@ func (do *Domain) initLogBackup(ctx context.Context, pdClient pd.Client) error { if err != nil { return err } - do.wg.Run(loop) + do.wg.Run(loop, "logBackupAdvancer") return nil } @@ -1921,7 +1922,7 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err do.ddl.RegisterStatsHandle(statsHandle) // Negative stats lease indicates that it is in test, it does not need update. if do.statsLease >= 0 { - do.wg.Run(do.loadStatsWorker) + do.wg.Run(do.loadStatsWorker, "loadStatsWorker") } owner := do.newOwnerManager(handle.StatsPrompt, handle.StatsOwnerKey) if do.indexUsageSyncLease > 0 { @@ -1932,9 +1933,9 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err return nil } do.SetStatsUpdating(true) - do.wg.Run(func() { do.updateStatsWorker(ctx, owner) }) - do.wg.Run(func() { do.autoAnalyzeWorker(owner) }) - do.wg.Run(func() { do.gcAnalyzeHistory(owner) }) + do.wg.Run(func() { do.updateStatsWorker(ctx, owner) }, "updateStatsWorker") + do.wg.Run(func() { do.autoAnalyzeWorker(owner) }, "autoAnalyzeWorker") + do.wg.Run(func() { do.gcAnalyzeHistory(owner) }, "gcAnalyzeHistory") return nil } @@ -1944,7 +1945,7 @@ func (do *Domain) StartLoadStatsSubWorkers(ctxList []sessionctx.Context) { for i, ctx := range ctxList { statsHandle.StatsLoad.SubCtxs[i] = ctx do.wg.Add(1) - go statsHandle.SubLoadWorker(ctx, do.exit, &do.wg) + go statsHandle.SubLoadWorker(ctx, do.exit, do.wg) } } @@ -2529,7 +2530,7 @@ func (do *Domain) StartTTLJobManager() { if err != nil { logutil.BgLogger().Warn("fail to wait until the ttl job manager stop", zap.Error(err)) } - }) + }, "ttlJobManager") } // TTLJobManager returns the ttl job manager on this domain diff --git a/statistics/handle/handle_hist.go b/statistics/handle/handle_hist.go index ddfd88dd32ffb..ad04d946e3f22 100644 --- a/statistics/handle/handle_hist.go +++ b/statistics/handle/handle_hist.go @@ -182,7 +182,7 @@ type StatsReaderContext struct { } // SubLoadWorker loads hist data for each column -func (h *Handle) SubLoadWorker(ctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupWrapper) { +func (h *Handle) SubLoadWorker(ctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupEnhancedWrapper) { readerCtx := &StatsReaderContext{} defer func() { exitWg.Done() diff --git a/util/wait_group_wrapper.go b/util/wait_group_wrapper.go index 3f3b139990b81..2a6fb3b244a86 100644 --- a/util/wait_group_wrapper.go +++ b/util/wait_group_wrapper.go @@ -19,7 +19,6 @@ import ( "time" "github.com/pingcap/tidb/util/logutil" - "go.uber.org/atomic" "go.uber.org/zap" ) @@ -27,42 +26,39 @@ import ( // if the `exited` signal is true by print them on log. type WaitGroupEnhancedWrapper struct { sync.WaitGroup - exited *atomic.Bool source string registerProcess sync.Map } // NewWaitGroupEnhancedWrapper returns WaitGroupEnhancedWrapper, the empty source indicates the unit test, then // the `checkUnExitedProcess` won't be executed. -func NewWaitGroupEnhancedWrapper(source string, exited *atomic.Bool) *WaitGroupEnhancedWrapper { +func NewWaitGroupEnhancedWrapper(source string, exit chan struct{}) *WaitGroupEnhancedWrapper { wgew := &WaitGroupEnhancedWrapper{ - exited: exited, source: source, registerProcess: sync.Map{}, } if len(source) > 0 { - go wgew.checkUnExitedProcess() + go wgew.checkUnExitedProcess(exit) } return wgew } -func (w *WaitGroupEnhancedWrapper) checkUnExitedProcess() { +func (w *WaitGroupEnhancedWrapper) checkUnExitedProcess(exit chan struct{}) { + <-exit logutil.BgLogger().Info("waitGroupWrapper ready to check unexited process", zap.String("source", w.source)) - ticker := time.NewTimer(10 * time.Second) - defer ticker.Stop() - for { - <-ticker.C - continueCheck := w.check() - if !continueCheck { - return + if w.check() { + ticker := time.NewTimer(10 * time.Second) + defer ticker.Stop() + for { + continueCheck := w.check() + if !continueCheck { + return + } } } } func (w *WaitGroupEnhancedWrapper) check() bool { - if !w.exited.Load() { - return true - } unexitedProcess := make([]string, 0) w.registerProcess.Range(func(key, value any) bool { unexitedProcess = append(unexitedProcess, key.(string)) diff --git a/util/wait_group_wrapper_test.go b/util/wait_group_wrapper_test.go index 1ad9ada053ebb..bd22cc9e31e5d 100644 --- a/util/wait_group_wrapper_test.go +++ b/util/wait_group_wrapper_test.go @@ -36,8 +36,7 @@ func TestWaitGroupWrapperRun(t *testing.T) { require.Equal(t, expect, val.Load()) val.Store(0) - exited := atomic.NewBool(false) - wg2 := NewWaitGroupEnhancedWrapper("", exited) + wg2 := NewWaitGroupEnhancedWrapper("", nil) for i := int32(0); i < expect; i++ { wg2.Run(func() { val.Inc() @@ -62,8 +61,7 @@ func TestWaitGroupWrapperRunWithRecover(t *testing.T) { require.Equal(t, expect, val.Load()) val.Store(0) - exited := atomic.NewBool(false) - wg2 := NewWaitGroupEnhancedWrapper("", exited) + wg2 := NewWaitGroupEnhancedWrapper("",, nil) for i := int32(0); i < expect; i++ { wg2.RunWithRecover(func() { panic("test1") @@ -76,8 +74,8 @@ func TestWaitGroupWrapperRunWithRecover(t *testing.T) { } func TestWaitGroupWrapperCheck(t *testing.T) { - exited := atomic.NewBool(false) - wg := NewWaitGroupEnhancedWrapper("", exited) + exit := make(chan struct{}) + wg := NewWaitGroupEnhancedWrapper("", exit) quit := make(chan struct{}) wg.Run(func() { <-quit @@ -87,7 +85,7 @@ func TestWaitGroupWrapperCheck(t *testing.T) { require.True(t, wg.check()) // need continue check as existed unexited process - exited.Store(true) + close(exit) require.True(t, wg.check()) // no need to continue check as all process exited From adb7ad99bc391692a5b4d967ccffad378c955d09 Mon Sep 17 00:00:00 2001 From: yisaer Date: Mon, 16 Jan 2023 20:12:54 +0800 Subject: [PATCH 02/12] use enhanced wg --- util/wait_group_wrapper_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util/wait_group_wrapper_test.go b/util/wait_group_wrapper_test.go index bd22cc9e31e5d..f7b6c476e4c98 100644 --- a/util/wait_group_wrapper_test.go +++ b/util/wait_group_wrapper_test.go @@ -61,7 +61,7 @@ func TestWaitGroupWrapperRunWithRecover(t *testing.T) { require.Equal(t, expect, val.Load()) val.Store(0) - wg2 := NewWaitGroupEnhancedWrapper("",, nil) + wg2 := NewWaitGroupEnhancedWrapper("", nil) for i := int32(0); i < expect; i++ { wg2.RunWithRecover(func() { panic("test1") From 77152f9ecbe28877d2989b8cd46d03296c4c87e3 Mon Sep 17 00:00:00 2001 From: yisaer Date: Mon, 16 Jan 2023 22:42:54 +0800 Subject: [PATCH 03/12] use enhanced wg --- domain/domain.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 75ce985d95ed3..ed278507c6f40 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -117,7 +117,8 @@ type Domain struct { memoryUsageAlarmHandle *memoryusagealarm.Handle serverMemoryLimitHandle *servermemorylimit.Handle // TODO: use Run for each process in future pr - wg *util.WaitGroupEnhancedWrapper + ewg *util.WaitGroupEnhancedWrapper + wg util.WaitGroupWrapper statsUpdating atomicutil.Int32 cancel context.CancelFunc indexUsageSyncLease time.Duration @@ -890,6 +891,7 @@ func (do *Domain) Close() { do.cancel() } do.wg.Wait() + do.ewg.Wait() do.sysSessionPool.Close() variable.UnregisterStatistics(do.bindHandle.Load()) if do.onClose != nil { @@ -919,7 +921,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio jobsIdsMap: make(map[int64]string), }, } - do.wg = util.NewWaitGroupEnhancedWrapper("domain", do.exit) + do.ewg = util.NewWaitGroupEnhancedWrapper("domain", do.exit) do.SchemaValidator = NewSchemaValidator(ddlLease, do) do.expensiveQueryHandle = expensivequery.NewExpensiveQueryHandle(do.exit) do.memoryUsageAlarmHandle = memoryusagealarm.NewMemoryUsageAlarmHandle(do.exit) @@ -1066,7 +1068,7 @@ func (do *Domain) Init( // Local store needs to get the change information for every DDL state in each session. go do.loadSchemaInLoop(ctx, ddlLease) } - do.wg.Run(do.mdlCheckLoop, "mdlCheckLoop") + do.wg.Run(do.mdlCheckLoop) do.wg.Add(3) go do.topNSlowQueryLoop() go do.infoSyncerKeeper() @@ -1108,7 +1110,7 @@ func (do *Domain) initLogBackup(ctx context.Context, pdClient pd.Client) error { if err != nil { return err } - do.wg.Run(loop, "logBackupAdvancer") + do.wg.Run(loop) return nil } @@ -1922,7 +1924,7 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err do.ddl.RegisterStatsHandle(statsHandle) // Negative stats lease indicates that it is in test, it does not need update. if do.statsLease >= 0 { - do.wg.Run(do.loadStatsWorker, "loadStatsWorker") + do.ewg.Run(do.loadStatsWorker, "loadStatsWorker") } owner := do.newOwnerManager(handle.StatsPrompt, handle.StatsOwnerKey) if do.indexUsageSyncLease > 0 { @@ -1933,9 +1935,9 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err return nil } do.SetStatsUpdating(true) - do.wg.Run(func() { do.updateStatsWorker(ctx, owner) }, "updateStatsWorker") - do.wg.Run(func() { do.autoAnalyzeWorker(owner) }, "autoAnalyzeWorker") - do.wg.Run(func() { do.gcAnalyzeHistory(owner) }, "gcAnalyzeHistory") + do.wg.Run(func() { do.updateStatsWorker(ctx, owner) }) + do.wg.Run(func() { do.autoAnalyzeWorker(owner) }) + do.wg.Run(func() { do.gcAnalyzeHistory(owner) }) return nil } From e50b14dbfbbf69496e0556c495c41c1e25d2e5aa Mon Sep 17 00:00:00 2001 From: yisaer Date: Mon, 16 Jan 2023 22:46:59 +0800 Subject: [PATCH 04/12] use enhanced wg --- domain/domain.go | 4 ++-- statistics/handle/handle_hist.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index ed278507c6f40..39b02b4d49be7 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1947,7 +1947,7 @@ func (do *Domain) StartLoadStatsSubWorkers(ctxList []sessionctx.Context) { for i, ctx := range ctxList { statsHandle.StatsLoad.SubCtxs[i] = ctx do.wg.Add(1) - go statsHandle.SubLoadWorker(ctx, do.exit, do.wg) + go statsHandle.SubLoadWorker(ctx, do.exit, &do.wg) } } @@ -2532,7 +2532,7 @@ func (do *Domain) StartTTLJobManager() { if err != nil { logutil.BgLogger().Warn("fail to wait until the ttl job manager stop", zap.Error(err)) } - }, "ttlJobManager") + }) } // TTLJobManager returns the ttl job manager on this domain diff --git a/statistics/handle/handle_hist.go b/statistics/handle/handle_hist.go index ad04d946e3f22..ddfd88dd32ffb 100644 --- a/statistics/handle/handle_hist.go +++ b/statistics/handle/handle_hist.go @@ -182,7 +182,7 @@ type StatsReaderContext struct { } // SubLoadWorker loads hist data for each column -func (h *Handle) SubLoadWorker(ctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupEnhancedWrapper) { +func (h *Handle) SubLoadWorker(ctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupWrapper) { readerCtx := &StatsReaderContext{} defer func() { exitWg.Done() From 87cd0f2c5d5559fe980cda8f6ece5c45ebcdd8db Mon Sep 17 00:00:00 2001 From: yisaer Date: Mon, 16 Jan 2023 23:34:08 +0800 Subject: [PATCH 05/12] use enhanced wg --- util/wait_group_wrapper.go | 1 + 1 file changed, 1 insertion(+) diff --git a/util/wait_group_wrapper.go b/util/wait_group_wrapper.go index 2a6fb3b244a86..b91b35119a698 100644 --- a/util/wait_group_wrapper.go +++ b/util/wait_group_wrapper.go @@ -50,6 +50,7 @@ func (w *WaitGroupEnhancedWrapper) checkUnExitedProcess(exit chan struct{}) { ticker := time.NewTimer(10 * time.Second) defer ticker.Stop() for { + <-ticker.C continueCheck := w.check() if !continueCheck { return From 4fd03d2e298972735ca909cea3bd5701e24577f4 Mon Sep 17 00:00:00 2001 From: yisaer Date: Tue, 17 Jan 2023 00:18:01 +0800 Subject: [PATCH 06/12] use enhanced wg --- domain/domain.go | 11 +++++++++-- testkit/mockstore.go | 1 + util/wait_group_wrapper.go | 12 +++++++++--- util/wait_group_wrapper_test.go | 9 +++------ 4 files changed, 22 insertions(+), 11 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 39b02b4d49be7..6ddfcc3ee56ee 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -921,7 +921,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio jobsIdsMap: make(map[int64]string), }, } - do.ewg = util.NewWaitGroupEnhancedWrapper("domain", do.exit) + do.ewg = util.NewWaitGroupEnhancedWrapper("domain", do.exit, enableEnhancedWaitGroupCheck.Load()) do.SchemaValidator = NewSchemaValidator(ddlLease, do) do.expensiveQueryHandle = expensivequery.NewExpensiveQueryHandle(do.exit) do.memoryUsageAlarmHandle = memoryusagealarm.NewMemoryUsageAlarmHandle(do.exit) @@ -1707,10 +1707,17 @@ func (do *Domain) SetupDumpFileGCChecker(ctx sessionctx.Context) { } var planReplayerHandleLease atomic.Uint64 +var enableEnhancedWaitGroupCheck atomic.Bool func init() { planReplayerHandleLease.Store(uint64(10 * time.Second)) enableDumpHistoricalStats.Store(true) + enableEnhancedWaitGroupCheck.Store(true) +} + +// DisableEnhancedWaitGroupCheck4Test diable check in test +func DisableEnhancedWaitGroupCheck4Test() { + enableEnhancedWaitGroupCheck.Store(false) } // DisablePlanReplayerBackgroundJob4Test disable plan replayer handle for test @@ -1924,7 +1931,7 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err do.ddl.RegisterStatsHandle(statsHandle) // Negative stats lease indicates that it is in test, it does not need update. if do.statsLease >= 0 { - do.ewg.Run(do.loadStatsWorker, "loadStatsWorker") + do.wg.Run(do.loadStatsWorker) } owner := do.newOwnerManager(handle.StatsPrompt, handle.StatsOwnerKey) if do.indexUsageSyncLease > 0 { diff --git a/testkit/mockstore.go b/testkit/mockstore.go index 12afe0e0f2f68..af25825811648 100644 --- a/testkit/mockstore.go +++ b/testkit/mockstore.go @@ -81,6 +81,7 @@ func bootstrap(t testing.TB, store kv.Storage, lease time.Duration) *domain.Doma session.DisableStats4Test() domain.DisablePlanReplayerBackgroundJob4Test() domain.DisableDumpHistoricalStats4Test() + domain.DisableEnhancedWaitGroupCheck4Test() dom, err := session.BootstrapSession(store) require.NoError(t, err) diff --git a/util/wait_group_wrapper.go b/util/wait_group_wrapper.go index b91b35119a698..6b2dec941a28c 100644 --- a/util/wait_group_wrapper.go +++ b/util/wait_group_wrapper.go @@ -32,20 +32,26 @@ type WaitGroupEnhancedWrapper struct { // NewWaitGroupEnhancedWrapper returns WaitGroupEnhancedWrapper, the empty source indicates the unit test, then // the `checkUnExitedProcess` won't be executed. -func NewWaitGroupEnhancedWrapper(source string, exit chan struct{}) *WaitGroupEnhancedWrapper { +func NewWaitGroupEnhancedWrapper(source string, exit chan struct{}, exitedCheck bool) *WaitGroupEnhancedWrapper { wgew := &WaitGroupEnhancedWrapper{ source: source, registerProcess: sync.Map{}, } - if len(source) > 0 { + if exitedCheck { + wgew.Add(1) go wgew.checkUnExitedProcess(exit) } return wgew } func (w *WaitGroupEnhancedWrapper) checkUnExitedProcess(exit chan struct{}) { + defer func() { + logutil.BgLogger().Info("waitGroupWrapper exit-checking exited", zap.String("source", w.source)) + w.Done() + }() + logutil.BgLogger().Info("waitGroupWrapper enable exit-checking", zap.String("source", w.source)) <-exit - logutil.BgLogger().Info("waitGroupWrapper ready to check unexited process", zap.String("source", w.source)) + logutil.BgLogger().Info("waitGroupWrapper start exit-checking", zap.String("source", w.source)) if w.check() { ticker := time.NewTimer(10 * time.Second) defer ticker.Stop() diff --git a/util/wait_group_wrapper_test.go b/util/wait_group_wrapper_test.go index f7b6c476e4c98..2d06abdd689df 100644 --- a/util/wait_group_wrapper_test.go +++ b/util/wait_group_wrapper_test.go @@ -36,7 +36,7 @@ func TestWaitGroupWrapperRun(t *testing.T) { require.Equal(t, expect, val.Load()) val.Store(0) - wg2 := NewWaitGroupEnhancedWrapper("", nil) + wg2 := NewWaitGroupEnhancedWrapper("", nil, false) for i := int32(0); i < expect; i++ { wg2.Run(func() { val.Inc() @@ -61,7 +61,7 @@ func TestWaitGroupWrapperRunWithRecover(t *testing.T) { require.Equal(t, expect, val.Load()) val.Store(0) - wg2 := NewWaitGroupEnhancedWrapper("", nil) + wg2 := NewWaitGroupEnhancedWrapper("", nil, false) for i := int32(0); i < expect; i++ { wg2.RunWithRecover(func() { panic("test1") @@ -75,15 +75,12 @@ func TestWaitGroupWrapperRunWithRecover(t *testing.T) { func TestWaitGroupWrapperCheck(t *testing.T) { exit := make(chan struct{}) - wg := NewWaitGroupEnhancedWrapper("", exit) + wg := NewWaitGroupEnhancedWrapper("", exit, false) quit := make(chan struct{}) wg.Run(func() { <-quit }, "test") - // directly skip check if exited is false - require.True(t, wg.check()) - // need continue check as existed unexited process close(exit) require.True(t, wg.check()) From ffbe8fd9341a49899a72dbf45bd42dd9775b3ec1 Mon Sep 17 00:00:00 2001 From: yisaer Date: Tue, 17 Jan 2023 11:51:45 +0800 Subject: [PATCH 07/12] use enhanced wg --- domain/domain.go | 22 ++++++++++------------ statistics/handle/handle_hist.go | 2 +- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 6ddfcc3ee56ee..02858a40b5deb 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -117,8 +117,7 @@ type Domain struct { memoryUsageAlarmHandle *memoryusagealarm.Handle serverMemoryLimitHandle *servermemorylimit.Handle // TODO: use Run for each process in future pr - ewg *util.WaitGroupEnhancedWrapper - wg util.WaitGroupWrapper + wg *util.WaitGroupEnhancedWrapper statsUpdating atomicutil.Int32 cancel context.CancelFunc indexUsageSyncLease time.Duration @@ -891,7 +890,6 @@ func (do *Domain) Close() { do.cancel() } do.wg.Wait() - do.ewg.Wait() do.sysSessionPool.Close() variable.UnregisterStatistics(do.bindHandle.Load()) if do.onClose != nil { @@ -921,7 +919,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio jobsIdsMap: make(map[int64]string), }, } - do.ewg = util.NewWaitGroupEnhancedWrapper("domain", do.exit, enableEnhancedWaitGroupCheck.Load()) + do.wg = util.NewWaitGroupEnhancedWrapper("domain", do.exit, enableEnhancedWaitGroupCheck.Load()) do.SchemaValidator = NewSchemaValidator(ddlLease, do) do.expensiveQueryHandle = expensivequery.NewExpensiveQueryHandle(do.exit) do.memoryUsageAlarmHandle = memoryusagealarm.NewMemoryUsageAlarmHandle(do.exit) @@ -1068,7 +1066,7 @@ func (do *Domain) Init( // Local store needs to get the change information for every DDL state in each session. go do.loadSchemaInLoop(ctx, ddlLease) } - do.wg.Run(do.mdlCheckLoop) + do.wg.Run(do.mdlCheckLoop, "mdlCheckLoop") do.wg.Add(3) go do.topNSlowQueryLoop() go do.infoSyncerKeeper() @@ -1110,7 +1108,7 @@ func (do *Domain) initLogBackup(ctx context.Context, pdClient pd.Client) error { if err != nil { return err } - do.wg.Run(loop) + do.wg.Run(loop, "logBackupAdvancer") return nil } @@ -1931,7 +1929,7 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err do.ddl.RegisterStatsHandle(statsHandle) // Negative stats lease indicates that it is in test, it does not need update. if do.statsLease >= 0 { - do.wg.Run(do.loadStatsWorker) + do.wg.Run(do.loadStatsWorker, "loadStatsWorker") } owner := do.newOwnerManager(handle.StatsPrompt, handle.StatsOwnerKey) if do.indexUsageSyncLease > 0 { @@ -1942,9 +1940,9 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err return nil } do.SetStatsUpdating(true) - do.wg.Run(func() { do.updateStatsWorker(ctx, owner) }) - do.wg.Run(func() { do.autoAnalyzeWorker(owner) }) - do.wg.Run(func() { do.gcAnalyzeHistory(owner) }) + do.wg.Run(func() { do.updateStatsWorker(ctx, owner) }, "updateStatsWorker") + do.wg.Run(func() { do.autoAnalyzeWorker(owner) }, "autoAnalyzeWorker") + do.wg.Run(func() { do.gcAnalyzeHistory(owner) }, "gcAnalyzeHistory") return nil } @@ -1954,7 +1952,7 @@ func (do *Domain) StartLoadStatsSubWorkers(ctxList []sessionctx.Context) { for i, ctx := range ctxList { statsHandle.StatsLoad.SubCtxs[i] = ctx do.wg.Add(1) - go statsHandle.SubLoadWorker(ctx, do.exit, &do.wg) + go statsHandle.SubLoadWorker(ctx, do.exit, do.wg) } } @@ -2539,7 +2537,7 @@ func (do *Domain) StartTTLJobManager() { if err != nil { logutil.BgLogger().Warn("fail to wait until the ttl job manager stop", zap.Error(err)) } - }) + }, "ttlJobManager") } // TTLJobManager returns the ttl job manager on this domain diff --git a/statistics/handle/handle_hist.go b/statistics/handle/handle_hist.go index ddfd88dd32ffb..ad04d946e3f22 100644 --- a/statistics/handle/handle_hist.go +++ b/statistics/handle/handle_hist.go @@ -182,7 +182,7 @@ type StatsReaderContext struct { } // SubLoadWorker loads hist data for each column -func (h *Handle) SubLoadWorker(ctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupWrapper) { +func (h *Handle) SubLoadWorker(ctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupEnhancedWrapper) { readerCtx := &StatsReaderContext{} defer func() { exitWg.Done() From 09343c0b8edf764e3cbaf367606b3ffa7bbd3679 Mon Sep 17 00:00:00 2001 From: yisaer Date: Tue, 17 Jan 2023 12:25:18 +0800 Subject: [PATCH 08/12] support wg run --- util/wait_group_wrapper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util/wait_group_wrapper.go b/util/wait_group_wrapper.go index 6b2dec941a28c..21f808934f8d7 100644 --- a/util/wait_group_wrapper.go +++ b/util/wait_group_wrapper.go @@ -53,7 +53,7 @@ func (w *WaitGroupEnhancedWrapper) checkUnExitedProcess(exit chan struct{}) { <-exit logutil.BgLogger().Info("waitGroupWrapper start exit-checking", zap.String("source", w.source)) if w.check() { - ticker := time.NewTimer(10 * time.Second) + ticker := time.NewTimer(2 * time.Second) defer ticker.Stop() for { <-ticker.C From b560c919016a63f57358ce3ce9a3c5e4a8d7c5c7 Mon Sep 17 00:00:00 2001 From: yisaer Date: Tue, 17 Jan 2023 14:38:03 +0800 Subject: [PATCH 09/12] fix --- config/config.go | 2 ++ domain/domain.go | 9 +-------- testkit/mockstore.go | 1 - 3 files changed, 3 insertions(+), 9 deletions(-) diff --git a/config/config.go b/config/config.go index 68108267540b1..332aaff44bbc4 100644 --- a/config/config.go +++ b/config/config.go @@ -292,6 +292,8 @@ type Config struct { TiDBMaxReuseChunk uint32 `toml:"tidb-max-reuse-chunk" json:"tidb-max-reuse-chunk"` // TiDBMaxReuseColumn indicates max cached column num TiDBMaxReuseColumn uint32 `toml:"tidb-max-reuse-column" json:"tidb-max-reuse-column"` + // TiDBEnableExitChecking indicates whether checking background process in domain during exiting + TiDBEnableExitChecking bool `toml:"tidb-enable-exit-checking" json:"tidb-enable-exit-checking"` } // UpdateTempStoragePath is to update the `TempStoragePath` if port/statusPort was changed diff --git a/domain/domain.go b/domain/domain.go index 02858a40b5deb..403c1375d43d9 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -919,7 +919,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio jobsIdsMap: make(map[int64]string), }, } - do.wg = util.NewWaitGroupEnhancedWrapper("domain", do.exit, enableEnhancedWaitGroupCheck.Load()) + do.wg = util.NewWaitGroupEnhancedWrapper("domain", do.exit, config.GetGlobalConfig().TiDBEnableExitChecking) do.SchemaValidator = NewSchemaValidator(ddlLease, do) do.expensiveQueryHandle = expensivequery.NewExpensiveQueryHandle(do.exit) do.memoryUsageAlarmHandle = memoryusagealarm.NewMemoryUsageAlarmHandle(do.exit) @@ -1705,17 +1705,10 @@ func (do *Domain) SetupDumpFileGCChecker(ctx sessionctx.Context) { } var planReplayerHandleLease atomic.Uint64 -var enableEnhancedWaitGroupCheck atomic.Bool func init() { planReplayerHandleLease.Store(uint64(10 * time.Second)) enableDumpHistoricalStats.Store(true) - enableEnhancedWaitGroupCheck.Store(true) -} - -// DisableEnhancedWaitGroupCheck4Test diable check in test -func DisableEnhancedWaitGroupCheck4Test() { - enableEnhancedWaitGroupCheck.Store(false) } // DisablePlanReplayerBackgroundJob4Test disable plan replayer handle for test diff --git a/testkit/mockstore.go b/testkit/mockstore.go index af25825811648..12afe0e0f2f68 100644 --- a/testkit/mockstore.go +++ b/testkit/mockstore.go @@ -81,7 +81,6 @@ func bootstrap(t testing.TB, store kv.Storage, lease time.Duration) *domain.Doma session.DisableStats4Test() domain.DisablePlanReplayerBackgroundJob4Test() domain.DisableDumpHistoricalStats4Test() - domain.DisableEnhancedWaitGroupCheck4Test() dom, err := session.BootstrapSession(store) require.NoError(t, err) From 9e1280a3265bbe33e25f2b89e80f0db8b1f208dd Mon Sep 17 00:00:00 2001 From: yisaer Date: Tue, 17 Jan 2023 14:45:36 +0800 Subject: [PATCH 10/12] Revert "fix" This reverts commit b560c919016a63f57358ce3ce9a3c5e4a8d7c5c7. --- config/config.go | 2 -- domain/domain.go | 9 ++++++++- testkit/mockstore.go | 1 + 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/config/config.go b/config/config.go index 332aaff44bbc4..68108267540b1 100644 --- a/config/config.go +++ b/config/config.go @@ -292,8 +292,6 @@ type Config struct { TiDBMaxReuseChunk uint32 `toml:"tidb-max-reuse-chunk" json:"tidb-max-reuse-chunk"` // TiDBMaxReuseColumn indicates max cached column num TiDBMaxReuseColumn uint32 `toml:"tidb-max-reuse-column" json:"tidb-max-reuse-column"` - // TiDBEnableExitChecking indicates whether checking background process in domain during exiting - TiDBEnableExitChecking bool `toml:"tidb-enable-exit-checking" json:"tidb-enable-exit-checking"` } // UpdateTempStoragePath is to update the `TempStoragePath` if port/statusPort was changed diff --git a/domain/domain.go b/domain/domain.go index 403c1375d43d9..02858a40b5deb 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -919,7 +919,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio jobsIdsMap: make(map[int64]string), }, } - do.wg = util.NewWaitGroupEnhancedWrapper("domain", do.exit, config.GetGlobalConfig().TiDBEnableExitChecking) + do.wg = util.NewWaitGroupEnhancedWrapper("domain", do.exit, enableEnhancedWaitGroupCheck.Load()) do.SchemaValidator = NewSchemaValidator(ddlLease, do) do.expensiveQueryHandle = expensivequery.NewExpensiveQueryHandle(do.exit) do.memoryUsageAlarmHandle = memoryusagealarm.NewMemoryUsageAlarmHandle(do.exit) @@ -1705,10 +1705,17 @@ func (do *Domain) SetupDumpFileGCChecker(ctx sessionctx.Context) { } var planReplayerHandleLease atomic.Uint64 +var enableEnhancedWaitGroupCheck atomic.Bool func init() { planReplayerHandleLease.Store(uint64(10 * time.Second)) enableDumpHistoricalStats.Store(true) + enableEnhancedWaitGroupCheck.Store(true) +} + +// DisableEnhancedWaitGroupCheck4Test diable check in test +func DisableEnhancedWaitGroupCheck4Test() { + enableEnhancedWaitGroupCheck.Store(false) } // DisablePlanReplayerBackgroundJob4Test disable plan replayer handle for test diff --git a/testkit/mockstore.go b/testkit/mockstore.go index 9756d5bb65804..9d4b749a87f19 100644 --- a/testkit/mockstore.go +++ b/testkit/mockstore.go @@ -82,6 +82,7 @@ func bootstrap(t testing.TB, store kv.Storage, lease time.Duration) *domain.Doma session.DisableStats4Test() domain.DisablePlanReplayerBackgroundJob4Test() domain.DisableDumpHistoricalStats4Test() + domain.DisableEnhancedWaitGroupCheck4Test() dom, err := session.BootstrapSession(store) require.NoError(t, err) From bb7514098e27d31a699790442fea8d5f96e49bc2 Mon Sep 17 00:00:00 2001 From: yisaer Date: Tue, 17 Jan 2023 14:47:15 +0800 Subject: [PATCH 11/12] fix --- tests/realtikvtest/testkit.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/realtikvtest/testkit.go b/tests/realtikvtest/testkit.go index 4b8a749e65c9d..2d1d7888d618a 100644 --- a/tests/realtikvtest/testkit.go +++ b/tests/realtikvtest/testkit.go @@ -93,6 +93,7 @@ func CreateMockStoreAndDomainAndSetup(t *testing.T, opts ...mockstore.MockTiKVSt var err error session.SetSchemaLease(500 * time.Millisecond) + domain.DisableEnhancedWaitGroupCheck4Test() if *WithRealTiKV { var d driver.TiKVDriver From 948e55597155556e486426baeac920e01293767d Mon Sep 17 00:00:00 2001 From: yisaer Date: Tue, 17 Jan 2023 19:33:27 +0800 Subject: [PATCH 12/12] fix --- config/config.go | 3 +++ domain/domain.go | 9 +-------- testkit/mockstore.go | 1 - tests/realtikvtest/testkit.go | 1 - 4 files changed, 4 insertions(+), 10 deletions(-) diff --git a/config/config.go b/config/config.go index 68108267540b1..9030c4ac388f8 100644 --- a/config/config.go +++ b/config/config.go @@ -292,6 +292,8 @@ type Config struct { TiDBMaxReuseChunk uint32 `toml:"tidb-max-reuse-chunk" json:"tidb-max-reuse-chunk"` // TiDBMaxReuseColumn indicates max cached column num TiDBMaxReuseColumn uint32 `toml:"tidb-max-reuse-column" json:"tidb-max-reuse-column"` + // TiDBEnableExitCheck indicates whether exit-checking in domain for background process + TiDBEnableExitCheck bool `toml:"tidb-enable-exit-check" json:"tidb-enable-exit-check"` } // UpdateTempStoragePath is to update the `TempStoragePath` if port/statusPort was changed @@ -1000,6 +1002,7 @@ var defaultConf = Config{ DisaggregatedTiFlash: false, TiDBMaxReuseChunk: 64, TiDBMaxReuseColumn: 256, + TiDBEnableExitCheck: false, } var ( diff --git a/domain/domain.go b/domain/domain.go index 02858a40b5deb..2b6337d23cdec 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -919,7 +919,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio jobsIdsMap: make(map[int64]string), }, } - do.wg = util.NewWaitGroupEnhancedWrapper("domain", do.exit, enableEnhancedWaitGroupCheck.Load()) + do.wg = util.NewWaitGroupEnhancedWrapper("domain", do.exit, config.GetGlobalConfig().TiDBEnableExitCheck) do.SchemaValidator = NewSchemaValidator(ddlLease, do) do.expensiveQueryHandle = expensivequery.NewExpensiveQueryHandle(do.exit) do.memoryUsageAlarmHandle = memoryusagealarm.NewMemoryUsageAlarmHandle(do.exit) @@ -1705,17 +1705,10 @@ func (do *Domain) SetupDumpFileGCChecker(ctx sessionctx.Context) { } var planReplayerHandleLease atomic.Uint64 -var enableEnhancedWaitGroupCheck atomic.Bool func init() { planReplayerHandleLease.Store(uint64(10 * time.Second)) enableDumpHistoricalStats.Store(true) - enableEnhancedWaitGroupCheck.Store(true) -} - -// DisableEnhancedWaitGroupCheck4Test diable check in test -func DisableEnhancedWaitGroupCheck4Test() { - enableEnhancedWaitGroupCheck.Store(false) } // DisablePlanReplayerBackgroundJob4Test disable plan replayer handle for test diff --git a/testkit/mockstore.go b/testkit/mockstore.go index 9d4b749a87f19..9756d5bb65804 100644 --- a/testkit/mockstore.go +++ b/testkit/mockstore.go @@ -82,7 +82,6 @@ func bootstrap(t testing.TB, store kv.Storage, lease time.Duration) *domain.Doma session.DisableStats4Test() domain.DisablePlanReplayerBackgroundJob4Test() domain.DisableDumpHistoricalStats4Test() - domain.DisableEnhancedWaitGroupCheck4Test() dom, err := session.BootstrapSession(store) require.NoError(t, err) diff --git a/tests/realtikvtest/testkit.go b/tests/realtikvtest/testkit.go index 2d1d7888d618a..4b8a749e65c9d 100644 --- a/tests/realtikvtest/testkit.go +++ b/tests/realtikvtest/testkit.go @@ -93,7 +93,6 @@ func CreateMockStoreAndDomainAndSetup(t *testing.T, opts ...mockstore.MockTiKVSt var err error session.SetSchemaLease(500 * time.Millisecond) - domain.DisableEnhancedWaitGroupCheck4Test() if *WithRealTiKV { var d driver.TiKVDriver