kvserver: avoid unprotected keyspace during MergeRange

In cockroachdb#73721 we saw the following assertion fire:

> kv/kvserver/replica_raftstorage.go:932  [n4,s4,r46/3:{-}] 1  unable to
> remove placeholder: corrupted replicasByKey map: <nil> and [...]

This is because `MergeRange` removes the right-hand side Replica from the
Store's in-memory map before extending the left-hand side, leaving a window
during which a snapshot can sneak into the now-unprotected keyspace. A similar
problem exists when a snapshot widens the existing range (i.e. the snapshot
reflects the result of a merge). This commit closes both gaps.
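
To make the window concrete, here is a toy, self-contained sketch (not
CockroachDB code: the `span` type, `covered` helper, and keys are invented for
illustration) contrasting the old ordering with the placeholder-based ordering
this commit introduces:

```go
package main

import "fmt"

// span is a toy stand-in for a replica's key bounds: [start, end).
type span struct{ start, end string }

// covered reports whether key k falls inside any tracked span.
func covered(spans []span, k string) bool {
	for _, s := range spans {
		if s.start <= k && k < s.end {
			return true
		}
	}
	return false
}

func main() {
	// Pre-merge state: the LHS covers [a, m), the RHS covers [m, z).
	lhs, rhs := span{"a", "m"}, span{"m", "z"}

	// Old ordering: the RHS is removed first and the LHS is widened later.
	// In between, [m, z) is covered by nothing, which is the window a
	// snapshot could sneak into.
	byKey := []span{lhs}
	fmt.Println("old ordering, mid-merge, 'q' covered?", covered(byKey, "q")) // false

	// New ordering: the RHS is replaced by a placeholder, which is then
	// dropped atomically with widening the LHS, so [m, z) is never exposed.
	byKey = []span{lhs, rhs}   // the RHS entry lives on as a placeholder
	byKey = []span{{"a", "z"}} // atomic swap: placeholder out, widened LHS in
	fmt.Println("new ordering, post-merge, 'q' covered?", covered(byKey, "q")) // true
}
```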

I verified the fix by inserting the following code and calling it at the top
of `(*Store).MergeRange` as well as `applySnapshot`:

```go
func (s *Store) assertNoHole(ctx context.Context, from, to roachpb.RKey) func() {
	caller := string(debug.Stack())
	if from.Equal(roachpb.RKeyMax) {
		// There will be a hole to the right of RKeyMax but it's just the end of
		// the addressable keyspace.
		return func() {}
	}
	// Check that there's never a gap to the right of the pre-merge LHS in replicasByKey.
	ctx, stopAsserting := context.WithCancel(ctx)
	_ = s.stopper.RunAsyncTask(ctx, "force-assertion", func(ctx context.Context) {
		for ctx.Err() == nil {
			func() {
				s.mu.Lock()
				defer s.mu.Unlock()
				var it replicaOrPlaceholder
				if err := s.mu.replicasByKey.VisitKeyRange(
					context.Background(), from, to, AscendingKeyOrder,
					func(ctx context.Context, iit replicaOrPlaceholder) error {
						it = iit
						return iterutil.StopIteration()
					}); err != nil {
					log.Fatalf(ctx, "%v", err)
				}
				if it.item != nil {
					return
				}
				log.Fatalf(ctx, "found hole in keyspace [%s,%s), during:\n%s", from, to, caller)
			}()
		}
	})
	return stopAsserting
}
```

```go
// (*Store).applySnapshot
	{
		var from, to roachpb.RKey
		if isInitialSnap {
			// For uninitialized replicas, there must be a placeholder that covers
			// the snapshot's bounds, so basically check that. A synchronous check
			// here would be simpler but this works well enough.
			d := inSnap.placeholder.Desc()
			from, to = d.StartKey, d.EndKey
		} else {
			// For snapshots to existing replicas, from and to usually match (i.e.
			// nothing is asserted) but if the snapshot spans a merge then we're
			// going to assert that we're transferring the keyspace from the subsumed
			// replicas to this replica seamlessly.
			d := r.Desc()
			from, to = d.EndKey, inSnap.Desc.EndKey
		}

		defer r.store.assertNoHole(ctx, from, to)()
	}

// (*Store).MergeRange
	defer s.assertNoHole(ctx, leftRepl.Desc().EndKey, newLeftDesc.EndKey)()
```

Before this fix, the bug reproduced in `TestStoreRangeMergeTimestampCache`
and `TestChangeReplicasLeaveAtomicRacesWithMerge`, covering both the snapshot
and the merge trigger case. I'm not too happy to merge this without the same
kind of active test coverage, but the assertion above has a chance of false
positives (if a Replica gets legitimately removed while the assertion loop is
still running), and it's unclear when exactly we would enable it (behind the
`crdbtest` tag, perhaps).
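
For reference, the synchronous check alluded to in the `applySnapshot` snippet
above might look roughly like the following one-shot variant. This is only a
sketch: it reuses the same store APIs as the assertion above but is not part
of this commit, and `assertNoHoleSync` is an invented name. Running once at a
fixed point avoids the false-positive window, at the cost of only observing a
single instant:

```go
// assertNoHoleSync is a hypothetical one-shot variant of assertNoHole: it
// checks once, under Store.mu, that [from, to) is covered by some replica or
// placeholder in replicasByKey. Unlike the async loop, it cannot fire after a
// replica has been legitimately removed, but it also only sees one instant.
func (s *Store) assertNoHoleSync(ctx context.Context, from, to roachpb.RKey) {
	if from.Equal(roachpb.RKeyMax) {
		// Nothing is expected to the right of RKeyMax.
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	var it replicaOrPlaceholder
	if err := s.mu.replicasByKey.VisitKeyRange(
		ctx, from, to, AscendingKeyOrder,
		func(ctx context.Context, iit replicaOrPlaceholder) error {
			it = iit
			return iterutil.StopIteration()
		}); err != nil {
		log.Fatalf(ctx, "%v", err)
	}
	if it.item == nil {
		log.Fatalf(ctx, "found hole in keyspace [%s,%s)", from, to)
	}
}
```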

I am dissatisfied with a few things I realized (or rather, rediscovered)
while working on this, but since this PR needs to be backported possibly
to all past versions, I am refraining from any refactors. Nevertheless,
here's what annoyed me:

- There is no unified API for managing the store's tracked replicas. As a
  result, there are lots of callers meddling with the `replicasByKey`,
  `uninitializedRanges`, etc. maps, adding complexity. (A rough sketch of what
  such an API could look like follows below the list.)
- The `replicasByKey` btree contains initialized `Replicas` and uses their key
  bounds. This makes for a fairly complex locking story; in particular, it's
  easy to deadlock while holding any replica's lock and accessing the btree.
  In conjunction with the above point, it could be easier to make the btree
  not hold the `Replica` directly, and to mutate the btree in a critical
  section together with calls to `(*Replica).setDescLocked`.
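
To make the first bullet point concrete, a unified API could be a single
internal facade behind which all of the replica bookkeeping hides. The sketch
below is purely illustrative; the interface and method names are invented here
and do not exist in the repository:

```go
// storeReplicaTracker is a hypothetical facade over the store's in-memory
// replica bookkeeping (replicasByKey, placeholders, uninitialized replicas).
// Callers would go through it instead of mutating the maps directly, and the
// implementation could keep the keyspace covered across every transition.
type storeReplicaTracker interface {
	// AddInitialized inserts an initialized replica keyed by its span.
	AddInitialized(ctx context.Context, r *Replica) error
	// RemoveInitialized removes r and, if leavePlaceholder is set, installs a
	// placeholder covering r's former span (much like the new
	// RemoveOptions.InsertPlaceholder below).
	RemoveInitialized(
		ctx context.Context, r *Replica, leavePlaceholder bool,
	) (*ReplicaPlaceholder, error)
	// SwapPlaceholder atomically drops ph and applies newDesc to r, so that
	// no gap is ever visible to readers of the btree.
	SwapPlaceholder(
		ctx context.Context, ph *ReplicaPlaceholder, r *Replica, newDesc *roachpb.RangeDescriptor,
	) error
}
```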

Release note (bug fix): A bug was fixed that, in very rare cases, could
result in a node terminating with fatal error "unable to remove
placeholder: corrupted replicasByKey map". To avoid potential data
corruption, users affected by this crash should not restart the node,
but instead decommission it in absentia and have it rejoin the cluster
under a new NodeID.
tbg committed Dec 17, 2021
1 parent 02946c3 commit 3d1c09d
Showing 7 changed files with 165 additions and 31 deletions.
35 changes: 35 additions & 0 deletions pkg/kv/kvserver/helpers_test.go
@@ -19,6 +19,7 @@ import (
"context"
"fmt"
"math/rand"
"runtime/debug"
"testing"
"time"

@@ -36,6 +37,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
@@ -203,6 +205,39 @@ func (s *Store) RaftSchedulerPriorityID() roachpb.RangeID {
return s.scheduler.PriorityID()
}

func (s *Store) assertNoHole(ctx context.Context, from, to roachpb.RKey) func() {
caller := string(debug.Stack())
if from.Equal(roachpb.RKeyMax) {
// There will be a hole to the right of RKeyMax but it's just the end of
// the addressable keyspace.
return func() {}
}
// Check that there's never a gap to the right of the pre-merge LHS in replicasByKey.
ctx, stopAsserting := context.WithCancel(ctx)
_ = s.stopper.RunAsyncTask(ctx, "force-assertion", func(ctx context.Context) {
for ctx.Err() == nil {
func() {
s.mu.Lock()
defer s.mu.Unlock()
var it replicaOrPlaceholder
if err := s.mu.replicasByKey.VisitKeyRange(
context.Background(), from, to, AscendingKeyOrder,
func(ctx context.Context, iit replicaOrPlaceholder) error {
it = iit
return iterutil.StopIteration()
}); err != nil {
log.Fatalf(ctx, "%v", err)
}
if it.item != nil {
return
}
log.Fatalf(ctx, "found hole in keyspace [%s,%s), during:\n%s", from, to, caller)
}()
}
})
return stopAsserting
}

func NewTestStorePool(cfg StoreConfig) *StorePool {
TimeUntilStoreDead.Override(context.Background(), &cfg.Settings.SV, TestTimeUntilStoreDeadOff)
return NewStorePool(
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_application_result.go
@@ -347,7 +347,7 @@ func (r *Replica) handleChangeReplicasResult(
log.Infof(ctx, "removing replica due to ChangeReplicasTrigger: %v", chng)
}

if err := r.store.removeInitializedReplicaRaftMuLocked(ctx, r, chng.NextReplicaID(), RemoveOptions{
if _, err := r.store.removeInitializedReplicaRaftMuLocked(ctx, r, chng.NextReplicaID(), RemoveOptions{
// We destroyed the data when the batch committed so don't destroy it again.
DestroyData: false,
}); err != nil {
56 changes: 45 additions & 11 deletions pkg/kv/kvserver/replica_raftstorage.go
@@ -770,6 +770,25 @@ func (r *Replica) applySnapshot(
}

isInitialSnap := !r.IsInitialized()
{
var from, to roachpb.RKey
if isInitialSnap {
// For uninitialized replicas, there must be a placeholder that covers
// the snapshot's bounds, so basically check that. A synchronous check
// here would be simpler but this works well enough.
d := inSnap.placeholder.Desc()
from, to = d.StartKey, d.EndKey
} else {
// For snapshots to existing replicas, from and to usually match (i.e.
// nothing is asserted) but if the snapshot spans a merge then we're
// going to assert that we're transferring the keyspace from the subsumed
// replicas to this replica seamlessly.
d := r.Desc()
from, to = d.EndKey, inSnap.Desc.EndKey
}

defer r.store.assertNoHole(ctx, from, to)()
}
defer func() {
if e := recover(); e != nil {
// Re-panic to avoid the log.Fatal() below.
@@ -906,7 +925,8 @@ func (r *Replica) applySnapshot(
// has not yet been updated. Any errors past this point must therefore be
// treated as fatal.

if err := r.clearSubsumedReplicaInMemoryData(ctx, subsumedRepls, mergedTombstoneReplicaID); err != nil {
subPHs, err := r.clearSubsumedReplicaInMemoryData(ctx, subsumedRepls, mergedTombstoneReplicaID)
if err != nil {
log.Fatalf(ctx, "failed to clear in-memory data of subsumed replicas while applying snapshot: %+v", err)
}

@@ -925,13 +945,19 @@ func (r *Replica) applySnapshot(
// the on-disk state.

r.store.mu.Lock()
r.mu.Lock()
if inSnap.placeholder != nil {
_, err := r.store.removePlaceholderLocked(ctx, inSnap.placeholder, removePlaceholderFilled)
subPHs = append(subPHs, inSnap.placeholder)
}
for _, ph := range subPHs {
_, err := r.store.removePlaceholderLocked(ctx, ph, removePlaceholderFilled)
if err != nil {
log.Fatalf(ctx, "unable to remove placeholder: %s", err)
log.Fatalf(ctx, "unable to remove placeholder %s: %s", ph, err)
}
}

// NB: we lock `r.mu` only now because removePlaceholderLocked operates on
// replicasByKey and this may end up calling r.Desc().
r.mu.Lock()
r.setDescLockedRaftMuLocked(ctx, desc)
if err := r.store.maybeMarkReplicaInitializedLockedReplLocked(ctx, r); err != nil {
log.Fatalf(ctx, "unable to mark replica initialized while applying snapshot: %+v", err)
@@ -1136,7 +1162,9 @@ func (r *Replica) clearSubsumedReplicaDiskData(
// held.
func (r *Replica) clearSubsumedReplicaInMemoryData(
ctx context.Context, subsumedRepls []*Replica, subsumedNextReplicaID roachpb.ReplicaID,
) error {
) ([]*ReplicaPlaceholder, error) {
//
var phs []*ReplicaPlaceholder
for _, sr := range subsumedRepls {
// We already hold sr's raftMu, so we must call removeReplicaImpl directly.
// Note that it's safe to update the store's metadata for sr's removal
@@ -1145,19 +1173,25 @@ func (r *Replica) clearSubsumedReplicaInMemoryData(
// acquisition leaves the store in a consistent state, and access to the
// replicas themselves is protected by their raftMus, which are held from
// start to finish.
if err := r.store.removeInitializedReplicaRaftMuLocked(ctx, sr, subsumedNextReplicaID, RemoveOptions{
//
// TODO(tbg): this leaves the RHS keyspace unprotected, isn't this a problem
// similar to #73721? This also needs to use the InsertPlaceholder option.
ph, err := r.store.removeInitializedReplicaRaftMuLocked(ctx, sr, subsumedNextReplicaID, RemoveOptions{
// The data was already destroyed by clearSubsumedReplicaDiskData.
DestroyData: false,
}); err != nil {
return err
DestroyData: false,
InsertPlaceholder: true,
})
if err != nil {
return nil, err
}
phs = append(phs, ph)
// We removed sr's data when we committed the batch. Finish subsumption by
	// updating the in-memory bookkeeping.
if err := sr.postDestroyRaftMuLocked(ctx, sr.GetMVCCStats()); err != nil {
return err
return nil, err
}
}
return nil
return phs, nil
}

type raftCommandEncodingVersion byte
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_test.go
@@ -6940,7 +6940,7 @@ func TestReplicaDestroy(t *testing.T) {
func() {
tc.repl.raftMu.Lock()
defer tc.repl.raftMu.Unlock()
if err := tc.store.removeInitializedReplicaRaftMuLocked(ctx, tc.repl, repl.Desc().NextReplicaID, RemoveOptions{
if _, err := tc.store.removeInitializedReplicaRaftMuLocked(ctx, tc.repl, repl.Desc().NextReplicaID, RemoveOptions{
DestroyData: true,
}); err != nil {
t.Fatal(err)
@@ -7034,7 +7034,7 @@ func TestQuotaPoolAccessOnDestroyedReplica(t *testing.T) {
func() {
tc.repl.raftMu.Lock()
defer tc.repl.raftMu.Unlock()
if err := tc.store.removeInitializedReplicaRaftMuLocked(ctx, repl, repl.Desc().NextReplicaID, RemoveOptions{
if _, err := tc.store.removeInitializedReplicaRaftMuLocked(ctx, repl, repl.Desc().NextReplicaID, RemoveOptions{
DestroyData: true,
}); err != nil {
t.Fatal(err)
31 changes: 26 additions & 5 deletions pkg/kv/kvserver/store_merge.go
@@ -33,6 +33,7 @@ func (s *Store) MergeRange(
rightClosedTS hlc.Timestamp,
rightReadSum *rspb.ReadSummary,
) error {
defer s.assertNoHole(ctx, leftRepl.Desc().EndKey, newLeftDesc.EndKey)()
if oldLeftDesc := leftRepl.Desc(); !oldLeftDesc.EndKey.Less(newLeftDesc.EndKey) {
return errors.Errorf("the new end key is not greater than the current one: %+v <= %+v",
newLeftDesc.EndKey, oldLeftDesc.EndKey)
@@ -49,11 +50,16 @@
// Note that we were called (indirectly) from raft processing so we must
// call removeInitializedReplicaRaftMuLocked directly to avoid deadlocking
// on the right-hand replica's raftMu.
if err := s.removeInitializedReplicaRaftMuLocked(ctx, rightRepl, rightDesc.NextReplicaID, RemoveOptions{
//
// We ask removeInitializedReplicaRaftMuLocked to install a placeholder which
// we'll drop atomically with extending the right-hand side down below.
ph, err := s.removeInitializedReplicaRaftMuLocked(ctx, rightRepl, rightDesc.NextReplicaID, RemoveOptions{
// The replica was destroyed by the tombstones added to the batch in
// runPreApplyTriggersAfterStagingWriteBatch.
DestroyData: false,
}); err != nil {
DestroyData: false,
InsertPlaceholder: true,
})
if err != nil {
return errors.Wrap(err, "cannot remove range")
}

@@ -116,7 +122,22 @@
applyReadSummaryToTimestampCache(s.tsCache, &rightDesc, sum)
}

// Update the subsuming range's descriptor.
leftRepl.setDescRaftMuLocked(ctx, &newLeftDesc)
// Update the subsuming range's descriptor, atomically widening it while
// dropping the placeholder representing the right-hand side.
s.mu.Lock()
defer s.mu.Unlock()
removed, err := s.removePlaceholderLocked(ctx, ph, removePlaceholderFilled)
if err != nil {
return err
}
if !removed {
return errors.AssertionFailedf("did not find placeholder %s", ph)
}
// NB: we have to be careful not to lock leftRepl before this step, as
// removePlaceholderLocked traverses the replicasByKey btree and may call
// leftRepl.Desc().
leftRepl.mu.Lock()
defer leftRepl.mu.Unlock()
leftRepl.setDescLockedRaftMuLocked(ctx, &newLeftDesc)
return nil
}
59 changes: 50 additions & 9 deletions pkg/kv/kvserver/store_remove_replica.go
@@ -23,6 +23,10 @@ import (
type RemoveOptions struct {
// If true, the replica's destroyStatus must be marked as removed.
DestroyData bool
// InsertPlaceholder can be specified when removing an initialized Replica
// and will result in the insertion of a ReplicaPlaceholder covering the
// keyspace previously occupied by the (now deleted) Replica.
InsertPlaceholder bool
}

// RemoveReplica removes the replica from the store's replica map and from the
@@ -40,7 +44,11 @@ func (s *Store) RemoveReplica(
) error {
rep.raftMu.Lock()
defer rep.raftMu.Unlock()
return s.removeInitializedReplicaRaftMuLocked(ctx, rep, nextReplicaID, opts)
if opts.InsertPlaceholder {
return errors.Errorf("InsertPlaceholder not supported in RemoveReplica")
}
_, err := s.removeInitializedReplicaRaftMuLocked(ctx, rep, nextReplicaID, opts)
return err
}

// removeReplicaRaftMuLocked removes the passed replica. If the replica is
@@ -50,7 +58,11 @@ func (s *Store) removeReplicaRaftMuLocked(
) error {
rep.raftMu.AssertHeld()
if rep.IsInitialized() {
return errors.Wrap(s.removeInitializedReplicaRaftMuLocked(ctx, rep, nextReplicaID, opts),
if opts.InsertPlaceholder {
return errors.Errorf("InsertPlaceholder unsupported in removeReplicaRaftMuLocked")
}
_, err := s.removeInitializedReplicaRaftMuLocked(ctx, rep, nextReplicaID, opts)
return errors.Wrap(err,
"failed to remove replica")
}
s.removeUninitializedReplicaRaftMuLocked(ctx, rep, nextReplicaID)
@@ -62,9 +74,18 @@
// It requires that Replica.raftMu is held and that s.mu is not held.
func (s *Store) removeInitializedReplicaRaftMuLocked(
ctx context.Context, rep *Replica, nextReplicaID roachpb.ReplicaID, opts RemoveOptions,
) error {
) (*ReplicaPlaceholder, error) {
rep.raftMu.AssertHeld()

if opts.InsertPlaceholder {
if opts.DestroyData {
return nil, errors.AssertionFailedf("cannot specify both InsertPlaceholder and DestroyData")
}
if !rep.IsInitialized() {
return nil, errors.AssertionFailedf("cannot specify InsertPlaceholder for an uninitialized Replica %s", rep)
}
}

// Sanity checks before committing to the removal by setting the
// destroy status.
var desc *roachpb.RangeDescriptor
@@ -78,7 +99,7 @@ func (s *Store) removeInitializedReplicaRaftMuLocked(
if rep.mu.destroyStatus.Removed() {
rep.mu.Unlock()
rep.readOnlyCmdMu.Unlock()
return nil // already removed, noop
return nil, nil // already removed, noop
}
} else {
// If the caller doesn't want to destroy the data because it already
@@ -158,11 +179,11 @@ func (s *Store) removeInitializedReplicaRaftMuLocked(
rep.disconnectReplicationRaftMuLocked(ctx)
if opts.DestroyData {
if err := rep.destroyRaftMuLocked(ctx, nextReplicaID); err != nil {
return err
return nil, err
}
}

func() {
ph := func() *ReplicaPlaceholder {
s.mu.Lock()
defer s.mu.Unlock() // must unlock before s.scanner.RemoveReplica(), to avoid deadlock

@@ -173,19 +194,39 @@ func (s *Store) removeInitializedReplicaRaftMuLocked(
if ph, ok := s.mu.replicaPlaceholders[rep.RangeID]; ok {
log.Fatalf(ctx, "initialized replica %s unexpectedly had a placeholder: %+v", rep, ph)
}
if it := s.mu.replicasByKey.DeleteReplica(ctx, rep); it.repl != rep {
desc := rep.Desc()
ph := &ReplicaPlaceholder{
rangeDesc: *roachpb.NewRangeDescriptor(desc.RangeID, desc.StartKey, desc.EndKey, desc.Replicas()),
}

if it := s.mu.replicasByKey.ReplaceOrInsertPlaceholder(ctx, ph); it.repl != rep {
// We already checked that our replica was present in replicasByKey
// above. Nothing should have been able to change that.
log.Fatalf(ctx, "replica %+v unexpectedly overlapped by %+v", rep, it.item)
}
if it := s.getOverlappingKeyRangeLocked(desc); it.item != nil {
if exPH, ok := s.mu.replicaPlaceholders[desc.RangeID]; ok {
log.Fatalf(ctx, "cannot insert placeholder %s, already have %s", ph, exPH)
}
s.mu.replicaPlaceholders[desc.RangeID] = ph

if opts.InsertPlaceholder {
return ph
}
// If placeholder not desired, remove it now, otherwise, that's the caller's
// job. We could elide the placeholder altogether but wish to instead
// minimize the divergence between the two code paths.

s.mu.replicasByKey.DeletePlaceholder(ctx, ph)
delete(s.mu.replicaPlaceholders, desc.RangeID)
if it := s.getOverlappingKeyRangeLocked(desc); it.item != nil && it.item != ph {
log.Fatalf(ctx, "corrupted replicasByKey map: %s and %s overlapped", rep, it.item)
}
return nil
}()

s.maybeGossipOnCapacityChange(ctx, rangeRemoveEvent)
s.scanner.RemoveReplica(rep)
return nil
return ph, nil
}

// removeUninitializedReplicaRaftMuLocked removes an uninitialized replica.
9 changes: 6 additions & 3 deletions pkg/kv/kvserver/store_snapshot.go
@@ -462,9 +462,12 @@ func (s *Store) canAcceptSnapshotLocked(
return nil, errors.Errorf("canAcceptSnapshotLocked requires a replica present")
}
// The raftMu is held which allows us to use the existing replica as a
// placeholder when we decide that the snapshot can be applied. As long
// as the caller releases the raftMu only after feeding the snapshot
// into the replica, this is safe.
// placeholder when we decide that the snapshot can be applied. As long as the
// caller releases the raftMu only after feeding the snapshot into the
// replica, this is safe. This is true even when the snapshot spans a merge,
// because we will be guaranteed to have the subsumed (initialized) Replicas
// in place as well. This is because they are present when the merge first
// commits, and cannot have been replicaGC'ed yet (see replicaGCQueue.process).
existingRepl.raftMu.AssertHeld()

existingRepl.mu.RLock()
