Skip to content

Commit

Permalink
streamingest: add a replication lag metric
Browse files Browse the repository at this point in the history
Add a metric to track the lag of the replication frontier, in seconds.

Informs: cockroachdb#92959

Epic: CRDB-18752

Release note: None
  • Loading branch information
lidorcarmel committed Jan 18, 2023
1 parent 6f5d89e commit e5cc6d2
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 0 deletions.
8 changes: 8 additions & 0 deletions pkg/ccl/streamingccl/streamingest/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ var (
Measurement: "Resolved Spans",
Unit: metric.Unit_COUNT,
}
metaFrontierLagSeconds = metric.Metadata{
Name: "streaming.frontier_lag_seconds",
Help: "Time the replication frontier lags",
Measurement: "Seconds",
Unit: metric.Unit_SECONDS,
}
metaJobProgressUpdates = metric.Metadata{
Name: "streaming.job_progress_updates",
Help: "Total number of updates to the ingestion job progress",
Expand All @@ -115,6 +121,7 @@ type Metrics struct {
LatestDataCheckpointSpan *metric.Gauge
DataCheckpointSpanCount *metric.Gauge
FrontierCheckpointSpanCount *metric.Gauge
FrontierLagSeconds *metric.GaugeFloat64
}

// MetricStruct implements the metric.Struct interface.
Expand All @@ -139,6 +146,7 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
LatestDataCheckpointSpan: metric.NewGauge(metaLatestDataCheckpointSpan),
DataCheckpointSpanCount: metric.NewGauge(metaDataCheckpointSpanCount),
FrontierCheckpointSpanCount: metric.NewGauge(metaFrontierCheckpointSpanCount),
FrontierLagSeconds: metric.NewGaugeFloat64(metaFrontierLagSeconds),
}
return m
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error {
sf.metrics.JobProgressUpdates.Inc(1)
sf.persistedHighWater = f.Frontier()
sf.metrics.FrontierCheckpointSpanCount.Update(int64(len(frontierResolvedSpans)))
sf.metrics.FrontierLagSeconds.Update(timeutil.Since(sf.persistedHighWater.GoTime()).Seconds())

return nil
}
4 changes: 4 additions & 0 deletions pkg/ts/catalog/chart_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -1623,6 +1623,10 @@ var charts = []sectionDescription{
Title: "Frontier Checkpoint Span Count",
Metrics: []string{"streaming.frontier_checkpoint_span_count"},
},
{
Title: "Frontier Lag",
Metrics: []string{"streaming.frontier_lag_seconds"},
},
{
Title: "Job Progress Updates",
Metrics: []string{"streaming.job_progress_updates"},
Expand Down

0 comments on commit e5cc6d2

Please sign in to comment.