From a22fc590cc9efb13c025386712f39338c9821187 Mon Sep 17 00:00:00 2001 From: Rustin Date: Thu, 31 Oct 2024 16:48:34 +0800 Subject: [PATCH] statistics: add the refresher as a stats owner listener (#56998) ref pingcap/tidb#55906 --- pkg/domain/domain.go | 2 +- pkg/owner/BUILD.bazel | 2 +- pkg/owner/manager.go | 25 +++++++++++++++ pkg/owner/manager_test.go | 16 ++++++++++ .../handle/autoanalyze/autoanalyze.go | 10 ++++++ .../handle/autoanalyze/priorityqueue/queue.go | 32 ++++++++++--------- .../handle/autoanalyze/refresher/refresher.go | 12 +++++++ pkg/statistics/handle/types/BUILD.bazel | 1 + pkg/statistics/handle/types/interfaces.go | 3 ++ 9 files changed, 86 insertions(+), 17 deletions(-) diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index e83117d148aee..87a9c8f4f8da1 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -2356,7 +2356,7 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err variable.EnableStatsOwner = do.enableStatsOwner variable.DisableStatsOwner = do.disableStatsOwner do.statsOwner = do.newOwnerManager(handle.StatsPrompt, handle.StatsOwnerKey) - do.statsOwner.SetListener(do.ddlNotifier) + do.statsOwner.SetListener(owner.NewListenersWrapper(statsHandle, do.ddlNotifier)) do.wg.Run(func() { do.indexUsageWorker() }, "indexUsageWorker") diff --git a/pkg/owner/BUILD.bazel b/pkg/owner/BUILD.bazel index 62dbe260e3127..20d8e5ac8debe 100644 --- a/pkg/owner/BUILD.bazel +++ b/pkg/owner/BUILD.bazel @@ -37,7 +37,7 @@ go_test( ], embed = [":owner"], flaky = True, - shard_count = 9, + shard_count = 10, deps = [ "//pkg/ddl", "//pkg/infoschema", diff --git a/pkg/owner/manager.go b/pkg/owner/manager.go index e4d467b16b1db..577cff9d53e55 100644 --- a/pkg/owner/manager.go +++ b/pkg/owner/manager.go @@ -554,3 +554,28 @@ func AcquireDistributedLock( } }, nil } + +// ListenersWrapper is a list of listeners. +// A way to broadcast events to multiple listeners. +type ListenersWrapper struct { + listeners []Listener +} + +// OnBecomeOwner broadcasts the OnBecomeOwner event to all listeners. +func (ol *ListenersWrapper) OnBecomeOwner() { + for _, l := range ol.listeners { + l.OnBecomeOwner() + } +} + +// OnRetireOwner broadcasts the OnRetireOwner event to all listeners. +func (ol *ListenersWrapper) OnRetireOwner() { + for _, l := range ol.listeners { + l.OnRetireOwner() + } +} + +// NewListenersWrapper creates a new OwnerListeners. +func NewListenersWrapper(listeners ...Listener) *ListenersWrapper { + return &ListenersWrapper{listeners: listeners} +} diff --git a/pkg/owner/manager_test.go b/pkg/owner/manager_test.go index 907ebe79de896..2955c550f82a7 100644 --- a/pkg/owner/manager_test.go +++ b/pkg/owner/manager_test.go @@ -560,3 +560,19 @@ func TestAcquireDistributedLock(t *testing.T) { release2() }) } + +func TestListenersWrapper(t *testing.T) { + lis1 := &listener{} + lis2 := &listener{} + wrapper := owner.NewListenersWrapper(lis1, lis2) + + // Test OnBecomeOwner + wrapper.OnBecomeOwner() + require.True(t, lis1.val.Load()) + require.True(t, lis2.val.Load()) + + // Test OnRetireOwner + wrapper.OnRetireOwner() + require.False(t, lis1.val.Load()) + require.False(t, lis2.val.Load()) +} diff --git a/pkg/statistics/handle/autoanalyze/autoanalyze.go b/pkg/statistics/handle/autoanalyze/autoanalyze.go index 9774bf7806f36..e6f4daf2f99a1 100644 --- a/pkg/statistics/handle/autoanalyze/autoanalyze.go +++ b/pkg/statistics/handle/autoanalyze/autoanalyze.go @@ -138,6 +138,16 @@ func (sa *statsAnalyze) CleanupCorruptedAnalyzeJobsOnDeadInstances() error { }, statsutil.FlagWrapTxn) } +// OnBecomeOwner is used to handle the event when the current TiDB instance becomes the stats owner. +func (sa *statsAnalyze) OnBecomeOwner() { + sa.refresher.OnBecomeOwner() +} + +// OnRetireOwner is used to handle the event when the current TiDB instance retires from being the stats owner. +func (sa *statsAnalyze) OnRetireOwner() { + sa.refresher.OnRetireOwner() +} + // SelectAnalyzeJobsOnCurrentInstanceSQL is the SQL to select the analyze jobs whose // state is `pending` or `running` and the update time is more than 10 minutes ago // and the instance is current instance. diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go b/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go index 9d295374a7912..4f23f7c58de48 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go @@ -72,18 +72,20 @@ type pqHeap interface { // //nolint:fieldalignment type AnalysisPriorityQueue struct { + ctx context.Context statsHandle statstypes.StatsHandle calculator *PriorityCalculator - ctx context.Context - cancel context.CancelFunc - wg util.WaitGroupWrapper + wg util.WaitGroupWrapper // syncFields is a substructure to hold fields protected by mu. syncFields struct { // mu is used to protect the following fields. - mu sync.RWMutex - inner pqHeap + mu sync.RWMutex + // Because the Initialize and Close functions can be called concurrently, + // so we need to protect the cancel function to avoid data race. + cancel context.CancelFunc + inner pqHeap // runningJobs is a map to store the running jobs. Used to avoid duplicate jobs. runningJobs map[int64]struct{} // lastDMLUpdateFetchTimestamp is the timestamp of the last DML update fetch. @@ -97,20 +99,11 @@ type AnalysisPriorityQueue struct { // NewAnalysisPriorityQueue creates a new AnalysisPriorityQueue2. func NewAnalysisPriorityQueue(handle statstypes.StatsHandle) *AnalysisPriorityQueue { - ctx, cancel := context.WithCancel(context.Background()) - queue := &AnalysisPriorityQueue{ statsHandle: handle, calculator: NewPriorityCalculator(), - ctx: ctx, - cancel: cancel, } - queue.syncFields.mu.Lock() - queue.syncFields.runningJobs = make(map[int64]struct{}) - queue.syncFields.failedJobs = make(map[int64]struct{}) - queue.syncFields.mu.Unlock() - return queue } @@ -144,6 +137,12 @@ func (pq *AnalysisPriorityQueue) Initialize() error { pq.Close() return errors.Trace(err) } + + ctx, cancel := context.WithCancel(context.Background()) + pq.ctx = ctx + pq.syncFields.cancel = cancel + pq.syncFields.runningJobs = make(map[int64]struct{}) + pq.syncFields.failedJobs = make(map[int64]struct{}) pq.syncFields.initialized = true pq.syncFields.mu.Unlock() @@ -813,6 +812,9 @@ func (pq *AnalysisPriorityQueue) Close() { return } - pq.cancel() + // It is possible that the priority queue is not initialized. + if pq.syncFields.cancel != nil { + pq.syncFields.cancel() + } pq.wg.Wait() } diff --git a/pkg/statistics/handle/autoanalyze/refresher/refresher.go b/pkg/statistics/handle/autoanalyze/refresher/refresher.go index f79c3ae16b763..ab90523f6c6b7 100644 --- a/pkg/statistics/handle/autoanalyze/refresher/refresher.go +++ b/pkg/statistics/handle/autoanalyze/refresher/refresher.go @@ -251,3 +251,15 @@ func (r *Refresher) Close() { r.jobs.Close() } } + +// OnBecomeOwner is used to handle the event when the current TiDB instance becomes the stats owner. +func (*Refresher) OnBecomeOwner() { + // No action is taken when becoming the stats owner. + // Initialization of the Refresher can fail, so operations are deferred until the first auto-analyze check. +} + +// OnRetireOwner is used to handle the event when the current TiDB instance retires from being the stats owner. +func (r *Refresher) OnRetireOwner() { + // Stop the worker and close the queue. + r.jobs.Close() +} diff --git a/pkg/statistics/handle/types/BUILD.bazel b/pkg/statistics/handle/types/BUILD.bazel index 373eb6271edcd..7f9388ad78bc4 100644 --- a/pkg/statistics/handle/types/BUILD.bazel +++ b/pkg/statistics/handle/types/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "//pkg/ddl/notifier", "//pkg/infoschema", "//pkg/meta/model", + "//pkg/owner", "//pkg/parser/ast", "//pkg/sessionctx", "//pkg/sessionctx/stmtctx", diff --git a/pkg/statistics/handle/types/interfaces.go b/pkg/statistics/handle/types/interfaces.go index f75529dbd92f4..c6112bb08bc14 100644 --- a/pkg/statistics/handle/types/interfaces.go +++ b/pkg/statistics/handle/types/interfaces.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tidb/pkg/ddl/notifier" "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/owner" "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" @@ -119,6 +120,8 @@ type StatsHistory interface { // StatsAnalyze is used to handle auto-analyze and manage analyze jobs. type StatsAnalyze interface { + owner.Listener + // InsertAnalyzeJob inserts analyze job into mysql.analyze_jobs and gets job ID for further updating job. InsertAnalyzeJob(job *statistics.AnalyzeJob, instance string, procID uint64) error