diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index f282ca60d0ea..7d326b7d1b1d 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -48,6 +48,7 @@ kv.closed_timestamp.lead_for_global_reads_override duration 0s if nonzero, overr
kv.closed_timestamp.side_transport_interval duration 200ms the interval at which the closed timestamp side-transport attempts to advance each range's closed timestamp; set to 0 to disable the side-transport tenant-ro
kv.closed_timestamp.target_duration duration 3s if nonzero, attempt to provide closed timestamp notifications for timestamps trailing cluster time by approximately this duration tenant-ro
kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconciling jobs with protected timestamp records tenant-ro
+kv.rangefeed.client.stream_startup_rate integer 100 controls the rate per second at which the client initiates new rangefeed streams (one stream per range); 0 implies unlimited tenant-rw
kv.rangefeed.closed_timestamp_refresh_interval duration 3s the interval at which closed-timestamp updates are delivered to rangefeeds; set to 0 to use kv.closed_timestamp.side_transport_interval tenant-ro
kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled tenant-rw
kv.rangefeed.range_stuck_threshold duration 1m0s restart rangefeeds if they don't emit anything for the specified threshold; 0 disables (kv.rangefeed.closed_timestamp_refresh_interval takes precedence) tenant-rw
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 7f852826c3b9..680a085e1430 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -69,6 +69,7 @@
kv.range_split.by_load.enabled
| boolean | true | allow automatic splits of ranges based on where load is concentrated | Dedicated/Self-Hosted |
kv.range_split.load_cpu_threshold
| duration | 500ms | the CPU use per second over which, the range becomes a candidate for load based splitting | Dedicated/Self-Hosted |
kv.range_split.load_qps_threshold
| integer | 2500 | the QPS over which, the range becomes a candidate for load based splitting | Dedicated/Self-Hosted |
+kv.rangefeed.client.stream_startup_rate
| integer | 100 | controls the rate per second at which the client initiates new rangefeed streams (one stream per range); 0 implies unlimited | Serverless/Dedicated/Self-Hosted |
kv.rangefeed.closed_timestamp_refresh_interval
| duration | 3s | the interval at which closed-timestamp updates are delivered to rangefeeds; set to 0 to use kv.closed_timestamp.side_transport_interval | Serverless/Dedicated/Self-Hosted (read-only) |
kv.rangefeed.enabled
| boolean | false | if set, rangefeed registration is enabled | Serverless/Dedicated/Self-Hosted |
kv.rangefeed.range_stuck_threshold
| duration | 1m0s | restart rangefeeds if they don't emit anything for the specified threshold; 0 disables (kv.rangefeed.closed_timestamp_refresh_interval takes precedence) | Serverless/Dedicated/Self-Hosted |
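Since the generated docs above only list the new public setting, here is a hedged usage sketch for reviewers; it is illustrative only and assumes a locally running insecure single-node cluster on the default port and a Postgres-wire Go driver, neither of which is part of this change.

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // assumption: any Postgres-wire driver works against CockroachDB
)

func main() {
	// Assumption: a local insecure single-node cluster on the default port.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// At the default of 100 streams/sec, rangefeeds over ~20k ranges take roughly
	// 200 seconds to start; doubling the rate roughly halves that ramp-up time.
	if _, err := db.Exec(
		`SET CLUSTER SETTING kv.rangefeed.client.stream_startup_rate = 200`,
	); err != nil {
		log.Fatal(err)
	}

	var value string
	if err := db.QueryRow(
		`SHOW CLUSTER SETTING kv.rangefeed.client.stream_startup_rate`,
	).Scan(&value); err != nil {
		log.Fatal(err)
	}
	fmt.Println("kv.rangefeed.client.stream_startup_rate =", value)
}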
diff --git a/pkg/cmd/roachtest/tests/cdc_bench.go b/pkg/cmd/roachtest/tests/cdc_bench.go
index 8b2f98ed795e..0a09ecd27cc5 100644
--- a/pkg/cmd/roachtest/tests/cdc_bench.go
+++ b/pkg/cmd/roachtest/tests/cdc_bench.go
@@ -177,7 +177,6 @@ func makeCDCBenchOptions() (option.StartOpts, install.ClusterSettings) {
// Bump up the number of allowed catchup scans. Doing catchup for 100k ranges with default
// configuration (8 client side, 16 per store) takes a while (~1500-2000 ranges per minute).
- settings.ClusterSettings["kv.rangefeed.catchup_scan_concurrency"] = "16"
settings.ClusterSettings["kv.rangefeed.concurrent_catchup_iterators"] = "16"
// Give changefeed more memory and slow down rangefeed checkpoints.
diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel
index d90de5491fb7..8ea9ac8ec332 100644
--- a/pkg/kv/kvclient/kvcoord/BUILD.bazel
+++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel
@@ -73,7 +73,6 @@ go_library(
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/iterutil",
- "//pkg/util/limit",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/pprofutil",
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
index a12078901027..013e0de08cd0 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
@@ -27,7 +27,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/future"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
- "github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/pprofutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -49,7 +48,7 @@ type rangefeedMuxer struct {
metrics *DistSenderRangeFeedMetrics
cfg rangeFeedConfig
registry *rangeFeedRegistry
- catchupSem *limit.ConcurrentRequestLimiter
+ catchupSem *catchupScanRateLimiter
eventCh chan<- RangeFeedMessage
// Each call to start new range feed gets a unique ID which is echoed back
@@ -71,7 +70,7 @@ func muxRangeFeed(
spans []SpanTimePair,
ds *DistSender,
rr *rangeFeedRegistry,
- catchupSem *limit.ConcurrentRequestLimiter,
+ catchupRateLimiter *catchupScanRateLimiter,
eventCh chan<- RangeFeedMessage,
) (retErr error) {
if log.V(1) {
@@ -88,7 +87,7 @@ func muxRangeFeed(
ds: ds,
cfg: cfg,
metrics: &ds.metrics.DistSenderRangeFeedMetrics,
- catchupSem: catchupSem,
+ catchupSem: catchupRateLimiter,
eventCh: eventCh,
}
if cfg.knobs.metrics != nil {
@@ -244,7 +243,7 @@ func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error
streamID := atomic.AddInt64(&m.seqID, 1)
// Before starting single rangefeed, acquire catchup scan quota.
- if err := s.acquireCatchupScanQuota(ctx, &m.ds.st.SV, m.catchupSem, m.metrics); err != nil {
+ if err := s.acquireCatchupScanQuota(ctx, m.catchupSem, m.metrics); err != nil {
return err
}
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
index 455586fe94d3..fed832f45d60 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -38,9 +38,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
- "github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/pprofutil"
+ "github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -64,12 +64,13 @@ var useDedicatedRangefeedConnectionClass = settings.RegisterBoolSetting(
"kv.rangefeed.use_dedicated_connection_class.enabled", false),
)
-var catchupScanConcurrency = settings.RegisterIntSetting(
+var catchupStartupRate = settings.RegisterIntSetting(
settings.TenantWritable,
- "kv.rangefeed.catchup_scan_concurrency",
- "number of catchup scans that a single rangefeed can execute concurrently; 0 implies unlimited",
- 8,
+ "kv.rangefeed.client.stream_startup_rate",
+ "controls the rate per second at which the client initiates new rangefeed streams (one stream per range); 0 implies unlimited",
+ 100, // e.g. ~200 seconds to start rangefeeds over 20k ranges at the default rate.
settings.NonNegativeInt,
+ settings.WithPublic,
)
var rangefeedRangeStuckThreshold = settings.RegisterDurationSetting(
@@ -80,14 +81,6 @@ var rangefeedRangeStuckThreshold = settings.RegisterDurationSetting(
settings.NonNegativeDuration,
settings.WithPublic)
-func maxConcurrentCatchupScans(sv *settings.Values) int {
- l := catchupScanConcurrency.Get(sv)
- if l == 0 {
- return math.MaxInt
- }
- return int(l)
-}
-
// ForEachRangeFn is used to execute `fn` over each range in a rangefeed.
type ForEachRangeFn func(fn ActiveRangeFeedIterFn) error
@@ -226,12 +219,11 @@ func (ds *DistSender) RangeFeedSpans(
cfg.rangeObserver(rr.ForEachPartialRangefeed)
}
- catchupSem := limit.MakeConcurrentRequestLimiter(
- "distSenderCatchupLimit", maxConcurrentCatchupScans(&ds.st.SV))
+ rl := newCatchupScanRateLimiter(&ds.st.SV)
if ds.st.Version.IsActive(ctx, clusterversion.TODODelete_V22_2RangefeedUseOneStreamPerNode) &&
enableMuxRangeFeed && cfg.useMuxRangeFeed {
- return muxRangeFeed(ctx, cfg, spans, ds, rr, &catchupSem, eventCh)
+ return muxRangeFeed(ctx, cfg, spans, ds, rr, rl, eventCh)
}
// Goroutine that processes subdivided ranges and creates a rangefeed for
@@ -256,7 +248,7 @@ func (ds *DistSender) RangeFeedSpans(
}
// Prior to spawning goroutine to process this feed, acquire catchup scan quota.
// This quota acquisition paces the rate of new goroutine creation.
- if err := active.acquireCatchupScanQuota(ctx, &ds.st.SV, &catchupSem, metrics); err != nil {
+ if err := active.acquireCatchupScanQuota(ctx, rl, metrics); err != nil {
return err
}
if log.V(1) {
@@ -700,23 +692,17 @@ func (a catchupAlloc) Release() {
}
func (a *activeRangeFeed) acquireCatchupScanQuota(
- ctx context.Context,
- sv *settings.Values,
- catchupSem *limit.ConcurrentRequestLimiter,
- metrics *DistSenderRangeFeedMetrics,
+ ctx context.Context, rl *catchupScanRateLimiter, metrics *DistSenderRangeFeedMetrics,
) error {
- // Indicate catchup scan is starting; Before potentially blocking on a semaphore, take
- // opportunity to update semaphore limit.
- catchupSem.SetLimit(maxConcurrentCatchupScans(sv))
- res, err := catchupSem.Begin(ctx)
- if err != nil {
+ // Pace the start of this catchup scan; this may block if streams are being started faster than the configured rate.
+ if err := rl.Pace(ctx); err != nil {
return err
}
metrics.RangefeedCatchupRanges.Inc(1)
a.catchupRes = func() {
metrics.RangefeedCatchupRanges.Dec(1)
- res.Release()
}
+
a.Lock()
defer a.Unlock()
a.InCatchup = true
@@ -989,3 +975,34 @@ func TestingWithMuxRangeFeedRequestSenderCapture(
// TestingMakeRangeFeedMetrics exposes makeDistSenderRangeFeedMetrics for test use.
var TestingMakeRangeFeedMetrics = makeDistSenderRangeFeedMetrics
+
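+// catchupScanRateLimiter paces the rate at which this client starts new
+// rangefeed streams (and hence catchup scans), per
+// kv.rangefeed.client.stream_startup_rate.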
+type catchupScanRateLimiter struct {
+ pacer *quotapool.RateLimiter
+ sv *settings.Values
+ limit quotapool.Limit
+}
+
+func newCatchupScanRateLimiter(sv *settings.Values) *catchupScanRateLimiter {
+ rl := &catchupScanRateLimiter{sv: sv}
+ rl.limit = getCatchupRateLimit(rl.sv)
+ rl.pacer = quotapool.NewRateLimiter("distSenderCatchupLimit", rl.limit, 0 /* smooth rate limit without burst */)
+ return rl
+}
+
+func getCatchupRateLimit(sv *settings.Values) quotapool.Limit {
+ if r := catchupStartupRate.Get(sv); r > 0 {
+ return quotapool.Limit(r)
+ }
+ return quotapool.Inf()
+}
+
+// Pace blocks until the rate limiter admits starting one more rangefeed stream
+// (and its catchup scan), or until the context is canceled.
+func (rl *catchupScanRateLimiter) Pace(ctx context.Context) error {
+ // Take the opportunity to update the limit if the setting has changed.
+ if lim := getCatchupRateLimit(rl.sv); lim != rl.limit {
+ rl.limit = lim
+ rl.pacer.UpdateLimit(lim, 0 /* smooth rate limit without burst */)
+ }
+
+ return rl.pacer.WaitN(ctx, 1)
+}
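For reviewers unfamiliar with quotapool.RateLimiter, here is a minimal, self-contained sketch (not part of this change; the demo name and the rate of 5/sec are made up) of the pacing behavior the new catchupScanRateLimiter builds on: WaitN blocks so that stream startups are admitted at the configured rate, and UpdateLimit lets a later setting change take effect without recreating the limiter.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/quotapool"
)

func main() {
	ctx := context.Background()

	// 5 stream startups per second, no burst: analogous to
	// newCatchupScanRateLimiter with kv.rangefeed.client.stream_startup_rate = 5.
	pacer := quotapool.NewRateLimiter("demo", quotapool.Limit(5), 0)

	start := time.Now()
	for i := 0; i < 10; i++ {
		// Each simulated stream waits for the pacer before starting, mirroring
		// activeRangeFeed.acquireCatchupScanQuota -> catchupScanRateLimiter.Pace.
		if err := pacer.WaitN(ctx, 1); err != nil {
			fmt.Println("pacing interrupted:", err)
			return
		}
		fmt.Printf("stream %d admitted after %s\n", i, time.Since(start).Round(time.Millisecond))
	}

	// A later setting change is applied by adjusting the limit in place;
	// quotapool.Inf() corresponds to the "0 implies unlimited" case.
	pacer.UpdateLimit(quotapool.Inf(), 0)
}

The in-place UpdateLimit is what lets operators change kv.rangefeed.client.stream_startup_rate on a live cluster and have running rangefeeds pick it up on their next Pace call.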
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
index 28be7891eafb..fb1f44311e83 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
@@ -192,6 +192,8 @@ func makeTransportFactory(
// rangeFeed is a helper to execute rangefeed. We are not using rangefeed library
// here because of circular dependencies.
+// It returns a cleanup function that shuts down the rangefeed and waits for it
+// to terminate; the cleanup function is safe to call multiple times.
func rangeFeed(
dsI interface{},
sp roachpb.Span,
@@ -307,8 +309,8 @@ func TestBiDirectionalRangefeedNotUsedUntilUpgradeFinalilzed(t *testing.T) {
allSeen, onValue := observeNValues(1000)
closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, onValue, false)
+ defer closeFeed()
channelWaitWithTimeout(t, allSeen)
- closeFeed()
}
t.Run("rangefeed-stream-disabled-prior-to-version-upgrade", func(t *testing.T) {
@@ -378,8 +380,9 @@ func TestMuxRangeFeedConnectsToNodeOnce(t *testing.T) {
allSeen, onValue := observeNValues(1000)
closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, onValue, true)
+ defer closeFeed()
channelWaitWithTimeout(t, allSeen)
- closeFeed()
+ closeFeed() // Explicitly shut down the feed to make sure the counters no longer change.
// Verify we connected to each node once.
connCounts.Lock()
@@ -458,8 +461,9 @@ func TestRestartsStuckRangeFeeds(t *testing.T) {
allSeen, onValue := observeNValues(100)
closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, onValue, false)
+ defer closeFeed()
channelWaitWithTimeout(t, allSeen)
- closeFeed()
+ closeFeed() // Explicitly shut down the feed to make sure the metrics no longer change.
require.True(t, blockingClient.ctxCanceled)
require.EqualValues(t, 1, tc.Server(0).DistSenderI().(*kvcoord.DistSender).Metrics().Errors.Stuck.Count())
@@ -609,7 +613,6 @@ func TestMuxRangeCatchupScanQuotaReleased(t *testing.T) {
// Initial setup: only single catchup scan allowed.
sqlDB.ExecMultiple(t,
`SET CLUSTER SETTING kv.rangefeed.enabled = true`,
- `SET CLUSTER SETTING kv.rangefeed.catchup_scan_concurrency = 1`,
`ALTER DATABASE defaultdb CONFIGURE ZONE USING num_replicas = 1`,
`CREATE TABLE foo (key INT PRIMARY KEY)`,
`INSERT INTO foo (key) SELECT * FROM generate_series(1, 1000)`,
@@ -640,8 +643,8 @@ func TestMuxRangeCatchupScanQuotaReleased(t *testing.T) {
}
return false, nil
}))
+ defer closeFeed()
channelWaitWithTimeout(t, enoughErrors)
- closeFeed()
}
// Test to make sure the various metrics used by rangefeed are correctly
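The test changes above rely on the property documented on rangeFeed that its cleanup function may be both deferred and called explicitly (before inspecting counters or metrics). A minimal sketch of that idempotent-shutdown pattern, with illustrative names only, might look like:

package main

import (
	"fmt"
	"sync"
)

// newFeed starts a toy "feed" goroutine and returns a close function that is
// safe to call multiple times: the first call stops the feed and waits for it
// to terminate; later calls only wait, which is a no-op once it has stopped.
func newFeed() (events <-chan int, closeFeed func()) {
	ch := make(chan int)
	done := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(ch)
		for i := 0; ; i++ {
			select {
			case ch <- i:
			case <-done:
				return
			}
		}
	}()
	var once sync.Once
	return ch, func() {
		once.Do(func() { close(done) })
		wg.Wait()
	}
}

func main() {
	events, closeFeed := newFeed()
	defer closeFeed() // always clean up, as in the updated tests

	for e := range events {
		if e == 3 {
			break
		}
		fmt.Println("event", e)
	}
	closeFeed() // explicit shutdown before inspecting state (e.g. metrics)
	fmt.Println("feed terminated")
}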
diff --git a/pkg/settings/registry.go b/pkg/settings/registry.go
index 08dc56505ab3..1c7c16209246 100644
--- a/pkg/settings/registry.go
+++ b/pkg/settings/registry.go
@@ -188,6 +188,7 @@ var retiredSettings = map[InternalKey]struct{}{
"jobs.trace.force_dump_mode": {},
"timeseries.storage.30m_resolution_ttl": {},
"server.cpu_profile.enabled": {},
+ "kv.rangefeed.catchup_scan_concurrency": {},
}
// sqlDefaultSettings is the list of "grandfathered" existing sql.defaults