diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go index 92d927cf0d631..bb720e99d7056 100644 --- a/ttl/ttlworker/job_manager.go +++ b/ttl/ttlworker/job_manager.go @@ -447,7 +447,41 @@ j: } func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) { +<<<<<<< HEAD:ttl/ttlworker/job_manager.go if !variable.EnableTTLJob.Load() || !timeutil.WithinDayTimePeriod(variable.TTLJobScheduleWindowStartTime.Load(), variable.TTLJobScheduleWindowEndTime.Load(), now) { +======= + tz, err := se.GlobalTimeZone(m.ctx) + if err != nil { + terror.Log(err) + } else { + now = now.In(tz) + } + + // Try to lock HB timeout jobs, to avoid the case where, when `tidb_ttl_job_enable = 'OFF'`, the HB timeout job will + // never be cancelled. + jobTables := m.readyForLockHBTimeoutJobTables(now) + // TODO: also consider resuming tables, but it's fine to leave them there, as other nodes will take this job + // when the heartbeat is not sent + for _, table := range jobTables { + logutil.Logger(m.ctx).Info("try lock new job", zap.Int64("tableID", table.ID)) + if _, err := m.lockHBTimeoutJob(m.ctx, se, table, now); err != nil { + logutil.Logger(m.ctx).Warn("failed to lock heartbeat timeout job", zap.Error(err)) + } + } + + cancelJobs := false + cancelReason := "" + switch { + case !variable.EnableTTLJob.Load(): + cancelJobs = true + cancelReason = "tidb_ttl_job_enable turned off" + case !timeutil.WithinDayTimePeriod(variable.TTLJobScheduleWindowStartTime.Load(), variable.TTLJobScheduleWindowEndTime.Load(), now): + cancelJobs = true + cancelReason = "out of TTL job schedule window" + } + + if cancelJobs { +>>>>>>> afe8a09e928 (ttl: cancel the hearbeat timeout job after disable the TTL (#57452)):pkg/ttl/ttlworker/job_manager.go if len(m.runningJobs) > 0 { for _, job := range m.runningJobs { logutil.Logger(m.ctx).Info("cancel job because tidb_ttl_job_enable turned off", zap.String("jobID", job.id)) @@ -487,6 +521,7 @@ func (m *JobManager) rescheduleJobs(se 
session.Session, now time.Time) { } m.removeJob(job) } +<<<<<<< HEAD:ttl/ttlworker/job_manager.go // don't lock job if there's no free scan workers in local // it's a mechanism to avoid too many scan tasks waiting in the ttl_tasks table. @@ -508,6 +543,8 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) { logutil.Logger(m.ctx).Warn("fail to create new job", zap.Error(err)) } } +======= +>>>>>>> afe8a09e928 (ttl: cancel the hearbeat timeout job after disable the TTL (#57452)):pkg/ttl/ttlworker/job_manager.go } func (m *JobManager) localJobs() []*ttlJob { diff --git a/ttl/ttlworker/job_manager_integration_test.go b/ttl/ttlworker/job_manager_integration_test.go index bec35373a0853..c17baf7b05595 100644 --- a/ttl/ttlworker/job_manager_integration_test.go +++ b/ttl/ttlworker/job_manager_integration_test.go @@ -1081,3 +1081,43 @@ func TestGetSession(t *testing.T) { tk.MustQuery("select @@tidb_retry_limit, @@tidb_enable_1pc, @@tidb_enable_async_commit, @@tidb_isolation_read_engines"). 
Check(testkit.Rows("1 0 0 tiflash,tidb")) } + +func TestDisableTTLAfterLoseHeartbeat(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + waitAndStopTTLManager(t, dom) + tk := testkit.NewTestKit(t, store) + + sessionFactory := sessionFactory(t, store) + se := sessionFactory() + + tk.MustExec("use test") + tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR") + testTable, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) + require.NoError(t, err) + + ctx := context.Background() + m1 := ttlworker.NewJobManager("test-ttl-job-manager-1", nil, store, nil, nil) + require.NoError(t, m1.InfoSchemaCache().Update(se)) + require.NoError(t, m1.TableStatusCache().Update(ctx, se)) + + now := se.Now() + _, err = m1.LockJob(context.Background(), se, m1.InfoSchemaCache().Tables[testTable.Meta().ID], now, uuid.NewString(), false) + require.NoError(t, err) + tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("running")) + + // lose the heartbeat: simulate the situation where m1 doesn't update the heartbeat for 8 hours. + now = now.Add(time.Hour * 8) + + // turn off tidb_ttl_job_enable + tk.MustExec("set global tidb_ttl_job_enable = 'OFF'") + defer tk.MustExec("set global tidb_ttl_job_enable = 'ON'") + + // reschedule and try to get the job + m2 := ttlworker.NewJobManager("test-ttl-job-manager-2", nil, store, nil, nil) + require.NoError(t, m2.InfoSchemaCache().Update(se)) + require.NoError(t, m2.TableStatusCache().Update(ctx, se)) + m2.RescheduleJobs(se, now) + + // the job should have been cancelled + tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("")) +}