diff --git a/pkg/ccl/streamingccl/streamingest/metrics.go b/pkg/ccl/streamingccl/streamingest/metrics.go index aec88cc3a517..752ca1700e5e 100644 --- a/pkg/ccl/streamingccl/streamingest/metrics.go +++ b/pkg/ccl/streamingccl/streamingest/metrics.go @@ -92,6 +92,12 @@ var ( Measurement: "Resolved Spans", Unit: metric.Unit_COUNT, } + metaFrontierLagSeconds = metric.Metadata{ + Name: "replication.frontier_lag_seconds", + Help: "Time the replication frontier lags", + Measurement: "Seconds", + Unit: metric.Unit_SECONDS, + } metaJobProgressUpdates = metric.Metadata{ Name: "streaming.job_progress_updates", Help: "Total number of updates to the ingestion job progress", @@ -115,6 +121,7 @@ type Metrics struct { LatestDataCheckpointSpan *metric.Gauge DataCheckpointSpanCount *metric.Gauge FrontierCheckpointSpanCount *metric.Gauge + FrontierLagSeconds *metric.GaugeFloat64 } // MetricStruct implements the metric.Struct interface. @@ -139,6 +146,7 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct { LatestDataCheckpointSpan: metric.NewGauge(metaLatestDataCheckpointSpan), DataCheckpointSpanCount: metric.NewGauge(metaDataCheckpointSpanCount), FrontierCheckpointSpanCount: metric.NewGauge(metaFrontierCheckpointSpanCount), + FrontierLagSeconds: metric.NewGaugeFloat64(metaFrontierLagSeconds), } return m } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go index c71fb79373fd..7e10258ee482 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go @@ -198,7 +198,7 @@ func (h *heartbeatSender) maybeHeartbeat( return true, s, err } -func (h *heartbeatSender) startHeartbeatLoop(ctx context.Context) { +func (h *heartbeatSender) startHeartbeatLoop(ctx context.Context, metrics *Metrics) { ctx, cancel := context.WithCancel(ctx) h.cancel = cancel h.cg = ctxgroup.WithContext(ctx) @@ -221,6 +221,7 @@ func (h *heartbeatSender) startHeartbeatLoop(ctx context.Context) { case frontier := <-h.frontierUpdates: h.frontier.Forward(frontier) } + metrics.FrontierLagSeconds.Update(timeutil.Since(h.frontier.GoTime()).Seconds()) sent, streamStatus, err := h.maybeHeartbeat(ctx, h.frontier) // TODO(casper): add unit tests to test different kinds of client errors. if err != nil { @@ -269,7 +270,7 @@ func (sf *streamIngestionFrontier) Start(ctx context.Context) { ctx = sf.StartInternal(ctx, streamIngestionFrontierProcName) sf.metrics.RunningCount.Inc(1) sf.input.Start(ctx) - sf.heartbeatSender.startHeartbeatLoop(ctx) + sf.heartbeatSender.startHeartbeatLoop(ctx, sf.metrics) } // Next is part of the RowSource interface. diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 41614bdff589..88890ee198d2 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -1623,6 +1623,10 @@ var charts = []sectionDescription{ Title: "Frontier Checkpoint Span Count", Metrics: []string{"streaming.frontier_checkpoint_span_count"}, }, + { + Title: "Frontier Lag", + Metrics: []string{"replication.frontier_lag_seconds"}, + }, { Title: "Job Progress Updates", Metrics: []string{"streaming.job_progress_updates"},