From db12b5425776bd2cd5a1a2f6e56431d45689ecae Mon Sep 17 00:00:00 2001
From: Yevgeniy Miretskiy
Date: Tue, 19 Sep 2023 13:22:30 -0400
Subject: [PATCH] kvcoord: Replace rangefeed catchup semaphore with rate
 limiter

The catchup scan limit was added in #77725 to try to keep a single
rangefeed from consuming all catchup scan slots on the KV side. This
client-side limit has been largely ineffective.

More recently, this semaphore was co-opted in #109346 to pace the rate
of goroutine creation on the client. That functionality is important,
but a semaphore is a poor fit for it: what we actually want to control
is the rate at which the client starts new catchup scans (and thus the
rate of goroutine creation).

This change replaces the semaphore with a rate-limiter based approach.
The rate is configured via the `kv.rangefeed.client.stream_startup_rate`
setting, which caps the number of newly established rangefeed streams
per second (default 100).

Closes #110439

Epic: CRDB-26372

Release note: None
---
 .../settings/settings-for-tenants.txt         |  1 +
 docs/generated/settings/settings.html         |  1 +
 pkg/cmd/roachtest/tests/cdc_bench.go          |  1 -
 pkg/kv/kvclient/kvcoord/BUILD.bazel           |  1 -
 .../kvcoord/dist_sender_mux_rangefeed.go      |  9 ++-
 .../kvclient/kvcoord/dist_sender_rangefeed.go | 71 ++++++++++++-------
 .../kvcoord/dist_sender_rangefeed_test.go     | 13 ++--
 pkg/settings/registry.go                      |  1 +
 8 files changed, 59 insertions(+), 39 deletions(-)

diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index f282ca60d0ea..7d326b7d1b1d 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -48,6 +48,7 @@ kv.closed_timestamp.lead_for_global_reads_override duration 0s if nonzero, overr
 kv.closed_timestamp.side_transport_interval duration 200ms the interval at which the closed timestamp side-transport attempts to advance each range's closed timestamp; set to 0 to disable the side-transport tenant-ro
 kv.closed_timestamp.target_duration duration 3s if nonzero, attempt to provide closed timestamp notifications for timestamps trailing cluster time by approximately this duration tenant-ro
 kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconciling jobs with protected timestamp records tenant-ro
+kv.rangefeed.client.stream_startup_rate integer 100 controls the rate per second the client will initiate new rangefeed stream for a single range; 0 implies unlimited tenant-rw
 kv.rangefeed.closed_timestamp_refresh_interval duration 3s the interval at which closed-timestamp updatesare delivered to rangefeeds; set to 0 to use kv.closed_timestamp.side_transport_interval tenant-ro
 kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled tenant-rw
 kv.rangefeed.range_stuck_threshold duration 1m0s restart rangefeeds if they don't emit anything for the specified threshold; 0 disables (kv.rangefeed.closed_timestamp_refresh_interval takes precedence) tenant-rw
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 7f852826c3b9..680a085e1430 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -69,6 +69,7 @@
 <tr><td><div id="setting-kv-range-split-by-load-enabled" class="anchored"><code>kv.range_split.by_load.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>allow automatic splits of ranges based on where load is concentrated</td><td>Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-kv-range-split-load-cpu-threshold" class="anchored"><code>kv.range_split.load_cpu_threshold</code></div></td><td>duration</td><td><code>500ms</code></td><td>the CPU use per second over which, the range becomes a candidate for load based splitting</td><td>Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-kv-range-split-load-qps-threshold" class="anchored"><code>kv.range_split.load_qps_threshold</code></div></td><td>integer</td><td><code>2500</code></td><td>the QPS over which, the range becomes a candidate for load based splitting</td><td>Dedicated/Self-Hosted</td></tr>
+<tr><td><div id="setting-kv-rangefeed-client-stream-startup-rate" class="anchored"><code>kv.rangefeed.client.stream_startup_rate</code></div></td><td>integer</td><td><code>100</code></td><td>controls the rate per second the client will initiate new rangefeed stream for a single range; 0 implies unlimited</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-kv-rangefeed-closed-timestamp-refresh-interval" class="anchored"><code>kv.rangefeed.closed_timestamp_refresh_interval</code></div></td><td>duration</td><td><code>3s</code></td><td>the interval at which closed-timestamp updatesare delivered to rangefeeds; set to 0 to use kv.closed_timestamp.side_transport_interval</td><td>Serverless/Dedicated/Self-Hosted (read-only)</td></tr>
 <tr><td><div id="setting-kv-rangefeed-enabled" class="anchored"><code>kv.rangefeed.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if set, rangefeed registration is enabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-kv-rangefeed-range-stuck-threshold" class="anchored"><code>kv.rangefeed.range_stuck_threshold</code></div></td><td>duration</td><td><code>1m0s</code></td><td>restart rangefeeds if they don't emit anything for the specified threshold; 0 disables (kv.rangefeed.closed_timestamp_refresh_interval takes precedence)</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
diff --git a/pkg/cmd/roachtest/tests/cdc_bench.go b/pkg/cmd/roachtest/tests/cdc_bench.go
index 8b2f98ed795e..0a09ecd27cc5 100644
--- a/pkg/cmd/roachtest/tests/cdc_bench.go
+++ b/pkg/cmd/roachtest/tests/cdc_bench.go
@@ -177,7 +177,6 @@ func makeCDCBenchOptions() (option.StartOpts, install.ClusterSettings) {
 
 	// Bump up the number of allowed catchup scans. Doing catchup for 100k ranges with default
 	// configuration (8 client side, 16 per store) takes a while (~1500-2000 ranges per min minutes).
-	settings.ClusterSettings["kv.rangefeed.catchup_scan_concurrency"] = "16"
 	settings.ClusterSettings["kv.rangefeed.concurrent_catchup_iterators"] = "16"
 
 	// Give changefeed more memory and slow down rangefeed checkpoints.
diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel
index d90de5491fb7..8ea9ac8ec332 100644
--- a/pkg/kv/kvclient/kvcoord/BUILD.bazel
+++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel
@@ -73,7 +73,6 @@ go_library(
         "//pkg/util/grpcutil",
         "//pkg/util/hlc",
         "//pkg/util/iterutil",
-        "//pkg/util/limit",
         "//pkg/util/log",
         "//pkg/util/metric",
         "//pkg/util/pprofutil",
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
index a12078901027..013e0de08cd0 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
@@ -27,7 +27,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/future"
 	"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
-	"github.com/cockroachdb/cockroach/pkg/util/limit"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/pprofutil"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -49,7 +48,7 @@ type rangefeedMuxer struct {
 	metrics    *DistSenderRangeFeedMetrics
 	cfg        rangeFeedConfig
 	registry   *rangeFeedRegistry
-	catchupSem *limit.ConcurrentRequestLimiter
+	catchupSem *catchupScanRateLimiter
 	eventCh    chan<- RangeFeedMessage
 
 	// Each call to start new range feed gets a unique ID which is echoed back
@@ -71,7 +70,7 @@ func muxRangeFeed(
 	spans []SpanTimePair,
 	ds *DistSender,
 	rr *rangeFeedRegistry,
-	catchupSem *limit.ConcurrentRequestLimiter,
+	catchupRateLimiter *catchupScanRateLimiter,
 	eventCh chan<- RangeFeedMessage,
 ) (retErr error) {
 	if log.V(1) {
@@ -88,7 +87,7 @@ func muxRangeFeed(
 		ds:         ds,
 		cfg:        cfg,
 		metrics:    &ds.metrics.DistSenderRangeFeedMetrics,
-		catchupSem: catchupSem,
+		catchupSem: catchupRateLimiter,
 		eventCh:    eventCh,
 	}
 	if cfg.knobs.metrics != nil {
@@ -244,7 +243,7 @@ func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error
 	streamID := atomic.AddInt64(&m.seqID, 1)
 
 	// Before starting single rangefeed, acquire catchup scan quota.
-	if err := s.acquireCatchupScanQuota(ctx, &m.ds.st.SV, m.catchupSem, m.metrics); err != nil {
+	if err := s.acquireCatchupScanQuota(ctx, m.catchupSem, m.metrics); err != nil {
 		return err
 	}
 
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
index 455586fe94d3..fed832f45d60 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -38,9 +38,9 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/iterutil"
-	"github.com/cockroachdb/cockroach/pkg/util/limit"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/pprofutil"
+	"github.com/cockroachdb/cockroach/pkg/util/quotapool"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -64,12 +64,13 @@ var useDedicatedRangefeedConnectionClass = settings.RegisterBoolSetting(
 		"kv.rangefeed.use_dedicated_connection_class.enabled", false),
 )
 
-var catchupScanConcurrency = settings.RegisterIntSetting(
+var catchupStartupRate = settings.RegisterIntSetting(
 	settings.TenantWritable,
-	"kv.rangefeed.catchup_scan_concurrency",
-	"number of catchup scans that a single rangefeed can execute concurrently; 0 implies unlimited",
-	8,
+	"kv.rangefeed.client.stream_startup_rate",
+	"controls the rate per second the client will initiate new rangefeed stream for a single range; 0 implies unlimited",
+	100, // e.g.: 200 seconds for 20k ranges.
 	settings.NonNegativeInt,
+	settings.WithPublic,
 )
 
 var rangefeedRangeStuckThreshold = settings.RegisterDurationSetting(
@@ -80,14 +81,6 @@ var rangefeedRangeStuckThreshold = settings.RegisterDurationSetting(
 	settings.NonNegativeDuration,
 	settings.WithPublic)
 
-func maxConcurrentCatchupScans(sv *settings.Values) int {
-	l := catchupScanConcurrency.Get(sv)
-	if l == 0 {
-		return math.MaxInt
-	}
-	return int(l)
-}
-
 // ForEachRangeFn is used to execute `fn` over each range in a rangefeed.
 type ForEachRangeFn func(fn ActiveRangeFeedIterFn) error
 
@@ -226,12 +219,11 @@ func (ds *DistSender) RangeFeedSpans(
 		cfg.rangeObserver(rr.ForEachPartialRangefeed)
 	}
 
-	catchupSem := limit.MakeConcurrentRequestLimiter(
-		"distSenderCatchupLimit", maxConcurrentCatchupScans(&ds.st.SV))
+	rl := newCatchupScanRateLimiter(&ds.st.SV)
 
 	if ds.st.Version.IsActive(ctx, clusterversion.TODODelete_V22_2RangefeedUseOneStreamPerNode) &&
 		enableMuxRangeFeed && cfg.useMuxRangeFeed {
-		return muxRangeFeed(ctx, cfg, spans, ds, rr, &catchupSem, eventCh)
+		return muxRangeFeed(ctx, cfg, spans, ds, rr, rl, eventCh)
 	}
 
 	// Goroutine that processes subdivided ranges and creates a rangefeed for
@@ -256,7 +248,7 @@ func (ds *DistSender) RangeFeedSpans(
 				}
 				// Prior to spawning goroutine to process this feed, acquire catchup scan quota.
 				// This quota acquisition paces the rate of new goroutine creation.
-				if err := active.acquireCatchupScanQuota(ctx, &ds.st.SV, &catchupSem, metrics); err != nil {
+				if err := active.acquireCatchupScanQuota(ctx, rl, metrics); err != nil {
 					return err
 				}
 				if log.V(1) {
@@ -700,23 +692,17 @@ func (a catchupAlloc) Release() {
 }
 
 func (a *activeRangeFeed) acquireCatchupScanQuota(
-	ctx context.Context,
-	sv *settings.Values,
-	catchupSem *limit.ConcurrentRequestLimiter,
-	metrics *DistSenderRangeFeedMetrics,
+	ctx context.Context, rl *catchupScanRateLimiter, metrics *DistSenderRangeFeedMetrics,
 ) error {
-	// Indicate catchup scan is starting; Before potentially blocking on a semaphore, take
-	// opportunity to update semaphore limit.
-	catchupSem.SetLimit(maxConcurrentCatchupScans(sv))
-	res, err := catchupSem.Begin(ctx)
-	if err != nil {
+	// Indicate catchup scan is starting.
+	if err := rl.Pace(ctx); err != nil {
 		return err
 	}
 	metrics.RangefeedCatchupRanges.Inc(1)
 	a.catchupRes = func() {
 		metrics.RangefeedCatchupRanges.Dec(1)
-		res.Release()
 	}
+
 	a.Lock()
 	defer a.Unlock()
 	a.InCatchup = true
@@ -989,3 +975,34 @@ func TestingWithMuxRangeFeedRequestSenderCapture(
 
 // TestingMakeRangeFeedMetrics exposes makeDistSenderRangeFeedMetrics for test use.
 var TestingMakeRangeFeedMetrics = makeDistSenderRangeFeedMetrics
+
+type catchupScanRateLimiter struct {
+	pacer *quotapool.RateLimiter
+	sv    *settings.Values
+	limit quotapool.Limit
+}
+
+func newCatchupScanRateLimiter(sv *settings.Values) *catchupScanRateLimiter {
+	rl := &catchupScanRateLimiter{sv: sv}
+	rl.limit = getCatchupRateLimit(rl.sv)
+	rl.pacer = quotapool.NewRateLimiter("distSenderCatchupLimit", rl.limit, 0 /* smooth rate */)
+	return rl
+}
+
+func getCatchupRateLimit(sv *settings.Values) quotapool.Limit {
+	if r := catchupStartupRate.Get(sv); r > 0 {
+		return quotapool.Limit(r)
+	}
+	return quotapool.Inf()
+}
+
+// Pace paces the catchup scan startup.
+func (rl *catchupScanRateLimiter) Pace(ctx context.Context) error {
+	// Take opportunity to update limits if they have changed.
+	if lim := getCatchupRateLimit(rl.sv); lim != rl.limit {
+		rl.limit = lim
+		rl.pacer.UpdateLimit(lim, 0)
+	}
+
+	return rl.pacer.WaitN(ctx, 1)
+}
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
index 28be7891eafb..fb1f44311e83 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
@@ -192,6 +192,8 @@ func makeTransportFactory(
 
 // rangeFeed is a helper to execute rangefeed. We are not using rangefeed library
 // here because of circular dependencies.
+// Returns cleanup function, which is safe to call multiple times, that shuts down
+// and waits for the rangefeed to terminate.
 func rangeFeed(
 	dsI interface{},
 	sp roachpb.Span,
@@ -307,8 +309,8 @@ func TestBiDirectionalRangefeedNotUsedUntilUpgradeFinalilzed(t *testing.T) {
 
 		allSeen, onValue := observeNValues(1000)
 		closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, onValue, false)
+		defer closeFeed()
 		channelWaitWithTimeout(t, allSeen)
-		closeFeed()
 	}
 
 	t.Run("rangefeed-stream-disabled-prior-to-version-upgrade", func(t *testing.T) {
@@ -378,8 +380,9 @@ func TestMuxRangeFeedConnectsToNodeOnce(t *testing.T) {
 
 	allSeen, onValue := observeNValues(1000)
 	closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, onValue, true)
+	defer closeFeed()
 	channelWaitWithTimeout(t, allSeen)
-	closeFeed()
+	closeFeed() // Explicitly shutdown the feed to make sure counters no longer change.
 
 	// Verify we connected to each node once.
 	connCounts.Lock()
@@ -458,8 +461,9 @@ func TestRestartsStuckRangeFeeds(t *testing.T) {
 
 	allSeen, onValue := observeNValues(100)
 	closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, onValue, false)
+	defer closeFeed()
 	channelWaitWithTimeout(t, allSeen)
-	closeFeed()
+	closeFeed() // Explicitly shutdown feed to make sure metrics no longer change.
 
 	require.True(t, blockingClient.ctxCanceled)
 	require.EqualValues(t, 1, tc.Server(0).DistSenderI().(*kvcoord.DistSender).Metrics().Errors.Stuck.Count())
@@ -609,7 +613,6 @@ func TestMuxRangeCatchupScanQuotaReleased(t *testing.T) {
 	// Initial setup: only single catchup scan allowed.
 	sqlDB.ExecMultiple(t,
 		`SET CLUSTER SETTING kv.rangefeed.enabled = true`,
-		`SET CLUSTER SETTING kv.rangefeed.catchup_scan_concurrency = 1`,
 		`ALTER DATABASE defaultdb CONFIGURE ZONE USING num_replicas = 1`,
 		`CREATE TABLE foo (key INT PRIMARY KEY)`,
 		`INSERT INTO foo (key) SELECT * FROM generate_series(1, 1000)`,
@@ -640,8 +643,8 @@ func TestMuxRangeCatchupScanQuotaReleased(t *testing.T) {
 		}
 		return false, nil
 	}))
+	defer closeFeed()
 	channelWaitWithTimeout(t, enoughErrors)
-	closeFeed()
 }
 
 // Test to make sure the various metrics used by rangefeed are correctly
diff --git a/pkg/settings/registry.go b/pkg/settings/registry.go
index 08dc56505ab3..1c7c16209246 100644
--- a/pkg/settings/registry.go
+++ b/pkg/settings/registry.go
@@ -188,6 +188,7 @@ var retiredSettings = map[InternalKey]struct{}{
 	"jobs.trace.force_dump_mode":             {},
 	"timeseries.storage.30m_resolution_ttl":  {},
 	"server.cpu_profile.enabled":             {},
+	"kv.rangefeed.catchup_scan_concurrency":  {},
 }
 
 // sqlDefaultSettings is the list of "grandfathered" existing sql.defaults
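
As a rough illustration of the pacing approach described in the commit message (not part of
the patch itself), the sketch below shows the same idea using the standard
golang.org/x/time/rate package rather than CockroachDB's internal quotapool; the names
streamStartupRate and newStartupPacer are hypothetical and exist only for this example.
Each stream startup waits on a shared rate limiter, so the client starts at most N streams
(and therefore at most N catchup-scan goroutines) per second; a zero rate maps to rate.Inf,
mirroring the setting's "0 implies unlimited" semantics.

// pacer_sketch.go: a minimal, self-contained illustration of rate-based
// stream startup pacing. Assumes golang.org/x/time/rate is available.
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

// streamStartupRate stands in for the kv.rangefeed.client.stream_startup_rate
// cluster setting (hypothetical variable, illustration only).
var streamStartupRate = 100

// newStartupPacer returns a limiter that admits streamStartupRate stream
// startups per second; 0 means unlimited, mirroring the setting's semantics.
func newStartupPacer() *rate.Limiter {
	if streamStartupRate == 0 {
		return rate.NewLimiter(rate.Inf, 1)
	}
	return rate.NewLimiter(rate.Limit(streamStartupRate), 1)
}

func main() {
	ctx := context.Background()
	pacer := newStartupPacer()
	start := time.Now()

	// "Start" 25 rangefeed streams. Each startup blocks on the shared pacer,
	// which bounds the rate of new goroutine creation rather than the number
	// of goroutines running concurrently.
	for i := 0; i < 25; i++ {
		if err := pacer.Wait(ctx); err != nil {
			panic(err) // context cancellation would surface here
		}
		go func(id int) {
			// A real client would run the per-range catchup scan here.
			_ = id
		}(i)
	}
	fmt.Printf("started 25 streams in %s\n", time.Since(start))
}

The design point the sketch tries to capture: a rate limit directly bounds how quickly new
streams (and their goroutines) are started, whereas the old semaphore only bounded how many
catchup scans could be in flight at once.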