Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-22.2: changefeedccl: add changefeed.lagging_ranges metric #110970

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ go_test(
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/protectedts",
"//pkg/kv/kvserver/protectedts/ptpb",
Expand Down
66 changes: 43 additions & 23 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,30 +394,50 @@ func (ca *changeAggregator) makeKVFeedCfg(
initialHighWater, &ca.metrics.SchemaFeedMetrics, config.Opts.GetCanHandle())
}

monitoringCfg, err := makeKVFeedMonitoringCfg(ca.sliMetrics, ca.flowCtx.Cfg.Settings)
if err != nil {
return kvfeed.Config{}, err
}

return kvfeed.Config{
Writer: buf,
Settings: cfg.Settings,
DB: cfg.DB,
Codec: cfg.Codec,
Clock: cfg.DB.Clock(),
Gossip: cfg.Gossip,
Spans: spans,
CheckpointSpans: ca.spec.Checkpoint.Spans,
CheckpointTimestamp: ca.spec.Checkpoint.Timestamp,
Targets: AllTargets(ca.spec.Feed),
Metrics: &ca.metrics.KVFeedMetrics,
OnBackfillCallback: ca.sliMetrics.getBackfillCallback(),
OnBackfillRangeCallback: ca.sliMetrics.getBackfillRangeCallback(),
MM: ca.kvFeedMemMon,
InitialHighWater: initialHighWater,
EndTime: config.EndTime,
WithDiff: filters.WithDiff,
NeedsInitialScan: needsInitialScan,
SchemaChangeEvents: schemaChange.EventClass,
SchemaChangePolicy: schemaChange.Policy,
SchemaFeed: sf,
Knobs: ca.knobs.FeedKnobs,
UseMux: changefeedbase.UseMuxRangeFeed.Get(&cfg.Settings.SV),
Writer: buf,
Settings: cfg.Settings,
DB: cfg.DB,
Codec: cfg.Codec,
Clock: cfg.DB.Clock(),
Gossip: cfg.Gossip,
Spans: spans,
CheckpointSpans: ca.spec.Checkpoint.Spans,
CheckpointTimestamp: ca.spec.Checkpoint.Timestamp,
Targets: AllTargets(ca.spec.Feed),
Metrics: &ca.metrics.KVFeedMetrics,
MM: ca.kvFeedMemMon,
InitialHighWater: initialHighWater,
EndTime: config.EndTime,
WithDiff: filters.WithDiff,
NeedsInitialScan: needsInitialScan,
SchemaChangeEvents: schemaChange.EventClass,
SchemaChangePolicy: schemaChange.Policy,
SchemaFeed: sf,
Knobs: ca.knobs.FeedKnobs,
UseMux: changefeedbase.UseMuxRangeFeed.Get(&cfg.Settings.SV),
MonitoringCfg: monitoringCfg,
}, nil
}

func makeKVFeedMonitoringCfg(
sliMetrics *sliMetrics, settings *cluster.Settings,
) (kvfeed.MonitoringConfig, error) {
laggingRangesThreshold := changefeedbase.LaggingRangesThreshold.Get(&settings.SV)
laggingRangesInterval := changefeedbase.LaggingRangesPollingInterval.Get(&settings.SV)

return kvfeed.MonitoringConfig{
LaggingRangesCallback: sliMetrics.getLaggingRangesCallback(),
LaggingRangesThreshold: laggingRangesThreshold,
LaggingRangesPollingInterval: laggingRangesInterval,

OnBackfillCallback: sliMetrics.getBackfillCallback(),
OnBackfillRangeCallback: sliMetrics.getBackfillRangeCallback(),
}, nil
}

Expand Down
120 changes: 120 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
Expand Down Expand Up @@ -1024,6 +1025,123 @@ func TestChangefeedInitialScan(t *testing.T) {
cdcTest(t, testFn)
}

// TestChangefeedLaggingRangesMetrics tests the behavior of the
// changefeed.lagging_ranges metric.
func TestChangefeedLaggingRangesMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

// Ensure a fast closed timestamp interval so ranges can catch up fast.
kvserver.RangeFeedRefreshInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 20*time.Millisecond)
closedts.SideTransportCloseInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 20*time.Millisecond)
closedts.TargetDuration.Override(
context.Background(), &s.Server.ClusterSettings().SV, 20*time.Millisecond)

changefeedbase.LaggingRangesPollingInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 25*time.Millisecond)
changefeedbase.LaggingRangesThreshold.Override(
context.Background(), &s.Server.ClusterSettings().SV, 250*time.Millisecond)

skipMu := syncutil.Mutex{}
skippedRanges := map[string]struct{}{}
numRanges := 10
numRangesToSkip := int64(4)
var stopSkip atomic.Bool
// `shouldSkip` continuously skips checkpoints for the first `numRangesToSkip` ranges it sees.
// skipping is disabled by setting `stopSkip` to true.
shouldSkip := func(event *roachpb.RangeFeedEvent) bool {
if stopSkip.Load() {
return false
}
switch event.GetValue().(type) {
case *roachpb.RangeFeedCheckpoint:
sp := event.Checkpoint.Span
skipMu.Lock()
defer skipMu.Unlock()
if _, ok := skippedRanges[sp.String()]; ok || int64(len(skippedRanges)) < numRangesToSkip {
skippedRanges[sp.String()] = struct{}{}
return true
}
}
return false
}

