statistics: handle deleted tables correctly in the PQ (#57649)
close #57648
Rustin170506 authored Nov 25, 2024
1 parent 9ff83eb commit 702c4f2
Showing 4 changed files with 66 additions and 5 deletions.
@@ -57,7 +57,7 @@ func (f *AnalysisJobFactory) CreateNonPartitionedTableAnalysisJob(
tblInfo *model.TableInfo,
tblStats *statistics.Table,
) AnalysisJob {
- if !tblStats.IsEligibleForAnalysis() {
+ if tblStats == nil || !tblStats.IsEligibleForAnalysis() {
return nil
}

@@ -92,7 +92,7 @@ func (f *AnalysisJobFactory) CreateStaticPartitionAnalysisJob(
partitionID int64,
partitionStats *statistics.Table,
) AnalysisJob {
- if !partitionStats.IsEligibleForAnalysis() {
+ if partitionStats == nil || !partitionStats.IsEligibleForAnalysis() {
return nil
}

@@ -128,7 +128,7 @@ func (f *AnalysisJobFactory) CreateDynamicPartitionedTableAnalysisJob(
globalTblStats *statistics.Table,
partitionStats map[PartitionIDAndName]*statistics.Table,
) AnalysisJob {
- if !globalTblStats.IsEligibleForAnalysis() {
+ if globalTblStats == nil || !globalTblStats.IsEligibleForAnalysis() {
return nil
}

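
The three hunks above apply one defensive pattern: if the table's statistics are missing from the cache, the factory returns no job instead of calling a method on a nil *statistics.Table. A minimal sketch of that pattern, with hypothetical names (tableStats, analysisJob, createJob) standing in for the real TiDB types:

package main

import "fmt"

// tableStats stands in for statistics.Table in this sketch.
type tableStats struct {
	analyzable bool
}

// isEligibleForAnalysis plays the role of IsEligibleForAnalysis: it reads a
// receiver field, so calling it on a nil *tableStats would panic.
func (s *tableStats) isEligibleForAnalysis() bool {
	return s.analyzable
}

// analysisJob is a placeholder for an analysis job in the priority queue.
type analysisJob struct {
	tableID int64
}

// createJob returns nil when the stats are missing (for example, the table was
// just dropped and evicted from the stats cache) or not eligible for analysis.
func createJob(tableID int64, stats *tableStats) *analysisJob {
	if stats == nil || !stats.isEligibleForAnalysis() {
		return nil
	}
	return &analysisJob{tableID: tableID}
}

func main() {
	fmt.Println(createJob(1, nil))                           // <nil>: stats already gone, no job
	fmt.Println(createJob(2, &tableStats{analyzable: true})) // &{2}: job created
}
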
17 changes: 16 additions & 1 deletion pkg/statistics/handle/autoanalyze/priorityqueue/queue.go
@@ -644,14 +644,16 @@ func (pq *AnalysisPriorityQueue) RefreshLastAnalysisDuration() {
zap.Int64("tableID", job.GetTableID()),
zap.String("job", job.String()),
)
- // TODO: Remove this after handling the DDL event.
+ // Delete the job from the queue since its table is missing. This is a safeguard -
+ // DDL events should have already cleaned up jobs for dropped tables.
err := pq.syncFields.inner.delete(job)
if err != nil {
statslogutil.StatsLogger().Error("Failed to delete job from priority queue",
zap.Error(err),
zap.String("job", job.String()),
)
}
continue
}
indicators.LastAnalysisDuration = jobFactory.GetTableLastAnalyzeDuration(tableStats)
job.SetIndicators(indicators)
@@ -752,11 +754,24 @@ func (pq *AnalysisPriorityQueue) Pop() (AnalysisJob, error) {
job.RegisterSuccessHook(func(j AnalysisJob) {
pq.syncFields.mu.Lock()
defer pq.syncFields.mu.Unlock()
// During owner switch, the priority queue is closed and its fields are reset to nil.
// We allow running jobs to complete normally rather than stopping them, so this nil
// check is expected when the job finishes after the switch.
if pq.syncFields.runningJobs == nil {
return
}
delete(pq.syncFields.runningJobs, j.GetTableID())
})
job.RegisterFailureHook(func(j AnalysisJob, needRetry bool) {
pq.syncFields.mu.Lock()
defer pq.syncFields.mu.Unlock()
// During owner switch, the priority queue is closed and its fields are reset to nil.
// We allow running jobs to complete normally rather than stopping them, so this nil check
// is expected when jobs finish after the switch. Failed jobs will be handled by the next
// initialization, so we can safely ignore them here.
if pq.syncFields.runningJobs == nil || pq.syncFields.mustRetryJobs == nil {
return
}
// Mark the job as failed and remove it from the running jobs.
delete(pq.syncFields.runningJobs, j.GetTableID())
if needRetry {
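
The comments added to the two hooks describe a race with owner switch: Close() resets the queue's synchronized fields to nil, and a job that finishes afterwards must not touch them (in particular, writing to a nil map panics in Go). A rough sketch of that shape, with made-up names (jobQueue, onJobFailure) rather than the actual AnalysisPriorityQueue API:

package main

import (
	"fmt"
	"sync"
)

// jobQueue is a toy stand-in for the priority queue's synchronized fields.
type jobQueue struct {
	mu            sync.Mutex
	runningJobs   map[int64]struct{}
	mustRetryJobs map[int64]struct{}
}

func newJobQueue() *jobQueue {
	return &jobQueue{
		runningJobs:   map[int64]struct{}{},
		mustRetryJobs: map[int64]struct{}{},
	}
}

// Close mimics the owner-switch path: it resets internal state to nil instead
// of waiting for in-flight jobs to finish.
func (q *jobQueue) Close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.runningJobs = nil
	q.mustRetryJobs = nil
}

// onJobFailure is the failure hook. Writing to a nil map panics in Go, so the
// hook must tolerate a queue that was closed while the job was still running.
func (q *jobQueue) onJobFailure(tableID int64, needRetry bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.runningJobs == nil || q.mustRetryJobs == nil {
		// Queue already closed (ownership moved away); drop the result.
		return
	}
	delete(q.runningJobs, tableID)
	if needRetry {
		q.mustRetryJobs[tableID] = struct{}{}
	}
}

func main() {
	q := newJobQueue()
	q.runningJobs[1] = struct{}{}
	q.Close()
	q.onJobFailure(1, true) // safe: guarded, no write to a nil map
	fmt.Println("failure hook ran after Close without panicking")
}
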
45 changes: 45 additions & 0 deletions pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go
@@ -19,6 +19,7 @@ import (
"testing"
"time"

"github.com/pingcap/tidb/pkg/meta/model"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics"
@@ -608,3 +609,47 @@ func TestPQCanBeClosedAndReInitialized(t *testing.T) {
// Check if the priority queue is initialized.
require.True(t, pq.IsInitialized())
}

func TestPQHandlesTableDeletionGracefully(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
handle := dom.StatsHandle()

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t1 (a int)")
tk.MustExec("insert into t1 values (1)")
statistics.AutoAnalyzeMinCnt = 0
defer func() {
statistics.AutoAnalyzeMinCnt = 1000
}()

ctx := context.Background()
require.NoError(t, handle.DumpStatsDeltaToKV(true))
require.NoError(t, handle.Update(ctx, dom.InfoSchema()))
pq := priorityqueue.NewAnalysisPriorityQueue(handle)
defer pq.Close()
require.NoError(t, pq.Initialize())

// Check the priority queue is not empty.
l, err := pq.Len()
require.NoError(t, err)
require.NotEqual(t, 0, l)

tbl, err := dom.InfoSchema().TableByName(ctx, pmodel.NewCIStr("test"), pmodel.NewCIStr("t1"))
require.NoError(t, err)

// Drop the table and make sure its stats entry is removed from the cache.
tk.MustExec("drop table t1")
deleteEvent := findEvent(handle.DDLEventCh(), model.ActionDropTable)
require.NotNil(t, deleteEvent)
require.NoError(t, handle.HandleDDLEvent(deleteEvent))
require.NoError(t, handle.Update(ctx, dom.InfoSchema()))

// Make sure handle.Get() returns false.
_, ok := handle.Get(tbl.Meta().ID)
require.False(t, ok)

require.NotPanics(t, func() {
pq.RefreshLastAnalysisDuration()
})
}
3 changes: 2 additions & 1 deletion pkg/statistics/handle/autoanalyze/refresher/refresher.go
@@ -261,6 +261,7 @@ func (*Refresher) OnBecomeOwner() {

// OnRetireOwner is used to handle the event when the current TiDB instance retires from being the stats owner.
func (r *Refresher) OnRetireOwner() {
- // Stop the worker and close the queue.
+ // Theoretically we should stop the worker here, but stopping analysis jobs can be time-consuming.
+ // To avoid blocking etcd leader re-election, we only close the priority queue.
r.jobs.Close()
}
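
The new comment records a deliberate trade-off: retiring ownership has to return quickly, so only the priority queue is closed and any in-flight analysis job is left to finish on its own (its completion hooks tolerate the closed queue, as the queue.go hunks above show). A small sketch of that shape, with invented names rather than the real Refresher API:

package main

import (
	"fmt"
	"time"
)

// analysisQueue is a stand-in for the priority queue owned by the refresher.
type analysisQueue struct{ closed chan struct{} }

func newAnalysisQueue() *analysisQueue { return &analysisQueue{closed: make(chan struct{})} }

// Close is cheap: it only flips the queue into the closed state.
func (q *analysisQueue) Close() { close(q.closed) }

// refresher is a stand-in for the stats refresher that reacts to owner changes.
type refresher struct{ jobs *analysisQueue }

// onRetireOwner must not block etcd leader re-election, so it closes the
// queue but does not wait for running jobs to stop.
func (r *refresher) onRetireOwner() {
	r.jobs.Close()
}

func main() {
	r := &refresher{jobs: newAnalysisQueue()}

	// A long-running "analysis job" that outlives ownership.
	done := make(chan struct{})
	go func() {
		time.Sleep(50 * time.Millisecond) // pretend to analyze a table
		close(done)
	}()

	start := time.Now()
	r.onRetireOwner() // returns immediately; the job keeps running
	fmt.Println("retire took", time.Since(start))

	<-done
	fmt.Println("job finished after ownership was released")
}
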
