From b74a5e2770955981aba5bb51b3bb42ab42a4a661 Mon Sep 17 00:00:00 2001
From: Jayant Shrivastava
Date: Fri, 22 Sep 2023 18:03:29 -0400
Subject: [PATCH] changefeedccl: add changefeed.cloudstorage_buffered_bytes
 metric

This change adds a metric to track the number of bytes buffered by the
cloudstorage sink.

Release note: None
Epic: None
---
 docs/generated/metrics/metrics.html        |  1 +
 pkg/ccl/changefeedccl/changefeed_test.go   | 69 +++++++++++++++++++
 pkg/ccl/changefeedccl/metrics.go           | 23 +++++++
 .../parquet_sink_cloudstorage.go           |  4 +-
 pkg/ccl/changefeedccl/sink_cloudstorage.go | 40 ++++++++---
 pkg/ccl/changefeedccl/telemetry.go         |  4 ++
 6 files changed, 129 insertions(+), 12 deletions(-)

diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html
index 36434ecd54d6..fcfdbfce6398 100644
--- a/docs/generated/metrics/metrics.html
+++ b/docs/generated/metrics/metrics.html
@@ -736,6 +736,7 @@
 <tr><td>APPLICATION</td><td>changefeed.bytes.messages_pushback_nanos</td><td>Total time spent throttled for bytes quota</td><td>Nanoseconds</td><td>COUNTER</td><td>NANOSECONDS</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
 <tr><td>APPLICATION</td><td>changefeed.checkpoint_hist_nanos</td><td>Time spent checkpointing changefeed progress</td><td>Changefeeds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
 <tr><td>APPLICATION</td><td>changefeed.checkpoint_progress</td><td>The earliest timestamp of any changefeed's persisted checkpoint (values prior to this timestamp will never need to be re-emitted)</td><td>Unix Timestamp Nanoseconds</td><td>GAUGE</td><td>TIMESTAMP_NS</td><td>AVG</td><td>NONE</td></tr>
+<tr><td>APPLICATION</td><td>changefeed.cloudstorage_buffered_bytes</td><td>The number of bytes buffered in cloudstorage sink files which have not been emitted yet</td><td>Bytes</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
 <tr><td>APPLICATION</td><td>changefeed.commit_latency</td><td>Event commit latency: a difference between event MVCC timestamp and the time it was acknowledged by the downstream sink. If the sink batches events, then the difference between the oldest event in the batch and acknowledgement is recorded; Excludes latency during backfill</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
 <tr><td>APPLICATION</td><td>changefeed.emitted_bytes</td><td>Bytes emitted by all feeds</td><td>Bytes</td><td>COUNTER</td><td>BYTES</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
 <tr><td>APPLICATION</td><td>changefeed.emitted_messages</td><td>Messages emitted by all feeds</td><td>Messages</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go
index bbd2f57e1b43..77e692876009 100644
--- a/pkg/ccl/changefeedccl/changefeed_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_test.go
@@ -8871,3 +8871,72 @@ func TestChangefeedPubsubResolvedMessages(t *testing.T) {
 
 	cdcTest(t, testFn, feedTestForceSink("pubsub"))
 }
+
+// TestCloudstorageBufferedBytesMetric tests the metric which tracks the number
+// of buffered bytes in the cloudstorage sink.
+func TestCloudstorageBufferedBytesMetric(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	rng, _ := randutil.NewTestRand()
+
+	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
+		registry := s.Server.JobRegistry().(*jobs.Registry)
+		metrics := registry.MetricsStruct().Changefeed.(*Metrics)
+		defaultSLI, err := metrics.getSLIMetrics(defaultSLIScope)
+		require.NoError(t, err)
+
+		knobs := s.TestingKnobs.
+			DistSQL.(*execinfra.TestingKnobs).
+			Changefeed.(*TestingKnobs)
+
+		var shouldEmit atomic.Bool
+		knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) (bool, error) {
+			return !shouldEmit.Load(), nil
+		}
+		db := sqlutils.MakeSQLRunner(s.DB)
+		db.Exec(t, `
+			CREATE TABLE foo (key INT PRIMARY KEY);
+			INSERT INTO foo (key) SELECT * FROM generate_series(1, 1000);
+		`)
+
+		require.Equal(t, int64(0), defaultSLI.CloudstorageBufferedBytes.Value())
+
+		format := "json"
+		if rng.Float32() < 0.5 {
+			format = "parquet"
+		}
+		foo, err := f.Feed(fmt.Sprintf("CREATE CHANGEFEED FOR TABLE foo WITH format='%s'", format))
+		require.NoError(t, err)
+
+		// Because checkpoints are disabled, we should have some bytes build up
+		// in the sink.
+		targetBytes := int64(40000)
+		if format == "parquet" {
+			// Parquet is a much more efficient format, so the buffered files will
+			// be much smaller.
+			targetBytes = 2000
+		}
+		testutils.SucceedsSoon(t, func() error {
+			numBytes := defaultSLI.CloudstorageBufferedBytes.Value()
+			if numBytes < targetBytes {
+				return errors.Newf("expected at least %d buffered bytes but found %d", targetBytes, numBytes)
+			}
+			return nil
+		})
+
+		// Allow checkpoints to pass through and flush the sink. We should see
+		// zero bytes buffered after that.
+		shouldEmit.Store(true)
+		testutils.SucceedsSoon(t, func() error {
+			numBytes := defaultSLI.CloudstorageBufferedBytes.Value()
+			if numBytes != 0 {
+				return errors.Newf("expected 0 buffered bytes but found %d", numBytes)
+			}
+			return nil
+		})
+
+		require.NoError(t, foo.Close())
+	}
+
+	cdcTest(t, testFn, feedTestForceSink("cloudstorage"))
+}
diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go
index ed9b41cb6f2e..8e42dd850b0f 100644
--- a/pkg/ccl/changefeedccl/metrics.go
+++ b/pkg/ccl/changefeedccl/metrics.go
@@ -71,6 +71,7 @@
 	AggregatorProgress        *aggmetric.AggGauge
 	CheckpointProgress        *aggmetric.AggGauge
 	LaggingRanges             *aggmetric.AggGauge
+	CloudstorageBufferedBytes *aggmetric.AggGauge
 
 	// There is always at least 1 sliMetrics created for defaultSLI scope.
 	mu struct {
@@ -100,6 +101,7 @@
 	recordSizeBasedFlush()
 	recordParallelIOQueueLatency(time.Duration)
 	recordSinkIOInflightChange(int64)
+	makeCloudstorageFileAllocCallback() func(delta int64)
 }
 
 var _ metricsRecorder = (*sliMetrics)(nil)
@@ -134,6 +136,7 @@
 	AggregatorProgress        *aggmetric.Gauge
 	CheckpointProgress        *aggmetric.Gauge
 	LaggingRanges             *aggmetric.Gauge
+	CloudstorageBufferedBytes *aggmetric.Gauge
 
 	mu struct {
 		syncutil.Mutex
@@ -207,6 +210,14 @@ func (m *sliMetrics) recordMessageSize(sz int64) {
 	}
 }
 
+func (m *sliMetrics) makeCloudstorageFileAllocCallback() func(delta int64) {
+	return func(delta int64) {
+		if m != nil {
+			m.CloudstorageBufferedBytes.Inc(delta)
+		}
+	}
+}
+
 func (m *sliMetrics) recordInternalRetry(numMessages int64, reducedBatchSize bool) {
 	if m == nil {
 		return
@@ -361,6 +372,10 @@ func (w *wrappingCostController) recordMessageSize(sz int64) {
 	w.inner.recordMessageSize(sz)
 }
 
+func (w *wrappingCostController) makeCloudstorageFileAllocCallback() func(delta int64) {
+	return w.inner.makeCloudstorageFileAllocCallback()
+}
+
 func (w *wrappingCostController) recordInternalRetry(numMessages int64, reducedBatchSize bool) {
 	w.inner.recordInternalRetry(numMessages, reducedBatchSize)
 }
@@ -615,6 +630,12 @@
 		Measurement: "Ranges",
 		Unit:        metric.Unit_COUNT,
 	}
+	metaCloudstorageBufferedBytes := metric.Metadata{
+		Name:        "changefeed.cloudstorage_buffered_bytes",
+		Help:        "The number of bytes buffered in cloudstorage sink files which have not been emitted yet",
+		Measurement: "Bytes",
+		Unit:        metric.Unit_COUNT,
+	}
 
 	functionalGaugeMinFn := func(childValues []int64) int64 {
 		var min int64
@@ -691,6 +712,7 @@
 		AggregatorProgress:        b.FunctionalGauge(metaAggregatorProgress, functionalGaugeMinFn),
 		CheckpointProgress:        b.FunctionalGauge(metaCheckpointProgress, functionalGaugeMinFn),
 		LaggingRanges:             b.Gauge(metaLaggingRangePercentage),
+		CloudstorageBufferedBytes: b.Gauge(metaCloudstorageBufferedBytes),
 	}
 	a.mu.sliMetrics = make(map[string]*sliMetrics)
 	_, err := a.getOrCreateScope(defaultSLIScope)
@@ -751,6 +773,7 @@
 		SchemaRegistryRetries:     a.SchemaRegistryRetries.AddChild(scope),
 		SchemaRegistrations:       a.SchemaRegistrations.AddChild(scope),
 		LaggingRanges:             a.LaggingRanges.AddChild(scope),
+		CloudstorageBufferedBytes: a.CloudstorageBufferedBytes.AddChild(scope),
 	}
 	sm.mu.resolved = make(map[int64]hlc.Timestamp)
 	sm.mu.checkpoint = make(map[int64]hlc.Timestamp)
diff --git a/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go b/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go
index b38f8777923b..1a572123d476 100644
--- a/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go
+++ b/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go
@@ -195,7 +195,7 @@ func (parquetSink *parquetCloudStorageSink) EncodeAndEmitRow(
 	if err != nil {
 		return err
 	}
-	file.alloc.Merge(&alloc)
+	file.mergeAlloc(&alloc)
 
 	if file.parquetCodec == nil {
 		var err error
@@ -231,7 +231,7 @@ func (parquetSink *parquetCloudStorageSink) EncodeAndEmitRow(
 	// the number of bytes in the file and the number of buffered bytes.
 	prevAllocSize := file.alloc.Bytes()
 	newAllocSize := int64(file.buf.Len()) + bufferedBytesEstimate
-	file.alloc.AdjustBytesToTarget(ctx, newAllocSize)
+	file.adjustBytesToTarget(ctx, newAllocSize)
 
 	if log.V(1) && parquetSink.everyN.ShouldLog() {
 		log.Infof(ctx, "topic: %d/%d, written: %d, buffered %d, new alloc: %d, old alloc: %d",
diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage.go b/pkg/ccl/changefeedccl/sink_cloudstorage.go
index 3445251088c7..50ada755ef39 100644
--- a/pkg/ccl/changefeedccl/sink_cloudstorage.go
+++ b/pkg/ccl/changefeedccl/sink_cloudstorage.go
@@ -70,14 +70,33 @@ func cloudStorageFormatTime(ts hlc.Timestamp) string {
 
 type cloudStorageSinkFile struct {
 	cloudStorageSinkKey
-	created      time.Time
-	codec        io.WriteCloser
-	rawSize      int
-	numMessages  int
-	buf          bytes.Buffer
-	alloc        kvevent.Alloc
-	oldestMVCC   hlc.Timestamp
-	parquetCodec *parquetWriter
+	created       time.Time
+	codec         io.WriteCloser
+	rawSize       int
+	numMessages   int
+	buf           bytes.Buffer
+	alloc         kvevent.Alloc
+	oldestMVCC    hlc.Timestamp
+	parquetCodec  *parquetWriter
+	allocCallback func(delta int64)
 }
+
+func (f *cloudStorageSinkFile) mergeAlloc(other *kvevent.Alloc) {
+	prev := f.alloc.Bytes()
+	f.alloc.Merge(other)
+	f.allocCallback(f.alloc.Bytes() - prev)
+}
+
+func (f *cloudStorageSinkFile) releaseAlloc(ctx context.Context) {
+	prev := f.alloc.Bytes()
+	f.alloc.Release(ctx)
+	f.allocCallback(-prev)
+}
+
+func (f *cloudStorageSinkFile) adjustBytesToTarget(ctx context.Context, targetBytes int64) {
+	prev := f.alloc.Bytes()
+	f.alloc.AdjustBytesToTarget(ctx, targetBytes)
+	f.allocCallback(f.alloc.Bytes() - prev)
+}
 
 var _ io.Writer = &cloudStorageSinkFile{}
@@ -510,6 +529,7 @@
 		created:             timeutil.Now(),
 		cloudStorageSinkKey: key,
 		oldestMVCC:          eventMVCC,
+		allocCallback:       s.metrics.makeCloudstorageFileAllocCallback(),
 	}
 
 	if s.compression.enabled() {
@@ -557,7 +577,7 @@
 	if err != nil {
 		return err
 	}
-	file.alloc.Merge(&alloc)
+	file.mergeAlloc(&alloc)
 
 	if _, err := file.Write(value); err != nil {
 		return err
@@ -814,7 +834,7 @@ func (s *cloudStorageSink) asyncFlusher(ctx context.Context) error {
 func (f *cloudStorageSinkFile) flushToStorage(
 	ctx context.Context, es cloud.ExternalStorage, dest string, m metricsRecorder,
 ) error {
-	defer f.alloc.Release(ctx)
+	defer f.releaseAlloc(ctx)
 
 	if f.rawSize == 0 {
 		// This method shouldn't be called with an empty file, but be defensive
diff --git a/pkg/ccl/changefeedccl/telemetry.go b/pkg/ccl/changefeedccl/telemetry.go
index 906b634a29d9..77aca8496e15 100644
--- a/pkg/ccl/changefeedccl/telemetry.go
+++ b/pkg/ccl/changefeedccl/telemetry.go
@@ -146,6 +146,10 @@ func (r *telemetryMetricsRecorder) recordMessageSize(sz int64) {
 	r.inner.recordMessageSize(sz)
 }
 
+func (r *telemetryMetricsRecorder) makeCloudstorageFileAllocCallback() func(delta int64) {
+	return r.inner.makeCloudstorageFileAllocCallback()
+}
+
 func (r *telemetryMetricsRecorder) recordInternalRetry(numMessages int64, reducedBatchSize bool) {
 	r.inner.recordInternalRetry(numMessages, reducedBatchSize)
 }
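
Note on the pattern above: the sink never touches the gauge directly. Each
cloudStorageSinkFile carries an allocCallback that forwards byte deltas to the
changefeed.cloudstorage_buffered_bytes gauge whenever its kvevent.Alloc grows
(mergeAlloc), is resized to a new estimate (adjustBytesToTarget), or is
released on flush (releaseAlloc). The standalone Go sketch below illustrates
this delta-callback pattern in isolation; it is not part of the patch and not
CockroachDB code, and the gauge and bufferedFile names are invented for
illustration.

package main

import (
	"fmt"
	"sync/atomic"
)

// gauge stands in for the aggmetric.Gauge backing the metric.
type gauge struct{ v atomic.Int64 }

func (g *gauge) Inc(delta int64) { g.v.Add(delta) }
func (g *gauge) Value() int64    { return g.v.Load() }

// bufferedFile stands in for cloudStorageSinkFile: it tracks its own
// allocation and reports every change through allocCallback, so the shared
// gauge always reflects the sum of all files' buffered bytes.
type bufferedFile struct {
	allocBytes    int64
	allocCallback func(delta int64)
}

// mergeAlloc absorbs an event's allocation and reports the growth.
func (f *bufferedFile) mergeAlloc(bytes int64) {
	f.allocBytes += bytes
	f.allocCallback(bytes)
}

// adjustBytesToTarget resizes the allocation and reports only the delta.
func (f *bufferedFile) adjustBytesToTarget(target int64) {
	delta := target - f.allocBytes
	f.allocBytes = target
	f.allocCallback(delta)
}

// releaseAlloc drops the whole allocation on flush, zeroing its contribution.
func (f *bufferedFile) releaseAlloc() {
	f.allocCallback(-f.allocBytes)
	f.allocBytes = 0
}

func main() {
	var buffered gauge
	f := &bufferedFile{allocCallback: buffered.Inc}

	f.mergeAlloc(1000)            // buffer some rows
	f.adjustBytesToTarget(400)    // e.g. compression shrank the estimate
	fmt.Println(buffered.Value()) // 400
	f.releaseAlloc()              // flushToStorage releases everything
	fmt.Println(buffered.Value()) // 0
}

Because every mutation funnels through the callback, adding the metric did not
require the sink or the flusher to know about metrics scopes at all; they were
given a closure bound to the right scope at file-creation time.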