Skip to content

Commit

Permalink
changefeedccl: add changefeed.cloudstorage_buffered_bytes metric
Browse files Browse the repository at this point in the history
This change adds a metric to track the number of bytes buffered by
the cloudstorage sink.

Release note: None
Epic: None
  • Loading branch information
jayshrivastava committed Sep 25, 2023
1 parent fa5bedb commit f6b7212
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 12 deletions.
1 change: 1 addition & 0 deletions docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,7 @@
<tr><td>APPLICATION</td><td>changefeed.bytes.messages_pushback_nanos</td><td>Total time spent throttled for bytes quota</td><td>Nanoseconds</td><td>COUNTER</td><td>NANOSECONDS</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.checkpoint_hist_nanos</td><td>Time spent checkpointing changefeed progress</td><td>Changefeeds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.checkpoint_progress</td><td>The earliest timestamp of any changefeed&#39;s persisted checkpoint (values prior to this timestamp will never need to be re-emitted)</td><td>Unix Timestamp Nanoseconds</td><td>GAUGE</td><td>TIMESTAMP_NS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.cloudstorage_buffered_bytes</td><td>The number of bytes buffered in cloudstorage sink files which have not been emitted yet</td><td>Bytes</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.commit_latency</td><td>Event commit latency: a difference between event MVCC timestamp and the time it was acknowledged by the downstream sink. If the sink batches events, then the difference between the oldest event in the batch and acknowledgement is recorded; Excludes latency during backfill</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.emitted_bytes</td><td>Bytes emitted by all feeds</td><td>Bytes</td><td>COUNTER</td><td>BYTES</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.emitted_messages</td><td>Messages emitted by all feeds</td><td>Messages</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
Expand Down
69 changes: 69 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8867,3 +8867,72 @@ func TestChangefeedPubsubResolvedMessages(t *testing.T) {

cdcTest(t, testFn, feedTestForceSink("pubsub"))
}

// TestCloudstorageBufferedBytesMetric tests the metric which tracks the number
// of buffered bytes in the cloudstorage sink.
func TestCloudstorageBufferedBytesMetric(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
rng, _ := randutil.NewTestRand()

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
registry := s.Server.JobRegistry().(*jobs.Registry)
metrics := registry.MetricsStruct().Changefeed.(*Metrics)
defaultSLI, err := metrics.getSLIMetrics(defaultSLIScope)
require.NoError(t, err)

knobs := s.TestingKnobs.
DistSQL.(*execinfra.TestingKnobs).
Changefeed.(*TestingKnobs)

var shouldEmit atomic.Bool
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) (bool, error) {
return !shouldEmit.Load(), nil
}
db := sqlutils.MakeSQLRunner(s.DB)
db.Exec(t, `
CREATE TABLE foo (key INT PRIMARY KEY);
INSERT INTO foo (key) SELECT * FROM generate_series(1, 1000);
`)

require.Equal(t, int64(0), defaultSLI.CloudstorageBufferedBytes.Value())

format := "json"
if rng.Float32() < 0.5 {
format = "parquet"
}
foo, err := f.Feed(fmt.Sprintf("CREATE CHANGEFEED FOR TABLE foo WITH format='%s'", format))
require.NoError(t, err)

// Because checkpoints are disabled, we should have some bytes build up
// in the sink.
targetBytes := int64(40000)
if format == "parquet" {
// Parquet is a much more efficient format, so the buffered files will
// be much smaller.
targetBytes = 2000
}
testutils.SucceedsSoon(t, func() error {
numBytes := defaultSLI.CloudstorageBufferedBytes.Value()
if defaultSLI.CloudstorageBufferedBytes.Value() < targetBytes {
return errors.Newf("expected at least %d buffered bytes but found %d", targetBytes, numBytes)
}
return nil
})

// Allow checkpoints to pass through and flush the sink. We should see
// zero bytes buffered after that.
shouldEmit.Store(true)
testutils.SucceedsSoon(t, func() error {
numBytes := defaultSLI.CloudstorageBufferedBytes.Value()
if defaultSLI.CloudstorageBufferedBytes.Value() != 0 {
return errors.Newf("expected at least %d buffered bytes but found %d", 0, numBytes)
}
return nil
})

