From e5cc6d2f6bb495ed206af68e4f1aa831a04da498 Mon Sep 17 00:00:00 2001 From: Lidor Carmel Date: Fri, 13 Jan 2023 10:04:30 -0800 Subject: [PATCH] streamingest: add a replication lag metric Add a metric to track the lag of the replication frontier, in seconds. Informs: #92959 Epic: CRDB-18752 Release note: None --- pkg/ccl/streamingccl/streamingest/metrics.go | 8 ++++++++ .../streamingest/stream_ingestion_frontier_processor.go | 1 + pkg/ts/catalog/chart_catalog.go | 4 ++++ 3 files changed, 13 insertions(+) diff --git a/pkg/ccl/streamingccl/streamingest/metrics.go b/pkg/ccl/streamingccl/streamingest/metrics.go index aec88cc3a517..58c3c7910d82 100644 --- a/pkg/ccl/streamingccl/streamingest/metrics.go +++ b/pkg/ccl/streamingccl/streamingest/metrics.go @@ -92,6 +92,12 @@ var ( Measurement: "Resolved Spans", Unit: metric.Unit_COUNT, } + metaFrontierLagSeconds = metric.Metadata{ + Name: "streaming.frontier_lag_seconds", + Help: "Time the replication frontier lags", + Measurement: "Seconds", + Unit: metric.Unit_SECONDS, + } metaJobProgressUpdates = metric.Metadata{ Name: "streaming.job_progress_updates", Help: "Total number of updates to the ingestion job progress", @@ -115,6 +121,7 @@ type Metrics struct { LatestDataCheckpointSpan *metric.Gauge DataCheckpointSpanCount *metric.Gauge FrontierCheckpointSpanCount *metric.Gauge + FrontierLagSeconds *metric.GaugeFloat64 } // MetricStruct implements the metric.Struct interface. @@ -139,6 +146,7 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct { LatestDataCheckpointSpan: metric.NewGauge(metaLatestDataCheckpointSpan), DataCheckpointSpanCount: metric.NewGauge(metaDataCheckpointSpanCount), FrontierCheckpointSpanCount: metric.NewGauge(metaFrontierCheckpointSpanCount), + FrontierLagSeconds: metric.NewGaugeFloat64(metaFrontierLagSeconds), } return m } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go index c71fb79373fd..181cc5dc6657 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go @@ -467,6 +467,7 @@ func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error { sf.metrics.JobProgressUpdates.Inc(1) sf.persistedHighWater = f.Frontier() sf.metrics.FrontierCheckpointSpanCount.Update(int64(len(frontierResolvedSpans))) + sf.metrics.FrontierLagSeconds.Update(timeutil.Since(sf.persistedHighWater.GoTime()).Seconds()) return nil } diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 41614bdff589..ec83e785d72a 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -1623,6 +1623,10 @@ var charts = []sectionDescription{ Title: "Frontier Checkpoint Span Count", Metrics: []string{"streaming.frontier_checkpoint_span_count"}, }, + { + Title: "Frontier Lag", + Metrics: []string{"streaming.frontier_lag_seconds"}, + }, { Title: "Job Progress Updates", Metrics: []string{"streaming.job_progress_updates"},