Skip to content

Commit

Permalink
kvevent: Ensure out of quota events correctly handled
Browse files Browse the repository at this point in the history
Ensure that out of quota events are not lost and propagated
if necessary to the consumer.

Prior to this change, it was possible for an out of quota
notification to be "lost" because "blocked" bit would be cleared
out when an event was enqueued.
Instead of relying on a boolean bit, we now keep track of the
number of consumers currently blocked, and issue flush request
if there are non-zero blocked consumers with zero events
currently queued.

Fixes #86828

Release justification: bug fix
Release note: None
  • Loading branch information
Yevgeniy Miretskiy committed Sep 7, 2022
1 parent a1d7e47 commit 12a1b04
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 28 deletions.
97 changes: 70 additions & 27 deletions pkg/ccl/changefeedccl/kvevent/blocking_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,35 +38,67 @@ type blockingBuffer struct {

mu struct {
syncutil.Mutex
closed bool // True when buffer closed.
reason error // Reason buffer is closed.
drainCh chan struct{} // Set when Drain request issued.
blocked bool // Set when event is blocked, waiting to acquire quota.
queue *bufferEventChunkQueue // Queue of added events.
closed bool // True when buffer closed.
reason error // Reason buffer is closed.
drainCh chan struct{} // Set when Drain request issued.
numBlocked int // Number of waitors blocked to acquire quota.
canFlush bool
queue *bufferEventChunkQueue // Queue of added events.
}
}

