Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#58206
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
lcwangchao authored and ti-chi-bot committed Jan 8, 2025
1 parent ea9c4ab commit 53459e3
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 23 deletions.
26 changes: 18 additions & 8 deletions pkg/ttl/ttlworker/del.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,26 @@ const (
delRetryInterval = time.Second * 5
)

type delRateLimiter interface {
WaitDelToken(ctx context.Context) error
}

var globalDelRateLimiter = newDelRateLimiter()

type delRateLimiter struct {
type defaultDelRateLimiter struct {
sync.Mutex
limiter *rate.Limiter
limit atomic.Int64
}

func newDelRateLimiter() *delRateLimiter {
limiter := &delRateLimiter{}
func newDelRateLimiter() delRateLimiter {
limiter := &defaultDelRateLimiter{}
limiter.limiter = rate.NewLimiter(0, 1)
limiter.limit.Store(0)
return limiter
}

func (l *delRateLimiter) Wait(ctx context.Context) error {
func (l *defaultDelRateLimiter) WaitDelToken(ctx context.Context) error {
limit := l.limit.Load()
if variable.TTLDeleteRateLimit.Load() != limit {
limit = l.reset()
Expand All @@ -68,7 +72,7 @@ func (l *delRateLimiter) Wait(ctx context.Context) error {
return l.limiter.Wait(ctx)
}

func (l *delRateLimiter) reset() (newLimit int64) {
func (l *defaultDelRateLimiter) reset() (newLimit int64) {
l.Lock()
defer l.Unlock()
newLimit = variable.TTLDeleteRateLimit.Load()
Expand Down Expand Up @@ -122,9 +126,15 @@ func (t *ttlDeleteTask) doDelete(ctx context.Context, rawSe session.Session) (re
}

tracer.EnterPhase(metrics.PhaseWaitToken)
if err = globalDelRateLimiter.Wait(ctx); err != nil {
t.statistics.IncErrorRows(len(delBatch))
return
if err = globalDelRateLimiter.WaitDelToken(ctx); err != nil {
tracer.EnterPhase(metrics.PhaseOther)
logutil.BgLogger().Info(
"wait TTL delete rate limiter interrupted",
zap.Error(err),
zap.Int("waitDelRowCnt", len(delBatch)),
)
retryRows = append(retryRows, delBatch...)
continue
}
tracer.EnterPhase(metrics.PhaseOther)

Expand Down
71 changes: 56 additions & 15 deletions pkg/ttl/ttlworker/del_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,14 @@ func TestTTLDelRetryBuffer(t *testing.T) {
require.Equal(t, uint64(7), statics6.ErrorRows.Load())
}

type mockDelRateLimiter struct {
waitFn func(context.Context) error
}

func (m *mockDelRateLimiter) WaitDelToken(ctx context.Context) error {
return m.waitFn(ctx)
}

func TestTTLDeleteTaskDoDelete(t *testing.T) {
origBatchSize := variable.TTLDeleteBatchSize.Load()
delBatch := 3
Expand Down Expand Up @@ -242,11 +250,12 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) {
}

cases := []struct {
batchCnt int
retryErrBatches []int
noRetryErrBatches []int
cancelCtx bool
cancelCtxBatch int
batchCnt int
retryErrBatches []int
noRetryErrBatches []int
cancelCtx bool
cancelCtxBatch int
cancelCtxErrInLimiter bool
}{
{
// all success
Expand Down Expand Up @@ -276,18 +285,50 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) {
cancelCtx: true,
cancelCtxBatch: 6,
},
{
// some executed when rate limiter returns error
batchCnt: 10,
cancelCtx: true,
cancelCtxBatch: 3,
cancelCtxErrInLimiter: true,
},
}

errLimiter := &mockDelRateLimiter{
waitFn: func(ctx context.Context) error {
return errors.New("mock rate limiter error")
},
}

origGlobalDelRateLimiter := globalDelRateLimiter
defer func() {
globalDelRateLimiter = origGlobalDelRateLimiter
}()

for _, c := range cases {
<<<<<<< HEAD
ctx, cancel := context.WithCancel(context.Background())
=======
globalDelRateLimiter = origGlobalDelRateLimiter
require.True(t, c.cancelCtxBatch >= 0 && c.cancelCtxBatch < c.batchCnt)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
>>>>>>> cea46f17ce0 (ttl: retry the rows when del rate limiter returns error in delWorker (#58206))
if c.cancelCtx && c.cancelCtxBatch == 0 {
cancel()
if c.cancelCtxErrInLimiter {
globalDelRateLimiter = errLimiter
} else {
cancel()
}
}

afterExecuteSQL = func() {
if c.cancelCtx {
if len(sqls) == c.cancelCtxBatch {
cancel()
if c.cancelCtxErrInLimiter {
globalDelRateLimiter = errLimiter
} else {
cancel()
}
}
}
}
Expand Down Expand Up @@ -358,21 +399,21 @@ func TestTTLDeleteRateLimiter(t *testing.T) {
}()

variable.TTLDeleteRateLimit.Store(100000)
require.NoError(t, globalDelRateLimiter.Wait(ctx))
require.Equal(t, rate.Limit(100000), globalDelRateLimiter.limiter.Limit())
require.Equal(t, int64(100000), globalDelRateLimiter.limit.Load())
require.NoError(t, globalDelRateLimiter.WaitDelToken(ctx))
require.Equal(t, rate.Limit(100000), globalDelRateLimiter.(*defaultDelRateLimiter).limiter.Limit())
require.Equal(t, int64(100000), globalDelRateLimiter.(*defaultDelRateLimiter).limit.Load())

variable.TTLDeleteRateLimit.Store(0)
require.NoError(t, globalDelRateLimiter.Wait(ctx))
require.Equal(t, rate.Limit(0), globalDelRateLimiter.limiter.Limit())
require.Equal(t, int64(0), globalDelRateLimiter.limit.Load())
require.NoError(t, globalDelRateLimiter.WaitDelToken(ctx))
require.Equal(t, rate.Limit(0), globalDelRateLimiter.(*defaultDelRateLimiter).limiter.Limit())
require.Equal(t, int64(0), globalDelRateLimiter.(*defaultDelRateLimiter).limit.Load())

// 0 stands for no limit
require.NoError(t, globalDelRateLimiter.Wait(ctx))
require.NoError(t, globalDelRateLimiter.WaitDelToken(ctx))
// cancel ctx returns an error
cancel()
cancel = nil
require.EqualError(t, globalDelRateLimiter.Wait(ctx), "context canceled")
require.EqualError(t, globalDelRateLimiter.WaitDelToken(ctx), "context canceled")
}

func TestTTLDeleteTaskWorker(t *testing.T) {
Expand Down

0 comments on commit 53459e3

Please sign in to comment.