Skip to content

Commit

Permalink
This is an automated cherry-pick of #56935
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
lcwangchao authored and ti-chi-bot committed Oct 29, 2024
1 parent 4af46a5 commit 1ceacac
Show file tree
Hide file tree
Showing 8 changed files with 2,817 additions and 1 deletion.
1,461 changes: 1,461 additions & 0 deletions pkg/ttl/ttlworker/job_manager_integration_test.go

Large diffs are not rendered by default.

667 changes: 667 additions & 0 deletions pkg/ttl/ttlworker/job_manager_test.go

Large diffs are not rendered by default.

483 changes: 483 additions & 0 deletions pkg/ttl/ttlworker/timer_sync_test.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions ttl/ttlworker/del.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ func (w *ttlDeleteWorker) loop() error {
if err != nil {
return err
}
defer se.Close()

ctx := metrics.CtxWithPhaseTracer(w.baseWorker.ctx, tracer)

Expand Down
1 change: 1 addition & 0 deletions ttl/ttlworker/del_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
s := newMockSession(t)
pool := newMockSessionPool(t)
pool.se = s
defer pool.AssertNoSessionInUse()

sqlMap := make(map[string]int)
t3Retried := make(chan struct{})
Expand Down
177 changes: 177 additions & 0 deletions ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -961,3 +961,180 @@ GROUP BY

return records, nil
}
<<<<<<< HEAD:ttl/ttlworker/job_manager.go
=======

// SubmitTTLManagerJobRequest is the request to submit a TTL job to manager
type SubmitTTLManagerJobRequest struct {
// TableID indicates the parent table id
TableID int64
// PhysicalID indicates the physical table id
PhysicalID int64
// RequestID indicates the request id of the job
RequestID string
// RespCh indicates the channel for response
RespCh chan<- error
}

type managerJobAdapter struct {
store kv.Storage
sessPool util.SessionPool
requestCh chan<- *SubmitTTLManagerJobRequest
}

// NewManagerJobAdapter creates a managerJobAdapter
func NewManagerJobAdapter(store kv.Storage, sessPool util.SessionPool, requestCh chan<- *SubmitTTLManagerJobRequest) TTLJobAdapter {
return &managerJobAdapter{store: store, sessPool: sessPool, requestCh: requestCh}
}

func (a *managerJobAdapter) CanSubmitJob(tableID, physicalID int64) bool {
se, err := getSession(a.sessPool)
if err != nil {
terror.Log(err)
return false
}
defer se.Close()

is := se.GetDomainInfoSchema().(infoschema.InfoSchema)
tbl, ok := is.TableByID(context.Background(), tableID)
if !ok {
return false
}

tblInfo := tbl.Meta()
ttlInfo := tblInfo.TTLInfo
if ttlInfo == nil || !ttlInfo.Enable {
return false
}

if physicalID != tableID {
if par := tbl.GetPartitionedTable(); par == nil || par.GetPartition(physicalID) == nil {
return false
}
}

ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Minute)
defer cancel()

selectTasksCntSQL := "select LOW_PRIORITY COUNT(1) FROM mysql.tidb_ttl_task WHERE status IN ('waiting', 'running')"
rs, err := se.ExecuteSQL(ctx, selectTasksCntSQL)
if err == nil && len(rs) == 0 {
err = errors.New("selectTasksCntSQL returns no row")
}

if err != nil {
logutil.BgLogger().Error(
"error to query ttl task count",
zap.Error(err),
zap.Int64("physicalID", physicalID),
zap.Int64("tableID", tableID),
zap.String("SQL", selectTasksCntSQL),
)
return false
}

cnt := rs[0].GetInt64(0)
tasksLimit := getMaxRunningTasksLimit(a.store)
if cnt >= int64(tasksLimit) {
logutil.BgLogger().Warn(
"current TTL tasks count exceeds limit, delay create new job temporarily",
zap.Int64("physicalID", physicalID),
zap.Int64("tableID", tableID),
zap.Int64("count", cnt),
zap.Int("limit", tasksLimit),
)
return false
}

return true
}

func (a *managerJobAdapter) SubmitJob(ctx context.Context, tableID, physicalID int64, requestID string, _ time.Time) (*TTLJobTrace, error) {
respCh := make(chan error, 1)
req := &SubmitTTLManagerJobRequest{
TableID: tableID,
PhysicalID: physicalID,
RequestID: requestID,
RespCh: respCh,
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case a.requestCh <- req:
select {
case <-ctx.Done():
return nil, ctx.Err()
case err := <-respCh:
if err != nil {
return nil, err
}

return &TTLJobTrace{
RequestID: requestID,
Finished: false,
}, nil
}
}
}

func (a *managerJobAdapter) GetJob(ctx context.Context, tableID, physicalID int64, requestID string) (*TTLJobTrace, error) {
se, err := getSession(a.sessPool)
if err != nil {
return nil, err
}
defer se.Close()

rows, err := se.ExecuteSQL(
ctx,
"select summary_text, status from mysql.tidb_ttl_job_history where table_id=%? AND parent_table_id=%? AND job_id=%?",
physicalID, tableID, requestID,
)
if err != nil {
return nil, err
}

if len(rows) == 0 {
return nil, nil
}

jobTrace := TTLJobTrace{
RequestID: requestID,
}

row := rows[0]
if !row.IsNull(0) {
if summaryBytes := row.GetBytes(0); len(summaryBytes) > 0 {
var ttlSummary TTLSummary
if err = json.Unmarshal(summaryBytes, &ttlSummary); err != nil {
return nil, err
}
jobTrace.Summary = &ttlSummary
}
}

if !row.IsNull(1) {
statusText := row.GetString(1)
switch cache.JobStatus(statusText) {
case cache.JobStatusFinished, cache.JobStatusTimeout, cache.JobStatusCancelled:
jobTrace.Finished = true
}
}

return &jobTrace, nil
}

