From ed685b8f5cf3a55df32b01107051ee09eacb8b69 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Tue, 6 Sep 2022 18:11:27 -0400 Subject: [PATCH] kvevent: Ensure out of quota events correctly handled Ensure that out of quota events are not lost and propagated if necessary to the consumer. Prior to this change, it was possible for an out of quota notification to be "lost" because "blocked" bit would be cleared out when an event was enqueued. Instead of relying on a boolean bit, we now keep track of the number of consumers currently blocked, and issue flush request if there are non-zero blocked consumers with zero events currently queued. Fixes #86828 Release justification: bug fix Release note: None --- .../changefeedccl/kvevent/blocking_buffer.go | 79 ++++++++++++------- .../kvevent/blocking_buffer_test.go | 2 +- 2 files changed, 53 insertions(+), 28 deletions(-) diff --git a/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go b/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go index bb17da8a97ae..2b766182bbb6 100644 --- a/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go +++ b/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go @@ -38,11 +38,12 @@ type blockingBuffer struct { mu struct { syncutil.Mutex - closed bool // True when buffer closed. - reason error // Reason buffer is closed. - drainCh chan struct{} // Set when Drain request issued. - blocked bool // Set when event is blocked, waiting to acquire quota. - queue *bufferEventChunkQueue // Queue of added events. + closed bool // True when buffer closed. + reason error // Reason buffer is closed. + drainCh chan struct{} // Set when Drain request issued. + numBlocked int // Number of waiters blocked to acquire quota. + canFlush bool + queue *bufferEventChunkQueue // Queue of added events. } } @@ -50,23 +51,54 @@ type blockingBuffer struct { // It will grow the bound account to buffer more messages but will block if it // runs out of space. 
If ever any entry exceeds the allocatable size of the // account, an error will be returned when attempting to buffer it. -func NewMemBuffer( - acc mon.BoundAccount, sv *settings.Values, metrics *Metrics, opts ...quotapool.Option, +func NewMemBuffer(acc mon.BoundAccount, sv *settings.Values, metrics *Metrics) Buffer { + return newMemBuffer(acc, sv, metrics, nil) +} + +// TestingNewMemBuffer allows tests to construct a buffer which will invoke the +// specified notification function when blocked, waiting for memory. +func TestingNewMemBuffer( + acc mon.BoundAccount, + sv *settings.Values, + metrics *Metrics, + onWaitStart quotapool.OnWaitStartFunc, ) Buffer { - const slowAcquisitionThreshold = 5 * time.Second + return newMemBuffer(acc, sv, metrics, onWaitStart) +} - opts = append(opts, - quotapool.OnSlowAcquisition(slowAcquisitionThreshold, logSlowAcquisition(slowAcquisitionThreshold)), - quotapool.OnWaitFinish( - func(ctx context.Context, poolName string, r quotapool.Request, start time.Time) { - metrics.BufferPushbackNanos.Inc(timeutil.Since(start).Nanoseconds()) - })) +func newMemBuffer( + acc mon.BoundAccount, + sv *settings.Values, + metrics *Metrics, + onWaitStart quotapool.OnWaitStartFunc, +) Buffer { + const slowAcquisitionThreshold = 5 * time.Second b := &blockingBuffer{ signalCh: make(chan struct{}, 1), metrics: metrics, sv: sv, } + + opts := []quotapool.Option{ + quotapool.OnSlowAcquisition(slowAcquisitionThreshold, logSlowAcquisition(slowAcquisitionThreshold)), + quotapool.OnWaitStart(func(ctx context.Context, poolName string, r quotapool.Request) { + if onWaitStart != nil { + onWaitStart(ctx, poolName, r) + } + b.mu.Lock() + b.mu.numBlocked++ + b.mu.Unlock() + }), + quotapool.OnWaitFinish( + func(ctx context.Context, poolName string, r quotapool.Request, start time.Time) { + metrics.BufferPushbackNanos.Inc(timeutil.Since(start).Nanoseconds()) + b.mu.Lock() + b.mu.numBlocked-- + b.mu.Unlock() + }, + ), + } quota := &memQuota{acc: acc, notifyOutOfQuota: 
b.notifyOutOfQuota} b.qp = allocPool{ AbstractPool: quotapool.New("changefeed", quota, opts...), @@ -88,7 +120,7 @@ func (b *blockingBuffer) pop() (e Event, ok bool, err error) { e, ok = b.mu.queue.dequeue() - if !ok && b.mu.blocked { + if !ok && b.mu.numBlocked > 0 && b.mu.canFlush { // Here, we know that we are blocked, waiting for memory; yet we have nothing queued up // (and thus, no resources that could be released by draining the queue). // This means that all the previously added entries have been read by the consumer, @@ -100,8 +132,6 @@ func (b *blockingBuffer) pop() (e Event, ok bool, err error) { // So, we issue the flush request to the consumer to ensure that we release some memory. e = Event{flush: true} ok = true - // Ensure we notify only once. - b.mu.blocked = false } if b.mu.drainCh != nil && b.mu.queue.empty() { @@ -113,14 +143,10 @@ func (b *blockingBuffer) pop() (e Event, ok bool, err error) { // notifyOutOfQuota is invoked by memQuota to notify blocking buffer that // event is blocked, waiting for more resources. -func (b *blockingBuffer) notifyOutOfQuota() { +func (b *blockingBuffer) notifyOutOfQuota(canFlush bool) { b.mu.Lock() - defer b.mu.Unlock() - - if b.mu.closed { - return - } - b.mu.blocked = true + b.mu.canFlush = canFlush + b.mu.Unlock() select { case b.signalCh <- struct{}{}: @@ -160,7 +186,6 @@ func (b *blockingBuffer) enqueue(ctx context.Context, e Event) (err error) { } b.metrics.BufferEntriesIn.Inc(1) - b.mu.blocked = false b.mu.queue.enqueue(e) select { @@ -290,7 +315,7 @@ type memQuota struct { // When memQuota blocks waiting for resources, invoke the callback // to notify about this. The notification maybe invoked multiple // times for a single request that's blocked. 
- notifyOutOfQuota func() + notifyOutOfQuota func(canFlush bool) acc mon.BoundAccount } @@ -306,7 +331,7 @@ func (r *memRequest) Acquire( quota := resource.(*memQuota) fulfilled, tryAgainAfter = r.acquireQuota(ctx, quota) if !fulfilled { - quota.notifyOutOfQuota() + quota.notifyOutOfQuota(quota.allocated == 0 || quota.canAllocateBelow > 0) } return fulfilled, tryAgainAfter diff --git a/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go b/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go index fc118bc11ae0..c3043d4a571a 100644 --- a/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go +++ b/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go @@ -80,7 +80,7 @@ func TestBlockingBuffer(t *testing.T) { } } st := cluster.MakeTestingClusterSettings() - buf := kvevent.NewMemBuffer(ba, &st.SV, &metrics, quotapool.OnWaitStart(notifyWait)) + buf := kvevent.TestingNewMemBuffer(ba, &st.SV, &metrics, notifyWait) defer func() { require.NoError(t, buf.CloseWithReason(context.Background(), nil)) }()