Skip to content

Commit

Permalink
streamingest: add a metric for replication cutover progress
Browse files Browse the repository at this point in the history
We already have the progress info in the job as the percentage of ranges
that were reverted. This commit adds the number of ranges that are left
to be reverted as a metric.

Epic: CRDB-18752

Fixes: #96536

Release note: None
  • Loading branch information
lidorcarmel committed Feb 7, 2023
1 parent f640acb commit 45faeeb
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 1 deletion.
13 changes: 13 additions & 0 deletions pkg/ccl/streamingccl/streamingest/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,17 @@ var (
Measurement: "Job Updates",
Unit: metric.Unit_COUNT,
}
// This metric would be 0 until cutover begins, and then it will be updated to
// the total number of ranges that need to be reverted, and then gradually go
// down to 0 again. NB: that the number of ranges is the total number of
// ranges left to be reverted, but some may not have writes and therefore the
// revert will be a no-op for those ranges.
metaReplicationCutoverProgress = metric.Metadata{
Name: "replication.cutover_progress",
Help: "The number of ranges left to revert in order to complete an inflight cutover",
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
)

// Metrics are for production monitoring of stream ingestion jobs.
Expand All @@ -136,6 +147,7 @@ type Metrics struct {
DataCheckpointSpanCount *metric.Gauge
FrontierCheckpointSpanCount *metric.Gauge
FrontierLagSeconds *metric.GaugeFloat64
ReplicationCutoverProgress *metric.Gauge
}

// MetricStruct implements the metric.Struct interface.
Expand Down Expand Up @@ -177,6 +189,7 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
DataCheckpointSpanCount: metric.NewGauge(metaDataCheckpointSpanCount),
FrontierCheckpointSpanCount: metric.NewGauge(metaFrontierCheckpointSpanCount),
FrontierLagSeconds: metric.NewGaugeFloat64(metaFrontierLagSeconds),
ReplicationCutoverProgress: metric.NewGauge(metaReplicationCutoverProgress),
}
return m
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,8 @@ func maybeRevertToCutoverTimestamp(

p := execCtx.(sql.JobExecContext)
db := p.ExecCfg().DB
j, err := p.ExecCfg().JobRegistry.LoadJob(ctx, ingestionJobID)
jobRegistry := p.ExecCfg().JobRegistry
j, err := jobRegistry.LoadJob(ctx, ingestionJobID)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -542,6 +543,8 @@ func maybeRevertToCutoverTimestamp(
if err != nil {
return err
}
m := jobRegistry.MetricsStruct().StreamIngest.(*Metrics)
m.ReplicationCutoverProgress.Update(int64(nRanges))
if origNRanges == -1 {
origNRanges = nRanges
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,9 @@ func TestCutoverFractionProgressed(t *testing.T) {
return jobs.UpdateHighwaterProgressed(cutover, md, ju)
}))

metrics := registry.MetricsStruct().StreamIngest.(*Metrics)
require.Equal(t, int64(0), metrics.ReplicationCutoverProgress.Value())

g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
defer close(respRecvd)
Expand All @@ -491,6 +494,7 @@ func TestCutoverFractionProgressed(t *testing.T) {
"0.67": false,
"0.83": false,
}
var expectedRanges int64 = 6
g.GoCtx(func(ctx context.Context) error {
for {
select {
Expand All @@ -506,6 +510,14 @@ func TestCutoverFractionProgressed(t *testing.T) {
if _, ok := progressMap[s]; !ok {
t.Fatalf("unexpected progress fraction %s", s)
}
// We sometimes see the same progress, which is valid, no need to update
// the expected range count.
if expectedRanges != metrics.ReplicationCutoverProgress.Value() {
// There is progress, which means that another range was reverted,
// updated the expected range count.
expectedRanges--
}
require.Equal(t, expectedRanges, metrics.ReplicationCutoverProgress.Value())
progressMap[s] = true
continueRevert <- struct{}{}
}
Expand All @@ -514,6 +526,7 @@ func TestCutoverFractionProgressed(t *testing.T) {
require.NoError(t, g.Wait())
sip := loadProgress()
require.Equal(t, sip.GetFractionCompleted(), float32(1))
require.Equal(t, int64(0), metrics.ReplicationCutoverProgress.Value())

// Ensure we have hit all our expected progress fractions.
for k, v := range progressMap {
Expand Down
4 changes: 4 additions & 0 deletions pkg/ts/catalog/chart_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -1649,6 +1649,10 @@ var charts = []sectionDescription{
Title: "Job Progress Updates",
Metrics: []string{"replication.job_progress_updates"},
},
{
Title: "Ranges To Revert",
Metrics: []string{"replication.cutover_progress"},
},
},
},
{
Expand Down

0 comments on commit 45faeeb

Please sign in to comment.