Skip to content

Commit

Permalink
This is an automated cherry-pick of #56935
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
lcwangchao authored and ti-chi-bot committed Oct 29, 2024
1 parent f54a797 commit 52368be
Show file tree
Hide file tree
Showing 8 changed files with 273 additions and 5 deletions.
1 change: 1 addition & 0 deletions pkg/ttl/ttlworker/del.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ func (w *ttlDeleteWorker) loop() error {
if err != nil {
return err
}
defer se.Close()

ctx := metrics.CtxWithPhaseTracer(w.baseWorker.ctx, tracer)

Expand Down
1 change: 1 addition & 0 deletions pkg/ttl/ttlworker/del_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
s := newMockSession(t)
pool := newMockSessionPool(t)
pool.se = s
defer pool.AssertNoSessionInUse()

sqlMap := make(map[string]int)
t3Retried := make(chan struct{})
Expand Down
18 changes: 18 additions & 0 deletions pkg/ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1259,3 +1259,21 @@ func (a *managerJobAdapter) GetJob(ctx context.Context, tableID, physicalID int6

return &jobTrace, nil
}
<<<<<<< HEAD
=======

// Now reports the current time as seen by a pooled session, converted
// into the session's *global* time zone. The borrowed session is always
// returned to the pool before Now returns.
func (a *managerJobAdapter) Now() (time.Time, error) {
	sess, err := getSession(a.sessPool)
	if err != nil {
		return time.Time{}, err
	}
	defer sess.Close()

	loc, err := sess.GlobalTimeZone(context.TODO())
	if err != nil {
		return time.Time{}, err
	}

	return sess.Now().In(loc), nil
}
>>>>>>> 50d73f80c42 (ttl: fix some memory leak in TTL (#56935))
221 changes: 219 additions & 2 deletions pkg/ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/pingcap/tidb/pkg/ttl/metrics"
"github.com/pingcap/tidb/pkg/ttl/session"
"github.com/pingcap/tidb/pkg/ttl/ttlworker"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/logutil"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -1023,9 +1024,37 @@ func TestDelayMetrics(t *testing.T) {
checkRecord(records, "t5", emptyTime)
}

// poolTestWrapper decorates a util.SessionPool with leak-detection
// bookkeeping: it counts sessions handed out but not yet returned so a
// test can assert that every acquired session was put back.
type poolTestWrapper struct {
	util.SessionPool
	inuse atomic.Int64
}

// wrapPoolForTest wraps pool with in-use session accounting.
func wrapPoolForTest(pool util.SessionPool) *poolTestWrapper {
	return &poolTestWrapper{SessionPool: pool}
}

// Get acquires a session from the wrapped pool, counting it as in use
// only when the acquisition succeeds.
func (w *poolTestWrapper) Get() (pools.Resource, error) {
	res, err := w.SessionPool.Get()
	if err != nil {
		return res, err
	}
	w.inuse.Add(1)
	return res, nil
}

// Put releases a session back to the wrapped pool and decrements the
// in-use counter.
func (w *poolTestWrapper) Put(r pools.Resource) {
	w.inuse.Add(-1)
	w.SessionPool.Put(r)
}

// AssertNoSessionInUse fails the test if any session obtained through
// this wrapper is still outstanding.
func (w *poolTestWrapper) AssertNoSessionInUse(t *testing.T) {
	require.Zero(t, w.inuse.Load())
}

func TestManagerJobAdapterCanSubmitJob(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
adapter := ttlworker.NewManagerJobAdapter(store, dom.SysSessionPool(), nil)
pool := wrapPoolForTest(dom.SysSessionPool())
defer pool.AssertNoSessionInUse(t)
adapter := ttlworker.NewManagerJobAdapter(store, pool, nil)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -1150,7 +1179,9 @@ func TestManagerJobAdapterSubmitJob(t *testing.T) {

func TestManagerJobAdapterGetJob(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
adapter := ttlworker.NewManagerJobAdapter(store, dom.SysSessionPool(), nil)
pool := wrapPoolForTest(dom.SysSessionPool())
defer pool.AssertNoSessionInUse(t)
adapter := ttlworker.NewManagerJobAdapter(store, pool, nil)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -1242,3 +1273,189 @@ func TestManagerJobAdapterGetJob(t *testing.T) {
tk.MustExec("delete from mysql.tidb_ttl_job_history")
}
}
<<<<<<< HEAD
=======

// TestManagerJobAdapterNow checks that the adapter reports wall-clock
// time expressed in the GLOBAL time zone (not the session-local one),
// and that the adapter leaks no pooled session while doing so.
func TestManagerJobAdapterNow(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	pool := wrapPoolForTest(dom.SysSessionPool())
	defer pool.AssertNoSessionInUse(t)
	adapter := ttlworker.NewManagerJobAdapter(store, pool, nil)

	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	// Global and session zones deliberately differ; Now() must pick the global one.
	tk.MustExec("set @@global.time_zone ='Europe/Berlin'")
	tk.MustExec("set @@time_zone='Asia/Shanghai'")

	adapterNow, err := adapter.Now()
	require.NoError(t, err)
	wallNow := time.Now()

	require.Equal(t, "Europe/Berlin", adapterNow.Location().String())
	require.InDelta(t, adapterNow.Unix(), wallNow.Unix(), 10)
}

// TestFinishAndUpdateOwnerAtSameTime simulates a race between finishing
// a TTL job (which deletes its tasks) and the task manager taking
// ownership of one of those tasks, and asserts the finish still wins.
func TestFinishAndUpdateOwnerAtSameTime(t *testing.T) {
	// Finishing a `TTLJob` will remove all the `TTLTask` of the job, and at the same time
	// the `task_manager` may update the owner of the task, which may cause a write conflict.
	// This test is to simulate this scenario.
	store, dom := testkit.CreateMockStoreAndDomain(t)
	waitAndStopTTLManager(t, dom)
	tk := testkit.NewTestKit(t, store)

	sessionFactory := sessionFactory(t, store)
	se := sessionFactory()

	tk.MustExec("use test")
	tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
	testTable, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
	require.NoError(t, err)

	// A time 48h ahead guarantees the table's TTL job is due to run.
	now := se.Now().Add(time.Hour * 48)
	m := ttlworker.NewJobManager("test-ttl-job-manager", nil, store, nil, nil)
	require.NoError(t, m.InfoSchemaCache().Update(se))

	job, err := m.LockJob(context.Background(), se, m.InfoSchemaCache().Tables[testTable.Meta().ID], now, uuid.NewString(), false)
	require.NoError(t, err)
	tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))

	// Park job.Finish right before it deletes the task rows, so the
	// concurrent LockScanTask below can interleave with it.
	continueFinish := make(chan struct{})
	doneFinish := make(chan struct{})
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ttl/ttlworker/ttl-before-remove-task-in-finish", func() {
		<-continueFinish
	})

	go func() {
		defer close(doneFinish)
		finishSe := sessionFactory()
		require.NoError(t, job.Finish(finishSe, finishSe.Now(), &ttlworker.TTLSummary{}))
	}()

	// While Finish is blocked on the failpoint, take ownership of the scan
	// task — this is exactly the conflicting write being simulated.
	_, err = m.TaskManager().LockScanTask(se, &cache.TTLTask{
		ScanID:  0,
		JobID:   job.ID(),
		TableID: testTable.Meta().ID,
	}, now)
	require.NoError(t, err)
	close(continueFinish)
	<-doneFinish

	// Despite the owner update, finishing the job must remove all its tasks.
	tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
}

