diff --git a/pkg/ttl/ttlworker/del.go b/pkg/ttl/ttlworker/del.go
index 9e159fc3e31af..8c2334c912c09 100644
--- a/pkg/ttl/ttlworker/del.go
+++ b/pkg/ttl/ttlworker/del.go
@@ -284,6 +284,7 @@ func (w *ttlDeleteWorker) loop() error {
     if err != nil {
         return err
     }
+    defer se.Close()
 
     ctx := metrics.CtxWithPhaseTracer(w.baseWorker.ctx, tracer)
diff --git a/pkg/ttl/ttlworker/del_test.go b/pkg/ttl/ttlworker/del_test.go
index ba41e468ef90a..87b335b72fa5e 100644
--- a/pkg/ttl/ttlworker/del_test.go
+++ b/pkg/ttl/ttlworker/del_test.go
@@ -387,6 +387,7 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
     s := newMockSession(t)
     pool := newMockSessionPool(t)
     pool.se = s
+    defer pool.AssertNoSessionInUse()
     sqlMap := make(map[string]int)
     t3Retried := make(chan struct{})
diff --git a/pkg/ttl/ttlworker/job_manager.go b/pkg/ttl/ttlworker/job_manager.go
index eb9a34e4ad568..028b377950d57 100644
--- a/pkg/ttl/ttlworker/job_manager.go
+++ b/pkg/ttl/ttlworker/job_manager.go
@@ -1259,3 +1259,21 @@ func (a *managerJobAdapter) GetJob(ctx context.Context, tableID, physicalID int6
     return &jobTrace, nil
 }
+<<<<<<< HEAD
+=======
+
+func (a *managerJobAdapter) Now() (time.Time, error) {
+    se, err := getSession(a.sessPool)
+    if err != nil {
+        return time.Time{}, err
+    }
+    defer se.Close()
+
+    tz, err := se.GlobalTimeZone(context.TODO())
+    if err != nil {
+        return time.Time{}, err
+    }
+
+    return se.Now().In(tz), nil
+}
+>>>>>>> 50d73f80c42 (ttl: fix some memory leak in TTL (#56935))
diff --git a/pkg/ttl/ttlworker/job_manager_integration_test.go b/pkg/ttl/ttlworker/job_manager_integration_test.go
index a8b9a81ba10ff..3a37c5ae4a5ef 100644
--- a/pkg/ttl/ttlworker/job_manager_integration_test.go
+++ b/pkg/ttl/ttlworker/job_manager_integration_test.go
@@ -41,6 +41,7 @@ import (
     "github.com/pingcap/tidb/pkg/ttl/metrics"
     "github.com/pingcap/tidb/pkg/ttl/session"
     "github.com/pingcap/tidb/pkg/ttl/ttlworker"
+    "github.com/pingcap/tidb/pkg/util"
     "github.com/pingcap/tidb/pkg/util/logutil"
     dto "github.com/prometheus/client_model/go"
     "github.com/stretchr/testify/require"
@@ -1023,9 +1024,37 @@ func TestDelayMetrics(t *testing.T) {
     checkRecord(records, "t5", emptyTime)
 }
 
+type poolTestWrapper struct {
+    util.SessionPool
+    inuse atomic.Int64
+}
+
+func wrapPoolForTest(pool util.SessionPool) *poolTestWrapper {
+    return &poolTestWrapper{SessionPool: pool}
+}
+
+func (w *poolTestWrapper) Get() (pools.Resource, error) {
+    r, err := w.SessionPool.Get()
+    if err == nil {
+        w.inuse.Add(1)
+    }
+    return r, err
+}
+
+func (w *poolTestWrapper) Put(r pools.Resource) {
+    w.inuse.Add(-1)
+    w.SessionPool.Put(r)
+}
+
+func (w *poolTestWrapper) AssertNoSessionInUse(t *testing.T) {
+    require.Zero(t, w.inuse.Load())
+}
+
 func TestManagerJobAdapterCanSubmitJob(t *testing.T) {
     store, dom := testkit.CreateMockStoreAndDomain(t)
-    adapter := ttlworker.NewManagerJobAdapter(store, dom.SysSessionPool(), nil)
+    pool := wrapPoolForTest(dom.SysSessionPool())
+    defer pool.AssertNoSessionInUse(t)
+    adapter := ttlworker.NewManagerJobAdapter(store, pool, nil)
 
     tk := testkit.NewTestKit(t, store)
     tk.MustExec("use test")
@@ -1150,7 +1179,9 @@ func TestManagerJobAdapterSubmitJob(t *testing.T) {
 
 func TestManagerJobAdapterGetJob(t *testing.T) {
     store, dom := testkit.CreateMockStoreAndDomain(t)
-    adapter := ttlworker.NewManagerJobAdapter(store, dom.SysSessionPool(), nil)
+    pool := wrapPoolForTest(dom.SysSessionPool())
+    defer pool.AssertNoSessionInUse(t)
+    adapter := ttlworker.NewManagerJobAdapter(store, pool, nil)
 
     tk := testkit.NewTestKit(t, store)
     tk.MustExec("use test")
@@ -1242,3 +1273,189 @@ func TestManagerJobAdapterGetJob(t *testing.T) {
         tk.MustExec("delete from mysql.tidb_ttl_job_history")
     }
 }
+<<<<<<< HEAD
+=======
+
+func TestManagerJobAdapterNow(t *testing.T) {
+    store, dom := testkit.CreateMockStoreAndDomain(t)
+    pool := wrapPoolForTest(dom.SysSessionPool())
+    defer pool.AssertNoSessionInUse(t)
+    adapter := ttlworker.NewManagerJobAdapter(store, pool, nil)
+
+    tk := testkit.NewTestKit(t, store)
+    tk.MustExec("use test")
+    tk.MustExec("set @@global.time_zone ='Europe/Berlin'")
+    tk.MustExec("set @@time_zone='Asia/Shanghai'")
+
+    now, err := adapter.Now()
+    require.NoError(t, err)
+    localNow := time.Now()
+
+    require.Equal(t, "Europe/Berlin", now.Location().String())
+    require.InDelta(t, now.Unix(), localNow.Unix(), 10)
+}
+
+func TestFinishAndUpdateOwnerAtSameTime(t *testing.T) {
+    // Finishing a `TTLJob` will remove all the `TTLTask` of the job, and at the same time
+    // the `task_manager` may update the owner of the task, which may cause a write conflict.
+    // This test is to simulate this scenario.
+    store, dom := testkit.CreateMockStoreAndDomain(t)
+    waitAndStopTTLManager(t, dom)
+    tk := testkit.NewTestKit(t, store)
+
+    sessionFactory := sessionFactory(t, store)
+    se := sessionFactory()
+
+    tk.MustExec("use test")
+    tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
+    testTable, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
+    require.NoError(t, err)
+
+    now := se.Now().Add(time.Hour * 48)
+    m := ttlworker.NewJobManager("test-ttl-job-manager", nil, store, nil, nil)
+    require.NoError(t, m.InfoSchemaCache().Update(se))
+
+    job, err := m.LockJob(context.Background(), se, m.InfoSchemaCache().Tables[testTable.Meta().ID], now, uuid.NewString(), false)
+    require.NoError(t, err)
+    tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
+
+    continueFinish := make(chan struct{})
+    doneFinish := make(chan struct{})
+    testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ttl/ttlworker/ttl-before-remove-task-in-finish", func() {
+        <-continueFinish
+    })
+
+    go func() {
+        defer close(doneFinish)
+        finishSe := sessionFactory()
+        require.NoError(t, job.Finish(finishSe, finishSe.Now(), &ttlworker.TTLSummary{}))
+    }()
+
+    _, err = m.TaskManager().LockScanTask(se, &cache.TTLTask{
+        ScanID:  0,
+        JobID:   job.ID(),
+        TableID: testTable.Meta().ID,
+    }, now)
+    require.NoError(t, err)
+    close(continueFinish)
+    <-doneFinish
+
+    tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
+}
+
+func TestFinishError(t *testing.T) {
+    store, dom := testkit.CreateMockStoreAndDomain(t)
+    waitAndStopTTLManager(t, dom)
+    tk := testkit.NewTestKit(t, store)
+
+    sessionFactory := sessionFactory(t, store)
+    se := sessionFactory()
+
+    tk.MustExec("use test")
+    tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
+    testTable, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
+    require.NoError(t, err)
+
+    errCount := 5
+    testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ttl/ttlworker/ttl-finish", func(err *error) {
+        errCount -= 1
+        if errCount > 0 {
+            *err = errors.New("mock error")
+        }
+    })
+
+    now := se.Now()
+
+    m := ttlworker.NewJobManager("test-ttl-job-manager", nil, store, nil, nil)
+    require.NoError(t, m.InfoSchemaCache().Update(se))
+
+    initializeTest := func() {
+        errCount = 5
+        now = now.Add(time.Hour * 48)
+        job, err := m.LockJob(context.Background(), se, m.InfoSchemaCache().Tables[testTable.Meta().ID], now, uuid.NewString(), false)
+        require.NoError(t, err)
+        tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
+        task, err := m.TaskManager().LockScanTask(se, &cache.TTLTask{
+            ScanID:  0,
+            JobID:   job.ID(),
+            TableID: testTable.Meta().ID,
+        }, now)
+        require.NoError(t, err)
+        task.SetResult(nil)
+        err = m.TaskManager().ReportTaskFinished(se, now, task)
+        require.NoError(t, err)
+        tk.MustQuery("select status from mysql.tidb_ttl_task").Check(testkit.Rows("finished"))
+    }
+
+    // Test the `CheckFinishedJob` can tolerate the `job.finish` error
+    initializeTest()
+    for i := 0; i < 4; i++ {
+        m.CheckFinishedJob(se)
+        tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
+    }
+    m.CheckFinishedJob(se)
+    tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
+
+    // Test the `rescheduleJobs` can tolerate the `job.finish` error
+    // cancel job branch
+    initializeTest()
+    variable.EnableTTLJob.Store(false)
+    t.Cleanup(func() {
+        variable.EnableTTLJob.Store(true)
+    })
+    for i := 0; i < 4; i++ {
+        m.RescheduleJobs(se, now)
+        tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
+    }
+    m.RescheduleJobs(se, now)
+    tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
+    variable.EnableTTLJob.Store(true)
+    // remove table branch
+    initializeTest()
+    tk.MustExec("drop table t")
+    require.NoError(t, m.InfoSchemaCache().Update(se))
+    for i := 0; i < 4; i++ {
+        m.RescheduleJobs(se, now)
+        tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
+    }
+    m.RescheduleJobs(se, now)
+    tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
+    tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
+    require.NoError(t, m.InfoSchemaCache().Update(se))
+    testTable, err = dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
+    require.NoError(t, err)
+
+    // Test the `updateHeartBeat` can tolerate the `job.finish` error
+    initializeTest()
+    for i := 0; i < 4; i++ {
+        // timeout is 6h
+        now = now.Add(time.Hour * 8)
+        m.UpdateHeartBeat(context.Background(), se, now)
+        tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
+    }
+    m.UpdateHeartBeat(context.Background(), se, now)
+    tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
+}
+
+func boostJobScheduleForTest(t *testing.T) func() {
+    require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/check-job-triggered-interval", fmt.Sprintf("return(%d)", 100*time.Millisecond)))
+    require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/sync-timer", fmt.Sprintf("return(%d)", 100*time.Millisecond)))
+    require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/check-job-interval", fmt.Sprintf("return(%d)", 100*time.Millisecond)))
+    require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/resize-workers-interval", fmt.Sprintf("return(%d)", 100*time.Millisecond)))
+    require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/update-status-table-cache-interval", fmt.Sprintf("return(%d)", 100*time.Millisecond)))
+    require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/update-info-schema-cache-interval", fmt.Sprintf("return(%d)", 100*time.Millisecond)))
+    require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/task-manager-loop-interval", fmt.Sprintf("return(%d)", 100*time.Millisecond)))
+    require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/check-task-interval", fmt.Sprintf("return(%d)", 100*time.Millisecond)))
+
+    return func() {
+        require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/check-job-triggered-interval"))
+        require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/sync-timer"))
+        require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/check-job-interval"))
+        require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/resize-workers-interval"))
+        require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/update-status-table-cache-interval"))
+        require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/update-info-schema-cache-interval"))
+        require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/task-manager-loop-interval"))
+        require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/check-task-interval"))
+    }
+}
+>>>>>>> 50d73f80c42 (ttl: fix some memory leak in TTL (#56935))
diff --git a/pkg/ttl/ttlworker/job_manager_test.go b/pkg/ttl/ttlworker/job_manager_test.go
index 1fe7b48e476a3..6841a676ed526 100644
--- a/pkg/ttl/ttlworker/job_manager_test.go
+++ b/pkg/ttl/ttlworker/job_manager_test.go
@@ -285,6 +285,7 @@ func TestOnTimerTick(t *testing.T) {
     now := time.UnixMilli(3600 * 24)
 
     syncer := NewTTLTimerSyncer(m.sessPool, timerapi.NewDefaultTimerClient(timerStore))
+    defer m.sessPool.(*mockSessionPool).AssertNoSessionInUse()
     syncer.nowFunc = func() time.Time {
         return now
     }
diff --git a/pkg/ttl/ttlworker/scan_test.go b/pkg/ttl/ttlworker/scan_test.go
index 62c7238e8cad0..c49f253a92416 100644
--- a/pkg/ttl/ttlworker/scan_test.go
+++ b/pkg/ttl/ttlworker/scan_test.go
@@ -132,6 +132,7 @@ func TestScanWorkerSchedule(t *testing.T) {
     tbl := newMockTTLTbl(t, "t1")
     w := NewMockScanWorker(t)
+    defer w.sessPoll.AssertNoSessionInUse()
     w.setOneRowResult(tbl, 7)
     defer w.stopWithWait()
@@ -181,6 +182,7 @@ func TestScanWorkerScheduleWithFailedTask(t *testing.T) {
     tbl := newMockTTLTbl(t, "t1")
     w := NewMockScanWorker(t)
+    defer w.sessPoll.AssertNoSessionInUse()
     w.clearInfoSchema()
     defer w.stopWithWait()
@@ -393,6 +395,11 @@ func (t *mockScanTask) execSQL(_ context.Context, sql string, _ ...interface{})
 
 func TestScanTaskDoScan(t *testing.T) {
     task := newMockScanTask(t, 3)
+<<<<<<< HEAD
+=======
+    defer task.sessPool.AssertNoSessionInUse()
+    task.ctx = cache.SetMockExpireTime(task.ctx, time.Now())
+>>>>>>> 50d73f80c42 (ttl: fix some memory leak in TTL (#56935))
     task.sqlRetry[1] = scanTaskExecuteSQLMaxRetry
 
     task.runDoScanForTest(3, "")
@@ -413,7 +420,11 @@ func TestScanTaskCheck(t *testing.T) {
     tbl := newMockTTLTbl(t, "t1")
     pool := newMockSessionPool(t, tbl)
+<<<<<<< HEAD
     pool.se.evalExpire = time.UnixMilli(100)
+=======
+    defer pool.AssertNoSessionInUse()
+>>>>>>> 50d73f80c42 (ttl: fix some memory leak in TTL (#56935))
     pool.se.rows = newMockRows(t, types.NewFieldType(mysql.TypeInt24)).Append(12).Rows()
 
     task := &ttlScanTask{
@@ -460,6 +471,7 @@ func TestScanTaskCancelStmt(t *testing.T) {
     testCancel := func(ctx context.Context, doCancel func()) {
         mockPool := newMockSessionPool(t)
+        defer mockPool.AssertNoSessionInUse()
         startExec := make(chan struct{})
         mockPool.se.sessionInfoSchema = newMockInfoSchema(task.tbl.TableInfo)
         mockPool.se.executeSQL = func(_ context.Context, _ string, _ ...any) ([]chunk.Row, error) {
diff --git a/pkg/ttl/ttlworker/session_test.go b/pkg/ttl/ttlworker/session_test.go
index 081cc04b48dd2..8bbb4339f20bf 100644
--- a/pkg/ttl/ttlworker/session_test.go
+++ b/pkg/ttl/ttlworker/session_test.go
@@ -116,18 +116,28 @@ type mockSessionPool struct {
     t           *testing.T
     se          *mockSession
     lastSession *mockSession
+    inuse       atomic.Int64
 }
 
 func (p *mockSessionPool) Get() (pools.Resource, error) {
     se := *(p.se)
     p.lastSession = &se
+    p.lastSession.pool = p
+    p.inuse.Add(1)
     return p.lastSession, nil
 }
 
-func (p *mockSessionPool) Put(pools.Resource) {}
+func (p *mockSessionPool) Put(pools.Resource) {
+    p.inuse.Add(-1)
+}
+
+func (p *mockSessionPool) AssertNoSessionInUse() {
+    require.Equal(p.t, int64(0), p.inuse.Load())
+}
 
 func newMockSessionPool(t *testing.T, tbl ...*cache.PhysicalTable) *mockSessionPool {
     return &mockSessionPool{
+        t:  t,
         se: newMockSession(t, tbl...),
     }
 }
@@ -145,6 +155,7 @@ type mockSession struct {
     closed    bool
     commitErr error
     killed    chan struct{}
+    pool      *mockSessionPool
 }
 
 func newMockSession(t *testing.T, tbl ...*cache.PhysicalTable) *mockSession {
@@ -214,6 +225,9 @@ func (s *mockSession) KillStmt() {
 
 func (s *mockSession) Close() {
     s.closed = true
+    if s.pool != nil {
+        s.pool.Put(s)
+    }
 }
 
 func (s *mockSession) Now() time.Time {
diff --git a/pkg/ttl/ttlworker/timer_sync_test.go b/pkg/ttl/ttlworker/timer_sync_test.go
index 18f6cfd4c487b..0d559b1f0ef51 100644
--- a/pkg/ttl/ttlworker/timer_sync_test.go
+++ b/pkg/ttl/ttlworker/timer_sync_test.go
@@ -45,7 +45,9 @@ func TestTTLManualTriggerOneTimer(t *testing.T) {
     defer timerStore.Close()
     var zeroWatermark time.Time
     cli := timerapi.NewDefaultTimerClient(timerStore)
-    sync := ttlworker.NewTTLTimerSyncer(do.SysSessionPool(), cli)
+    pool := wrapPoolForTest(do.SysSessionPool())
+    defer pool.AssertNoSessionInUse(t)
+    sync := ttlworker.NewTTLTimerSyncer(pool, cli)
 
     tk.MustExec("set @@global.tidb_ttl_job_enable=0")
     tk.MustExec("create table tp1(a int, t timestamp) TTL=`t`+interval 1 HOUR ttl_job_interval='3h' partition by range(a) (" +
@@ -237,7 +239,9 @@ func TestTTLTimerSync(t *testing.T) {
     insertTTLTableStatusWatermark(t, do, tk, "test", "tp1", "p1", wm2, true)
 
     cli := timerapi.NewDefaultTimerClient(timerStore)
-    sync := ttlworker.NewTTLTimerSyncer(do.SysSessionPool(), cli)
+    pool := wrapPoolForTest(do.SysSessionPool())
+    defer pool.AssertNoSessionInUse(t)
+    sync := ttlworker.NewTTLTimerSyncer(pool, cli)
 
     lastSyncTime, lastSyncVer := sync.GetLastSyncInfo()
     require.True(t, lastSyncTime.IsZero())