ttl: cancel the hearbeat timeout job after disable the TTL (#57452) #57491

Open

wants to merge 1 commit into base: release-7.1
37 changes: 37 additions & 0 deletions ttl/ttlworker/job_manager.go
@@ -447,7 +447,41 @@ j:
}

func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
<<<<<<< HEAD:ttl/ttlworker/job_manager.go
if !variable.EnableTTLJob.Load() || !timeutil.WithinDayTimePeriod(variable.TTLJobScheduleWindowStartTime.Load(), variable.TTLJobScheduleWindowEndTime.Load(), now) {
=======
tz, err := se.GlobalTimeZone(m.ctx)
if err != nil {
terror.Log(err)
} else {
now = now.In(tz)
}

// Try to lock HB timeout jobs, to avoid the case that when the `tidb_ttl_job_enable = 'OFF'`, the HB timeout job will
// never be cancelled.
jobTables := m.readyForLockHBTimeoutJobTables(now)
// TODO: also consider resuming tables, but it's fine to leave them there, as other nodes will take this job
// when the heartbeat is not sent
for _, table := range jobTables {
logutil.Logger(m.ctx).Info("try lock new job", zap.Int64("tableID", table.ID))
if _, err := m.lockHBTimeoutJob(m.ctx, se, table, now); err != nil {
logutil.Logger(m.ctx).Warn("failed to lock heartbeat timeout job", zap.Error(err))
}
}

cancelJobs := false
cancelReason := ""
switch {
case !variable.EnableTTLJob.Load():
cancelJobs = true
cancelReason = "tidb_ttl_job_enable turned off"
case !timeutil.WithinDayTimePeriod(variable.TTLJobScheduleWindowStartTime.Load(), variable.TTLJobScheduleWindowEndTime.Load(), now):
cancelJobs = true
cancelReason = "out of TTL job schedule window"
}

if cancelJobs {
>>>>>>> afe8a09e928 (ttl: cancel the hearbeat timeout job after disable the TTL (#57452)):pkg/ttl/ttlworker/job_manager.go
if len(m.runningJobs) > 0 {
for _, job := range m.runningJobs {
logutil.Logger(m.ctx).Info("cancel job because tidb_ttl_job_enable turned off", zap.String("jobID", job.id))
@@ -487,6 +521,7 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
}
m.removeJob(job)
}
<<<<<<< HEAD:ttl/ttlworker/job_manager.go

// don't lock job if there's no free scan workers in local
// it's a mechanism to avoid too many scan tasks waiting in the ttl_tasks table.
@@ -508,6 +543,8 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
logutil.Logger(m.ctx).Warn("fail to create new job", zap.Error(err))
}
}
=======
>>>>>>> afe8a09e928 (ttl: cancel the hearbeat timeout job after disable the TTL (#57452)):pkg/ttl/ttlworker/job_manager.go
}

func (m *JobManager) localJobs() []*ttlJob {
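Note that the hunk above still carries the conflict markers left by the cherry-pick, so job_manager.go as committed in this PR does not compile until the conflict is resolved. As a rough, standalone sketch of the cancellation decision the incoming change introduces (this is not the actual resolution; the helper name `shouldCancelJobs` and the simplified stand-in for `timeutil.WithinDayTimePeriod` are assumptions for illustration only):

```go
package main

import (
	"fmt"
	"time"
)

// shouldCancelJobs mirrors the decision added by the incoming change: running TTL
// jobs are cancelled either because the global switch is off or because `now`
// falls outside the daily scheduling window. Name and signature are assumed for
// this sketch only.
func shouldCancelJobs(ttlJobEnabled bool, windowStart, windowEnd, now time.Time) (bool, string) {
	switch {
	case !ttlJobEnabled:
		return true, "tidb_ttl_job_enable turned off"
	case !withinDayTimePeriod(windowStart, windowEnd, now):
		return true, "out of TTL job schedule window"
	}
	return false, ""
}

// withinDayTimePeriod is a simplified stand-in for timeutil.WithinDayTimePeriod:
// it compares only the wall-clock time of `now` against a [start, end] window and
// supports windows that wrap past midnight.
func withinDayTimePeriod(start, end, now time.Time) bool {
	toMinutes := func(t time.Time) int { return t.Hour()*60 + t.Minute() }
	s, e, n := toMinutes(start), toMinutes(end), toMinutes(now)
	if s <= e {
		return s <= n && n <= e
	}
	// e.g. a 22:00 -> 06:00 window wraps over midnight
	return n >= s || n <= e
}

func main() {
	clock := func(h, m int) time.Time { return time.Date(2024, 1, 1, h, m, 0, 0, time.UTC) }
	now := clock(3, 0)

	fmt.Println(shouldCancelJobs(true, clock(22, 0), clock(6, 0), now))  // false
	fmt.Println(shouldCancelJobs(false, clock(22, 0), clock(6, 0), now)) // true tidb_ttl_job_enable turned off
}
```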
40 changes: 40 additions & 0 deletions ttl/ttlworker/job_manager_integration_test.go
@@ -1081,3 +1081,43 @@ func TestGetSession(t *testing.T) {
tk.MustQuery("select @@tidb_retry_limit, @@tidb_enable_1pc, @@tidb_enable_async_commit, @@tidb_isolation_read_engines").
Check(testkit.Rows("1 0 0 tiflash,tidb"))
}

func TestDisableTTLAfterLoseHeartbeat(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
waitAndStopTTLManager(t, dom)
tk := testkit.NewTestKit(t, store)

sessionFactory := sessionFactory(t, store)
se := sessionFactory()

tk.MustExec("use test")
tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
testTable, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
require.NoError(t, err)

ctx := context.Background()
m1 := ttlworker.NewJobManager("test-ttl-job-manager-1", nil, store, nil, nil)
require.NoError(t, m1.InfoSchemaCache().Update(se))
require.NoError(t, m1.TableStatusCache().Update(ctx, se))

now := se.Now()
_, err = m1.LockJob(context.Background(), se, m1.InfoSchemaCache().Tables[testTable.Meta().ID], now, uuid.NewString(), false)
require.NoError(t, err)
tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("running"))

// lose heartbeat. Simulate the situation where m1 doesn't update its heartbeat for 8 hours.
now = now.Add(time.Hour * 8)

// turn off tidb_ttl_job_enable
tk.MustExec("set global tidb_ttl_job_enable = 'OFF'")
defer tk.MustExec("set global tidb_ttl_job_enable = 'ON'")

// reschedule and try to get the job
m2 := ttlworker.NewJobManager("test-ttl-job-manager-2", nil, store, nil, nil)
require.NoError(t, m2.InfoSchemaCache().Update(se))
require.NoError(t, m2.TableStatusCache().Update(ctx, se))
m2.RescheduleJobs(se, now)

// the job should have been cancelled
tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("<nil>"))
}
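For reference, the scenario this test drives can be summarised outside the testkit harness. A minimal sketch under assumed names (`jobAction` and the returned strings are illustrative only; the real decision is made inside JobManager.RescheduleJobs and the heartbeat-timeout handling it calls):

```go
package main

import (
	"fmt"
	"time"
)

// jobAction sketches what the test above checks: once a job's owner has missed its
// heartbeat for longer than the timeout, another job manager may lock the job, and
// with tidb_ttl_job_enable = 'OFF' it cancels the job instead of resuming it.
func jobAction(lastHeartbeat, now time.Time, heartbeatTimeout time.Duration, ttlJobEnabled bool) string {
	if now.Sub(lastHeartbeat) < heartbeatTimeout {
		return "leave the job with its current owner"
	}
	if !ttlJobEnabled {
		return "take over the heartbeat-timeout job and cancel it"
	}
	return "take over the heartbeat-timeout job and resume it"
}

func main() {
	last := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	now := last.Add(8 * time.Hour) // the test advances `now` by 8 hours

	fmt.Println(jobAction(last, now, time.Minute, false)) // take over ... and cancel it
	fmt.Println(jobAction(last, now, time.Minute, true))  // take over ... and resume it
}
```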