diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/analysis_job_factory.go b/pkg/statistics/handle/autoanalyze/priorityqueue/analysis_job_factory.go
index fd52909eff1fe..3a20597c02b91 100644
--- a/pkg/statistics/handle/autoanalyze/priorityqueue/analysis_job_factory.go
+++ b/pkg/statistics/handle/autoanalyze/priorityqueue/analysis_job_factory.go
@@ -57,7 +57,7 @@ func (f *AnalysisJobFactory) CreateNonPartitionedTableAnalysisJob(
 	tblInfo *model.TableInfo,
 	tblStats *statistics.Table,
 ) AnalysisJob {
-	if !tblStats.IsEligibleForAnalysis() {
+	if tblStats == nil || !tblStats.IsEligibleForAnalysis() {
 		return nil
 	}
 
@@ -92,7 +92,7 @@ func (f *AnalysisJobFactory) CreateStaticPartitionAnalysisJob(
 	partitionID int64,
 	partitionStats *statistics.Table,
 ) AnalysisJob {
-	if !partitionStats.IsEligibleForAnalysis() {
+	if partitionStats == nil || !partitionStats.IsEligibleForAnalysis() {
 		return nil
 	}
 
@@ -128,7 +128,7 @@ func (f *AnalysisJobFactory) CreateDynamicPartitionedTableAnalysisJob(
 	globalTblStats *statistics.Table,
 	partitionStats map[PartitionIDAndName]*statistics.Table,
 ) AnalysisJob {
-	if !globalTblStats.IsEligibleForAnalysis() {
+	if globalTblStats == nil || !globalTblStats.IsEligibleForAnalysis() {
 		return nil
 	}
 
diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go b/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go
index a64ef48933669..39b75ed7af376 100644
--- a/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go
+++ b/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go
@@ -644,7 +644,8 @@ func (pq *AnalysisPriorityQueue) RefreshLastAnalysisDuration() {
 				zap.Int64("tableID", job.GetTableID()),
 				zap.String("job", job.String()),
 			)
-			// TODO: Remove this after handling the DDL event.
+			// Delete the job from the queue since its table is missing. This is a safeguard -
+			// DDL events should have already cleaned up jobs for dropped tables.
 			err := pq.syncFields.inner.delete(job)
 			if err != nil {
 				statslogutil.StatsLogger().Error("Failed to delete job from priority queue",
@@ -652,6 +653,7 @@ func (pq *AnalysisPriorityQueue) RefreshLastAnalysisDuration() {
 					zap.String("job", job.String()),
 				)
 			}
+			continue
 		}
 		indicators.LastAnalysisDuration = jobFactory.GetTableLastAnalyzeDuration(tableStats)
 		job.SetIndicators(indicators)
@@ -752,11 +754,24 @@ func (pq *AnalysisPriorityQueue) Pop() (AnalysisJob, error) {
 	job.RegisterSuccessHook(func(j AnalysisJob) {
 		pq.syncFields.mu.Lock()
 		defer pq.syncFields.mu.Unlock()
+		// During owner switch, the priority queue is closed and its fields are reset to nil.
+		// We allow running jobs to complete normally rather than stopping them, so this nil
+		// check is expected when the job finishes after the switch.
+		if pq.syncFields.runningJobs == nil {
+			return
+		}
 		delete(pq.syncFields.runningJobs, j.GetTableID())
 	})
 	job.RegisterFailureHook(func(j AnalysisJob, needRetry bool) {
 		pq.syncFields.mu.Lock()
 		defer pq.syncFields.mu.Unlock()
+		// During owner switch, the priority queue is closed and its fields are reset to nil.
+		// We allow running jobs to complete normally rather than stopping them, so this nil check
+		// is expected when jobs finish after the switch. Failed jobs will be handled by the next
+		// initialization, so we can safely ignore them here.
+		if pq.syncFields.runningJobs == nil || pq.syncFields.mustRetryJobs == nil {
+			return
+		}
 		// Mark the job as failed and remove it from the running jobs.
 		delete(pq.syncFields.runningJobs, j.GetTableID())
 		if needRetry {
diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go b/pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go
index 29f8f0b6b7f34..0e32590ff6aa3 100644
--- a/pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go
+++ b/pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go
@@ -19,6 +19,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/pingcap/tidb/pkg/meta/model"
 	pmodel "github.com/pingcap/tidb/pkg/parser/model"
 	"github.com/pingcap/tidb/pkg/sessionctx"
 	"github.com/pingcap/tidb/pkg/statistics"
@@ -608,3 +609,47 @@ func TestPQCanBeClosedAndReInitialized(t *testing.T) {
 	// Check if the priority queue is initialized.
 	require.True(t, pq.IsInitialized())
 }
+
+func TestPQHandlesTableDeletionGracefully(t *testing.T) {
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+	handle := dom.StatsHandle()
+
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk.MustExec("create table t1 (a int)")
+	tk.MustExec("insert into t1 values (1)")
+	statistics.AutoAnalyzeMinCnt = 0
+	defer func() {
+		statistics.AutoAnalyzeMinCnt = 1000
+	}()
+
+	ctx := context.Background()
+	require.NoError(t, handle.DumpStatsDeltaToKV(true))
+	require.NoError(t, handle.Update(ctx, dom.InfoSchema()))
+	pq := priorityqueue.NewAnalysisPriorityQueue(handle)
+	defer pq.Close()
+	require.NoError(t, pq.Initialize())
+
+	// Check the priority queue is not empty.
+	l, err := pq.Len()
+	require.NoError(t, err)
+	require.NotEqual(t, 0, l)
+
+	tbl, err := dom.InfoSchema().TableByName(ctx, pmodel.NewCIStr("test"), pmodel.NewCIStr("t1"))
+	require.NoError(t, err)
+
+	// Drop the table and mock that the table stats are removed from the cache.
+	tk.MustExec("drop table t1")
+	deleteEvent := findEvent(handle.DDLEventCh(), model.ActionDropTable)
+	require.NotNil(t, deleteEvent)
+	require.NoError(t, handle.HandleDDLEvent(deleteEvent))
+	require.NoError(t, handle.Update(ctx, dom.InfoSchema()))
+
+	// Make sure handle.Get() returns false.
+	_, ok := handle.Get(tbl.Meta().ID)
+	require.False(t, ok)
+
+	require.NotPanics(t, func() {
+		pq.RefreshLastAnalysisDuration()
+	})
+}
diff --git a/pkg/statistics/handle/autoanalyze/refresher/refresher.go b/pkg/statistics/handle/autoanalyze/refresher/refresher.go
index 4bbb0d975a665..06cf3ca0b36a9 100644
--- a/pkg/statistics/handle/autoanalyze/refresher/refresher.go
+++ b/pkg/statistics/handle/autoanalyze/refresher/refresher.go
@@ -261,6 +261,7 @@ func (*Refresher) OnBecomeOwner() {
 
 // OnRetireOwner is used to handle the event when the current TiDB instance retires from being the stats owner.
 func (r *Refresher) OnRetireOwner() {
-	// Stop the worker and close the queue.
+	// Theoretically we should stop the worker here, but stopping analysis jobs can be time-consuming.
+	// To avoid blocking etcd leader re-election, we only close the priority queue.
 	r.jobs.Close()
 }
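Illustrative note (not part of the patch): the hook changes in queue.go guard against the owner-switch race by checking the queue's internal maps for nil before touching them. The standalone sketch below uses simplified, hypothetical names (queue, runningJobs, mustRetryJobs, onJobFailed) rather than the real AnalysisPriorityQueue types; it only demonstrates the pattern the diff relies on: Close() nils out the shared maps under the mutex, and a hook that fires after Close() returns early instead of writing to a nil map (a write to a nil map panics in Go, while delete on a nil map is merely a no-op).

package main

import (
	"fmt"
	"sync"
)

// queue is a simplified stand-in for the priority queue in this patch
// (hypothetical names): Close() releases the shared maps, and the hooks
// must tolerate being invoked after Close().
type queue struct {
	mu            sync.Mutex
	runningJobs   map[int64]struct{}
	mustRetryJobs map[int64]struct{}
}

func newQueue() *queue {
	return &queue{
		runningJobs:   make(map[int64]struct{}),
		mustRetryJobs: make(map[int64]struct{}),
	}
}

// Close mirrors the owner-switch path: fields are reset to nil so that a
// later re-initialization starts from a clean state.
func (q *queue) Close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.runningJobs = nil
	q.mustRetryJobs = nil
}

// onJobFailed is the failure-hook analogue. Without the nil check, the
// assignment into mustRetryJobs would panic once Close() has run.
func (q *queue) onJobFailed(tableID int64, needRetry bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.runningJobs == nil || q.mustRetryJobs == nil {
		// Queue already closed (e.g. this instance retired from being the
		// owner); the next initialization will pick the job up again.
		return
	}
	delete(q.runningJobs, tableID)
	if needRetry {
		q.mustRetryJobs[tableID] = struct{}{}
	}
}

func main() {
	q := newQueue()
	q.runningJobs[1] = struct{}{}
	q.Close()
	q.onJobFailed(1, true) // safe: returns early instead of writing to a nil map
	fmt.Println("hook ran safely after Close")
}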