// TestFinishError verifies that every path that can finish a TTL job
// (CheckFinishedJob; RescheduleJobs in both its cancel-job and
// removed-table branches; UpdateHeartBeat) tolerates transient
// `job.finish` errors: while finish keeps failing the task rows stay
// put, and once it finally succeeds they are cleaned up.
func TestFinishError(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	waitAndStopTTLManager(t, dom)
	tk := testkit.NewTestKit(t, store)

	sessionFactory := sessionFactory(t, store)
	se := sessionFactory()

	tk.MustExec("use test")
	tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
	testTable, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
	require.NoError(t, err)

	// The failpoint injects a mock error into the first 4 finish attempts;
	// the 5th attempt (errCount reaches 0) succeeds.
	errCount := 5
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ttl/ttlworker/ttl-finish", func(err *error) {
		errCount -= 1
		if errCount > 0 {
			*err = errors.New("mock error")
		}
	})

	now := se.Now()

	m := ttlworker.NewJobManager("test-ttl-job-manager", nil, store, nil, nil)
	require.NoError(t, m.InfoSchemaCache().Update(se))

	// initializeTest re-arms the failpoint counter and drives a fresh job to
	// the state where its single scan task has already finished, so the next
	// manager action will try to finish the whole job.
	initializeTest := func() {
		errCount = 5
		now = now.Add(time.Hour * 48)
		job, err := m.LockJob(context.Background(), se, m.InfoSchemaCache().Tables[testTable.Meta().ID], now, uuid.NewString(), false)
		require.NoError(t, err)
		tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
		task, err := m.TaskManager().LockScanTask(se, &cache.TTLTask{
			ScanID:  0,
			JobID:   job.ID(),
			TableID: testTable.Meta().ID,
		}, now)
		require.NoError(t, err)
		task.SetResult(nil)
		err = m.TaskManager().ReportTaskFinished(se, now, task)
		require.NoError(t, err)
		tk.MustQuery("select status from mysql.tidb_ttl_task").Check(testkit.Rows("finished"))
	}

	// Test the `CheckFinishedJob` can tolerate the `job.finish` error
	initializeTest()
	for i := 0; i < 4; i++ {
		m.CheckFinishedJob(se)
		tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
	}
	m.CheckFinishedJob(se)
	tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))

	// Test the `rescheduleJobs` can tolerate the `job.finish` error
	// cancel job branch
	initializeTest()
	variable.EnableTTLJob.Store(false)
	t.Cleanup(func() {
		variable.EnableTTLJob.Store(true)
	})
	for i := 0; i < 4; i++ {
		m.RescheduleJobs(se, now)
		tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
	}
	m.RescheduleJobs(se, now)
	tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
	variable.EnableTTLJob.Store(true)
	// remove table branch
	initializeTest()
	tk.MustExec("drop table t")
	require.NoError(t, m.InfoSchemaCache().Update(se))
	for i := 0; i < 4; i++ {
		m.RescheduleJobs(se, now)
		tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
	}
	m.RescheduleJobs(se, now)
	tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
	// Recreate the table so the heartbeat branch below starts clean.
	tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
	require.NoError(t, m.InfoSchemaCache().Update(se))
	testTable, err = dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
	require.NoError(t, err)

	// Test the `updateHeartBeat` can tolerate the `job.finish` error
	initializeTest()
	for i := 0; i < 4; i++ {
		// timeout is 6h
		now = now.Add(time.Hour * 8)
		m.UpdateHeartBeat(context.Background(), se, now)
		tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
	}
	m.UpdateHeartBeat(context.Background(), se, now)
	tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
}

