Skip to content

Commit

Permalink
ttl: fix a bug that a TTL task will not be taken by other tidb after …
Browse files Browse the repository at this point in the history
…failover sometimes (#45023) (#45039)

close #45022
  • Loading branch information
ti-chi-bot authored Jun 29, 2023
1 parent 731bd28 commit 44d13a3
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 10 deletions.
9 changes: 7 additions & 2 deletions ttl/ttlworker/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func (m *taskManager) lockScanTask(se session.Session, task *cache.TTLTask, now
if err != nil {
return errors.Wrapf(err, "execute sql: %s", countRunningTasks)
}
if !m.meetTTLRunningTask(int(rows[0].GetInt64(0))) {
if !m.meetTTLRunningTask(int(rows[0].GetInt64(0)), task.Status) {
return errors.WithStack(errTooManyRunningTasks)
}

Expand Down Expand Up @@ -544,7 +544,12 @@ func (m *taskManager) reportMetrics() {
metrics.DeletingTaskCnt.Set(float64(deletingTaskCnt))
}

func (m *taskManager) meetTTLRunningTask(count int) bool {
func (m *taskManager) meetTTLRunningTask(count int, taskStatus cache.TaskStatus) bool {
if taskStatus == cache.TaskStatusRunning {
// always return true for already running task because it is already included in count
return true
}

ttlRunningTask := variable.TTLRunningTasks.Load()
// `-1` is the auto value, means we should calculate the limit according to the count of TiKV
if ttlRunningTask != -1 {
Expand Down
15 changes: 9 additions & 6 deletions ttl/ttlworker/task_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,16 +325,19 @@ func TestMeetTTLRunningTasks(t *testing.T) {
tk := testkit.NewTestKit(t, store)

// -1, the default value, means the count of TiKV
require.True(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(2))
require.False(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(3))
require.True(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(2, cache.TaskStatusWaiting))
require.False(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(3, cache.TaskStatusWaiting))
require.True(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(2, cache.TaskStatusRunning))

// positive number means the limitation
tk.MustExec("set global tidb_ttl_running_tasks = 32")
require.False(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(32))
require.True(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(31))
require.False(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(32, cache.TaskStatusWaiting))
require.True(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(31, cache.TaskStatusWaiting))
require.True(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(32, cache.TaskStatusRunning))

// set it back to auto value
tk.MustExec("set global tidb_ttl_running_tasks = -1")
require.True(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(2))
require.False(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(3))
require.True(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(2, cache.TaskStatusWaiting))
require.False(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(3, cache.TaskStatusWaiting))
require.True(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(3, cache.TaskStatusRunning))
}
4 changes: 2 additions & 2 deletions ttl/ttlworker/task_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func (m *taskManager) GetRunningTasks() []*runningScanTask {
}

// MeetTTLRunningTasks is an exported version of meetTTLRunningTask
func (m *taskManager) MeetTTLRunningTasks(count int) bool {
return m.meetTTLRunningTask(count)
func (m *taskManager) MeetTTLRunningTasks(count int, taskStatus cache.TaskStatus) bool {
return m.meetTTLRunningTask(count, taskStatus)
}

// ReportTaskFinished is an exported version of reportTaskFinished
Expand Down

0 comments on commit 44d13a3

Please sign in to comment.