knobs := s.TestingKnobs.DistSQL.(*execinfra.TestingKnobs).Changefeed.(*TestingKnobs)

knobs.FeedKnobs.RangefeedOptions = append(knobs.FeedKnobs.RangefeedOptions, kvcoord.TestingWithOnRangefeedEvent(
func(ctx context.Context, s roachpb.Span, event *roachpb.RangeFeedEvent) (skip bool, _ error) {
return shouldSkip(event), nil
}),
)

registry := s.Server.JobRegistry().(*jobs.Registry)
sli1, err := registry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics("t1")
require.NoError(t, err)
laggingRangesTier1 := sli1.LaggingRanges
sli2, err := registry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics("t2")
require.NoError(t, err)
laggingRangesTier2 := sli2.LaggingRanges

assertLaggingRanges := func(tier string, expected int64) {
testutils.SucceedsWithin(t, func() error {
var laggingRangesObserved int64
if tier == "t1" {
laggingRangesObserved = laggingRangesTier1.Value()
} else {
laggingRangesObserved = laggingRangesTier2.Value()
}
if laggingRangesObserved != expected {
return fmt.Errorf("expected %d lagging ranges, but found %d", expected, laggingRangesObserved)
}
return nil
}, 10*time.Second)
}

sqlDB.Exec(t, fmt.Sprintf(`
CREATE TABLE foo (key INT PRIMARY KEY);
INSERT INTO foo (key) SELECT * FROM generate_series(1, %d);
ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(1, %d, 1));
`, numRanges, numRanges-1))
sqlDB.CheckQueryResults(t, `SELECT count(*) FROM [SHOW RANGES FROM TABLE foo]`,
[][]string{{fmt.Sprint(numRanges)}},
)

feed1Tier1 := feed(t, f, `CREATE CHANGEFEED FOR foo WITH initial_scan='no', metrics_label="t1"`)
feed2Tier1 := feed(t, f, `CREATE CHANGEFEED FOR foo WITH initial_scan='no', metrics_label="t1"`)
feed3Tier2 := feed(t, f, `CREATE CHANGEFEED FOR foo WITH initial_scan='no', metrics_label="t2"`)

assertLaggingRanges("t1", numRangesToSkip*2)
assertLaggingRanges("t2", numRangesToSkip)

stopSkip.Store(true)
assertLaggingRanges("t1", 0)
assertLaggingRanges("t2", 0)

stopSkip.Store(false)
assertLaggingRanges("t1", numRangesToSkip*2)
assertLaggingRanges("t2", numRangesToSkip)

require.NoError(t, feed1Tier1.Close())
assertLaggingRanges("t1", numRangesToSkip)
assertLaggingRanges("t2", numRangesToSkip)

require.NoError(t, feed2Tier1.Close())
assertLaggingRanges("t1", 0)
assertLaggingRanges("t2", numRangesToSkip)

require.NoError(t, feed3Tier2.Close())
assertLaggingRanges("t1", 0)
assertLaggingRanges("t2", 0)
}
// Can't run on tenants due to lack of SPLIT AT support (#54254)
cdcTest(t, testFn, feedTestNoTenants, feedTestEnterpriseSinks)
}

func TestChangefeedBackfillObservability(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -1241,6 +1359,8 @@ func TestNoBackfillAfterNonTargetColumnDrop(t *testing.T) {

func TestChangefeedColumnDropsWithFamilyAndNonFamilyTargets(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,8 @@ func (s StatementOptions) getEnumValue(k string) (string, error) {
return rawVal, nil
}

// getDurationValue validates that the option `k` was supplied with a
// valid duration.
func (s StatementOptions) getDurationValue(k string) (*time.Duration, error) {
v, ok := s.m[k]
if !ok {
Expand Down
28 changes: 28 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,3 +279,31 @@ var EventConsumerElasticCPUControlEnabled = settings.RegisterBoolSetting(
"determines whether changefeed event processing integrates with elastic CPU control",
false,
)

var defaultLaggingRangesThreshold = 3 * time.Minute

var defaultLaggingRangesPollingInterval = 1 * time.Minute

// LaggingRangesThreshold specifies the duration by which a range must
// be lagging behind the present to be considered as 'lagging' behind in
// metrics.
var LaggingRangesThreshold = settings.RegisterDurationSetting(
settings.TenantWritable,
"changefeed.lagging_ranges_threshold",
"specifies the duration by which a range must be lagging behind the "+
"present to be considered as 'lagging' behind in metrics. will be "+
"removed in v23.2 in favor of a changefeed option",
defaultLaggingRangesThreshold,
settings.PositiveDuration,
)

// LaggingRangesPollingInterval is the polling rate at which lagging ranges are
// checked and metrics are updated.
var LaggingRangesPollingInterval = settings.RegisterDurationSetting(
settings.TenantWritable,
"changefeed.lagging_ranges_polling_interval",
"the polling rate at which lagging ranges are checked and "+
"corresponding metrics are updated. will be removed in v23.2 onwards",
defaultLaggingRangesPollingInterval,
settings.PositiveDuration,
)
Loading