// boostJobScheduleForTest shrinks every TTL background-loop interval to
// 100ms via failpoints so integration tests run quickly. It returns a
// cleanup function that disables all the failpoints again; callers must
// invoke it (typically via defer) when done.
func boostJobScheduleForTest(t *testing.T) func() {
	// All failpoints share the same interval override, so drive both the
	// enable and disable phases from one list instead of 16 copied lines.
	failpoints := []string{
		"github.com/pingcap/tidb/pkg/ttl/ttlworker/check-job-triggered-interval",
		"github.com/pingcap/tidb/pkg/ttl/ttlworker/sync-timer",
		"github.com/pingcap/tidb/pkg/ttl/ttlworker/check-job-interval",
		"github.com/pingcap/tidb/pkg/ttl/ttlworker/resize-workers-interval",
		"github.com/pingcap/tidb/pkg/ttl/ttlworker/update-status-table-cache-interval",
		"github.com/pingcap/tidb/pkg/ttl/ttlworker/update-info-schema-cache-interval",
		"github.com/pingcap/tidb/pkg/ttl/ttlworker/task-manager-loop-interval",
		"github.com/pingcap/tidb/pkg/ttl/ttlworker/check-task-interval",
	}

	cfg := fmt.Sprintf("return(%d)", 100*time.Millisecond)
	for _, fp := range failpoints {
		require.NoError(t, failpoint.Enable(fp, cfg))
	}

	return func() {
		for _, fp := range failpoints {
			require.NoError(t, failpoint.Disable(fp))
		}
	}
}
>>>>>>> 50d73f80c42 (ttl: fix some memory leak in TTL (#56935))
1 change: 1 addition & 0 deletions pkg/ttl/ttlworker/job_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ func TestOnTimerTick(t *testing.T) {

now := time.UnixMilli(3600 * 24)
syncer := NewTTLTimerSyncer(m.sessPool, timerapi.NewDefaultTimerClient(timerStore))
defer m.sessPool.(*mockSessionPool).AssertNoSessionInUse()
syncer.nowFunc = func() time.Time {
return now
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/ttl/ttlworker/scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func TestScanWorkerSchedule(t *testing.T) {

tbl := newMockTTLTbl(t, "t1")
w := NewMockScanWorker(t)
defer w.sessPoll.AssertNoSessionInUse()
w.setOneRowResult(tbl, 7)
defer w.stopWithWait()

Expand Down Expand Up @@ -181,6 +182,7 @@ func TestScanWorkerScheduleWithFailedTask(t *testing.T) {

tbl := newMockTTLTbl(t, "t1")
w := NewMockScanWorker(t)
defer w.sessPoll.AssertNoSessionInUse()
w.clearInfoSchema()
defer w.stopWithWait()

Expand Down Expand Up @@ -393,6 +395,11 @@ func (t *mockScanTask) execSQL(_ context.Context, sql string, _ ...interface{})

func TestScanTaskDoScan(t *testing.T) {
task := newMockScanTask(t, 3)
<<<<<<< HEAD
=======
defer task.sessPool.AssertNoSessionInUse()
task.ctx = cache.SetMockExpireTime(task.ctx, time.Now())
>>>>>>> 50d73f80c42 (ttl: fix some memory leak in TTL (#56935))
task.sqlRetry[1] = scanTaskExecuteSQLMaxRetry
task.runDoScanForTest(3, "")

Expand All @@ -413,7 +420,11 @@ func TestScanTaskDoScan(t *testing.T) {
func TestScanTaskCheck(t *testing.T) {
tbl := newMockTTLTbl(t, "t1")
pool := newMockSessionPool(t, tbl)
<<<<<<< HEAD
pool.se.evalExpire = time.UnixMilli(100)
=======
defer pool.AssertNoSessionInUse()
>>>>>>> 50d73f80c42 (ttl: fix some memory leak in TTL (#56935))
pool.se.rows = newMockRows(t, types.NewFieldType(mysql.TypeInt24)).Append(12).Rows()

task := &ttlScanTask{
Expand Down Expand Up @@ -460,6 +471,7 @@ func TestScanTaskCancelStmt(t *testing.T) {

testCancel := func(ctx context.Context, doCancel func()) {
mockPool := newMockSessionPool(t)
defer mockPool.AssertNoSessionInUse()
startExec := make(chan struct{})
mockPool.se.sessionInfoSchema = newMockInfoSchema(task.tbl.TableInfo)
mockPool.se.executeSQL = func(_ context.Context, _ string, _ ...any) ([]chunk.Row, error) {
Expand Down
16 changes: 15 additions & 1 deletion pkg/ttl/ttlworker/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,28 @@ type mockSessionPool struct {
t *testing.T
se *mockSession
lastSession *mockSession
inuse atomic.Int64
}

// Get returns a fresh copy of the template session, remembers it as the
// most recently handed-out session, and bumps the in-use counter.
func (p *mockSessionPool) Get() (pools.Resource, error) {
	clone := *p.se
	clone.pool = p
	p.lastSession = &clone
	p.inuse.Add(1)
	return &clone, nil
}

func (p *mockSessionPool) Put(pools.Resource) {}
// Put returns a session to the pool; the mock only decrements the
// in-use counter used by AssertNoSessionInUse.
func (p *mockSessionPool) Put(pools.Resource) {
	p.inuse.Add(-1)
}

// AssertNoSessionInUse fails the test if any session taken from this
// pool has not been returned yet.
func (p *mockSessionPool) AssertNoSessionInUse() {
	require.Zero(p.t, p.inuse.Load())
}

func newMockSessionPool(t *testing.T, tbl ...*cache.PhysicalTable) *mockSessionPool {
return &mockSessionPool{
t: t,
se: newMockSession(t, tbl...),
}
}
Expand All @@ -145,6 +155,7 @@ type mockSession struct {
closed bool
commitErr error
killed chan struct{}
pool *mockSessionPool
}

func newMockSession(t *testing.T, tbl ...*cache.PhysicalTable) *mockSession {
Expand Down Expand Up @@ -214,6 +225,9 @@ func (s *mockSession) KillStmt() {

// Close marks the mock session closed and, when the session came from a
// mockSessionPool, returns it so the pool's in-use accounting balances.
func (s *mockSession) Close() {
	s.closed = true
	if s.pool != nil {
		s.pool.Put(s)
	}
}

func (s *mockSession) Now() time.Time {
Expand Down
Loading

0 comments on commit 52368be

Please sign in to comment.