Merge pull request cockroachdb#114063 from cockroachdb/blathers/backport-release-23.2-113966

release-23.2: kvcoord: Reintroduce catchup scan semaphore for regular rangefeed
miretskiy authored Nov 9, 2023
2 parents a8bccae + 9eb7633 commit 5c59f41
Showing 3 changed files with 64 additions and 7 deletions.
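
Editor's note: the change is small but layered. Catch-up scans were already paced by a startup rate limiter (catchupStartupRate, visible at the top of the diff); this backport re-adds a per-rangefeed concurrency semaphore on top of it for regular (non-mux) rangefeeds. For orientation before the diff, here is a minimal, self-contained Go sketch of that two-layer pattern. It uses a stdlib ticker and a channel semaphore instead of CockroachDB's quotapool and util/limit packages, and every name in it is illustrative, not the real API:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pacer is a toy stand-in for kvcoord's catchupScanRateLimiter: catch-up
// scan startups are paced by a token ticker (the rate limit) and, for
// regular rangefeeds only, bounded by a semaphore (the concurrency limit
// this commit reintroduces).
type pacer struct {
	tokens *time.Ticker  // one catch-up scan start per tick
	sem    chan struct{} // nil means unlimited concurrency (mux rangefeed)
}

func newPacer(startsPerSec, concurrency int) *pacer {
	p := &pacer{tokens: time.NewTicker(time.Second / time.Duration(startsPerSec))}
	if concurrency > 0 { // 0 implies unlimited, matching the cluster setting
		p.sem = make(chan struct{}, concurrency)
	}
	return p
}

// pace blocks for a startup token and a semaphore slot, returning a release
// function the caller must invoke once its catch-up scan completes.
func (p *pacer) pace(ctx context.Context) (release func(), _ error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-p.tokens.C:
	}
	if p.sem == nil {
		return func() {}, nil // nothing to release
	}
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case p.sem <- struct{}{}:
	}
	return func() { <-p.sem }, nil
}

func main() {
	p := newPacer(100, 8)
	release, err := p.pace(context.Background())
	if err != nil {
		panic(err)
	}
	defer release() // mirrors alloc.Release() in acquireCatchupScanQuota below
	fmt.Println("catch-up scan running")
}
```

The nil-semaphore branch mirrors the real code, where catchupSem stays nil for mux rangefeeds and only the rate limit applies.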
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
@@ -73,6 +73,7 @@ go_library(
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/iterutil",
"//pkg/util/limit",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/pprofutil",
69 changes: 63 additions & 6 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -37,6 +37,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/pprofutil"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
@@ -72,6 +73,14 @@ var catchupStartupRate = settings.RegisterIntSetting(
settings.WithPublic,
)

+ var catchupScanConcurrency = settings.RegisterIntSetting(
+ settings.ApplicationLevel,
+ "kv.rangefeed.catchup_scan_concurrency",
+ "number of catchup scans that a single rangefeed can execute concurrently; 0 implies unlimited",
+ 8,
+ settings.NonNegativeInt,
+ )

var rangefeedRangeStuckThreshold = settings.RegisterDurationSetting(
settings.ApplicationLevel,
"kv.rangefeed.range_stuck_threshold",
@@ -220,7 +229,7 @@ func (ds *DistSender) RangeFeedSpans(
cfg.rangeObserver(rr.ForEachPartialRangefeed)
}

- rl := newCatchupScanRateLimiter(&ds.st.SV)
+ rl := newCatchupScanRateLimiter(&ds.st.SV, cfg.useMuxRangeFeed)

if enableMuxRangeFeed && cfg.useMuxRangeFeed {
return muxRangeFeed(ctx, cfg, spans, ds, rr, rl, eventCh)
@@ -694,11 +703,13 @@ func (a *activeRangeFeed) acquireCatchupScanQuota(
ctx context.Context, rl *catchupScanRateLimiter, metrics *DistSenderRangeFeedMetrics,
) error {
// Indicate catchup scan is starting.
- if err := rl.Pace(ctx); err != nil {
+ alloc, err := rl.Pace(ctx)
+ if err != nil {
return err
}
metrics.RangefeedCatchupRanges.Inc(1)
a.catchupRes = func() {
+ alloc.Release()
metrics.RangefeedCatchupRanges.Dec(1)
}

@@ -987,18 +998,48 @@ type catchupScanRateLimiter struct {
pacer *quotapool.RateLimiter
sv *settings.Values
limit quotapool.Limit

+ // In addition to rate limiting catchup scans, a semaphore is used to restrict
+ // catchup scan concurrency for regular range feeds (catchupSem is nil for mux
+ // rangefeed).
+ // This additional limit is necessary because a regular rangefeed may buffer
+ // up to 2MB of data (or 128KB if useDedicatedRangefeedConnectionClass is set
+ // to true) per rangefeed stream in the http2/gRPC buffers -- making OOMs
+ // likely if the consumer does not consume events quickly enough. See
+ // https://github.com/cockroachdb/cockroach/issues/74219 for details.
+ // TODO(yevgeniy): Drop this once regular rangefeed gets deprecated.
+ catchupSemLimit int
+ catchupSem *limit.ConcurrentRequestLimiter
}

- func newCatchupScanRateLimiter(sv *settings.Values) *catchupScanRateLimiter {
+ func newCatchupScanRateLimiter(sv *settings.Values, useMuxRangeFeed bool) *catchupScanRateLimiter {
const slowAcquisitionThreshold = 5 * time.Second
lim := getCatchupRateLimit(sv)
- return &catchupScanRateLimiter{
+ rl := &catchupScanRateLimiter{
sv: sv,
limit: lim,
pacer: quotapool.NewRateLimiter(
"distSenderCatchupLimit", lim, 0, /* smooth rate limit without burst */
quotapool.OnSlowAcquisition(slowAcquisitionThreshold, logSlowCatchupScanAcquisition(slowAcquisitionThreshold))),
}

+ if !useMuxRangeFeed {
+ rl.catchupSemLimit = maxConcurrentCatchupScans(sv)
+ l := limit.MakeConcurrentRequestLimiter("distSenderCatchupLimit", rl.catchupSemLimit)
+ rl.catchupSem = &l
+ }
+
+ return rl
}

+ func maxConcurrentCatchupScans(sv *settings.Values) int {
+ l := catchupScanConcurrency.Get(sv)
+ if l == 0 {
+ return math.MaxInt
+ }
+ return int(l)
+ }

func getCatchupRateLimit(sv *settings.Values) quotapool.Limit {
Expand All @@ -1009,15 +1050,31 @@ func getCatchupRateLimit(sv *settings.Values) quotapool.Limit {
}

// Pace paces the catchup scan startup.
- func (rl *catchupScanRateLimiter) Pace(ctx context.Context) error {
+ func (rl *catchupScanRateLimiter) Pace(ctx context.Context) (limit.Reservation, error) {
// Take the opportunity to update the limit if it has changed.
if lim := getCatchupRateLimit(rl.sv); lim != rl.limit {
rl.limit = lim
rl.pacer.UpdateLimit(lim, 0 /* smooth rate limit without burst */)
}
- return rl.pacer.WaitN(ctx, 1)

+ if err := rl.pacer.WaitN(ctx, 1); err != nil {
+ return nil, err
+ }

+ // A regular rangefeed, in addition to pacing, also acquires catchup scan quota.
+ if rl.catchupSem != nil {
+ // Take the opportunity to update the limit if it has changed.
+ if lim := maxConcurrentCatchupScans(rl.sv); lim != rl.catchupSemLimit {
+ rl.catchupSemLimit = lim
+ rl.catchupSem.SetLimit(lim)
+ }
+ return rl.catchupSem.Begin(ctx)
+ }

+ return catchupAlloc(releaseNothing), nil
}

+ func releaseNothing() {}

// logSlowCatchupScanAcquisition is a function returning a quotapool.SlowAcquisitionFunction.
// It differs from quotapool.LogSlowAcquisition in that only some of the slow
// acquisition events are logged, to reduce log spam.
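
Editor's note: two properties of the concurrency limiter matter for the Pace changes above: Begin returns a reservation the caller must release, and SetLimit can resize the semaphore when the cluster setting changes between calls. Below is a hedged sketch of such an adjustable semaphore, with simplified semantics assumed for illustration rather than the real util/limit.ConcurrentRequestLimiter:

```go
package main

import (
	"context"
	"sync"
)

// adjustableSem is a toy counting semaphore whose capacity can change at
// runtime, which is what lets Pace re-apply
// kv.rangefeed.catchup_scan_concurrency on every acquisition.
type adjustableSem struct {
	mu    sync.Mutex
	cond  *sync.Cond
	limit int
	used  int
}

func newAdjustableSem(limit int) *adjustableSem {
	s := &adjustableSem{limit: limit}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// SetLimit changes the capacity; waiters re-check against the new limit.
func (s *adjustableSem) SetLimit(n int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.limit = n
	s.cond.Broadcast()
}

// Begin blocks until a slot is free and returns a release function.
// (Unlike the real limiter, this sketch does not wake waiters on ctx
// cancellation; it only observes cancellation on the next wakeup.)
func (s *adjustableSem) Begin(ctx context.Context) (release func(), _ error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for s.used >= s.limit {
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		s.cond.Wait()
	}
	s.used++
	return func() {
		s.mu.Lock()
		s.used--
		s.mu.Unlock()
		s.cond.Signal()
	}, nil
}

func main() {
	s := newAdjustableSem(2)
	release, err := s.Begin(context.Background())
	if err != nil {
		panic(err)
	}
	s.SetLimit(1) // e.g. the operator lowered the cluster setting
	release()
}
```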
1 change: 0 additions & 1 deletion pkg/settings/registry.go
@@ -222,7 +222,6 @@ var retiredSettings = map[InternalKey]struct{}{
"jobs.trace.force_dump_mode": {},
"timeseries.storage.30m_resolution_ttl": {},
"server.cpu_profile.enabled": {},
"kv.rangefeed.catchup_scan_concurrency": {},
"changefeed.lagging_ranges_threshold": {},
"changefeed.lagging_ranges_polling_rate": {},
"trace.jaeger.agent": {},
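
Editor's note: the registry.go hunk is the half of the revival that is easy to miss. CockroachDB keeps a map of retired setting names, and registration refuses a key that is still listed there, so re-adding kv.rangefeed.catchup_scan_concurrency requires deleting its entry first. A toy Go sketch of that guard, with names simplified for illustration:

```go
package main

import "fmt"

// retired mimics the retiredSettings map: names that once existed and may
// not be registered again while listed here.
var retired = map[string]struct{}{
	"jobs.trace.force_dump_mode": {},
}

var registry = map[string]struct{}{}

// register panics if the name is still marked retired, so reviving a
// setting (as this commit does) means removing it from the retired map.
func register(name string) {
	if _, ok := retired[name]; ok {
		panic(fmt.Sprintf("cannot register retired setting %s", name))
	}
	registry[name] = struct{}{}
}

func main() {
	register("kv.rangefeed.catchup_scan_concurrency") // ok once un-retired
	fmt.Println("registered settings:", len(registry))
}
```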
