kvevent: Ensure out of quota events correctly handled #87464

Merged (1 commit) on Sep 8, 2022
97 changes: 70 additions & 27 deletions pkg/ccl/changefeedccl/kvevent/blocking_buffer.go
@@ -38,35 +38,67 @@ type blockingBuffer struct {

mu struct {
syncutil.Mutex
closed bool // True when buffer closed.
reason error // Reason buffer is closed.
drainCh chan struct{} // Set when Drain request issued.
blocked bool // Set when event is blocked, waiting to acquire quota.
queue *bufferEventChunkQueue // Queue of added events.
closed bool // True when buffer closed.
reason error // Reason buffer is closed.
drainCh chan struct{} // Set when Drain request issued.
numBlocked int // Number of waiters blocked to acquire quota.
canFlush bool
queue *bufferEventChunkQueue // Queue of added events.
}
}

// NewMemBuffer returns a new in-memory buffer which will store events.
// It will grow the bound account to buffer more messages but will block if it
// runs out of space. If ever any entry exceeds the allocatable size of the
// account, an error will be returned when attempting to buffer it.
func NewMemBuffer(
acc mon.BoundAccount, sv *settings.Values, metrics *Metrics, opts ...quotapool.Option,
func NewMemBuffer(acc mon.BoundAccount, sv *settings.Values, metrics *Metrics) Buffer {
return newMemBuffer(acc, sv, metrics, nil)
}

// TestingNewMemBuffer allows tests to construct a buffer which will invoke the
// specified notification function when blocked, waiting for memory.
func TestingNewMemBuffer(
acc mon.BoundAccount,
sv *settings.Values,
metrics *Metrics,
onWaitStart quotapool.OnWaitStartFunc,
) Buffer {
const slowAcquisitionThreshold = 5 * time.Second
return newMemBuffer(acc, sv, metrics, onWaitStart)
}

opts = append(opts,
quotapool.OnSlowAcquisition(slowAcquisitionThreshold, logSlowAcquisition(slowAcquisitionThreshold)),
quotapool.OnWaitFinish(
func(ctx context.Context, poolName string, r quotapool.Request, start time.Time) {
metrics.BufferPushbackNanos.Inc(timeutil.Since(start).Nanoseconds())
}))
func newMemBuffer(
acc mon.BoundAccount,
sv *settings.Values,
metrics *Metrics,
onWaitStart quotapool.OnWaitStartFunc,
) Buffer {
const slowAcquisitionThreshold = 5 * time.Second

b := &blockingBuffer{
signalCh: make(chan struct{}, 1),
metrics: metrics,
sv: sv,
}

opts := []quotapool.Option{
quotapool.OnSlowAcquisition(slowAcquisitionThreshold, logSlowAcquisition(slowAcquisitionThreshold)),
quotapool.OnWaitStart(func(ctx context.Context, poolName string, r quotapool.Request) {
if onWaitStart != nil {
onWaitStart(ctx, poolName, r)
}
b.mu.Lock()
b.mu.numBlocked++
b.mu.Unlock()
}),
quotapool.OnWaitFinish(
func(ctx context.Context, poolName string, r quotapool.Request, start time.Time) {
metrics.BufferPushbackNanos.Inc(timeutil.Since(start).Nanoseconds())
b.mu.Lock()
b.mu.numBlocked--
b.mu.Unlock()
},
),
}
quota := &memQuota{acc: acc, notifyOutOfQuota: b.notifyOutOfQuota}
b.qp = allocPool{
AbstractPool: quotapool.New("changefeed", quota, opts...),
@@ -88,7 +120,7 @@ func (b *blockingBuffer) pop() (e Event, ok bool, err error) {

e, ok = b.mu.queue.dequeue()

if !ok && b.mu.blocked {
if !ok && b.mu.numBlocked > 0 && b.mu.canFlush {
// Here, we know that we are blocked, waiting for memory; yet we have nothing queued up
// (and thus, no resources that could be released by draining the queue).
// This means that all the previously added entries have been read by the consumer,
@@ -100,8 +132,6 @@ func (b *blockingBuffer) pop() (e Event, ok bool, err error) {
// So, we issue the flush request to the consumer to ensure that we release some memory.
e = Event{flush: true}
ok = true
// Ensure we notify only once.
b.mu.blocked = false
}

if b.mu.drainCh != nil && b.mu.queue.empty() {
@@ -113,14 +143,10 @@ func (b *blockingBuffer) pop() (e Event, ok bool, err error) {

// notifyOutOfQuota is invoked by memQuota to notify the blocking buffer that
// an event is blocked, waiting for more resources.
func (b *blockingBuffer) notifyOutOfQuota() {
func (b *blockingBuffer) notifyOutOfQuota(canFlush bool) {
b.mu.Lock()
defer b.mu.Unlock()

if b.mu.closed {
return
}
b.mu.blocked = true
b.mu.canFlush = canFlush
b.mu.Unlock()

select {
case b.signalCh <- struct{}{}:
@@ -160,7 +186,6 @@ func (b *blockingBuffer) enqueue(ctx context.Context, e Event) (err error) {
}

b.metrics.BufferEntriesIn.Inc(1)
b.mu.blocked = false
b.mu.queue.enqueue(e)

select {
@@ -290,7 +315,7 @@ type memQuota struct {
// When memQuota blocks waiting for resources, invoke the callback
// to notify about this. The notification may be invoked multiple
// times for a single request that's blocked.
notifyOutOfQuota func()
notifyOutOfQuota func(canFlush bool)

acc mon.BoundAccount
}
@@ -306,7 +331,25 @@ func (r *memRequest) Acquire(
quota := resource.(*memQuota)
fulfilled, tryAgainAfter = r.acquireQuota(ctx, quota)
if !fulfilled {
quota.notifyOutOfQuota()
// canFlush indicates to the consumer (the Get() caller) that it may issue a
// flush request if necessary.
//
// Consider the case when we have 2 producers that are blocked (Pa, Pb).
// The consumer will issue a flush request if no events are buffered in this
// blocking buffer and we have blocked producers (i.e. Pa and Pb). As soon
// as the flush completes and releases lots of resources (actually, the
// entirety of the mem buffer limit worth of resources is released), Pa
// manages to put its event into the queue. If the consumer consumes that
// message and then attempts to consume the next message before Pb has had a
// chance to unblock itself, the consumer will mistakenly think that it must
// flush to release resources (i.e. after just 1 message).
//
// canFlush is set to true if we are *really* blocked -- i.e. we have a
// non-zero canAllocateBelow threshold, OR in the corner case when nothing
// is allocated (and we are still blocked -- see comment in acquireQuota).
canFlush := quota.allocated == 0 || quota.canAllocateBelow > 0
quota.notifyOutOfQuota(canFlush)
}

return fulfilled, tryAgainAfter
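For orientation, the following is a minimal, self-contained Go sketch of the pattern this diff introduces: counting blocked producers in the OnWaitStart/OnWaitFinish callbacks and only fabricating a flush event for the consumer when it is genuinely starved (canFlush). All names below are simplified stand-ins for illustration; this is not the real kvevent or quotapool API.

package main

import (
	"fmt"
	"sync"
)

type event struct {
	payload string
	flush   bool // synthetic "please flush" signal to the consumer
}

type buffer struct {
	mu         sync.Mutex
	queue      []event
	numBlocked int  // producers currently waiting for quota
	canFlush   bool // true when flushing would actually release resources
}

// onWaitStart / onWaitFinish mirror the quotapool callbacks wired up above:
// an exact count of blocked producers replaces the old single boolean.
func (b *buffer) onWaitStart()  { b.mu.Lock(); b.numBlocked++; b.mu.Unlock() }
func (b *buffer) onWaitFinish() { b.mu.Lock(); b.numBlocked--; b.mu.Unlock() }

// notifyOutOfQuota records whether a flush by the consumer could help.
func (b *buffer) notifyOutOfQuota(canFlush bool) {
	b.mu.Lock()
	b.canFlush = canFlush
	b.mu.Unlock()
}

// pop returns the next event. With nothing queued, it fabricates a flush
// event only when producers are still blocked and flushing can actually help.
func (b *buffer) pop() (event, bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.queue) > 0 {
		e := b.queue[0]
		b.queue = b.queue[1:]
		return e, true
	}
	if b.numBlocked > 0 && b.canFlush {
		return event{flush: true}, true
	}
	return event{}, false
}

func main() {
	b := &buffer{}
	b.onWaitStart()          // a producer blocks waiting for quota
	b.notifyOutOfQuota(true) // the quota pool says a flush would release memory
	if e, ok := b.pop(); ok && e.flush {
		fmt.Println("consumer receives a synthetic flush request")
	}
}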
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go
@@ -80,7 +80,7 @@ func TestBlockingBuffer(t *testing.T) {
}
}
st := cluster.MakeTestingClusterSettings()
buf := kvevent.NewMemBuffer(ba, &st.SV, &metrics, quotapool.OnWaitStart(notifyWait))
buf := kvevent.TestingNewMemBuffer(ba, &st.SV, &metrics, notifyWait)
defer func() {
require.NoError(t, buf.CloseWithReason(context.Background(), nil))
}()
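For reference, a short sketch (based only on the signatures shown in this diff) of how callers construct the buffer after this change; ba, st, metrics, notifyWait, and t are assumed to be set up as in TestBlockingBuffer above.

// Tests that need the blocked-on-memory notification use the new
// testing constructor (signature taken from the diff above):
buf := kvevent.TestingNewMemBuffer(ba, &st.SV, &metrics, notifyWait)
defer func() {
	require.NoError(t, buf.CloseWithReason(context.Background(), nil))
}()

// Production callers use the narrowed constructor, which no longer
// accepts quotapool options:
prodBuf := kvevent.NewMemBuffer(ba, &st.SV, &metrics)
_ = prodBuf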