86734: kvserver: avoid race in splitPreApply r=erikgrinaker a=tbg

When `splitPreApply` has to handle a right-hand side replica that is
newer than the split, the split needs to throw away the "snapshot" it
was going to install into the right-hand side. It does so by deleting
all data in the RHS and replacing the raft state bits. It uses the RHS
replica's stateloader to that effect, but did not actually hold the
raftMu to make this safe. The mutex acquisition has been added.
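
The fixed control flow looks roughly like the following self-contained sketch (the `replica` type, its fields, and the clearing logic are illustrative placeholders, not the actual kvserver code):

```go
package main

import "sync"

// replica stands in for kvserver's Replica; raftMu serializes all
// below-Raft state changes, including what the stateloader touches.
type replica struct {
	raftMu    sync.Mutex
	hardState []byte // placeholder for the RHS Raft state bits
}

// splitPreApply sketches the fix: when the RHS replica is newer than
// the split, the split's "snapshot" of the RHS is discarded while
// holding the RHS replica's raftMu.
func splitPreApply(rightRepl *replica) {
	if rightRepl != nil {
		rightRepl.raftMu.Lock() // this acquisition is the fix
		defer rightRepl.raftMu.Unlock()
		// Clear the RHS data and replace the Raft state bits;
		// previously this ran without the lock, racing with
		// concurrent Raft processing on the RHS replica.
		rightRepl.hardState = nil
	}
}

func main() {
	splitPreApply(&replica{hardState: []byte("hs")})
}
```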

Fixes #86669.
Fixes #86734.

No release note since the bug shouldn't be visible to end users (it is
very rare in the first place, and having a noticeable effect even rarer),
and if it were, it would likely look like unspecific Raft corruption that
would be hard to trace back to this race.

Release justification: this will merge on master only after branch cut.
Release note: None


87385: roachtest: update a comment r=renatolabs a=tbg

Release justification: changes a comment in testing code.
Release note: None


87464: kvevent: Ensure out of quota events correctly handled r=miretskiy a=miretskiy

Ensure that out-of-quota events are not lost and are propagated to the consumer when necessary.

Prior to this change, it was possible for an out-of-quota notification to be "lost" because the "blocked" bit was cleared whenever an event was enqueued.
Instead of relying on a boolean bit, we now keep track of the number of producers currently blocked, and issue a flush request if there are blocked producers and zero events currently queued.
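
A minimal sketch of the new bookkeeping (names and types are illustrative, not the actual kvevent API):

```go
package main

import (
	"fmt"
	"sync"
)

// buffer sketches the fix: count blocked producers instead of keeping a
// single "blocked" boolean that enqueue used to clear (losing the
// notification).
type buffer struct {
	mu         sync.Mutex
	numBlocked int      // producers currently blocked acquiring quota
	queue      []string // queued events
}

// onWaitStart/onWaitFinish mirror the quota pool callbacks.
func (b *buffer) onWaitStart()  { b.mu.Lock(); b.numBlocked++; b.mu.Unlock() }
func (b *buffer) onWaitFinish() { b.mu.Lock(); b.numBlocked--; b.mu.Unlock() }

// enqueue adds an event; note that it no longer clears any "blocked" bit.
func (b *buffer) enqueue(e string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.queue = append(b.queue, e)
}

// pop returns the next event. With nothing queued and producers blocked,
// it synthesizes a flush event so the consumer releases memory.
func (b *buffer) pop() (e string, ok bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.queue) > 0 {
		e, b.queue = b.queue[0], b.queue[1:]
		return e, true
	}
	if b.numBlocked > 0 {
		return "flush", true
	}
	return "", false
}

func main() {
	b := &buffer{}
	b.onWaitStart()      // a producer blocks acquiring quota
	fmt.Println(b.pop()) // flush true: consumer is told to release memory
}
```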

Fixes #86828

Release justification: bug fix
Release note: None

87511: authors: add angeladietz to authors r=angeladietz a=angeladietz

Release note: None
Release justification: non-production code change

Co-authored-by: Tobias Grieger <[email protected]>
Co-authored-by: Yevgeniy Miretskiy <[email protected]>
Co-authored-by: Angela Dietz <[email protected]>
4 people committed Sep 8, 2022
5 parents d33e93f + f19bb2a + 777ead5 + 12a1b04 + a55e538 commit 05b4853
Showing 6 changed files with 109 additions and 53 deletions.
1 change: 1 addition & 0 deletions AUTHORS
@@ -72,6 +72,7 @@ Andy Kimball <[email protected]> <[email protected]> <32096062+andy-k
Andy Woods <[email protected]> Andrew Woods <[email protected]>
Andy Yang <[email protected]> <[email protected]> andyyang890 <[email protected]>
Angela Chang <[email protected]> changangela <[email protected]> <[email protected]>
Angela Dietz <[email protected]>
Angela Wen <[email protected]> angelapwen <[email protected]>
Angela Xu <[email protected]> angelazxu <[email protected]>
Anne Zhu <[email protected]>
97 changes: 70 additions & 27 deletions pkg/ccl/changefeedccl/kvevent/blocking_buffer.go
@@ -38,35 +38,67 @@ type blockingBuffer struct {

mu struct {
syncutil.Mutex
closed bool // True when buffer closed.
reason error // Reason buffer is closed.
drainCh chan struct{} // Set when Drain request issued.
blocked bool // Set when event is blocked, waiting to acquire quota.
queue *bufferEventChunkQueue // Queue of added events.
closed bool // True when buffer closed.
reason error // Reason buffer is closed.
drainCh chan struct{} // Set when Drain request issued.
numBlocked int // Number of waiters blocked to acquire quota.
canFlush bool // Set when the consumer may issue a flush to release memory.
queue *bufferEventChunkQueue // Queue of added events.
}
}

// NewMemBuffer returns a new in-memory buffer which will store events.
// It will grow the bound account to buffer more messages but will block if it
// runs out of space. If ever any entry exceeds the allocatable size of the
// account, an error will be returned when attempting to buffer it.
func NewMemBuffer(
acc mon.BoundAccount, sv *settings.Values, metrics *Metrics, opts ...quotapool.Option,
func NewMemBuffer(acc mon.BoundAccount, sv *settings.Values, metrics *Metrics) Buffer {
return newMemBuffer(acc, sv, metrics, nil)
}

// TestingNewMemBuffer allows tests to construct a buffer which will invoke the
// specified notification function when blocked, waiting for memory.
func TestingNewMemBuffer(
acc mon.BoundAccount,
sv *settings.Values,
metrics *Metrics,
onWaitStart quotapool.OnWaitStartFunc,
) Buffer {
const slowAcquisitionThreshold = 5 * time.Second
return newMemBuffer(acc, sv, metrics, onWaitStart)
}

opts = append(opts,
quotapool.OnSlowAcquisition(slowAcquisitionThreshold, logSlowAcquisition(slowAcquisitionThreshold)),
quotapool.OnWaitFinish(
func(ctx context.Context, poolName string, r quotapool.Request, start time.Time) {
metrics.BufferPushbackNanos.Inc(timeutil.Since(start).Nanoseconds())
}))
func newMemBuffer(
acc mon.BoundAccount,
sv *settings.Values,
metrics *Metrics,
onWaitStart quotapool.OnWaitStartFunc,
) Buffer {
const slowAcquisitionThreshold = 5 * time.Second

b := &blockingBuffer{
signalCh: make(chan struct{}, 1),
metrics: metrics,
sv: sv,
}

opts := []quotapool.Option{
quotapool.OnSlowAcquisition(slowAcquisitionThreshold, logSlowAcquisition(slowAcquisitionThreshold)),
quotapool.OnWaitStart(func(ctx context.Context, poolName string, r quotapool.Request) {
if onWaitStart != nil {
onWaitStart(ctx, poolName, r)
}
b.mu.Lock()
b.mu.numBlocked++
b.mu.Unlock()
}),
quotapool.OnWaitFinish(
func(ctx context.Context, poolName string, r quotapool.Request, start time.Time) {
metrics.BufferPushbackNanos.Inc(timeutil.Since(start).Nanoseconds())
b.mu.Lock()
b.mu.numBlocked--
b.mu.Unlock()
},
),
}
quota := &memQuota{acc: acc, notifyOutOfQuota: b.notifyOutOfQuota}
b.qp = allocPool{
AbstractPool: quotapool.New("changefeed", quota, opts...),
@@ -88,7 +120,7 @@ func (b *blockingBuffer) pop() (e Event, ok bool, err error) {

e, ok = b.mu.queue.dequeue()

if !ok && b.mu.blocked {
if !ok && b.mu.numBlocked > 0 && b.mu.canFlush {
// Here, we know that we are blocked, waiting for memory; yet we have nothing queued up
// (and thus, no resources that could be released by draining the queue).
// This means that all the previously added entries have been read by the consumer,
@@ -100,8 +132,6 @@ func (b *blockingBuffer) pop() (e Event, ok bool, err error) {
// So, we issue the flush request to the consumer to ensure that we release some memory.
e = Event{flush: true}
ok = true
// Ensure we notify only once.
b.mu.blocked = false
}

if b.mu.drainCh != nil && b.mu.queue.empty() {
@@ -113,14 +143,10 @@ func (b *blockingBuffer) pop() (e Event, ok bool, err error) {

// notifyOutOfQuota is invoked by memQuota to notify the blocking buffer that
// an event is blocked, waiting for more resources.
func (b *blockingBuffer) notifyOutOfQuota() {
func (b *blockingBuffer) notifyOutOfQuota(canFlush bool) {
b.mu.Lock()
defer b.mu.Unlock()

if b.mu.closed {
return
}
b.mu.blocked = true
b.mu.canFlush = canFlush
b.mu.Unlock()

select {
case b.signalCh <- struct{}{}:
@@ -160,7 +186,6 @@ func (b *blockingBuffer) enqueue(ctx context.Context, e Event) (err error) {
}

b.metrics.BufferEntriesIn.Inc(1)
b.mu.blocked = false
b.mu.queue.enqueue(e)

select {
@@ -290,7 +315,7 @@ type memQuota struct {
// When memQuota blocks waiting for resources, invoke the callback
// to notify about this. The notification may be invoked multiple
// times for a single request that's blocked.
notifyOutOfQuota func()
notifyOutOfQuota func(canFlush bool)

acc mon.BoundAccount
}
@@ -306,7 +331,25 @@ func (r *memRequest) Acquire(
quota := resource.(*memQuota)
fulfilled, tryAgainAfter = r.acquireQuota(ctx, quota)
if !fulfilled {
quota.notifyOutOfQuota()
// canFlush indicates to the consumer (the Get() caller) that it may issue a
// flush request if necessary.
//
// Consider the case when we have 2 producers that are blocked (Pa, Pb).
// The consumer will issue a flush request if no events are buffered in this
// blocking buffer and we have blocked producers (i.e. Pa and Pb). As soon
// as the flush completes and releases lots of resources (actually, the
// entirety of the mem buffer limit worth of resources is released), Pa
// manages to put its event into the queue. If the consumer consumes that
// message and then attempts to consume the next message before Pb has had
// a chance to unblock itself, the consumer will mistakenly think that it
// must flush to release resources (i.e. just after 1 message).
//
// canFlush is set to true if we are *really* blocked -- i.e. we have a
// non-zero canAllocateBelow threshold; OR in the corner case when nothing
// is allocated (and we are still blocked -- see comment in acquireQuota).
canFlush := quota.allocated == 0 || quota.canAllocateBelow > 0
quota.notifyOutOfQuota(canFlush)
}

return fulfilled, tryAgainAfter
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go
@@ -80,7 +80,7 @@ func TestBlockingBuffer(t *testing.T) {
}
}
st := cluster.MakeTestingClusterSettings()
buf := kvevent.NewMemBuffer(ba, &st.SV, &metrics, quotapool.OnWaitStart(notifyWait))
buf := kvevent.TestingNewMemBuffer(ba, &st.SV, &metrics, notifyWait)
defer func() {
require.NoError(t, buf.CloseWithReason(context.Background(), nil))
}()
8 changes: 3 additions & 5 deletions pkg/cmd/roachtest/test_runner.go
@@ -781,11 +781,9 @@ func (r *testRunner) runTest(
t.end = timeutil.Now()

// We only have to record panics if the panic'd value is not the sentinel
// produced by t.Fatal*().
//
// TODO(test-eng): we shouldn't be seeing errTestFatal here unless this
// goroutine accidentally ends up calling t.Fatal; the test runs in a
// different goroutine.
// produced by t.Fatal*(). We may see calls to t.Fatal from this goroutine
// during the post-flight checks; the test itself runs on a different
// goroutine and has similar code to handle errTestFatal.
if err := recover(); err != nil && err != errTestFatal {
t.mu.Lock()
t.mu.failed = true
44 changes: 24 additions & 20 deletions pkg/kv/kvserver/replica.go
@@ -1712,31 +1712,35 @@ func (r *Replica) shouldWaitForPendingMergeRLocked(

// isNewerThanSplit is a helper used in split(Pre|Post)Apply to
// determine whether the Replica on the right hand side of the split must
// have been removed from this store after the split. There is one
// false negative where false will be returned but the hard state may
// be due to a newer replica which is outlined below. It should be safe.
// have been removed from this store after the split.
//
// TODO(ajwerner): Ideally if this store had ever learned that the replica
// created by the split were removed it would not forget that fact.
// There exists one edge case where the store may learn that it should house
// a replica of the same range with a higher replica ID and then forget.
// If the first raft message this store ever receives for this range
// contains a replica ID higher than the replica ID in the split trigger
// then an in-memory replica at that higher replica ID will be created and
// no tombstone at a lower replica ID will be written. If the server then
// crashes it will forget that it had ever been the higher replica ID. The
// server may then proceed to process the split and initialize a replica at
// the replica ID implied by the split. This is potentially problematic as
// TODO(tbg): the below is true as of 22.2: we persist any Replica's ReplicaID
// under RaftReplicaIDKey, so the below caveats should be addressed now and we
// should be able to simplify isNewerThanSplit to just compare replicaIDs.
//
// TODO(ajwerner): There is one false negative where false will be returned but
// the hard state may be due to a newer replica which is outlined below. It
// should be safe.
// Ideally if this store had ever learned that the replica created by the split
// were removed it would not forget that fact. There exists one edge case where
// the store may learn that it should house a replica of the same range with a
// higher replica ID and then forget. If the first raft message this store ever
// receives for this range contains a replica ID higher than the replica ID
// in the split trigger then an in-memory replica at that higher replica ID will
// be created and no tombstone at a lower replica ID will be written. If the
// server then crashes it will forget that it had ever been the higher replica
// ID. The server may then proceed to process the split and initialize a replica
// at the replica ID implied by the split. This is potentially problematic as
// the replica may have voted as this higher replica ID and when it rediscovers
// the higher replica ID it will delete all of the state corresponding to the
// older replica ID including its hard state which may have been synthesized
// with votes as the newer replica ID. This case tends to be handled safely
// in practice because the replica should only be receiving messages as the
// newer replica ID after it has been added to the range as a learner.
// with votes as the newer replica ID. This case tends to be handled safely in
// practice because the replica should only be receiving messages as the newer
// replica ID after it has been added to the range as a learner.
//
// Despite the safety due to the change replicas protocol explained above
// it'd be good to know for sure that a replica ID for a range on a store
// is always monotonically increasing, even across restarts.
// Despite the safety due to the change replicas protocol explained above it'd
// be good to know for sure that a replica ID for a range on a store is always
// monotonically increasing, even across restarts.
//
// See TestProcessSplitAfterRightHandSideHasBeenRemoved.
func (r *Replica) isNewerThanSplit(split *roachpb.SplitTrigger) bool {
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/store_split.go
@@ -62,6 +62,8 @@ func splitPreApply(
// We're in the rare case where we know that the RHS has been removed
// and re-added with a higher replica ID (and then maybe removed again).
//
// If rightRepl is not nil, we are *not* holding raftMu.
//
// To apply the split, we need to "throw away" the data that would belong to
// the RHS, i.e. we clear the user data the RHS would have inherited from the
// LHS due to the split and additionally clear all of the range ID local state
@@ -80,8 +82,16 @@ // the HardState and tombstone. Note that we only do this if rightRepl
// the HardState and tombstone. Note that we only do this if rightRepl
// exists; if it doesn't, there's no Raft state to massage (when rightRepl
// was removed, a tombstone was written instead).
//
// TODO(tbg): it would be cleaner to teach clearRangeData to only remove
// the replicated state if rightRepl != nil, as opposed to writing back
// internal raft state. As is, it's going to break with any new piece of
// local state that we add, and it introduces locking between two Replicas
// that don't ever need to interact.
var hs raftpb.HardState
if rightRepl != nil {
rightRepl.raftMu.Lock()
defer rightRepl.raftMu.Unlock()
// Assert that the rightRepl is not initialized. We're about to clear out
// the data of the RHS of the split; we cannot have already accepted a
// snapshot to initialize this newer RHS.
