From 12a1b04deb7595cdfff309dd8bf0d19f33a49747 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Tue, 6 Sep 2022 18:11:27 -0400 Subject: [PATCH] kvevent: Ensure out of quota events correctly handled Ensure that out of quota events are not lost and propagated if necessary to the consumer. Prior to this change, it was possible for an out of quota notification to be "lost" because "blocked" bit would be cleared out when an event was enqueued. Instead of relying on a boolean bit, we now keep track of the number of consumers currently blocked, and issue flush request if there are non-zero blocked consumers with zero events currently queued. Fixes #86828 Release justification: bug fix Release note: None --- .../changefeedccl/kvevent/blocking_buffer.go | 97 +++++++++++++------ .../kvevent/blocking_buffer_test.go | 2 +- 2 files changed, 71 insertions(+), 28 deletions(-) diff --git a/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go b/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go index bb17da8a97ae..6803216fb78f 100644 --- a/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go +++ b/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go @@ -38,11 +38,12 @@ type blockingBuffer struct { mu struct { syncutil.Mutex - closed bool // True when buffer closed. - reason error // Reason buffer is closed. - drainCh chan struct{} // Set when Drain request issued. - blocked bool // Set when event is blocked, waiting to acquire quota. - queue *bufferEventChunkQueue // Queue of added events. + closed bool // True when buffer closed. + reason error // Reason buffer is closed. + drainCh chan struct{} // Set when Drain request issued. + numBlocked int // Number of waitors blocked to acquire quota. + canFlush bool + queue *bufferEventChunkQueue // Queue of added events. } } @@ -50,23 +51,54 @@ type blockingBuffer struct { // It will grow the bound account to buffer more messages but will block if it // runs out of space. If ever any entry exceeds the allocatable size of the // account, an error will be returned when attempting to buffer it. -func NewMemBuffer( - acc mon.BoundAccount, sv *settings.Values, metrics *Metrics, opts ...quotapool.Option, +func NewMemBuffer(acc mon.BoundAccount, sv *settings.Values, metrics *Metrics) Buffer { + return newMemBuffer(acc, sv, metrics, nil) +} + +// TestingNewMemBuffer allows test to construct buffer which will invoked +// specified notification function when blocked, waiting for memory. +func TestingNewMemBuffer( + acc mon.BoundAccount, + sv *settings.Values, + metrics *Metrics, + onWaitStart quotapool.OnWaitStartFunc, ) Buffer { - const slowAcquisitionThreshold = 5 * time.Second + return newMemBuffer(acc, sv, metrics, onWaitStart) +} - opts = append(opts, - quotapool.OnSlowAcquisition(slowAcquisitionThreshold, logSlowAcquisition(slowAcquisitionThreshold)), - quotapool.OnWaitFinish( - func(ctx context.Context, poolName string, r quotapool.Request, start time.Time) { - metrics.BufferPushbackNanos.Inc(timeutil.Since(start).Nanoseconds()) - })) +func newMemBuffer( + acc mon.BoundAccount, + sv *settings.Values, + metrics *Metrics, + onWaitStart quotapool.OnWaitStartFunc, +) Buffer { + const slowAcquisitionThreshold = 5 * time.Second b := &blockingBuffer{ signalCh: make(chan struct{}, 1), metrics: metrics, sv: sv, } + + opts := []quotapool.Option{ + quotapool.OnSlowAcquisition(slowAcquisitionThreshold, logSlowAcquisition(slowAcquisitionThreshold)), + quotapool.OnWaitStart(func(ctx context.Context, poolName string, r quotapool.Request) { + if onWaitStart != nil { + onWaitStart(ctx, poolName, r) + } + b.mu.Lock() + b.mu.numBlocked++ + b.mu.Unlock() + }), + quotapool.OnWaitFinish( + func(ctx context.Context, poolName string, r quotapool.Request, start time.Time) { + metrics.BufferPushbackNanos.Inc(timeutil.Since(start).Nanoseconds()) + b.mu.Lock() + b.mu.numBlocked-- + b.mu.Unlock() + }, + ), + } quota := &memQuota{acc: acc, notifyOutOfQuota: b.notifyOutOfQuota} b.qp = allocPool{ AbstractPool: quotapool.New("changefeed", quota, opts...), @@ -88,7 +120,7 @@ func (b *blockingBuffer) pop() (e Event, ok bool, err error) { e, ok = b.mu.queue.dequeue() - if !ok && b.mu.blocked { + if !ok && b.mu.numBlocked > 0 && b.mu.canFlush { // Here, we know that we are blocked, waiting for memory; yet we have nothing queued up // (and thus, no resources that could be released by draining the queue). // This means that all the previously added entries have been read by the consumer, @@ -100,8 +132,6 @@ func (b *blockingBuffer) pop() (e Event, ok bool, err error) { // So, we issue the flush request to the consumer to ensure that we release some memory. e = Event{flush: true} ok = true - // Ensure we notify only once. - b.mu.blocked = false } if b.mu.drainCh != nil && b.mu.queue.empty() { @@ -113,14 +143,10 @@ func (b *blockingBuffer) pop() (e Event, ok bool, err error) { // notifyOutOfQuota is invoked by memQuota to notify blocking buffer that // event is blocked, waiting for more resources. -func (b *blockingBuffer) notifyOutOfQuota() { +func (b *blockingBuffer) notifyOutOfQuota(canFlush bool) { b.mu.Lock() - defer b.mu.Unlock() - - if b.mu.closed { - return - } - b.mu.blocked = true + b.mu.canFlush = canFlush + b.mu.Unlock() select { case b.signalCh <- struct{}{}: @@ -160,7 +186,6 @@ func (b *blockingBuffer) enqueue(ctx context.Context, e Event) (err error) { } b.metrics.BufferEntriesIn.Inc(1) - b.mu.blocked = false b.mu.queue.enqueue(e) select { @@ -290,7 +315,7 @@ type memQuota struct { // When memQuota blocks waiting for resources, invoke the callback // to notify about this. The notification maybe invoked multiple // times for a single request that's blocked. - notifyOutOfQuota func() + notifyOutOfQuota func(canFlush bool) acc mon.BoundAccount } @@ -306,7 +331,25 @@ func (r *memRequest) Acquire( quota := resource.(*memQuota) fulfilled, tryAgainAfter = r.acquireQuota(ctx, quota) if !fulfilled { - quota.notifyOutOfQuota() + // canFlush indicates to the consumer (Get() caller) that it may issue flush + // request if necessary. + // + // Consider the case when we have 2 producers that are blocked (Pa, Pb). + // Consumer will issue flush request if no events are buffered in this + // blocking buffer, and we have blocked producers (i.e. Pa and Pb). As soon + // as flush completes and releases lots of resources (actually, the entirety + // of mem buffer limit worth of resources are released), Pa manages to put + // in the event into the queue. If the consumer consumes that message, plus + // attempts to consume the next message, before Pb had a chance to unblock + // itself, the consumer will mistakenly think that it must flush to release + // resources (i.e. just after 1 message). + // + // canFlush is set to true if we are *really* blocked -- i.e. we + // have non-zero canAllocateBelow threshold; OR in the corner case when + // nothing is allocated (and we are still blocked -- see comment in + // acquireQuota) + canFlush := quota.allocated == 0 || quota.canAllocateBelow > 0 + quota.notifyOutOfQuota(canFlush) } return fulfilled, tryAgainAfter diff --git a/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go b/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go index fc118bc11ae0..c3043d4a571a 100644 --- a/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go +++ b/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go @@ -80,7 +80,7 @@ func TestBlockingBuffer(t *testing.T) { } } st := cluster.MakeTestingClusterSettings() - buf := kvevent.NewMemBuffer(ba, &st.SV, &metrics, quotapool.OnWaitStart(notifyWait)) + buf := kvevent.TestingNewMemBuffer(ba, &st.SV, &metrics, notifyWait) defer func() { require.NoError(t, buf.CloseWithReason(context.Background(), nil)) }()