diff --git a/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go b/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go
index bb17da8a97ae..6803216fb78f 100644
--- a/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go
+++ b/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go
@@ -38,11 +38,12 @@ type blockingBuffer struct {
 
 	mu struct {
 		syncutil.Mutex
 
-		closed  bool                   // True when buffer closed.
-		reason  error                  // Reason buffer is closed.
-		drainCh chan struct{}          // Set when Drain request issued.
-		blocked bool                   // Set when event is blocked, waiting to acquire quota.
-		queue   *bufferEventChunkQueue // Queue of added events.
+		closed     bool                   // True when buffer closed.
+		reason     error                  // Reason buffer is closed.
+		drainCh    chan struct{}          // Set when Drain request issued.
+		numBlocked int                    // Number of waiters blocked waiting to acquire quota.
+		canFlush   bool
+		queue      *bufferEventChunkQueue // Queue of added events.
 	}
 }
@@ -50,23 +51,54 @@ type blockingBuffer struct {
 // It will grow the bound account to buffer more messages but will block if it
 // runs out of space. If ever any entry exceeds the allocatable size of the
 // account, an error will be returned when attempting to buffer it.
-func NewMemBuffer(
-	acc mon.BoundAccount, sv *settings.Values, metrics *Metrics, opts ...quotapool.Option,
+func NewMemBuffer(acc mon.BoundAccount, sv *settings.Values, metrics *Metrics) Buffer {
+	return newMemBuffer(acc, sv, metrics, nil)
+}
+
+// TestingNewMemBuffer allows tests to construct a buffer which will invoke the
+// specified notification function when blocked, waiting for memory.
+func TestingNewMemBuffer(
+	acc mon.BoundAccount,
+	sv *settings.Values,
+	metrics *Metrics,
+	onWaitStart quotapool.OnWaitStartFunc,
 ) Buffer {
-	const slowAcquisitionThreshold = 5 * time.Second
+	return newMemBuffer(acc, sv, metrics, onWaitStart)
+}
 
-	opts = append(opts,
-		quotapool.OnSlowAcquisition(slowAcquisitionThreshold, logSlowAcquisition(slowAcquisitionThreshold)),
-		quotapool.OnWaitFinish(
-			func(ctx context.Context, poolName string, r quotapool.Request, start time.Time) {
-				metrics.BufferPushbackNanos.Inc(timeutil.Since(start).Nanoseconds())
-			}))
+func newMemBuffer(
+	acc mon.BoundAccount,
+	sv *settings.Values,
+	metrics *Metrics,
+	onWaitStart quotapool.OnWaitStartFunc,
+) Buffer {
+	const slowAcquisitionThreshold = 5 * time.Second
 
 	b := &blockingBuffer{
 		signalCh: make(chan struct{}, 1),
 		metrics:  metrics,
 		sv:       sv,
 	}
+
+	opts := []quotapool.Option{
+		quotapool.OnSlowAcquisition(slowAcquisitionThreshold, logSlowAcquisition(slowAcquisitionThreshold)),
+		quotapool.OnWaitStart(func(ctx context.Context, poolName string, r quotapool.Request) {
+			if onWaitStart != nil {
+				onWaitStart(ctx, poolName, r)
+			}
+			b.mu.Lock()
+			b.mu.numBlocked++
+			b.mu.Unlock()
+		}),
+		quotapool.OnWaitFinish(
+			func(ctx context.Context, poolName string, r quotapool.Request, start time.Time) {
+				metrics.BufferPushbackNanos.Inc(timeutil.Since(start).Nanoseconds())
+				b.mu.Lock()
+				b.mu.numBlocked--
+				b.mu.Unlock()
+			},
+		),
+	}
 	quota := &memQuota{acc: acc, notifyOutOfQuota: b.notifyOutOfQuota}
 	b.qp = allocPool{
 		AbstractPool: quotapool.New("changefeed", quota, opts...),
@@ -88,7 +120,7 @@ func (b *blockingBuffer) pop() (e Event, ok bool, err error) {
 
 	e, ok = b.mu.queue.dequeue()
 
-	if !ok && b.mu.blocked {
+	if !ok && b.mu.numBlocked > 0 && b.mu.canFlush {
 		// Here, we know that we are blocked, waiting for memory; yet we have nothing queued up
 		// (and thus, no resources that could be released by draining the queue).
 		// This means that all the previously added entries have been read by the consumer,
@@ -100,8 +132,6 @@ func (b *blockingBuffer) pop() (e Event, ok bool, err error) {
 		// So, we issue the flush request to the consumer to ensure that we release some memory.
 		e = Event{flush: true}
 		ok = true
-		// Ensure we notify only once.
-		b.mu.blocked = false
 	}
 
 	if b.mu.drainCh != nil && b.mu.queue.empty() {
@@ -113,14 +143,10 @@ func (b *blockingBuffer) pop() (e Event, ok bool, err error) {
 
 // notifyOutOfQuota is invoked by memQuota to notify blocking buffer that
 // event is blocked, waiting for more resources.
-func (b *blockingBuffer) notifyOutOfQuota() {
+func (b *blockingBuffer) notifyOutOfQuota(canFlush bool) {
 	b.mu.Lock()
-	defer b.mu.Unlock()
-
-	if b.mu.closed {
-		return
-	}
-	b.mu.blocked = true
+	b.mu.canFlush = canFlush
+	b.mu.Unlock()
 
 	select {
 	case b.signalCh <- struct{}{}:
@@ -160,7 +186,6 @@ func (b *blockingBuffer) enqueue(ctx context.Context, e Event) (err error) {
 	}
 
 	b.metrics.BufferEntriesIn.Inc(1)
-	b.mu.blocked = false
 	b.mu.queue.enqueue(e)
 
 	select {
@@ -290,7 +315,7 @@ type memQuota struct {
 	// When memQuota blocks waiting for resources, invoke the callback
 	// to notify about this. The notification maybe invoked multiple
 	// times for a single request that's blocked.
-	notifyOutOfQuota func()
+	notifyOutOfQuota func(canFlush bool)
 
 	acc mon.BoundAccount
 }
@@ -306,7 +331,25 @@ func (r *memRequest) Acquire(
 	quota := resource.(*memQuota)
 	fulfilled, tryAgainAfter = r.acquireQuota(ctx, quota)
 	if !fulfilled {
-		quota.notifyOutOfQuota()
+		// canFlush indicates to the consumer (Get() caller) that it may issue a
+		// flush request if necessary.
+		//
+		// Consider the case when we have 2 producers that are blocked (Pa, Pb).
+		// The consumer will issue a flush request if no events are buffered in this
+		// blocking buffer and we have blocked producers (i.e. Pa and Pb). As soon
+		// as the flush completes and releases lots of resources (actually, the
+		// entirety of the mem buffer limit worth of resources is released), Pa
+		// manages to put its event into the queue. If the consumer consumes that
+		// message, plus attempts to consume the next message, before Pb has had a
+		// chance to unblock itself, the consumer will mistakenly think that it must
+		// flush to release resources (i.e. after just 1 message).
+		//
+		// canFlush is set to true if we are *really* blocked -- i.e. we have a
+		// non-zero canAllocateBelow threshold; OR in the corner case when
+		// nothing is allocated (and we are still blocked -- see the comment in
+		// acquireQuota).
+		canFlush := quota.allocated == 0 || quota.canAllocateBelow > 0
+		quota.notifyOutOfQuota(canFlush)
 	}
 
 	return fulfilled, tryAgainAfter
diff --git a/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go b/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go
index fc118bc11ae0..c3043d4a571a 100644
--- a/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go
+++ b/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go
@@ -80,7 +80,7 @@ func TestBlockingBuffer(t *testing.T) {
 		}
 	}
 	st := cluster.MakeTestingClusterSettings()
-	buf := kvevent.NewMemBuffer(ba, &st.SV, &metrics, quotapool.OnWaitStart(notifyWait))
+	buf := kvevent.TestingNewMemBuffer(ba, &st.SV, &metrics, notifyWait)
 	defer func() {
 		require.NoError(t, buf.CloseWithReason(context.Background(), nil))
 	}()
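The memRequest.Acquire comment in the patch reasons about when a blocked producer should cause the consumer to fabricate a flush event. As a rough illustration only -- a hypothetical toyBuffer, not the kvevent package or its API -- the sketch below mirrors that gating: a synthetic flush is produced only when the queue is empty and a waiter is genuinely blocked (numBlocked > 0 && canFlush), which avoids the spurious flush-after-one-message scenario described in the comment.

package main

import (
	"fmt"
	"sync"
)

// toyBuffer is a stand-in for the blocking buffer: real events are queued,
// and the consumer may fabricate a "flush" only under the conditions above.
type toyBuffer struct {
	mu         sync.Mutex
	queue      []string
	numBlocked int  // producers currently waiting for quota
	canFlush   bool // set only when a waiter is genuinely starved
}

func (b *toyBuffer) enqueue(ev string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.queue = append(b.queue, ev)
}

// noteBlocked mimics notifyOutOfQuota: record that a producer is waiting and
// whether a flush could actually help it make progress.
func (b *toyBuffer) noteBlocked(canFlush bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.numBlocked++
	b.canFlush = canFlush
}

// pop returns the next queued event, or a synthetic flush request only when
// nothing is queued and a blocked producer could be unblocked by flushing.
func (b *toyBuffer) pop() (string, bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.queue) > 0 {
		ev := b.queue[0]
		b.queue = b.queue[1:]
		return ev, true
	}
	if b.numBlocked > 0 && b.canFlush {
		return "flush", true
	}
	return "", false
}

func main() {
	b := &toyBuffer{}
	b.enqueue("kv-1")
	b.noteBlocked(false) // a producer is waiting, but a flush would not free anything yet
	fmt.Println(b.pop()) // "kv-1" true: drain real events first
	fmt.Println(b.pop()) // "" false: no spurious flush right after a single event
	b.noteBlocked(true)  // the waiter is now genuinely starved for quota
	fmt.Println(b.pop()) // "flush" true: only now is a flush worth issuing
}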