// NewMemBuffer returns a new in-memory buffer which will store events.
// It will grow the bound account to buffer more messages but will block if it
// runs out of space. If ever any entry exceeds the allocatable size of the
// account, an error will be returned when attempting to buffer it.
func NewMemBuffer(
acc mon.BoundAccount, sv *settings.Values, metrics *Metrics, opts ...quotapool.Option,
func NewMemBuffer(acc mon.BoundAccount, sv *settings.Values, metrics *Metrics) Buffer {
return newMemBuffer(acc, sv, metrics, nil)
}

// TestingNewMemBuffer allows test to construct buffer which will invoked
// specified notification function when blocked, waiting for memory.
func TestingNewMemBuffer(
acc mon.BoundAccount,
sv *settings.Values,
metrics *Metrics,
onWaitStart quotapool.OnWaitStartFunc,
) Buffer {
const slowAcquisitionThreshold = 5 * time.Second
return newMemBuffer(acc, sv, metrics, onWaitStart)
}

opts = append(opts,
quotapool.OnSlowAcquisition(slowAcquisitionThreshold, logSlowAcquisition(slowAcquisitionThreshold)),
quotapool.OnWaitFinish(
func(ctx context.Context, poolName string, r quotapool.Request, start time.Time) {
metrics.BufferPushbackNanos.Inc(timeutil.Since(start).Nanoseconds())
}))
func newMemBuffer(
acc mon.BoundAccount,
sv *settings.Values,
metrics *Metrics,
onWaitStart quotapool.OnWaitStartFunc,
) Buffer {
const slowAcquisitionThreshold = 5 * time.Second

b := &blockingBuffer{
signalCh: make(chan struct{}, 1),
metrics: metrics,
sv: sv,
}

opts := []quotapool.Option{
quotapool.OnSlowAcquisition(slowAcquisitionThreshold, logSlowAcquisition(slowAcquisitionThreshold)),
quotapool.OnWaitStart(func(ctx context.Context, poolName string, r quotapool.Request) {
if onWaitStart != nil {
onWaitStart(ctx, poolName, r)
}
b.mu.Lock()
b.mu.numBlocked++
b.mu.Unlock()
}),
quotapool.OnWaitFinish(
func(ctx context.Context, poolName string, r quotapool.Request, start time.Time) {
metrics.BufferPushbackNanos.Inc(timeutil.Since(start).Nanoseconds())
b.mu.Lock()
b.mu.numBlocked--
b.mu.Unlock()
},
),
}
quota := &memQuota{acc: acc, notifyOutOfQuota: b.notifyOutOfQuota}
b.qp = allocPool{
AbstractPool: quotapool.New("changefeed", quota, opts...),
Expand All @@ -88,7 +120,7 @@ func (b *blockingBuffer) pop() (e Event, ok bool, err error) {

e, ok = b.mu.queue.dequeue()

if !ok && b.mu.blocked {
if !ok && b.mu.numBlocked > 0 && b.mu.canFlush {
// Here, we know that we are blocked, waiting for memory; yet we have nothing queued up
// (and thus, no resources that could be released by draining the queue).
// This means that all the previously added entries have been read by the consumer,
Expand All @@ -100,8 +132,6 @@ func (b *blockingBuffer) pop() (e Event, ok bool, err error) {
// So, we issue the flush request to the consumer to ensure that we release some memory.
e = Event{flush: true}
ok = true
// Ensure we notify only once.
b.mu.blocked = false
}

if b.mu.drainCh != nil && b.mu.queue.empty() {
Expand All @@ -113,14 +143,10 @@ func (b *blockingBuffer) pop() (e Event, ok bool, err error) {

// notifyOutOfQuota is invoked by memQuota to notify blocking buffer that
// event is blocked, waiting for more resources.
func (b *blockingBuffer) notifyOutOfQuota() {
func (b *blockingBuffer) notifyOutOfQuota(canFlush bool) {
b.mu.Lock()
defer b.mu.Unlock()

if b.mu.closed {
return
}
b.mu.blocked = true
b.mu.canFlush = canFlush
b.mu.Unlock()

select {
case b.signalCh <- struct{}{}:
Expand Down Expand Up @@ -160,7 +186,6 @@ func (b *blockingBuffer) enqueue(ctx context.Context, e Event) (err error) {
}

b.metrics.BufferEntriesIn.Inc(1)
b.mu.blocked = false
b.mu.queue.enqueue(e)

select {
Expand Down Expand Up @@ -290,7 +315,7 @@ type memQuota struct {
// When memQuota blocks waiting for resources, invoke the callback
// to notify about this. The notification maybe invoked multiple
// times for a single request that's blocked.
notifyOutOfQuota func()
notifyOutOfQuota func(canFlush bool)

acc mon.BoundAccount
}
Expand All @@ -306,7 +331,25 @@ func (r *memRequest) Acquire(
quota := resource.(*memQuota)
fulfilled, tryAgainAfter = r.acquireQuota(ctx, quota)
if !fulfilled {
quota.notifyOutOfQuota()
// canFlush indicates to the consumer (Get() caller) that it may issue flush
// request if necessary.
//
// Consider the case when we have 2 producers that are blocked (Pa, Pb).
// Consumer will issue flush request if no events are buffered in this
// blocking buffer, and we have blocked producers (i.e. Pa and Pb). As soon
// as flush completes and releases lots of resources (actually, the entirety
// of mem buffer limit worth of resources are released), Pa manages to put
// in the event into the queue. If the consumer consumes that message, plus
// attempts to consume the next message, before Pb had a chance to unblock
// itself, the consumer will mistakenly think that it must flush to release
// resources (i.e. just after 1 message).
//
// canFlush is set to true if we are *really* blocked -- i.e. we
// have non-zero canAllocateBelow threshold; OR in the corner case when
// nothing is allocated (and we are still blocked -- see comment in
// acquireQuota)
canFlush := quota.allocated == 0 || quota.canAllocateBelow > 0
quota.notifyOutOfQuota(canFlush)
}

return fulfilled, tryAgainAfter
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestBlockingBuffer(t *testing.T) {
}
}
st := cluster.MakeTestingClusterSettings()
buf := kvevent.NewMemBuffer(ba, &st.SV, &metrics, quotapool.OnWaitStart(notifyWait))
buf := kvevent.TestingNewMemBuffer(ba, &st.SV, &metrics, notifyWait)
defer func() {
require.NoError(t, buf.CloseWithReason(context.Background(), nil))
}()
Expand Down

0 comments on commit 12a1b04

Please sign in to comment.