Merge #60504
60504: kv: fix lock ordering in Replica.applySnapshot r=tbg a=nvanbenschoten

Fixes #60479.

ad78116 was a nice improvement that avoided a number of tricky questions about exposing inconsistent in-memory replica state during a snapshot. However, it appears to have introduced a deadlock due to a lock-ordering issue (see the referenced issue).

This commit fixes that issue by locking the Store mutex before the Replica mutex in `Replica.applySnapshot`'s combined critical section.

Release note: None
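
To make the ordering rule concrete, here is a minimal, self-contained Go sketch of the convention the fix restores — simplified stand-in types and names, not the actual CockroachDB code: every path that needs both mutexes acquires the store-level mutex before the replica-level one, so two such paths can never deadlock against each other, and the release order does not matter.

```go
// Illustrative sketch only: simplified stand-ins for Store.mu and Replica.mu,
// not the actual CockroachDB types. Both paths acquire the store mutex before
// the replica mutex, so they cannot deadlock against each other.
package main

import (
	"fmt"
	"sync"
)

type store struct {
	mu sync.Mutex // stand-in for Store.mu; acquired before any replica mutex
}

type replica struct {
	store *store
	mu    sync.RWMutex // stand-in for Replica.mu
}

// applySnapshotCriticalSection mirrors the fixed ordering in the combined
// critical section: store mutex first, then replica mutex.
func applySnapshotCriticalSection(r *replica) {
	r.store.mu.Lock()
	r.mu.Lock()
	// ... remove placeholder, update descriptor, mark replica initialized ...
	// Releasing the store mutex before the replica mutex is safe: only the
	// acquisition order matters for deadlock avoidance.
	r.store.mu.Unlock()
	// ... finish updating in-memory replica state ...
	r.mu.Unlock()
}

// otherStoreOperation stands in for any existing code path that already holds
// the store mutex and then takes the replica mutex — the order the snapshot
// path now matches.
func otherStoreOperation(r *replica) {
	r.store.mu.Lock()
	defer r.store.mu.Unlock()
	r.mu.RLock()
	defer r.mu.RUnlock()
	// ... read replica state while the store's replica map is stable ...
}

func main() {
	r := &replica{store: &store{}}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			applySnapshotCriticalSection(r)
			otherStoreOperation(r)
		}()
	}
	wg.Wait()
	fmt.Println("both paths acquire the store mutex before the replica mutex; no deadlock")
}
```

If applySnapshotCriticalSection instead took the replica mutex first while otherStoreOperation kept the store-first order, each goroutine could end up holding one lock while waiting on the other — the deadlock shape this commit fixes.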


Co-authored-by: Nathan VanBenschoten <[email protected]>
craig[bot] and nvanbenschoten committed Feb 16, 2021
2 parents 68b86d5 + 3f97668 commit 4ec622e
Showing 7 changed files with 32 additions and 20 deletions.
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/gc_queue_test.go
@@ -907,9 +907,9 @@ func TestGCQueueTransactionTable(t *testing.T) {
batch := tc.engine.NewSnapshot()
defer batch.Close()
tc.repl.raftMu.Lock()
-tc.repl.mu.Lock()
-tc.repl.assertStateLocked(ctx, batch) // check that in-mem and on-disk state were updated
-tc.repl.mu.Unlock()
+tc.repl.mu.RLock()
+tc.repl.assertStateRaftMuLockedReplicaMuRLocked(ctx, batch) // check that in-mem and on-disk state were updated
+tc.repl.mu.RUnlock()
tc.repl.raftMu.Unlock()
}

6 changes: 3 additions & 3 deletions pkg/kv/kvserver/helpers_test.go
@@ -263,9 +263,9 @@ func NewTestStorePool(cfg StoreConfig) *StorePool {
func (r *Replica) AssertState(ctx context.Context, reader storage.Reader) {
r.raftMu.Lock()
defer r.raftMu.Unlock()
-r.mu.Lock()
-defer r.mu.Unlock()
-r.assertStateLocked(ctx, reader)
+r.mu.RLock()
+defer r.mu.RUnlock()
+r.assertStateRaftMuLockedReplicaMuRLocked(ctx, reader)
}

func (r *Replica) RaftLock() {
10 changes: 6 additions & 4 deletions pkg/kv/kvserver/replica.go
@@ -1134,10 +1134,12 @@ func (r *Replica) State() kvserverpb.RangeInfo {
return ri
}

-// assertStateLocked can be called from the Raft goroutine to check that the
-// in-memory and on-disk states of the Replica are congruent.
-// Requires that both r.raftMu and r.mu are held.
-func (r *Replica) assertStateLocked(ctx context.Context, reader storage.Reader) {
+// assertStateRaftMuLockedReplicaMuRLocked can be called from the Raft goroutine
+// to check that the in-memory and on-disk states of the Replica are congruent.
+// Requires that r.raftMu is locked and r.mu is read locked.
+func (r *Replica) assertStateRaftMuLockedReplicaMuRLocked(
+ctx context.Context, reader storage.Reader,
+) {
diskState, err := r.mu.stateLoader.Load(ctx, reader, r.mu.state.Desc)
if err != nil {
log.Fatalf(ctx, "%v", err)
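
The body of the helper (partly hidden above) follows a common congruence-check pattern: reload the authoritative on-disk state through a storage reader and compare it against the cached in-memory copy, crashing loudly on divergence. A minimal hedged sketch of that pattern follows; the types and loader are invented for illustration, not the real kvserver stateloader API.

```go
// Illustrative sketch of an in-memory vs. on-disk congruence check; the types
// and loader here are invented for the example, not the real kvserver API.
package main

import (
	"fmt"
	"log"
	"reflect"
)

// replicaState is a hypothetical snapshot of the state kept both in memory
// and on disk.
type replicaState struct {
	RaftAppliedIndex uint64
	LeaseSequence    int64
}

// stateLoader is a hypothetical stand-in for the component that reads the
// persisted copy of the state from a storage reader.
type stateLoader struct {
	persisted replicaState
}

func (l *stateLoader) Load() (replicaState, error) {
	return l.persisted, nil
}

// assertStateCongruent mirrors the shape of the renamed assertion helper:
// reload the on-disk state and fatal if it diverges from the in-memory copy.
func assertStateCongruent(inMem replicaState, loader *stateLoader) {
	onDisk, err := loader.Load()
	if err != nil {
		log.Fatalf("loading on-disk state: %v", err)
	}
	if !reflect.DeepEqual(inMem, onDisk) {
		log.Fatalf("on-disk and in-memory state diverge: %+v != %+v", onDisk, inMem)
	}
}

func main() {
	state := replicaState{RaftAppliedIndex: 17, LeaseSequence: 3}
	loader := &stateLoader{persisted: state}
	assertStateCongruent(state, loader)
	fmt.Println("in-memory and on-disk state are congruent")
}
```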
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/replica_application_state_machine.go
@@ -1045,9 +1045,9 @@ func (sm *replicaStateMachine) ApplySideEffects(
if shouldAssert {
// Assert that the on-disk state doesn't diverge from the in-memory
// state as a result of the side effects.
-sm.r.mu.Lock()
-sm.r.assertStateLocked(ctx, sm.r.store.Engine())
-sm.r.mu.Unlock()
+sm.r.mu.RLock()
+sm.r.assertStateRaftMuLockedReplicaMuRLocked(ctx, sm.r.store.Engine())
+sm.r.mu.RUnlock()
sm.stats.stateAssertions++
}
} else if res := cmd.replicatedResult(); !res.IsZero() {
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_init.go
@@ -213,7 +213,7 @@ func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor)
); err != nil {
return errors.Wrap(err, "while initializing sideloaded storage")
}
-r.assertStateLocked(ctx, r.store.Engine())
+r.assertStateRaftMuLockedReplicaMuRLocked(ctx, r.store.Engine())
return nil
}

14 changes: 12 additions & 2 deletions pkg/kv/kvserver/replica_raftstorage.go
@@ -969,15 +969,18 @@ func (r *Replica) applySnapshot(
// consume the SSTs above, meaning that at this point the in-memory state lags
// the on-disk state.

-r.mu.Lock()
r.store.mu.Lock()
+r.mu.Lock()
if r.store.removePlaceholderLocked(ctx, r.RangeID) {
atomic.AddInt32(&r.store.counts.filledPlaceholders, 1)
}
r.setDescLockedRaftMuLocked(ctx, s.Desc)
if err := r.store.maybeMarkReplicaInitializedLockedReplLocked(ctx, r); err != nil {
log.Fatalf(ctx, "unable to mark replica initialized while applying snapshot: %+v", err)
}
+// NOTE: even though we acquired the store mutex first (according to the
+// lock ordering rules described on Store.mu), it is safe to drop it first
+// without risking a lock-ordering deadlock.
r.store.mu.Unlock()

// Invoke the leasePostApply method to ensure we properly initialize the
@@ -1010,9 +1013,16 @@
// Snapshots typically have fewer log entries than the leaseholder. The next
// time we hold the lease, recompute the log size before making decisions.
r.mu.raftLogSizeTrusted = false
-r.assertStateLocked(ctx, r.store.Engine())
r.mu.Unlock()

+// Assert that the in-memory and on-disk states of the Replica are congruent
+// after the application of the snapshot. Do so under a read lock, as this
+// operation can be expensive. This is safe, as we hold the Replica.raftMu
+// across both Replica.mu critical sections.
+r.mu.RLock()
+r.assertStateRaftMuLockedReplicaMuRLocked(ctx, r.store.Engine())
+r.mu.RUnlock()

// The rangefeed processor is listening for the logical ops attached to
// each raft command. These will be lost during a snapshot, so disconnect
// the rangefeed, if one exists.
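
For readers less familiar with the locking model, here is a small hedged sketch (again with simplified stand-in types, not the real ones) of the pattern the tail of applySnapshot now uses: the outer raft mutex is held across two separate critical sections on the inner RWMutex, and the second section is taken as a read lock because the state assertion can be expensive. Per the comment in the diff above, this relies on the assumption that the state being checked is only mutated while the outer mutex is held.

```go
// Illustrative sketch only: an outer exclusive mutex (stand-in for
// Replica.raftMu) held across two inner critical sections on an RWMutex
// (stand-in for Replica.mu), with the second section taken as a read lock
// for an expensive, read-only check.
package main

import (
	"fmt"
	"sync"
)

type replica struct {
	raftMu sync.Mutex   // stand-in for Replica.raftMu
	mu     sync.RWMutex // stand-in for Replica.mu
	state  int          // stand-in for the in-memory replica state
}

func (r *replica) applySnapshotLike(newState int) {
	r.raftMu.Lock()
	defer r.raftMu.Unlock()

	// First inner critical section: update in-memory state under the write lock.
	r.mu.Lock()
	r.state = newState
	r.mu.Unlock()

	// Second inner critical section: run the (potentially expensive) assertion
	// under a read lock. Assumption mirrored from the diff above: anything that
	// mutates r.state also holds r.raftMu, so nothing can change the state
	// between the two sections.
	r.mu.RLock()
	if r.state != newState {
		panic("in-memory state diverged from what the snapshot wrote")
	}
	r.mu.RUnlock()
}

func main() {
	r := &replica{}
	r.applySnapshotLike(42)
	fmt.Println("state after snapshot-like application:", r.state)
}
```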
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/replica_test.go
@@ -408,9 +408,9 @@ func (tc *testContext) addBogusReplicaToRangeDesc(

tc.repl.setDescRaftMuLocked(ctx, &newDesc)
tc.repl.raftMu.Lock()
-tc.repl.mu.Lock()
-tc.repl.assertStateLocked(ctx, tc.engine)
-tc.repl.mu.Unlock()
+tc.repl.mu.RLock()
+tc.repl.assertStateRaftMuLockedReplicaMuRLocked(ctx, tc.engine)
+tc.repl.mu.RUnlock()
tc.repl.raftMu.Unlock()
return secondReplica, nil
}
@@ -9989,7 +9989,7 @@ func TestReplicaRecomputeStats(t *testing.T) {
disturbMS.ContainsEstimates = 0
ms.Add(*disturbMS)
err := repl.raftMu.stateLoader.SetMVCCStats(ctx, tc.engine, ms)
-repl.assertStateLocked(ctx, tc.engine)
+repl.assertStateRaftMuLockedReplicaMuRLocked(ctx, tc.engine)
repl.mu.Unlock()
repl.raftMu.Unlock()
