Skip to content

Commit

Permalink
kvserver: fix race in durability callback queueing in raftLogTruncator
Browse files Browse the repository at this point in the history
The existing code admitted the following interleaving between
thread-1, running the async raft log truncation, and thread-2
which is running a new durabilityAdvancedCallback.

thread-1: executes queued := t.mu.queuedDurabilityCB and
 sees queued is false
thread-2: sees t.mu.runningTruncation is true and sets
 t.mu.queuedDurabilityCB = true
thread-1: Sets t.mu.runningTruncation = false and returns

Now the queued callback will never run. This can happen in tests
that wait for truncation before doing the next truncation step,
because they will stop waiting once the truncation is observed
on a Replica, which happens before any of the steps listed above
for thread-1.

Fixes cockroachdb#77046

Release justification: Bug fix

Release note: None
  • Loading branch information
sumeerbhola committed Mar 2, 2022
1 parent bb54797 commit 3ba6dcb
Showing 1 changed file with 14 additions and 12 deletions.
26 changes: 14 additions & 12 deletions pkg/kv/kvserver/raft_log_truncator.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,19 +380,10 @@ func (r rangesByRangeID) Swap(i, j int) {
// deadlock (see storage.Engine.RegisterFlushCompletedCallback).
func (t *raftLogTruncator) durabilityAdvancedCallback() {
runTruncation := false
doneRunning := func() {}
t.mu.Lock()
if !t.mu.runningTruncation && len(t.mu.addRanges) > 0 {
runTruncation = true
t.mu.runningTruncation = true
doneRunning = func() {
t.mu.Lock()
defer t.mu.Unlock()
if !t.mu.runningTruncation {
panic("expected runningTruncation")
}
t.mu.runningTruncation = false
}
}
if !runTruncation && len(t.mu.addRanges) > 0 {
t.mu.queuedDurabilityCB = true
Expand All @@ -403,20 +394,31 @@ func (t *raftLogTruncator) durabilityAdvancedCallback() {
}
if err := t.stopper.RunAsyncTask(t.ambientCtx, "raft-log-truncation",
func(ctx context.Context) {
defer doneRunning()
for {
t.durabilityAdvanced(ctx)
shouldReturn := false
t.mu.Lock()
queued := t.mu.queuedDurabilityCB
t.mu.queuedDurabilityCB = false
t.mu.Unlock()
if !queued {
t.mu.runningTruncation = false
shouldReturn = true
}
t.mu.Unlock()
if shouldReturn {
return
}
}
}); err != nil {
// Task did not run because stopper is stopped.
doneRunning()
func() {
t.mu.Lock()
defer t.mu.Unlock()
if !t.mu.runningTruncation {
panic("expected runningTruncation")
}
t.mu.runningTruncation = false
}()
}
}

Expand Down

0 comments on commit 3ba6dcb

Please sign in to comment.