Merge pull request #134674 from cockroachdb/blathers/backport-release-24.3-134090

release-24.3: crosscluster: monitor lagging spans
msbutler authored Nov 8, 2024
2 parents d8ebfbe + 9eb6c8b commit c193ab9
Showing 9 changed files with 325 additions and 20 deletions.
4 changes: 4 additions & 0 deletions docs/generated/metrics/metrics.html
@@ -1500,6 +1500,8 @@
<tr><td>APPLICATION</td><td>kv.protectedts.reconciliation.records_processed</td><td>number of records processed without error during reconciliation on this node</td><td>Count</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>kv.protectedts.reconciliation.records_removed</td><td>number of records removed during reconciliation runs on this node</td><td>Count</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.batch_hist_nanos</td><td>Time spent flushing a batch</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.catchup_ranges</td><td>Source side ranges undergoing catch up scans (inaccurate with multiple LDR jobs)</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.catchup_ranges_by_label</td><td>Source side ranges undergoing catch up scans</td><td>Ranges</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.checkpoint_events_ingested</td><td>Checkpoint events ingested by all replication jobs</td><td>Events</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.commit_latency</td><td>Event commit latency: a difference between event MVCC timestamp and the time it was flushed into disk. If we batch events, then the difference between the oldest event in the batch and flush is recorded</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.events_dlqed</td><td>Row update events sent to DLQ</td><td>Failures</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
@@ -1521,6 +1523,8 @@
<tr><td>APPLICATION</td><td>logical_replication.replicated_time_seconds</td><td>The replicated time of the logical replication stream in seconds since the unix epoch.</td><td>Seconds</td><td>GAUGE</td><td>SECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.retry_queue_bytes</td><td>The replicated time of the logical replication stream in seconds since the unix epoch.</td><td>Bytes</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.retry_queue_events</td><td>The replicated time of the logical replication stream in seconds since the unix epoch.</td><td>Events</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.scanning_ranges</td><td>Source side ranges undergoing an initial scan (inaccurate with multiple LDR jobs)</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.scanning_ranges_by_label</td><td>Source side ranges undergoing an initial scan</td><td>Ranges</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>obs.tablemetadata.update_job.duration</td><td>Time spent running the update table metadata job.</td><td>Duration</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>obs.tablemetadata.update_job.errors</td><td>The total number of errors that have been emitted from the update table metadata job.</td><td>Errors</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>obs.tablemetadata.update_job.runs</td><td>The total number of runs of the update table metadata job.</td><td>Executions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
4 changes: 4 additions & 0 deletions pkg/ccl/crosscluster/logical/BUILD.bazel
@@ -12,6 +12,7 @@ go_library(
"lww_row_processor.go",
"metrics.go",
"purgatory.go",
"range_stats.go",
"udf_row_processor.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/logical",
@@ -83,11 +84,13 @@
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/span",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//types",
"@com_github_lib_pq//oid",
],
)
@@ -101,6 +104,7 @@ go_test(
"lww_row_processor_test.go",
"main_test.go",
"purgatory_test.go",
"range_stats_test.go",
"udf_row_processor_test.go",
],
data = ["//c-deps:libgeos"],
60 changes: 49 additions & 11 deletions pkg/ccl/crosscluster/logical/logical_replication_job.go
@@ -7,7 +7,6 @@ package logical

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
@@ -41,6 +40,7 @@
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
pbtypes "github.com/gogo/protobuf/types"
"github.com/lib/pq/oid"
)

@@ -207,23 +207,19 @@ func (r *logicalReplicationResumer) ingest(
}()

execPlan := func(ctx context.Context) error {

metaFn := func(_ context.Context, meta *execinfrapb.ProducerMetadata) error {
log.VInfof(ctx, 2, "received producer meta: %v", meta)
return nil
}
rh := rowHandler{
replicatedTimeAtStart: replicatedTimeAtStart,
frontier: frontier,
metrics: metrics,
settings: &execCfg.Settings.SV,
job: r.job,
frontierUpdates: heartbeatSender.FrontierUpdates,
rangeStats: newRangeStatsCollector(planInfo.writeProcessorCount),
}
rowResultWriter := sql.NewCallbackResultWriter(rh.handleRow)
distSQLReceiver := sql.MakeDistSQLReceiver(
ctx,
sql.NewMetadataCallbackWriter(rowResultWriter, metaFn),
sql.NewMetadataCallbackWriter(rowResultWriter, rh.handleMeta),
tree.Rows,
execCfg.RangeDescriptorCache,
nil, /* txn */
@@ -292,9 +288,10 @@ type logicalReplicationPlanner struct {
}

type logicalReplicationPlanInfo struct {
sourceSpans []roachpb.Span
streamAddress []string
destTableBySrcID map[descpb.ID]dstTableMetadata
sourceSpans []roachpb.Span
streamAddress []string
destTableBySrcID map[descpb.ID]dstTableMetadata
writeProcessorCount int
}

func makeLogicalReplicationPlanner(
@@ -455,6 +452,7 @@ func (p *logicalReplicationPlanner) generatePlanImpl(
LogicalReplicationWriter: &sp,
},
})
info.writeProcessorCount++
}
}

@@ -481,9 +479,27 @@ type rowHandler struct {
job *jobs.Job
frontierUpdates chan hlc.Timestamp

rangeStats rangeStatsByProcessorID

lastPartitionUpdate time.Time
}

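// handleMeta collects the per-processor range stats that the writer
// processors attach to BulkProcessorProgress metadata; any other producer
// metadata is only logged at verbosity 2.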
func (rh *rowHandler) handleMeta(ctx context.Context, meta *execinfrapb.ProducerMetadata) error {
if meta.BulkProcessorProgress == nil {
log.VInfof(ctx, 2, "received non progress producer meta: %v", meta)
return nil
}

var stats streampb.StreamEvent_RangeStats
if err := pbtypes.UnmarshalAny(&meta.BulkProcessorProgress.ProgressDetails, &stats); err != nil {
return errors.Wrap(err, "unable to unmarshal progress details")
}

rh.rangeStats.Add(meta.BulkProcessorProgress.ProcessorID, &stats)

return nil
}

func (rh *rowHandler) handleRow(ctx context.Context, row tree.Datums) error {
raw, ok := row[0].(*tree.DBytes)
if !ok {
@@ -523,6 +539,10 @@ func (rh *rowHandler) handleRow(ctx context.Context, row tree.Datums) error {
progress := md.Progress
prog := progress.Details.(*jobspb.Progress_LogicalReplication).LogicalReplication
prog.Checkpoint.ResolvedSpans = frontierResolvedSpans

// TODO (msbutler): add ldr initial and lagging range timeseries metrics.
aggRangeStats, fractionCompleted, status := rh.rangeStats.RollupStats()

if rh.replicatedTimeAtStart.Less(replicatedTime) {
prog.ReplicatedTime = replicatedTime
// The HighWater is for informational purposes
@@ -531,13 +551,31 @@
HighWater: &replicatedTime,
}
}
progress.RunningStatus = fmt.Sprintf("logical replication running: %s", replicatedTime.GoTime())
progress.RunningStatus = status
if fractionCompleted > 0 && fractionCompleted < 1 {
// If 0, the coordinator has not gotten a complete range stats update
// from all nodes yet.
//
// If 1, the job is all caught up.
progress.Progress = &jobspb.Progress_FractionCompleted{
FractionCompleted: fractionCompleted,
}
}
ju.UpdateProgress(progress)
if md.RunStats != nil && md.RunStats.NumRuns > 1 {
ju.UpdateRunStats(1, md.RunStats.LastRun)
}
if l := rh.job.Details().(jobspb.LogicalReplicationDetails).MetricsLabel; l != "" {
rh.metrics.LabeledReplicatedTime.Update(map[string]string{"label": l}, replicatedTime.GoTime().Unix())

if aggRangeStats.RangeCount != 0 {
rh.metrics.LabeledScanningRanges.Update(map[string]string{"label": l}, aggRangeStats.ScanningRangeCount)
rh.metrics.LabeledCatchupRanges.Update(map[string]string{"label": l}, aggRangeStats.LaggingRangeCount)
}
}
if aggRangeStats.RangeCount != 0 {
rh.metrics.ScanningRanges.Update(aggRangeStats.ScanningRangeCount)
rh.metrics.CatchupRanges.Update(aggRangeStats.LaggingRangeCount)
}
return nil
}); err != nil {
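range_stats.go is added by this commit (see the BUILD.bazel change above) but its contents are collapsed in this view. Based only on how the collector is used in logical_replication_job.go — newRangeStatsCollector(planInfo.writeProcessorCount), rangeStats.Add(processorID, &stats), and RollupStats() returning aggregate stats, a fraction completed, and a running-status string — and on the new //pkg/util/syncutil dependency, a minimal sketch could look like the following. The struct layout, locking scheme, and status wording are assumptions, not the actual file.

package logical

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// rangeStatsByProcessorID aggregates the range stats that each writer
// processor reports to the coordinator via BulkProcessorProgress metadata.
type rangeStatsByProcessorID struct {
	mu       *syncutil.Mutex
	expected int
	stats    map[int32]*streampb.StreamEvent_RangeStats
}

func newRangeStatsCollector(writeProcessorCount int) rangeStatsByProcessorID {
	return rangeStatsByProcessorID{
		mu:       &syncutil.Mutex{},
		expected: writeProcessorCount,
		stats:    make(map[int32]*streampb.StreamEvent_RangeStats, writeProcessorCount),
	}
}

// Add records the most recent stats reported by a single writer processor.
func (r rangeStatsByProcessorID) Add(processorID int32, stats *streampb.StreamEvent_RangeStats) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.stats[processorID] = stats
}

// RollupStats sums the per-processor stats and derives a fraction completed
// and a running status. The fraction stays at 0 until every processor has
// reported at least once, which matches how the coordinator interprets it.
func (r rangeStatsByProcessorID) RollupStats() (streampb.StreamEvent_RangeStats, float64, string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	var agg streampb.StreamEvent_RangeStats
	for _, s := range r.stats {
		agg.RangeCount += s.RangeCount
		agg.ScanningRangeCount += s.ScanningRangeCount
		agg.LaggingRangeCount += s.LaggingRangeCount
	}
	if len(r.stats) < r.expected || agg.RangeCount == 0 {
		return agg, 0, "logical replication initializing: waiting for range stats"
	}
	remaining := agg.ScanningRangeCount + agg.LaggingRangeCount
	fraction := float64(agg.RangeCount-remaining) / float64(agg.RangeCount)
	status := fmt.Sprintf("logical replication running: %d of %d ranges caught up",
		agg.RangeCount-remaining, agg.RangeCount)
	return agg, fraction, status
}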
pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go
@@ -42,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
pbtypes "github.com/gogo/protobuf/types"
)

var logicalReplicationWriterResultType = []*types.T{
@@ -83,6 +84,7 @@ var maxChunkSize = settings.RegisterIntSetting(
// by decoding kvs in it to logical changes and applying them by executing DMLs.
type logicalReplicationWriterProcessor struct {
execinfra.ProcessorBase
processorID int32

spec execinfrapb.LogicalReplicationWriterSpec

@@ -113,6 +115,8 @@ type logicalReplicationWriterProcessor struct {

checkpointCh chan []jobspb.ResolvedSpan

rangeStatsCh chan *streampb.StreamEvent_RangeStats

// metrics are monitoring all running ingestion jobs.
metrics *Metrics

@@ -187,6 +191,7 @@ func newLogicalReplicationWriterProcessor(
lrw := &logicalReplicationWriterProcessor{
configByTable: procConfigByDestTableID,
spec: spec,
processorID: processorID,
getBatchSize: func() int {
// TODO(ssd): We set this to 1 since putting more than 1
// row in a KV batch using the new ConditionalPut-based
@@ -221,6 +226,7 @@
frontier: frontier,
stopCh: make(chan struct{}),
checkpointCh: make(chan []jobspb.ResolvedSpan),
rangeStatsCh: make(chan *streampb.StreamEvent_RangeStats),
errCh: make(chan error, 1),
logBufferEvery: log.Every(30 * time.Second),
debug: streampb.DebugLogicalConsumerStatus{
@@ -382,6 +388,13 @@ func (lrw *logicalReplicationWriterProcessor) Next() (
return nil, lrw.DrainHelper()
}
}
case stats := <-lrw.rangeStatsCh:
meta, err := lrw.newRangeStatsProgressMeta(stats)
if err != nil {
lrw.MoveToDrainingAndLogError(err)
return nil, lrw.DrainHelper()
}
return nil, meta
case err := <-lrw.errCh:
lrw.MoveToDrainingAndLogError(err)
return nil, lrw.DrainHelper()
@@ -494,7 +507,7 @@ func (lrw *logicalReplicationWriterProcessor) handleEvent(
return err
}
case crosscluster.CheckpointEvent:
if err := lrw.maybeCheckpoint(ctx, event.GetCheckpoint().ResolvedSpans); err != nil {
if err := lrw.maybeCheckpoint(ctx, event.GetCheckpoint()); err != nil {
return err
}
case crosscluster.SSTableEvent, crosscluster.DeleteRangeEvent:
@@ -512,16 +525,65 @@
}

func (lrw *logicalReplicationWriterProcessor) maybeCheckpoint(
ctx context.Context, resolvedSpans []jobspb.ResolvedSpan,
ctx context.Context, checkpoint *streampb.StreamEvent_StreamCheckpoint,
) error {
// If the checkpoint contains stats publish them to the coordinator. The
// stats ignore purgatory because they:
// 1. Track the status of the producer scans
// 2. Are intended for monitoring and don't need to reflect the committed
// state of the write processor.
//
// RangeStats may be nil if the producer does not support the stats field or
// the producer has not finished counting the ranges.
if checkpoint.RangeStats != nil {
err := lrw.rangeStats(ctx, checkpoint.RangeStats)
if err != nil {
return err
}
}

// If purgatory is non-empty, it intercepts the checkpoint and then we can try
// to drain it.
if !lrw.purgatory.Empty() {
lrw.purgatory.Checkpoint(ctx, resolvedSpans)
lrw.purgatory.Checkpoint(ctx, checkpoint.ResolvedSpans)
return lrw.purgatory.Drain(ctx)
}

return lrw.checkpoint(ctx, resolvedSpans)
return lrw.checkpoint(ctx, checkpoint.ResolvedSpans)
}

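// rangeStats hands producer-reported range stats to the goroutine driving
// Next(), which forwards them to the coordinator as BulkProcessorProgress
// metadata via newRangeStatsProgressMeta.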
func (lrw *logicalReplicationWriterProcessor) rangeStats(
ctx context.Context, stats *streampb.StreamEvent_RangeStats,
) error {
select {
case <-ctx.Done():
return ctx.Err()
case lrw.rangeStatsCh <- stats:
return nil
case <-lrw.stopCh:
// We need to select on stopCh here because the reader
// of rangeStatsCh is the caller of Next(). But there
// might never be another Next() call since it may
// have exited based on an error.
return nil
}
}

func (lrw *logicalReplicationWriterProcessor) newRangeStatsProgressMeta(
stats *streampb.StreamEvent_RangeStats,
) (*execinfrapb.ProducerMetadata, error) {
asAny, err := pbtypes.MarshalAny(stats)
if err != nil {
return nil, errors.Wrap(err, "unable to convert stats into any proto")
}
return &execinfrapb.ProducerMetadata{
BulkProcessorProgress: &execinfrapb.RemoteProducerMetadata_BulkProcessorProgress{
NodeID: lrw.FlowCtx.NodeID.SQLInstanceID(),
FlowID: lrw.FlowCtx.ID,
ProcessorID: lrw.ProcessorID,
ProgressDetails: *asAny,
},
}, nil
}

func (lrw *logicalReplicationWriterProcessor) checkpoint(
34 changes: 34 additions & 0 deletions pkg/ccl/crosscluster/logical/metrics.go
@@ -134,6 +134,18 @@
Measurement: "Events",
Unit: metric.Unit_COUNT,
}
metaScanningRanges = metric.Metadata{
Name: "logical_replication.scanning_ranges",
Help: "Source side ranges undergoing an initial scan (innacurate with multiple LDR jobs)",
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
metaCatchupRanges = metric.Metadata{
Name: "logical_replication.catchup_ranges",
Help: "Source side ranges undergoing catch up scans (innacurate with multiple LDR jobs)",
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}

// Labeled metrics.
metaLabeledReplicatedTime = metric.Metadata{
@@ -154,6 +166,18 @@
Measurement: "Failures",
Unit: metric.Unit_COUNT,
}
metaLabeledScanningRanges = metric.Metadata{
Name: "logical_replication.scanning_ranges_by_label",
Help: "Source side ranges undergoing an initial scan",
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
metaLabeledCatchupRanges = metric.Metadata{
Name: "logical_replication.catchup_ranges_by_label",
Help: "Source side ranges undergoing catch up scans",
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
)

// Metrics are for production monitoring of logical replication jobs.
@@ -182,6 +206,9 @@ type Metrics struct {
RetriedApplySuccesses *metric.Counter
RetriedApplyFailures *metric.Counter

ScanningRanges *metric.Gauge
CatchupRanges *metric.Gauge

// Internal numbers that are useful for determining why a stream is behaving
// a specific way.
CheckpointEvents *metric.Counter
@@ -193,6 +220,8 @@
LabeledReplicatedTime *metric.GaugeVec
LabeledEventsIngested *metric.CounterVec
LabeledEventsDLQed *metric.CounterVec
LabeledScanningRanges *metric.GaugeVec
LabeledCatchupRanges *metric.GaugeVec
}

// MetricStruct implements the metric.Struct interface.
@@ -233,9 +262,14 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
KVUpdateTooOld: metric.NewCounter(metaKVUpdateTooOld),
KVValueRefreshes: metric.NewCounter(metaKVValueRefreshes),

ScanningRanges: metric.NewGauge(metaScanningRanges),
CatchupRanges: metric.NewGauge(metaCatchupRanges),

// Labeled export-only metrics.
LabeledReplicatedTime: metric.NewExportedGaugeVec(metaLabeledReplicatedTime, []string{"label"}),
LabeledEventsIngested: metric.NewExportedCounterVec(metaLabeledEventsIngetsted, []string{"label"}),
LabeledEventsDLQed: metric.NewExportedCounterVec(metaLabeledEventsDLQed, []string{"label"}),
LabeledScanningRanges: metric.NewExportedGaugeVec(metaLabeledScanningRanges, []string{"label"}),
LabeledCatchupRanges: metric.NewExportedGaugeVec(metaLabeledCatchupRanges, []string{"label"}),
}
}
