changefeedccl: add changefeed.lagging_ranges metric
This change adds the `changefeed.lagging_ranges` metric, which can be used to track
ranges which are behind. The metric is calculated based on a new cluster setting
`kv.rangefeed.lagging_ranges_threshold`, which is the amount of time that a range
checkpoint needs to be in the past for the range to be considered lagging. It defaults to 3 minutes.
This change also adds the setting `kv.rangefeed.lagging_ranges_frequency`, which controls
how often a rangefeed polls for lagging ranges and updates the metric.

Sometimes a range may not have any checkpoints for a while because its start time
is far in the past (this causes a catchup scan, during which no checkpoints are emitted).
In this case, the range is considered to be lagging if the creation timestamp of the
rangefeed is older than `kv.rangefeed.lagging_ranges_threshold`. Note that this means that
any changefeed which starts with an initial scan containing a significant amount of data will
likely report a nonzero `changefeed.lagging_ranges` value until the initial scan is complete. This
is intentional.
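
To make the rule concrete, here is a minimal standalone sketch of the check described above. The names (`rangeState`, `isLagging`, `countLagging`) are illustrative only and are not the actual kvcoord implementation, which operates on HLC timestamps inside the rangefeed client:

```go
package main

import (
	"fmt"
	"time"
)

// rangeState is a hypothetical stand-in for the per-range bookkeeping the
// rangefeed client keeps.
type rangeState struct {
	resolvedTS time.Time // last checkpoint; zero until one is emitted
	createdAt  time.Time // when the rangefeed for this range was started
}

// isLagging reports whether a range is behind by more than threshold. A range
// with no checkpoint yet (e.g. still in a catchup scan) falls back to the
// rangefeed creation time, which is why long initial scans report lagging ranges.
func isLagging(r rangeState, now time.Time, threshold time.Duration) bool {
	ref := r.resolvedTS
	if ref.IsZero() {
		ref = r.createdAt
	}
	return now.Sub(ref) > threshold
}

// countLagging is what a periodic poll would feed into the metric.
func countLagging(ranges []rangeState, now time.Time, threshold time.Duration) int64 {
	var n int64
	for _, r := range ranges {
		if isLagging(r, now, threshold) {
			n++
		}
	}
	return n
}

func main() {
	now := time.Now()
	ranges := []rangeState{
		{resolvedTS: now.Add(-10 * time.Second), createdAt: now.Add(-time.Hour)}, // caught up
		{createdAt: now.Add(-5 * time.Minute)},                                   // no checkpoint yet
	}
	fmt.Println(countLagging(ranges, now, 3*time.Minute)) // prints 1
}
```

Running this prints 1: the second range has no checkpoint yet, but its rangefeed was created longer ago than the threshold, so it counts as lagging.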

Release note (ops change): A new metric `changefeed.lagging_ranges` is added to show the number of
ranges which are behind in changefeeds. This metric can be used with the `metrics_label` changefeed
option. A new cluster setting `kv.rangefeed.lagging_ranges_threshold` is added, which is the amount of
time a range needs to be behind to be considered lagging; it defaults to 3 minutes. There is also
a cluster setting `kv.rangefeed.lagging_ranges_frequency`, which controls how often the lagging-ranges
calculation is done; it defaults to polling every 1 minute. Note that polling adds latency to
the metric being updated. For example, if a range falls behind by 3 minutes, the metric may not update
until up to an additional minute afterwards.

Epic: None
samiskin authored and jayshrivastava committed Sep 2, 2023
1 parent 6a51bf4 commit 6e29018
Showing 12 changed files with 421 additions and 39 deletions.
2 changes: 2 additions & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -50,6 +50,8 @@ kv.closed_timestamp.target_duration duration 3s if nonzero, attempt to provide c
kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconciling jobs with protected timestamp records tenant-ro
kv.rangefeed.closed_timestamp_refresh_interval duration 3s the interval at which closed-timestamp updatesare delivered to rangefeeds; set to 0 to use kv.closed_timestamp.side_transport_interval tenant-ro
kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled tenant-rw
kv.rangefeed.lagging_ranges_frequency duration 1m0s controls the frequency at which a rangefeed checks for ranges which have fallen behind tenant-rw
kv.rangefeed.lagging_ranges_threshold duration 3m0s controls how far behind a range must be from the present to be considered as 'lagging' behind in metrics tenant-rw
kv.rangefeed.range_stuck_threshold duration 1m0s restart rangefeeds if they don't emit anything for the specified threshold; 0 disables (kv.rangefeed.closed_timestamp_refresh_interval takes precedence) tenant-rw
kv.transaction.max_intents_bytes integer 4194304 maximum number of bytes used to track locks in transactions tenant-rw
kv.transaction.max_refresh_spans_bytes integer 4194304 maximum number of bytes used to track refresh spans in serializable transactions tenant-rw
2 changes: 2 additions & 0 deletions docs/generated/settings/settings.html
@@ -71,6 +71,8 @@
<tr><td><div id="setting-kv-range-split-load-qps-threshold" class="anchored"><code>kv.range_split.load_qps_threshold</code></div></td><td>integer</td><td><code>2500</code></td><td>the QPS over which, the range becomes a candidate for load based splitting</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-rangefeed-closed-timestamp-refresh-interval" class="anchored"><code>kv.rangefeed.closed_timestamp_refresh_interval</code></div></td><td>duration</td><td><code>3s</code></td><td>the interval at which closed-timestamp updatesare delivered to rangefeeds; set to 0 to use kv.closed_timestamp.side_transport_interval</td><td>Serverless/Dedicated/Self-Hosted (read-only)</td></tr>
<tr><td><div id="setting-kv-rangefeed-enabled" class="anchored"><code>kv.rangefeed.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if set, rangefeed registration is enabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-rangefeed-lagging-ranges-frequency" class="anchored"><code>kv.rangefeed.lagging_ranges_frequency</code></div></td><td>duration</td><td><code>1m0s</code></td><td>controls the frequency at which a rangefeed checks for ranges which have fallen behind</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-rangefeed-lagging-ranges-threshold" class="anchored"><code>kv.rangefeed.lagging_ranges_threshold</code></div></td><td>duration</td><td><code>3m0s</code></td><td>controls how far behind a range must be from the present to be considered as &#39;lagging&#39; behind in metrics</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-rangefeed-range-stuck-threshold" class="anchored"><code>kv.rangefeed.range_stuck_threshold</code></div></td><td>duration</td><td><code>1m0s</code></td><td>restart rangefeeds if they don&#39;t emit anything for the specified threshold; 0 disables (kv.rangefeed.closed_timestamp_refresh_interval takes precedence)</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-replica-circuit-breaker-slow-replication-threshold" class="anchored"><code>kv.replica_circuit_breaker.slow_replication_threshold</code></div></td><td>duration</td><td><code>1m0s</code></td><td>duration after which slow proposals trip the per-Replica circuit breaker (zero duration disables breakers)</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-replica-stats-addsst-request-size-factor" class="anchored"><code>kv.replica_stats.addsst_request_size_factor</code></div></td><td>integer</td><td><code>50000</code></td><td>the divisor that is applied to addsstable request sizes, then recorded in a leaseholders QPS; 0 means all requests are treated as cost 1</td><td>Dedicated/Self-Hosted</td></tr>
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -237,6 +237,7 @@ go_test(
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/protectedts",
"//pkg/roachpb",
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -455,6 +455,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
SchemaFeed: sf,
Knobs: ca.knobs.FeedKnobs,
UseMux: changefeedbase.UseMuxRangeFeed.Get(&cfg.Settings.SV),
UpdateLaggingRanges: ca.sliMetrics.getLaggingRangesCallback(),
}, nil
}

120 changes: 120 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -50,6 +50,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server"
@@ -1226,6 +1228,124 @@ func TestChangefeedInitialScan(t *testing.T) {
cdcTest(t, testFn)
}

// TestChangefeedLaggingRangesMetrics tests the behavior of the
// changefeed.lagging_ranges metric.
func TestChangefeedLaggingRangesMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

// Set a small threshold and frequency so lagging ranges can be detected faster.
kvcoord.LaggingRangesThreshold.Override(
context.Background(), &s.Server.ClusterSettings().SV, 250*time.Millisecond)
kvcoord.LaggingRangesCheckFrequency.Override(
context.Background(), &s.Server.ClusterSettings().SV, 25*time.Millisecond)

// Ensure a fast closed timestamp interval so ranges can catch up fast.
kvserver.RangeFeedRefreshInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 20*time.Millisecond)
closedts.SideTransportCloseInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 20*time.Millisecond)
closedts.TargetDuration.Override(
context.Background(), &s.Server.ClusterSettings().SV, 20*time.Millisecond)

skipMu := syncutil.Mutex{}
skippedRanges := map[string]struct{}{}
numRanges := 10
numRangesToSkip := int64(4)
var stopSkip atomic.Bool
// `shouldSkip` continuously skips checkpoints for the first `numRangesToSkip` ranges it sees.
// Skipping is disabled by setting `stopSkip` to true.
shouldSkip := func(event *kvpb.RangeFeedEvent) bool {
if stopSkip.Load() {
return false
}
switch event.GetValue().(type) {
case *kvpb.RangeFeedCheckpoint:
sp := event.Checkpoint.Span
skipMu.Lock()
defer skipMu.Unlock()
if _, ok := skippedRanges[sp.String()]; ok || int64(len(skippedRanges)) < numRangesToSkip {
skippedRanges[sp.String()] = struct{}{}
return true
}
}
return false
}

knobs := s.TestingKnobs.DistSQL.(*execinfra.TestingKnobs).Changefeed.(*TestingKnobs)

knobs.FeedKnobs.RangefeedOptions = append(knobs.FeedKnobs.RangefeedOptions, kvcoord.TestingWithOnRangefeedEvent(
func(ctx context.Context, s roachpb.Span, _ int64, event *kvpb.RangeFeedEvent) (skip bool, _ error) {
return shouldSkip(event), nil
}),
)

registry := s.Server.JobRegistry().(*jobs.Registry)
sli1, err := registry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics("t1")
require.NoError(t, err)
laggingRangesTier1 := sli1.LaggingRanges
sli2, err := registry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics("t2")
require.NoError(t, err)
laggingRangesTier2 := sli2.LaggingRanges

assertLaggingRanges := func(tier string, expected int64) {
testutils.SucceedsWithin(t, func() error {
var laggingRangesObserved int64
if tier == "t1" {
laggingRangesObserved = laggingRangesTier1.Value()
} else {
laggingRangesObserved = laggingRangesTier2.Value()
}
if laggingRangesObserved != expected {
return fmt.Errorf("expected %d lagging ranges, but found %d", expected, laggingRangesObserved)
}
return nil
}, 10*time.Second)
}

sqlDB.Exec(t, fmt.Sprintf(`
CREATE TABLE foo (key INT PRIMARY KEY);
INSERT INTO foo (key) SELECT * FROM generate_series(1, %d);
ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(1, %d, 1));
`, numRanges, numRanges-1))
sqlDB.CheckQueryResults(t, `SELECT count(*) FROM [SHOW RANGES FROM TABLE foo]`,
[][]string{{fmt.Sprint(numRanges)}},
)

feed1Tier1 := feed(t, f, `CREATE CHANGEFEED FOR foo WITH initial_scan='no', metrics_label="t1"`)
feed2Tier1 := feed(t, f, `CREATE CHANGEFEED FOR foo WITH initial_scan='no', metrics_label="t1"`)
feed3Tier2 := feed(t, f, `CREATE CHANGEFEED FOR foo WITH initial_scan='no', metrics_label="t2"`)

assertLaggingRanges("t1", numRangesToSkip*2)
assertLaggingRanges("t2", numRangesToSkip)

stopSkip.Store(true)
assertLaggingRanges("t1", 0)
assertLaggingRanges("t2", 0)

stopSkip.Store(false)
assertLaggingRanges("t1", numRangesToSkip*2)
assertLaggingRanges("t2", numRangesToSkip)

require.NoError(t, feed1Tier1.Close())
assertLaggingRanges("t1", numRangesToSkip)
assertLaggingRanges("t2", numRangesToSkip)

require.NoError(t, feed2Tier1.Close())
assertLaggingRanges("t1", 0)
assertLaggingRanges("t2", numRangesToSkip)

require.NoError(t, feed3Tier2.Close())
assertLaggingRanges("t1", 0)
assertLaggingRanges("t2", 0)
}
// Can't run on tenants due to lack of SPLIT AT support (#54254)
cdcTest(t, testFn, feedTestNoTenants, feedTestEnterpriseSinks)
}

func TestChangefeedBackfillObservability(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
20 changes: 12 additions & 8 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
@@ -47,6 +47,7 @@ type Config struct {
Metrics *kvevent.Metrics
OnBackfillCallback func() func()
OnBackfillRangeCallback func(int64) (func(), func())
UpdateLaggingRanges func(int64)
MM *mon.BytesMonitor
WithDiff bool
SchemaChangeEvents changefeedbase.SchemaChangeEventClass
@@ -107,6 +108,7 @@ func Run(ctx context.Context, cfg Config) error {
cfg.SchemaFeed,
sc, pff, bf, cfg.UseMux, cfg.Targets, cfg.Knobs)
f.onBackfillCallback = cfg.OnBackfillCallback
f.updateLaggingRanges = cfg.UpdateLaggingRanges

g := ctxgroup.WithContext(ctx)
g.GoCtx(cfg.SchemaFeed.Run)
@@ -174,9 +176,10 @@ type kvFeed struct {
writer kvevent.Writer
codec keys.SQLCodec

onBackfillCallback func() func()
schemaChangeEvents changefeedbase.SchemaChangeEventClass
schemaChangePolicy changefeedbase.SchemaChangePolicy
onBackfillCallback func() func()
updateLaggingRanges func(int64)
schemaChangeEvents changefeedbase.SchemaChangeEventClass
schemaChangePolicy changefeedbase.SchemaChangePolicy

useMux bool

@@ -485,11 +488,12 @@ func (f *kvFeed) runUntilTableEvent(

g := ctxgroup.WithContext(ctx)
physicalCfg := rangeFeedConfig{
Spans: stps,
Frontier: resumeFrontier.Frontier(),
WithDiff: f.withDiff,
Knobs: f.knobs,
UseMux: f.useMux,
Spans: stps,
Frontier: resumeFrontier.Frontier(),
WithDiff: f.withDiff,
Knobs: f.knobs,
UseMux: f.useMux,
UpdateLaggingRanges: f.updateLaggingRanges,
}

// The following two synchronous calls works as follows:
17 changes: 12 additions & 5 deletions pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
@@ -27,11 +27,12 @@ type physicalFeedFactory interface {
}

type rangeFeedConfig struct {
Frontier hlc.Timestamp
Spans []kvcoord.SpanTimePair
WithDiff bool
Knobs TestingKnobs
UseMux bool
Frontier hlc.Timestamp
Spans []kvcoord.SpanTimePair
WithDiff bool
UpdateLaggingRanges func(int64)
Knobs TestingKnobs
UseMux bool
}

type rangefeedFactory func(
@@ -81,6 +82,12 @@ func (p rangefeedFactory) Run(ctx context.Context, sink kvevent.Writer, cfg rang
if cfg.WithDiff {
rfOpts = append(rfOpts, kvcoord.WithDiff())
}
if cfg.UpdateLaggingRanges != nil {
rfOpts = append(rfOpts, kvcoord.WithLaggingRangesUpdate(cfg.UpdateLaggingRanges))
}
if len(cfg.Knobs.RangefeedOptions) != 0 {
rfOpts = append(rfOpts, cfg.Knobs.RangefeedOptions...)
}

g.GoCtx(func(ctx context.Context) error {
return p(ctx, cfg.Spans, feed.eventC, rfOpts...)
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/kvfeed/testing_knobs.go
@@ -33,6 +33,9 @@ type TestingKnobs struct {
// ModifyTimestamps is called on the timestamp for each RangefeedMessage
// before converting it into a kv event.
ModifyTimestamps func(*hlc.Timestamp)
// RangefeedOptions are options which will be passed to the rangefeed that
// runs underneath the kvfeed.
RangefeedOptions []kvcoord.RangeFeedOption
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
36 changes: 36 additions & 0 deletions pkg/ccl/changefeedccl/metrics.go
@@ -70,6 +70,7 @@ type AggMetrics struct {
SchemaRegistryRetries *aggmetric.AggCounter
AggregatorProgress *aggmetric.AggGauge
CheckpointProgress *aggmetric.AggGauge
LaggingRanges *aggmetric.AggGauge

// There is always at least 1 sliMetrics created for defaultSLI scope.
mu struct {
@@ -132,6 +133,7 @@ type sliMetrics struct {
SchemaRegistryRetries *aggmetric.Counter
AggregatorProgress *aggmetric.Gauge
CheckpointProgress *aggmetric.Gauge
LaggingRanges *aggmetric.Gauge

mu struct {
syncutil.Mutex
@@ -607,6 +609,12 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
Measurement: "Unix Timestamp Nanoseconds",
Unit: metric.Unit_TIMESTAMP_NS,
}
metaLaggingRangePercentage := metric.Metadata{
Name: "changefeed.lagging_ranges",
Help: "The number of ranges considered to be lagging behind",
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}

functionalGaugeMinFn := func(childValues []int64) int64 {
var min int64
@@ -617,6 +625,7 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
}
return min
}

// NB: When adding new histograms, use sigFigs = 1. Older histograms
// retain significant figures of 2.
b := aggmetric.MakeBuilder("scope")
@@ -681,6 +690,7 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
SchemaRegistrations: b.Counter(metaSchemaRegistryRegistrations),
AggregatorProgress: b.FunctionalGauge(metaAggregatorProgress, functionalGaugeMinFn),
CheckpointProgress: b.FunctionalGauge(metaCheckpointProgress, functionalGaugeMinFn),
LaggingRanges: b.Gauge(metaLaggingRangePercentage),
}
a.mu.sliMetrics = make(map[string]*sliMetrics)
_, err := a.getOrCreateScope(defaultSLIScope)
@@ -740,6 +750,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
InternalRetryMessageCount: a.InternalRetryMessageCount.AddChild(scope),
SchemaRegistryRetries: a.SchemaRegistryRetries.AddChild(scope),
SchemaRegistrations: a.SchemaRegistrations.AddChild(scope),
LaggingRanges: a.LaggingRanges.AddChild(scope),
}
sm.mu.resolved = make(map[int64]hlc.Timestamp)
sm.mu.checkpoint = make(map[int64]hlc.Timestamp)
@@ -769,6 +780,31 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
return sm, nil
}

// getLaggingRangesCallback returns a function which can be called to update the
// lagging ranges metric. It should be called with the current number of lagging
// ranges.
func (s *sliMetrics) getLaggingRangesCallback() func(int64) {
// Because this gauge is shared between changefeeds in the same metrics scope,
// we must instead modify it using `Inc` and `Dec` (as opposed to `Update`) to
// ensure values written by others are not overwritten. The code below is used
// to determine the deltas based on the last known number of lagging ranges.
//
// Example:
//
// Initially there are 0 lagging ranges, so `last` is 0. Assume the gauge
// has an arbitrary value X.
//
// If 10 ranges are behind, last=0,i=10: X.Dec(0 - 10) = X.Inc(10)
// If 3 ranges catch up, last=10,i=7: X.Dec(10 - 7) = X.Dec(3)
// If 4 ranges fall behind, last=7,i=11: X.Dec(7 - 11) = X.Inc(4)
// If 1 lagging range is deleted, last=11,i=10: X.Dec(11 - 10) = X.Dec(1)
var last int64
return func(i int64) {
s.LaggingRanges.Dec(last - i)
last = i
}
}

// Metrics are for production monitoring of changefeeds.
type Metrics struct {
AggMetrics *AggMetrics
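
As a standalone illustration of why `getLaggingRangesCallback` tracks deltas rather than calling `Update`, the sketch below uses a hypothetical `sharedGauge` (a plain atomic counter, not the real `aggmetric.Gauge`) shared by two callbacks, mirroring two changefeeds that share one metrics scope:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// sharedGauge is a stand-in for the scope-level gauge shared by changefeeds.
type sharedGauge struct{ v atomic.Int64 }

func (g *sharedGauge) Dec(d int64) { g.v.Add(-d) }

func (g *sharedGauge) Value() int64 { return g.v.Load() }

// makeCallback mirrors getLaggingRangesCallback: each changefeed gets its own
// closure that remembers the count it last reported and applies only the delta,
// so it never clobbers the other changefeed's contribution.
func makeCallback(g *sharedGauge) func(int64) {
	var last int64
	return func(i int64) {
		g.Dec(last - i)
		last = i
	}
}

func main() {
	var g sharedGauge
	feedA, feedB := makeCallback(&g), makeCallback(&g)

	feedA(10)              // feed A reports 10 lagging ranges
	feedB(4)               // feed B reports 4
	fmt.Println(g.Value()) // 14

	feedA(7)               // 3 of feed A's ranges caught up
	fmt.Println(g.Value()) // 11

	feedA(0)               // feed A fully caught up (or was closed)
	fmt.Println(g.Value()) // 4
}
```

Had each callback called `Update(i)` on the shared gauge instead, the last writer would overwrite the other feed's count.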
6 changes: 5 additions & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
@@ -94,8 +94,12 @@ func muxRangeFeed(
if cfg.knobs.metrics != nil {
m.metrics = cfg.knobs.metrics
}

divideAllSpansOnRangeBoundaries(spans, m.startSingleRangeFeed, ds, &m.g)
if cfg.withLaggingRangesUpdate != nil {
m.g.GoCtx(func(ctx context.Context) error {
return ds.monitorLaggingRanges(ctx, rr, cfg.withLaggingRangesUpdate)
})
}
return errors.CombineErrors(m.g.Wait(), ctx.Err())
}

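
The body of `ds.monitorLaggingRanges` is not part of this excerpt. Purely as a rough sketch of the polling shape it implies, the snippet below shows a ticker-driven loop; the injected `countLagging` function and the reset-to-zero on shutdown are assumptions for illustration, not confirmed kvcoord behavior:

```go
package laggingmon

import (
	"context"
	"time"
)

// monitorLaggingRanges polls at the configured frequency and pushes the current
// number of lagging ranges into the metric callback until the context is done.
// Because the count is only recomputed once per tick, the metric can trail the
// moment a range actually crosses the threshold by up to one frequency interval.
func monitorLaggingRanges(
	ctx context.Context,
	frequency time.Duration, // kv.rangefeed.lagging_ranges_frequency
	countLagging func() int64, // e.g. derived from the rangefeed's range registry
	update func(int64), // the callback wired in via WithLaggingRangesUpdate
) error {
	ticker := time.NewTicker(frequency)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			update(0) // assumed: clear this feed's contribution on shutdown
			return ctx.Err()
		case <-ticker.C:
			update(countLagging())
		}
	}
}
```

This also matches the release note's caveat that polling adds up to one extra interval of latency before the metric reflects a newly lagging range.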