diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index b33459bb98e2e..8596eb89d6974 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1177,7 +1177,7 @@ func restoreStream( return errors.Trace(err) } defer func() { - if err = restoreGc(); err != nil { + if err := restoreGc(); err != nil { log.Error("failed to set gc enabled", zap.Error(err)) } }() diff --git a/ttl/cache/infoschema_test.go b/ttl/cache/infoschema_test.go index 4cec3db563fa7..ef428e4399c25 100644 --- a/ttl/cache/infoschema_test.go +++ b/ttl/cache/infoschema_test.go @@ -34,7 +34,7 @@ func TestInfoSchemaCache(t *testing.T) { conn := server.CreateMockConn(t, sv) sctx := conn.Context().Session tk := testkit.NewTestKitWithSession(t, store, sctx) - se := session.NewSession(sctx, sctx, func() {}) + se := session.NewSession(sctx, sctx, func(_ session.Session) {}) isc := cache.NewInfoSchemaCache(time.Hour) diff --git a/ttl/cache/ttlstatus_test.go b/ttl/cache/ttlstatus_test.go index 134faf3a201a3..a73d3675d0e6d 100644 --- a/ttl/cache/ttlstatus_test.go +++ b/ttl/cache/ttlstatus_test.go @@ -36,7 +36,7 @@ func TestTTLStatusCache(t *testing.T) { conn := server.CreateMockConn(t, sv) sctx := conn.Context().Session tk := testkit.NewTestKitWithSession(t, store, sctx) - ttlSession := session.NewSession(sctx, tk.Session(), func() {}) + ttlSession := session.NewSession(sctx, tk.Session(), func(_ session.Session) {}) isc := cache.NewTableStatusCache(time.Hour) diff --git a/ttl/session/session.go b/ttl/session/session.go index d07419651a103..4fa8bc198c3ac 100644 --- a/ttl/session/session.go +++ b/ttl/session/session.go @@ -50,11 +50,11 @@ type Session interface { type session struct { sessionctx.Context sqlExec sqlexec.SQLExecutor - closeFn func() + closeFn func(Session) } // NewSession creates a new Session -func NewSession(sctx sessionctx.Context, sqlExec sqlexec.SQLExecutor, closeFn func()) Session { +func NewSession(sctx sessionctx.Context, sqlExec sqlexec.SQLExecutor, closeFn func(Session)) Session { return &session{ Context: sctx, sqlExec: sqlExec, @@ -99,7 +99,7 @@ func (s *session) RunInTxn(ctx context.Context, fn func() error) (err error) { defer tracer.EnterPhase(tracer.Phase()) tracer.EnterPhase(metrics.PhaseBeginTxn) - if _, err = s.ExecuteSQL(ctx, "BEGIN"); err != nil { + if _, err = s.ExecuteSQL(ctx, "BEGIN OPTIMISTIC"); err != nil { return err } tracer.EnterPhase(metrics.PhaseOther) @@ -150,7 +150,7 @@ func (s *session) ResetWithGlobalTimeZone(ctx context.Context) error { // Close closes the session func (s *session) Close() { if s.closeFn != nil { - s.closeFn() + s.closeFn(s) s.Context = nil s.sqlExec = nil s.closeFn = nil diff --git a/ttl/ttlworker/BUILD.bazel b/ttl/ttlworker/BUILD.bazel index 8b7c39270807e..42a654e3b3343 100644 --- a/ttl/ttlworker/BUILD.bazel +++ b/ttl/ttlworker/BUILD.bazel @@ -40,6 +40,7 @@ go_test( name = "ttlworker_test", srcs = [ "del_test.go", + "job_manager_integration_test.go", "job_manager_test.go", "job_test.go", "scan_test.go", @@ -52,15 +53,21 @@ go_test( "//parser/ast", "//parser/model", "//parser/mysql", + "//session", "//sessionctx", "//sessionctx/variable", + "//testkit", "//ttl/cache", + "//ttl/session", "//types", "//util/chunk", + "//util/logutil", "@com_github_ngaut_pools//:pools", "@com_github_pingcap_errors//:errors", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@org_golang_x_time//rate", + "@org_uber_go_atomic//:atomic", + "@org_uber_go_zap//:zap", ], ) diff --git a/ttl/ttlworker/job.go b/ttl/ttlworker/job.go index 5551984cceb08..35c5881fb4dd7 100644 --- a/ttl/ttlworker/job.go +++ b/ttl/ttlworker/job.go @@ -96,8 +96,8 @@ func (job *ttlJob) updateState(ctx context.Context, se session.Session) error { } // peekScanTask returns the next scan task, but doesn't promote the iterator -func (job *ttlJob) peekScanTask() (*ttlScanTask, error) { - return job.tasks[job.taskIter], nil +func (job *ttlJob) peekScanTask() *ttlScanTask { + return job.tasks[job.taskIter] } // nextScanTask promotes the iterator diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go index cd3c1208f1f37..05f139e6bb599 100644 --- a/ttl/ttlworker/job_manager.go +++ b/ttl/ttlworker/job_manager.go @@ -32,7 +32,7 @@ import ( ) const insertNewTableIntoStatusTemplate = "INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (%d, %d)" -const setTableStatusOwnerTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_id = UUID(), current_job_owner_id = '%s',current_job_start_time = '%s',current_job_status = 'waiting',current_job_status_update_time = '%s',current_job_ttl_expire = '%s',current_job_owner_hb_time = '%s' WHERE (current_job_owner_id IS NULL OR current_job_owner_hb_time < '%s') AND table_id = %d" +const setTableStatusOwnerTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_id = UUID(), current_job_owner_id = '%s',current_job_start_time = '%s',current_job_status = 'waiting',current_job_status_update_time = '%s',current_job_ttl_expire = '%s',current_job_owner_hb_time = '%s' WHERE table_id = %d" const updateHeartBeatTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_owner_hb_time = '%s' WHERE table_id = %d AND current_job_owner_id = '%s'" const timeFormat = "2006-01-02 15:04:05" @@ -41,8 +41,8 @@ func insertNewTableIntoStatusSQL(tableID int64, parentTableID int64) string { return fmt.Sprintf(insertNewTableIntoStatusTemplate, tableID, parentTableID) } -func setTableStatusOwnerSQL(tableID int64, now time.Time, currentJobTTLExpire time.Time, maxHBTime time.Time, id string) string { - return fmt.Sprintf(setTableStatusOwnerTemplate, id, now.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), maxHBTime.Format(timeFormat), tableID) +func setTableStatusOwnerSQL(tableID int64, now time.Time, currentJobTTLExpire time.Time, id string) string { + return fmt.Sprintf(setTableStatusOwnerTemplate, id, now.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), tableID) } func updateHeartBeatSQL(tableID int64, now time.Time, id string) string { @@ -271,12 +271,17 @@ func (m *JobManager) resizeWorkers(workers []worker, count int, factory func() w func (m *JobManager) updateTaskState() bool { results := m.pollScanWorkerResults() for _, result := range results { + logger := logutil.Logger(m.ctx).With(zap.Int64("tableID", result.task.tbl.ID)) + if result.err != nil { + logger = logger.With(zap.Error(result.err)) + } + job := findJobWithTableID(m.runningJobs, result.task.tbl.ID) if job == nil { - logutil.Logger(m.ctx).Warn("task state changed but job not found", zap.Int64("tableID", result.task.tbl.ID)) + logger.Warn("task state changed but job not found", zap.Int64("tableID", result.task.tbl.ID)) continue } - logutil.Logger(m.ctx).Debug("scan task finished", zap.String("jobID", job.id)) + logger.Info("scan task finished", zap.String("jobID", job.id)) job.finishedScanTaskCounter += 1 job.scanTaskErr = multierr.Append(job.scanTaskErr, result.err) @@ -303,7 +308,11 @@ func (m *JobManager) checkNotOwnJob() { for _, job := range m.runningJobs { tableStatus := m.tableStatusCache.Tables[job.tbl.ID] if tableStatus == nil || tableStatus.CurrentJobOwnerID != m.id { - logutil.Logger(m.ctx).Info("job has been taken over by another node", zap.String("jobID", job.id), zap.String("statistics", job.statistics.String())) + logger := logutil.Logger(m.ctx).With(zap.String("jobID", job.id), zap.String("statistics", job.statistics.String())) + if tableStatus != nil { + logger.With(zap.String("newJobOwnerID", tableStatus.CurrentJobOwnerID)) + } + logger.Info("job has been taken over by another node") m.removeJob(job) job.cancel() } @@ -357,9 +366,10 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) { case len(newJobTables) > 0: table := newJobTables[0] newJobTables = newJobTables[1:] - logutil.Logger(m.ctx).Debug("try lock new job", zap.Int64("tableID", table.ID)) + logutil.Logger(m.ctx).Info("try lock new job", zap.Int64("tableID", table.ID)) job, err = m.lockNewJob(m.ctx, se, table, now) if job != nil { + logutil.Logger(m.ctx).Info("append new running job", zap.String("jobID", job.id), zap.Int64("tableID", job.tbl.ID)) m.appendJob(job) } } @@ -371,10 +381,10 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) { } for !job.AllSpawned() { - task, err := job.peekScanTask() - if err != nil { - logutil.Logger(m.ctx).Warn("fail to generate scan task", zap.Error(err)) - break + task := job.peekScanTask() + logger := logutil.Logger(m.ctx).With(zap.String("jobID", job.id), zap.String("table", task.tbl.TableInfo.Name.L)) + if task.tbl.PartitionDef != nil { + logger = logger.With(zap.String("partition", task.tbl.PartitionDef.Name.L)) } for len(idleScanWorkers) > 0 { @@ -383,7 +393,7 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) { err := idleWorker.Schedule(task) if err != nil { - logutil.Logger(m.ctx).Info("fail to schedule task", zap.Error(err)) + logger.Info("fail to schedule task", zap.Error(err)) continue } @@ -392,16 +402,11 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) { if err != nil { // not a big problem, current logic doesn't depend on the job status to promote // the routine, so we could just print a log here - logutil.Logger(m.ctx).Error("change ttl job status", zap.Error(err), zap.String("id", job.id)) + logger.Error("change ttl job status", zap.Error(err), zap.String("id", job.id)) } cancel() - logArgs := []zap.Field{zap.String("table", task.tbl.TableInfo.Name.L)} - if task.tbl.PartitionDef != nil { - logArgs = append(logArgs, zap.String("partition", task.tbl.PartitionDef.Name.L)) - } - logutil.Logger(m.ctx).Debug("schedule ttl task", - logArgs...) + logger.Info("scheduled ttl task") job.nextScanTask() break @@ -425,14 +430,17 @@ func (m *JobManager) idleScanWorkers() []scanWorker { } func (m *JobManager) localJobs() []*ttlJob { + jobs := make([]*ttlJob, 0, len(m.runningJobs)) for _, job := range m.runningJobs { status := m.tableStatusCache.Tables[job.tbl.ID] if status == nil || status.CurrentJobOwnerID != m.id { - m.removeJob(job) + // these jobs will be removed in `checkNotOwnJob` continue } + + jobs = append(jobs, job) } - return m.runningJobs + return jobs } // readyForNewJobTables returns all tables which should spawn a TTL job according to cache @@ -491,11 +499,10 @@ func (m *JobManager) couldTrySchedule(table *cache.TableStatus, now time.Time) b // localJob and return it. // It could be nil, nil, if the table query doesn't return error but the job has been locked by other instances. func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time) (*ttlJob, error) { - maxHBTime := now.Add(-2 * jobManagerLoopTickerInterval) var expireTime time.Time err := se.RunInTxn(ctx, func() error { - rows, err := se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.TableInfo.ID)) + rows, err := se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.ID)) if err != nil { return err } @@ -505,7 +512,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table * if err != nil { return err } - rows, err = se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.TableInfo.ID)) + rows, err = se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.ID)) if err != nil { return err } @@ -526,7 +533,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table * return err } - _, err = se.ExecuteSQL(ctx, setTableStatusOwnerSQL(table.ID, now, expireTime, maxHBTime, m.id)) + _, err = se.ExecuteSQL(ctx, setTableStatusOwnerSQL(table.ID, now, expireTime, m.id)) return err }) diff --git a/ttl/ttlworker/job_manager_integration_test.go b/ttl/ttlworker/job_manager_integration_test.go new file mode 100644 index 0000000000000..8c299afcd48de --- /dev/null +++ b/ttl/ttlworker/job_manager_integration_test.go @@ -0,0 +1,98 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ttlworker_test + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/pingcap/tidb/parser/ast" + "github.com/pingcap/tidb/parser/model" + dbsession "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/ttl/cache" + "github.com/pingcap/tidb/ttl/session" + "github.com/pingcap/tidb/ttl/ttlworker" + "github.com/pingcap/tidb/util/logutil" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" + "go.uber.org/zap" +) + +func TestParallelLockNewJob(t *testing.T) { + store := testkit.CreateMockStore(t) + + sessionFactory := func() session.Session { + dbSession, err := dbsession.CreateSession4Test(store) + require.NoError(t, err) + se := session.NewSession(dbSession, dbSession, nil) + + _, err = se.ExecuteSQL(context.Background(), "ROLLBACK") + require.NoError(t, err) + _, err = se.ExecuteSQL(context.Background(), "set tidb_retry_limit=0") + require.NoError(t, err) + + return se + } + + storedTTLJobRunInterval := variable.TTLJobRunInterval.Load() + variable.TTLJobRunInterval.Store(0) + defer func() { + variable.TTLJobRunInterval.Store(storedTTLJobRunInterval) + }() + + testTable := &cache.PhysicalTable{ID: 2, TableInfo: &model.TableInfo{ID: 1, TTLInfo: &model.TTLInfo{IntervalExprStr: "1", IntervalTimeUnit: int(ast.TimeUnitDay)}}} + // simply lock a new job + m := ttlworker.NewJobManager("test-id", nil, store) + se := sessionFactory() + job, err := m.LockNewJob(context.Background(), se, testTable, time.Now()) + require.NoError(t, err) + job.Finish(se, time.Now()) + + // lock one table in parallel, only one of them should lock successfully + testTimes := 100 + concurrency := 5 + for i := 0; i < testTimes; i++ { + successCounter := atomic.NewUint64(0) + successJob := &ttlworker.TTLJob{} + + wg := sync.WaitGroup{} + for j := 0; j < concurrency; j++ { + jobManagerID := fmt.Sprintf("test-ttl-manager-%d", j) + wg.Add(1) + go func() { + m := ttlworker.NewJobManager(jobManagerID, nil, store) + + se := sessionFactory() + job, err := m.LockNewJob(context.Background(), se, testTable, time.Now()) + if err == nil { + successCounter.Add(1) + successJob = job + } else { + logutil.BgLogger().Error("lock new job with error", zap.Error(err)) + } + wg.Done() + }() + } + wg.Wait() + + require.Equal(t, uint64(1), successCounter.Load()) + successJob.Finish(se, time.Now()) + } +} diff --git a/ttl/ttlworker/job_manager_test.go b/ttl/ttlworker/job_manager_test.go index 87bc19ca08261..6718c384543fe 100644 --- a/ttl/ttlworker/job_manager_test.go +++ b/ttl/ttlworker/job_manager_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/ttl/cache" + "github.com/pingcap/tidb/ttl/session" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/stretchr/testify/assert" @@ -139,6 +140,22 @@ func (m *JobManager) SetScanWorkers4Test(workers []worker) { m.scanWorkers = workers } +// TTLJob exports the ttlJob for test +type TTLJob = ttlJob + +// LockNewJob is an exported version of lockNewJob for test +func (m *JobManager) LockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time) (*TTLJob, error) { + return m.lockNewJob(ctx, se, table, now) +} + +func (j *ttlJob) Finish(se session.Session, now time.Time) { + j.finish(se, now) +} + +func (j *ttlJob) ID() string { + return j.id +} + func newMockTTLJob(tbl *cache.PhysicalTable, status cache.JobStatus) *ttlJob { statistics := &ttlStatistics{} return &ttlJob{tbl: tbl, ctx: context.Background(), statistics: statistics, status: status, tasks: []*ttlScanTask{{ctx: context.Background(), tbl: tbl, statistics: statistics}}} @@ -195,7 +212,6 @@ func TestReadyForNewJobTables(t *testing.T) { func TestLockNewTable(t *testing.T) { now, err := time.Parse(timeFormat, "2022-12-05 17:13:05") assert.NoError(t, err) - maxHBTime := now.Add(-2 * jobManagerLoopTickerInterval) expireTime := now testPhysicalTable := &cache.PhysicalTable{ID: 1, TableInfo: &model.TableInfo{ID: 1, TTLInfo: &model.TTLInfo{ColumnName: model.NewCIStr("test"), IntervalExprStr: "5 Year"}}} @@ -219,7 +235,7 @@ func TestLockNewTable(t *testing.T) { newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil, }, { - setTableStatusOwnerSQL(1, now, expireTime, maxHBTime, "test-id"), + setTableStatusOwnerSQL(1, now, expireTime, "test-id"), nil, nil, }, { @@ -241,7 +257,7 @@ func TestLockNewTable(t *testing.T) { newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil, }, { - setTableStatusOwnerSQL(1, now, expireTime, maxHBTime, "test-id"), + setTableStatusOwnerSQL(1, now, expireTime, "test-id"), nil, nil, }, { @@ -255,7 +271,7 @@ func TestLockNewTable(t *testing.T) { newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil, }, { - setTableStatusOwnerSQL(1, now, expireTime, maxHBTime, "test-id"), + setTableStatusOwnerSQL(1, now, expireTime, "test-id"), nil, errors.New("test error message"), }, }, false, true}, diff --git a/ttl/ttlworker/job_test.go b/ttl/ttlworker/job_test.go index 5af5d0316eed2..460dfa0611d7c 100644 --- a/ttl/ttlworker/job_test.go +++ b/ttl/ttlworker/job_test.go @@ -27,8 +27,7 @@ func TestIterScanTask(t *testing.T) { tbl: tbl, tasks: []*ttlScanTask{{}}, } - scanTask, err := job.peekScanTask() - assert.NoError(t, err) + scanTask := job.peekScanTask() assert.NotNil(t, scanTask) assert.Len(t, job.tasks, 1) diff --git a/ttl/ttlworker/session.go b/ttl/ttlworker/session.go index b20f436a61859..5bea3a4b8cba6 100644 --- a/ttl/ttlworker/session.go +++ b/ttl/ttlworker/session.go @@ -16,6 +16,7 @@ package ttlworker import ( "context" + "fmt" "time" "github.com/ngaut/pools" @@ -26,7 +27,9 @@ import ( "github.com/pingcap/tidb/ttl/metrics" "github.com/pingcap/tidb/ttl/session" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sqlexec" + "go.uber.org/zap" ) type sessionPool interface { @@ -57,10 +60,23 @@ func getSession(pool sessionPool) (session.Session, error) { return nil, errors.Errorf("%T cannot be casted to sqlexec.SQLExecutor", sctx) } - se := session.NewSession(sctx, exec, func() { + originalRetryLimit := sctx.GetSessionVars().RetryLimit + se := session.NewSession(sctx, exec, func(se session.Session) { + _, err = se.ExecuteSQL(context.Background(), fmt.Sprintf("set tidb_retry_limit=%d", originalRetryLimit)) + if err != nil { + logutil.BgLogger().Error("fail to reset tidb_retry_limit", zap.Int64("originalRetryLimit", originalRetryLimit), zap.Error(err)) + } + pool.Put(resource) }) + // store and set the retry limit to 0 + _, err = se.ExecuteSQL(context.Background(), "set tidb_retry_limit=0") + if err != nil { + se.Close() + return nil, err + } + // Force rollback the session to guarantee the session is not in any explicit transaction if _, err = se.ExecuteSQL(context.Background(), "ROLLBACK"); err != nil { se.Close()