Skip to content

Commit

Permalink
ttl: retry the rows when del rate limiter returns error in delWorker (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao authored Dec 16, 2024
1 parent 9dfb87c commit cea46f1
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 24 deletions.
26 changes: 18 additions & 8 deletions pkg/ttl/ttlworker/del.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,26 @@ const (
delRetryInterval = time.Second * 5
)

type delRateLimiter interface {
WaitDelToken(ctx context.Context) error
}

var globalDelRateLimiter = newDelRateLimiter()

type delRateLimiter struct {
type defaultDelRateLimiter struct {
sync.Mutex
limiter *rate.Limiter
limit atomic.Int64
}

func newDelRateLimiter() *delRateLimiter {
limiter := &delRateLimiter{}
func newDelRateLimiter() delRateLimiter {
limiter := &defaultDelRateLimiter{}
limiter.limiter = rate.NewLimiter(0, 1)
limiter.limit.Store(0)
return limiter
}

func (l *delRateLimiter) Wait(ctx context.Context) error {
func (l *defaultDelRateLimiter) WaitDelToken(ctx context.Context) error {
limit := l.limit.Load()
if variable.TTLDeleteRateLimit.Load() != limit {
limit = l.reset()
Expand All @@ -69,7 +73,7 @@ func (l *delRateLimiter) Wait(ctx context.Context) error {
return l.limiter.Wait(ctx)
}

func (l *delRateLimiter) reset() (newLimit int64) {
func (l *defaultDelRateLimiter) reset() (newLimit int64) {
l.Lock()
defer l.Unlock()
newLimit = variable.TTLDeleteRateLimit.Load()
Expand Down Expand Up @@ -123,9 +127,15 @@ func (t *ttlDeleteTask) doDelete(ctx context.Context, rawSe session.Session) (re
}

tracer.EnterPhase(metrics.PhaseWaitToken)
if err = globalDelRateLimiter.Wait(ctx); err != nil {
t.statistics.IncErrorRows(len(delBatch))
return
if err = globalDelRateLimiter.WaitDelToken(ctx); err != nil {
tracer.EnterPhase(metrics.PhaseOther)
logutil.BgLogger().Info(
"wait TTL delete rate limiter interrupted",
zap.Error(err),
zap.Int("waitDelRowCnt", len(delBatch)),
)
retryRows = append(retryRows, delBatch...)
continue
}
tracer.EnterPhase(metrics.PhaseOther)

Expand Down
68 changes: 52 additions & 16 deletions pkg/ttl/ttlworker/del_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,14 @@ func TestTTLDelRetryBuffer(t *testing.T) {
require.Equal(t, uint64(0), statics7.ErrorRows.Load())
}

type mockDelRateLimiter struct {
waitFn func(context.Context) error
}

func (m *mockDelRateLimiter) WaitDelToken(ctx context.Context) error {
return m.waitFn(ctx)
}

func TestTTLDeleteTaskDoDelete(t *testing.T) {
origBatchSize := variable.TTLDeleteBatchSize.Load()
delBatch := 3
Expand Down Expand Up @@ -258,11 +266,12 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) {
}

cases := []struct {
batchCnt int
retryErrBatches []int
noRetryErrBatches []int
cancelCtx bool
cancelCtxBatch int
batchCnt int
retryErrBatches []int
noRetryErrBatches []int
cancelCtx bool
cancelCtxBatch int
cancelCtxErrInLimiter bool
}{
{
// all success
Expand Down Expand Up @@ -292,19 +301,46 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) {
cancelCtx: true,
cancelCtxBatch: 6,
},
{
// some executed when rate limiter returns error
batchCnt: 10,
cancelCtx: true,
cancelCtxBatch: 3,
cancelCtxErrInLimiter: true,
},
}

errLimiter := &mockDelRateLimiter{
waitFn: func(ctx context.Context) error {
return errors.New("mock rate limiter error")
},
}

origGlobalDelRateLimiter := globalDelRateLimiter
defer func() {
globalDelRateLimiter = origGlobalDelRateLimiter
}()

for _, c := range cases {
globalDelRateLimiter = origGlobalDelRateLimiter
require.True(t, c.cancelCtxBatch >= 0 && c.cancelCtxBatch < c.batchCnt)
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
if c.cancelCtx && c.cancelCtxBatch == 0 {
cancel()
if c.cancelCtxErrInLimiter {
globalDelRateLimiter = errLimiter
} else {
cancel()
}
}

afterExecuteSQL = func() {
if c.cancelCtx {
if len(sqls) == c.cancelCtxBatch {
cancel()
if c.cancelCtxErrInLimiter {
globalDelRateLimiter = errLimiter
} else {
cancel()
}
}
}
}
Expand Down Expand Up @@ -373,21 +409,21 @@ func TestTTLDeleteRateLimiter(t *testing.T) {
}()

variable.TTLDeleteRateLimit.Store(100000)
require.NoError(t, globalDelRateLimiter.Wait(ctx))
require.Equal(t, rate.Limit(100000), globalDelRateLimiter.limiter.Limit())
require.Equal(t, int64(100000), globalDelRateLimiter.limit.Load())
require.NoError(t, globalDelRateLimiter.WaitDelToken(ctx))
require.Equal(t, rate.Limit(100000), globalDelRateLimiter.(*defaultDelRateLimiter).limiter.Limit())
require.Equal(t, int64(100000), globalDelRateLimiter.(*defaultDelRateLimiter).limit.Load())

variable.TTLDeleteRateLimit.Store(0)
require.NoError(t, globalDelRateLimiter.Wait(ctx))
require.Equal(t, rate.Limit(0), globalDelRateLimiter.limiter.Limit())
require.Equal(t, int64(0), globalDelRateLimiter.limit.Load())
require.NoError(t, globalDelRateLimiter.WaitDelToken(ctx))
require.Equal(t, rate.Limit(0), globalDelRateLimiter.(*defaultDelRateLimiter).limiter.Limit())
require.Equal(t, int64(0), globalDelRateLimiter.(*defaultDelRateLimiter).limit.Load())

// 0 stands for no limit
require.NoError(t, globalDelRateLimiter.Wait(ctx))
require.NoError(t, globalDelRateLimiter.WaitDelToken(ctx))
// cancel ctx returns an error
cancel()
cancel = nil
require.EqualError(t, globalDelRateLimiter.Wait(ctx), "context canceled")
require.EqualError(t, globalDelRateLimiter.WaitDelToken(ctx), "context canceled")
}

func TestTTLDeleteTaskWorker(t *testing.T) {
Expand Down

0 comments on commit cea46f1

Please sign in to comment.