Skip to content

Commit

Permalink
Merge pull request #110970 from jayshrivastava/lagging-range-backport…
Browse files Browse the repository at this point in the history
…-release-22.2

release-22.2: changefeedccl: add `changefeed.lagging_ranges` metric
  • Loading branch information
jayshrivastava authored Sep 26, 2023
2 parents 50cc26d + 41f05f7 commit ad4e261
Show file tree
Hide file tree
Showing 12 changed files with 536 additions and 73 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ go_test(
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/protectedts",
"//pkg/kv/kvserver/protectedts/ptpb",
Expand Down
66 changes: 43 additions & 23 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,30 +394,50 @@ func (ca *changeAggregator) makeKVFeedCfg(
initialHighWater, &ca.metrics.SchemaFeedMetrics, config.Opts.GetCanHandle())
}

monitoringCfg, err := makeKVFeedMonitoringCfg(ca.sliMetrics, ca.flowCtx.Cfg.Settings)
if err != nil {
return kvfeed.Config{}, err
}

return kvfeed.Config{
Writer: buf,
Settings: cfg.Settings,
DB: cfg.DB,
Codec: cfg.Codec,
Clock: cfg.DB.Clock(),
Gossip: cfg.Gossip,
Spans: spans,
CheckpointSpans: ca.spec.Checkpoint.Spans,
CheckpointTimestamp: ca.spec.Checkpoint.Timestamp,
Targets: AllTargets(ca.spec.Feed),
Metrics: &ca.metrics.KVFeedMetrics,
OnBackfillCallback: ca.sliMetrics.getBackfillCallback(),
OnBackfillRangeCallback: ca.sliMetrics.getBackfillRangeCallback(),
MM: ca.kvFeedMemMon,
InitialHighWater: initialHighWater,
EndTime: config.EndTime,
WithDiff: filters.WithDiff,
NeedsInitialScan: needsInitialScan,
SchemaChangeEvents: schemaChange.EventClass,
SchemaChangePolicy: schemaChange.Policy,
SchemaFeed: sf,
Knobs: ca.knobs.FeedKnobs,
UseMux: changefeedbase.UseMuxRangeFeed.Get(&cfg.Settings.SV),
Writer: buf,
Settings: cfg.Settings,
DB: cfg.DB,
Codec: cfg.Codec,
Clock: cfg.DB.Clock(),
Gossip: cfg.Gossip,
Spans: spans,
CheckpointSpans: ca.spec.Checkpoint.Spans,
CheckpointTimestamp: ca.spec.Checkpoint.Timestamp,
Targets: AllTargets(ca.spec.Feed),
Metrics: &ca.metrics.KVFeedMetrics,
MM: ca.kvFeedMemMon,
InitialHighWater: initialHighWater,
EndTime: config.EndTime,
WithDiff: filters.WithDiff,
NeedsInitialScan: needsInitialScan,
SchemaChangeEvents: schemaChange.EventClass,
SchemaChangePolicy: schemaChange.Policy,
SchemaFeed: sf,
Knobs: ca.knobs.FeedKnobs,
UseMux: changefeedbase.UseMuxRangeFeed.Get(&cfg.Settings.SV),
MonitoringCfg: monitoringCfg,
}, nil
}

func makeKVFeedMonitoringCfg(
sliMetrics *sliMetrics, settings *cluster.Settings,
) (kvfeed.MonitoringConfig, error) {
laggingRangesThreshold := changefeedbase.LaggingRangesThreshold.Get(&settings.SV)
laggingRangesInterval := changefeedbase.LaggingRangesPollingInterval.Get(&settings.SV)

return kvfeed.MonitoringConfig{
LaggingRangesCallback: sliMetrics.getLaggingRangesCallback(),
LaggingRangesThreshold: laggingRangesThreshold,
LaggingRangesPollingInterval: laggingRangesInterval,

OnBackfillCallback: sliMetrics.getBackfillCallback(),
OnBackfillRangeCallback: sliMetrics.getBackfillRangeCallback(),
}, nil
}

Expand Down
120 changes: 120 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
Expand Down Expand Up @@ -1024,6 +1025,123 @@ func TestChangefeedInitialScan(t *testing.T) {
cdcTest(t, testFn)
}

// TestChangefeedLaggingRangesMetrics tests the behavior of the
// changefeed.lagging_ranges metric.
func TestChangefeedLaggingRangesMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

// Ensure a fast closed timestamp interval so ranges can catch up fast.
kvserver.RangeFeedRefreshInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 20*time.Millisecond)
closedts.SideTransportCloseInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 20*time.Millisecond)
closedts.TargetDuration.Override(
context.Background(), &s.Server.ClusterSettings().SV, 20*time.Millisecond)

changefeedbase.LaggingRangesPollingInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 25*time.Millisecond)
changefeedbase.LaggingRangesThreshold.Override(
context.Background(), &s.Server.ClusterSettings().SV, 250*time.Millisecond)

skipMu := syncutil.Mutex{}
skippedRanges := map[string]struct{}{}
numRanges := 10
numRangesToSkip := int64(4)
var stopSkip atomic.Bool
// `shouldSkip` continuously skips checkpoints for the first `numRangesToSkip` ranges it sees.
// skipping is disabled by setting `stopSkip` to true.
shouldSkip := func(event *roachpb.RangeFeedEvent) bool {
if stopSkip.Load() {
return false
}
switch event.GetValue().(type) {
case *roachpb.RangeFeedCheckpoint:
sp := event.Checkpoint.Span
skipMu.Lock()
defer skipMu.Unlock()
if _, ok := skippedRanges[sp.String()]; ok || int64(len(skippedRanges)) < numRangesToSkip {
skippedRanges[sp.String()] = struct{}{}
return true
}
}
return false
}

