Skip to content

Commit

Permalink
ttl: fix TTL job start time being overridden after failover (#44768)
Browse files Browse the repository at this point in the history
close #44767
  • Loading branch information
lcwangchao authored Jun 21, 2023
1 parent 25eb7a1 commit 689bedf
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 7 deletions.
8 changes: 5 additions & 3 deletions ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ func insertNewTableIntoStatusSQL(tableID int64, parentTableID int64) (string, []
return insertNewTableIntoStatusTemplate, []interface{}{tableID, parentTableID}
}

func setTableStatusOwnerSQL(uuid string, tableID int64, now time.Time, currentJobTTLExpire time.Time, id string) (string, []interface{}) {
return setTableStatusOwnerTemplate, []interface{}{uuid, id, now.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), tableID}
func setTableStatusOwnerSQL(uuid string, tableID int64, jobStart time.Time, now time.Time, currentJobTTLExpire time.Time, id string) (string, []interface{}) {
return setTableStatusOwnerTemplate, []interface{}{uuid, id, jobStart, now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), tableID}
}

func updateHeartBeatSQL(tableID int64, now time.Time, id string) (string, []interface{}) {
Expand Down Expand Up @@ -589,19 +589,21 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
}

jobID = uuid.New().String()
jobStart := now
jobExist := false
if len(tableStatus.CurrentJobID) > 0 {
// don't create new job if there is already one running
// so the running tasks don't need to be cancelled
jobID = tableStatus.CurrentJobID
expireTime = tableStatus.CurrentJobTTLExpire
jobStart = tableStatus.CurrentJobStartTime
jobExist = true
}
failpoint.Inject("set-job-uuid", func(val failpoint.Value) {
jobID = val.(string)
})

sql, args = setTableStatusOwnerSQL(jobID, table.ID, now, expireTime, m.id)
sql, args = setTableStatusOwnerSQL(jobID, table.ID, jobStart, now, expireTime, m.id)
_, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
Expand Down
24 changes: 23 additions & 1 deletion ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,30 @@ func TestJobTimeout(t *testing.T) {
// there is already a task
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))

m2 := ttlworker.NewJobManager("manager-2", nil, store, nil)
m2.TaskManager().ResizeWorkersWithSysVar()
require.NoError(t, m2.InfoSchemaCache().Update(se))
// schedule jobs
now = now.Add(10 * time.Minute)
m2.RescheduleJobs(se, now)
jobs := m2.RunningJobs()
require.Equal(t, 1, len(jobs))
require.Equal(t, jobs[0].ID(), tableStatus.CurrentJobID)

// check that the job has been taken over by another manager
sql, args = cache.SelectFromTTLTableStatusWithID(table.Meta().ID)
rows, err = se.ExecuteSQL(ctx, sql, args...)
require.NoError(t, err)
newTableStatus, err := cache.RowToTableStatus(se, rows[0])
require.NoError(t, err)
require.Equal(t, "manager-2", newTableStatus.CurrentJobOwnerID)
require.Equal(t, tableStatus.CurrentJobID, newTableStatus.CurrentJobID)
require.Equal(t, tableStatus.CurrentJobStartTime, newTableStatus.CurrentJobStartTime)
require.Equal(t, tableStatus.CurrentJobTTLExpire, newTableStatus.CurrentJobTTLExpire)
require.Equal(t, now.Unix(), newTableStatus.CurrentJobOwnerHBTime.Unix())

// the timeout will be checked while updating heartbeat
require.NoError(t, m.UpdateHeartBeat(ctx, se, now.Add(7*time.Hour)))
require.NoError(t, m2.UpdateHeartBeat(ctx, se, now.Add(7*time.Hour)))
tk.MustQuery("select last_job_summary->>'$.scan_task_err' from mysql.tidb_ttl_table_status").Check(testkit.Rows("job is timeout"))
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
}
Expand Down
6 changes: 3 additions & 3 deletions ttl/ttlworker/job_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func TestLockNewTable(t *testing.T) {
newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil,
},
{
getExecuteInfo(setTableStatusOwnerSQL("test-job-id", 1, now, expireTime, "test-id")),
getExecuteInfo(setTableStatusOwnerSQL("test-job-id", 1, now, now, expireTime, "test-id")),
nil, nil,
},
{
Expand Down Expand Up @@ -326,7 +326,7 @@ func TestLockNewTable(t *testing.T) {
newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil,
},
{
getExecuteInfo(setTableStatusOwnerSQL("test-job-id", 1, now, expireTime, "test-id")),
getExecuteInfo(setTableStatusOwnerSQL("test-job-id", 1, now, now, expireTime, "test-id")),
nil, nil,
},
{
Expand All @@ -348,7 +348,7 @@ func TestLockNewTable(t *testing.T) {
newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil,
},
{
getExecuteInfo(setTableStatusOwnerSQL("test-job-id", 1, now, expireTime, "test-id")),
getExecuteInfo(setTableStatusOwnerSQL("test-job-id", 1, now, now, expireTime, "test-id")),
nil, errors.New("test error message"),
},
{
Expand Down

0 comments on commit 689bedf

Please sign in to comment.