Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
95248: streamingest: add a replication lag metric r=lidorcarmel a=lidorcarmel

Add a metric to track the lag of the replication frontier, in seconds.

Informs: #92959

Epic: CRDB-18752

Release note: None

95406: sql: retry query requiring cluster setting propagation r=cucaroach a=cucaroach

Fixes: #95359
Release note: None
Epic: none


95453: changefeedccl: nilsafe kafkaSink.Close r=[miretskiy] a=HonoreDB

We were tolerating nils in other parts of Close, but not here, which can create a race condition if a sink gets closed before being fully initialized resulting in a panic.

Fixes #95278
Release note: None

Co-authored-by: Lidor Carmel <[email protected]>
Co-authored-by: Tommy Reilly <[email protected]>
Co-authored-by: Aaron Zinger <[email protected]>
  • Loading branch information
4 people committed Jan 18, 2023
4 parents 85b40c7 + e5cc6d2 + 9e5bca6 + 0572cf5 commit 075b319
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 3 deletions.
6 changes: 4 additions & 2 deletions pkg/ccl/changefeedccl/sink_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,10 @@ func (s *kafkaSink) newSyncProducer(client kafkaClient) (sarama.SyncProducer, er

// Close implements the Sink interface.
func (s *kafkaSink) Close() error {
close(s.stopWorkerCh)
s.worker.Wait()
if s.stopWorkerCh != nil {
close(s.stopWorkerCh)
s.worker.Wait()
}

if s.producer != nil {
// Ignore errors related to outstanding messages since we're either shutting
Expand Down
8 changes: 8 additions & 0 deletions pkg/ccl/streamingccl/streamingest/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ var (
Measurement: "Resolved Spans",
Unit: metric.Unit_COUNT,
}
metaFrontierLagSeconds = metric.Metadata{
Name: "streaming.frontier_lag_seconds",
Help: "Time the replication frontier lags",
Measurement: "Seconds",
Unit: metric.Unit_SECONDS,
}
metaJobProgressUpdates = metric.Metadata{
Name: "streaming.job_progress_updates",
Help: "Total number of updates to the ingestion job progress",
Expand All @@ -115,6 +121,7 @@ type Metrics struct {
LatestDataCheckpointSpan *metric.Gauge
DataCheckpointSpanCount *metric.Gauge
FrontierCheckpointSpanCount *metric.Gauge
FrontierLagSeconds *metric.GaugeFloat64
}

// MetricStruct implements the metric.Struct interface.
Expand All @@ -139,6 +146,7 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
LatestDataCheckpointSpan: metric.NewGauge(metaLatestDataCheckpointSpan),
DataCheckpointSpanCount: metric.NewGauge(metaDataCheckpointSpanCount),
FrontierCheckpointSpanCount: metric.NewGauge(metaFrontierCheckpointSpanCount),
FrontierLagSeconds: metric.NewGaugeFloat64(metaFrontierLagSeconds),
}
return m
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error {
sf.metrics.JobProgressUpdates.Inc(1)
sf.persistedHighWater = f.Frontier()
sf.metrics.FrontierCheckpointSpanCount.Update(int64(len(frontierResolvedSpans)))
sf.metrics.FrontierLagSeconds.Update(timeutil.Since(sf.persistedHighWater.GoTime()).Seconds())

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ SELECT k FROM geo_table WHERE ST_DWithin('POINT(2.5 2.5)'::geometry, geom, 1) OR
statement ok
SET CLUSTER SETTING sql.spatial.experimental_box2d_comparison_operators.enabled = on

query I
query I retry
SELECT k FROM geo_table WHERE 'POINT(3.0 3.0)'::geometry && geom ORDER BY k
----
3
Expand Down
4 changes: 4 additions & 0 deletions pkg/ts/catalog/chart_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -1623,6 +1623,10 @@ var charts = []sectionDescription{
Title: "Frontier Checkpoint Span Count",
Metrics: []string{"streaming.frontier_checkpoint_span_count"},
},
{
Title: "Frontier Lag",
Metrics: []string{"streaming.frontier_lag_seconds"},
},
{
Title: "Job Progress Updates",
Metrics: []string{"streaming.job_progress_updates"},
Expand Down

0 comments on commit 075b319

Please sign in to comment.