From 1c3d94c424d8103d00737ffc012e4addb09f5a04 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 30 Oct 2024 11:40:46 +0800 Subject: [PATCH] ttl: fix some memory leak in TTL (#56935) (#56976) close pingcap/tidb#56934 --- pkg/ttl/ttlworker/del.go | 1 + pkg/ttl/ttlworker/del_test.go | 1 + pkg/ttl/ttlworker/job_manager.go | 1 + .../ttlworker/job_manager_integration_test.go | 39 +++++++++++++++++-- pkg/ttl/ttlworker/job_manager_test.go | 1 + pkg/ttl/ttlworker/scan_test.go | 4 ++ pkg/ttl/ttlworker/session_test.go | 16 +++++++- pkg/ttl/ttlworker/timer_sync_test.go | 8 +++- 8 files changed, 65 insertions(+), 6 deletions(-) diff --git a/pkg/ttl/ttlworker/del.go b/pkg/ttl/ttlworker/del.go index 9518350996165..6fc4c93af5068 100644 --- a/pkg/ttl/ttlworker/del.go +++ b/pkg/ttl/ttlworker/del.go @@ -285,6 +285,7 @@ func (w *ttlDeleteWorker) loop() error { if err != nil { return err } + defer se.Close() ctx := metrics.CtxWithPhaseTracer(w.baseWorker.ctx, tracer) diff --git a/pkg/ttl/ttlworker/del_test.go b/pkg/ttl/ttlworker/del_test.go index ba41e468ef90a..87b335b72fa5e 100644 --- a/pkg/ttl/ttlworker/del_test.go +++ b/pkg/ttl/ttlworker/del_test.go @@ -387,6 +387,7 @@ func TestTTLDeleteTaskWorker(t *testing.T) { s := newMockSession(t) pool := newMockSessionPool(t) pool.se = s + defer pool.AssertNoSessionInUse() sqlMap := make(map[string]int) t3Retried := make(chan struct{}) diff --git a/pkg/ttl/ttlworker/job_manager.go b/pkg/ttl/ttlworker/job_manager.go index 81b4f3a52e625..21eac7618f1ba 100644 --- a/pkg/ttl/ttlworker/job_manager.go +++ b/pkg/ttl/ttlworker/job_manager.go @@ -1292,6 +1292,7 @@ func (a *managerJobAdapter) Now() (time.Time, error) { if err != nil { return time.Time{}, err } + defer se.Close() tz, err := se.GlobalTimeZone(context.TODO()) if err != nil { diff --git a/pkg/ttl/ttlworker/job_manager_integration_test.go b/pkg/ttl/ttlworker/job_manager_integration_test.go index 685f5afc4be45..5c39fd8683f04 100644 --- a/pkg/ttl/ttlworker/job_manager_integration_test.go +++ b/pkg/ttl/ttlworker/job_manager_integration_test.go @@ -43,6 +43,7 @@ import ( "github.com/pingcap/tidb/pkg/ttl/metrics" "github.com/pingcap/tidb/pkg/ttl/session" "github.com/pingcap/tidb/pkg/ttl/ttlworker" + "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/logutil" dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" @@ -1064,9 +1065,37 @@ func TestDelayMetrics(t *testing.T) { checkRecord(records, "t5", emptyTime) } +type poolTestWrapper struct { + util.SessionPool + inuse atomic.Int64 +} + +func wrapPoolForTest(pool util.SessionPool) *poolTestWrapper { + return &poolTestWrapper{SessionPool: pool} +} + +func (w *poolTestWrapper) Get() (pools.Resource, error) { + r, err := w.SessionPool.Get() + if err == nil { + w.inuse.Add(1) + } + return r, err +} + +func (w *poolTestWrapper) Put(r pools.Resource) { + w.inuse.Add(-1) + w.SessionPool.Put(r) +} + +func (w *poolTestWrapper) AssertNoSessionInUse(t *testing.T) { + require.Zero(t, w.inuse.Load()) +} + func TestManagerJobAdapterCanSubmitJob(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - adapter := ttlworker.NewManagerJobAdapter(store, dom.SysSessionPool(), nil) + pool := wrapPoolForTest(dom.SysSessionPool()) + defer pool.AssertNoSessionInUse(t) + adapter := ttlworker.NewManagerJobAdapter(store, pool, nil) // stop TTLJobManager to avoid unnecessary job schedule and make test stable dom.TTLJobManager().Stop() @@ -1195,7 +1224,9 @@ func TestManagerJobAdapterSubmitJob(t *testing.T) { func TestManagerJobAdapterGetJob(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - adapter := ttlworker.NewManagerJobAdapter(store, dom.SysSessionPool(), nil) + pool := wrapPoolForTest(dom.SysSessionPool()) + defer pool.AssertNoSessionInUse(t) + adapter := ttlworker.NewManagerJobAdapter(store, pool, nil) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -1290,7 +1321,9 @@ func TestManagerJobAdapterGetJob(t *testing.T) { func TestManagerJobAdapterNow(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - adapter := ttlworker.NewManagerJobAdapter(store, dom.SysSessionPool(), nil) + pool := wrapPoolForTest(dom.SysSessionPool()) + defer pool.AssertNoSessionInUse(t) + adapter := ttlworker.NewManagerJobAdapter(store, pool, nil) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") diff --git a/pkg/ttl/ttlworker/job_manager_test.go b/pkg/ttl/ttlworker/job_manager_test.go index f509d26e3dbb1..ca6b84dfaaf62 100644 --- a/pkg/ttl/ttlworker/job_manager_test.go +++ b/pkg/ttl/ttlworker/job_manager_test.go @@ -289,6 +289,7 @@ func TestOnTimerTick(t *testing.T) { now := time.UnixMilli(3600 * 24) syncer := NewTTLTimerSyncer(m.sessPool, timerapi.NewDefaultTimerClient(timerStore)) + defer m.sessPool.(*mockSessionPool).AssertNoSessionInUse() syncer.nowFunc = func() time.Time { return now } diff --git a/pkg/ttl/ttlworker/scan_test.go b/pkg/ttl/ttlworker/scan_test.go index 137e72fe9a662..456efcd7155bc 100644 --- a/pkg/ttl/ttlworker/scan_test.go +++ b/pkg/ttl/ttlworker/scan_test.go @@ -131,6 +131,7 @@ func TestScanWorkerSchedule(t *testing.T) { tbl := newMockTTLTbl(t, "t1") w := NewMockScanWorker(t) + defer w.sessPoll.AssertNoSessionInUse() w.setOneRowResult(tbl, 7) defer w.stopWithWait() @@ -180,6 +181,7 @@ func TestScanWorkerScheduleWithFailedTask(t *testing.T) { tbl := newMockTTLTbl(t, "t1") w := NewMockScanWorker(t) + defer w.sessPoll.AssertNoSessionInUse() w.clearInfoSchema() defer w.stopWithWait() @@ -392,6 +394,7 @@ func (t *mockScanTask) execSQL(_ context.Context, sql string, _ ...any) ([]chunk func TestScanTaskDoScan(t *testing.T) { task := newMockScanTask(t, 3) + defer task.sessPool.AssertNoSessionInUse() task.ctx = cache.SetMockExpireTime(task.ctx, time.Now()) task.sqlRetry[1] = scanTaskExecuteSQLMaxRetry task.runDoScanForTest(3, "") @@ -413,6 +416,7 @@ func TestScanTaskDoScan(t *testing.T) { func TestScanTaskCheck(t *testing.T) { tbl := newMockTTLTbl(t, "t1") pool := newMockSessionPool(t, tbl) + defer pool.AssertNoSessionInUse() pool.se.rows = newMockRows(t, types.NewFieldType(mysql.TypeInt24)).Append(12).Rows() ctx := cache.SetMockExpireTime(context.Background(), time.Unix(100, 0)) diff --git a/pkg/ttl/ttlworker/session_test.go b/pkg/ttl/ttlworker/session_test.go index b9022802b1814..346fba43680fb 100644 --- a/pkg/ttl/ttlworker/session_test.go +++ b/pkg/ttl/ttlworker/session_test.go @@ -126,20 +126,30 @@ type mockSessionPool struct { t *testing.T se *mockSession lastSession *mockSession + inuse atomic.Int64 } func (p *mockSessionPool) Get() (pools.Resource, error) { se := *(p.se) p.lastSession = &se + p.lastSession.pool = p + p.inuse.Add(1) return p.lastSession, nil } -func (p *mockSessionPool) Put(pools.Resource) {} +func (p *mockSessionPool) Put(pools.Resource) { + p.inuse.Add(-1) +} + +func (p *mockSessionPool) AssertNoSessionInUse() { + require.Equal(p.t, int64(0), p.inuse.Load()) +} func (p *mockSessionPool) Close() {} func newMockSessionPool(t *testing.T, tbl ...*cache.PhysicalTable) *mockSessionPool { return &mockSessionPool{ + t: t, se: newMockSession(t, tbl...), } } @@ -155,6 +165,7 @@ type mockSession struct { resetTimeZoneCalls int closed bool commitErr error + pool *mockSessionPool } func newMockSession(t *testing.T, tbl ...*cache.PhysicalTable) *mockSession { @@ -226,6 +237,9 @@ func (s *mockSession) GlobalTimeZone(_ context.Context) (*time.Location, error) func (s *mockSession) Close() { s.closed = true + if s.pool != nil { + s.pool.Put(s) + } } func (s *mockSession) Now() time.Time { diff --git a/pkg/ttl/ttlworker/timer_sync_test.go b/pkg/ttl/ttlworker/timer_sync_test.go index 4d6d2f1f8c7b6..68062ddf39750 100644 --- a/pkg/ttl/ttlworker/timer_sync_test.go +++ b/pkg/ttl/ttlworker/timer_sync_test.go @@ -45,7 +45,9 @@ func TestTTLManualTriggerOneTimer(t *testing.T) { defer timerStore.Close() var zeroWatermark time.Time cli := timerapi.NewDefaultTimerClient(timerStore) - sync := ttlworker.NewTTLTimerSyncer(do.SysSessionPool(), cli) + pool := wrapPoolForTest(do.SysSessionPool()) + defer pool.AssertNoSessionInUse(t) + sync := ttlworker.NewTTLTimerSyncer(pool, cli) tk.MustExec("set @@global.tidb_ttl_job_enable=0") tk.MustExec("create table tp1(a int, t timestamp) TTL=`t`+interval 1 HOUR ttl_job_interval='3h' partition by range(a) (" + @@ -237,7 +239,9 @@ func TestTTLTimerSync(t *testing.T) { insertTTLTableStatusWatermark(t, do, tk, "test", "tp1", "p1", wm2, true) cli := timerapi.NewDefaultTimerClient(timerStore) - sync := ttlworker.NewTTLTimerSyncer(do.SysSessionPool(), cli) + pool := wrapPoolForTest(do.SysSessionPool()) + defer pool.AssertNoSessionInUse(t) + sync := ttlworker.NewTTLTimerSyncer(pool, cli) lastSyncTime, lastSyncVer := sync.GetLastSyncInfo() require.True(t, lastSyncTime.IsZero())