Skip to content

Commit

Permalink
streamingest: add a replication lag metric
Browse files Browse the repository at this point in the history
Add a metric to track the lag of the replication frontier, in seconds.

Informs: #92959

Epic: CRDB-18752

Release note: None
  • Loading branch information
lidorcarmel committed Jan 17, 2023
1 parent 6f5d89e commit 81d5e48
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 2 deletions.
8 changes: 8 additions & 0 deletions pkg/ccl/streamingccl/streamingest/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ var (
Measurement: "Resolved Spans",
Unit: metric.Unit_COUNT,
}
// metaFrontierLagSeconds describes the float gauge that reports, in seconds,
// how far the replication frontier lags. The heartbeat loop updates the gauge
// with the time elapsed since the frontier's timestamp.
metaFrontierLagSeconds = metric.Metadata{
Name: "replication.frontier_lag_seconds",
Help: "Time the replication frontier lags",
Measurement: "Seconds",
Unit: metric.Unit_SECONDS,
}
metaJobProgressUpdates = metric.Metadata{
Name: "streaming.job_progress_updates",
Help: "Total number of updates to the ingestion job progress",
Expand All @@ -115,6 +121,7 @@ type Metrics struct {
LatestDataCheckpointSpan *metric.Gauge
DataCheckpointSpanCount *metric.Gauge
FrontierCheckpointSpanCount *metric.Gauge
FrontierLagSeconds *metric.GaugeFloat64
}

// MetricStruct implements the metric.Struct interface.
Expand All @@ -139,6 +146,7 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
LatestDataCheckpointSpan: metric.NewGauge(metaLatestDataCheckpointSpan),
DataCheckpointSpanCount: metric.NewGauge(metaDataCheckpointSpanCount),
FrontierCheckpointSpanCount: metric.NewGauge(metaFrontierCheckpointSpanCount),
FrontierLagSeconds: metric.NewGaugeFloat64(metaFrontierLagSeconds),
}
return m
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (h *heartbeatSender) maybeHeartbeat(
return true, s, err
}

func (h *heartbeatSender) startHeartbeatLoop(ctx context.Context) {
func (h *heartbeatSender) startHeartbeatLoop(ctx context.Context, metrics *Metrics) {
ctx, cancel := context.WithCancel(ctx)
h.cancel = cancel
h.cg = ctxgroup.WithContext(ctx)
Expand All @@ -221,6 +221,7 @@ func (h *heartbeatSender) startHeartbeatLoop(ctx context.Context) {
case frontier := <-h.frontierUpdates:
h.frontier.Forward(frontier)
}
metrics.FrontierLagSeconds.Update(timeutil.Since(h.frontier.GoTime()).Seconds())
sent, streamStatus, err := h.maybeHeartbeat(ctx, h.frontier)
// TODO(casper): add unit tests to test different kinds of client errors.
if err != nil {
Expand Down Expand Up @@ -269,7 +270,7 @@ func (sf *streamIngestionFrontier) Start(ctx context.Context) {
ctx = sf.StartInternal(ctx, streamIngestionFrontierProcName)
sf.metrics.RunningCount.Inc(1)
sf.input.Start(ctx)
sf.heartbeatSender.startHeartbeatLoop(ctx)
sf.heartbeatSender.startHeartbeatLoop(ctx, sf.metrics)
}

// Next is part of the RowSource interface.
Expand Down
4 changes: 4 additions & 0 deletions pkg/ts/catalog/chart_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -1623,6 +1623,10 @@ var charts = []sectionDescription{
Title: "Frontier Checkpoint Span Count",
Metrics: []string{"streaming.frontier_checkpoint_span_count"},
},
{
Title: "Frontier Lag",
Metrics: []string{"replication.frontier_lag_seconds"},
},
{
Title: "Job Progress Updates",
Metrics: []string{"streaming.job_progress_updates"},
Expand Down

0 comments on commit 81d5e48

Please sign in to comment.