ttl: reschedule task to other instances when shrinking worker (#57703)
close #57990
lcwangchao authored Dec 5, 2024
1 parent 098213a commit bb9096c
Showing 13 changed files with 591 additions and 142 deletions.
18 changes: 15 additions & 3 deletions pkg/ttl/cache/table.go
@@ -40,7 +40,9 @@ import (
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/collate"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
)

func getTableKeyColumns(tbl *model.TableInfo) ([]*model.ColumnInfo, []*types.FieldType, error) {
@@ -448,9 +450,10 @@ func (t *PhysicalTable) splitRawKeyRanges(ctx context.Context, store tikv.Storag
return nil, err
}

regionsPerRange := len(regionIDs) / splitCnt
oversizeCnt := len(regionIDs) % splitCnt
ranges := make([]kv.KeyRange, 0, min(len(regionIDs), splitCnt))
regionsCnt := len(regionIDs)
regionsPerRange := regionsCnt / splitCnt
oversizeCnt := regionsCnt % splitCnt
ranges := make([]kv.KeyRange, 0, min(regionsCnt, splitCnt))
for len(regionIDs) > 0 {
startRegion, err := regionCache.LocateRegionByID(tikv.NewBackofferWithVars(ctx, 20000, nil),
regionIDs[0])
@@ -483,6 +486,15 @@ func (t *PhysicalTable) splitRawKeyRanges(ctx context.Context, store tikv.Storag
oversizeCnt--
regionIDs = regionIDs[endRegionIdx+1:]
}
logutil.BgLogger().Info("TTL table raw key ranges split",
zap.Int("regionsCnt", regionsCnt),
zap.Int("shouldSplitCnt", splitCnt),
zap.Int("actualSplitCnt", len(ranges)),
zap.Int64("tableID", t.ID),
zap.String("db", t.Schema.O),
zap.String("table", t.Name.O),
zap.String("partition", t.Partition.O),
)
return ranges, nil
}
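
For reference, a small standalone sketch (not part of the diff) of how the regionsPerRange / oversizeCnt arithmetic above spreads regionsCnt regions over at most splitCnt key ranges: the first regionsCnt % splitCnt ranges each take one extra region. It assumes Go 1.21+ for the built-in min, as the code above does.

package main

import "fmt"

// rangeSizes mirrors the split arithmetic: regionsCnt/splitCnt regions per
// range, with the remainder spread one-by-one over the leading ranges.
func rangeSizes(regionsCnt, splitCnt int) []int {
	regionsPerRange := regionsCnt / splitCnt
	oversizeCnt := regionsCnt % splitCnt
	sizes := make([]int, 0, min(regionsCnt, splitCnt))
	for remaining := regionsCnt; remaining > 0; {
		size := regionsPerRange
		if oversizeCnt > 0 {
			size++
			oversizeCnt--
		}
		sizes = append(sizes, size)
		remaining -= size
	}
	return sizes
}

func main() {
	fmt.Println(rangeSizes(10, 4)) // [3 3 2 2]
	fmt.Println(rangeSizes(3, 5))  // [1 1 1]  (fewer regions than requested splits)
}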

7 changes: 5 additions & 2 deletions pkg/ttl/cache/task.go
@@ -87,9 +87,9 @@ const (
// TaskStatusWaiting means the task hasn't started
TaskStatusWaiting TaskStatus = "waiting"
// TaskStatusRunning means this task is running
TaskStatusRunning = "running"
TaskStatusRunning TaskStatus = "running"
// TaskStatusFinished means this task has finished
TaskStatusFinished = "finished"
TaskStatusFinished TaskStatus = "finished"
)
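
For context (not part of the diff): in a Go const block, a constant with an explicit value but no type stays an untyped constant and defaults to string, so only the first constant above was actually a TaskStatus before this change. A minimal standalone illustration with made-up names:

package main

import "fmt"

type TaskStatus string

const (
	StatusA TaskStatus = "a" // explicitly typed: TaskStatus
	StatusB            = "b" // untyped constant: default type is string, not TaskStatus
)

func main() {
	fmt.Printf("%T %T\n", StatusA, StatusB) // main.TaskStatus string
}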

// TTLTask is a row recorded in mysql.tidb_ttl_task
@@ -116,6 +116,9 @@ type TTLTaskState struct {
ErrorRows uint64 `json:"error_rows"`

ScanTaskErr string `json:"scan_task_err"`

// When PreviousOwner != "", it means this task was resigned by another owner
PreviousOwner string `json:"prev_owner,omitempty"`
}
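
As an aside, a minimal sketch (not repo code) of how the new field serializes into the JSON state column; only the two fields shown in this hunk are included, and "instance-1" is a made-up owner address:

package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed-down copy of TTLTaskState for illustration; field names and json
// tags follow the diff above, other fields are omitted.
type taskState struct {
	ScanTaskErr   string `json:"scan_task_err"`
	PreviousOwner string `json:"prev_owner,omitempty"`
}

func main() {
	fresh, _ := json.Marshal(taskState{})
	resigned, _ := json.Marshal(taskState{PreviousOwner: "instance-1"})
	fmt.Println(string(fresh))    // {"scan_task_err":""} — prev_owner omitted for tasks never handed over
	fmt.Println(string(resigned)) // {"scan_task_err":"","prev_owner":"instance-1"}
}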

// RowToTTLTask converts a row into TTL task
2 changes: 1 addition & 1 deletion pkg/ttl/ttlworker/BUILD.bazel
@@ -44,6 +44,7 @@ go_library(
"//pkg/util/timeutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@io_etcd_go_etcd_client_v3//:client",
@@ -67,7 +68,6 @@ go_test(
"task_manager_test.go",
"timer_sync_test.go",
"timer_test.go",
"worker_test.go",
],
embed = [":ttlworker"],
flaky = True,
52 changes: 46 additions & 6 deletions pkg/ttl/ttlworker/del.go
@@ -22,6 +22,7 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/ttl/cache"
"github.com/pingcap/tidb/pkg/ttl/metrics"
@@ -94,12 +95,12 @@ func (t *ttlDeleteTask) doDelete(ctx context.Context, rawSe session.Session) (re
leftRows := t.rows
defer func() {
if len(leftRows) > 0 {
t.statistics.IncErrorRows(len(leftRows))
retryRows = append(retryRows, leftRows...)
}
}()

se := newTableSession(rawSe, t.tbl, t.expire)
for len(leftRows) > 0 {
for len(leftRows) > 0 && ctx.Err() == nil {
maxBatch := variable.TTLDeleteBatchSize.Load()
var delBatch [][]types.Datum
if int64(len(leftRows)) < maxBatch {
@@ -133,7 +134,6 @@ func (t *ttlDeleteTask) doDelete(ctx context.Context, rawSe session.Session) (re
sqlInterval := time.Since(sqlStart)
if err != nil {
metrics.DeleteErrorDuration.Observe(sqlInterval.Seconds())
needRetry = needRetry && ctx.Err() == nil
logutil.BgLogger().Warn(
"delete SQL in TTL failed",
zap.Error(err),
@@ -191,8 +191,14 @@ func (b *ttlDelRetryBuffer) RecordTaskResult(task *ttlDeleteTask, retryRows [][]
}

func (b *ttlDelRetryBuffer) DoRetry(do func(*ttlDeleteTask) [][]types.Datum) time.Duration {
for b.list.Len() > 0 {
l := b.list.Len()
// When `retryInterval == 0`, limit the loop count to the buffer length to avoid infinite retries.
// This way each item gets at most one retry chance per `DoRetry` invocation.
for i := 0; i < l; i++ {
ele := b.list.Front()
if ele == nil {
break
}
item, ok := ele.Value.(*ttlDelRetryItem)
if !ok {
logutil.BgLogger().Error(fmt.Sprintf("invalid retry buffer item type: %T", ele))
@@ -214,6 +220,11 @@ func (b *ttlDelRetryBuffer) DoRetry(do func(*ttlDeleteTask) [][]types.Datum) tim
return b.retryInterval
}

// SetRetryInterval sets the retry interval of the buffer.
func (b *ttlDelRetryBuffer) SetRetryInterval(interval time.Duration) {
b.retryInterval = interval
}
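
To see why the length snapshot matters: RecordTaskResult pushes failed items back onto the same list, so looping on the live length with a zero retry interval could spin forever. A minimal standalone sketch (not repo code) of the bounded loop:

package main

import (
	"container/list"
	"fmt"
)

func main() {
	buf := list.New()
	buf.PushBack("a")
	buf.PushBack("b")

	retried := 0
	for i, l := 0, buf.Len(); i < l; i++ { // snapshot the length once, like DoRetry above
		ele := buf.Front()
		buf.Remove(ele)
		retried++
		buf.PushBack(ele.Value) // simulate a failed retry being re-queued at the back
	}
	fmt.Println(retried, buf.Len()) // 2 2 — a `for buf.Len() > 0` loop would never terminate here
}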

// Drain drains a retry buffer.
func (b *ttlDelRetryBuffer) Drain() {
for ele := b.list.Front(); ele != nil; ele = ele.Next() {
@@ -296,8 +307,37 @@ func (w *ttlDeleteWorker) loop() error {
timer := time.NewTimer(w.retryBuffer.retryInterval)
defer timer.Stop()

// drain retry buffer to make sure the statistics are correct
defer w.retryBuffer.Drain()
defer func() {
// Make a final attempt to delete all rows left in the retry buffer when the worker stops,
// to avoid leaving any TTL rows undeleted when shrinking the delete workers.
if w.retryBuffer.Len() > 0 {
start := time.Now()
log.Info(
"try to delete TTL rows in del worker buffer immediately because the worker is going to stop",
zap.Int("bufferLen", w.retryBuffer.Len()),
)
retryCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
w.retryBuffer.SetRetryInterval(0)
w.retryBuffer.DoRetry(func(task *ttlDeleteTask) [][]types.Datum {
return task.doDelete(retryCtx, se)
})
log.Info(
"delete TTL rows in del worker buffer finished",
zap.Duration("duration", time.Since(start)),
)
}

// drain retry buffer to make sure the statistics are correct
if w.retryBuffer.Len() > 0 {
log.Warn(
"some TTL rows are still in the buffer while the worker is going to stop, mark them as error",
zap.Int("bufferLen", w.retryBuffer.Len()),
)
w.retryBuffer.Drain()
}
}()

for w.Status() == workerStatusRunning {
tracer.EnterPhase(metrics.PhaseIdle)
select {
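
The deferred flush above deliberately builds a fresh 10-second context from context.Background() instead of reusing the worker's own context, which may already be canceled during shutdown. A minimal illustration of that pattern (not the worker code itself):

package main

import (
	"context"
	"fmt"
	"time"
)

// flush stands in for the final buffer flush; it only reports the context state here.
func flush(ctx context.Context) error { return ctx.Err() }

func main() {
	workerCtx, cancel := context.WithCancel(context.Background())
	cancel() // the worker is being shut down

	fmt.Println(flush(workerCtx)) // context canceled — nothing could be deleted with this context

	flushCtx, cancelFlush := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancelFlush()
	fmt.Println(flush(flushCtx)) // <nil> — the final flush still gets a bounded window
}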
80 changes: 64 additions & 16 deletions pkg/ttl/ttlworker/del_test.go
@@ -179,6 +179,22 @@ func TestTTLDelRetryBuffer(t *testing.T) {
require.Equal(t, 0, buffer.Len())
require.Equal(t, uint64(0), statics6.SuccessRows.Load())
require.Equal(t, uint64(7), statics6.ErrorRows.Load())

// test that each item is retried at most once within a single DoRetry call.
buffer2 := newTTLDelRetryBuffer()
buffer2.SetRetryInterval(0)
buffer2.maxRetry = math.MaxInt
task7, rows7, statics7 := createTask("t7")
buffer2.RecordTaskResult(task7, rows7[:8])
require.Equal(t, 1, buffer2.Len())
currentRetryFn := doRetryFail
buffer2.DoRetry(func(task *ttlDeleteTask) [][]types.Datum {
fn := currentRetryFn
currentRetryFn = shouldNotDoRetry
return fn(task)
})
require.Equal(t, uint64(1), statics7.SuccessRows.Load())
require.Equal(t, uint64(0), statics7.ErrorRows.Load())
}

func TestTTLDeleteTaskDoDelete(t *testing.T) {
@@ -269,7 +285,7 @@
retryErrBatches: []int{1, 2, 4},
},
{
// some retries and some not
// some retries, some not, and some batches are executed after ctx is canceled
batchCnt: 10,
noRetryErrBatches: []int{3, 8, 9},
retryErrBatches: []int{1, 2, 4},
@@ -279,6 +295,7 @@
}

for _, c := range cases {
require.True(t, c.cancelCtxBatch >= 0 && c.cancelCtxBatch < c.batchCnt)
ctx, cancel := context.WithCancel(context.Background())
if c.cancelCtx && c.cancelCtxBatch == 0 {
cancel()
@@ -298,16 +315,14 @@
retryErrBatches = c.retryErrBatches
nonRetryBatches = c.noRetryErrBatches
retryRows := task.doDelete(ctx, s)
realBatchCnt := c.batchCnt
if c.cancelCtx {
realBatchCnt = c.cancelCtxBatch
}
require.LessOrEqual(t, realBatchCnt, c.batchCnt)

// check SQLs
require.Equal(t, realBatchCnt, len(sqls))
expectedSQLs := make([]string, 0, len(sqls))
for i := 0; i < realBatchCnt; i++ {
for i := 0; i < c.batchCnt; i++ {
if c.cancelCtx && i >= c.cancelCtxBatch {
break
}

batch := task.rows[i*delBatch : (i+1)*delBatch]
idList := make([]string, 0, delBatch)
for _, row := range batch {
@@ -324,8 +339,8 @@

// check retry rows
var expectedRetryRows [][]types.Datum
for i := 0; i < realBatchCnt; i++ {
if slices.Contains(c.retryErrBatches, i) {
for i := 0; i < c.batchCnt; i++ {
if slices.Contains(c.retryErrBatches, i) || (c.cancelCtx && i >= c.cancelCtxBatch) {
expectedRetryRows = append(expectedRetryRows, task.rows[i*delBatch:(i+1)*delBatch]...)
}
}
@@ -334,7 +349,7 @@
// check statistics
var expectedErrRows uint64
for i := 0; i < c.batchCnt; i++ {
if i >= realBatchCnt || slices.Contains(c.noRetryErrBatches, i) {
if slices.Contains(c.noRetryErrBatches, i) && !(c.cancelCtx && i >= c.cancelCtxBatch) {
expectedErrRows += uint64(delBatch)
}
}
@@ -384,6 +399,7 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
t2 := newMockTTLTbl(t, "t2")
t3 := newMockTTLTbl(t, "t3")
t4 := newMockTTLTbl(t, "t4")
t5 := newMockTTLTbl(t, "t5")
s := newMockSession(t)
pool := newMockSessionPool(t)
pool.se = s
@@ -392,8 +408,11 @@
sqlMap := make(map[string]int)
t3Retried := make(chan struct{})
t4Retried := make(chan struct{})
t5Executed := make(chan struct{})
s.executeSQL = func(ctx context.Context, sql string, args ...any) ([]chunk.Row, error) {
pool.lastSession.sessionInfoSchema = newMockInfoSchema(t1.TableInfo, t2.TableInfo, t3.TableInfo, t4.TableInfo)
pool.lastSession.sessionInfoSchema = newMockInfoSchema(
t1.TableInfo, t2.TableInfo, t3.TableInfo, t4.TableInfo, t5.TableInfo,
)
if strings.Contains(sql, "`t1`") {
// success
return nil, nil
@@ -419,20 +438,35 @@
// error and retry still error
// this is to test the retry buffer should be drained after the delete worker stopped
i := sqlMap[sql]
if i >= 2 {
// i >= 2 means t4 has retried once and records in retry buffer
if i == 2 {
// i == 2 means t4 has retried once and records in retry buffer
close(t4Retried)
}
sqlMap[sql] = i + 1
return nil, errors.New("mockErr")
}

if strings.Contains(sql, "`t5`") {
// error when the worker is running,
// success when flushing the retry buffer while the worker is stopping.
i := sqlMap[sql]
sqlMap[sql] = i + 1
if ctx.Value("delWorker") != nil {
if i == 1 {
close(t5Executed)
}
return nil, errors.New("mockErr")
}
return nil, nil
}

require.FailNow(t, "")
return nil, nil
}

delCh := make(chan *ttlDeleteTask)
w := newDeleteWorker(delCh, pool)
w.ctx = context.WithValue(w.ctx, "delWorker", struct{}{})
w.retryBuffer.retryInterval = time.Millisecond
w.retryBuffer.maxRetry = math.MaxInt
require.Equal(t, workerStatusCreated, w.Status())
Expand All @@ -444,7 +478,7 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
}()

tasks := make([]*ttlDeleteTask, 0)
for _, tbl := range []*cache.PhysicalTable{t1, t2, t3, t4} {
for _, tbl := range []*cache.PhysicalTable{t1, t2, t3, t4, t5} {
task := &ttlDeleteTask{
tbl: tbl,
expire: time.UnixMilli(0),
@@ -476,8 +510,17 @@
require.FailNow(t, "")
}

// before stop, t4 should always retry without any error rows
select {
case <-t5Executed:
case <-time.After(time.Second):
require.FailNow(t, "")
}

// before stop, t4, t5 should always retry without any error rows
require.Equal(t, uint64(0), tasks[3].statistics.SuccessRows.Load())
require.Equal(t, uint64(0), tasks[3].statistics.ErrorRows.Load())
require.Equal(t, uint64(0), tasks[4].statistics.SuccessRows.Load())
require.Equal(t, uint64(0), tasks[4].statistics.ErrorRows.Load())
w.Stop()
require.NoError(t, w.WaitStopped(context.Background(), 10*time.Second))

Expand All @@ -490,6 +533,11 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
require.Equal(t, uint64(0), tasks[2].statistics.SuccessRows.Load())
require.Equal(t, uint64(3), tasks[2].statistics.ErrorRows.Load())

// t4 should be error because the buffer flush failed while the worker was stopping.
require.Equal(t, uint64(0), tasks[3].statistics.SuccessRows.Load())
require.Equal(t, uint64(3), tasks[3].statistics.ErrorRows.Load())

// t5 should be success because the buffer flush succeeded while the worker was stopping.
require.Equal(t, uint64(3), tasks[4].statistics.SuccessRows.Load())
require.Equal(t, uint64(0), tasks[4].statistics.ErrorRows.Load())
}
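
For reference, this test distinguishes the two delete paths via the context: the worker loop's context carries a "delWorker" value, while the shutdown flush starts from context.Background() and therefore does not. A minimal standalone sketch of that trick (go vet would prefer a custom key type; the string key mirrors the test):

package main

import (
	"context"
	"fmt"
)

func main() {
	workerCtx := context.WithValue(context.Background(), "delWorker", struct{}{})
	flushCtx := context.Background() // the final flush derives from Background, so the value is absent

	fmt.Println(workerCtx.Value("delWorker") != nil) // true  — delete issued by the running worker
	fmt.Println(flushCtx.Value("delWorker") != nil)  // false — delete issued by the shutdown flush
}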