Skip to content

Commit

Permalink
cdc,regionspan(ticdc): more logs about eventfeed retry rate limit (#4008
Browse files Browse the repository at this point in the history
)
  • Loading branch information
overvenus authored Dec 26, 2021
1 parent eceb426 commit 43a599d
Showing 1 changed file with 11 additions and 7 deletions.
18 changes: 11 additions & 7 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,8 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
}
})

tableID, tableName := util.TableIDFromCtx(ctx)
cfID := util.ChangefeedIDFromCtx(ctx)
g.Go(func() error {
checkRateLimitInterval := 50 * time.Millisecond
timer := time.NewTimer(checkRateLimitInterval)
Expand All @@ -520,7 +522,13 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
s.errChSizeGauge.Dec()
allowed := s.checkRateLimit(errInfo.singleRegionInfo.verID.GetID())
if !allowed {
// rate limit triggers, add the error info to the rate limit queue
// rate limit triggers, add the error info to the rate limit queue.
log.Info("EventFeed retry rate limited",
zap.Uint64("regionID", errInfo.singleRegionInfo.verID.GetID()),
zap.Uint64("ts", errInfo.singleRegionInfo.ts),
zap.String("changefeed", cfID), zap.Stringer("span", errInfo.span),
zap.Int64("tableID", tableID), zap.String("tableName", tableName),
zap.String("addr", errInfo.singleRegionInfo.rpcCtx.Addr))
s.rateLimitQueue = append(s.rateLimitQueue, errInfo)
} else {
err := s.handleError(ctx, errInfo)
Expand Down Expand Up @@ -967,15 +975,11 @@ func (s *eventFeedSession) handleRateLimit(ctx context.Context) {
}

// checkRateLimit checks whether a region can be reconnected based on its rate limiter
func (s *eventFeedSession) checkRateLimit(regionID uint64) (allowed bool) {
func (s *eventFeedSession) checkRateLimit(regionID uint64) bool {
limiter := s.client.getRegionLimiter(regionID)
// use Limiter.Allow here since if exceed the rate limit, we skip this region
// and try it later.
allowed = limiter.Allow()
if !allowed {
log.Info("EventFeed retry rate limited", zap.Uint64("regionID", regionID))
}
return
return limiter.Allow()
}

// handleError handles error returned by a region. If some new EventFeed connection should be established, the region
Expand Down

0 comments on commit 43a599d

Please sign in to comment.