Skip to content

Commit

Permalink
changefeedccl: Improve blocking buffer observability
Browse files Browse the repository at this point in the history
Improve blocking buffer observability by adding:
  * `changefeed.buffer_entries.allocated_memory` -- gauge keeping track
    of currently allocated memory for the events added to the blocking buffer.
  * `changefeed.buffer_entries.<event_type>` -- counters keeping
   track of the number of event type events (flush, KV, resolved) added.

Informs cockroachdb#108464

Release note: None
  • Loading branch information
Yevgeniy Miretskiy committed Sep 7, 2023
1 parent b2863ec commit e456872
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 11 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/kvevent/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ go_test(
"//pkg/sql/rowenc/keyside",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/testutils",
"//pkg/util",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
Expand Down
10 changes: 10 additions & 0 deletions pkg/ccl/changefeedccl/kvevent/blocking_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func newMemBuffer(

b.qp = allocPool{
AbstractPool: quotapool.New("changefeed", quota, opts...),
sv: sv,
metrics: metrics,
}

Expand Down Expand Up @@ -218,6 +219,7 @@ func (b *blockingBuffer) enqueue(ctx context.Context, e Event) (err error) {
}

b.metrics.BufferEntriesIn.Inc(1)
b.metrics.BufferEntriesByType[e.et.Index()].Inc(1)
b.mu.queue.enqueue(e)

select {
Expand Down Expand Up @@ -246,6 +248,7 @@ func (b *blockingBuffer) AcquireMemory(ctx context.Context, n int64) (alloc Allo
return alloc, err
}
b.metrics.BufferEntriesMemAcquired.Inc(n)
b.metrics.AllocatedMem.Inc(n)
return alloc, nil
}

Expand Down Expand Up @@ -324,6 +327,7 @@ func (b *blockingBuffer) CloseWithReason(ctx context.Context, reason error) erro
quota := r.(*memQuota)
quota.closed = true
quota.acc.Close(ctx)
b.metrics.AllocatedMem.Dec(quota.allocated)
return false
})

