kvevent: ensure out-of-quota events are correctly handled
Ensure that out-of-quota events are not lost, and are propagated
to the consumer when necessary.

Prior to this change, it was possible for an out-of-quota
notification to be "lost" because the "blocked" bit would be
cleared when an event was enqueued.
Instead of relying on a boolean bit, we now keep track of the
number of waiters currently blocked acquiring quota, and issue a
flush request if there are blocked waiters while zero events are
currently queued.

Fixes cockroachdb#86828

Release justification: bug fix
Release note: None
Yevgeniy Miretskiy committed Sep 7, 2022
1 parent a1d7e47 commit ed685b8
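
To make the mechanism concrete, here is a minimal, self-contained Go sketch of the idea (names and structure are illustrative only, not the actual kvevent API): blocked waiters are counted rather than flagged, and the consumer-side pop synthesizes a flush event only when the queue is empty, waiters are blocked, and a flush could actually release memory.

package main

import (
	"fmt"
	"sync"
)

// buffer models the relevant slice of blockingBuffer: a queue of events plus
// a count of waiters currently blocked acquiring memory quota.
type buffer struct {
	mu         sync.Mutex
	queue      []string
	numBlocked int  // incremented on wait-start, decremented on wait-finish
	canFlush   bool // set by the out-of-quota notification
}

// pop returns the next queued event. When nothing is queued but waiters are
// blocked, it synthesizes a "flush" event: the consumer must release memory
// before any blocked producer can make progress. Because numBlocked is a
// count maintained by wait callbacks, enqueueing an event can no longer
// erase the fact that someone is still waiting.
func (b *buffer) pop() (event string, ok bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.queue) > 0 {
		event, b.queue = b.queue[0], b.queue[1:]
		return event, true
	}
	if b.numBlocked > 0 && b.canFlush {
		return "flush", true
	}
	return "", false
}

func main() {
	b := &buffer{numBlocked: 1, canFlush: true}
	e, ok := b.pop()
	fmt.Println(e, ok) // flush true: empty queue + blocked waiter => flush
}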
Showing 2 changed files with 53 additions and 28 deletions.
79 changes: 52 additions & 27 deletions pkg/ccl/changefeedccl/kvevent/blocking_buffer.go
@@ -38,35 +38,67 @@ type blockingBuffer struct {

	mu struct {
		syncutil.Mutex
-		closed  bool                   // True when buffer closed.
-		reason  error                  // Reason buffer is closed.
-		drainCh chan struct{}          // Set when Drain request issued.
-		blocked bool                   // Set when event is blocked, waiting to acquire quota.
-		queue   *bufferEventChunkQueue // Queue of added events.
+		closed     bool                   // True when buffer closed.
+		reason     error                  // Reason buffer is closed.
+		drainCh    chan struct{}          // Set when Drain request issued.
+		numBlocked int                    // Number of waiters blocked to acquire quota.
+		canFlush   bool                   // Set when issuing a flush event may help release memory.
+		queue      *bufferEventChunkQueue // Queue of added events.
}
}

// NewMemBuffer returns a new in-memory buffer which will store events.
// It will grow the bound account to buffer more messages but will block if it
// runs out of space. If ever any entry exceeds the allocatable size of the
// account, an error will be returned when attempting to buffer it.
-func NewMemBuffer(
-	acc mon.BoundAccount, sv *settings.Values, metrics *Metrics, opts ...quotapool.Option,
+func NewMemBuffer(acc mon.BoundAccount, sv *settings.Values, metrics *Metrics) Buffer {
+	return newMemBuffer(acc, sv, metrics, nil)
+}

+// TestingNewMemBuffer allows tests to construct a buffer which will invoke
+// the specified notification function when blocked, waiting for memory.
+func TestingNewMemBuffer(
+	acc mon.BoundAccount,
+	sv *settings.Values,
+	metrics *Metrics,
+	onWaitStart quotapool.OnWaitStartFunc,
+) Buffer {
+	return newMemBuffer(acc, sv, metrics, onWaitStart)
+}

+func newMemBuffer(
+	acc mon.BoundAccount,
+	sv *settings.Values,
+	metrics *Metrics,
+	onWaitStart quotapool.OnWaitStartFunc,
) Buffer {
	const slowAcquisitionThreshold = 5 * time.Second
-	opts = append(opts,
-		quotapool.OnSlowAcquisition(slowAcquisitionThreshold, logSlowAcquisition(slowAcquisitionThreshold)),
-		quotapool.OnWaitFinish(
-			func(ctx context.Context, poolName string, r quotapool.Request, start time.Time) {
-				metrics.BufferPushbackNanos.Inc(timeutil.Since(start).Nanoseconds())
-			}))

	b := &blockingBuffer{
		signalCh: make(chan struct{}, 1),
		metrics:  metrics,
		sv:       sv,
	}

+	opts := []quotapool.Option{
+		quotapool.OnSlowAcquisition(slowAcquisitionThreshold, logSlowAcquisition(slowAcquisitionThreshold)),
+		quotapool.OnWaitStart(func(ctx context.Context, poolName string, r quotapool.Request) {
+			if onWaitStart != nil {
+				onWaitStart(ctx, poolName, r)
+			}
+			b.mu.Lock()
+			b.mu.numBlocked++
+			b.mu.Unlock()
+		}),
+		quotapool.OnWaitFinish(
+			func(ctx context.Context, poolName string, r quotapool.Request, start time.Time) {
+				metrics.BufferPushbackNanos.Inc(timeutil.Since(start).Nanoseconds())
+				b.mu.Lock()
+				b.mu.numBlocked--
+				b.mu.Unlock()
+			},
+		),
+	}
	quota := &memQuota{acc: acc, notifyOutOfQuota: b.notifyOutOfQuota}
	b.qp = allocPool{
		AbstractPool: quotapool.New("changefeed", quota, opts...),
@@ -88,7 +120,7 @@ func (b *blockingBuffer) pop() (e Event, ok bool, err error) {

	e, ok = b.mu.queue.dequeue()

-	if !ok && b.mu.blocked {
+	if !ok && b.mu.numBlocked > 0 && b.mu.canFlush {
		// Here, we know that we are blocked, waiting for memory; yet we have nothing queued up
		// (and thus, no resources that could be released by draining the queue).
		// This means that all the previously added entries have been read by the consumer,
@@ -100,8 +132,6 @@ func (b *blockingBuffer) pop() (e Event, ok bool, err error) {
		// So, we issue the flush request to the consumer to ensure that we release some memory.
		e = Event{flush: true}
		ok = true
-		// Ensure we notify only once.
-		b.mu.blocked = false
	}

	if b.mu.drainCh != nil && b.mu.queue.empty() {
@@ -113,14 +143,10 @@ func (b *blockingBuffer) pop() (e Event, ok bool, err error) {

// notifyOutOfQuota is invoked by memQuota to notify the blocking buffer that
// an event is blocked, waiting for more resources.
-func (b *blockingBuffer) notifyOutOfQuota() {
+func (b *blockingBuffer) notifyOutOfQuota(canFlush bool) {
	b.mu.Lock()
-	defer b.mu.Unlock()
-
-	if b.mu.closed {
-		return
-	}
-	b.mu.blocked = true
+	b.mu.canFlush = canFlush
+	b.mu.Unlock()

	select {
	case b.signalCh <- struct{}{}:
@@ -160,7 +186,6 @@ func (b *blockingBuffer) enqueue(ctx context.Context, e Event) (err error) {
	}

	b.metrics.BufferEntriesIn.Inc(1)
-	b.mu.blocked = false
	b.mu.queue.enqueue(e)

	select {
@@ -290,7 +315,7 @@ type memQuota struct {
	// When memQuota blocks waiting for resources, invoke the callback
	// to notify about this. The notification may be invoked multiple
	// times for a single request that's blocked.
-	notifyOutOfQuota func()
+	notifyOutOfQuota func(canFlush bool)

	acc mon.BoundAccount
}
@@ -306,7 +331,7 @@ func (r *memRequest) Acquire(
	quota := resource.(*memQuota)
	fulfilled, tryAgainAfter = r.acquireQuota(ctx, quota)
	if !fulfilled {
-		quota.notifyOutOfQuota()
+		quota.notifyOutOfQuota(quota.allocated == 0 || quota.canAllocateBelow > 0)
	}

	return fulfilled, tryAgainAfter
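
The OnWaitStart/OnWaitFinish pairing above is what keeps numBlocked accurate. A stripped-down, self-contained sketch of that pairing (hypothetical names; only the callback shape is taken from this diff):

package main

import (
	"fmt"
	"sync"
)

// waitTracker pairs start/finish hooks around a blocking acquisition, the
// same shape as the quotapool.OnWaitStart/OnWaitFinish options used above.
type waitTracker struct {
	mu         sync.Mutex
	numBlocked int
}

func (t *waitTracker) trackWait(acquire func()) {
	t.mu.Lock()
	t.numBlocked++ // wait-start: one more waiter blocked on quota
	t.mu.Unlock()

	acquire() // blocks until quota becomes available

	t.mu.Lock()
	t.numBlocked-- // wait-finish: this waiter is no longer blocked
	t.mu.Unlock()
}

func main() {
	var t waitTracker
	t.trackWait(func() { /* acquisition would block here */ })
	fmt.Println(t.numBlocked) // 0: increments and decrements always pair
}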
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go
@@ -80,7 +80,7 @@ func TestBlockingBuffer(t *testing.T) {
		}
	}
	st := cluster.MakeTestingClusterSettings()
-	buf := kvevent.NewMemBuffer(ba, &st.SV, &metrics, quotapool.OnWaitStart(notifyWait))
+	buf := kvevent.TestingNewMemBuffer(ba, &st.SV, &metrics, notifyWait)
	defer func() {
		require.NoError(t, buf.CloseWithReason(context.Background(), nil))
	}()
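
With the quotapool.OnWaitStart wrapper gone, notifyWait must itself have the quotapool.OnWaitStartFunc shape shown earlier in this diff. A hypothetical counting callback of that shape (blockedCount and the atomic usage are illustrative, not part of the original test; ba, st, and metrics come from the surrounding test):

var blockedCount int32
notifyWait := func(ctx context.Context, poolName string, r quotapool.Request) {
	atomic.AddInt32(&blockedCount, 1) // count each time the buffer blocks on quota
}
buf := kvevent.TestingNewMemBuffer(ba, &st.SV, &metrics, notifyWait)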