From d2eca727f5c8bf8c3390740ce2c2a64d99e6e1e0 Mon Sep 17 00:00:00 2001 From: YangKeao Date: Tue, 13 Dec 2022 12:46:53 -0500 Subject: [PATCH] ttl: reschedule scan tasks after update task state (#39891) close pingcap/tidb#39890 --- ttl/ttlworker/job_manager.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go index 10e0e8751173e..cd3c1208f1f37 100644 --- a/ttl/ttlworker/job_manager.go +++ b/ttl/ttlworker/job_manager.go @@ -143,9 +143,13 @@ func (m *JobManager) jobLoop() error { } cancel() case <-updateScanTaskStateTicker: - m.updateTaskState() + if m.updateTaskState() { + m.rescheduleJobs(se, now) + } case <-m.notifyStateCh: - m.updateTaskState() + if m.updateTaskState() { + m.rescheduleJobs(se, now) + } case <-jobCheckTicker: m.checkFinishedJob(se, now) m.checkNotOwnJob() @@ -263,7 +267,8 @@ func (m *JobManager) resizeWorkers(workers []worker, count int, factory func() w return workers, nil, nil } -func (m *JobManager) updateTaskState() { +// updateTaskState polls the result from scan worker and returns whether there are result polled +func (m *JobManager) updateTaskState() bool { results := m.pollScanWorkerResults() for _, result := range results { job := findJobWithTableID(m.runningJobs, result.task.tbl.ID) @@ -276,6 +281,8 @@ func (m *JobManager) updateTaskState() { job.finishedScanTaskCounter += 1 job.scanTaskErr = multierr.Append(job.scanTaskErr, result.err) } + + return len(results) > 0 } func (m *JobManager) pollScanWorkerResults() []*ttlScanTaskExecResult {