require.NoError(t, foo.Close())
}

cdcTest(t, testFn, feedTestForceSink("cloudstorage"))
}
23 changes: 23 additions & 0 deletions pkg/ccl/changefeedccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type AggMetrics struct {
AggregatorProgress *aggmetric.AggGauge
CheckpointProgress *aggmetric.AggGauge
LaggingRanges *aggmetric.AggGauge
CloudstorageBufferedBytes *aggmetric.AggGauge

// There is always at least 1 sliMetrics created for defaultSLI scope.
mu struct {
Expand Down Expand Up @@ -100,6 +101,7 @@ type metricsRecorder interface {
recordSizeBasedFlush()
recordParallelIOQueueLatency(time.Duration)
recordSinkIOInflightChange(int64)
makeCloudstorageFileAllocCallback() func(delta int64)
}

var _ metricsRecorder = (*sliMetrics)(nil)
Expand Down Expand Up @@ -134,6 +136,7 @@ type sliMetrics struct {
AggregatorProgress *aggmetric.Gauge
CheckpointProgress *aggmetric.Gauge
LaggingRanges *aggmetric.Gauge
CloudstorageBufferedBytes *aggmetric.Gauge

mu struct {
syncutil.Mutex
Expand Down Expand Up @@ -207,6 +210,14 @@ func (m *sliMetrics) recordMessageSize(sz int64) {
}
}

func (m *sliMetrics) makeCloudstorageFileAllocCallback() func(delta int64) {
return func(delta int64) {
if m != nil {
m.CloudstorageBufferedBytes.Inc(delta)
}
}
}

func (m *sliMetrics) recordInternalRetry(numMessages int64, reducedBatchSize bool) {
if m == nil {
return
Expand Down Expand Up @@ -361,6 +372,10 @@ func (w *wrappingCostController) recordMessageSize(sz int64) {
w.inner.recordMessageSize(sz)
}

func (w *wrappingCostController) makeCloudstorageFileAllocCallback() func(delta int64) {
return w.inner.makeCloudstorageFileAllocCallback()
}

func (w *wrappingCostController) recordInternalRetry(numMessages int64, reducedBatchSize bool) {
w.inner.recordInternalRetry(numMessages, reducedBatchSize)
}
Expand Down Expand Up @@ -615,6 +630,12 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
metaCloudstorageBufferedBytes := metric.Metadata{
Name: "changefeed.cloudstorage_buffered_bytes",
Help: "The number of bytes buffered in cloudstorage sink files which have not been emitted yet",
Measurement: "Bytes",
Unit: metric.Unit_COUNT,
}

functionalGaugeMinFn := func(childValues []int64) int64 {
var min int64
Expand Down Expand Up @@ -691,6 +712,7 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
AggregatorProgress: b.FunctionalGauge(metaAggregatorProgress, functionalGaugeMinFn),
CheckpointProgress: b.FunctionalGauge(metaCheckpointProgress, functionalGaugeMinFn),
LaggingRanges: b.Gauge(metaLaggingRangePercentage),
CloudstorageBufferedBytes: b.Gauge(metaCloudstorageBufferedBytes),
}
a.mu.sliMetrics = make(map[string]*sliMetrics)
_, err := a.getOrCreateScope(defaultSLIScope)
Expand Down Expand Up @@ -751,6 +773,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
SchemaRegistryRetries: a.SchemaRegistryRetries.AddChild(scope),
SchemaRegistrations: a.SchemaRegistrations.AddChild(scope),
LaggingRanges: a.LaggingRanges.AddChild(scope),
CloudstorageBufferedBytes: a.CloudstorageBufferedBytes.AddChild(scope),
}
sm.mu.resolved = make(map[int64]hlc.Timestamp)
sm.mu.checkpoint = make(map[int64]hlc.Timestamp)
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (parquetSink *parquetCloudStorageSink) EncodeAndEmitRow(
if err != nil {
return err
}
file.alloc.Merge(&alloc)
file.mergeAlloc(&alloc)

if file.parquetCodec == nil {
var err error
Expand Down Expand Up @@ -231,7 +231,7 @@ func (parquetSink *parquetCloudStorageSink) EncodeAndEmitRow(
// the number of bytes in the file and the number of buffered bytes.
prevAllocSize := file.alloc.Bytes()
newAllocSize := int64(file.buf.Len()) + bufferedBytesEstimate
file.alloc.AdjustBytesToTarget(ctx, newAllocSize)
file.adjustBytesToTarget(ctx, newAllocSize)

if log.V(1) && parquetSink.everyN.ShouldLog() {
log.Infof(ctx, "topic: %d/%d, written: %d, buffered %d, new alloc: %d, old alloc: %d",
Expand Down
40 changes: 30 additions & 10 deletions pkg/ccl/changefeedccl/sink_cloudstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,33 @@ func cloudStorageFormatTime(ts hlc.Timestamp) string {

type cloudStorageSinkFile struct {
cloudStorageSinkKey
created time.Time
codec io.WriteCloser
rawSize int
numMessages int
buf bytes.Buffer
alloc kvevent.Alloc
oldestMVCC hlc.Timestamp
parquetCodec *parquetWriter
created time.Time
codec io.WriteCloser
rawSize int
numMessages int
buf bytes.Buffer
alloc kvevent.Alloc
oldestMVCC hlc.Timestamp
parquetCodec *parquetWriter
allocCallback func(delta int64)
}

func (f *cloudStorageSinkFile) mergeAlloc(other *kvevent.Alloc) {
prev := f.alloc.Bytes()
f.alloc.Merge(other)
f.allocCallback(f.alloc.Bytes() - prev)
}

func (f *cloudStorageSinkFile) releaseAlloc(ctx context.Context) {
prev := f.alloc.Bytes()
f.alloc.Release(ctx)
f.allocCallback(-prev)
}

func (f *cloudStorageSinkFile) adjustBytesToTarget(ctx context.Context, targetBytes int64) {
prev := f.alloc.Bytes()
f.alloc.AdjustBytesToTarget(ctx, targetBytes)
f.allocCallback(f.alloc.Bytes() - prev)
}

var _ io.Writer = &cloudStorageSinkFile{}
Expand Down Expand Up @@ -510,6 +529,7 @@ func (s *cloudStorageSink) getOrCreateFile(
created: timeutil.Now(),
cloudStorageSinkKey: key,
oldestMVCC: eventMVCC,
allocCallback: s.metrics.makeCloudstorageFileAllocCallback(),
}

if s.compression.enabled() {
Expand Down Expand Up @@ -557,7 +577,7 @@ func (s *cloudStorageSink) EmitRow(
if err != nil {
return err
}
file.alloc.Merge(&alloc)
file.mergeAlloc(&alloc)

if _, err := file.Write(value); err != nil {
return err
Expand Down Expand Up @@ -814,7 +834,7 @@ func (s *cloudStorageSink) asyncFlusher(ctx context.Context) error {
func (f *cloudStorageSinkFile) flushToStorage(
ctx context.Context, es cloud.ExternalStorage, dest string, m metricsRecorder,
) error {
defer f.alloc.Release(ctx)
defer f.releaseAlloc(ctx)

if f.rawSize == 0 {
// This method shouldn't be called with an empty file, but be defensive
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ func (r *telemetryMetricsRecorder) recordMessageSize(sz int64) {
r.inner.recordMessageSize(sz)
}

func (r *telemetryMetricsRecorder) makeCloudstorageFileAllocCallback() func(delta int64) {
return r.inner.makeCloudstorageFileAllocCallback()
}

func (r *telemetryMetricsRecorder) recordInternalRetry(numMessages int64, reducedBatchSize bool) {
r.inner.recordInternalRetry(numMessages, reducedBatchSize)
}
Expand Down

0 comments on commit f6b7212

Please sign in to comment.