Skip to content

Commit

Permalink
ttl: cancel the hearbeat timeout job after disable the TTL (#57452)
Browse files Browse the repository at this point in the history
close #57404
  • Loading branch information
YangKeao authored Nov 18, 2024
1 parent b37a837 commit afe8a09
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 10 deletions.
22 changes: 12 additions & 10 deletions pkg/ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,18 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
now = now.In(tz)
}

// Try to lock HB timeout jobs, to avoid the case that when the `tidb_ttl_job_enable = 'OFF'`, the HB timeout job will
// never be cancelled.
jobTables := m.readyForLockHBTimeoutJobTables(now)
// TODO: also consider to resume tables, but it's fine to left them there, as other nodes will take this job
// when the heart beat is not sent
for _, table := range jobTables {
logutil.Logger(m.ctx).Info("try lock new job", zap.Int64("tableID", table.ID))
if _, err := m.lockHBTimeoutJob(m.ctx, se, table, now); err != nil {
logutil.Logger(m.ctx).Warn("failed to lock heartbeat timeout job", zap.Error(err))
}
}

cancelJobs := false
cancelReason := ""
switch {
Expand Down Expand Up @@ -635,16 +647,6 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
}
m.removeJob(job)
}

jobTables := m.readyForLockHBTimeoutJobTables(now)
// TODO: also consider to resume tables, but it's fine to left them there, as other nodes will take this job
// when the heart beat is not sent
for _, table := range jobTables {
logutil.Logger(m.ctx).Info("try lock new job", zap.Int64("tableID", table.ID))
if _, err := m.lockHBTimeoutJob(m.ctx, se, table, now); err != nil {
logutil.Logger(m.ctx).Warn("failed to lock heartbeat timeout job", zap.Error(err))
}
}
}

func (m *JobManager) localJobs() []*ttlJob {
Expand Down
40 changes: 40 additions & 0 deletions pkg/ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1459,3 +1459,43 @@ func boostJobScheduleForTest(t *testing.T) func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/check-task-interval"))
}
}

func TestDisableTTLAfterLoseHeartbeat(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
waitAndStopTTLManager(t, dom)
tk := testkit.NewTestKit(t, store)

sessionFactory := sessionFactory(t, store)
se := sessionFactory()

tk.MustExec("use test")
tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
testTable, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
require.NoError(t, err)

ctx := context.Background()
m1 := ttlworker.NewJobManager("test-ttl-job-manager-1", nil, store, nil, nil)
require.NoError(t, m1.InfoSchemaCache().Update(se))
require.NoError(t, m1.TableStatusCache().Update(ctx, se))

now := se.Now()
_, err = m1.LockJob(context.Background(), se, m1.InfoSchemaCache().Tables[testTable.Meta().ID], now, uuid.NewString(), false)
require.NoError(t, err)
tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("running"))

// lose heartbeat. Simulate the situation that m1 doesn't update the hearbeat for 8 hours.
now = now.Add(time.Hour * 8)

// stop the tidb_ttl_job_enable
tk.MustExec("set global tidb_ttl_job_enable = 'OFF'")
defer tk.MustExec("set global tidb_ttl_job_enable = 'ON'")

// reschedule and try to get the job
m2 := ttlworker.NewJobManager("test-ttl-job-manager-2", nil, store, nil, nil)
require.NoError(t, m2.InfoSchemaCache().Update(se))
require.NoError(t, m2.TableStatusCache().Update(ctx, se))
m2.RescheduleJobs(se, now)

// the job should have been cancelled
tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("<nil>"))
}

0 comments on commit afe8a09

Please sign in to comment.