kvcoord: Replace rangefeed catchup semaphore with rate limiter
The catchup scan limit was added in cockroachdb#77725 in an attempt
to restrict a single rangefeed from consuming all catchup
scan slots on the KV side. This client-side limit has
been largely ineffective.

More recently, this semaphore was co-opted in cockroachdb#109346 in order
to pace the rate of goroutine creation on the client. This functionality
is important, but the implementation was not very good.

Namely, we are interested in controlling the rate at which the client
starts new catchup scans (and thus the rate of goroutine
creation). This change replaces the old semaphore-based
implementation with a rate-limit-based approach. The rate limit is
configured via the `kv.rangefeed.client.stream_startup_rate` setting,
which caps the number of newly established rangefeed
streams per second (default 100).
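
For illustration only, a minimal sketch of the pacing idea using the
standard `golang.org/x/time/rate` package (the commit itself uses
CockroachDB's `quotapool.RateLimiter`); the `paceStreamStartup` helper
and the hard-coded 100/sec rate below are illustrative assumptions:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

// paceStreamStartup blocks until the limiter hands out a token, bounding
// how quickly new rangefeed streams (and their goroutines) are started.
func paceStreamStartup(ctx context.Context, lim *rate.Limiter) error {
	return lim.Wait(ctx)
}

func main() {
	// 100 stream startups per second, mirroring the setting's default;
	// a burst of 1 keeps startups evenly spaced.
	lim := rate.NewLimiter(rate.Limit(100), 1)

	ctx := context.Background()
	start := time.Now()
	for i := 0; i < 10; i++ {
		if err := paceStreamStartup(ctx, lim); err != nil {
			panic(err)
		}
	}
	fmt.Printf("started 10 streams in %s\n", time.Since(start))
}
```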

Closes cockroachdb#110439
Epic: CRDB-26372

Release note: None
Yevgeniy Miretskiy committed Sep 21, 2023
1 parent d414347 commit db12b54
Showing 8 changed files with 59 additions and 39 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -48,6 +48,7 @@ kv.closed_timestamp.lead_for_global_reads_override duration 0s if nonzero, overr
kv.closed_timestamp.side_transport_interval duration 200ms the interval at which the closed timestamp side-transport attempts to advance each range's closed timestamp; set to 0 to disable the side-transport tenant-ro
kv.closed_timestamp.target_duration duration 3s if nonzero, attempt to provide closed timestamp notifications for timestamps trailing cluster time by approximately this duration tenant-ro
kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconciling jobs with protected timestamp records tenant-ro
kv.rangefeed.client.stream_startup_rate integer 100 controls the rate per second the client will initiate new rangefeed stream for a single range; 0 implies unlimited tenant-rw
kv.rangefeed.closed_timestamp_refresh_interval duration 3s the interval at which closed-timestamp updatesare delivered to rangefeeds; set to 0 to use kv.closed_timestamp.side_transport_interval tenant-ro
kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled tenant-rw
kv.rangefeed.range_stuck_threshold duration 1m0s restart rangefeeds if they don't emit anything for the specified threshold; 0 disables (kv.rangefeed.closed_timestamp_refresh_interval takes precedence) tenant-rw
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -69,6 +69,7 @@
<tr><td><div id="setting-kv-range-split-by-load-enabled" class="anchored"><code>kv.range_split.by_load.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>allow automatic splits of ranges based on where load is concentrated</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-range-split-load-cpu-threshold" class="anchored"><code>kv.range_split.load_cpu_threshold</code></div></td><td>duration</td><td><code>500ms</code></td><td>the CPU use per second over which, the range becomes a candidate for load based splitting</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-range-split-load-qps-threshold" class="anchored"><code>kv.range_split.load_qps_threshold</code></div></td><td>integer</td><td><code>2500</code></td><td>the QPS over which, the range becomes a candidate for load based splitting</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-rangefeed-client-stream-startup-rate" class="anchored"><code>kv.rangefeed.client.stream_startup_rate</code></div></td><td>integer</td><td><code>100</code></td><td>controls the rate per second the client will initiate new rangefeed stream for a single range; 0 implies unlimited</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-rangefeed-closed-timestamp-refresh-interval" class="anchored"><code>kv.rangefeed.closed_timestamp_refresh_interval</code></div></td><td>duration</td><td><code>3s</code></td><td>the interval at which closed-timestamp updatesare delivered to rangefeeds; set to 0 to use kv.closed_timestamp.side_transport_interval</td><td>Serverless/Dedicated/Self-Hosted (read-only)</td></tr>
<tr><td><div id="setting-kv-rangefeed-enabled" class="anchored"><code>kv.rangefeed.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if set, rangefeed registration is enabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-rangefeed-range-stuck-threshold" class="anchored"><code>kv.rangefeed.range_stuck_threshold</code></div></td><td>duration</td><td><code>1m0s</code></td><td>restart rangefeeds if they don&#39;t emit anything for the specified threshold; 0 disables (kv.rangefeed.closed_timestamp_refresh_interval takes precedence)</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
1 change: 0 additions & 1 deletion pkg/cmd/roachtest/tests/cdc_bench.go
@@ -177,7 +177,6 @@ func makeCDCBenchOptions() (option.StartOpts, install.ClusterSettings) {

// Bump up the number of allowed catchup scans. Doing catchup for 100k ranges with default
// configuration (8 client side, 16 per store) takes a while (~1500-2000 ranges per min minutes).
settings.ClusterSettings["kv.rangefeed.catchup_scan_concurrency"] = "16"
settings.ClusterSettings["kv.rangefeed.concurrent_catchup_iterators"] = "16"

// Give changefeed more memory and slow down rangefeed checkpoints.
1 change: 0 additions & 1 deletion pkg/kv/kvclient/kvcoord/BUILD.bazel
@@ -73,7 +73,6 @@ go_library(
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/iterutil",
"//pkg/util/limit",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/pprofutil",
9 changes: 4 additions & 5 deletions pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
@@ -27,7 +27,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/future"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/pprofutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -49,7 +48,7 @@ type rangefeedMuxer struct {
metrics *DistSenderRangeFeedMetrics
cfg rangeFeedConfig
registry *rangeFeedRegistry
catchupSem *limit.ConcurrentRequestLimiter
catchupSem *catchupScanRateLimiter
eventCh chan<- RangeFeedMessage

// Each call to start new range feed gets a unique ID which is echoed back
@@ -71,7 +70,7 @@ func muxRangeFeed(
spans []SpanTimePair,
ds *DistSender,
rr *rangeFeedRegistry,
catchupSem *limit.ConcurrentRequestLimiter,
catchupRateLimiter *catchupScanRateLimiter,
eventCh chan<- RangeFeedMessage,
) (retErr error) {
if log.V(1) {
@@ -88,7 +87,7 @@
ds: ds,
cfg: cfg,
metrics: &ds.metrics.DistSenderRangeFeedMetrics,
catchupSem: catchupSem,
catchupSem: catchupRateLimiter,
eventCh: eventCh,
}
if cfg.knobs.metrics != nil {
@@ -244,7 +243,7 @@ func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error
streamID := atomic.AddInt64(&m.seqID, 1)

// Before starting single rangefeed, acquire catchup scan quota.
if err := s.acquireCatchupScanQuota(ctx, &m.ds.st.SV, m.catchupSem, m.metrics); err != nil {
if err := s.acquireCatchupScanQuota(ctx, m.catchupSem, m.metrics); err != nil {
return err
}

71 changes: 44 additions & 27 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -38,9 +38,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/pprofutil"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -64,12 +64,13 @@ var useDedicatedRangefeedConnectionClass = settings.RegisterBoolSetting(
"kv.rangefeed.use_dedicated_connection_class.enabled", false),
)

var catchupScanConcurrency = settings.RegisterIntSetting(
var catchupStartupRate = settings.RegisterIntSetting(
settings.TenantWritable,
"kv.rangefeed.catchup_scan_concurrency",
"number of catchup scans that a single rangefeed can execute concurrently; 0 implies unlimited",
8,
"kv.rangefeed.client.stream_startup_rate",
"controls the rate per second the client will initiate new rangefeed stream for a single range; 0 implies unlimited",
100, // e.g.: 200 seconds for 20k ranges.
settings.NonNegativeInt,
settings.WithPublic,
)

var rangefeedRangeStuckThreshold = settings.RegisterDurationSetting(
Expand All @@ -80,14 +81,6 @@ var rangefeedRangeStuckThreshold = settings.RegisterDurationSetting(
settings.NonNegativeDuration,
settings.WithPublic)

func maxConcurrentCatchupScans(sv *settings.Values) int {
l := catchupScanConcurrency.Get(sv)
if l == 0 {
return math.MaxInt
}
return int(l)
}

// ForEachRangeFn is used to execute `fn` over each range in a rangefeed.
type ForEachRangeFn func(fn ActiveRangeFeedIterFn) error

@@ -226,12 +219,11 @@ func (ds *DistSender) RangeFeedSpans(
cfg.rangeObserver(rr.ForEachPartialRangefeed)
}

catchupSem := limit.MakeConcurrentRequestLimiter(
"distSenderCatchupLimit", maxConcurrentCatchupScans(&ds.st.SV))
rl := newCatchupScanRateLimiter(&ds.st.SV)

if ds.st.Version.IsActive(ctx, clusterversion.TODODelete_V22_2RangefeedUseOneStreamPerNode) &&
enableMuxRangeFeed && cfg.useMuxRangeFeed {
return muxRangeFeed(ctx, cfg, spans, ds, rr, &catchupSem, eventCh)
return muxRangeFeed(ctx, cfg, spans, ds, rr, rl, eventCh)
}

// Goroutine that processes subdivided ranges and creates a rangefeed for
@@ -256,7 +248,7 @@
}
// Prior to spawning goroutine to process this feed, acquire catchup scan quota.
// This quota acquisition paces the rate of new goroutine creation.
if err := active.acquireCatchupScanQuota(ctx, &ds.st.SV, &catchupSem, metrics); err != nil {
if err := active.acquireCatchupScanQuota(ctx, rl, metrics); err != nil {
return err
}
if log.V(1) {
@@ -700,23 +692,17 @@ func (a catchupAlloc) Release() {
}

func (a *activeRangeFeed) acquireCatchupScanQuota(
ctx context.Context,
sv *settings.Values,
catchupSem *limit.ConcurrentRequestLimiter,
metrics *DistSenderRangeFeedMetrics,
ctx context.Context, rl *catchupScanRateLimiter, metrics *DistSenderRangeFeedMetrics,
) error {
// Indicate catchup scan is starting; Before potentially blocking on a semaphore, take
// opportunity to update semaphore limit.
catchupSem.SetLimit(maxConcurrentCatchupScans(sv))
res, err := catchupSem.Begin(ctx)
if err != nil {
// Indicate catchup scan is starting.
if err := rl.Pace(ctx); err != nil {
return err
}
metrics.RangefeedCatchupRanges.Inc(1)
a.catchupRes = func() {
metrics.RangefeedCatchupRanges.Dec(1)
res.Release()
}

a.Lock()
defer a.Unlock()
a.InCatchup = true
@@ -989,3 +975,34 @@ func TestingWithMuxRangeFeedRequestSenderCapture(

// TestingMakeRangeFeedMetrics exposes makeDistSenderRangeFeedMetrics for test use.
var TestingMakeRangeFeedMetrics = makeDistSenderRangeFeedMetrics

type catchupScanRateLimiter struct {
pacer *quotapool.RateLimiter
sv *settings.Values
limit quotapool.Limit
}

func newCatchupScanRateLimiter(sv *settings.Values) *catchupScanRateLimiter {
rl := &catchupScanRateLimiter{sv: sv}
rl.limit = getCatchupRateLimit(rl.sv)
rl.pacer = quotapool.NewRateLimiter("distSenderCatchupLimit", rl.limit, 0 /* smooth rate */)
return rl
}

func getCatchupRateLimit(sv *settings.Values) quotapool.Limit {
if r := catchupStartupRate.Get(sv); r > 0 {
return quotapool.Limit(r)
}
return quotapool.Inf()
}

// Pace paces the catchup scan startup.
func (rl *catchupScanRateLimiter) Pace(ctx context.Context) error {
// Take opportunity to update limits if they have changed.
if lim := getCatchupRateLimit(rl.sv); lim != rl.limit {
rl.limit = lim
rl.pacer.UpdateLimit(lim, 0)
}

return rl.pacer.WaitN(ctx, 1)
}
13 changes: 8 additions & 5 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
@@ -192,6 +192,8 @@ func makeTransportFactory(

// rangeFeed is a helper to execute rangefeed. We are not using rangefeed library
// here because of circular dependencies.
// Returns cleanup function, which is safe to call multiple times, that shuts down
// and waits for the rangefeed to terminate.
func rangeFeed(
dsI interface{},
sp roachpb.Span,
@@ -307,8 +309,8 @@ func TestBiDirectionalRangefeedNotUsedUntilUpgradeFinalilzed(t *testing.T) {

allSeen, onValue := observeNValues(1000)
closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, onValue, false)
defer closeFeed()
channelWaitWithTimeout(t, allSeen)
closeFeed()
}

t.Run("rangefeed-stream-disabled-prior-to-version-upgrade", func(t *testing.T) {
@@ -378,8 +380,9 @@ func TestMuxRangeFeedConnectsToNodeOnce(t *testing.T) {

allSeen, onValue := observeNValues(1000)
closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, onValue, true)
defer closeFeed()
channelWaitWithTimeout(t, allSeen)
closeFeed()
closeFeed() // Explicitly shutdown the feed to make sure counters no longer change.

// Verify we connected to each node once.
connCounts.Lock()
@@ -458,8 +461,9 @@ func TestRestartsStuckRangeFeeds(t *testing.T) {

allSeen, onValue := observeNValues(100)
closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, onValue, false)
defer closeFeed()
channelWaitWithTimeout(t, allSeen)
closeFeed()
closeFeed() // Explicitly shutdown feed to make sure metrics no longer change.

require.True(t, blockingClient.ctxCanceled)
require.EqualValues(t, 1, tc.Server(0).DistSenderI().(*kvcoord.DistSender).Metrics().Errors.Stuck.Count())
@@ -609,7 +613,6 @@ func TestMuxRangeCatchupScanQuotaReleased(t *testing.T) {
// Initial setup: only single catchup scan allowed.
sqlDB.ExecMultiple(t,
`SET CLUSTER SETTING kv.rangefeed.enabled = true`,
`SET CLUSTER SETTING kv.rangefeed.catchup_scan_concurrency = 1`,
`ALTER DATABASE defaultdb CONFIGURE ZONE USING num_replicas = 1`,
`CREATE TABLE foo (key INT PRIMARY KEY)`,
`INSERT INTO foo (key) SELECT * FROM generate_series(1, 1000)`,
@@ -640,8 +643,8 @@ }
}
return false, nil
}))
defer closeFeed()
channelWaitWithTimeout(t, enoughErrors)
closeFeed()
}

// Test to make sure the various metrics used by rangefeed are correctly
1 change: 1 addition & 0 deletions pkg/settings/registry.go
@@ -188,6 +188,7 @@ var retiredSettings = map[InternalKey]struct{}{
"jobs.trace.force_dump_mode": {},
"timeseries.storage.30m_resolution_ttl": {},
"server.cpu_profile.enabled": {},
"kv.rangefeed.catchup_scan_concurrency": {},
}

// sqlDefaultSettings is the list of "grandfathered" existing sql.defaults
