Skip to content

Commit

Permalink
ttl: fix the issue that DROP TABLE / ALTER TABLE will keep job running (#57707)
Browse files Browse the repository at this point in the history

close #57556, close #57702
  • Loading branch information
YangKeao authored Nov 27, 2024
1 parent d0de86b commit ba791ab
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 31 deletions.
2 changes: 1 addition & 1 deletion pkg/ttl/ttlworker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const ttlJobTimeout = 6 * time.Hour

const taskManagerLoopTickerInterval = time.Minute
const ttlTaskHeartBeatTickerInterval = time.Minute
const ttlGCInterval = time.Hour
const ttlGCInterval = 10 * time.Minute

func getCheckJobInterval() time.Duration {
failpoint.Inject("check-job-interval", func(val failpoint.Value) time.Duration {
Expand Down
19 changes: 11 additions & 8 deletions pkg/ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ const taskGCTemplate = `DELETE task FROM
WHERE job.table_id IS NULL`

const ttlJobHistoryGCTemplate = `DELETE FROM mysql.tidb_ttl_job_history WHERE create_time < CURDATE() - INTERVAL 90 DAY`
const ttlTableStatusGCWithoutIDTemplate = `DELETE FROM mysql.tidb_ttl_table_status WHERE current_job_status IS NULL`
const ttlTableStatusGCWithIDTemplate = ttlTableStatusGCWithoutIDTemplate + ` AND table_id NOT IN (%s)`
const ttlTableStatusGCWithoutIDTemplate = `DELETE FROM mysql.tidb_ttl_table_status WHERE (current_job_status IS NULL OR current_job_owner_hb_time < %?)`

const timeFormat = time.DateTime

Expand All @@ -81,15 +80,18 @@ func updateHeartBeatSQL(tableID int64, now time.Time, id string) (string, []any)
return updateHeartBeatTemplate, []any{now.Format(timeFormat), tableID, id}
}

func gcTTLTableStatusGCSQL(existIDs []int64) string {
func gcTTLTableStatusGCSQL(existIDs []int64, now time.Time) (string, []any) {
existIDStrs := make([]string, 0, len(existIDs))
for _, id := range existIDs {
existIDStrs = append(existIDStrs, strconv.Itoa(int(id)))
}

hbExpireTime := now.Add(-jobManagerLoopTickerInterval * 2)
args := []any{hbExpireTime.Format(timeFormat)}
if len(existIDStrs) > 0 {
return fmt.Sprintf(ttlTableStatusGCWithIDTemplate, strings.Join(existIDStrs, ","))
return ttlTableStatusGCWithoutIDTemplate + fmt.Sprintf(` AND table_id NOT IN (%s)`, strings.Join(existIDStrs, ",")), args
}
return ttlTableStatusGCWithoutIDTemplate
return ttlTableStatusGCWithoutIDTemplate, args
}

// JobManager schedules and manages the ttl jobs on this instance
Expand Down Expand Up @@ -219,7 +221,7 @@ func (m *JobManager) jobLoop() error {
}
case <-gcTicker:
gcCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout)
m.DoGC(gcCtx, se)
m.DoGC(gcCtx, se, now)
cancel()
// Job Schedule loop:
case <-updateJobHeartBeatTicker:
Expand Down Expand Up @@ -1029,7 +1031,7 @@ func summarizeTaskResult(tasks []*cache.TTLTask) (*TTLSummary, error) {
}

// DoGC deletes some old TTL job histories and redundant scan tasks
func (m *JobManager) DoGC(ctx context.Context, se session.Session) {
func (m *JobManager) DoGC(ctx context.Context, se session.Session, now time.Time) {
// Remove the tables that do not exist in the info schema cache.
// Delete the table status before deleting the tasks. Therefore the related tasks
if err := m.updateInfoSchemaCache(se); err == nil {
Expand All @@ -1038,7 +1040,8 @@ func (m *JobManager) DoGC(ctx context.Context, se session.Session) {
for id := range m.infoSchemaCache.Tables {
existIDs = append(existIDs, id)
}
if _, err := se.ExecuteSQL(ctx, gcTTLTableStatusGCSQL(existIDs)); err != nil {
sql, args := gcTTLTableStatusGCSQL(existIDs, now)
if _, err := se.ExecuteSQL(ctx, sql, args...); err != nil {
logutil.Logger(ctx).Warn("fail to gc ttl table status", zap.Error(err))
}
} else {
Expand Down
110 changes: 88 additions & 22 deletions pkg/ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ func TestRescheduleJobsAfterTableDropped(t *testing.T) {
tk.MustExec(rb.resume)
table, err = dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
require.NoError(t, err)
m.DoGC(context.TODO(), se)
m.DoGC(context.TODO(), se, now)
}
}

Expand Down Expand Up @@ -775,7 +775,7 @@ func TestGCScanTasks(t *testing.T) {
return true
})
se := session.NewSession(tk.Session(), tk.Session(), func(_ session.Session) {})
m.DoGC(context.TODO(), se)
m.DoGC(context.TODO(), se, se.Now())
tk.MustQuery("select job_id, scan_id from mysql.tidb_ttl_task order by job_id, scan_id asc").Check(testkit.Rows("1 1", "1 2"))
}

Expand All @@ -794,7 +794,7 @@ func TestGCTableStatus(t *testing.T) {
return true
})
se := session.NewSession(tk.Session(), tk.Session(), func(_ session.Session) {})
m.DoGC(context.TODO(), se)
m.DoGC(context.TODO(), se, se.Now())
tk.MustQuery("select * from mysql.tidb_ttl_table_status").Check(nil)

// insert a running table status without corresponding table
Expand All @@ -808,7 +808,7 @@ func TestGCTableStatus(t *testing.T) {
current_job_ttl_expire = NOW(),
current_job_owner_hb_time = NOW()
WHERE table_id = ?`, 1, 2024)
m.DoGC(context.TODO(), se)
m.DoGC(context.TODO(), se, se.Now())
// it'll not be removed
tk.MustQuery("select current_job_id from mysql.tidb_ttl_table_status").Check(testkit.Rows("1"))
}
Expand Down Expand Up @@ -855,7 +855,7 @@ func TestGCTTLHistory(t *testing.T) {
return true
})
se := session.NewSession(tk.Session(), tk.Session(), func(_ session.Session) {})
m.DoGC(context.TODO(), se)
m.DoGC(context.TODO(), se, se.Now())
tk.MustQuery("select job_id from mysql.tidb_ttl_job_history order by job_id asc").Check(testkit.Rows("1", "2", "3", "4", "5"))
}

Expand Down Expand Up @@ -1475,27 +1475,93 @@ func TestDisableTTLAfterLoseHeartbeat(t *testing.T) {

ctx := context.Background()
m1 := ttlworker.NewJobManager("test-ttl-job-manager-1", nil, store, nil, nil)
require.NoError(t, m1.InfoSchemaCache().Update(se))
require.NoError(t, m1.TableStatusCache().Update(ctx, se))
m2 := ttlworker.NewJobManager("test-ttl-job-manager-2", nil, store, nil, nil)

now := se.Now()
_, err = m1.LockJob(context.Background(), se, m1.InfoSchemaCache().Tables[testTable.Meta().ID], now, uuid.NewString(), false)
require.NoError(t, err)
tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("running"))

// lose heartbeat. Simulate the situation that m1 doesn't update the heartbeat for 8 hours.
now = now.Add(time.Hour * 8)
acquireJob := func(now time.Time) {
require.NoError(t, m1.InfoSchemaCache().Update(se))
require.NoError(t, m1.TableStatusCache().Update(ctx, se))
require.NoError(t, m2.InfoSchemaCache().Update(se))
require.NoError(t, m2.TableStatusCache().Update(ctx, se))
_, err = m1.LockJob(context.Background(), se, m1.InfoSchemaCache().Tables[testTable.Meta().ID], now, uuid.NewString(), false)
require.NoError(t, err)
tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("running"))
}
t.Run("disable TTL globally after losing heartbeat", func(t *testing.T) {
// now the TTL job should be scheduled again
now = now.Add(time.Hour * 8)
acquireJob(now)

// stop the tidb_ttl_job_enable
tk.MustExec("set global tidb_ttl_job_enable = 'OFF'")
defer tk.MustExec("set global tidb_ttl_job_enable = 'ON'")
// lose heartbeat. Simulate the situation that m1 doesn't update the heartbeat for 8 hours.
now = now.Add(time.Hour * 8)

// reschedule and try to get the job
m2 := ttlworker.NewJobManager("test-ttl-job-manager-2", nil, store, nil, nil)
require.NoError(t, m2.InfoSchemaCache().Update(se))
require.NoError(t, m2.TableStatusCache().Update(ctx, se))
m2.RescheduleJobs(se, now)
// stop the tidb_ttl_job_enable
tk.MustExec("set global tidb_ttl_job_enable = 'OFF'")
defer tk.MustExec("set global tidb_ttl_job_enable = 'ON'")

// reschedule and try to get the job
require.NoError(t, m2.InfoSchemaCache().Update(se))
require.NoError(t, m2.TableStatusCache().Update(ctx, se))
m2.RescheduleJobs(se, now)

// the job should have been cancelled
tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("<nil>"))
})

t.Run("disable TTL for a table after losing heartbeat", func(t *testing.T) {
// now the TTL job should be scheduled again
now = now.Add(time.Hour * 8)
acquireJob(now)

// the job should have been cancelled
tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("<nil>"))
// lose heartbeat. Simulate the situation that m1 doesn't update the heartbeat for 8 hours.
now = now.Add(time.Hour * 8)

tk.MustExec("ALTER TABLE t TTL_ENABLE = 'OFF'")
defer tk.MustExec("ALTER TABLE t TTL_ENABLE = 'ON'")

// reschedule and try to get the job
require.NoError(t, m2.InfoSchemaCache().Update(se))
require.NoError(t, m2.TableStatusCache().Update(ctx, se))
m2.RescheduleJobs(se, now)

// the job cannot be cancelled, because it doesn't exist in the infoschema cache.
tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("running"))

// run GC
m2.DoGC(ctx, se, now)

// the job should have been cancelled
tk.MustQuery("select current_job_status, current_job_owner_hb_time from mysql.tidb_ttl_table_status").Check(testkit.Rows())
})

t.Run("drop a TTL table after losing heartbeat", func(t *testing.T) {
// now the TTL job should be scheduled again
now = now.Add(time.Hour * 8)
acquireJob(now)

// lose heartbeat. Simulate the situation that m1 doesn't update the heartbeat for 8 hours.
now = now.Add(time.Hour * 8)

tk.MustExec("DROP TABLE t")
defer func() {
tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
testTable, err = dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
require.NoError(t, err)
}()

// reschedule and try to get the job
require.NoError(t, m2.InfoSchemaCache().Update(se))
require.NoError(t, m2.TableStatusCache().Update(ctx, se))
m2.RescheduleJobs(se, now)

// the job cannot be cancelled, because it doesn't exist in the infoschema cache.
tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("running"))

// run GC
m2.DoGC(ctx, se, now)

// the job should have been cancelled
tk.MustQuery("select current_job_status, current_job_owner_hb_time from mysql.tidb_ttl_table_status").Check(testkit.Rows())
})
}

0 comments on commit ba791ab

Please sign in to comment.