From cea46f17ce0dfcd7ee65224144d71c207438ae2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E8=B6=85?= Date: Mon, 16 Dec 2024 12:56:51 +0800 Subject: [PATCH] ttl: retry the rows when del rate limiter returns error in delWorker (#58206) close pingcap/tidb#58205 --- pkg/ttl/ttlworker/del.go | 26 +++++++++----- pkg/ttl/ttlworker/del_test.go | 68 ++++++++++++++++++++++++++--------- 2 files changed, 70 insertions(+), 24 deletions(-) diff --git a/pkg/ttl/ttlworker/del.go b/pkg/ttl/ttlworker/del.go index 671560fc3a3b4..854feb7f64446 100644 --- a/pkg/ttl/ttlworker/del.go +++ b/pkg/ttl/ttlworker/del.go @@ -41,22 +41,26 @@ const ( delRetryInterval = time.Second * 5 ) +type delRateLimiter interface { + WaitDelToken(ctx context.Context) error +} + var globalDelRateLimiter = newDelRateLimiter() -type delRateLimiter struct { +type defaultDelRateLimiter struct { sync.Mutex limiter *rate.Limiter limit atomic.Int64 } -func newDelRateLimiter() *delRateLimiter { - limiter := &delRateLimiter{} +func newDelRateLimiter() delRateLimiter { + limiter := &defaultDelRateLimiter{} limiter.limiter = rate.NewLimiter(0, 1) limiter.limit.Store(0) return limiter } -func (l *delRateLimiter) Wait(ctx context.Context) error { +func (l *defaultDelRateLimiter) WaitDelToken(ctx context.Context) error { limit := l.limit.Load() if variable.TTLDeleteRateLimit.Load() != limit { limit = l.reset() @@ -69,7 +73,7 @@ func (l *delRateLimiter) Wait(ctx context.Context) error { return l.limiter.Wait(ctx) } -func (l *delRateLimiter) reset() (newLimit int64) { +func (l *defaultDelRateLimiter) reset() (newLimit int64) { l.Lock() defer l.Unlock() newLimit = variable.TTLDeleteRateLimit.Load() @@ -123,9 +127,15 @@ func (t *ttlDeleteTask) doDelete(ctx context.Context, rawSe session.Session) (re } tracer.EnterPhase(metrics.PhaseWaitToken) - if err = globalDelRateLimiter.Wait(ctx); err != nil { - t.statistics.IncErrorRows(len(delBatch)) - return + if err = globalDelRateLimiter.WaitDelToken(ctx); err != nil { + tracer.EnterPhase(metrics.PhaseOther) + logutil.BgLogger().Info( + "wait TTL delete rate limiter interrupted", + zap.Error(err), + zap.Int("waitDelRowCnt", len(delBatch)), + ) + retryRows = append(retryRows, delBatch...) + continue } tracer.EnterPhase(metrics.PhaseOther) diff --git a/pkg/ttl/ttlworker/del_test.go b/pkg/ttl/ttlworker/del_test.go index 1c9f604ebecb2..34a26c1591057 100644 --- a/pkg/ttl/ttlworker/del_test.go +++ b/pkg/ttl/ttlworker/del_test.go @@ -197,6 +197,14 @@ func TestTTLDelRetryBuffer(t *testing.T) { require.Equal(t, uint64(0), statics7.ErrorRows.Load()) } +type mockDelRateLimiter struct { + waitFn func(context.Context) error +} + +func (m *mockDelRateLimiter) WaitDelToken(ctx context.Context) error { + return m.waitFn(ctx) +} + func TestTTLDeleteTaskDoDelete(t *testing.T) { origBatchSize := variable.TTLDeleteBatchSize.Load() delBatch := 3 @@ -258,11 +266,12 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) { } cases := []struct { - batchCnt int - retryErrBatches []int - noRetryErrBatches []int - cancelCtx bool - cancelCtxBatch int + batchCnt int + retryErrBatches []int + noRetryErrBatches []int + cancelCtx bool + cancelCtxBatch int + cancelCtxErrInLimiter bool }{ { // all success @@ -292,19 +301,46 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) { cancelCtx: true, cancelCtxBatch: 6, }, + { + // some executed when rate limiter returns error + batchCnt: 10, + cancelCtx: true, + cancelCtxBatch: 3, + cancelCtxErrInLimiter: true, + }, + } + + errLimiter := &mockDelRateLimiter{ + waitFn: func(ctx context.Context) error { + return errors.New("mock rate limiter error") + }, } + origGlobalDelRateLimiter := globalDelRateLimiter + defer func() { + globalDelRateLimiter = origGlobalDelRateLimiter + }() + for _, c := range cases { + globalDelRateLimiter = origGlobalDelRateLimiter require.True(t, c.cancelCtxBatch >= 0 && c.cancelCtxBatch < c.batchCnt) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) if c.cancelCtx && c.cancelCtxBatch == 0 { - cancel() + if c.cancelCtxErrInLimiter { + globalDelRateLimiter = errLimiter + } else { + cancel() + } } afterExecuteSQL = func() { if c.cancelCtx { if len(sqls) == c.cancelCtxBatch { - cancel() + if c.cancelCtxErrInLimiter { + globalDelRateLimiter = errLimiter + } else { + cancel() + } } } } @@ -373,21 +409,21 @@ func TestTTLDeleteRateLimiter(t *testing.T) { }() variable.TTLDeleteRateLimit.Store(100000) - require.NoError(t, globalDelRateLimiter.Wait(ctx)) - require.Equal(t, rate.Limit(100000), globalDelRateLimiter.limiter.Limit()) - require.Equal(t, int64(100000), globalDelRateLimiter.limit.Load()) + require.NoError(t, globalDelRateLimiter.WaitDelToken(ctx)) + require.Equal(t, rate.Limit(100000), globalDelRateLimiter.(*defaultDelRateLimiter).limiter.Limit()) + require.Equal(t, int64(100000), globalDelRateLimiter.(*defaultDelRateLimiter).limit.Load()) variable.TTLDeleteRateLimit.Store(0) - require.NoError(t, globalDelRateLimiter.Wait(ctx)) - require.Equal(t, rate.Limit(0), globalDelRateLimiter.limiter.Limit()) - require.Equal(t, int64(0), globalDelRateLimiter.limit.Load()) + require.NoError(t, globalDelRateLimiter.WaitDelToken(ctx)) + require.Equal(t, rate.Limit(0), globalDelRateLimiter.(*defaultDelRateLimiter).limiter.Limit()) + require.Equal(t, int64(0), globalDelRateLimiter.(*defaultDelRateLimiter).limit.Load()) // 0 stands for no limit - require.NoError(t, globalDelRateLimiter.Wait(ctx)) + require.NoError(t, globalDelRateLimiter.WaitDelToken(ctx)) // cancel ctx returns an error cancel() cancel = nil - require.EqualError(t, globalDelRateLimiter.Wait(ctx), "context canceled") + require.EqualError(t, globalDelRateLimiter.WaitDelToken(ctx), "context canceled") } func TestTTLDeleteTaskWorker(t *testing.T) {