Merge remote-tracking branch 'upstream/master' into reproduce-lock-not-found
tangenta committed Dec 14, 2022
2 parents ad2f9db + de865c4 commit 985c1a6
Showing 11 changed files with 184 additions and 41 deletions.
2 changes: 1 addition & 1 deletion br/pkg/task/stream.go
@@ -1177,7 +1177,7 @@ func restoreStream(
return errors.Trace(err)
}
defer func() {
- if err = restoreGc(); err != nil {
+ if err := restoreGc(); err != nil {
log.Error("failed to set gc enabled", zap.Error(err))
}
}()
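Why this one-character fix matters: with `err =`, the deferred closure assigns to the enclosing function's returned err, so a late failure in restoreGc could silently overwrite the error restoreStream was about to return. With `err :=` the cleanup error stays local and is only logged. A minimal standalone sketch of the pattern (names are illustrative, not from TiDB):

package main

import "fmt"

func cleanup() error { return fmt.Errorf("cleanup failed") }

func work() (err error) {
	defer func() {
		// `err = cleanup()` here would replace work's return value;
		// `err := cleanup()` keeps the failure local, log-only.
		if err := cleanup(); err != nil {
			fmt.Println("logged only:", err)
		}
	}()
	return fmt.Errorf("real failure")
}

func main() {
	fmt.Println(work()) // prints "real failure"
}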
2 changes: 1 addition & 1 deletion ttl/cache/infoschema_test.go
@@ -34,7 +34,7 @@ func TestInfoSchemaCache(t *testing.T) {
conn := server.CreateMockConn(t, sv)
sctx := conn.Context().Session
tk := testkit.NewTestKitWithSession(t, store, sctx)
- se := session.NewSession(sctx, sctx, func() {})
+ se := session.NewSession(sctx, sctx, func(_ session.Session) {})

isc := cache.NewInfoSchemaCache(time.Hour)

2 changes: 1 addition & 1 deletion ttl/cache/ttlstatus_test.go
@@ -36,7 +36,7 @@ func TestTTLStatusCache(t *testing.T) {
conn := server.CreateMockConn(t, sv)
sctx := conn.Context().Session
tk := testkit.NewTestKitWithSession(t, store, sctx)
- ttlSession := session.NewSession(sctx, tk.Session(), func() {})
+ ttlSession := session.NewSession(sctx, tk.Session(), func(_ session.Session) {})

isc := cache.NewTableStatusCache(time.Hour)

8 changes: 4 additions & 4 deletions ttl/session/session.go
@@ -50,11 +50,11 @@ type Session interface {
type session struct {
sessionctx.Context
sqlExec sqlexec.SQLExecutor
- closeFn func()
+ closeFn func(Session)
}

// NewSession creates a new Session
- func NewSession(sctx sessionctx.Context, sqlExec sqlexec.SQLExecutor, closeFn func()) Session {
+ func NewSession(sctx sessionctx.Context, sqlExec sqlexec.SQLExecutor, closeFn func(Session)) Session {
return &session{
Context: sctx,
sqlExec: sqlExec,
@@ -99,7 +99,7 @@ func (s *session) RunInTxn(ctx context.Context, fn func() error) (err error) {
defer tracer.EnterPhase(tracer.Phase())

tracer.EnterPhase(metrics.PhaseBeginTxn)
- if _, err = s.ExecuteSQL(ctx, "BEGIN"); err != nil {
+ if _, err = s.ExecuteSQL(ctx, "BEGIN OPTIMISTIC"); err != nil {
return err
}
tracer.EnterPhase(metrics.PhaseOther)
@@ -150,7 +150,7 @@ func (s *session) ResetWithGlobalTimeZone(ctx context.Context) error {
// Close closes the session
func (s *session) Close() {
if s.closeFn != nil {
- s.closeFn()
+ s.closeFn(s)
s.Context = nil
s.sqlExec = nil
s.closeFn = nil
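The closeFn signature change above lets the owner of a session receive the concrete Session back when it is closed, for example to return it to a pool. A self-contained model of the pattern (the types are simplified stand-ins for ttl/session, and the pool is a hypothetical caller, not shown in this diff):

package main

import "fmt"

// Session is a simplified stand-in for ttl/session.Session.
type Session interface{ Close() }

type session struct {
	closeFn func(Session)
}

// NewSession mirrors the new constructor shape: the close callback
// receives the Session being closed.
func NewSession(closeFn func(Session)) Session {
	return &session{closeFn: closeFn}
}

func (s *session) Close() {
	if s.closeFn != nil {
		s.closeFn(s) // hand ourselves back to the owner, e.g. a pool
		s.closeFn = nil
	}
}

func main() {
	var pool []Session
	se := NewSession(func(s Session) { pool = append(pool, s) })
	se.Close()
	fmt.Println("sessions returned to pool:", len(pool)) // 1
}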
7 changes: 7 additions & 0 deletions ttl/ttlworker/BUILD.bazel
@@ -40,6 +40,7 @@ go_test(
name = "ttlworker_test",
srcs = [
"del_test.go",
"job_manager_integration_test.go",
"job_manager_test.go",
"job_test.go",
"scan_test.go",
@@ -52,15 +53,21 @@
"//parser/ast",
"//parser/model",
"//parser/mysql",
"//session",
"//sessionctx",
"//sessionctx/variable",
"//testkit",
"//ttl/cache",
"//ttl/session",
"//types",
"//util/chunk",
"//util/logutil",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_x_time//rate",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
)
4 changes: 2 additions & 2 deletions ttl/ttlworker/job.go
@@ -96,8 +96,8 @@ func (job *ttlJob) updateState(ctx context.Context, se session.Session) error {
}

// peekScanTask returns the next scan task, but doesn't promote the iterator
- func (job *ttlJob) peekScanTask() (*ttlScanTask, error) {
- return job.tasks[job.taskIter], nil
+ func (job *ttlJob) peekScanTask() *ttlScanTask {
+ return job.tasks[job.taskIter]
}

// nextScanTask promotes the iterator
57 changes: 32 additions & 25 deletions ttl/ttlworker/job_manager.go
@@ -32,7 +32,7 @@ import (
)

const insertNewTableIntoStatusTemplate = "INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (%d, %d)"
- const setTableStatusOwnerTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_id = UUID(), current_job_owner_id = '%s',current_job_start_time = '%s',current_job_status = 'waiting',current_job_status_update_time = '%s',current_job_ttl_expire = '%s',current_job_owner_hb_time = '%s' WHERE (current_job_owner_id IS NULL OR current_job_owner_hb_time < '%s') AND table_id = %d"
+ const setTableStatusOwnerTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_id = UUID(), current_job_owner_id = '%s',current_job_start_time = '%s',current_job_status = 'waiting',current_job_status_update_time = '%s',current_job_ttl_expire = '%s',current_job_owner_hb_time = '%s' WHERE table_id = %d"
const updateHeartBeatTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_owner_hb_time = '%s' WHERE table_id = %d AND current_job_owner_id = '%s'"

const timeFormat = "2006-01-02 15:04:05"
@@ -41,8 +41,8 @@ func insertNewTableIntoStatusSQL(tableID int64, parentTableID int64) string {
return fmt.Sprintf(insertNewTableIntoStatusTemplate, tableID, parentTableID)
}

- func setTableStatusOwnerSQL(tableID int64, now time.Time, currentJobTTLExpire time.Time, maxHBTime time.Time, id string) string {
- return fmt.Sprintf(setTableStatusOwnerTemplate, id, now.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), maxHBTime.Format(timeFormat), tableID)
+ func setTableStatusOwnerSQL(tableID int64, now time.Time, currentJobTTLExpire time.Time, id string) string {
+ return fmt.Sprintf(setTableStatusOwnerTemplate, id, now.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), tableID)
}

func updateHeartBeatSQL(tableID int64, now time.Time, id string) string {
@@ -271,12 +271,17 @@ func (m *JobManager) resizeWorkers(workers []worker, count int, factory func() w
func (m *JobManager) updateTaskState() bool {
results := m.pollScanWorkerResults()
for _, result := range results {
+ logger := logutil.Logger(m.ctx).With(zap.Int64("tableID", result.task.tbl.ID))
+ if result.err != nil {
+ logger = logger.With(zap.Error(result.err))
+ }

job := findJobWithTableID(m.runningJobs, result.task.tbl.ID)
if job == nil {
logutil.Logger(m.ctx).Warn("task state changed but job not found", zap.Int64("tableID", result.task.tbl.ID))
logger.Warn("task state changed but job not found", zap.Int64("tableID", result.task.tbl.ID))
continue
}
logutil.Logger(m.ctx).Debug("scan task finished", zap.String("jobID", job.id))
logger.Info("scan task finished", zap.String("jobID", job.id))

job.finishedScanTaskCounter += 1
job.scanTaskErr = multierr.Append(job.scanTaskErr, result.err)
@@ -303,7 +308,11 @@ func (m *JobManager) checkNotOwnJob() {
for _, job := range m.runningJobs {
tableStatus := m.tableStatusCache.Tables[job.tbl.ID]
if tableStatus == nil || tableStatus.CurrentJobOwnerID != m.id {
logutil.Logger(m.ctx).Info("job has been taken over by another node", zap.String("jobID", job.id), zap.String("statistics", job.statistics.String()))
logger := logutil.Logger(m.ctx).With(zap.String("jobID", job.id), zap.String("statistics", job.statistics.String()))
if tableStatus != nil {
logger.With(zap.String("newJobOwnerID", tableStatus.CurrentJobOwnerID))
}
logger.Info("job has been taken over by another node")
m.removeJob(job)
job.cancel()
}
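A nuance behind the reassignment in the block above: zap loggers are immutable, so With returns a new logger instead of modifying the receiver, and a bare `logger.With(...)` call silently drops the field. A runnable reminder:

package main

import "go.uber.org/zap"

func main() {
	logger := zap.NewExample()
	logger.With(zap.String("newJobOwnerID", "a")) // result discarded: field lost
	logger = logger.With(zap.String("newJobOwnerID", "b"))
	logger.Info("job has been taken over by another node") // includes newJobOwnerID=b
}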
@@ -357,9 +366,10 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
case len(newJobTables) > 0:
table := newJobTables[0]
newJobTables = newJobTables[1:]
logutil.Logger(m.ctx).Debug("try lock new job", zap.Int64("tableID", table.ID))
logutil.Logger(m.ctx).Info("try lock new job", zap.Int64("tableID", table.ID))
job, err = m.lockNewJob(m.ctx, se, table, now)
if job != nil {
logutil.Logger(m.ctx).Info("append new running job", zap.String("jobID", job.id), zap.Int64("tableID", job.tbl.ID))
m.appendJob(job)
}
}
@@ -371,10 +381,10 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
}

for !job.AllSpawned() {
- task, err := job.peekScanTask()
- if err != nil {
- logutil.Logger(m.ctx).Warn("fail to generate scan task", zap.Error(err))
- break
+ task := job.peekScanTask()
+ logger := logutil.Logger(m.ctx).With(zap.String("jobID", job.id), zap.String("table", task.tbl.TableInfo.Name.L))
+ if task.tbl.PartitionDef != nil {
+ logger = logger.With(zap.String("partition", task.tbl.PartitionDef.Name.L))
}

for len(idleScanWorkers) > 0 {
@@ -383,7 +393,7 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {

err := idleWorker.Schedule(task)
if err != nil {
logutil.Logger(m.ctx).Info("fail to schedule task", zap.Error(err))
logger.Info("fail to schedule task", zap.Error(err))
continue
}

@@ -392,16 +402,11 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
if err != nil {
// not a big problem, current logic doesn't depend on the job status to promote
// the routine, so we could just print a log here
logutil.Logger(m.ctx).Error("change ttl job status", zap.Error(err), zap.String("id", job.id))
logger.Error("change ttl job status", zap.Error(err), zap.String("id", job.id))
}
cancel()

- logArgs := []zap.Field{zap.String("table", task.tbl.TableInfo.Name.L)}
- if task.tbl.PartitionDef != nil {
- logArgs = append(logArgs, zap.String("partition", task.tbl.PartitionDef.Name.L))
- }
- logutil.Logger(m.ctx).Debug("schedule ttl task",
- logArgs...)
+ logger.Info("scheduled ttl task")

job.nextScanTask()
break
@@ -425,14 +430,17 @@ func (m *JobManager) idleScanWorkers() []scanWorker {
}

func (m *JobManager) localJobs() []*ttlJob {
+ jobs := make([]*ttlJob, 0, len(m.runningJobs))
for _, job := range m.runningJobs {
status := m.tableStatusCache.Tables[job.tbl.ID]
if status == nil || status.CurrentJobOwnerID != m.id {
- m.removeJob(job)
+ // these jobs will be removed in `checkNotOwnJob`
continue
}

+ jobs = append(jobs, job)
}
- return m.runningJobs
+ return jobs
}
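The localJobs rewrite above makes the method read-only: removing jobs taken over by other nodes is now left to checkNotOwnJob (per the new comment), and the method returns a freshly filtered slice instead of the shared m.runningJobs. Filtering into a new slice is the usual Go pattern; a generic standalone sketch:

package main

import "fmt"

// filter returns the elements of s that satisfy keep, leaving s untouched.
func filter[T any](s []T, keep func(T) bool) []T {
	out := make([]T, 0, len(s))
	for _, v := range s {
		if keep(v) {
			out = append(out, v)
		}
	}
	return out
}

func main() {
	jobs := []string{"owned", "taken-over", "owned"}
	mine := filter(jobs, func(j string) bool { return j == "owned" })
	fmt.Println(mine) // [owned owned]
}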

// readyForNewJobTables returns all tables which should spawn a TTL job according to cache
@@ -491,11 +499,10 @@ func (m *JobManager) couldTrySchedule(table *cache.TableStatus, now time.Time) b
// localJob and return it.
// It could be nil, nil, if the table query doesn't return error but the job has been locked by other instances.
func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time) (*ttlJob, error) {
- maxHBTime := now.Add(-2 * jobManagerLoopTickerInterval)
var expireTime time.Time

err := se.RunInTxn(ctx, func() error {
- rows, err := se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.TableInfo.ID))
+ rows, err := se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.ID))
if err != nil {
return err
}
@@ -505,7 +512,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
if err != nil {
return err
}
- rows, err = se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.TableInfo.ID))
+ rows, err = se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.ID))
if err != nil {
return err
}
@@ -526,7 +533,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
return err
}

- _, err = se.ExecuteSQL(ctx, setTableStatusOwnerSQL(table.ID, now, expireTime, maxHBTime, m.id))
+ _, err = se.ExecuteSQL(ctx, setTableStatusOwnerSQL(table.ID, now, expireTime, m.id))

return err
})
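Taken together, the job_manager.go changes move lock exclusivity out of SQL and into the transaction: setTableStatusOwnerSQL no longer guards on an expired owner heartbeat (maxHBTime is gone), and lockNewJob instead relies on the optimistic transaction opened by RunInTxn ("BEGIN OPTIMISTIC" in ttl/session above), so that when several managers race to claim the same status row, only the first committer succeeds. A self-contained toy model of that claim flow, where a version check stands in for TiDB's commit-time conflict detection (none of these names come from the patch):

package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// row models one mysql.tidb_ttl_table_status record.
type row struct {
	version int
	owner   string
}

type store struct {
	mu sync.Mutex
	r  row
}

// claim models lockNewJob: read the row, decide it is claimable, then
// commit the ownership update. The version check plays the role of the
// optimistic transaction's conflict detection at COMMIT.
func (s *store) claim(owner string) error {
	s.mu.Lock()
	snap := s.r // the transaction's read snapshot
	s.mu.Unlock()

	if snap.owner != "" {
		return errors.New("already owned") // couldTrySchedule would say no
	}

	s.mu.Lock()
	defer s.mu.Unlock()
	if s.r.version != snap.version {
		return errors.New("write conflict at commit") // another manager won
	}
	s.r = row{version: snap.version + 1, owner: owner}
	return nil
}

func main() {
	s := &store{}
	var wg sync.WaitGroup
	var wins int32
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if s.claim(fmt.Sprintf("manager-%d", id)) == nil {
				atomic.AddInt32(&wins, 1)
			}
		}(i)
	}
	wg.Wait()
	fmt.Println("successful claims:", wins) // always exactly 1
}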
98 changes: 98 additions & 0 deletions ttl/ttlworker/job_manager_integration_test.go
@@ -0,0 +1,98 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ttlworker_test

import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
dbsession "github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/ttl/cache"
"github.com/pingcap/tidb/ttl/session"
"github.com/pingcap/tidb/ttl/ttlworker"
"github.com/pingcap/tidb/util/logutil"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/zap"
)

func TestParallelLockNewJob(t *testing.T) {
store := testkit.CreateMockStore(t)

sessionFactory := func() session.Session {
dbSession, err := dbsession.CreateSession4Test(store)
require.NoError(t, err)
se := session.NewSession(dbSession, dbSession, nil)

_, err = se.ExecuteSQL(context.Background(), "ROLLBACK")
require.NoError(t, err)
_, err = se.ExecuteSQL(context.Background(), "set tidb_retry_limit=0")
require.NoError(t, err)

return se
}

storedTTLJobRunInterval := variable.TTLJobRunInterval.Load()
variable.TTLJobRunInterval.Store(0)
defer func() {
variable.TTLJobRunInterval.Store(storedTTLJobRunInterval)
}()

testTable := &cache.PhysicalTable{ID: 2, TableInfo: &model.TableInfo{ID: 1, TTLInfo: &model.TTLInfo{IntervalExprStr: "1", IntervalTimeUnit: int(ast.TimeUnitDay)}}}
// simply lock a new job
m := ttlworker.NewJobManager("test-id", nil, store)
se := sessionFactory()
job, err := m.LockNewJob(context.Background(), se, testTable, time.Now())
require.NoError(t, err)
job.Finish(se, time.Now())

// lock one table in parallel, only one of them should lock successfully
testTimes := 100
concurrency := 5
for i := 0; i < testTimes; i++ {
successCounter := atomic.NewUint64(0)
successJob := &ttlworker.TTLJob{}

wg := sync.WaitGroup{}
for j := 0; j < concurrency; j++ {
jobManagerID := fmt.Sprintf("test-ttl-manager-%d", j)
wg.Add(1)
go func() {
m := ttlworker.NewJobManager(jobManagerID, nil, store)

se := sessionFactory()
job, err := m.LockNewJob(context.Background(), se, testTable, time.Now())
if err == nil {
successCounter.Add(1)
successJob = job
} else {
logutil.BgLogger().Error("lock new job with error", zap.Error(err))
}
wg.Done()
}()
}
wg.Wait()

require.Equal(t, uint64(1), successCounter.Load())
successJob.Finish(se, time.Now())
}
}
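The new test leans on two settings from its setup: tidb_retry_limit=0 disables automatic retries, so an optimistic-lock conflict surfaces as an error instead of being retried into a second spurious success, and TTLJobRunInterval is stored as zero, presumably so a table whose job just finished is immediately lockable again in the next of the 100 rounds. Under those settings each round expects exactly one of the five concurrent managers to win the lock.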