diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel
index 533904e96765..0c5a1441fef1 100644
--- a/pkg/kv/kvclient/kvcoord/BUILD.bazel
+++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel
@@ -73,6 +73,7 @@ go_library(
         "//pkg/util/grpcutil",
         "//pkg/util/hlc",
         "//pkg/util/iterutil",
+        "//pkg/util/limit",
         "//pkg/util/log",
         "//pkg/util/metric",
         "//pkg/util/pprofutil",
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
index 97ee1c85e7a9..f00aee7c63ee 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -37,6 +37,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/iterutil"
+	"github.com/cockroachdb/cockroach/pkg/util/limit"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/pprofutil"
 	"github.com/cockroachdb/cockroach/pkg/util/quotapool"
@@ -72,6 +73,14 @@ var catchupStartupRate = settings.RegisterIntSetting(
 	settings.WithPublic,
 )
 
+var catchupScanConcurrency = settings.RegisterIntSetting(
+	settings.ApplicationLevel,
+	"kv.rangefeed.catchup_scan_concurrency",
+	"number of catchup scans that a single rangefeed can execute concurrently; 0 implies unlimited",
+	8,
+	settings.NonNegativeInt,
+)
+
 var rangefeedRangeStuckThreshold = settings.RegisterDurationSetting(
 	settings.ApplicationLevel,
 	"kv.rangefeed.range_stuck_threshold",
@@ -220,7 +229,7 @@ func (ds *DistSender) RangeFeedSpans(
 		cfg.rangeObserver(rr.ForEachPartialRangefeed)
 	}
 
-	rl := newCatchupScanRateLimiter(&ds.st.SV)
+	rl := newCatchupScanRateLimiter(&ds.st.SV, cfg.useMuxRangeFeed)
 
 	if enableMuxRangeFeed && cfg.useMuxRangeFeed {
 		return muxRangeFeed(ctx, cfg, spans, ds, rr, rl, eventCh)
@@ -694,11 +703,13 @@ func (a *activeRangeFeed) acquireCatchupScanQuota(
 	ctx context.Context, rl *catchupScanRateLimiter, metrics *DistSenderRangeFeedMetrics,
 ) error {
 	// Indicate catchup scan is starting.
-	if err := rl.Pace(ctx); err != nil {
+	alloc, err := rl.Pace(ctx)
+	if err != nil {
 		return err
 	}
 	metrics.RangefeedCatchupRanges.Inc(1)
 	a.catchupRes = func() {
+		alloc.Release()
 		metrics.RangefeedCatchupRanges.Dec(1)
 	}
 
@@ -987,18 +998,48 @@ type catchupScanRateLimiter struct {
 	pacer *quotapool.RateLimiter
 	sv    *settings.Values
 	limit quotapool.Limit
+
+	// In addition to rate limiting catchup scans, a semaphore is used to restrict
+	// catchup scan concurrency for regular range feeds (catchupSem is nil for mux
+	// rangefeed).
+	// This additional limit is necessary because a regular
+	// rangefeed may buffer up to 2MB of data (or 128KB if
+	// useDedicatedRangefeedConnectionClass is set to true) per rangefeed stream in
+	// the http2/gRPC buffers -- making OOMs likely if the consumer does not
+	// consume events quickly enough. See
+	// https://github.com/cockroachdb/cockroach/issues/74219 for details.
+	// TODO(yevgeniy): Drop this once regular rangefeed gets deprecated.
+	catchupSemLimit int
+	catchupSem      *limit.ConcurrentRequestLimiter
 }
 
-func newCatchupScanRateLimiter(sv *settings.Values) *catchupScanRateLimiter {
+func newCatchupScanRateLimiter(sv *settings.Values, useMuxRangeFeed bool) *catchupScanRateLimiter {
 	const slowAcquisitionThreshold = 5 * time.Second
 	lim := getCatchupRateLimit(sv)
-	return &catchupScanRateLimiter{
+
+	rl := &catchupScanRateLimiter{
 		sv:    sv,
 		limit: lim,
 		pacer: quotapool.NewRateLimiter(
 			"distSenderCatchupLimit", lim, 0, /* smooth rate limit without burst */
 			quotapool.OnSlowAcquisition(slowAcquisitionThreshold, logSlowCatchupScanAcquisition(slowAcquisitionThreshold))),
 	}
+
+	if !useMuxRangeFeed {
+		rl.catchupSemLimit = maxConcurrentCatchupScans(sv)
+		l := limit.MakeConcurrentRequestLimiter("distSenderCatchupLimit", rl.catchupSemLimit)
+		rl.catchupSem = &l
+	}
+
+	return rl
+}
+
+func maxConcurrentCatchupScans(sv *settings.Values) int {
+	l := catchupScanConcurrency.Get(sv)
+	if l == 0 {
+		return math.MaxInt
+	}
+	return int(l)
 }
 
 func getCatchupRateLimit(sv *settings.Values) quotapool.Limit {
@@ -1009,15 +1050,31 @@ func getCatchupRateLimit(sv *settings.Values) quotapool.Limit {
 }
 
 // Pace paces the catchup scan startup.
-func (rl *catchupScanRateLimiter) Pace(ctx context.Context) error {
+func (rl *catchupScanRateLimiter) Pace(ctx context.Context) (limit.Reservation, error) {
 	// Take opportunity to update limits if they have changed.
 	if lim := getCatchupRateLimit(rl.sv); lim != rl.limit {
 		rl.limit = lim
 		rl.pacer.UpdateLimit(lim, 0 /* smooth rate limit without burst */)
 	}
-	return rl.pacer.WaitN(ctx, 1)
+
+	if err := rl.pacer.WaitN(ctx, 1); err != nil {
+		return nil, err
+	}
+
+	// Regular rangefeed, in addition to pacing, also acquires catchup scan quota.
+	if rl.catchupSem != nil {
+		// Take opportunity to update limits if they have changed.
+		if lim := maxConcurrentCatchupScans(rl.sv); lim != rl.catchupSemLimit {
+			rl.catchupSem.SetLimit(lim)
+		}
+		return rl.catchupSem.Begin(ctx)
+	}
+
+	return catchupAlloc(releaseNothing), nil
 }
 
+func releaseNothing() {}
+
 // logSlowCatchupScanAcquisition is a function returning a quotapool.SlowAcquisitionFunction.
 // It differs from the quotapool.LogSlowAcquisition in that only some of slow acquisition
 // events are logged to reduce log spam.
diff --git a/pkg/settings/registry.go b/pkg/settings/registry.go
index 8dc6df579bd3..dfec5294a751 100644
--- a/pkg/settings/registry.go
+++ b/pkg/settings/registry.go
@@ -222,7 +222,6 @@ var retiredSettings = map[InternalKey]struct{}{
 	"jobs.trace.force_dump_mode": {},
 	"timeseries.storage.30m_resolution_ttl": {},
 	"server.cpu_profile.enabled": {},
-	"kv.rangefeed.catchup_scan_concurrency": {},
 	"changefeed.lagging_ranges_threshold": {},
 	"changefeed.lagging_ranges_polling_rate": {},
 	"trace.jaeger.agent": {},
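
For context, the Pace change above pairs a startup rate limit with an optional per-rangefeed concurrency cap whose reservation is released through a.catchupRes. The sketch below is a minimal, self-contained illustration of that pattern only; it uses golang.org/x/time/rate and a channel-based semaphore instead of the quotapool and util/limit packages, and every name in it (pacer, newPacer, pace, release) is illustrative rather than a CockroachDB API.

package main

import (
	"context"
	"fmt"

	"golang.org/x/time/rate"
)

// release undoes whatever pace acquired; a no-op when no semaphore is in use.
type release func()

// pacer mirrors the shape of catchupScanRateLimiter: a rate limiter that
// spaces out catchup scan startups, plus an optional semaphore bounding how
// many catchup scans may run at once (nil means unlimited, as for mux
// rangefeed in the patch).
type pacer struct {
	startRate *rate.Limiter
	sem       chan struct{} // nil => no concurrency cap
}

func newPacer(startsPerSec float64, concurrency int) *pacer {
	p := &pacer{startRate: rate.NewLimiter(rate.Limit(startsPerSec), 1)}
	if concurrency > 0 {
		p.sem = make(chan struct{}, concurrency)
	}
	return p
}

// pace blocks until a catchup scan may start and returns a release func that
// must be called when the scan finishes (analogous to Reservation.Release in
// the patch).
func (p *pacer) pace(ctx context.Context) (release, error) {
	if err := p.startRate.Wait(ctx); err != nil {
		return nil, err
	}
	if p.sem == nil {
		return func() {}, nil
	}
	select {
	case p.sem <- struct{}{}:
		return func() { <-p.sem }, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func main() {
	p := newPacer(50 /* starts per second */, 8 /* concurrent scans */)
	done, err := p.pace(context.Background())
	if err != nil {
		fmt.Println("pace:", err)
		return
	}
	// ... run the catchup scan ...
	done() // analogous to alloc.Release() inside a.catchupRes
}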