diff --git a/cdc/kv/client.go b/cdc/kv/client.go index e49809119ad..c4f5a3190aa 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -77,8 +77,12 @@ const ( // frequency of creating new goroutine. defaultRegionChanSize = 128 - // initial size for region rate limit queue + // initial size for region rate limit queue. defaultRegionRateLimitQueueSize = 128 + // Interval of check region retry rate limit queue. + defaultCheckRegionRateLimitInterval = 50 * time.Millisecond + // Duration of warning region retry rate limited too long. + defaultLogRegionRateLimitDuration = 10 * time.Second ) // time interval to force kv client to terminate gRPC stream and reconnect @@ -135,6 +139,33 @@ func newSingleRegionInfo(verID tikv.RegionVerID, span regionspan.ComparableSpan, type regionErrorInfo struct { singleRegionInfo err error + + retryLimitTime *time.Time + logRateLimitDuration time.Duration +} + +func newRegionErrorInfo(info singleRegionInfo, err error) regionErrorInfo { + return regionErrorInfo{ + singleRegionInfo: info, + err: err, + + logRateLimitDuration: defaultLogRegionRateLimitDuration, + } +} + +func (r *regionErrorInfo) logRateLimitedHint() bool { + now := time.Now() + if r.retryLimitTime == nil { + // Caller should log on the first rate limited. + r.retryLimitTime = &now + return true + } + if now.Sub(*r.retryLimitTime) > r.logRateLimitDuration { + // Caller should log if it lasts too long. + r.retryLimitTime = &now + return true + } + return false } type regionFeedState struct { @@ -521,8 +552,7 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { tableID, tableName := util.TableIDFromCtx(ctx) cfID := util.ChangefeedIDFromCtx(ctx) g.Go(func() error { - checkRateLimitInterval := 50 * time.Millisecond - timer := time.NewTimer(checkRateLimitInterval) + timer := time.NewTimer(defaultCheckRegionRateLimitInterval) defer timer.Stop() for { select { @@ -530,18 +560,27 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { return ctx.Err() case <-timer.C: s.handleRateLimit(ctx) - timer.Reset(checkRateLimitInterval) + timer.Reset(defaultCheckRegionRateLimitInterval) case errInfo := <-s.errCh: s.errChSizeGauge.Dec() allowed := s.checkRateLimit(errInfo.singleRegionInfo.verID.GetID()) if !allowed { + if errInfo.logRateLimitedHint() { + zapFieldAddr := zap.Skip() + if errInfo.singleRegionInfo.rpcCtx != nil { + // rpcCtx may be nil if we fails to get region info + // from pd. It could cause by pd down or the region + // has been merged. + zapFieldAddr = zap.String("addr", errInfo.singleRegionInfo.rpcCtx.Addr) + } + log.Info("EventFeed retry rate limited", + zap.Uint64("regionID", errInfo.singleRegionInfo.verID.GetID()), + zap.Uint64("ts", errInfo.singleRegionInfo.ts), + zap.String("changefeed", cfID), zap.Stringer("span", errInfo.span), + zap.Int64("tableID", tableID), zap.String("tableName", tableName), + zapFieldAddr) + } // rate limit triggers, add the error info to the rate limit queue. - log.Info("EventFeed retry rate limited", - zap.Uint64("regionID", errInfo.singleRegionInfo.verID.GetID()), - zap.Uint64("ts", errInfo.singleRegionInfo.ts), - zap.String("changefeed", cfID), zap.Stringer("span", errInfo.span), - zap.Int64("tableID", tableID), zap.String("tableName", tableName), - zap.String("addr", errInfo.singleRegionInfo.rpcCtx.Addr)) s.rateLimitQueue = append(s.rateLimitQueue, errInfo) } else { err := s.handleError(ctx, errInfo) @@ -634,14 +673,13 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single // onRegionFail handles a region's failure, which means, unlock the region's range and send the error to the errCh for // error handling. This function is non blocking even if error channel is full. // CAUTION: Note that this should only be called in a context that the region has locked it's range. -func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErrorInfo, revokeToken bool) error { +func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErrorInfo, revokeToken bool) { log.Debug("region failed", zap.Uint64("regionID", errorInfo.verID.GetID()), zap.Error(errorInfo.err)) s.rangeLock.UnlockRange(errorInfo.span.Start, errorInfo.span.End, errorInfo.verID.GetID(), errorInfo.verID.GetVer(), errorInfo.ts) if revokeToken { s.regionRouter.Release(errorInfo.rpcCtx.Addr) } s.enqueueError(ctx, errorInfo) - return nil } // requestRegionToStore gets singleRegionInfo from regionRouter, which is a token @@ -735,13 +773,8 @@ func (s *eventFeedSession) requestRegionToStore( } bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) s.client.regionCache.OnSendFail(bo, rpcCtx, regionScheduleReload, err) - err = s.onRegionFail(ctx, regionErrorInfo{ - singleRegionInfo: sri, - err: &connectToStoreErr{}, - }, false /* revokeToken */) - if err != nil { - return errors.Trace(err) - } + errInfo := newRegionErrorInfo(sri, &connectToStoreErr{}) + s.onRegionFail(ctx, errInfo, false /* revokeToken */) continue } s.addStream(rpcCtx.Addr, stream, streamCancel) @@ -795,15 +828,8 @@ func (s *eventFeedSession) requestRegionToStore( continue } - // Wait for a while and retry sending the request - time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) - err = s.onRegionFail(ctx, regionErrorInfo{ - singleRegionInfo: sri, - err: &sendRequestToStoreErr{}, - }, false /* revokeToken */) - if err != nil { - return errors.Trace(err) - } + errInfo := newRegionErrorInfo(sri, &sendRequestToStoreErr{}) + s.onRegionFail(ctx, errInfo, false /* revokeToken */) } else { s.regionRouter.Acquire(rpcCtx.Addr) } @@ -864,15 +890,8 @@ func (s *eventFeedSession) dispatchRequest( log.Info("cannot get rpcCtx, retry span", zap.Uint64("regionID", sri.verID.GetID()), zap.Stringer("span", sri.span)) - err = s.onRegionFail(ctx, regionErrorInfo{ - singleRegionInfo: sri, - err: &rpcCtxUnavailableErr{ - verID: sri.verID, - }, - }, false /* revokeToken */) - if err != nil { - return errors.Trace(err) - } + errInfo := newRegionErrorInfo(sri, &rpcCtxUnavailableErr{verID: sri.verID}) + s.onRegionFail(ctx, errInfo, false /* revokeToken */) continue } sri.rpcCtx = rpcCtx @@ -1085,14 +1104,8 @@ func (s *eventFeedSession) receiveFromStream( remainingRegions := pendingRegions.takeAll() for _, state := range remainingRegions { - err := s.onRegionFail(ctx, regionErrorInfo{ - singleRegionInfo: state.sri, - err: cerror.ErrPendingRegionCancel.GenWithStackByArgs(), - }, true /* revokeToken */) - if err != nil { - // The only possible is that the ctx is cancelled. Simply return. - return - } + errInfo := newRegionErrorInfo(state.sri, cerror.ErrPendingRegionCancel.FastGenByArgs()) + s.onRegionFail(ctx, errInfo, true /* revokeToken */) } }() @@ -1104,9 +1117,7 @@ func (s *eventFeedSession) receiveFromStream( // to call exactly once from outter code logic worker := newRegionWorker(s, addr) - defer func() { - worker.evictAllRegions() //nolint:errcheck - }() + defer worker.evictAllRegions() g.Go(func() error { return worker.run(ctx) diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index e61310ec110..fd5094637f3 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -43,6 +43,7 @@ import ( "github.com/pingcap/tiflow/pkg/txnutil" "github.com/pingcap/tiflow/pkg/util" "github.com/pingcap/tiflow/pkg/util/testleak" + "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/testutils" "github.com/tikv/client-go/v2/tikv" @@ -3497,3 +3498,19 @@ func (s *etcdSuite) TestHandleRateLimit(c *check.C) { c.Assert(session.rateLimitQueue, check.HasLen, 0) c.Assert(cap(session.rateLimitQueue), check.Equals, 128) } + +func TestRegionErrorInfoLogRateLimitedHint(t *testing.T) { + t.Parallel() + + errInfo := newRegionErrorInfo(singleRegionInfo{}, nil) + errInfo.logRateLimitDuration = time.Second + + // True on the first rate limited. + require.True(t, errInfo.logRateLimitedHint()) + require.False(t, errInfo.logRateLimitedHint()) + + // True if it lasts too long. + time.Sleep(2 * errInfo.logRateLimitDuration) + require.True(t, errInfo.logRateLimitedHint()) + require.False(t, errInfo.logRateLimitedHint()) +} diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index ad37a3eee5b..d0f55a4dbee 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -275,13 +275,8 @@ func (w *regionWorker) handleSingleRegionError(err error, state *regionFeedState revokeToken := !state.initialized // since the context used in region worker will be cancelled after region // worker exits, we must use the parent context to prevent regionErrorInfo loss. - err2 := w.session.onRegionFail(w.parentCtx, regionErrorInfo{ - singleRegionInfo: state.sri, - err: err, - }, revokeToken) - if err2 != nil { - return err2 - } + errInfo := newRegionErrorInfo(state.sri, err) + w.session.onRegionFail(w.parentCtx, errInfo, revokeToken) return retErr } @@ -771,8 +766,7 @@ func (w *regionWorker) handleResolvedTs( // evictAllRegions is used when gRPC stream meets error and re-establish, notify // all existing regions to re-establish -func (w *regionWorker) evictAllRegions() error { - var err error +func (w *regionWorker) evictAllRegions() { for _, states := range w.statesManager.states { states.Range(func(_, value interface{}) bool { state := value.(*regionFeedState) @@ -792,14 +786,11 @@ func (w *regionWorker) evictAllRegions() error { // since the context used in region worker will be cancelled after // region worker exits, we must use the parent context to prevent // regionErrorInfo loss. - err = w.session.onRegionFail(w.parentCtx, regionErrorInfo{ - singleRegionInfo: state.sri, - err: cerror.ErrEventFeedAborted.FastGenByArgs(), - }, revokeToken) - return err == nil + errInfo := newRegionErrorInfo(state.sri, cerror.ErrEventFeedAborted.FastGenByArgs()) + w.session.onRegionFail(w.parentCtx, errInfo, revokeToken) + return true }) } - return err } func getWorkerPoolSize() (size int) {