kvcoord: Replace rangefeed catchup semaphore with rate limiter
The catchup scan limit was added in cockroachdb#77725 to try to prevent
a single rangefeed from consuming all catchup scan slots on the KV
side. This client-side limit has been largely ineffective.

More recently, this semaphore was co-opted in cockroachdb#109346 to pace
the rate of goroutine creation on the client. That functionality is
important, but a semaphore is a poor mechanism for it.

Namely, we are interested in controlling the rate at which the client
starts new catchup scans (and thus the rate of goroutine creation).
This commit replaces the semaphore with a rate-limiter-based approach.
The rate limits are configured via two new settings:
`kv.rangefeed.client.startup_range_burst`, which sets the maximum
number of new rangefeed streams that may be established per window
(default 180), and `kv.rangefeed.client.startup_window`, which sets
the length of the window over which that burst of rangefeed
connections may be established (default 3s).
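
For illustration only (not part of this diff), pacing per-range
goroutine creation with the new limiter looks roughly like the sketch
below; rangesToWatch and startSingleRangeFeed are hypothetical
stand-ins, while newCatchupScanRateLimiter and Pace are the helpers
added in pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go:

    // Sketch, assuming it lives in the kvcoord package next to the
    // new rate limiter.
    func paceRangeFeedStartup(
        ctx context.Context, ds *DistSender, rangesToWatch []roachpb.Span,
    ) error {
        rl := newCatchupScanRateLimiter(&ds.st.SV)
        for _, rng := range rangesToWatch {
            // Blocks until the token bucket (startup_range_burst tokens,
            // refilled over startup_window) admits another stream.
            if err := rl.Pace(ctx); err != nil {
                return err
            }
            go startSingleRangeFeed(ctx, rng) // hypothetical helper
        }
        return nil
    }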

Closes cockroachdb#110439
Epic: CRDB-26372

Release note: None
Yevgeniy Miretskiy committed Sep 20, 2023
1 parent d414347 commit 91a78db
Showing 8 changed files with 78 additions and 39 deletions.
2 changes: 2 additions & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -48,6 +48,8 @@ kv.closed_timestamp.lead_for_global_reads_override duration 0s if nonzero, overr
kv.closed_timestamp.side_transport_interval duration 200ms the interval at which the closed timestamp side-transport attempts to advance each range's closed timestamp; set to 0 to disable the side-transport tenant-ro
kv.closed_timestamp.target_duration duration 3s if nonzero, attempt to provide closed timestamp notifications for timestamps trailing cluster time by approximately this duration tenant-ro
kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconciling jobs with protected timestamp records tenant-ro
kv.rangefeed.client.startup_range_burst integer 180 maximum number of rangefeed streams that may be started in a kv.rangefeed.client.startup_window interval; 0 implies unlimited tenant-rw
kv.rangefeed.client.startup_window duration 3s controls the rate at which the client will initiate new rangefeed streams for a single range; up to kv.rangefeed.client.startup_range_burst new rangefeed streams will be established during this period tenant-rw
kv.rangefeed.closed_timestamp_refresh_interval duration 3s the interval at which closed-timestamp updates are delivered to rangefeeds; set to 0 to use kv.closed_timestamp.side_transport_interval tenant-ro
kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled tenant-rw
kv.rangefeed.range_stuck_threshold duration 1m0s restart rangefeeds if they don't emit anything for the specified threshold; 0 disables (kv.rangefeed.closed_timestamp_refresh_interval takes precedence) tenant-rw
2 changes: 2 additions & 0 deletions docs/generated/settings/settings.html
@@ -69,6 +69,8 @@
<tr><td><div id="setting-kv-range-split-by-load-enabled" class="anchored"><code>kv.range_split.by_load.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>allow automatic splits of ranges based on where load is concentrated</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-range-split-load-cpu-threshold" class="anchored"><code>kv.range_split.load_cpu_threshold</code></div></td><td>duration</td><td><code>500ms</code></td><td>the CPU use per second over which, the range becomes a candidate for load based splitting</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-range-split-load-qps-threshold" class="anchored"><code>kv.range_split.load_qps_threshold</code></div></td><td>integer</td><td><code>2500</code></td><td>the QPS over which, the range becomes a candidate for load based splitting</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-rangefeed-client-startup-range-burst" class="anchored"><code>kv.rangefeed.client.startup_range_burst</code></div></td><td>integer</td><td><code>180</code></td><td>maximum number of rangefeed streams that may be started in a kv.rangefeed.client.startup_window interval ; 0 implies unlimited</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-rangefeed-client-startup-window" class="anchored"><code>kv.rangefeed.client.startup_window</code></div></td><td>duration</td><td><code>3s</code></td><td>controls the rate the client will initiate new rangefeed stream for a single range.up to kv.rangefeed.client.startup_range_burst new rangefeed streams will be established during this period</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-rangefeed-closed-timestamp-refresh-interval" class="anchored"><code>kv.rangefeed.closed_timestamp_refresh_interval</code></div></td><td>duration</td><td><code>3s</code></td><td>the interval at which closed-timestamp updatesare delivered to rangefeeds; set to 0 to use kv.closed_timestamp.side_transport_interval</td><td>Serverless/Dedicated/Self-Hosted (read-only)</td></tr>
<tr><td><div id="setting-kv-rangefeed-enabled" class="anchored"><code>kv.rangefeed.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if set, rangefeed registration is enabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-rangefeed-range-stuck-threshold" class="anchored"><code>kv.rangefeed.range_stuck_threshold</code></div></td><td>duration</td><td><code>1m0s</code></td><td>restart rangefeeds if they don&#39;t emit anything for the specified threshold; 0 disables (kv.rangefeed.closed_timestamp_refresh_interval takes precedence)</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
1 change: 0 additions & 1 deletion pkg/cmd/roachtest/tests/cdc_bench.go
@@ -177,7 +177,6 @@ func makeCDCBenchOptions() (option.StartOpts, install.ClusterSettings) {

// Bump up the number of allowed catchup scans. Doing catchup for 100k ranges with default
// configuration (8 client side, 16 per store) takes a while (~1500-2000 ranges per minute).
settings.ClusterSettings["kv.rangefeed.catchup_scan_concurrency"] = "16"
settings.ClusterSettings["kv.rangefeed.concurrent_catchup_iterators"] = "16"

// Give changefeed more memory and slow down rangefeed checkpoints.
1 change: 0 additions & 1 deletion pkg/kv/kvclient/kvcoord/BUILD.bazel
@@ -73,7 +73,6 @@ go_library(
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/iterutil",
"//pkg/util/limit",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/pprofutil",
9 changes: 4 additions & 5 deletions pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
@@ -27,7 +27,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/future"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/pprofutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -49,7 +48,7 @@ type rangefeedMuxer struct {
metrics *DistSenderRangeFeedMetrics
cfg rangeFeedConfig
registry *rangeFeedRegistry
catchupSem *limit.ConcurrentRequestLimiter
catchupSem *catchupScanRateLimiter
eventCh chan<- RangeFeedMessage

// Each call to start new range feed gets a unique ID which is echoed back
@@ -71,7 +70,7 @@ func muxRangeFeed(
spans []SpanTimePair,
ds *DistSender,
rr *rangeFeedRegistry,
catchupSem *limit.ConcurrentRequestLimiter,
catchupRateLimiter *catchupScanRateLimiter,
eventCh chan<- RangeFeedMessage,
) (retErr error) {
if log.V(1) {
@@ -88,7 +87,7 @@
ds: ds,
cfg: cfg,
metrics: &ds.metrics.DistSenderRangeFeedMetrics,
catchupSem: catchupSem,
catchupSem: catchupRateLimiter,
eventCh: eventCh,
}
if cfg.knobs.metrics != nil {
@@ -244,7 +243,7 @@ func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error
streamID := atomic.AddInt64(&m.seqID, 1)

// Before starting single rangefeed, acquire catchup scan quota.
if err := s.acquireCatchupScanQuota(ctx, &m.ds.st.SV, m.catchupSem, m.metrics); err != nil {
if err := s.acquireCatchupScanQuota(ctx, m.catchupSem, m.metrics); err != nil {
return err
}

86 changes: 59 additions & 27 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -38,9 +38,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/pprofutil"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -64,12 +64,23 @@ var useDedicatedRangefeedConnectionClass = settings.RegisterBoolSetting(
"kv.rangefeed.use_dedicated_connection_class.enabled", false),
)

var catchupScanConcurrency = settings.RegisterIntSetting(
var catchupScanDurationEstimate = settings.RegisterDurationSetting(
settings.TenantWritable,
"kv.rangefeed.catchup_scan_concurrency",
"number of catchup scans that a single rangefeed can execute concurrently; 0 implies unlimited",
8,
"kv.rangefeed.client.startup_window",
"controls the rate the client will initiate new rangefeed stream for a single range."+
"up to kv.rangefeed.client.startup_range_burst new rangefeed streams will be established during this period",
3*time.Second,
settings.NonNegativeDuration,
settings.WithPublic,
)

var catchupScanBurst = settings.RegisterIntSetting(
settings.TenantWritable,
"kv.rangefeed.client.startup_range_burst",
"maximum number of rangefeed streams that may be started in a kv.rangefeed.client.startup_window interval ; 0 implies unlimited",
180, // 180 ranges over 3 seconds window: 60 ranges/sec; 12k ranges in 200 seconds.
settings.NonNegativeInt,
settings.WithPublic,
)

var rangefeedRangeStuckThreshold = settings.RegisterDurationSetting(
Expand All @@ -80,14 +91,6 @@ var rangefeedRangeStuckThreshold = settings.RegisterDurationSetting(
settings.NonNegativeDuration,
settings.WithPublic)

func maxConcurrentCatchupScans(sv *settings.Values) int {
l := catchupScanConcurrency.Get(sv)
if l == 0 {
return math.MaxInt
}
return int(l)
}

// ForEachRangeFn is used to execute `fn` over each range in a rangefeed.
type ForEachRangeFn func(fn ActiveRangeFeedIterFn) error

@@ -226,12 +229,11 @@ func (ds *DistSender) RangeFeedSpans(
cfg.rangeObserver(rr.ForEachPartialRangefeed)
}

catchupSem := limit.MakeConcurrentRequestLimiter(
"distSenderCatchupLimit", maxConcurrentCatchupScans(&ds.st.SV))
rl := newCatchupScanRateLimiter(&ds.st.SV)

if ds.st.Version.IsActive(ctx, clusterversion.TODODelete_V22_2RangefeedUseOneStreamPerNode) &&
enableMuxRangeFeed && cfg.useMuxRangeFeed {
return muxRangeFeed(ctx, cfg, spans, ds, rr, &catchupSem, eventCh)
return muxRangeFeed(ctx, cfg, spans, ds, rr, rl, eventCh)
}

// Goroutine that processes subdivided ranges and creates a rangefeed for
@@ -256,7 +258,7 @@ }
}
// Prior to spawning goroutine to process this feed, acquire catchup scan quota.
// This quota acquisition paces the rate of new goroutine creation.
if err := active.acquireCatchupScanQuota(ctx, &ds.st.SV, &catchupSem, metrics); err != nil {
if err := active.acquireCatchupScanQuota(ctx, rl, metrics); err != nil {
return err
}
if log.V(1) {
@@ -700,23 +702,17 @@ func (a catchupAlloc) Release() {
}

func (a *activeRangeFeed) acquireCatchupScanQuota(
ctx context.Context,
sv *settings.Values,
catchupSem *limit.ConcurrentRequestLimiter,
metrics *DistSenderRangeFeedMetrics,
ctx context.Context, rl *catchupScanRateLimiter, metrics *DistSenderRangeFeedMetrics,
) error {
// Indicate catchup scan is starting; Before potentially blocking on a semaphore, take
// opportunity to update semaphore limit.
catchupSem.SetLimit(maxConcurrentCatchupScans(sv))
res, err := catchupSem.Begin(ctx)
if err != nil {
// Indicate catchup scan is starting.
if err := rl.Pace(ctx); err != nil {
return err
}
metrics.RangefeedCatchupRanges.Inc(1)
a.catchupRes = func() {
metrics.RangefeedCatchupRanges.Dec(1)
res.Release()
}

a.Lock()
defer a.Unlock()
a.InCatchup = true
@@ -989,3 +985,39 @@ func TestingWithMuxRangeFeedRequestSenderCapture(

// TestingMakeRangeFeedMetrics exposes makeDistSenderRangeFeedMetrics for test use.
var TestingMakeRangeFeedMetrics = makeDistSenderRangeFeedMetrics

type catchupScanRateLimiter struct {
pacer *quotapool.RateLimiter
sv *settings.Values
limit quotapool.Limit
burst int64
}

func newCatchupScanRateLimiter(sv *settings.Values) *catchupScanRateLimiter {
rl := &catchupScanRateLimiter{sv: sv}
rl.limit, rl.burst = getCatchupScanLimits(rl.sv)
rl.pacer = quotapool.NewRateLimiter("distSenderCatchupLimit", rl.limit, rl.burst)
return rl
}

func getCatchupScanLimits(sv *settings.Values) (quotapool.Limit, int64) {
lim := quotapool.Inf()
burst := catchupScanBurst.Get(sv)
if burst > 0 {
if window := catchupScanDurationEstimate.Get(sv) / time.Second; window > 0 {
lim = quotapool.Limit(burst) / quotapool.Limit(window)
}
}
return lim, burst
}

// Pace paces the catchup scan startup.
func (rl *catchupScanRateLimiter) Pace(ctx context.Context) error {
// Take opportunity to update limits if they have changed.
if lim, burst := getCatchupScanLimits(rl.sv); lim != rl.limit || burst != rl.burst {
rl.limit, rl.burst = lim, burst
rl.pacer.UpdateLimit(lim, burst)
}

return rl.pacer.WaitN(ctx, 1)
}
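
For context (not part of the diff), the same quotapool API shown above can be
used standalone to see what the defaults work out to; a minimal sketch,
assuming the setting defaults registered earlier in this file (burst 180,
window 3s) and a hypothetical limiter name:

    // Sketch: 180 tokens refilled over a 3 second window is a rate of
    // 60 new streams per second, so ~12k ranges are admitted in ~200s.
    pacer := quotapool.NewRateLimiter(
        "exampleCatchupLimit",                   // hypothetical name
        quotapool.Limit(180)/quotapool.Limit(3), // 60 tokens/sec
        180,                                     // burst after an idle period
    )
    // Each new single-range feed waits for one token before its goroutine
    // is spawned; WaitN blocks (or returns the ctx error) when the bucket
    // is empty.
    if err := pacer.WaitN(ctx, 1); err != nil {
        return err
    }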
15 changes: 10 additions & 5 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
@@ -192,6 +192,8 @@ func makeTransportFactory(

// rangeFeed is a helper to execute rangefeed. We are not using rangefeed library
// here because of circular dependencies.
// Returns a cleanup function that shuts down and waits for the rangefeed to
// terminate; it is safe to call multiple times.
func rangeFeed(
dsI interface{},
sp roachpb.Span,
@@ -307,8 +309,8 @@ func TestBiDirectionalRangefeedNotUsedUntilUpgradeFinalilzed(t *testing.T) {

allSeen, onValue := observeNValues(1000)
closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, onValue, false)
defer closeFeed()
channelWaitWithTimeout(t, allSeen)
closeFeed()
}

t.Run("rangefeed-stream-disabled-prior-to-version-upgrade", func(t *testing.T) {
@@ -378,8 +380,9 @@ func TestMuxRangeFeedConnectsToNodeOnce(t *testing.T) {

allSeen, onValue := observeNValues(1000)
closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, onValue, true)
defer closeFeed()
channelWaitWithTimeout(t, allSeen)
closeFeed()
closeFeed() // Explicitly shut down the feed to make sure counters no longer change.

// Verify we connected to each node once.
connCounts.Lock()
@@ -458,8 +461,9 @@ func TestRestartsStuckRangeFeeds(t *testing.T) {

allSeen, onValue := observeNValues(100)
closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, onValue, false)
defer closeFeed()
channelWaitWithTimeout(t, allSeen)
closeFeed()
closeFeed() // Explicitly shut down the feed to make sure metrics no longer change.

require.True(t, blockingClient.ctxCanceled)
require.EqualValues(t, 1, tc.Server(0).DistSenderI().(*kvcoord.DistSender).Metrics().Errors.Stuck.Count())
@@ -609,7 +613,8 @@ func TestMuxRangeCatchupScanQuotaReleased(t *testing.T) {
// Initial setup: only single catchup scan allowed.
sqlDB.ExecMultiple(t,
`SET CLUSTER SETTING kv.rangefeed.enabled = true`,
`SET CLUSTER SETTING kv.rangefeed.catchup_scan_concurrency = 1`,
`SET CLUSTER SETTING kv.rangefeed.client.startup_range_burst = 1`,
`SET CLUSTER SETTING kv.rangefeed.client.startup_window = '1ms'`,
`ALTER DATABASE defaultdb CONFIGURE ZONE USING num_replicas = 1`,
`CREATE TABLE foo (key INT PRIMARY KEY)`,
`INSERT INTO foo (key) SELECT * FROM generate_series(1, 1000)`,
@@ -640,8 +645,8 @@ }
}
return false, nil
}))
defer closeFeed()
channelWaitWithTimeout(t, enoughErrors)
closeFeed()
}

// Test to make sure the various metrics used by rangefeed are correctly
1 change: 1 addition & 0 deletions pkg/settings/registry.go
@@ -188,6 +188,7 @@ var retiredSettings = map[InternalKey]struct{}{
"jobs.trace.force_dump_mode": {},
"timeseries.storage.30m_resolution_ttl": {},
"server.cpu_profile.enabled": {},
"kv.rangefeed.catchup_scan_concurrency": {},
}

// sqlDefaultSettings is the list of "grandfathered" existing sql.defaults
