ttl: fix some memory leak in TTL (#56935) #56951

Closed
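
This PR closes several session leaks in the TTL workers: a few code paths borrowed a session from the system session pool and returned early without handing it back. The production hunks below all apply the same remedy: pair every successful `getSession` with a deferred `Close`. A minimal sketch of the pattern (hypothetical `getSession`/`pool` names, not the exact TiDB signatures):

```go
se, err := getSession(pool) // borrow a session from the pool
if err != nil {
	return err
}
// Close returns the session to the pool on every exit path,
// including early error returns further down the function.
defer se.Close()
```

The test-side changes add leak detectors: pool wrappers that count outstanding sessions and assert the count is back to zero when each test finishes.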
1 change: 1 addition & 0 deletions pkg/ttl/ttlworker/del.go
@@ -284,6 +284,7 @@ func (w *ttlDeleteWorker) loop() error {
if err != nil {
return err
}
defer se.Close()

ctx := metrics.CtxWithPhaseTracer(w.baseWorker.ctx, tracer)

1 change: 1 addition & 0 deletions pkg/ttl/ttlworker/del_test.go
@@ -387,6 +387,7 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
s := newMockSession(t)
pool := newMockSessionPool(t)
pool.se = s
defer pool.AssertNoSessionInUse()

sqlMap := make(map[string]int)
t3Retried := make(chan struct{})
18 changes: 18 additions & 0 deletions pkg/ttl/ttlworker/job_manager.go
@@ -1259,3 +1259,21 @@ func (a *managerJobAdapter) GetJob(ctx context.Context, tableID, physicalID int64

return &jobTrace, nil
}

func (a *managerJobAdapter) Now() (time.Time, error) {
se, err := getSession(a.sessPool)
if err != nil {
return time.Time{}, err
}
defer se.Close()

tz, err := se.GlobalTimeZone(context.TODO())
if err != nil {
return time.Time{}, err
}

return se.Now().In(tz), nil
}
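
The new `Now()` helper releases the session even when fetching the global time zone fails. Without the `defer`, the early error return would strand the session outside the pool. A sketch of the leaky variant this guards against (illustrative only):

```go
se, err := getSession(a.sessPool)
if err != nil {
	return time.Time{}, err
}
tz, err := se.GlobalTimeZone(context.TODO())
if err != nil {
	return time.Time{}, err // leak: se is never returned to the pool
}
se.Close() // only reached on the happy path
return se.Now().In(tz), nil
```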
221 changes: 219 additions & 2 deletions pkg/ttl/ttlworker/job_manager_integration_test.go
@@ -41,6 +41,7 @@ import (
"github.com/pingcap/tidb/pkg/ttl/metrics"
"github.com/pingcap/tidb/pkg/ttl/session"
"github.com/pingcap/tidb/pkg/ttl/ttlworker"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/logutil"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
@@ -1023,9 +1024,37 @@ func TestDelayMetrics(t *testing.T) {
checkRecord(records, "t5", emptyTime)
}

type poolTestWrapper struct {
util.SessionPool
inuse atomic.Int64
}

func wrapPoolForTest(pool util.SessionPool) *poolTestWrapper {
return &poolTestWrapper{SessionPool: pool}
}

func (w *poolTestWrapper) Get() (pools.Resource, error) {
r, err := w.SessionPool.Get()
if err == nil {
w.inuse.Add(1)
}
return r, err
}

func (w *poolTestWrapper) Put(r pools.Resource) {
w.inuse.Add(-1)
w.SessionPool.Put(r)
}

func (w *poolTestWrapper) AssertNoSessionInUse(t *testing.T) {
require.Zero(t, w.inuse.Load())
}
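
`poolTestWrapper` embeds the real pool and keeps an atomic balance of `Get` minus `Put` calls, so a session that is borrowed but never returned shows up as a nonzero count when the test ends. The counter is atomic because sessions can be taken and returned from different goroutines. Typical usage, mirroring the tests below:

```go
pool := wrapPoolForTest(dom.SysSessionPool())
defer pool.AssertNoSessionInUse(t) // fails the test if any session leaked
adapter := ttlworker.NewManagerJobAdapter(store, pool, nil)
```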

func TestManagerJobAdapterCanSubmitJob(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
adapter := ttlworker.NewManagerJobAdapter(store, dom.SysSessionPool(), nil)
pool := wrapPoolForTest(dom.SysSessionPool())
defer pool.AssertNoSessionInUse(t)
adapter := ttlworker.NewManagerJobAdapter(store, pool, nil)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
@@ -1150,7 +1179,9 @@ func TestManagerJobAdapterSubmitJob(t *testing.T) {

func TestManagerJobAdapterGetJob(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
adapter := ttlworker.NewManagerJobAdapter(store, dom.SysSessionPool(), nil)
pool := wrapPoolForTest(dom.SysSessionPool())
defer pool.AssertNoSessionInUse(t)
adapter := ttlworker.NewManagerJobAdapter(store, pool, nil)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
@@ -1242,3 +1273,189 @@ func TestManagerJobAdapterGetJob(t *testing.T) {
tk.MustExec("delete from mysql.tidb_ttl_job_history")
}
}

func TestManagerJobAdapterNow(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
pool := wrapPoolForTest(dom.SysSessionPool())
defer pool.AssertNoSessionInUse(t)
adapter := ttlworker.NewManagerJobAdapter(store, pool, nil)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set @@global.time_zone ='Europe/Berlin'")
tk.MustExec("set @@time_zone='Asia/Shanghai'")

now, err := adapter.Now()
require.NoError(t, err)
localNow := time.Now()

require.Equal(t, "Europe/Berlin", now.Location().String())
require.InDelta(t, now.Unix(), localNow.Unix(), 10)
}

func TestFinishAndUpdateOwnerAtSameTime(t *testing.T) {
// Finishing a `TTLJob` removes all `TTLTask` rows belonging to the job, while at the same
// time the `task_manager` may update the owner of a task, which can cause a write conflict.
// This test simulates that scenario.
store, dom := testkit.CreateMockStoreAndDomain(t)
waitAndStopTTLManager(t, dom)
tk := testkit.NewTestKit(t, store)

sessionFactory := sessionFactory(t, store)
se := sessionFactory()

tk.MustExec("use test")
tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
testTable, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
require.NoError(t, err)

now := se.Now().Add(time.Hour * 48)
m := ttlworker.NewJobManager("test-ttl-job-manager", nil, store, nil, nil)
require.NoError(t, m.InfoSchemaCache().Update(se))

job, err := m.LockJob(context.Background(), se, m.InfoSchemaCache().Tables[testTable.Meta().ID], now, uuid.NewString(), false)
require.NoError(t, err)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))

continueFinish := make(chan struct{})
doneFinish := make(chan struct{})
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ttl/ttlworker/ttl-before-remove-task-in-finish", func() {
<-continueFinish
})

go func() {
defer close(doneFinish)
finishSe := sessionFactory()
require.NoError(t, job.Finish(finishSe, finishSe.Now(), &ttlworker.TTLSummary{}))
}()

_, err = m.TaskManager().LockScanTask(se, &cache.TTLTask{
ScanID: 0,
JobID: job.ID(),
TableID: testTable.Meta().ID,
}, now)
require.NoError(t, err)
close(continueFinish)
<-doneFinish

tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
}
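
The test makes the race deterministic with a failpoint gate: the goroutine running `Finish` parks at the injected callback until the competing `LockScanTask` has completed, then is released by closing a channel. The synchronization in outline (names taken from the test above):

```go
gate := make(chan struct{})
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ttl/ttlworker/ttl-before-remove-task-in-finish", func() {
	<-gate // hold Finish just before it removes the job's tasks
})
go func() {
	// job.Finish(...) runs here and blocks at the gate
}()
// ... run the racing LockScanTask while Finish is parked ...
close(gate) // let Finish proceed and observe the conflict
```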

func TestFinishError(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
waitAndStopTTLManager(t, dom)
tk := testkit.NewTestKit(t, store)

sessionFactory := sessionFactory(t, store)
se := sessionFactory()

tk.MustExec("use test")
tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
testTable, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
require.NoError(t, err)

errCount := 5
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ttl/ttlworker/ttl-finish", func(err *error) {
errCount -= 1
if errCount > 0 {
*err = errors.New("mock error")
}
})

now := se.Now()

m := ttlworker.NewJobManager("test-ttl-job-manager", nil, store, nil, nil)
require.NoError(t, m.InfoSchemaCache().Update(se))

initializeTest := func() {
errCount = 5
now = now.Add(time.Hour * 48)
job, err := m.LockJob(context.Background(), se, m.InfoSchemaCache().Tables[testTable.Meta().ID], now, uuid.NewString(), false)
require.NoError(t, err)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
task, err := m.TaskManager().LockScanTask(se, &cache.TTLTask{
ScanID: 0,
JobID: job.ID(),
TableID: testTable.Meta().ID,
}, now)
require.NoError(t, err)
task.SetResult(nil)
err = m.TaskManager().ReportTaskFinished(se, now, task)
require.NoError(t, err)
tk.MustQuery("select status from mysql.tidb_ttl_task").Check(testkit.Rows("finished"))
}

// Test that `CheckFinishedJob` can tolerate the `job.finish` error
initializeTest()
for i := 0; i < 4; i++ {
m.CheckFinishedJob(se)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
}
m.CheckFinishedJob(se)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))

// Test that `rescheduleJobs` can tolerate the `job.finish` error
// cancel job branch
initializeTest()
variable.EnableTTLJob.Store(false)
t.Cleanup(func() {
variable.EnableTTLJob.Store(true)
})
for i := 0; i < 4; i++ {
m.RescheduleJobs(se, now)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
}
m.RescheduleJobs(se, now)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
variable.EnableTTLJob.Store(true)
// remove table branch
initializeTest()
tk.MustExec("drop table t")
require.NoError(t, m.InfoSchemaCache().Update(se))
for i := 0; i < 4; i++ {
m.RescheduleJobs(se, now)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
}
m.RescheduleJobs(se, now)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
require.NoError(t, m.InfoSchemaCache().Update(se))
testTable, err = dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
require.NoError(t, err)

// Test that `updateHeartBeat` can tolerate the `job.finish` error
initializeTest()
for i := 0; i < 4; i++ {
// timeout is 6h
now = now.Add(time.Hour * 8)
m.UpdateHeartBeat(context.Background(), se, now)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
}
m.UpdateHeartBeat(context.Background(), se, now)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
}

func boostJobScheduleForTest(t *testing.T) func() {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/check-job-triggered-interval", fmt.Sprintf("return(%d)", 100*time.Millisecond)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/sync-timer", fmt.Sprintf("return(%d)", 100*time.Millisecond)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/check-job-interval", fmt.Sprintf("return(%d)", 100*time.Millisecond)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/resize-workers-interval", fmt.Sprintf("return(%d)", 100*time.Millisecond)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/update-status-table-cache-interval", fmt.Sprintf("return(%d)", 100*time.Millisecond)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/update-info-schema-cache-interval", fmt.Sprintf("return(%d)", 100*time.Millisecond)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/task-manager-loop-interval", fmt.Sprintf("return(%d)", 100*time.Millisecond)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/check-task-interval", fmt.Sprintf("return(%d)", 100*time.Millisecond)))

return func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/check-job-triggered-interval"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/sync-timer"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/check-job-interval"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/resize-workers-interval"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/update-status-table-cache-interval"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/update-info-schema-cache-interval"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/task-manager-loop-interval"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/check-task-interval"))
}
}
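
`boostJobScheduleForTest` shrinks every TTL scheduling interval to 100ms via failpoints and hands back the matching cleanup function, so a caller can keep setup and teardown on one line. Presumed usage (sketch):

```go
defer boostJobScheduleForTest(t)()
```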
1 change: 1 addition & 0 deletions pkg/ttl/ttlworker/job_manager_test.go
@@ -285,6 +285,7 @@ func TestOnTimerTick(t *testing.T) {

now := time.UnixMilli(3600 * 24)
syncer := NewTTLTimerSyncer(m.sessPool, timerapi.NewDefaultTimerClient(timerStore))
defer m.sessPool.(*mockSessionPool).AssertNoSessionInUse()
syncer.nowFunc = func() time.Time {
return now
}
12 changes: 12 additions & 0 deletions pkg/ttl/ttlworker/scan_test.go
@@ -132,6 +132,7 @@ func TestScanWorkerSchedule(t *testing.T) {

tbl := newMockTTLTbl(t, "t1")
w := NewMockScanWorker(t)
defer w.sessPoll.AssertNoSessionInUse()
w.setOneRowResult(tbl, 7)
defer w.stopWithWait()

@@ -181,6 +182,7 @@ func TestScanWorkerScheduleWithFailedTask(t *testing.T) {

tbl := newMockTTLTbl(t, "t1")
w := NewMockScanWorker(t)
defer w.sessPoll.AssertNoSessionInUse()
w.clearInfoSchema()
defer w.stopWithWait()

@@ -393,6 +395,11 @@ func (t *mockScanTask) execSQL(_ context.Context, sql string, _ ...interface{})

func TestScanTaskDoScan(t *testing.T) {
task := newMockScanTask(t, 3)
defer task.sessPool.AssertNoSessionInUse()
task.ctx = cache.SetMockExpireTime(task.ctx, time.Now())
task.sqlRetry[1] = scanTaskExecuteSQLMaxRetry
task.runDoScanForTest(3, "")

@@ -413,7 +420,11 @@ func TestScanTaskDoScan(t *testing.T) {
func TestScanTaskCheck(t *testing.T) {
tbl := newMockTTLTbl(t, "t1")
pool := newMockSessionPool(t, tbl)
pool.se.evalExpire = time.UnixMilli(100)
defer pool.AssertNoSessionInUse()
pool.se.rows = newMockRows(t, types.NewFieldType(mysql.TypeInt24)).Append(12).Rows()

task := &ttlScanTask{
@@ -460,6 +471,7 @@ func TestScanTaskCancelStmt(t *testing.T) {

testCancel := func(ctx context.Context, doCancel func()) {
mockPool := newMockSessionPool(t)
defer mockPool.AssertNoSessionInUse()
startExec := make(chan struct{})
mockPool.se.sessionInfoSchema = newMockInfoSchema(task.tbl.TableInfo)
mockPool.se.executeSQL = func(_ context.Context, _ string, _ ...any) ([]chunk.Row, error) {
16 changes: 15 additions & 1 deletion pkg/ttl/ttlworker/session_test.go
@@ -116,18 +116,28 @@ type mockSessionPool struct {
t *testing.T
se *mockSession
lastSession *mockSession
inuse atomic.Int64
}

func (p *mockSessionPool) Get() (pools.Resource, error) {
se := *(p.se)
p.lastSession = &se
p.lastSession.pool = p
p.inuse.Add(1)
return p.lastSession, nil
}

func (p *mockSessionPool) Put(pools.Resource) {}
func (p *mockSessionPool) Put(pools.Resource) {
p.inuse.Add(-1)
}

func (p *mockSessionPool) AssertNoSessionInUse() {
require.Equal(p.t, int64(0), p.inuse.Load())
}

func newMockSessionPool(t *testing.T, tbl ...*cache.PhysicalTable) *mockSessionPool {
return &mockSessionPool{
t: t,
se: newMockSession(t, tbl...),
}
}
@@ -145,6 +155,7 @@ type mockSession struct {
closed bool
commitErr error
killed chan struct{}
pool *mockSessionPool
}

func newMockSession(t *testing.T, tbl ...*cache.PhysicalTable) *mockSession {
@@ -214,6 +225,9 @@ func (s *mockSession) KillStmt() {

func (s *mockSession) Close() {
s.closed = true
if s.pool != nil {
s.pool.Put(s)
}
}
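
Giving each `mockSession` a back-pointer to its pool makes `Close` equivalent to returning the session, so production-style code that only does `defer se.Close()` still decrements the test counter. In effect (sketch):

```go
r, _ := pool.Get()       // inuse: 0 -> 1
r.(*mockSession).Close() // Close calls pool.Put: inuse 1 -> 0
pool.AssertNoSessionInUse()
```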

func (s *mockSession) Now() time.Time {