ttl: limit total count of running tasks across the cluster (#41586)
close #41585
YangKeao authored Feb 28, 2023
1 parent f0bd0da commit 469ec9d
Showing 7 changed files with 175 additions and 20 deletions.
10 changes: 10 additions & 0 deletions sessionctx/variable/sysvar.go
@@ -2384,6 +2384,16 @@ var defaultSysVars = []*SysVar{
s.EnablePlanCacheForSubquery = TiDBOptOn(val)
return nil
}},
{Scope: ScopeGlobal, Name: TiDBTTLRunningTasks, Value: strconv.Itoa(DefTiDBTTLRunningTasks), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency, AllowAutoValue: true, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
val, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return err
}
TTLRunningTasks.Store(int32(val))
return nil
}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
return strconv.Itoa(int(TTLRunningTasks.Load())), nil
}},
}

// FeedbackProbability points to the FeedbackProbability in statistics package.
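The hooks above wire the SQL-visible variable to the `variable.TTLRunningTasks` atomic that the TTL task manager reads. A hypothetical test sketch (not part of this commit; it assumes the usual testkit helpers are available) of the expected round trip:

```go
package ttlworker_test

import (
	"testing"

	"github.com/pingcap/tidb/sessionctx/variable"
	"github.com/pingcap/tidb/testkit"
	"github.com/stretchr/testify/require"
)

// Hypothetical sketch, not part of this commit.
func TestTTLRunningTasksVariable(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)

	// The default is the auto value -1: derive the limit from the number of TiKV stores.
	require.Equal(t, int32(variable.DefTiDBTTLRunningTasks), variable.TTLRunningTasks.Load())

	// SetGlobal parses the value and stores it into the atomic.
	tk.MustExec("set global tidb_ttl_running_tasks = 8")
	require.Equal(t, int32(8), variable.TTLRunningTasks.Load())

	// GetGlobal reads it back from the same atomic.
	tk.MustQuery("select @@global.tidb_ttl_running_tasks").Check(testkit.Rows("8"))
}
```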
5 changes: 5 additions & 0 deletions sessionctx/variable/tidb_vars.go
@@ -941,6 +941,9 @@ const (
TiDBStmtSummaryFileMaxSize = "tidb_stmt_summary_file_max_size"
// TiDBStmtSummaryFileMaxBackups indicates the maximum number of files written by stmtsummary.
TiDBStmtSummaryFileMaxBackups = "tidb_stmt_summary_file_max_backups"
// TiDBTTLRunningTasks limits the count of running ttl tasks across the cluster. Defaults to -1, which means the
// limit follows the number of TiKV stores, capped at MaxConfigurableConcurrency (the cap alone applies if the
// storage is not TiKV).
TiDBTTLRunningTasks = "tidb_ttl_running_tasks"
)

// TiDB intentional limits
@@ -1194,6 +1197,7 @@ const (
DefTiDBTTLDeleteBatchMaxSize = 10240
DefTiDBTTLDeleteBatchMinSize = 1
DefTiDBTTLDeleteRateLimit = 0
DefTiDBTTLRunningTasks = -1
DefPasswordReuseHistory = 0
DefPasswordReuseTime = 0
DefTiDBStoreBatchSize = 4
@@ -1285,6 +1289,7 @@ var (
HistoricalStatsDuration = atomic.NewDuration(DefTiDBHistoricalStatsDuration)
EnableHistoricalStatsForCapture = atomic.NewBool(DefTiDBEnableHistoricalStatsForCapture)
EnableResourceControl = atomic.NewBool(DefTiDBEnableResourceControl)
TTLRunningTasks = atomic.NewInt32(DefTiDBTTLRunningTasks)
)

var (
4 changes: 4 additions & 0 deletions ttl/ttlworker/BUILD.bazel
@@ -20,6 +20,7 @@ go_library(
"//parser/terror",
"//sessionctx",
"//sessionctx/variable",
"//store/driver/error",
"//ttl/cache",
"//ttl/client",
"//ttl/metrics",
@@ -35,6 +36,7 @@ go_library(
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_tikv_client_go_v2//tikv",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_x_time//rate",
"@org_uber_go_multierr//:multierr",
@@ -69,6 +71,7 @@ go_test(
"//sessionctx",
"//sessionctx/variable",
"//statistics/handle",
"//store/mockstore",
"//testkit",
"//ttl/cache",
"//ttl/client",
@@ -83,6 +86,7 @@ go_test(
"@com_github_prometheus_client_model//go",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//testutils",
"@org_golang_x_time//rate",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
2 changes: 1 addition & 1 deletion ttl/ttlworker/job_manager.go
Expand Up @@ -125,7 +125,7 @@ func NewJobManager(id string, sessPool sessionPool, store kv.Storage, etcdCli *c
manager.notificationCli = client.NewMockNotificationClient()
}

manager.taskManager = newTaskManager(manager.ctx, sessPool, manager.infoSchemaCache, id)
manager.taskManager = newTaskManager(manager.ctx, sessPool, manager.infoSchemaCache, id, store)

return
}
74 changes: 66 additions & 8 deletions ttl/ttlworker/task_manager.go
Expand Up @@ -20,11 +20,14 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/variable"
storeerr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/ttl/cache"
"github.com/pingcap/tidb/ttl/metrics"
"github.com/pingcap/tidb/ttl/session"
"github.com/pingcap/tidb/util/logutil"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/multierr"
"go.uber.org/zap"
)
@@ -67,13 +70,20 @@ func updateTTLTaskHeartBeatSQL(jobID string, scanID int64, now time.Time, state
return updateTTLTaskHeartBeatTempalte, []interface{}{string(stateStr), now.Format(timeFormat), jobID, scanID}, nil
}

const countRunningTasks = "SELECT count(1) FROM mysql.tidb_ttl_task WHERE status = 'running'"

var errAlreadyScheduled = errors.New("task is already scheduled")
var errTooManyRunningTasks = errors.New("there are too many running tasks")

// taskManager schedules and manages the ttl tasks on this instance
type taskManager struct {
ctx context.Context
sessPool sessionPool

id string

store kv.Storage

scanWorkers []worker
delWorkers []worker

Expand All @@ -84,13 +94,15 @@ type taskManager struct {
notifyStateCh chan interface{}
}

func newTaskManager(ctx context.Context, sessPool sessionPool, infoSchemaCache *cache.InfoSchemaCache, id string) *taskManager {
func newTaskManager(ctx context.Context, sessPool sessionPool, infoSchemaCache *cache.InfoSchemaCache, id string, store kv.Storage) *taskManager {
return &taskManager{
ctx: logutil.WithKeyValue(ctx, "ttl-worker", "task-manager"),
sessPool: sessPool,

id: id,

store: store,

scanWorkers: []worker{},
delWorkers: []worker{},

@@ -269,14 +281,22 @@ func (m *taskManager) rescheduleTasks(se session.Session, now time.Time) {
return
}

loop:
for _, t := range tasks {
logger := logutil.Logger(m.ctx).With(zap.String("jobID", t.JobID), zap.Int64("scanID", t.ScanID))

task, err := m.lockScanTask(se, t, now)
if err != nil {
// If other nodes lock the task, it will return an error. It's expected
// so the log level is only `info`
logutil.Logger(m.ctx).Info("fail to lock scan task", zap.Error(err))
switch errors.Cause(err) {
case errAlreadyScheduled:
continue
case errTooManyRunningTasks:
break loop
case storeerr.ErrLockWaitTimeout:
// stop scheduling more tasks in this round to avoid exceeding the limit
break loop
}
logutil.Logger(m.ctx).Warn("fail to lock scan task", zap.Error(err))
continue
}
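The switch above separates the expected failure modes when locking a scan task: the task is already owned by another instance (move on to the next task), the cluster-wide limit has been reached, or the pessimistic lock wait timed out (both of the latter end this scheduling round). A small standalone sketch of the same sentinel-error pattern, with hypothetical names, for readers unfamiliar with `errors.Cause` combined with a labeled break:

```go
package sketch

import (
	"log"

	"github.com/pingcap/errors"
)

// Hypothetical sentinels, analogous to errAlreadyScheduled and errTooManyRunningTasks.
var (
	errSkipThisOne = errors.New("skip this item")
	errStopRound   = errors.New("stop this round")
)

// scheduleAll mirrors the control flow above: errors.Cause unwraps the
// stack-annotated error back to its sentinel, `continue` skips a single item,
// and the labeled `break loop` abandons the whole scheduling round.
func scheduleAll(items []int, tryLock func(int) error) {
loop:
	for _, it := range items {
		if err := tryLock(it); err != nil {
			switch errors.Cause(err) {
			case errSkipThisOne:
				continue
			case errStopRound:
				break loop
			}
			log.Printf("unexpected error for item %d: %v", it, err)
			continue
		}
		// run the locked item ...
	}
}
```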

@@ -328,12 +348,26 @@ func (m *taskManager) lockScanTask(se session.Session, task *cache.TTLTask, now
err := se.RunInTxn(ctx, func() error {
var err error

// The `for update` here ensures two things:
// 1. Any instance that updated this row has committed, so we read the newest information.
// 2. Only one claiming transaction can succeed at a time.
// With `for update nowait`, multiple transactions could fetch tasks at the same time and the
// task limit would not be enforced.
task, err = m.syncTaskFromTable(se, task.JobID, task.ScanID, true)
if err != nil {
return err
}
if task.OwnerID != "" && !task.OwnerHBTime.Add(getTaskManagerHeartBeatExpireInterval()).Before(now) {
return errors.New("task is already scheduled")
return errors.WithStack(errAlreadyScheduled)
}

// check that the count of running tasks is still below the limit
rows, err := se.ExecuteSQL(ctx, countRunningTasks)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", countRunningTasks)
}
if !m.meetTTLRunningTask(int(rows[0].GetInt64(0))) {
return errors.WithStack(errTooManyRunningTasks)
}

sql, args := setTTLTaskOwnerSQL(task.JobID, task.ScanID, m.id, now)
@@ -370,12 +404,12 @@
}, nil
}
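Per the comment in `lockScanTask`, the blocking `FOR UPDATE` read is what makes the running-task count check meaningful: competing claimers queue on the row lock instead of failing fast, so they either observe the committed owner afterwards or hit `ErrLockWaitTimeout` and stop scheduling for this round. A generic, simplified sketch of that lock-check-claim shape using `database/sql` (approximate column names, not TiDB's internal session API):

```go
package sketch

import (
	"context"
	"database/sql"
	"errors"
)

var (
	errAlreadyScheduled    = errors.New("task is already scheduled")
	errTooManyRunningTasks = errors.New("there are too many running tasks")
)

// claimTask sketches the lock-check-claim sequence inside one transaction.
// Column names are approximate; the point is the ordering: lock the task row
// first, then count running tasks, then take ownership, all before committing.
func claimTask(ctx context.Context, tx *sql.Tx, jobID string, scanID int64, owner string, limit int) error {
	// 1. Lock the task row; competing claimers of the same task block here until we commit.
	var curOwner sql.NullString
	err := tx.QueryRowContext(ctx,
		"SELECT owner_id FROM mysql.tidb_ttl_task WHERE job_id = ? AND scan_id = ? FOR UPDATE",
		jobID, scanID).Scan(&curOwner)
	if err != nil {
		return err
	}
	if curOwner.Valid && curOwner.String != "" {
		return errAlreadyScheduled
	}

	// 2. Check the cluster-wide count of running tasks while the row is still locked.
	var running int
	if err := tx.QueryRowContext(ctx,
		"SELECT count(1) FROM mysql.tidb_ttl_task WHERE status = 'running'").Scan(&running); err != nil {
		return err
	}
	if running >= limit {
		return errTooManyRunningTasks
	}

	// 3. Take ownership; the change becomes visible to waiting claimers once the transaction commits.
	_, err = tx.ExecContext(ctx,
		"UPDATE mysql.tidb_ttl_task SET owner_id = ?, status = 'running' WHERE job_id = ? AND scan_id = ?",
		owner, jobID, scanID)
	return err
}
```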

func (m *taskManager) syncTaskFromTable(se session.Session, jobID string, scanID int64, detectLock bool) (*cache.TTLTask, error) {
func (m *taskManager) syncTaskFromTable(se session.Session, jobID string, scanID int64, waitLock bool) (*cache.TTLTask, error) {
ctx := m.ctx

sql, args := cache.SelectFromTTLTaskWithID(jobID, scanID)
if detectLock {
sql += " FOR UPDATE NOWAIT"
if waitLock {
sql += " FOR UPDATE"
}
rows, err := se.ExecuteSQL(ctx, sql, args...)
if err != nil {
@@ -510,6 +544,30 @@ func (m *taskManager) reportMetrics() {
metrics.DeletingTaskCnt.Set(float64(deletingTaskCnt))
}

func (m *taskManager) meetTTLRunningTask(count int) bool {
ttlRunningTask := variable.TTLRunningTasks.Load()
// `-1` is the auto value, which means the limit should be calculated from the number of TiKV stores
if ttlRunningTask != -1 {
return int(ttlRunningTask) > count
}

store, ok := m.store.(tikv.Storage)
if !ok {
return variable.MaxConfigurableConcurrency > count
}

regionCache := store.GetRegionCache()
if regionCache == nil {
return true
}
limit := len(regionCache.GetAllStores())
if limit > variable.MaxConfigurableConcurrency {
limit = variable.MaxConfigurableConcurrency
}

return limit > count
}
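So an explicit positive value of `tidb_ttl_running_tasks` is used directly, while the auto value `-1` derives the limit from the number of TiKV stores, capped at `variable.MaxConfigurableConcurrency`, and the cap alone applies when the storage is not TiKV. A simplified standalone sketch of that decision (the cap is assumed to be 256 here, matching `MaxConfigurableConcurrency`):

```go
package main

import "fmt"

// maxConfigurableConcurrency mirrors variable.MaxConfigurableConcurrency (assumed to be 256).
const maxConfigurableConcurrency = 256

// canScheduleMore is a simplified stand-in for meetTTLRunningTask: it reports
// whether `running` is still below the effective limit.
func canScheduleMore(configured int32, tikvStoreCount int, running int) bool {
	if configured != -1 {
		// An explicit limit set through tidb_ttl_running_tasks wins.
		return int(configured) > running
	}
	if tikvStoreCount <= 0 {
		// No TiKV store information (e.g. non-TiKV storage): fall back to the hard cap.
		return maxConfigurableConcurrency > running
	}
	limit := tikvStoreCount
	if limit > maxConfigurableConcurrency {
		limit = maxConfigurableConcurrency
	}
	return limit > running
}

func main() {
	fmt.Println(canScheduleMore(4, 10, 3))  // true: explicit limit 4, only 3 tasks running
	fmt.Println(canScheduleMore(-1, 3, 3))  // false: auto limit equals the 3 TiKV stores
	fmt.Println(canScheduleMore(-1, 0, 10)) // true: no store info, limit is the 256 cap
}
```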

type runningScanTask struct {
*ttlScanTask
cancel func()