knobs := s.TestingKnobs.DistSQL.(*execinfra.TestingKnobs).Changefeed.(*TestingKnobs)

knobs.FeedKnobs.RangefeedOptions = append(knobs.FeedKnobs.RangefeedOptions, kvcoord.TestingWithOnRangefeedEvent(
func(ctx context.Context, s roachpb.Span, event *roachpb.RangeFeedEvent) (skip bool, _ error) {
return shouldSkip(event), nil
}),
)

registry := s.Server.JobRegistry().(*jobs.Registry)
sli1, err := registry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics("t1")
require.NoError(t, err)
laggingRangesTier1 := sli1.LaggingRanges
sli2, err := registry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics("t2")
require.NoError(t, err)
laggingRangesTier2 := sli2.LaggingRanges

assertLaggingRanges := func(tier string, expected int64) {
testutils.SucceedsWithin(t, func() error {
var laggingRangesObserved int64
if tier == "t1" {
laggingRangesObserved = laggingRangesTier1.Value()
} else {
laggingRangesObserved = laggingRangesTier2.Value()
}
if laggingRangesObserved != expected {
return fmt.Errorf("expected %d lagging ranges, but found %d", expected, laggingRangesObserved)
}
return nil
}, 10*time.Second)
}

sqlDB.Exec(t, fmt.Sprintf(`
CREATE TABLE foo (key INT PRIMARY KEY);
INSERT INTO foo (key) SELECT * FROM generate_series(1, %d);
ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(1, %d, 1));
`, numRanges, numRanges-1))
sqlDB.CheckQueryResults(t, `SELECT count(*) FROM [SHOW RANGES FROM TABLE foo]`,
[][]string{{fmt.Sprint(numRanges)}},
)

feed1Tier1 := feed(t, f, `CREATE CHANGEFEED FOR foo WITH initial_scan='no', metrics_label="t1"`)
feed2Tier1 := feed(t, f, `CREATE CHANGEFEED FOR foo WITH initial_scan='no', metrics_label="t1"`)
feed3Tier2 := feed(t, f, `CREATE CHANGEFEED FOR foo WITH initial_scan='no', metrics_label="t2"`)

assertLaggingRanges("t1", numRangesToSkip*2)
assertLaggingRanges("t2", numRangesToSkip)

stopSkip.Store(true)
assertLaggingRanges("t1", 0)
assertLaggingRanges("t2", 0)

stopSkip.Store(false)
assertLaggingRanges("t1", numRangesToSkip*2)
assertLaggingRanges("t2", numRangesToSkip)

require.NoError(t, feed1Tier1.Close())
assertLaggingRanges("t1", numRangesToSkip)
assertLaggingRanges("t2", numRangesToSkip)

require.NoError(t, feed2Tier1.Close())
assertLaggingRanges("t1", 0)
assertLaggingRanges("t2", numRangesToSkip)

require.NoError(t, feed3Tier2.Close())
assertLaggingRanges("t1", 0)
assertLaggingRanges("t2", 0)
}
// Can't run on tenants due to lack of SPLIT AT support (#54254)
cdcTest(t, testFn, feedTestNoTenants, feedTestEnterpriseSinks)
}

func TestChangefeedBackfillObservability(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -1241,6 +1359,8 @@ func TestNoBackfillAfterNonTargetColumnDrop(t *testing.T) {

func TestChangefeedColumnDropsWithFamilyAndNonFamilyTargets(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,8 @@ func (s StatementOptions) getEnumValue(k string) (string, error) {
return rawVal, nil
}

// getDurationValue validates that the option `k` was supplied with a
// valid duration.
func (s StatementOptions) getDurationValue(k string) (*time.Duration, error) {
v, ok := s.m[k]
if !ok {
Expand Down
28 changes: 28 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,3 +279,31 @@ var EventConsumerElasticCPUControlEnabled = settings.RegisterBoolSetting(
"determines whether changefeed event processing integrates with elastic CPU control",
false,
)

var defaultLaggingRangesThreshold = 3 * time.Minute

var defaultLaggingRangesPollingInterval = 1 * time.Minute

// LaggingRangesThreshold specifies the duration by which a range must
// be lagging behind the present to be considered as 'lagging' behind in
// metrics.
var LaggingRangesThreshold = settings.RegisterDurationSetting(
settings.TenantWritable,
"changefeed.lagging_ranges_threshold",
"specifies the duration by which a range must be lagging behind the "+
"present to be considered as 'lagging' behind in metrics. will be "+
"removed in v23.2 in favor of a changefeed option",
defaultLaggingRangesThreshold,
settings.PositiveDuration,
)

// LaggingRangesPollingInterval is the polling rate at which lagging ranges are
// checked and metrics are updated.
var LaggingRangesPollingInterval = settings.RegisterDurationSetting(
settings.TenantWritable,
"changefeed.lagging_ranges_polling_interval",
"the polling rate at which lagging ranges are checked and "+
"corresponding metrics are updated. will be removed in v23.2 onwards",
defaultLaggingRangesPollingInterval,
settings.PositiveDuration,
)
Loading

0 comments on commit ad4e261

Please sign in to comment.