diff --git a/pkg/ttl/ttlworker/config.go b/pkg/ttl/ttlworker/config.go index acaf9d52682c3..d3c7cbc5c5474 100644 --- a/pkg/ttl/ttlworker/config.go +++ b/pkg/ttl/ttlworker/config.go @@ -35,7 +35,7 @@ const ttlJobTimeout = 6 * time.Hour const taskManagerLoopTickerInterval = time.Minute const ttlTaskHeartBeatTickerInterval = time.Minute -const ttlGCInterval = time.Hour +const ttlGCInterval = 10 * time.Minute func getCheckJobInterval() time.Duration { failpoint.Inject("check-job-interval", func(val failpoint.Value) time.Duration { diff --git a/pkg/ttl/ttlworker/job_manager.go b/pkg/ttl/ttlworker/job_manager.go index f13bec8c5f6b1..5eac888e77523 100644 --- a/pkg/ttl/ttlworker/job_manager.go +++ b/pkg/ttl/ttlworker/job_manager.go @@ -64,8 +64,7 @@ const taskGCTemplate = `DELETE task FROM WHERE job.table_id IS NULL` const ttlJobHistoryGCTemplate = `DELETE FROM mysql.tidb_ttl_job_history WHERE create_time < CURDATE() - INTERVAL 90 DAY` -const ttlTableStatusGCWithoutIDTemplate = `DELETE FROM mysql.tidb_ttl_table_status WHERE current_job_status IS NULL` -const ttlTableStatusGCWithIDTemplate = ttlTableStatusGCWithoutIDTemplate + ` AND table_id NOT IN (%s)` +const ttlTableStatusGCWithoutIDTemplate = `DELETE FROM mysql.tidb_ttl_table_status WHERE (current_job_status IS NULL OR current_job_owner_hb_time < %?)` const timeFormat = time.DateTime @@ -81,15 +80,18 @@ func updateHeartBeatSQL(tableID int64, now time.Time, id string) (string, []any) return updateHeartBeatTemplate, []any{now.Format(timeFormat), tableID, id} } -func gcTTLTableStatusGCSQL(existIDs []int64) string { +func gcTTLTableStatusGCSQL(existIDs []int64, now time.Time) (string, []any) { existIDStrs := make([]string, 0, len(existIDs)) for _, id := range existIDs { existIDStrs = append(existIDStrs, strconv.Itoa(int(id))) } + + hbExpireTime := now.Add(-jobManagerLoopTickerInterval * 2) + args := []any{hbExpireTime.Format(timeFormat)} if len(existIDStrs) > 0 { - return fmt.Sprintf(ttlTableStatusGCWithIDTemplate, strings.Join(existIDStrs, ",")) + return ttlTableStatusGCWithoutIDTemplate + fmt.Sprintf(` AND table_id NOT IN (%s)`, strings.Join(existIDStrs, ",")), args } - return ttlTableStatusGCWithoutIDTemplate + return ttlTableStatusGCWithoutIDTemplate, args } // JobManager schedules and manages the ttl jobs on this instance @@ -219,7 +221,7 @@ func (m *JobManager) jobLoop() error { } case <-gcTicker: gcCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout) - m.DoGC(gcCtx, se) + m.DoGC(gcCtx, se, now) cancel() // Job Schedule loop: case <-updateJobHeartBeatTicker: @@ -1029,7 +1031,7 @@ func summarizeTaskResult(tasks []*cache.TTLTask) (*TTLSummary, error) { } // DoGC deletes some old TTL job histories and redundant scan tasks -func (m *JobManager) DoGC(ctx context.Context, se session.Session) { +func (m *JobManager) DoGC(ctx context.Context, se session.Session, now time.Time) { // Remove the table not exist in info schema cache. // Delete the table status before deleting the tasks. Therefore the related tasks if err := m.updateInfoSchemaCache(se); err == nil { @@ -1038,7 +1040,8 @@ func (m *JobManager) DoGC(ctx context.Context, se session.Session) { for id := range m.infoSchemaCache.Tables { existIDs = append(existIDs, id) } - if _, err := se.ExecuteSQL(ctx, gcTTLTableStatusGCSQL(existIDs)); err != nil { + sql, args := gcTTLTableStatusGCSQL(existIDs, now) + if _, err := se.ExecuteSQL(ctx, sql, args...); err != nil { logutil.Logger(ctx).Warn("fail to gc ttl table status", zap.Error(err)) } } else { diff --git a/pkg/ttl/ttlworker/job_manager_integration_test.go b/pkg/ttl/ttlworker/job_manager_integration_test.go index 69624a1c2890c..0bd260077122b 100644 --- a/pkg/ttl/ttlworker/job_manager_integration_test.go +++ b/pkg/ttl/ttlworker/job_manager_integration_test.go @@ -612,7 +612,7 @@ func TestRescheduleJobsAfterTableDropped(t *testing.T) { tk.MustExec(rb.resume) table, err = dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) require.NoError(t, err) - m.DoGC(context.TODO(), se) + m.DoGC(context.TODO(), se, now) } } @@ -775,7 +775,7 @@ func TestGCScanTasks(t *testing.T) { return true }) se := session.NewSession(tk.Session(), tk.Session(), func(_ session.Session) {}) - m.DoGC(context.TODO(), se) + m.DoGC(context.TODO(), se, se.Now()) tk.MustQuery("select job_id, scan_id from mysql.tidb_ttl_task order by job_id, scan_id asc").Check(testkit.Rows("1 1", "1 2")) } @@ -794,7 +794,7 @@ func TestGCTableStatus(t *testing.T) { return true }) se := session.NewSession(tk.Session(), tk.Session(), func(_ session.Session) {}) - m.DoGC(context.TODO(), se) + m.DoGC(context.TODO(), se, se.Now()) tk.MustQuery("select * from mysql.tidb_ttl_table_status").Check(nil) // insert a running table status without corresponding table @@ -808,7 +808,7 @@ func TestGCTableStatus(t *testing.T) { current_job_ttl_expire = NOW(), current_job_owner_hb_time = NOW() WHERE table_id = ?`, 1, 2024) - m.DoGC(context.TODO(), se) + m.DoGC(context.TODO(), se, se.Now()) // it'll not be removed tk.MustQuery("select current_job_id from mysql.tidb_ttl_table_status").Check(testkit.Rows("1")) } @@ -855,7 +855,7 @@ func TestGCTTLHistory(t *testing.T) { return true }) se := session.NewSession(tk.Session(), tk.Session(), func(_ session.Session) {}) - m.DoGC(context.TODO(), se) + m.DoGC(context.TODO(), se, se.Now()) tk.MustQuery("select job_id from mysql.tidb_ttl_job_history order by job_id asc").Check(testkit.Rows("1", "2", "3", "4", "5")) } @@ -1475,27 +1475,93 @@ func TestDisableTTLAfterLoseHeartbeat(t *testing.T) { ctx := context.Background() m1 := ttlworker.NewJobManager("test-ttl-job-manager-1", nil, store, nil, nil) - require.NoError(t, m1.InfoSchemaCache().Update(se)) - require.NoError(t, m1.TableStatusCache().Update(ctx, se)) + m2 := ttlworker.NewJobManager("test-ttl-job-manager-2", nil, store, nil, nil) now := se.Now() - _, err = m1.LockJob(context.Background(), se, m1.InfoSchemaCache().Tables[testTable.Meta().ID], now, uuid.NewString(), false) - require.NoError(t, err) - tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("running")) - // lose heartbeat. Simulate the situation that m1 doesn't update the hearbeat for 8 hours. - now = now.Add(time.Hour * 8) + acquireJob := func(now time.Time) { + require.NoError(t, m1.InfoSchemaCache().Update(se)) + require.NoError(t, m1.TableStatusCache().Update(ctx, se)) + require.NoError(t, m2.InfoSchemaCache().Update(se)) + require.NoError(t, m2.TableStatusCache().Update(ctx, se)) + _, err = m1.LockJob(context.Background(), se, m1.InfoSchemaCache().Tables[testTable.Meta().ID], now, uuid.NewString(), false) + require.NoError(t, err) + tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("running")) + } + t.Run("disable TTL globally after losing heartbeat", func(t *testing.T) { + // now the TTL job should be scheduled again + now = now.Add(time.Hour * 8) + acquireJob(now) - // stop the tidb_ttl_job_enable - tk.MustExec("set global tidb_ttl_job_enable = 'OFF'") - defer tk.MustExec("set global tidb_ttl_job_enable = 'ON'") + // lose heartbeat. Simulate the situation that m1 doesn't update the hearbeat for 8 hours. + now = now.Add(time.Hour * 8) - // reschedule and try to get the job - m2 := ttlworker.NewJobManager("test-ttl-job-manager-2", nil, store, nil, nil) - require.NoError(t, m2.InfoSchemaCache().Update(se)) - require.NoError(t, m2.TableStatusCache().Update(ctx, se)) - m2.RescheduleJobs(se, now) + // stop the tidb_ttl_job_enable + tk.MustExec("set global tidb_ttl_job_enable = 'OFF'") + defer tk.MustExec("set global tidb_ttl_job_enable = 'ON'") + + // reschedule and try to get the job + require.NoError(t, m2.InfoSchemaCache().Update(se)) + require.NoError(t, m2.TableStatusCache().Update(ctx, se)) + m2.RescheduleJobs(se, now) + + // the job should have been cancelled + tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("")) + }) + + t.Run("disable TTL for a table after losing heartbeat", func(t *testing.T) { + // now the TTL job should be scheduled again + now = now.Add(time.Hour * 8) + acquireJob(now) - // the job should have been cancelled - tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("")) + // lose heartbeat. Simulate the situation that m1 doesn't update the hearbeat for 8 hours. + now = now.Add(time.Hour * 8) + + tk.MustExec("ALTER TABLE t TTL_ENABLE = 'OFF'") + defer tk.MustExec("ALTER TABLE t TTL_ENABLE = 'ON'") + + // reschedule and try to get the job + require.NoError(t, m2.InfoSchemaCache().Update(se)) + require.NoError(t, m2.TableStatusCache().Update(ctx, se)) + m2.RescheduleJobs(se, now) + + // the job cannot be cancelled, because it doesn't exist in the infoschema cache. + tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("running")) + + // run GC + m2.DoGC(ctx, se, now) + + // the job should have been cancelled + tk.MustQuery("select current_job_status, current_job_owner_hb_time from mysql.tidb_ttl_table_status").Check(testkit.Rows()) + }) + + t.Run("drop a TTL table after losing heartbeat", func(t *testing.T) { + // now the TTL job should be scheduled again + now = now.Add(time.Hour * 8) + acquireJob(now) + + // lose heartbeat. Simulate the situation that m1 doesn't update the hearbeat for 8 hours. + now = now.Add(time.Hour * 8) + + tk.MustExec("DROP TABLE t") + defer func() { + tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR") + testTable, err = dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) + require.NoError(t, err) + }() + + // reschedule and try to get the job + require.NoError(t, m2.InfoSchemaCache().Update(se)) + require.NoError(t, m2.TableStatusCache().Update(ctx, se)) + m2.RescheduleJobs(se, now) + + // the job cannot be cancelled, because it doesn't exist in the infoschema cache. + tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("running")) + + // run GC + m2.DoGC(ctx, se, now) + + // the job should have been cancelled + tk.MustQuery("select current_job_status, current_job_owner_hb_time from mysql.tidb_ttl_table_status").Check(testkit.Rows()) + }) }