Expand Down Expand Up @@ -442,16 +446,22 @@ func (r *memRequest) ShouldWait() bool {
type allocPool struct {
*quotapool.AbstractPool
metrics *Metrics
sv *settings.Values
}

func (ap allocPool) Release(ctx context.Context, bytes, entries int64) {
if bytes < 0 {
logcrash.ReportOrPanic(ctx, ap.sv, "attempt to release negative bytes (%d) into pool", bytes)
}

ap.AbstractPool.Update(func(r quotapool.Resource) (shouldNotify bool) {
quota := r.(*memQuota)
if quota.closed {
return false
}
quota.acc.Shrink(ctx, bytes)
quota.allocated -= bytes
ap.metrics.AllocatedMem.Dec(bytes)
ap.metrics.BufferEntriesMemReleased.Inc(bytes)
ap.metrics.BufferEntriesReleased.Inc(entries)
return true
Expand Down
67 changes: 60 additions & 7 deletions pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/randgen"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc/keyside"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
Expand All @@ -32,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -94,9 +97,6 @@ func TestBlockingBuffer(t *testing.T) {
}
st := cluster.MakeTestingClusterSettings()
buf := kvevent.TestingNewMemBuffer(ba, &st.SV, &metrics, notifyWait)
defer func() {
require.NoError(t, buf.CloseWithReason(context.Background(), nil))
}()

producerCtx, stopProducers := context.WithCancel(context.Background())
wg := ctxgroup.WithContext(producerCtx)
Expand All @@ -105,26 +105,79 @@ func TestBlockingBuffer(t *testing.T) {
}()

// Start adding KVs to the buffer until we block.
var numResolvedEvents, numKVEvents int
wg.GoCtx(func(ctx context.Context) error {
rnd, _ := randutil.NewTestRand()
for {
err := buf.Add(ctx, kvevent.MakeKVEvent(makeRangeFeedEvent(rnd, 256, 0)))
if err != nil {
return err
if rnd.Int()%20 == 0 {
prefix := keys.SystemSQLCodec.TablePrefix(42)
sp := roachpb.Span{Key: prefix, EndKey: prefix.Next()}
if err := buf.Add(ctx, kvevent.NewBackfillResolvedEvent(sp, hlc.Timestamp{}, jobspb.ResolvedSpan_BACKFILL)); err != nil {
return err
}
numResolvedEvents++
} else {
if err := buf.Add(ctx, kvevent.MakeKVEvent(makeRangeFeedEvent(rnd, 256, 0))); err != nil {
return err
}
numKVEvents++
}
}
})

<-waitCh
require.NoError(t, timeutil.RunWithTimeout(
context.Background(), "wait", 10*time.Second, func(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-waitCh:
return nil
}
}))

// Keep consuming events until we get pushback metrics updated.
var numPopped, numFlush int
for metrics.BufferPushbackNanos.Count() == 0 {
e, err := buf.Get(context.Background())
require.NoError(t, err)
a := e.DetachAlloc()
a.Release(context.Background())
numPopped++
if e.Type() == kvevent.TypeFlush {
numFlush++
}
}

// Allocated memory gauge should be non-zero once we buffer some events.
testutils.SucceedsWithin(t, func() error {
if metrics.AllocatedMem.Value() > 0 {
return nil
}
return errors.New("waiting for allocated mem > 0")
}, 5*time.Second)

stopProducers()
require.ErrorIs(t, wg.Wait(), context.Canceled)

require.EqualValues(t, numKVEvents+numResolvedEvents, metrics.BufferEntriesIn.Count())
require.EqualValues(t, numPopped, metrics.BufferEntriesOut.Count())
require.Greater(t, metrics.BufferEntriesMemReleased.Count(), int64(0))

// Flush events are special in that they are ephemeral event that doesn't get
// counted when releasing (it's 0 entries and 0 byte event).
require.EqualValues(t, numPopped-numFlush, metrics.BufferEntriesReleased.Count())

require.EqualValues(t, numKVEvents, metrics.BufferEntriesByType[kvevent.TypeKV].Count())
require.EqualValues(t, numResolvedEvents, metrics.BufferEntriesByType[kvevent.TypeResolved].Count())

// We might have seen numFlush events, but they are synthetic, and only explicitly enqueued
// flush events are counted.
require.EqualValues(t, 0, metrics.BufferEntriesByType[kvevent.TypeFlush].Count())

// After buffer closed, resources are released, and metrics adjusted to reflect.
require.NoError(t, buf.CloseWithReason(context.Background(), context.Canceled))

require.EqualValues(t, 0, metrics.AllocatedMem.Value())
}

func TestBlockingBufferNotifiesConsumerWhenOutOfMemory(t *testing.T) {
Expand Down
24 changes: 20 additions & 4 deletions pkg/ccl/changefeedccl/kvevent/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,10 @@ type MemAllocator interface {
type Type uint8

const (
// TypeUnknown indicates the event could not be parsed. Will fail the feed.
TypeUnknown Type = iota

// TypeFlush indicates a request to flush buffered data.
// This request type is emitted by blocking buffer when it's blocked, waiting
// for more memory.
TypeFlush
TypeFlush Type = iota

// TypeKV indicates that the KV, PrevKeyValue, and BackfillTimestamp methods
// on the Event meaningful.
Expand All @@ -98,6 +95,9 @@ const (
// TypeResolved indicates that the Resolved method on the Event will be
// meaningful.
TypeResolved = resolvedNone

// number of event types.
numEventTypes = TypeResolved + 1
)

// Event represents an event emitted by a kvfeed. It is either a KV or a
Expand All @@ -120,6 +120,22 @@ func (e *Event) Type() Type {
}
}

// Index returns numerical/ordinal type index suitable for indexing into arrays.
func (t Type) Index() int {
switch t {
case TypeFlush:
return int(TypeFlush)
case TypeKV:
return int(TypeKV)
case TypeResolved, resolvedBackfill, resolvedRestart, resolvedExit:
return int(TypeResolved)
default:
log.Warningf(context.TODO(),
"returning TypeFlush boundary type for unknown event type %d", t)
return int(TypeFlush)
}
}

// ApproximateSize returns events approximate size in bytes.
func (e *Event) ApproximateSize() int {
if e.et == TypeFlush {
Expand Down
33 changes: 33 additions & 0 deletions pkg/ccl/changefeedccl/kvevent/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package kvevent

import (
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/util/metric"
Expand Down Expand Up @@ -51,6 +52,12 @@ var (
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaChangefeedAllocatedMemory = metric.Metadata{
Name: "changefeed.buffer_entries.allocated_mem",
Help: "Current quota pool memory allocation",
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
)

// Metrics is a metric.Struct for kvfeed metrics.
Expand All @@ -61,17 +68,43 @@ type Metrics struct {
BufferPushbackNanos *metric.Counter
BufferEntriesMemAcquired *metric.Counter
BufferEntriesMemReleased *metric.Counter
AllocatedMem *metric.Gauge
BufferEntriesByType [numEventTypes]*metric.Counter
}

// MakeMetrics constructs a Metrics struct with the provided histogram window.
func MakeMetrics(histogramWindow time.Duration) Metrics {
eventTypeMeta := func(et Type) metric.Metadata {
eventTypeName := func() string {
switch et {
case TypeFlush:
return "flush"
case TypeKV:
return "kv"
default:
return "resolved"
}
}()
return metric.Metadata{
Name: fmt.Sprintf("changefeed.buffer_entries.%s", eventTypeName),
Help: fmt.Sprintf("Number of %s elements added to the buffer", eventTypeName),
Measurement: "Events",
Unit: metric.Unit_COUNT,
}
}
return Metrics{
BufferEntriesIn: metric.NewCounter(metaChangefeedBufferEntriesIn),
BufferEntriesOut: metric.NewCounter(metaChangefeedBufferEntriesOut),
BufferEntriesReleased: metric.NewCounter(metaChangefeedBufferEntriesReleased),
BufferEntriesMemAcquired: metric.NewCounter(metaChangefeedBufferMemAcquired),
BufferEntriesMemReleased: metric.NewCounter(metaChangefeedBufferMemReleased),
BufferPushbackNanos: metric.NewCounter(metaChangefeedBufferPushbackNanos),
AllocatedMem: metric.NewGauge(metaChangefeedAllocatedMemory),
BufferEntriesByType: [numEventTypes]*metric.Counter{
metric.NewCounter(eventTypeMeta(TypeFlush)),
metric.NewCounter(eventTypeMeta(TypeKV)),
metric.NewCounter(eventTypeMeta(TypeResolved)),
},
}
}

Expand Down

0 comments on commit e456872

Please sign in to comment.