From afe3d77257b4dcc285cc0d9b7c264a365fbc9b5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E8=B6=85?= Date: Wed, 30 Oct 2024 12:15:57 +0800 Subject: [PATCH] ttl: fix some memory leak in TTL (#56935) (#56977) close pingcap/tidb#56934 --- pkg/ttl/ttlworker/del.go | 1 + pkg/ttl/ttlworker/del_test.go | 1 + pkg/ttl/ttlworker/job_manager.go | 1 + .../ttlworker/job_manager_integration_test.go | 43 +++++++++++++++++-- pkg/ttl/ttlworker/job_manager_test.go | 1 + pkg/ttl/ttlworker/scan_test.go | 4 ++ pkg/ttl/ttlworker/session_test.go | 16 ++++++- pkg/ttl/ttlworker/timer_sync_test.go | 8 +++- 8 files changed, 69 insertions(+), 6 deletions(-) diff --git a/pkg/ttl/ttlworker/del.go b/pkg/ttl/ttlworker/del.go index c0f67b854beb3..95f76121fcdca 100644 --- a/pkg/ttl/ttlworker/del.go +++ b/pkg/ttl/ttlworker/del.go @@ -266,6 +266,7 @@ func (w *ttlDeleteWorker) loop() error { if err != nil { return err } + defer se.Close() ctx := metrics.CtxWithPhaseTracer(w.baseWorker.ctx, tracer) diff --git a/pkg/ttl/ttlworker/del_test.go b/pkg/ttl/ttlworker/del_test.go index ee912ff9aca95..8d0a4c7038e5a 100644 --- a/pkg/ttl/ttlworker/del_test.go +++ b/pkg/ttl/ttlworker/del_test.go @@ -320,6 +320,7 @@ func TestTTLDeleteTaskWorker(t *testing.T) { s := newMockSession(t) pool := newMockSessionPool(t) pool.se = s + defer pool.AssertNoSessionInUse() sqlMap := make(map[string]struct{}) s.executeSQL = func(ctx context.Context, sql string, args ...any) ([]chunk.Row, error) { diff --git a/pkg/ttl/ttlworker/job_manager.go b/pkg/ttl/ttlworker/job_manager.go index 59f2bab360300..b07edc45af7f8 100644 --- a/pkg/ttl/ttlworker/job_manager.go +++ b/pkg/ttl/ttlworker/job_manager.go @@ -1284,6 +1284,7 @@ func (a *managerJobAdapter) Now() (time.Time, error) { if err != nil { return time.Time{}, err } + defer se.Close() tz, err := se.GlobalTimeZone(context.TODO()) if err != nil { diff --git a/pkg/ttl/ttlworker/job_manager_integration_test.go b/pkg/ttl/ttlworker/job_manager_integration_test.go index 06d21fe2e4de2..7061123126627 100644 --- a/pkg/ttl/ttlworker/job_manager_integration_test.go +++ b/pkg/ttl/ttlworker/job_manager_integration_test.go @@ -1063,9 +1063,42 @@ func TestDelayMetrics(t *testing.T) { checkRecord(records, "t5", emptyTime) } +type sessionPool interface { + Get() (pools.Resource, error) + Put(pools.Resource) +} + +type poolTestWrapper struct { + sessionPool + inuse atomic.Int64 +} + +func wrapPoolForTest(pool sessionPool) *poolTestWrapper { + return &poolTestWrapper{sessionPool: pool} +} + +func (w *poolTestWrapper) Get() (pools.Resource, error) { + r, err := w.sessionPool.Get() + if err == nil { + w.inuse.Add(1) + } + return r, err +} + +func (w *poolTestWrapper) Put(r pools.Resource) { + w.inuse.Add(-1) + w.sessionPool.Put(r) +} + +func (w *poolTestWrapper) AssertNoSessionInUse(t *testing.T) { + require.Zero(t, w.inuse.Load()) +} + func TestManagerJobAdapterCanSubmitJob(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - adapter := ttlworker.NewManagerJobAdapter(store, dom.SysSessionPool(), nil) + pool := wrapPoolForTest(dom.SysSessionPool()) + defer pool.AssertNoSessionInUse(t) + adapter := ttlworker.NewManagerJobAdapter(store, pool, nil) // stop TTLJobManager to avoid unnecessary job schedule and make test stable dom.TTLJobManager().Stop() @@ -1194,7 +1227,9 @@ func TestManagerJobAdapterSubmitJob(t *testing.T) { func TestManagerJobAdapterGetJob(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - adapter := ttlworker.NewManagerJobAdapter(store, dom.SysSessionPool(), nil) + pool := wrapPoolForTest(dom.SysSessionPool()) + defer pool.AssertNoSessionInUse(t) + adapter := ttlworker.NewManagerJobAdapter(store, pool, nil) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -1289,7 +1324,9 @@ func TestManagerJobAdapterGetJob(t *testing.T) { func TestManagerJobAdapterNow(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - adapter := ttlworker.NewManagerJobAdapter(store, dom.SysSessionPool(), nil) + pool := wrapPoolForTest(dom.SysSessionPool()) + defer pool.AssertNoSessionInUse(t) + adapter := ttlworker.NewManagerJobAdapter(store, pool, nil) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") diff --git a/pkg/ttl/ttlworker/job_manager_test.go b/pkg/ttl/ttlworker/job_manager_test.go index 61c74683602d9..2b4b1ae93c873 100644 --- a/pkg/ttl/ttlworker/job_manager_test.go +++ b/pkg/ttl/ttlworker/job_manager_test.go @@ -291,6 +291,7 @@ func TestOnTimerTick(t *testing.T) { now := time.UnixMilli(3600 * 24) syncer := NewTTLTimerSyncer(m.sessPool, timerapi.NewDefaultTimerClient(timerStore)) + defer m.sessPool.(*mockSessionPool).AssertNoSessionInUse() syncer.nowFunc = func() time.Time { return now } diff --git a/pkg/ttl/ttlworker/scan_test.go b/pkg/ttl/ttlworker/scan_test.go index 137e72fe9a662..456efcd7155bc 100644 --- a/pkg/ttl/ttlworker/scan_test.go +++ b/pkg/ttl/ttlworker/scan_test.go @@ -131,6 +131,7 @@ func TestScanWorkerSchedule(t *testing.T) { tbl := newMockTTLTbl(t, "t1") w := NewMockScanWorker(t) + defer w.sessPoll.AssertNoSessionInUse() w.setOneRowResult(tbl, 7) defer w.stopWithWait() @@ -180,6 +181,7 @@ func TestScanWorkerScheduleWithFailedTask(t *testing.T) { tbl := newMockTTLTbl(t, "t1") w := NewMockScanWorker(t) + defer w.sessPoll.AssertNoSessionInUse() w.clearInfoSchema() defer w.stopWithWait() @@ -392,6 +394,7 @@ func (t *mockScanTask) execSQL(_ context.Context, sql string, _ ...any) ([]chunk func TestScanTaskDoScan(t *testing.T) { task := newMockScanTask(t, 3) + defer task.sessPool.AssertNoSessionInUse() task.ctx = cache.SetMockExpireTime(task.ctx, time.Now()) task.sqlRetry[1] = scanTaskExecuteSQLMaxRetry task.runDoScanForTest(3, "") @@ -413,6 +416,7 @@ func TestScanTaskDoScan(t *testing.T) { func TestScanTaskCheck(t *testing.T) { tbl := newMockTTLTbl(t, "t1") pool := newMockSessionPool(t, tbl) + defer pool.AssertNoSessionInUse() pool.se.rows = newMockRows(t, types.NewFieldType(mysql.TypeInt24)).Append(12).Rows() ctx := cache.SetMockExpireTime(context.Background(), time.Unix(100, 0)) diff --git a/pkg/ttl/ttlworker/session_test.go b/pkg/ttl/ttlworker/session_test.go index 56c018005c6c0..6d17709d7b8db 100644 --- a/pkg/ttl/ttlworker/session_test.go +++ b/pkg/ttl/ttlworker/session_test.go @@ -125,18 +125,28 @@ type mockSessionPool struct { t *testing.T se *mockSession lastSession *mockSession + inuse atomic.Int64 } func (p *mockSessionPool) Get() (pools.Resource, error) { se := *(p.se) p.lastSession = &se + p.lastSession.pool = p + p.inuse.Add(1) return p.lastSession, nil } -func (p *mockSessionPool) Put(pools.Resource) {} +func (p *mockSessionPool) Put(pools.Resource) { + p.inuse.Add(-1) +} + +func (p *mockSessionPool) AssertNoSessionInUse() { + require.Equal(p.t, int64(0), p.inuse.Load()) +} func newMockSessionPool(t *testing.T, tbl ...*cache.PhysicalTable) *mockSessionPool { return &mockSessionPool{ + t: t, se: newMockSession(t, tbl...), } } @@ -152,6 +162,7 @@ type mockSession struct { resetTimeZoneCalls int closed bool commitErr error + pool *mockSessionPool } func newMockSession(t *testing.T, tbl ...*cache.PhysicalTable) *mockSession { @@ -223,6 +234,9 @@ func (s *mockSession) GlobalTimeZone(_ context.Context) (*time.Location, error) func (s *mockSession) Close() { s.closed = true + if s.pool != nil { + s.pool.Put(s) + } } func (s *mockSession) Now() time.Time { diff --git a/pkg/ttl/ttlworker/timer_sync_test.go b/pkg/ttl/ttlworker/timer_sync_test.go index 18f6cfd4c487b..0d559b1f0ef51 100644 --- a/pkg/ttl/ttlworker/timer_sync_test.go +++ b/pkg/ttl/ttlworker/timer_sync_test.go @@ -45,7 +45,9 @@ func TestTTLManualTriggerOneTimer(t *testing.T) { defer timerStore.Close() var zeroWatermark time.Time cli := timerapi.NewDefaultTimerClient(timerStore) - sync := ttlworker.NewTTLTimerSyncer(do.SysSessionPool(), cli) + pool := wrapPoolForTest(do.SysSessionPool()) + defer pool.AssertNoSessionInUse(t) + sync := ttlworker.NewTTLTimerSyncer(pool, cli) tk.MustExec("set @@global.tidb_ttl_job_enable=0") tk.MustExec("create table tp1(a int, t timestamp) TTL=`t`+interval 1 HOUR ttl_job_interval='3h' partition by range(a) (" + @@ -237,7 +239,9 @@ func TestTTLTimerSync(t *testing.T) { insertTTLTableStatusWatermark(t, do, tk, "test", "tp1", "p1", wm2, true) cli := timerapi.NewDefaultTimerClient(timerStore) - sync := ttlworker.NewTTLTimerSyncer(do.SysSessionPool(), cli) + pool := wrapPoolForTest(do.SysSessionPool()) + defer pool.AssertNoSessionInUse(t) + sync := ttlworker.NewTTLTimerSyncer(pool, cli) lastSyncTime, lastSyncVer := sync.GetLastSyncInfo() require.True(t, lastSyncTime.IsZero())