func (a *managerJobAdapter) Now() (time.Time, error) {
se, err := getSession(a.sessPool)
if err != nil {
return time.Time{}, err
}
defer se.Close()

tz, err := se.GlobalTimeZone(context.TODO())
if err != nil {
return time.Time{}, err
}

return se.Now().In(tz), nil
}
>>>>>>> 50d73f80c42 (ttl: fix some memory leak in TTL (#56935)):pkg/ttl/ttlworker/job_manager.go
12 changes: 12 additions & 0 deletions ttl/ttlworker/scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func TestScanWorkerSchedule(t *testing.T) {

tbl := newMockTTLTbl(t, "t1")
w := NewMockScanWorker(t)
defer w.sessPoll.AssertNoSessionInUse()
w.setOneRowResult(tbl, 7)
defer w.stopWithWait()

Expand Down Expand Up @@ -181,6 +182,7 @@ func TestScanWorkerScheduleWithFailedTask(t *testing.T) {

tbl := newMockTTLTbl(t, "t1")
w := NewMockScanWorker(t)
defer w.sessPoll.AssertNoSessionInUse()
w.clearInfoSchema()
defer w.stopWithWait()

Expand Down Expand Up @@ -393,6 +395,11 @@ func (t *mockScanTask) execSQL(_ context.Context, sql string, _ ...interface{})

func TestScanTaskDoScan(t *testing.T) {
task := newMockScanTask(t, 3)
<<<<<<< HEAD:ttl/ttlworker/scan_test.go
=======
defer task.sessPool.AssertNoSessionInUse()
task.ctx = cache.SetMockExpireTime(task.ctx, time.Now())
>>>>>>> 50d73f80c42 (ttl: fix some memory leak in TTL (#56935)):pkg/ttl/ttlworker/scan_test.go
task.sqlRetry[1] = scanTaskExecuteSQLMaxRetry
task.runDoScanForTest(3, "")

Expand All @@ -413,7 +420,11 @@ func TestScanTaskDoScan(t *testing.T) {
func TestScanTaskCheck(t *testing.T) {
tbl := newMockTTLTbl(t, "t1")
pool := newMockSessionPool(t, tbl)
<<<<<<< HEAD:ttl/ttlworker/scan_test.go
pool.se.evalExpire = time.UnixMilli(100)
=======
defer pool.AssertNoSessionInUse()
>>>>>>> 50d73f80c42 (ttl: fix some memory leak in TTL (#56935)):pkg/ttl/ttlworker/scan_test.go
pool.se.rows = newMockRows(t, types.NewFieldType(mysql.TypeInt24)).Append(12).Rows()

task := &ttlScanTask{
Expand Down Expand Up @@ -460,6 +471,7 @@ func TestScanTaskCancelStmt(t *testing.T) {

testCancel := func(ctx context.Context, doCancel func()) {
mockPool := newMockSessionPool(t)
defer mockPool.AssertNoSessionInUse()
startExec := make(chan struct{})
mockPool.se.sessionInfoSchema = newMockInfoSchema(task.tbl.TableInfo)
mockPool.se.executeSQL = func(_ context.Context, _ string, _ ...any) ([]chunk.Row, error) {
Expand Down
16 changes: 15 additions & 1 deletion ttl/ttlworker/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,18 +112,28 @@ type mockSessionPool struct {
t *testing.T
se *mockSession
lastSession *mockSession
inuse atomic.Int64
}

func (p *mockSessionPool) Get() (pools.Resource, error) {
se := *(p.se)
p.lastSession = &se
p.lastSession.pool = p
p.inuse.Add(1)
return p.lastSession, nil
}

func (p *mockSessionPool) Put(pools.Resource) {}
func (p *mockSessionPool) Put(pools.Resource) {
p.inuse.Add(-1)
}

func (p *mockSessionPool) AssertNoSessionInUse() {
require.Equal(p.t, int64(0), p.inuse.Load())
}

func newMockSessionPool(t *testing.T, tbl ...*cache.PhysicalTable) *mockSessionPool {
return &mockSessionPool{
t: t,
se: newMockSession(t, tbl...),
}
}
Expand All @@ -141,6 +151,7 @@ type mockSession struct {
closed bool
commitErr error
killed chan struct{}
pool *mockSessionPool
}

func newMockSession(t *testing.T, tbl ...*cache.PhysicalTable) *mockSession {
Expand Down Expand Up @@ -210,6 +221,9 @@ func (s *mockSession) KillStmt() {

func (s *mockSession) Close() {
s.closed = true
if s.pool != nil {
s.pool.Put(s)
}
}

func (s *mockSession) Now() time.Time {
Expand Down

0 comments on commit 1ceacac

Please sign in to comment.