From 469ec9dc83cde58d0c2ebefb6f33666ce5367d17 Mon Sep 17 00:00:00 2001
From: YangKeao
Date: Tue, 28 Feb 2023 05:05:08 -0500
Subject: [PATCH] ttl: limit total count of running tasks across the cluster (#41586)

close pingcap/tidb#41585
---
 sessionctx/variable/sysvar.go | 10 +++
 sessionctx/variable/tidb_vars.go | 5 ++
 ttl/ttlworker/BUILD.bazel | 4 +
 ttl/ttlworker/job_manager.go | 2 +-
 ttl/ttlworker/task_manager.go | 74 +++++++++++++--
 .../task_manager_integration_test.go | 89 +++++++++++++++++--
 ttl/ttlworker/task_manager_test.go | 11 ++-
 7 files changed, 175 insertions(+), 20 deletions(-)

diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go
index 07384b2f69994..3dbf8a3e9c25d 100644
--- a/sessionctx/variable/sysvar.go
+++ b/sessionctx/variable/sysvar.go
@@ -2384,6 +2384,16 @@ var defaultSysVars = []*SysVar{
 		s.EnablePlanCacheForSubquery = TiDBOptOn(val)
 		return nil
 	}},
+	{Scope: ScopeGlobal, Name: TiDBTTLRunningTasks, Value: strconv.Itoa(DefTiDBTTLRunningTasks), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency, AllowAutoValue: true, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
+		val, err := strconv.ParseInt(s, 10, 64)
+		if err != nil {
+			return err
+		}
+		TTLRunningTasks.Store(int32(val))
+		return nil
+	}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
+		return strconv.Itoa(int(TTLRunningTasks.Load())), nil
+	}},
 }
 
 // FeedbackProbability points to the FeedbackProbability in statistics package.
diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go
index 868ef5a9ed031..2663b56da1bf8 100644
--- a/sessionctx/variable/tidb_vars.go
+++ b/sessionctx/variable/tidb_vars.go
@@ -941,6 +941,9 @@ const (
 	TiDBStmtSummaryFileMaxSize = "tidb_stmt_summary_file_max_size"
 	// TiDBStmtSummaryFileMaxBackups indicates the maximum number of files written by stmtsummary.
 	TiDBStmtSummaryFileMaxBackups = "tidb_stmt_summary_file_max_backups"
+	// TiDBTTLRunningTasks limits the count of running ttl tasks across the cluster. Default to -1, which means the
+	// limit equals the count of TiKV stores (capped at MaxConfigurableConcurrency), or MaxConfigurableConcurrency if the storage is not TiKV.
+	TiDBTTLRunningTasks = "tidb_ttl_running_tasks"
 )
 
 // TiDB intentional limits
@@ -1194,6 +1197,7 @@ const (
 	DefTiDBTTLDeleteBatchMaxSize = 10240
 	DefTiDBTTLDeleteBatchMinSize = 1
 	DefTiDBTTLDeleteRateLimit = 0
+	DefTiDBTTLRunningTasks = -1
 	DefPasswordReuseHistory = 0
 	DefPasswordReuseTime = 0
 	DefTiDBStoreBatchSize = 4
@@ -1285,6 +1289,7 @@ var (
 	HistoricalStatsDuration = atomic.NewDuration(DefTiDBHistoricalStatsDuration)
 	EnableHistoricalStatsForCapture = atomic.NewBool(DefTiDBEnableHistoricalStatsForCapture)
 	EnableResourceControl = atomic.NewBool(DefTiDBEnableResourceControl)
+	TTLRunningTasks = atomic.NewInt32(DefTiDBTTLRunningTasks)
 )
 
 var (
diff --git a/ttl/ttlworker/BUILD.bazel b/ttl/ttlworker/BUILD.bazel
index 943f206ec16fe..aa805ad1a1920 100644
--- a/ttl/ttlworker/BUILD.bazel
+++ b/ttl/ttlworker/BUILD.bazel
@@ -20,6 +20,7 @@ go_library(
         "//parser/terror",
         "//sessionctx",
         "//sessionctx/variable",
+        "//store/driver/error",
         "//ttl/cache",
         "//ttl/client",
         "//ttl/metrics",
@@ -35,6 +36,7 @@ go_library(
         "@com_github_ngaut_pools//:pools",
         "@com_github_pingcap_errors//:errors",
         "@com_github_pingcap_failpoint//:failpoint",
+        "@com_github_tikv_client_go_v2//tikv",
         "@io_etcd_go_etcd_client_v3//:client",
         "@org_golang_x_time//rate",
         "@org_uber_go_multierr//:multierr",
@@ -69,6 +71,7 @@ go_test(
        "//sessionctx",
        "//sessionctx/variable",
        "//statistics/handle",
+        "//store/mockstore",
        "//testkit",
        "//ttl/cache",
        "//ttl/client",
@@ -83,6 +86,7 @@ go_test(
        "@com_github_prometheus_client_model//go",
        "@com_github_stretchr_testify//assert",
        "@com_github_stretchr_testify//require",
+        "@com_github_tikv_client_go_v2//testutils",
        "@org_golang_x_time//rate",
        "@org_uber_go_atomic//:atomic",
        "@org_uber_go_zap//:zap",
diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go
index f9470ead93622..5c0edce652d9f 100644
--- a/ttl/ttlworker/job_manager.go
+++ b/ttl/ttlworker/job_manager.go
@@ -125,7 +125,7 @@ func NewJobManager(id string, sessPool sessionPool, store kv.Storage, etcdCli *c
 		manager.notificationCli = client.NewMockNotificationClient()
 	}
 
-	manager.taskManager = newTaskManager(manager.ctx, sessPool, manager.infoSchemaCache, id)
+	manager.taskManager = newTaskManager(manager.ctx, sessPool, manager.infoSchemaCache, id, store)
 
 	return
 }
diff --git a/ttl/ttlworker/task_manager.go b/ttl/ttlworker/task_manager.go
index fe5e2333a90b9..780c16d09425a 100644
--- a/ttl/ttlworker/task_manager.go
+++ b/ttl/ttlworker/task_manager.go
@@ -20,11 +20,14 @@ import (
 	"time"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/sessionctx/variable"
+	storeerr "github.com/pingcap/tidb/store/driver/error"
 	"github.com/pingcap/tidb/ttl/cache"
 	"github.com/pingcap/tidb/ttl/metrics"
 	"github.com/pingcap/tidb/ttl/session"
 	"github.com/pingcap/tidb/util/logutil"
+	"github.com/tikv/client-go/v2/tikv"
 	"go.uber.org/multierr"
 	"go.uber.org/zap"
 )
@@ -67,6 +70,11 @@ func updateTTLTaskHeartBeatSQL(jobID string, scanID int64, now time.Time, state
 	return updateTTLTaskHeartBeatTempalte, []interface{}{string(stateStr), now.Format(timeFormat), jobID, scanID}, nil
 }
 
+const countRunningTasks = "SELECT count(1) FROM mysql.tidb_ttl_task WHERE status = 'running'"
+
+var errAlreadyScheduled = errors.New("task is already scheduled")
+var errTooManyRunningTasks = errors.New("there are too many running tasks")
+
 // taskManager schedules and manages the ttl tasks on this instance
 type taskManager struct {
 	ctx      context.Context
@@ -74,6 +82,8 @@ type taskManager struct {
 	sessPool sessionPool
 
 	id string
 
+	store kv.Storage
+
 	scanWorkers []worker
 	delWorkers  []worker
@@ -84,13 +94,15 @@ type taskManager struct {
 	notifyStateCh chan interface{}
 }
 
-func newTaskManager(ctx context.Context, sessPool sessionPool, infoSchemaCache *cache.InfoSchemaCache, id string) *taskManager {
+func newTaskManager(ctx context.Context, sessPool sessionPool, infoSchemaCache *cache.InfoSchemaCache, id string, store kv.Storage) *taskManager {
 	return &taskManager{
 		ctx:      logutil.WithKeyValue(ctx, "ttl-worker", "task-manager"),
 		sessPool: sessPool,
 
 		id: id,
 
+		store: store,
+
 		scanWorkers: []worker{},
 		delWorkers:  []worker{},
 
@@ -269,14 +281,22 @@ func (m *taskManager) rescheduleTasks(se session.Session, now time.Time) {
 		return
 	}
 
+loop:
 	for _, t := range tasks {
 		logger := logutil.Logger(m.ctx).With(zap.String("jobID", t.JobID), zap.Int64("scanID", t.ScanID))
 
 		task, err := m.lockScanTask(se, t, now)
 		if err != nil {
-			// If other nodes lock the task, it will return an error. It's expected
-			// so the log level is only `info`
-			logutil.Logger(m.ctx).Info("fail to lock scan task", zap.Error(err))
+			switch errors.Cause(err) {
+			case errAlreadyScheduled:
+				continue
+			case errTooManyRunningTasks:
+				break loop
+			case storeerr.ErrLockWaitTimeout:
+				// don't lock more tasks in this round, to avoid exceeding the limit
+				break loop
+			}
+			logutil.Logger(m.ctx).Warn("fail to lock scan task", zap.Error(err))
 			continue
 		}
 
@@ -328,12 +348,26 @@ func (m *taskManager) lockScanTask(se session.Session, task *cache.TTLTask, now
 	err := se.RunInTxn(ctx, func() error {
 		var err error
 
+		// The `for update` here ensures two things:
+		// 1. Any instance that was updating this row has committed, so we read the newest information.
+		// 2. Only one of the concurrent transactions can lock the row and proceed.
+		// With `for update nowait`, multiple transactions could fetch the tasks at the same time, and the task
+		// limitation wouldn't work.
 		task, err = m.syncTaskFromTable(se, task.JobID, task.ScanID, true)
 		if err != nil {
 			return err
 		}
 		if task.OwnerID != "" && !task.OwnerHBTime.Add(getTaskManagerHeartBeatExpireInterval()).Before(now) {
-			return errors.New("task is already scheduled")
+			return errors.WithStack(errAlreadyScheduled)
+		}
+
+		// check that the count of running tasks is still below the limit
+		rows, err := se.ExecuteSQL(ctx, countRunningTasks)
+		if err != nil {
+			return errors.Wrapf(err, "execute sql: %s", countRunningTasks)
+		}
+		if !m.meetTTLRunningTask(int(rows[0].GetInt64(0))) {
+			return errors.WithStack(errTooManyRunningTasks)
 		}
 
 		sql, args := setTTLTaskOwnerSQL(task.JobID, task.ScanID, m.id, now)
@@ -370,12 +404,12 @@ func (m *taskManager) lockScanTask(se session.Session, task *cache.TTLTask, now
 	}, nil
 }
 
-func (m *taskManager) syncTaskFromTable(se session.Session, jobID string, scanID int64, detectLock bool) (*cache.TTLTask, error) {
+func (m *taskManager) syncTaskFromTable(se session.Session, jobID string, scanID int64, waitLock bool) (*cache.TTLTask, error) {
 	ctx := m.ctx
 
 	sql, args := cache.SelectFromTTLTaskWithID(jobID, scanID)
-	if detectLock {
-		sql += " FOR UPDATE NOWAIT"
+	if waitLock {
+		sql += " FOR UPDATE"
 	}
 	rows, err := se.ExecuteSQL(ctx, sql, args...)
 	if err != nil {
@@ -510,6 +544,30 @@ func (m *taskManager) reportMetrics() {
 	metrics.DeletingTaskCnt.Set(float64(deletingTaskCnt))
 }
 
+func (m *taskManager) meetTTLRunningTask(count int) bool {
+	ttlRunningTask := variable.TTLRunningTasks.Load()
+	// `-1` is the auto value, which means the limit is calculated according to the count of TiKV stores
+	if ttlRunningTask != -1 {
+		return int(ttlRunningTask) > count
+	}
+
+	store, ok := m.store.(tikv.Storage)
+	if !ok {
+		return variable.MaxConfigurableConcurrency > count
+	}
+
+	regionCache := store.GetRegionCache()
+	if regionCache == nil {
+		return true
+	}
+	limit := len(regionCache.GetAllStores())
+	if limit > variable.MaxConfigurableConcurrency {
+		limit = variable.MaxConfigurableConcurrency
+	}
+
+	return limit > count
+}
+
 type runningScanTask struct {
 	*ttlScanTask
 	cancel func()
diff --git a/ttl/ttlworker/task_manager_integration_test.go b/ttl/ttlworker/task_manager_integration_test.go
index 19bf25fd3b1d6..30d2742657591 100644
--- a/ttl/ttlworker/task_manager_integration_test.go
+++ b/ttl/ttlworker/task_manager_integration_test.go
@@ -24,6 +24,7 @@ import (
 	"github.com/pingcap/tidb/infoschema"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/store/mockstore"
 	"github.com/pingcap/tidb/testkit"
 	"github.com/pingcap/tidb/ttl/cache"
 	"github.com/pingcap/tidb/ttl/metrics"
@@ -31,6 +32,7 @@ import (
 	"github.com/pingcap/tidb/util/logutil"
 	dto "github.com/prometheus/client_model/go"
 	"github.com/stretchr/testify/require"
+	"github.com/tikv/client-go/v2/testutils"
 	"go.uber.org/atomic"
 	"go.uber.org/zap"
 )
@@ -38,6 +40,7 @@ import (
 func TestParallelLockNewTask(t *testing.T) {
 	store := testkit.CreateMockStore(t)
 	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("set global tidb_ttl_running_tasks = 1000")
 	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnTTL)
 	tk.MustExec("create table test.t (id int, created_at datetime) TTL= created_at + interval 1 hour")
 	testTable, err := tk.Session().GetDomainInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
@@ -50,7 +53,7 @@ func TestParallelLockNewTask(t *testing.T) {
 		isc := cache.NewInfoSchemaCache(time.Minute)
 		require.NoError(t, isc.Update(se))
 
-		m := ttlworker.NewTaskManager(context.Background(), nil, isc, "test-id")
+		m := ttlworker.NewTaskManager(context.Background(), nil, isc, "test-id", store)
 
 		// insert and lock a new task
 		sql, args, err := cache.InsertIntoTTLTask(tk.Session(), "test-job", testTable.Meta().ID, 1, nil, nil, now, now)
@@ -87,7 +90,7 @@ func TestParallelLockNewTask(t *testing.T) {
 		isc := cache.NewInfoSchemaCache(time.Minute)
 		require.NoError(t, isc.Update(se))
 
-		m := ttlworker.NewTaskManager(context.Background(), nil, isc, scanManagerID)
+		m := ttlworker.NewTaskManager(context.Background(), nil, isc, scanManagerID, store)
 
 		_, err := m.LockScanTask(se, &cache.TTLTask{
 			ScanID: 1,
@@ -113,6 +116,7 @@ func TestParallelSchedule(t *testing.T) {
 	store, dom := testkit.CreateMockStoreAndDomain(t)
 	waitAndStopTTLManager(t, dom)
 	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("set global tidb_ttl_running_tasks = 1000")
 	sessionFactory := sessionFactory(t, store)
 
 	tk.MustExec("create table test.t(id int, created_at datetime) ttl=created_at + interval 1 day")
@@ -135,7 +139,7 @@ func TestParallelSchedule(t *testing.T) {
 			workers = append(workers, scanWorker)
 		}
 
-		m := ttlworker.NewTaskManager(context.Background(), nil, isc, fmt.Sprintf("task-manager-%d", i))
+		m := ttlworker.NewTaskManager(context.Background(), nil, isc, fmt.Sprintf("task-manager-%d", i), store)
 		m.SetScanWorkers4Test(workers)
 		scheduleWg.Add(1)
 		go func() {
@@ -157,6 +161,7 @@ func TestTaskScheduleExpireHeartBeat(t *testing.T) {
 	store, dom := testkit.CreateMockStoreAndDomain(t)
 	waitAndStopTTLManager(t, dom)
 	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("set global tidb_ttl_running_tasks = 1000")
 	sessionFactory := sessionFactory(t, store)
 
 	// create table and scan task
@@ -174,7 +179,7 @@ func TestTaskScheduleExpireHeartBeat(t *testing.T) {
 	// schedule in a task manager
 	scanWorker := ttlworker.NewMockScanWorker(t)
 	scanWorker.Start()
-	m := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-1")
+	m := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-1", store)
 	m.SetScanWorkers4Test([]ttlworker.Worker{scanWorker})
 	m.RescheduleTasks(sessionFactory(), now)
 	tk.MustQuery("select status,owner_id from mysql.tidb_ttl_task").Check(testkit.Rows("running task-manager-1"))
@@ -182,7 +187,7 @@ func TestTaskScheduleExpireHeartBeat(t *testing.T) {
 	// another task manager should fetch this task after heartbeat expire
 	scanWorker2 := ttlworker.NewMockScanWorker(t)
 	scanWorker2.Start()
-	m2 := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-2")
+	m2 := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-2", store)
 	m2.SetScanWorkers4Test([]ttlworker.Worker{scanWorker2})
 	m2.RescheduleTasks(sessionFactory(), now.Add(time.Hour))
 	tk.MustQuery("select status,owner_id from mysql.tidb_ttl_task").Check(testkit.Rows("running task-manager-2"))
@@ -193,7 +198,7 @@ func TestTaskScheduleExpireHeartBeat(t *testing.T) {
 	m2.CheckFinishedTask(sessionFactory(), now)
 	scanWorker3 := ttlworker.NewMockScanWorker(t)
 	scanWorker3.Start()
-	m3 := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-3")
+	m3 := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-3", store)
 	m3.SetScanWorkers4Test([]ttlworker.Worker{scanWorker3})
 	m3.RescheduleTasks(sessionFactory(), now.Add(time.Hour))
 	tk.MustQuery("select status,owner_id from mysql.tidb_ttl_task").Check(testkit.Rows("finished task-manager-2"))
@@ -203,6 +208,7 @@ func TestTaskMetrics(t *testing.T) {
 	store, dom := testkit.CreateMockStoreAndDomain(t)
 	waitAndStopTTLManager(t, dom)
 	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("set global tidb_ttl_running_tasks = 1000")
 	sessionFactory := sessionFactory(t, store)
 
 	// create table and scan task
@@ -220,7 +226,7 @@ func TestTaskMetrics(t *testing.T) {
 	// schedule in a task manager
 	scanWorker := ttlworker.NewMockScanWorker(t)
 	scanWorker.Start()
-	m := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-1")
+	m := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-1", store)
 	m.SetScanWorkers4Test([]ttlworker.Worker{scanWorker})
 	m.RescheduleTasks(sessionFactory(), now)
 	tk.MustQuery("select status,owner_id from mysql.tidb_ttl_task").Check(testkit.Rows("running task-manager-1"))
@@ -235,6 +241,7 @@ func TestRescheduleWithError(t *testing.T) {
 	store, dom := testkit.CreateMockStoreAndDomain(t)
 	waitAndStopTTLManager(t, dom)
 	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("set global tidb_ttl_running_tasks = 1000")
 	sessionFactory := sessionFactory(t, store)
 
 	// insert a wrong scan task with random table id
@@ -248,7 +255,7 @@ func TestRescheduleWithError(t *testing.T) {
 	// schedule in a task manager
 	scanWorker := ttlworker.NewMockScanWorker(t)
 	scanWorker.Start()
-	m := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-1")
"task-manager-1") + m := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-1", store) m.SetScanWorkers4Test([]ttlworker.Worker{scanWorker}) notify := make(chan struct{}) go func() { @@ -265,3 +272,69 @@ func TestRescheduleWithError(t *testing.T) { } tk.MustQuery("select status from mysql.tidb_ttl_task").Check(testkit.Rows("waiting")) } + +func TestTTLRunningTasksLimitation(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + waitAndStopTTLManager(t, dom) + tk := testkit.NewTestKit(t, store) + sessionFactory := sessionFactory(t, store) + + tk.MustExec("set global tidb_ttl_running_tasks = 32") + tk.MustExec("create table test.t(id int, created_at datetime) ttl=created_at + interval 1 day") + table, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + // 64 tasks and 128 scan workers (in 16 task manager) should only schedule 32 tasks + for i := 0; i < 128; i++ { + sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", table.Meta().ID, i) + tk.MustExec(sql) + } + isc := cache.NewInfoSchemaCache(time.Second) + require.NoError(t, isc.Update(sessionFactory())) + now := time.Now() + scheduleWg := sync.WaitGroup{} + for i := 0; i < 16; i++ { + workers := []ttlworker.Worker{} + for j := 0; j < 8; j++ { + scanWorker := ttlworker.NewMockScanWorker(t) + scanWorker.Start() + workers = append(workers, scanWorker) + } + + ctx := logutil.WithKeyValue(context.Background(), "ttl-worker-test", fmt.Sprintf("task-manager-%d", i)) + m := ttlworker.NewTaskManager(ctx, nil, isc, fmt.Sprintf("task-manager-%d", i), store) + m.SetScanWorkers4Test(workers) + scheduleWg.Add(1) + go func() { + se := sessionFactory() + m.RescheduleTasks(se, now) + scheduleWg.Done() + }() + } + scheduleWg.Wait() + // all tasks should have been scheduled + tk.MustQuery("select count(1) from mysql.tidb_ttl_task where status = 'running'").Check(testkit.Rows("32")) +} + +func TestMeetTTLRunningTasks(t *testing.T) { + // initialize a cluster with 3 TiKV + store, dom := testkit.CreateMockStoreAndDomain(t, mockstore.WithStoreType(mockstore.MockTiKV), + mockstore.WithClusterInspector(func(c testutils.Cluster) { + mockstore.BootstrapWithMultiStores(c, 3) + })) + waitAndStopTTLManager(t, dom) + tk := testkit.NewTestKit(t, store) + + // -1, the default value, means the count of TiKV + require.True(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(2)) + require.False(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(3)) + + // positive number means the limitation + tk.MustExec("set global tidb_ttl_running_tasks = 32") + require.False(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(32)) + require.True(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(31)) + + // set it back to auto value + tk.MustExec("set global tidb_ttl_running_tasks = -1") + require.True(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(2)) + require.False(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(3)) +} diff --git a/ttl/ttlworker/task_manager_test.go b/ttl/ttlworker/task_manager_test.go index cffb8e071b62d..6212aee17d0ce 100644 --- a/ttl/ttlworker/task_manager_test.go +++ b/ttl/ttlworker/task_manager_test.go @@ -64,6 +64,11 @@ func (m *taskManager) GetRunningTasks() []*runningScanTask { return m.runningTasks } +// MeetTTLRunningTasks is an exported version of meetTTLRunningTask +func (m *taskManager) MeetTTLRunningTasks(count int) bool { + return 
+}
+
 // ReportTaskFinished is an exported version of reportTaskFinished
 func (t *runningScanTask) SetResult(err error) {
 	t.result = &ttlScanTaskExecResult{
@@ -80,7 +85,7 @@ func TestResizeWorkers(t *testing.T) {
 	scanWorker1.Start()
 	scanWorker2 := NewMockScanWorker(t)
 
-	m := newTaskManager(context.Background(), nil, nil, "test-id")
+	m := newTaskManager(context.Background(), nil, nil, "test-id", nil)
 	m.sessPool = newMockSessionPool(t, tbl)
 	m.SetScanWorkers4Test([]worker{
 		scanWorker1,
@@ -99,7 +104,7 @@ func TestResizeWorkers(t *testing.T) {
 	scanWorker2 = NewMockScanWorker(t)
 	scanWorker2.Start()
 
-	m = newTaskManager(context.Background(), nil, nil, "test-id")
+	m = newTaskManager(context.Background(), nil, nil, "test-id", nil)
 	m.sessPool = newMockSessionPool(t, tbl)
 	m.SetScanWorkers4Test([]worker{
 		scanWorker1,
@@ -115,7 +120,7 @@ func TestResizeWorkers(t *testing.T) {
 	scanWorker2 = NewMockScanWorker(t)
 	scanWorker2.Start()
 
-	m = newTaskManager(context.Background(), nil, nil, "test-id")
+	m = newTaskManager(context.Background(), nil, nil, "test-id", nil)
 	m.sessPool = newMockSessionPool(t, tbl)
 	m.SetScanWorkers4Test([]worker{
 		scanWorker1,
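
A minimal usage sketch for the new variable, assuming a cluster running this patch; the statements mirror the `set global tidb_ttl_running_tasks` calls in the tests and the countRunningTasks query above:

    -- cap the cluster-wide count of concurrently running TTL tasks at 16
    SET GLOBAL tidb_ttl_running_tasks = 16;

    -- restore the auto value: the limit then follows the count of TiKV stores,
    -- capped at MaxConfigurableConcurrency (see meetTTLRunningTask above)
    SET GLOBAL tidb_ttl_running_tasks = -1;

    -- check how many TTL tasks are currently running (the same query the task manager issues)
    SELECT count(1) FROM mysql.tidb_ttl_task WHERE status = 'running';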