diff --git a/pkg/ccl/changefeedccl/kvevent/BUILD.bazel b/pkg/ccl/changefeedccl/kvevent/BUILD.bazel index 12e69fd00e45..6ddb3f73044f 100644 --- a/pkg/ccl/changefeedccl/kvevent/BUILD.bazel +++ b/pkg/ccl/changefeedccl/kvevent/BUILD.bazel @@ -52,6 +52,7 @@ go_test( "//pkg/sql/rowenc/keyside", "//pkg/sql/sem/tree", "//pkg/sql/types", + "//pkg/testutils", "//pkg/util", "//pkg/util/ctxgroup", "//pkg/util/encoding", diff --git a/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go b/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go index 706a677c9909..d17017d64806 100644 --- a/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go +++ b/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go @@ -105,6 +105,7 @@ func newMemBuffer( b.qp = allocPool{ AbstractPool: quotapool.New("changefeed", quota, opts...), + sv: sv, metrics: metrics, } @@ -218,6 +219,7 @@ func (b *blockingBuffer) enqueue(ctx context.Context, e Event) (err error) { } b.metrics.BufferEntriesIn.Inc(1) + b.metrics.BufferEntriesByType[e.et.Index()].Inc(1) b.mu.queue.enqueue(e) select { @@ -246,6 +248,7 @@ func (b *blockingBuffer) AcquireMemory(ctx context.Context, n int64) (alloc Allo return alloc, err } b.metrics.BufferEntriesMemAcquired.Inc(n) + b.metrics.AllocatedMem.Inc(n) return alloc, nil } @@ -324,6 +327,7 @@ func (b *blockingBuffer) CloseWithReason(ctx context.Context, reason error) erro quota := r.(*memQuota) quota.closed = true quota.acc.Close(ctx) + b.metrics.AllocatedMem.Dec(quota.allocated) return false }) @@ -442,9 +446,14 @@ func (r *memRequest) ShouldWait() bool { type allocPool struct { *quotapool.AbstractPool metrics *Metrics + sv *settings.Values } func (ap allocPool) Release(ctx context.Context, bytes, entries int64) { + if bytes < 0 { + logcrash.ReportOrPanic(ctx, ap.sv, "attempt to release negative bytes (%d) into pool", bytes) + } + ap.AbstractPool.Update(func(r quotapool.Resource) (shouldNotify bool) { quota := r.(*memQuota) if quota.closed { @@ -452,6 +461,7 @@ func (ap allocPool) Release(ctx context.Context, bytes, entries int64) { } quota.acc.Shrink(ctx, bytes) quota.allocated -= bytes + ap.metrics.AllocatedMem.Dec(bytes) ap.metrics.BufferEntriesMemReleased.Inc(bytes) ap.metrics.BufferEntriesReleased.Inc(entries) return true diff --git a/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go b/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go index 612fd866f59c..e42163fcce66 100644 --- a/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go +++ b/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go @@ -15,6 +15,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -22,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/randgen" "github.com/cockroachdb/cockroach/pkg/sql/rowenc/keyside" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/encoding" @@ -32,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/quotapool" "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -94,9 +97,6 @@ func TestBlockingBuffer(t *testing.T) { } st := cluster.MakeTestingClusterSettings() buf := kvevent.TestingNewMemBuffer(ba, &st.SV, &metrics, notifyWait) - defer func() { - require.NoError(t, buf.CloseWithReason(context.Background(), nil)) - }() producerCtx, stopProducers := context.WithCancel(context.Background()) wg := ctxgroup.WithContext(producerCtx) @@ -105,26 +105,79 @@ func TestBlockingBuffer(t *testing.T) { }() // Start adding KVs to the buffer until we block. + var numResolvedEvents, numKVEvents int wg.GoCtx(func(ctx context.Context) error { rnd, _ := randutil.NewTestRand() for { - err := buf.Add(ctx, kvevent.MakeKVEvent(makeRangeFeedEvent(rnd, 256, 0))) - if err != nil { - return err + if rnd.Int()%20 == 0 { + prefix := keys.SystemSQLCodec.TablePrefix(42) + sp := roachpb.Span{Key: prefix, EndKey: prefix.Next()} + if err := buf.Add(ctx, kvevent.NewBackfillResolvedEvent(sp, hlc.Timestamp{}, jobspb.ResolvedSpan_BACKFILL)); err != nil { + return err + } + numResolvedEvents++ + } else { + if err := buf.Add(ctx, kvevent.MakeKVEvent(makeRangeFeedEvent(rnd, 256, 0))); err != nil { + return err + } + numKVEvents++ } } }) - <-waitCh + require.NoError(t, timeutil.RunWithTimeout( + context.Background(), "wait", 10*time.Second, func(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-waitCh: + return nil + } + })) // Keep consuming events until we get pushback metrics updated. + var numPopped, numFlush int for metrics.BufferPushbackNanos.Count() == 0 { e, err := buf.Get(context.Background()) require.NoError(t, err) a := e.DetachAlloc() a.Release(context.Background()) + numPopped++ + if e.Type() == kvevent.TypeFlush { + numFlush++ + } } + + // Allocated memory gauge should be non-zero once we buffer some events. + testutils.SucceedsWithin(t, func() error { + if metrics.AllocatedMem.Value() > 0 { + return nil + } + return errors.New("waiting for allocated mem > 0") + }, 5*time.Second) + stopProducers() + require.ErrorIs(t, wg.Wait(), context.Canceled) + + require.EqualValues(t, numKVEvents+numResolvedEvents, metrics.BufferEntriesIn.Count()) + require.EqualValues(t, numPopped, metrics.BufferEntriesOut.Count()) + require.Greater(t, metrics.BufferEntriesMemReleased.Count(), int64(0)) + + // Flush events are special in that they are ephemeral event that doesn't get + // counted when releasing (it's 0 entries and 0 byte event). + require.EqualValues(t, numPopped-numFlush, metrics.BufferEntriesReleased.Count()) + + require.EqualValues(t, numKVEvents, metrics.BufferEntriesByType[kvevent.TypeKV].Count()) + require.EqualValues(t, numResolvedEvents, metrics.BufferEntriesByType[kvevent.TypeResolved].Count()) + + // We might have seen numFlush events, but they are synthetic, and only explicitly enqueued + // flush events are counted. + require.EqualValues(t, 0, metrics.BufferEntriesByType[kvevent.TypeFlush].Count()) + + // After buffer closed, resources are released, and metrics adjusted to reflect. + require.NoError(t, buf.CloseWithReason(context.Background(), context.Canceled)) + + require.EqualValues(t, 0, metrics.AllocatedMem.Value()) } func TestBlockingBufferNotifiesConsumerWhenOutOfMemory(t *testing.T) { diff --git a/pkg/ccl/changefeedccl/kvevent/event.go b/pkg/ccl/changefeedccl/kvevent/event.go index 1815e412636a..e18de9917899 100644 --- a/pkg/ccl/changefeedccl/kvevent/event.go +++ b/pkg/ccl/changefeedccl/kvevent/event.go @@ -77,13 +77,10 @@ type MemAllocator interface { type Type uint8 const ( - // TypeUnknown indicates the event could not be parsed. Will fail the feed. - TypeUnknown Type = iota - // TypeFlush indicates a request to flush buffered data. // This request type is emitted by blocking buffer when it's blocked, waiting // for more memory. - TypeFlush + TypeFlush Type = iota // TypeKV indicates that the KV, PrevKeyValue, and BackfillTimestamp methods // on the Event meaningful. @@ -98,6 +95,9 @@ const ( // TypeResolved indicates that the Resolved method on the Event will be // meaningful. TypeResolved = resolvedNone + + // number of event types. + numEventTypes = TypeResolved + 1 ) // Event represents an event emitted by a kvfeed. It is either a KV or a @@ -120,6 +120,22 @@ func (e *Event) Type() Type { } } +// Index returns numerical/ordinal type index suitable for indexing into arrays. +func (t Type) Index() int { + switch t { + case TypeFlush: + return int(TypeFlush) + case TypeKV: + return int(TypeKV) + case TypeResolved, resolvedBackfill, resolvedRestart, resolvedExit: + return int(TypeResolved) + default: + log.Warningf(context.TODO(), + "returning TypeFlush boundary type for unknown event type %d", t) + return int(TypeFlush) + } +} + // ApproximateSize returns events approximate size in bytes. func (e *Event) ApproximateSize() int { if e.et == TypeFlush { diff --git a/pkg/ccl/changefeedccl/kvevent/metrics.go b/pkg/ccl/changefeedccl/kvevent/metrics.go index 8e07f2a176f5..1212a5b278ef 100644 --- a/pkg/ccl/changefeedccl/kvevent/metrics.go +++ b/pkg/ccl/changefeedccl/kvevent/metrics.go @@ -9,6 +9,7 @@ package kvevent import ( + "fmt" "time" "github.com/cockroachdb/cockroach/pkg/util/metric" @@ -51,6 +52,12 @@ var ( Measurement: "Nanoseconds", Unit: metric.Unit_NANOSECONDS, } + metaChangefeedAllocatedMemory = metric.Metadata{ + Name: "changefeed.buffer_entries.allocated_mem", + Help: "Current quota pool memory allocation", + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } ) // Metrics is a metric.Struct for kvfeed metrics. @@ -61,10 +68,30 @@ type Metrics struct { BufferPushbackNanos *metric.Counter BufferEntriesMemAcquired *metric.Counter BufferEntriesMemReleased *metric.Counter + AllocatedMem *metric.Gauge + BufferEntriesByType [numEventTypes]*metric.Counter } // MakeMetrics constructs a Metrics struct with the provided histogram window. func MakeMetrics(histogramWindow time.Duration) Metrics { + eventTypeMeta := func(et Type) metric.Metadata { + eventTypeName := func() string { + switch et { + case TypeFlush: + return "flush" + case TypeKV: + return "kv" + default: + return "resolved" + } + }() + return metric.Metadata{ + Name: fmt.Sprintf("changefeed.buffer_entries.%s", eventTypeName), + Help: fmt.Sprintf("Number of %s elements added to the buffer", eventTypeName), + Measurement: "Events", + Unit: metric.Unit_COUNT, + } + } return Metrics{ BufferEntriesIn: metric.NewCounter(metaChangefeedBufferEntriesIn), BufferEntriesOut: metric.NewCounter(metaChangefeedBufferEntriesOut), @@ -72,6 +99,12 @@ func MakeMetrics(histogramWindow time.Duration) Metrics { BufferEntriesMemAcquired: metric.NewCounter(metaChangefeedBufferMemAcquired), BufferEntriesMemReleased: metric.NewCounter(metaChangefeedBufferMemReleased), BufferPushbackNanos: metric.NewCounter(metaChangefeedBufferPushbackNanos), + AllocatedMem: metric.NewGauge(metaChangefeedAllocatedMemory), + BufferEntriesByType: [numEventTypes]*metric.Counter{ + metric.NewCounter(eventTypeMeta(TypeFlush)), + metric.NewCounter(eventTypeMeta(TypeKV)), + metric.NewCounter(eventTypeMeta(TypeResolved)), + }, } }