From c0459da30d7c188f7fe0420e80a977b8e473bbfc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E8=B6=85?= Date: Wed, 26 Jul 2023 12:46:33 +0800 Subject: [PATCH] ttl: ttl use timer framework to trigger jobs (#45469) close pingcap/tidb#45468 --- domain/domain.go | 2 +- timer/runtime/runtime.go | 7 + timer/runtime/runtime_test.go | 2 + timer/runtime/worker.go | 2 +- ttl/client/command.go | 4 +- ttl/client/notification.go | 34 +- ttl/ttlworker/BUILD.bazel | 8 +- ttl/ttlworker/job_manager.go | 390 +++++++++++++++--- ttl/ttlworker/job_manager_integration_test.go | 376 +++++++++++++++-- ttl/ttlworker/job_manager_test.go | 204 +++++---- ttl/ttlworker/session_test.go | 4 + ttl/ttlworker/task_manager.go | 20 +- ttl/ttlworker/task_manager_test.go | 38 ++ ttl/ttlworker/timer.go | 2 +- ttl/ttlworker/timer_sync.go | 37 +- ttl/ttlworker/timer_sync_test.go | 60 +++ 16 files changed, 1005 insertions(+), 185 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 02d50a062002f..541a90be5dce7 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -3093,7 +3093,7 @@ func (do *Domain) StartTTLJobManager() { logutil.BgLogger().Info("ttlJobManager exited.") }() - ttlJobManager := ttlworker.NewJobManager(do.ddl.GetID(), do.sysSessionPool, do.store, do.etcdClient) + ttlJobManager := ttlworker.NewJobManager(do.ddl.GetID(), do.sysSessionPool, do.store, do.etcdClient, do.ddl.OwnerManager().IsOwner) do.ttlJobManager.Store(ttlJobManager) ttlJobManager.Start() diff --git a/timer/runtime/runtime.go b/timer/runtime/runtime.go index b60df8a0c28dc..7b55752156ec4 100644 --- a/timer/runtime/runtime.go +++ b/timer/runtime/runtime.go @@ -115,6 +115,13 @@ func (rt *TimerGroupRuntime) Start() { go rt.loop() } +// Running returns whether the runtime is running +func (rt *TimerGroupRuntime) Running() bool { + rt.mu.Lock() + defer rt.mu.Unlock() + return rt.ctx != nil && rt.cancel != nil +} + func (rt *TimerGroupRuntime) initCtx() { rt.ctx, rt.cancel = context.WithCancel(context.Background()) } diff --git a/timer/runtime/runtime_test.go b/timer/runtime/runtime_test.go index 7db15545df615..8c9a0c9f2781a 100644 --- a/timer/runtime/runtime_test.go +++ b/timer/runtime/runtime_test.go @@ -67,9 +67,11 @@ func TestRuntimeStartStop(t *testing.T) { Build() runtime.Start() + require.True(t, runtime.Running()) waitDone(timerProcessed, time.Minute) go func() { runtime.Stop() + require.False(t, runtime.Running()) cancel() }() waitDone(ctx.Done(), time.Minute) diff --git a/timer/runtime/worker.go b/timer/runtime/worker.go index a1eb6a79a3f1f..48e3ade347a59 100644 --- a/timer/runtime/worker.go +++ b/timer/runtime/worker.go @@ -26,7 +26,7 @@ import ( ) const ( - workerRecvChanCap = 8 + workerRecvChanCap = 128 workerRespChanCap = 128 workerEventDefaultRetryInterval = 10 * time.Second chanBlockInterval = time.Second diff --git a/ttl/client/command.go b/ttl/client/command.go index 837c22324d4f0..3ec5d0c9be376 100644 --- a/ttl/client/command.go +++ b/ttl/client/command.go @@ -74,8 +74,8 @@ type TriggerNewTTLJobTableResult struct { DBName string `json:"db_name"` TableName string `json:"table_name"` PartitionName string `json:"partition_name,omitempty"` - JobID string `json:"job_id"` - ErrorMessage string `json:"error_message"` + JobID string `json:"job_id,omitempty"` + ErrorMessage string `json:"error_message,omitempty"` } // TriggerNewTTLJobResponse is the response detail for trigger_ttl_job command diff --git a/ttl/client/notification.go b/ttl/client/notification.go index d65405b5bf06b..2778546426dc5 100644 --- 
a/ttl/client/notification.go +++ b/ttl/client/notification.go @@ -58,13 +58,41 @@ func NewMockNotificationClient() NotificationClient { } // Notify implements the NotificationClient -func (c *mockClient) Notify(_ context.Context, typ string, data string) error { +func (c *mockClient) Notify(ctx context.Context, typ string, data string) error { c.Lock() defer c.Unlock() - for _, ch := range c.notificationWatchers[typ] { - ch <- clientv3.WatchResponse{} + watchers, ok := c.notificationWatchers[typ] + if !ok { + return nil } + + var unsent []chan clientv3.WatchResponse +loop: + for i, ch := range watchers { + select { + case <-ctx.Done(): + return ctx.Err() + case ch <- clientv3.WatchResponse{}: + default: + unsent = make([]chan clientv3.WatchResponse, len(watchers)-i) + copy(unsent, watchers[i:]) + break loop + } + } + + if len(unsent) > 0 { + go func() { + for _, ch := range unsent { + select { + case <-ctx.Done(): + return + case ch <- clientv3.WatchResponse{}: + } + } + }() + } + return nil } diff --git a/ttl/ttlworker/BUILD.bazel b/ttl/ttlworker/BUILD.bazel index 4debd916534a0..f830e7568fa63 100644 --- a/ttl/ttlworker/BUILD.bazel +++ b/ttl/ttlworker/BUILD.bazel @@ -26,6 +26,7 @@ go_library( "//store/driver/error", "//timer/api", "//timer/runtime", + "//timer/tablestore", "//ttl/cache", "//ttl/client", "//ttl/metrics", @@ -37,12 +38,13 @@ go_library( "//util/logutil", "//util/sqlexec", "//util/timeutil", - "@com_github_google_uuid//:uuid", "@com_github_ngaut_pools//:pools", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_tikv_client_go_v2//tikv", + "@com_github_tikv_client_go_v2//tikvrpc", "@io_etcd_go_etcd_client_v3//:client", + "@org_golang_x_exp//maps", "@org_golang_x_exp//slices", "@org_golang_x_time//rate", "@org_uber_go_multierr//:multierr", @@ -67,7 +69,7 @@ go_test( embed = [":ttlworker"], flaky = True, race = "on", - shard_count = 41, + shard_count = 46, deps = [ "//domain", "//infoschema", @@ -99,6 +101,8 @@ go_test( "@com_github_stretchr_testify//mock", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//testutils", + "@com_github_tikv_client_go_v2//tikv", + "@com_github_tikv_client_go_v2//tikvrpc", "@org_golang_x_time//rate", "@org_uber_go_atomic//:atomic", "@org_uber_go_zap//:zap", diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go index 962072b0c56e6..2eff07f4b5ded 100644 --- a/ttl/ttlworker/job_manager.go +++ b/ttl/ttlworker/job_manager.go @@ -22,12 +22,13 @@ import ( "strings" "time" - "github.com/google/uuid" "github.com/pingcap/errors" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/sessionctx/variable" + timerapi "github.com/pingcap/tidb/timer/api" + ttltablestore "github.com/pingcap/tidb/timer/tablestore" "github.com/pingcap/tidb/ttl/cache" "github.com/pingcap/tidb/ttl/client" "github.com/pingcap/tidb/ttl/metrics" @@ -68,7 +69,7 @@ func insertNewTableIntoStatusSQL(tableID int64, parentTableID int64) (string, [] } func setTableStatusOwnerSQL(uuid string, tableID int64, jobStart time.Time, now time.Time, currentJobTTLExpire time.Time, id string) (string, []interface{}) { - return setTableStatusOwnerTemplate, []interface{}{uuid, id, jobStart, now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), tableID} + return setTableStatusOwnerTemplate, []interface{}{uuid, id, jobStart.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat),
now.Format(timeFormat), tableID} } func updateHeartBeatSQL(tableID int64, now time.Time, id string) (string, []interface{}) { @@ -90,6 +91,7 @@ type JobManager struct { store kv.Storage cmdCli client.CommandClient notificationCli client.NotificationClient + etcd *clientv3.Client // infoSchemaCache and tableStatusCache are a cache stores the information from info schema and the tidb_ttl_table_status // table. They don't need to be protected by mutex, because they are only used in job loop goroutine. @@ -105,10 +107,11 @@ type JobManager struct { taskManager *taskManager lastReportDelayMetricsTime time.Time + leaderFunc func() bool } // NewJobManager creates a new ttl job manager -func NewJobManager(id string, sessPool sessionPool, store kv.Storage, etcdCli *clientv3.Client) (manager *JobManager) { +func NewJobManager(id string, sessPool sessionPool, store kv.Storage, etcdCli *clientv3.Client, leaderFunc func() bool) (manager *JobManager) { manager = &JobManager{} manager.id = id manager.store = store @@ -123,23 +126,35 @@ func NewJobManager(id string, sessPool sessionPool, store kv.Storage, etcdCli *c if etcdCli != nil { manager.cmdCli = client.NewCommandClient(etcdCli) manager.notificationCli = client.NewNotificationClient(etcdCli) + manager.etcd = etcdCli } else { manager.cmdCli = client.NewMockCommandClient() manager.notificationCli = client.NewMockNotificationClient() } manager.taskManager = newTaskManager(manager.ctx, sessPool, manager.infoSchemaCache, id, store) - + manager.leaderFunc = leaderFunc return } +func (m *JobManager) isLeader() bool { + return m.leaderFunc != nil && m.leaderFunc() +} + func (m *JobManager) jobLoop() error { se, err := getSession(m.sessPool) if err != nil { return err } + timerStore := ttltablestore.NewTableTimerStore(1, m.sessPool, "mysql", "tidb_timers", m.etcd) + jobRequestCh := make(chan *SubmitTTLManagerJobRequest) + adapter := NewManagerJobAdapter(m.store, m.sessPool, jobRequestCh) + timerRT := newTTLTimerRuntime(timerStore, adapter) + timerSyncer := NewTTLTimerSyncer(m.sessPool, timerapi.NewDefaultTimerClient(timerStore)) defer func() { + timerRT.Pause() + timerStore.Close() err = multierr.Combine(err, multierr.Combine(m.taskManager.resizeScanWorkers(0), m.taskManager.resizeDelWorkers(0))) se.Close() logutil.Logger(m.ctx).Info("ttlJobManager loop exited.") @@ -153,6 +168,7 @@ func (m *JobManager) jobLoop() error { scheduleJobTicker := time.Tick(jobManagerLoopTickerInterval) jobCheckTicker := time.Tick(jobManagerLoopTickerInterval) updateJobHeartBeatTicker := time.Tick(jobManagerLoopTickerInterval) + timerTicker := time.Tick(time.Second) scheduleTaskTicker := time.Tick(getTaskManagerLoopTickerInterval()) updateTaskHeartBeatTicker := time.Tick(ttlTaskHeartBeatTickerInterval) @@ -171,6 +187,10 @@ func (m *JobManager) jobLoop() error { // misc case <-m.ctx.Done(): return nil + case <-timerTicker: + m.onTimerTick(se, timerRT, timerSyncer, time.Now()) + case jobReq := <-jobRequestCh: + m.handleSubmitJobRequest(se, jobReq) case <-infoSchemaCacheUpdateTicker: err := m.updateInfoSchemaCache(se) if err != nil { @@ -210,8 +230,7 @@ func (m *JobManager) jobLoop() error { } if triggerJobCmd, ok := cmd.GetTriggerTTLJobRequest(); ok { - m.triggerTTLJob(cmd.RequestID, triggerJobCmd, se) - m.rescheduleJobs(se, now) + m.triggerTTLJob(cmd.RequestID, triggerJobCmd, se, timerStore) } // Task Manager Loop @@ -252,12 +271,54 @@ func (m *JobManager) jobLoop() error { } } -func (m *JobManager) triggerTTLJob(requestID string, cmd *client.TriggerNewTTLJobRequest, se 
session.Session) { - if len(m.runningJobs) > 0 { - // sleep 2 seconds to make sure the TiDB without any job running in it to have a higher priority to take a new job. - time.Sleep(2 * time.Second) +func (m *JobManager) onTimerTick(se session.Session, rt *ttlTimerRuntime, syncer *TTLTimersSyncer, now time.Time) { + if !m.isLeader() { + rt.Pause() + syncer.Reset() + return + } + + rt.Resume() + lastSyncTime, lastSyncVer := syncer.GetLastSyncInfo() + sinceLastSync := now.Sub(lastSyncTime) + if sinceLastSync < 5*time.Second { + // limit timer sync frequency by every 5 seconds + return + } + + is := se.GetDomainInfoSchema().(infoschema.InfoSchema) + if is.SchemaMetaVersion() > lastSyncVer || sinceLastSync > 2*time.Minute { + // only sync timer when information schema version upgraded, or it has not been synced for more than 2 minutes. + syncer.SyncTimers(m.ctx, is) + } +} + +func (m *JobManager) handleSubmitJobRequest(se session.Session, jobReq *SubmitTTLManagerJobRequest) { + if !m.isLeader() { + jobReq.RespCh <- errors.Errorf("current TTL manager is not the leader") + return + } + + if err := m.updateInfoSchemaCache(se); err != nil { + logutil.Logger(m.ctx).Warn("failed to update info schema cache", zap.Error(err)) + } + + tbl, ok := m.infoSchemaCache.Tables[jobReq.PhysicalID] + if !ok { + jobReq.RespCh <- errors.Errorf("physical id: %d not exists in information schema", jobReq.PhysicalID) + return + } + + if tbl.TableInfo.ID != jobReq.TableID { + jobReq.RespCh <- errors.Errorf("table id '%d' != '%d' for physical table with id: %d", jobReq.TableID, tbl.TableInfo.ID, jobReq.PhysicalID) + return } + _, err := m.lockNewJob(m.ctx, se, tbl, time.Now(), jobReq.RequestID, false) + jobReq.RespCh <- err +} + +func (m *JobManager) triggerTTLJob(requestID string, cmd *client.TriggerNewTTLJobRequest, se session.Session, store *timerapi.TimerStore) { ok, err := m.cmdCli.TakeCommand(m.ctx, requestID) if err != nil { logutil.BgLogger().Error("failed to take TTL trigger job command", @@ -280,6 +341,16 @@ func (m *JobManager) triggerTTLJob(requestID string, cmd *client.TriggerNewTTLJo terror.Log(m.cmdCli.ResponseCommand(m.ctx, requestID, err)) } + if !variable.EnableTTLJob.Load() { + responseErr(errors.New("tidb_ttl_job_enable is disabled")) + return + } + + if !timeutil.WithinDayTimePeriod(variable.TTLJobScheduleWindowStartTime.Load(), variable.TTLJobScheduleWindowEndTime.Load(), time.Now()) { + responseErr(errors.New("not in TTL job window")) + return + } + if err = m.infoSchemaCache.Update(se); err != nil { responseErr(err) return @@ -302,46 +373,95 @@ func (m *JobManager) triggerTTLJob(requestID string, cmd *client.TriggerNewTTLJo return } - now := time.Now() + syncer := NewTTLTimerSyncer(m.sessPool, timerapi.NewDefaultTimerClient(store)) tableResults := make([]*client.TriggerNewTTLJobTableResult, 0, len(tables)) - allError := true - var firstError error - for _, ttlTbl := range tables { + getJobFns := make([]func() (string, bool, error), 0, len(tables)) + ctx, cancel := context.WithTimeout(m.ctx, 5*time.Minute) + for _, tbl := range tables { tblResult := &client.TriggerNewTTLJobTableResult{ - TableID: ttlTbl.ID, + TableID: tbl.ID, DBName: cmd.DBName, TableName: cmd.TableName, - PartitionName: ttlTbl.Partition.O, + PartitionName: tbl.Partition.O, } + tableResults = append(tableResults, tblResult) - jobID := uuid.NewString() - if _, err = m.lockNewJob(m.ctx, se, ttlTbl, now, jobID, false); err != nil { - firstError = err + fn, err := syncer.ManualTriggerTTLTimer(m.ctx, tbl) + if err != nil { 
tblResult.ErrorMessage = err.Error() - tableResults = append(tableResults, tblResult) - continue + getJobFns = append(getJobFns, nil) + } else { + getJobFns = append(getJobFns, fn) } - allError = false - tblResult.JobID = jobID - tableResults = append(tableResults, tblResult) } - if allError { - responseErr(firstError) - return - } + go func() { + defer cancel() + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + loop: + for { + select { + case <-ctx.Done(): + break loop + case <-ticker.C: + } - terror.Log(m.cmdCli.ResponseCommand(m.ctx, requestID, &client.TriggerNewTTLJobResponse{ - TableResult: tableResults, - })) + for i := range getJobFns { + fn := getJobFns[i] + if fn == nil { + continue + } - tableResultsJSON, _ := json.Marshal(tableResults) - logutil.BgLogger().Info("Done to trigger a new TTL job", - zap.String("requestID", requestID), - zap.String("database", cmd.DBName), - zap.String("table", cmd.TableName), - zap.ByteString("tableResults", tableResultsJSON), - ) + jobID, ok, err := fn() + if err != nil { + tableResults[i].ErrorMessage = err.Error() + getJobFns[i] = nil + continue + } + + if ok { + tableResults[i].JobID = jobID + getJobFns[i] = nil + } + } + + for _, fn := range getJobFns { + if fn != nil { + continue loop + } + } + break + } + + allError := true + for _, r := range tableResults { + if r.JobID == "" && r.ErrorMessage == "" { + r.ErrorMessage = "timeout" + } + + if r.ErrorMessage == "" { + allError = false + } + } + + if allError { + responseErr(errors.New(tableResults[0].ErrorMessage)) + return + } + + terror.Log(m.cmdCli.ResponseCommand(m.ctx, requestID, &client.TriggerNewTTLJobResponse{ + TableResult: tableResults, + })) + + tableResultsJSON, _ := json.Marshal(tableResults) + logutil.BgLogger().Info("Done to trigger a new TTL job", + zap.String("requestID", requestID), + zap.String("database", cmd.DBName), + zap.String("table", cmd.TableName), + zap.ByteString("tableResults", tableResultsJSON), + ) + }() } func (m *JobManager) reportMetrics(se session.Session) { @@ -441,26 +561,13 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) { return } - // don't lock job if there's no free scan workers in local - // it's a mechanism to avoid too many scan tasks waiting in the ttl_tasks table. 
- if len(m.taskManager.idleScanWorkers()) == 0 { - return - } - - jobTables, isCreate := m.readyForLockJobTables(now) + jobTables := m.readyForLockHBTimeoutJobTables(now) // TODO: also consider to resume tables, but it's fine to left them there, as other nodes will take this job // when the heart beat is not sent - for i, table := range jobTables { + for _, table := range jobTables { logutil.Logger(m.ctx).Info("try lock new job", zap.Int64("tableID", table.ID)) - var err error - if isCreate[i] { - _, err = m.lockNewJob(m.ctx, se, table, now, uuid.NewString(), true) - } else { - _, err = m.lockHBTimeoutJob(m.ctx, se, table, now) - } - - if err != nil { - logutil.Logger(m.ctx).Warn("fail to create new job", zap.Error(err)) + if _, err := m.lockHBTimeoutJob(m.ctx, se, table, now); err != nil { + logutil.Logger(m.ctx).Warn("failed to lock heartbeat timeout job", zap.Error(err)) } } } @@ -479,11 +586,9 @@ func (m *JobManager) localJobs() []*ttlJob { return jobs } -// readyForLockJobTables returns all tables which should spawn a TTL job according to cache -func (m *JobManager) readyForLockJobTables(now time.Time) ([]*cache.PhysicalTable, []bool) { +// readyForLockHBTimeoutJobTables returns all tables whose job is timeout and should be taken over +func (m *JobManager) readyForLockHBTimeoutJobTables(now time.Time) []*cache.PhysicalTable { tables := make([]*cache.PhysicalTable, 0, len(m.infoSchemaCache.Tables)) - isCreate := make([]bool, 0, cap(tables)) - tblLoop: for _, table := range m.infoSchemaCache.Tables { // If this node already has a job for this table, just ignore. @@ -496,16 +601,12 @@ tblLoop: } status := m.tableStatusCache.Tables[table.ID] - if m.couldLockJob(status, table, now, true, true) { - tables = append(tables, table) - isCreate = append(isCreate, true) - } else if m.couldLockJob(status, table, now, false, false) { + if m.couldLockJob(status, table, now, false, false) { tables = append(tables, table) - isCreate = append(isCreate, false) } } - return tables, isCreate + return tables } // couldLockJob returns whether a table should be tried to run TTL @@ -954,3 +1055,162 @@ GROUP BY return records, nil } + +// SubmitTTLManagerJobRequest is the request to submit a TTL job to manager +type SubmitTTLManagerJobRequest struct { + // TableID indicates the parent table id + TableID int64 + // PhysicalID indicates the physical table id + PhysicalID int64 + // RequestID indicates the request id of the job + RequestID string + // RespCh indicates the channel for response + RespCh chan<- error +} + +type managerJobAdapter struct { + store kv.Storage + sessPool sessionPool + requestCh chan<- *SubmitTTLManagerJobRequest +} + +// NewManagerJobAdapter creates a managerJobAdapter +func NewManagerJobAdapter(store kv.Storage, sessPool sessionPool, requestCh chan<- *SubmitTTLManagerJobRequest) TTLJobAdapter { + return &managerJobAdapter{store: store, sessPool: sessPool, requestCh: requestCh} +} + +func (a *managerJobAdapter) CanSubmitJob(tableID, physicalID int64) bool { + se, err := getSession(a.sessPool) + if err != nil { + terror.Log(err) + return false + } + defer se.Close() + + is := se.GetDomainInfoSchema().(infoschema.InfoSchema) + tbl, ok := is.TableByID(tableID) + if !ok { + return false + } + + tblInfo := tbl.Meta() + ttlInfo := tblInfo.TTLInfo + if ttlInfo == nil || !ttlInfo.Enable { + return false + } + + if physicalID != tableID { + if par := tbl.GetPartitionedTable(); par == nil || par.GetPartition(physicalID) == nil { + return false + } + } + + ctx, cancel := 
context.WithTimeout(context.TODO(), 5*time.Minute) + defer cancel() + + selectTasksCntSQL := "select LOW_PRIORITY COUNT(1) FROM mysql.tidb_ttl_task WHERE status IN ('waiting', 'running')" + rs, err := se.ExecuteSQL(ctx, selectTasksCntSQL) + if err == nil && len(rs) == 0 { + err = errors.New("selectTasksCntSQL returns no row") + } + + if err != nil { + logutil.BgLogger().Error( + "error to query ttl task count", + zap.Error(err), + zap.Int64("physicalID", physicalID), + zap.Int64("tableID", tableID), + zap.String("SQL", selectTasksCntSQL), + ) + return false + } + + cnt := rs[0].GetInt64(0) + tasksLimit := getMaxRunningTasksLimit(a.store) + if cnt >= int64(tasksLimit) { + logutil.BgLogger().Warn( + "current TTL tasks count exceeds limit, delay create new job temporarily", + zap.Int64("physicalID", physicalID), + zap.Int64("tableID", tableID), + zap.Int64("count", cnt), + zap.Int("limit", tasksLimit), + ) + return false + } + + return true +} + +func (a *managerJobAdapter) SubmitJob(ctx context.Context, tableID, physicalID int64, requestID string, _ time.Time) (*TTLJobTrace, error) { + respCh := make(chan error, 1) + req := &SubmitTTLManagerJobRequest{ + TableID: tableID, + PhysicalID: physicalID, + RequestID: requestID, + RespCh: respCh, + } + select { + case <-ctx.Done(): + return nil, ctx.Err() + case a.requestCh <- req: + select { + case <-ctx.Done(): + return nil, ctx.Err() + case err := <-respCh: + if err != nil { + return nil, err + } + + return &TTLJobTrace{ + RequestID: requestID, + Finished: false, + }, nil + } + } +} + +func (a *managerJobAdapter) GetJob(ctx context.Context, tableID, physicalID int64, requestID string) (*TTLJobTrace, error) { + se, err := getSession(a.sessPool) + if err != nil { + return nil, err + } + defer se.Close() + + rows, err := se.ExecuteSQL( + ctx, + "select summary_text, status from mysql.tidb_ttl_job_history where table_id=%? AND parent_table_id=%? 
AND job_id=%?", + physicalID, tableID, requestID, + ) + if err != nil { + return nil, err + } + + if len(rows) == 0 { + return nil, nil + } + + jobTrace := TTLJobTrace{ + RequestID: requestID, + } + + row := rows[0] + if !row.IsNull(0) { + if summaryBytes := row.GetBytes(0); len(summaryBytes) > 0 { + var ttlSummary TTLSummary + if err = json.Unmarshal(summaryBytes, &ttlSummary); err != nil { + return nil, err + } + jobTrace.Summary = &ttlSummary + } + } + + if !row.IsNull(1) { + statusText := row.GetString(1) + switch cache.JobStatus(statusText) { + case cache.JobStatusFinished, cache.JobStatusTimeout, cache.JobStatusCancelled: + jobTrace.Finished = true + } + } + + return &jobTrace, nil +} diff --git a/ttl/ttlworker/job_manager_integration_test.go b/ttl/ttlworker/job_manager_integration_test.go index b576d7750591d..6c85bcdfef968 100644 --- a/ttl/ttlworker/job_manager_integration_test.go +++ b/ttl/ttlworker/job_manager_integration_test.go @@ -25,6 +25,7 @@ import ( "time" "github.com/google/uuid" + "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" @@ -33,6 +34,8 @@ import ( dbsession "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/testkit" + timerapi "github.com/pingcap/tidb/timer/api" + timertable "github.com/pingcap/tidb/timer/tablestore" "github.com/pingcap/tidb/ttl/cache" "github.com/pingcap/tidb/ttl/client" "github.com/pingcap/tidb/ttl/metrics" @@ -68,7 +71,7 @@ func TestParallelLockNewJob(t *testing.T) { testTable := &cache.PhysicalTable{ID: 2, TableInfo: &model.TableInfo{ID: 1, TTLInfo: &model.TTLInfo{IntervalExprStr: "1", IntervalTimeUnit: int(ast.TimeUnitDay), JobInterval: "1h"}}} // simply lock a new job - m := ttlworker.NewJobManager("test-id", nil, store, nil) + m := ttlworker.NewJobManager("test-id", nil, store, nil, nil) m.InfoSchemaCache().Tables[testTable.ID] = testTable se := sessionFactory() @@ -91,7 +94,7 @@ func TestParallelLockNewJob(t *testing.T) { jobManagerID := fmt.Sprintf("test-ttl-manager-%d", j) wg.Add(1) go func() { - m := ttlworker.NewJobManager(jobManagerID, nil, store, nil) + m := ttlworker.NewJobManager(jobManagerID, nil, store, nil, nil) m.InfoSchemaCache().Tables[testTable.ID] = testTable se := sessionFactory() @@ -125,7 +128,7 @@ func TestFinishJob(t *testing.T) { tk.MustExec("insert into mysql.tidb_ttl_table_status(table_id) values (2)") // finish with error - m := ttlworker.NewJobManager("test-id", nil, store, nil) + m := ttlworker.NewJobManager("test-id", nil, store, nil, nil) m.InfoSchemaCache().Tables[testTable.ID] = testTable se := sessionFactory() startTime := time.Now() @@ -241,12 +244,16 @@ func TestTriggerTTLJob(t *testing.T) { tblID := tbl.Meta().ID require.NoError(t, err) + timerStore := timertable.NewTableTimerStore(0, do.SysSessionPool(), "mysql", "tidb_timers", nil) + defer timerStore.Close() + timerCli := timerapi.NewDefaultTimerClient(timerStore) + // make sure the table had run a job one time to make the test stable cli := do.TTLJobManager().GetCommandCli() _, _ = client.TriggerNewTTLJob(ctx, cli, "test", "t") r := tk.MustQuery("select last_job_id, current_job_id from mysql.tidb_ttl_table_status where table_id=?", tblID) require.Equal(t, 1, len(r.Rows())) - waitTTLJobFinished(t, tk, tblID) + waitTTLJobFinished(t, tk, tblID, timerCli) now := time.Now() nowDateStr := now.Format("2006-01-02 15:04:05.999999") @@ -268,7 +275,7 @@ func TestTriggerTTLJob(t *testing.T) { require.Equal(t, "", tableResult.ErrorMessage) 
require.Equal(t, "", tableResult.PartitionName) - waitTTLJobFinished(t, tk, tblID) + waitTTLJobFinished(t, tk, tblID, timerCli) tk.MustQuery("select id from t order by id asc").Check(testkit.Rows("2", "4")) } @@ -277,6 +284,10 @@ func TestTTLDeleteWithTimeZoneChange(t *testing.T) { defer failpoint.Disable("github.com/pingcap/tidb/ttl/ttlworker/task-manager-loop-interval") store, do := testkit.CreateMockStoreAndDomain(t) + timerStore := timertable.NewTableTimerStore(0, do.SysSessionPool(), "mysql", "tidb_timers", nil) + defer timerStore.Close() + timerCli := timerapi.NewDefaultTimerClient(timerStore) + tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("set @@global.time_zone='Asia/Shanghai'") @@ -305,18 +316,18 @@ func TestTTLDeleteWithTimeZoneChange(t *testing.T) { _, _ = client.TriggerNewTTLJob(ctx, cli, "test", "t1") _, _ = client.TriggerNewTTLJob(ctx, cli, "test", "t2") - waitTTLJobFinished(t, tk, tblID1) + waitTTLJobFinished(t, tk, tblID1, timerCli) tk.MustQuery("select id from t1 order by id asc").Check(testkit.Rows("1", "2")) - waitTTLJobFinished(t, tk, tblID2) + waitTTLJobFinished(t, tk, tblID2, timerCli) tk.MustQuery("select id from t2 order by id asc").Check(testkit.Rows("1")) } -func waitTTLJobFinished(t *testing.T, tk *testkit.TestKit, tableID int64) { +func waitTTLJobFinished(t *testing.T, tk *testkit.TestKit, tableID int64, timerCli timerapi.TimerClient) { start := time.Now() for time.Since(start) < time.Minute { time.Sleep(time.Second) - r := tk.MustQuery("select last_job_id, current_job_id from mysql.tidb_ttl_table_status where table_id=?", tableID) + r := tk.MustQuery("select last_job_id, current_job_id, parent_table_id from mysql.tidb_ttl_table_status where table_id=?", tableID) rows := r.Rows() if len(rows) == 0 { continue @@ -330,6 +341,15 @@ func waitTTLJobFinished(t *testing.T, tk *testkit.TestKit, tableID int64) { continue } + parentID, err := strconv.ParseInt(rows[0][2].(string), 10, 64) + require.NoError(t, err) + + timer, err := timerCli.GetTimerByKey(context.Background(), fmt.Sprintf("/tidb/ttl/physical_table/%d/%d", parentID, tableID)) + require.NoError(t, err) + if timer.EventStatus == timerapi.SchedEventTrigger { + continue + } + return } require.FailNow(t, "timeout") @@ -388,6 +408,85 @@ func TestTTLJobDisable(t *testing.T) { require.False(t, deleted) } +func TestSubmitJob(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + sessionFactory := sessionFactory(t, store) + + waitAndStopTTLManager(t, dom) + + tk.MustExec("use test") + tk.MustExec("create table ttlp1(a int, t timestamp) TTL=`t`+interval 1 HOUR PARTITION BY RANGE (a) (" + + "PARTITION p0 VALUES LESS THAN (10)," + + "PARTITION p1 VALUES LESS THAN (100)" + + ")") + table, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ttlp1")) + require.NoError(t, err) + tableID := table.Meta().ID + var physicalID int64 + for _, def := range table.Meta().Partition.Definitions { + if def.Name.L == "p0" { + physicalID = def.ID + break + } + } + require.NotZero(t, physicalID) + + var leader atomic.Bool + m := ttlworker.NewJobManager("manager-1", nil, store, nil, func() bool { + return leader.Load() + }) + + se := sessionFactory() + + // not leader + err = m.SubmitJob(se, tableID, physicalID, "req1") + require.EqualError(t, err, "current TTL manager is not the leader") + + leader.Store(true) + // invalid table + err = m.SubmitJob(se, 9999, 9999, "req1") + require.ErrorContains(t, err, "not exists in information schema") 
+ + err = m.SubmitJob(se, tableID, 9999, "req1") + require.ErrorContains(t, err, "not exists in information schema") + + err = m.SubmitJob(se, 9999, physicalID, "req1") + require.ErrorContains(t, err, "for physical table with id") + + // check no success job submitted + tk.MustQuery("select count(1) from mysql.tidb_ttl_table_status").Check(testkit.Rows("0")) + tk.MustQuery("select count(1) from mysql.tidb_ttl_job_history").Check(testkit.Rows("0")) + tk.MustQuery("select count(1) from mysql.tidb_ttl_task").Check(testkit.Rows("0")) + + // submit successfully + now := time.Now() + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnTTL) + require.NoError(t, m.SubmitJob(se, tableID, physicalID, "request1")) + sql, args := cache.SelectFromTTLTableStatusWithID(physicalID) + rows, err := se.ExecuteSQL(ctx, sql, args...) + require.NoError(t, err) + tableStatus, err := cache.RowToTableStatus(se, rows[0]) + require.NoError(t, err) + require.Equal(t, physicalID, tableStatus.TableID) + require.Equal(t, tableID, tableStatus.ParentTableID) + require.Equal(t, "request1", tableStatus.CurrentJobID) + require.Equal(t, "manager-1", tableStatus.CurrentJobOwnerID) + require.Equal(t, cache.JobStatusRunning, tableStatus.CurrentJobStatus) + require.InDelta(t, tableStatus.CurrentJobTTLExpire.Unix(), now.Unix()-3600, 300) + require.Greater(t, tableStatus.CurrentJobOwnerHBTime.Unix(), now.Unix()-10) + require.Greater(t, tableStatus.CurrentJobStartTime.Unix(), now.Unix()-10) + require.Greater(t, tableStatus.CurrentJobStatusUpdateTime.Unix(), now.Unix()-10) + tk.MustQuery("select table_id, scan_id, UNIX_TIMESTAMP(expire_time), status from mysql.tidb_ttl_task where job_id='request1'"). + Check(testkit.Rows(fmt.Sprintf("%d 0 %d waiting", physicalID, tableStatus.CurrentJobTTLExpire.Unix()))) + tk.MustQuery("select parent_table_id, table_id, table_schema, table_name, partition_name, " + + "UNIX_TIMESTAMP(create_time), UNIX_TIMESTAMP(ttl_expire), status " + + "from mysql.tidb_ttl_job_history where job_id='request1'").Check(testkit.Rows(fmt.Sprintf( + "%d %d test ttlp1 p0 %d %d running", + tableID, physicalID, tableStatus.CurrentJobStartTime.Unix(), tableStatus.CurrentJobTTLExpire.Unix(), + ))) +} + func TestRescheduleJobs(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) @@ -402,12 +501,14 @@ func TestRescheduleJobs(t *testing.T) { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnTTL) se := sessionFactory() - m := ttlworker.NewJobManager("manager-1", nil, store, nil) + m := ttlworker.NewJobManager("manager-1", nil, store, nil, func() bool { + return true + }) m.TaskManager().ResizeWorkersWithSysVar() require.NoError(t, m.InfoSchemaCache().Update(se)) require.NoError(t, m.TableStatusCache().Update(context.Background(), se)) - // schedule jobs - m.RescheduleJobs(se, now) + // submit job + require.NoError(t, m.SubmitJob(se, table.Meta().ID, table.Meta().ID, "request1")) sql, args := cache.SelectFromTTLTableStatusWithID(table.Meta().ID) rows, err := se.ExecuteSQL(ctx, sql, args...) 
require.NoError(t, err) @@ -421,7 +522,7 @@ func TestRescheduleJobs(t *testing.T) { tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1")) // another manager should get this job, if the heart beat is not updated - anotherManager := ttlworker.NewJobManager("manager-2", nil, store, nil) + anotherManager := ttlworker.NewJobManager("manager-2", nil, store, nil, nil) anotherManager.TaskManager().ResizeWorkersWithSysVar() require.NoError(t, anotherManager.InfoSchemaCache().Update(se)) require.NoError(t, anotherManager.TableStatusCache().Update(context.Background(), se)) @@ -454,16 +555,19 @@ func TestJobTimeout(t *testing.T) { now := time.Now() tk.MustExec("create table test.t (id int, created_at datetime) ttl = `created_at` + interval 1 minute ttl_job_interval = '1m'") table, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + tableID := table.Meta().ID require.NoError(t, err) ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnTTL) se := sessionFactory() - m := ttlworker.NewJobManager("manager-1", nil, store, nil) + m := ttlworker.NewJobManager("manager-1", nil, store, nil, func() bool { + return true + }) m.TaskManager().ResizeWorkersWithSysVar() require.NoError(t, m.InfoSchemaCache().Update(se)) require.NoError(t, m.TableStatusCache().Update(context.Background(), se)) - // schedule jobs - m.RescheduleJobs(se, now) + // submit job + require.NoError(t, m.SubmitJob(se, tableID, tableID, "request1")) // set the worker to be empty, so none of the tasks will be scheduled m.TaskManager().SetScanWorkers4Test([]ttlworker.Worker{}) @@ -478,7 +582,7 @@ func TestJobTimeout(t *testing.T) { // there is already a task tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1")) - m2 := ttlworker.NewJobManager("manager-2", nil, store, nil) + m2 := ttlworker.NewJobManager("manager-2", nil, store, nil, nil) m2.TaskManager().ResizeWorkersWithSysVar() require.NoError(t, m2.InfoSchemaCache().Update(se)) require.NoError(t, m2.TableStatusCache().Update(context.Background(), se)) @@ -517,8 +621,13 @@ func TestTriggerScanTask(t *testing.T) { waitAndStopTTLManager(t, dom) tk.MustExec("create table test.t (id int, created_at datetime) ttl = `created_at` + interval 1 minute ttl_job_interval = '1m'") + tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tblID := tbl.Meta().ID - m := ttlworker.NewJobManager("manager-1", nil, store, nil) + m := ttlworker.NewJobManager("manager-1", nil, store, nil, func() bool { + return true + }) require.NoError(t, m.InfoSchemaCache().Update(se)) m.TaskManager().ResizeWorkersWithSysVar() m.Start() @@ -534,7 +643,7 @@ func TestTriggerScanTask(t *testing.T) { <-nCli.WatchNotification(context.Background(), "scan") wg.Done() }() - m.RescheduleJobs(se, now) + require.NoError(t, m.SubmitJob(se, tblID, tblID, "request1")) // notification is sent wg.Wait() @@ -659,18 +768,19 @@ func TestJobMetrics(t *testing.T) { waitAndStopTTLManager(t, dom) - now := time.Now() tk.MustExec("create table test.t (id int, created_at datetime) ttl = `created_at` + interval 1 minute ttl_job_interval = '1m'") table, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) require.NoError(t, err) ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnTTL) se := sessionFactory() - m := ttlworker.NewJobManager("manager-1", nil, store, nil) + m := ttlworker.NewJobManager("manager-1", nil, store, nil, func() bool { + return true + 
}) m.TaskManager().ResizeWorkersWithSysVar() require.NoError(t, m.InfoSchemaCache().Update(se)) - // schedule jobs - m.RescheduleJobs(se, now) + // submit job + require.NoError(t, m.SubmitJob(se, table.Meta().ID, table.Meta().ID, "request1")) // set the worker to be empty, so none of the tasks will be scheduled m.TaskManager().SetScanWorkers4Test([]ttlworker.Worker{}) @@ -814,3 +924,223 @@ func TestDelayMetrics(t *testing.T) { checkRecord(records, "t4", now.Add(-3*time.Hour)) checkRecord(records, "t5", emptyTime) } + +func TestManagerJobAdapterCanSubmitJob(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + adapter := ttlworker.NewManagerJobAdapter(store, dom.SysSessionPool(), nil) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + // no table + require.False(t, adapter.CanSubmitJob(9999, 9999)) + + // not ttl table + tk.MustExec("create table t1(t timestamp)") + tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) + require.NoError(t, err) + require.False(t, adapter.CanSubmitJob(tbl.Meta().ID, tbl.Meta().ID)) + + // ttl table + tk.MustExec("create table ttl1(t timestamp) TTL=`t`+interval 1 DAY") + tbl, err = dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ttl1")) + require.NoError(t, err) + require.True(t, adapter.CanSubmitJob(tbl.Meta().ID, tbl.Meta().ID)) + + // ttl table but disabled + tk.MustExec("create table ttl2(t timestamp) TTL=`t`+interval 1 DAY TTL_ENABLE='OFF'") + tbl, err = dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ttl2")) + require.NoError(t, err) + require.False(t, adapter.CanSubmitJob(tbl.Meta().ID, tbl.Meta().ID)) + + // ttl partition table + tk.MustExec("create table ttlp1(a int, t timestamp) TTL=`t`+interval 1 DAY PARTITION BY RANGE (a) (" + + "PARTITION p0 VALUES LESS THAN (10)," + + "PARTITION p1 VALUES LESS THAN (100)" + + ")") + tbl, err = dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ttlp1")) + require.NoError(t, err) + for _, def := range tbl.Meta().Partition.Definitions { + require.True(t, adapter.CanSubmitJob(tbl.Meta().ID, def.ID)) + } + + // limit max running tasks + tk.MustExec("set @@global.tidb_ttl_running_tasks=8") + defer tk.MustExec("set @@global.tidb_ttl_running_tasks=-1") + for i := 1; i <= 16; i++ { + jobID := strconv.Itoa(i) + sql, args, err := cache.InsertIntoTTLTask(tk.Session(), jobID, int64(1000+i), i, nil, nil, time.Now(), time.Now()) + require.NoError(t, err) + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnTTL) + _, err = tk.Session().ExecuteInternal(ctx, sql, args...) 
+ require.NoError(t, err) + + if i <= 4 { + // tasks 1 - 4 are running and 5 - 7 are waiting + tk.MustExec("update mysql.tidb_ttl_task set status='running' where job_id=?", jobID) + } + + if i > 7 { + // tasks 8 and later are finished + tk.MustExec("update mysql.tidb_ttl_task set status='finished' where job_id=?", jobID) + } + } + tbl, err = dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ttl1")) + require.NoError(t, err) + require.True(t, adapter.CanSubmitJob(tbl.Meta().ID, tbl.Meta().ID)) + tk.MustExec("update mysql.tidb_ttl_task set status='running' where job_id='8'") + require.False(t, adapter.CanSubmitJob(tbl.Meta().ID, tbl.Meta().ID)) +} + +func TestManagerJobAdapterSubmitJob(t *testing.T) { + ch := make(chan *ttlworker.SubmitTTLManagerJobRequest) + adapter := ttlworker.NewManagerJobAdapter(nil, nil, ch) + + var reqPointer atomic.Pointer[ttlworker.SubmitTTLManagerJobRequest] + responseRequest := func(err error) { + ctx, cancel := context.WithTimeout(context.TODO(), time.Minute) + defer cancel() + select { + case <-ctx.Done(): + require.FailNow(t, "timeout") + case req, ok := <-ch: + require.True(t, ok) + reqPointer.Store(req) + select { + case req.RespCh <- err: + default: + require.FailNow(t, "blocked") + } + } + } + + // normal submit + go responseRequest(nil) + job, err := adapter.SubmitJob(context.TODO(), 1, 2, "req1", time.Now()) + require.NoError(t, err) + require.Equal(t, "req1", job.RequestID) + require.False(t, job.Finished) + require.Nil(t, job.Summary) + req := reqPointer.Load() + require.NotNil(t, req) + require.Equal(t, int64(1), req.TableID) + require.Equal(t, int64(2), req.PhysicalID) + require.Equal(t, "req1", req.RequestID) + + // submit but reply error + go responseRequest(errors.New("mockErr")) + job, err = adapter.SubmitJob(context.TODO(), 1, 2, "req1", time.Now()) + require.EqualError(t, err, "mockErr") + require.Nil(t, job) + + // context canceled when sending the request + ctx, cancel := context.WithCancel(context.TODO()) + cancel() + job, err = adapter.SubmitJob(ctx, 1, 2, "req1", time.Now()) + require.Same(t, err, ctx.Err()) + require.Nil(t, job) + + // context timeout when waiting for the response + ch = make(chan *ttlworker.SubmitTTLManagerJobRequest, 1) + adapter = ttlworker.NewManagerJobAdapter(nil, nil, ch) + ctx, cancel = context.WithTimeout(context.TODO(), 100*time.Millisecond) + defer cancel() + job, err = adapter.SubmitJob(ctx, 1, 2, "req1", time.Now()) + require.EqualError(t, err, ctx.Err().Error()) + require.Nil(t, job) +} + +func TestManagerJobAdapterGetJob(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + adapter := ttlworker.NewManagerJobAdapter(store, dom.SysSessionPool(), nil) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + summary := ttlworker.TTLSummary{ + TotalRows: 1000, + SuccessRows: 998, + ErrorRows: 2, + TotalScanTask: 10, + ScheduledScanTask: 9, + FinishedScanTask: 8, + ScanTaskErr: "err1", + } + + summaryText, err := json.Marshal(summary) + require.NoError(t, err) + + insertJob := func(tableID, physicalID int64, jobID string, status cache.JobStatus) { + tk.MustExec(fmt.Sprintf(`INSERT INTO mysql.tidb_ttl_job_history ( + job_id, + table_id, + parent_table_id, + table_schema, + table_name, + partition_name, + create_time, + finish_time, + ttl_expire, + summary_text, + expired_rows, + deleted_rows, + error_delete_rows, + status + ) + VALUES + ( + '%s', %d, %d, 'test', '%s', '', now() - interval 1 MINUTE, now(), now() - interval 1 DAY, + '%s', %d, %d, %d, '%s' + )`, + jobID, physicalID, tableID, "t1", summaryText, summary.TotalRows, summary.SuccessRows, summary.ErrorRows, status, + )) + } + + // job not exists + job, err := adapter.GetJob(context.TODO(), 1, 2, "req1") + require.NoError(t, err) + require.Nil(t, job) + + // job table id not match + insertJob(2, 2, "req1", cache.JobStatusFinished) + job, err = adapter.GetJob(context.TODO(), 1, 2, "req1") + require.NoError(t, err) + require.Nil(t, job) + tk.MustExec("delete from mysql.tidb_ttl_job_history") + + // job physical id not match + insertJob(1, 3, "req1", cache.JobStatusFinished) + job, err = adapter.GetJob(context.TODO(), 1, 2, "req1") + require.NoError(t, err) + require.Nil(t, job) + tk.MustExec("delete from mysql.tidb_ttl_job_history") + + // job request id not match + insertJob(1, 2, "req2", cache.JobStatusFinished) + job, err = adapter.GetJob(context.TODO(), 1, 2, "req1") + require.NoError(t, err) + require.Nil(t, job) + tk.MustExec("delete from mysql.tidb_ttl_job_history") + + // job exists with status + statusList := []cache.JobStatus{ + cache.JobStatusWaiting, + cache.JobStatusRunning, + cache.JobStatusCancelling, + cache.JobStatusCancelled, + cache.JobStatusTimeout, + cache.JobStatusFinished, + } + for _, status := range statusList { + insertJob(1, 2, "req1", status) + job, err = adapter.GetJob(context.TODO(), 1, 2, "req1") + require.NoError(t, err, status) + require.NotNil(t, job, status) + require.Equal(t, "req1", job.RequestID, status) + switch status { + case cache.JobStatusTimeout, cache.JobStatusFinished, cache.JobStatusCancelled: + require.True(t, job.Finished, status) + default: + require.False(t, job.Finished, status) + } + require.Equal(t, summary, *job.Summary, status) + tk.MustExec("delete from mysql.tidb_ttl_job_history") + } +} diff --git a/ttl/ttlworker/job_manager_test.go b/ttl/ttlworker/job_manager_test.go index 3f08ff4e03dfb..42b3d528e731a 100644 --- a/ttl/ttlworker/job_manager_test.go +++ b/ttl/ttlworker/job_manager_test.go @@ -16,13 +16,14 @@ package ttlworker import ( "context" + "sync/atomic" "testing" "time" "github.com/pingcap/errors" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" - "github.com/pingcap/tidb/sessionctx/variable" + timerapi "github.com/pingcap/tidb/timer/api" "github.com/pingcap/tidb/ttl/cache" "github.com/pingcap/tidb/ttl/session" "github.com/pingcap/tidb/types" @@ -168,6 +169,17 @@ func (m *JobManager) RescheduleJobs(se session.Session, now time.Time) { m.rescheduleJobs(se, now) } +func (m *JobManager) SubmitJob(se session.Session, tableID, physicalID int64, requestID string) error { + ch := make(chan error, 1) + m.handleSubmitJobRequest(se, &SubmitTTLManagerJobRequest{ + TableID: tableID, + PhysicalID: physicalID, + RequestID: requestID, + RespCh: ch, + }) + return <-ch +} + // TaskManager is an exported getter of task manager for test func (m *JobManager) TaskManager() *taskManager { return m.taskManager @@ -195,9 +207,9 @@ func newMockTTLJob(tbl *cache.PhysicalTable, status cache.JobStatus) *ttlJob { return &ttlJob{tbl: tbl, status: status} } -func TestReadyForNewJobTables(t *testing.T) { +func TestReadyForLockHBTimeoutJobTables(t *testing.T) { tbl := newMockTTLTbl(t, "t1") - m := NewJobManager("test-id", nil, nil, nil) + m := NewJobManager("test-id", nil, nil, nil, nil) m.sessPool = newMockSessionPool(t, tbl) se := newMockSession(t, tbl) @@ -209,24 +221,23 @@ func TestReadyForNewJobTables(t *testing.T) { infoSchemaTables []*cache.PhysicalTable tableStatus []*cache.TableStatus shouldSchedule bool - isCreate bool }{ - // for a newly inserted table, it'll always be scheduled - {"newly created", []*cache.PhysicalTable{tbl}, []*cache.TableStatus{{TableID: tbl.ID, ParentTableID: tbl.ID}}, true, true}, + // for a newly
inserted table, it'll always not be scheduled because no job running + {"newly created", []*cache.PhysicalTable{tbl}, []*cache.TableStatus{{TableID: tbl.ID, ParentTableID: tbl.ID}}, false}, // table only in the table status cache will not be scheduled - {"proper subset", []*cache.PhysicalTable{}, []*cache.TableStatus{{TableID: tbl.ID, ParentTableID: tbl.ID}}, false, false}, + {"proper subset", []*cache.PhysicalTable{}, []*cache.TableStatus{{TableID: tbl.ID, ParentTableID: tbl.ID}}, false}, // table whose current job owner id is not empty, and heart beat time is long enough will not be scheduled - {"current job not empty", []*cache.PhysicalTable{tbl}, []*cache.TableStatus{{TableID: tbl.ID, ParentTableID: tbl.ID, CurrentJobID: "job1", CurrentJobOwnerID: "test-another-id", CurrentJobOwnerHBTime: time.Now()}}, false, false}, + {"current job not empty", []*cache.PhysicalTable{tbl}, []*cache.TableStatus{{TableID: tbl.ID, ParentTableID: tbl.ID, CurrentJobID: "job1", CurrentJobOwnerID: "test-another-id", CurrentJobOwnerHBTime: time.Now()}}, false}, // table whose current job owner id is not empty, but heart beat time is expired will be scheduled - {"hb time expired", []*cache.PhysicalTable{tbl}, []*cache.TableStatus{{TableID: tbl.ID, ParentTableID: tbl.ID, CurrentJobID: "job1", CurrentJobOwnerID: "test-another-id", CurrentJobOwnerHBTime: time.Now().Add(-time.Hour)}}, true, false}, - // if the last start time is too near, it will also not be scheduled - {"last start time too near", []*cache.PhysicalTable{tbl}, []*cache.TableStatus{{TableID: tbl.ID, ParentTableID: tbl.ID, LastJobStartTime: time.Now()}}, false, false}, - // if the last start time is expired, it will be scheduled - {"last start time expired", []*cache.PhysicalTable{tbl}, []*cache.TableStatus{{TableID: tbl.ID, ParentTableID: tbl.ID, LastJobStartTime: time.Now().Add(-time.Hour * 2)}}, true, true}, - // if the interval is 24h, and the last start time is near, it will not be scheduled - {"last start time too near for 24h", []*cache.PhysicalTable{tblWithDailyInterval}, []*cache.TableStatus{{TableID: tblWithDailyInterval.ID, ParentTableID: tblWithDailyInterval.ID, LastJobStartTime: time.Now().Add(-time.Hour * 2)}}, false, false}, - // if the interval is 24h, and the last start time is far enough, it will be scheduled - {"last start time far enough for 24h", []*cache.PhysicalTable{tblWithDailyInterval}, []*cache.TableStatus{{TableID: tblWithDailyInterval.ID, ParentTableID: tblWithDailyInterval.ID, LastJobStartTime: time.Now().Add(-time.Hour * 25)}}, true, true}, + {"hb time expired", []*cache.PhysicalTable{tbl}, []*cache.TableStatus{{TableID: tbl.ID, ParentTableID: tbl.ID, CurrentJobID: "job1", CurrentJobOwnerID: "test-another-id", CurrentJobOwnerHBTime: time.Now().Add(-time.Hour)}}, true}, + // if the last start time is too near, it will not be scheduled because no job running + {"last start time too near", []*cache.PhysicalTable{tbl}, []*cache.TableStatus{{TableID: tbl.ID, ParentTableID: tbl.ID, LastJobStartTime: time.Now()}}, false}, + // if the last start time is expired, it will not be scheduled because no job running + {"last start time expired", []*cache.PhysicalTable{tbl}, []*cache.TableStatus{{TableID: tbl.ID, ParentTableID: tbl.ID, LastJobStartTime: time.Now().Add(-time.Hour * 2)}}, false}, + // if the interval is 24h, and the last start time is near, it will not be scheduled because no job running + {"last start time too near for 24h", []*cache.PhysicalTable{tblWithDailyInterval}, []*cache.TableStatus{{TableID: 
tblWithDailyInterval.ID, ParentTableID: tblWithDailyInterval.ID, LastJobStartTime: time.Now().Add(-time.Hour * 2)}}, false}, + // if the interval is 24h, and the last start time is far enough, it will not be scheduled because no job running + {"last start time far enough for 24h", []*cache.PhysicalTable{tblWithDailyInterval}, []*cache.TableStatus{{TableID: tblWithDailyInterval.ID, ParentTableID: tblWithDailyInterval.ID, LastJobStartTime: time.Now().Add(-time.Hour * 25)}}, false}, } for _, c := range cases { @@ -240,21 +251,114 @@ func TestReadyForNewJobTables(t *testing.T) { m.tableStatusCache.Tables[st.TableID] = st } - tables, isCreate := m.readyForLockJobTables(se.Now()) + tables := m.readyForLockHBTimeoutJobTables(se.Now()) if c.shouldSchedule { assert.Len(t, tables, 1) assert.Equal(t, int64(0), tables[0].ID) assert.Equal(t, int64(0), tables[0].TableInfo.ID) - assert.Len(t, isCreate, 1) - assert.Equal(t, c.isCreate, isCreate[0]) } else { assert.Len(t, tables, 0) - assert.Len(t, isCreate, 0) } }) } } +func TestOnTimerTick(t *testing.T) { + var leader atomic.Bool + m := NewJobManager("test-id", newMockSessionPool(t), nil, nil, func() bool { + return leader.Load() + }) + + tbl := newMockTTLTbl(t, "t1") + se := newMockSession(t) + se.sessionInfoSchema = newMockInfoSchemaWithVer(100, tbl.TableInfo) + + timerStore := timerapi.NewMemoryTimerStore() + defer timerStore.Close() + + a := &mockJobAdapter{} + a.On("CanSubmitJob").Return(false).Maybe() + + rt := newTTLTimerRuntime(timerStore, a) + require.Nil(t, rt.rt) + defer rt.Pause() + + now := time.UnixMilli(3600 * 24) + syncer := NewTTLTimerSyncer(m.sessPool, timerapi.NewDefaultTimerClient(timerStore)) + syncer.nowFunc = func() time.Time { + return now + } + + // pause after init + m.onTimerTick(se, rt, syncer, now) + require.Nil(t, rt.rt) + require.Equal(t, 0, len(syncer.key2Timers)) + syncTime, syncVer := syncer.GetLastSyncInfo() + require.Zero(t, syncVer) + require.True(t, syncTime.IsZero()) + + // resume first time + leader.Store(true) + m.onTimerTick(se, rt, syncer, now) + innerRT := rt.rt + require.NotNil(t, innerRT) + require.True(t, innerRT.Running()) + require.Equal(t, 1, len(syncer.key2Timers)) + syncTime, syncVer = syncer.GetLastSyncInfo() + require.Equal(t, int64(100), syncVer) + require.Equal(t, now, syncTime) + + // resume after a very short duration + now = now.Add(time.Second) + se.sessionInfoSchema = newMockInfoSchemaWithVer(101, tbl.TableInfo) + m.onTimerTick(se, rt, syncer, now) + require.Same(t, innerRT, rt.rt) + require.True(t, innerRT.Running()) + require.Equal(t, 1, len(syncer.key2Timers)) + syncTime, syncVer = syncer.GetLastSyncInfo() + require.Equal(t, int64(100), syncVer) + require.Equal(t, now.Add(-time.Second), syncTime) + + // resume after a middle duration + now = now.Add(6 * time.Second) + m.onTimerTick(se, rt, syncer, now) + require.Same(t, innerRT, rt.rt) + require.True(t, innerRT.Running()) + require.Equal(t, 1, len(syncer.key2Timers)) + syncTime, syncVer = syncer.GetLastSyncInfo() + require.Equal(t, int64(101), syncVer) + require.Equal(t, now, syncTime) + + // resume after a middle duration but infoschema not change + now = now.Add(6 * time.Second) + m.onTimerTick(se, rt, syncer, now) + require.Same(t, innerRT, rt.rt) + require.True(t, innerRT.Running()) + require.Equal(t, 1, len(syncer.key2Timers)) + syncTime, syncVer = syncer.GetLastSyncInfo() + require.Equal(t, int64(101), syncVer) + require.Equal(t, now.Add(-6*time.Second), syncTime) + + // resume after a long duration + now = now.Add(3 * time.Minute) + 
m.onTimerTick(se, rt, syncer, now) + require.Same(t, innerRT, rt.rt) + require.True(t, innerRT.Running()) + require.Equal(t, 1, len(syncer.key2Timers)) + syncTime, syncVer = syncer.GetLastSyncInfo() + require.Equal(t, int64(101), syncVer) + require.Equal(t, now, syncTime) + + // pause + leader.Store(false) + m.onTimerTick(se, rt, syncer, now) + require.Nil(t, rt.rt) + require.False(t, innerRT.Running()) + syncTime, syncVer = syncer.GetLastSyncInfo() + require.Zero(t, syncVer) + require.True(t, syncTime.IsZero()) +} + func TestLockTable(t *testing.T) { now, err := time.Parse(timeFormat, "2022-12-05 17:13:05") assert.NoError(t, err) @@ -482,7 +586,7 @@ func TestLockTable(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { - m := NewJobManager("test-id", newMockSessionPool(t), nil, nil) + m := NewJobManager("test-id", newMockSessionPool(t), nil, nil, nil) m.infoSchemaCache.Tables[c.table.ID] = c.table sqlCounter := 0 se := newMockSession(t) @@ -536,7 +640,7 @@ func TestLocalJobs(t *testing.T) { tbl1.ID = 1 tbl2 := newMockTTLTbl(t, "t2") tbl2.ID = 2 - m := NewJobManager("test-id", nil, nil, nil) + m := NewJobManager("test-id", nil, nil, nil, nil) m.sessPool = newMockSessionPool(t, tbl1, tbl2) m.runningJobs = []*ttlJob{{tbl: tbl1, id: "1"}, {tbl: tbl2, id: "2"}} @@ -551,57 +655,3 @@ func TestLocalJobs(t *testing.T) { assert.Len(t, m.localJobs(), 1) assert.Equal(t, m.localJobs()[0].id, "1") } - -func TestRescheduleJobsOutOfWindow(t *testing.T) { - // TODO: use failpoint to mock return job, and schedule - - tbl := newMockTTLTbl(t, "t1") - se := newMockSession(t, tbl) - - scanWorker1 := NewMockScanWorker(t) - scanWorker1.Start() - scanWorker1.setOneRowResult(tbl, 2022) - scanWorker2 := NewMockScanWorker(t) - scanWorker2.Start() - scanWorker2.setOneRowResult(tbl, 2022) - - m := NewJobManager("test-id", nil, nil, nil) - m.sessPool = newMockSessionPool(t, tbl) - m.taskManager.SetScanWorkers4Test([]worker{ - scanWorker1, - scanWorker2, - }) - - // jobs will not be scheduled - m.tableStatusCache.Tables = map[int64]*cache.TableStatus{ - tbl.ID: { - CurrentJobOwnerID: m.id, - }, - } - m.runningJobs = []*ttlJob{newMockTTLJob(tbl, cache.JobStatusWaiting)} - savedttlJobScheduleWindowStartTime := variable.TTLJobScheduleWindowStartTime.Load() - savedttlJobScheduleWindowEndTime := variable.TTLJobScheduleWindowEndTime.Load() - ttlJobScheduleWindowStartTime, _ := time.ParseInLocation(variable.FullDayTimeFormat, "12:00 +0000", time.UTC) - variable.TTLJobScheduleWindowStartTime.Store(ttlJobScheduleWindowStartTime) - ttlJobScheduleWindowEndTime, _ := time.ParseInLocation(variable.FullDayTimeFormat, "12:05 +0000", time.UTC) - variable.TTLJobScheduleWindowEndTime.Store(ttlJobScheduleWindowEndTime) - defer func() { - variable.TTLJobScheduleWindowStartTime.Store(savedttlJobScheduleWindowStartTime) - variable.TTLJobScheduleWindowEndTime.Store(savedttlJobScheduleWindowEndTime) - }() - - now, _ := time.ParseInLocation(variable.FullDayTimeFormat, "12:06 +0000", time.UTC) - m.rescheduleJobs(se, now) - scanWorker1.checkWorkerStatus(workerStatusRunning, true, nil) - scanWorker1.checkPollResult(false, "") - scanWorker2.checkWorkerStatus(workerStatusRunning, true, nil) - scanWorker2.checkPollResult(false, "") - - // jobs will be scheduled within the time window - now, _ = time.ParseInLocation(variable.FullDayTimeFormat, "12:02 +0000", time.UTC) - m.rescheduleJobs(se, now) - //scanWorker1.checkWorkerStatus(workerStatusRunning, false, m.runningJobs[0].tasks[0]) - scanWorker1.checkPollResult(false, "") - 
scanWorker2.checkWorkerStatus(workerStatusRunning, true, nil) - scanWorker2.checkPollResult(false, "") -} diff --git a/ttl/ttlworker/session_test.go b/ttl/ttlworker/session_test.go index 335f6a5701a99..62b0d4764c12a 100644 --- a/ttl/ttlworker/session_test.go +++ b/ttl/ttlworker/session_test.go @@ -66,6 +66,10 @@ func newMockInfoSchema(tbl ...*model.TableInfo) infoschema.InfoSchema { return infoschema.MockInfoSchema(tbl) } +func newMockInfoSchemaWithVer(ver int64, tbl ...*model.TableInfo) infoschema.InfoSchema { + return infoschema.MockInfoSchemaWithSchemaVer(tbl, ver) +} + type mockRows struct { t *testing.T fieldTypes []*types.FieldType diff --git a/ttl/ttlworker/task_manager.go b/ttl/ttlworker/task_manager.go index 20f70d7fc49ac..a5de011003512 100644 --- a/ttl/ttlworker/task_manager.go +++ b/ttl/ttlworker/task_manager.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tidb/ttl/session" "github.com/pingcap/tidb/util/logutil" "github.com/tikv/client-go/v2/tikv" + "github.com/tikv/client-go/v2/tikvrpc" "go.uber.org/multierr" "go.uber.org/zap" ) @@ -552,28 +553,31 @@ func (m *taskManager) meetTTLRunningTask(count int, taskStatus cache.TaskStatus) // always return true for already running task because it is already included in count return true } + return getMaxRunningTasksLimit(m.store) > count +} +func getMaxRunningTasksLimit(store kv.Storage) int { ttlRunningTask := variable.TTLRunningTasks.Load() - // `-1` is the auto value, means we should calculate the limit according to the count of TiKV if ttlRunningTask != -1 { - return int(ttlRunningTask) > count + return int(ttlRunningTask) } - store, ok := m.store.(tikv.Storage) + tikvStore, ok := store.(tikv.Storage) if !ok { - return variable.MaxConfigurableConcurrency > count + return variable.MaxConfigurableConcurrency } - regionCache := store.GetRegionCache() + regionCache := tikvStore.GetRegionCache() if regionCache == nil { - return true + return variable.MaxConfigurableConcurrency } - limit := len(regionCache.GetAllStores()) + + limit := len(regionCache.GetStoresByType(tikvrpc.TiKV)) if limit > variable.MaxConfigurableConcurrency { limit = variable.MaxConfigurableConcurrency } - return limit > count + return limit } type runningScanTask struct { diff --git a/ttl/ttlworker/task_manager_test.go b/ttl/ttlworker/task_manager_test.go index 624ca4c752c2c..57554332bf806 100644 --- a/ttl/ttlworker/task_manager_test.go +++ b/ttl/ttlworker/task_manager_test.go @@ -19,9 +19,14 @@ import ( "testing" "time" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/ttl/cache" "github.com/pingcap/tidb/ttl/session" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/tikv" + "github.com/tikv/client-go/v2/tikvrpc" ) // NewTaskManager is an exported version of newTaskManager for test @@ -144,3 +149,36 @@ func TestResizeWorkers(t *testing.T) { scanWorker2.checkWorkerStatus(workerStatusStopped, false, nil) assert.NotNil(t, m.runningTasks[0].result) } + +type mockKVStore struct { + kv.Storage +} + +type mockTiKVStore struct { + mockKVStore + tikv.Storage + regionCache *tikv.RegionCache +} + +func (s *mockTiKVStore) GetRegionCache() *tikv.RegionCache { + return s.regionCache +} + +func TestGetMaxRunningTasksLimit(t *testing.T) { + variable.TTLRunningTasks.Store(1) + require.Equal(t, 1, getMaxRunningTasksLimit(&mockTiKVStore{})) + + variable.TTLRunningTasks.Store(2) + require.Equal(t, 2, getMaxRunningTasksLimit(&mockTiKVStore{})) + + 
variable.TTLRunningTasks.Store(-1)
+	require.Equal(t, variable.MaxConfigurableConcurrency, getMaxRunningTasksLimit(nil))
+	require.Equal(t, variable.MaxConfigurableConcurrency, getMaxRunningTasksLimit(&mockKVStore{}))
+	require.Equal(t, variable.MaxConfigurableConcurrency, getMaxRunningTasksLimit(&mockTiKVStore{}))
+
+	s := &mockTiKVStore{regionCache: tikv.NewRegionCache(nil)}
+	s.GetRegionCache().SetRegionCacheStore(1, "", "", tikvrpc.TiKV, 1, nil)
+	s.GetRegionCache().SetRegionCacheStore(2, "", "", tikvrpc.TiKV, 1, nil)
+	s.GetRegionCache().SetRegionCacheStore(3, "", "", tikvrpc.TiFlash, 1, nil)
+	require.Equal(t, 2, getMaxRunningTasksLimit(s))
+}
diff --git a/ttl/ttlworker/timer.go b/ttl/ttlworker/timer.go
index f4ed5e99adb2d..8ea176515aa14 100644
--- a/ttl/ttlworker/timer.go
+++ b/ttl/ttlworker/timer.go
@@ -180,7 +180,7 @@ func (t *ttlTimerHook) OnSchedEvent(ctx context.Context, event timerapi.TimerShe
 func (t *ttlTimerHook) waitJobFinished(logger *zap.Logger, data *TTLTimerData, timerID string, eventID string, eventStart time.Time) {
 	defer func() {
 		t.wg.Done()
-		logger.Info("stop to wait job")
+		logger.Info("stop waiting for TTL job")
 	}()
 
 	ticker := time.NewTicker(t.checkTTLJobInterval)
diff --git a/ttl/ttlworker/timer_sync.go b/ttl/ttlworker/timer_sync.go
index 030d0b9b89754..274a47754e675 100644
--- a/ttl/ttlworker/timer_sync.go
+++ b/ttl/ttlworker/timer_sync.go
@@ -28,6 +28,7 @@ import (
 	"github.com/pingcap/tidb/ttl/session"
 	"github.com/pingcap/tidb/util/logutil"
 	"go.uber.org/zap"
+	"golang.org/x/exp/maps"
 	"golang.org/x/exp/slices"
 )
 
@@ -51,6 +52,9 @@ type TTLTimersSyncer struct {
 	key2Timers     map[string]*timerapi.TimerRecord
 	lastPullTimers time.Time
 	delayDelete    time.Duration
+	lastSyncTime   time.Time
+	lastSyncVer    int64
+	nowFunc        func() time.Time
 }
 
 // NewTTLTimerSyncer creates a new TTLTimersSyncer
@@ -59,6 +63,7 @@ func NewTTLTimerSyncer(pool sessionPool, cli timerapi.TimerClient) *TTLTimersSyn
 		pool:        pool,
 		cli:         cli,
 		key2Timers:  make(map[string]*timerapi.TimerRecord),
+		nowFunc:     time.Now,
 		delayDelete: timerDelayDeleteInterval,
 	}
 }
@@ -120,12 +125,40 @@ func (g *TTLTimersSyncer) ManualTriggerTTLTimer(ctx context.Context, tbl *cache.
return "", false, errors.New("manual request failed to trigger, request cancelled") } - return timer.ManualEventID, true, nil + jobID := timer.ManualEventID + rows, err := se.ExecuteSQL(ctx, "select 1 from mysql.tidb_ttl_job_history where job_id=%?", jobID) + if err != nil { + return "", false, err + } + + if len(rows) == 0 { + return "", false, nil + } + + return jobID, true, nil }, nil } +// Reset resets the syncer's state +func (g *TTLTimersSyncer) Reset() { + var zeroTime time.Time + g.lastPullTimers = zeroTime + g.lastSyncTime = zeroTime + g.lastSyncVer = 0 + if len(g.key2Timers) > 0 { + maps.Clear(g.key2Timers) + } +} + +// GetLastSyncInfo returns last sync time and information schema version +func (g *TTLTimersSyncer) GetLastSyncInfo() (time.Time, int64) { + return g.lastSyncTime, g.lastSyncVer +} + // SyncTimers syncs timers with TTL tables func (g *TTLTimersSyncer) SyncTimers(ctx context.Context, is infoschema.InfoSchema) { + g.lastSyncTime = g.nowFunc() + g.lastSyncVer = is.SchemaMetaVersion() if time.Since(g.lastPullTimers) > fullRefreshTimersCacheInterval { newKey2Timers := make(map[string]*timerapi.TimerRecord, len(g.key2Timers)) timers, err := g.cli.GetTimers(ctx, timerapi.WithKeyPrefix(timerKeyPrefix)) @@ -138,7 +171,7 @@ func (g *TTLTimersSyncer) SyncTimers(ctx context.Context, is infoschema.InfoSche newKey2Timers[timer.Key] = timer } g.key2Timers = newKey2Timers - g.lastPullTimers = time.Now() + g.lastPullTimers = g.nowFunc() } se, err := getSession(g.pool) diff --git a/ttl/ttlworker/timer_sync_test.go b/ttl/ttlworker/timer_sync_test.go index 8dc4a1f8db2cd..a475b20be61ec 100644 --- a/ttl/ttlworker/timer_sync_test.go +++ b/ttl/ttlworker/timer_sync_test.go @@ -92,6 +92,37 @@ func TestTTLManualTriggerOneTimer(t *testing.T) { } } + createJobHistory := func(jobID string) { + tk.MustExec(fmt.Sprintf(`INSERT INTO mysql.tidb_ttl_job_history ( + job_id, + table_id, + parent_table_id, + table_schema, + table_name, + partition_name, + create_time, + finish_time, + ttl_expire, + expired_rows, + deleted_rows, + error_delete_rows, + status + ) + VALUES + ( + '%s', %d, %d, 'test', '%s', '', + from_unixtime(%d), + from_unixtime(%d), + from_unixtime(%d), + 100, 100, 0, 'running' + )`, + jobID, physical.TableInfo.ID, physical.ID, physical.TableInfo.Name.O, + time.Now().Unix(), + time.Now().Unix()+int64(time.Minute.Seconds()), + time.Now().Unix()-int64(time.Hour.Seconds()), + )) + } + // start trigger -> not finished -> finished check, manual := startTrigger(context.TODO(), "") testCheckFunc(check, "", "") @@ -102,6 +133,8 @@ func TestTTLManualTriggerOneTimer(t *testing.T) { require.NoError(t, timerStore.Update(context.TODO(), timer.ID, &timerapi.TimerUpdate{ ManualRequest: timerapi.NewOptionalVal(manual), })) + testCheckFunc(check, "", "") + createJobHistory("event123") testCheckFunc(check, "event123", "") // start trigger -> trigger done but no event id @@ -191,8 +224,16 @@ func TestTTLTimerSync(t *testing.T) { cli := timerapi.NewDefaultTimerClient(timerStore) sync := ttlworker.NewTTLTimerSyncer(do.SysSessionPool(), cli) + lastSyncTime, lastSyncVer := sync.GetLastSyncInfo() + require.True(t, lastSyncTime.IsZero()) + require.Zero(t, lastSyncVer) + // first sync + now := time.Now() sync.SyncTimers(context.TODO(), do.InfoSchema()) + lastSyncTime, lastSyncVer = sync.GetLastSyncInfo() + require.Equal(t, do.InfoSchema().SchemaMetaVersion(), lastSyncVer) + require.GreaterOrEqual(t, lastSyncTime.Unix(), now.Unix()) checkTimerCnt(t, cli, 6) timer1 := checkTimerWithTableMeta(t, do, cli, "test", 
"t1", "", zeroTime) timer2 := checkTimerWithTableMeta(t, do, cli, "test", "t2", "", wm1) @@ -209,7 +250,11 @@ func TestTTLTimerSync(t *testing.T) { "partition p0 values less than (10)," + "partition p1 values less than (100)" + ")") + now = time.Now() sync.SyncTimers(context.TODO(), do.InfoSchema()) + lastSyncTime, lastSyncVer = sync.GetLastSyncInfo() + require.Equal(t, do.InfoSchema().SchemaMetaVersion(), lastSyncVer) + require.GreaterOrEqual(t, lastSyncTime.Unix(), now.Unix()) checkTimerCnt(t, cli, 10) timer4 := checkTimerWithTableMeta(t, do, cli, "test", "t4", "", zeroTime) checkTimerWithTableMeta(t, do, cli, "test", "tp1", "p3", zeroTime) @@ -284,6 +329,21 @@ func TestTTLTimerSync(t *testing.T) { sync.SyncTimers(context.TODO(), do.InfoSchema()) checkTimerCnt(t, cli, 7) checkTimersNotChange(t, cli, timer2, timer3, timer4, timer5, timerP10, timerP11, timerP12) + + // reset timers + sync.Reset() + lastSyncTime, lastSyncVer = sync.GetLastSyncInfo() + require.True(t, lastSyncTime.IsZero()) + require.Zero(t, lastSyncVer) + + // sync after reset + now = time.Now() + sync.SyncTimers(context.TODO(), do.InfoSchema()) + lastSyncTime, lastSyncVer = sync.GetLastSyncInfo() + require.Equal(t, do.InfoSchema().SchemaMetaVersion(), lastSyncVer) + require.GreaterOrEqual(t, lastSyncTime.Unix(), now.Unix()) + checkTimerCnt(t, cli, 7) + checkTimersNotChange(t, cli, timer2, timer3, timer4, timer5, timerP10, timerP11, timerP12) } func insertTTLTableStatusWatermark(t *testing.T, do *domain.Domain, tk *testkit.TestKit, db, table, partition string, watermark time.Time, jobRunning bool) {