From ec91985811d64ffeb627d22c9849e151a96daf79 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 8 Feb 2023 18:18:29 +0000 Subject: [PATCH] kvserver: use replicasByKey addition func in snapshot path This commit makes one step towards better code sharing between Replica initialization paths: split trigger and snapshot application. It makes both to use the same method to check and insert the initialized Replica to replicasByKey map. Release note: none Epic: none --- pkg/kv/kvserver/helpers_test.go | 5 +-- pkg/kv/kvserver/replica_consistency_test.go | 2 +- pkg/kv/kvserver/store.go | 6 +++- pkg/kv/kvserver/store_create_replica.go | 38 ++++++++------------- pkg/kv/kvserver/store_split.go | 8 +++-- 5 files changed, 30 insertions(+), 29 deletions(-) diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index 7ce77d9735c1..ff20b161833d 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -67,10 +67,11 @@ func (s *Store) FindTargetAndTransferLease( func (s *Store) AddReplica(repl *Replica) error { s.mu.Lock() defer s.mu.Unlock() + repl.mu.RLock() + defer repl.mu.RUnlock() if err := s.addToReplicasByRangeIDLocked(repl); err != nil { return err - } - if err := s.addToReplicasByKeyLocked(repl); err != nil { + } else if err := s.addToReplicasByKeyLockedReplicaRLocked(repl); err != nil { return err } s.metrics.ReplicaCount.Inc(1) diff --git a/pkg/kv/kvserver/replica_consistency_test.go b/pkg/kv/kvserver/replica_consistency_test.go index 78626f5ee4b2..2660254ac5dc 100644 --- a/pkg/kv/kvserver/replica_consistency_test.go +++ b/pkg/kv/kvserver/replica_consistency_test.go @@ -102,7 +102,7 @@ func TestStoreCheckpointSpans(t *testing.T) { r.isInitialized.Set(desc.IsInitialized()) require.NoError(t, s.addToReplicasByRangeIDLocked(r)) if r.IsInitialized() { - require.NoError(t, s.addToReplicasByKeyLocked(r)) + require.NoError(t, s.addToReplicasByKeyLockedReplicaRLocked(r)) descs = append(descs, desc) } } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 72d8b09ebf6b..89af14be63d9 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -1879,7 +1879,11 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { // TODO(pavelkalinnikov): hide these in Store's replica create functions. err = s.addToReplicasByRangeIDLocked(rep) if err == nil { - err = s.addToReplicasByKeyLocked(rep) + // NB: no locking of the Replica is needed since it's being created, but + // just in case. + rep.mu.RLock() + err = s.addToReplicasByKeyLockedReplicaRLocked(rep) + rep.mu.RUnlock() } s.mu.Unlock() if err != nil { diff --git a/pkg/kv/kvserver/store_create_replica.go b/pkg/kv/kvserver/store_create_replica.go index cb6b9fb849b2..5bd03adc60b2 100644 --- a/pkg/kv/kvserver/store_create_replica.go +++ b/pkg/kv/kvserver/store_create_replica.go @@ -249,28 +249,25 @@ func fromReplicaIsTooOldRLocked(toReplica *Replica, fromReplica *roachpb.Replica return !found && fromReplica.ReplicaID < desc.NextReplicaID } -// addToReplicasByKeyLocked adds the replica to the replicasByKey btree. The -// replica must already be in replicasByRangeID. Requires that Store.mu is held. -// -// Returns an error if a different replica with the same range ID, or an -// overlapping replica or placeholder exists in this Store. -func (s *Store) addToReplicasByKeyLocked(repl *Replica) error { +// addToReplicasByKeyLockedReplicaRLocked adds the replica to the replicasByKey +// btree. The replica must already be in replicasByRangeID. Returns an error if +// a different replica with the same range ID, or an overlapping replica or +// placeholder exists in this Store. Replica.mu must be at least read-locked. +func (s *Store) addToReplicasByKeyLockedReplicaRLocked(repl *Replica) error { if !repl.IsInitialized() { - return errors.Errorf("attempted to add uninitialized replica %s", repl) + return errors.Errorf("%s: attempted to add uninitialized replica %s", s, repl) } if got := s.GetReplicaIfExists(repl.RangeID); got != repl { // NB: got can be nil too - return errors.Errorf("replica %s not in replicasByRangeID; got %s", repl, got) + return errors.Errorf("%s: replica %s not in replicasByRangeID; got %s", s, repl, got) } - - if it := s.getOverlappingKeyRangeLocked(repl.Desc()); it.item != nil { - return errors.Errorf("%s: cannot addToReplicasByKeyLocked; range %s has overlapping range %s", s, repl, it.Desc()) + if it := s.getOverlappingKeyRangeLocked(repl.descRLocked()); it.item != nil { + return errors.Errorf( + "%s: cannot add to replicasByKey: range %s overlaps with %s", s, repl, it.Desc()) } - if it := s.mu.replicasByKey.ReplaceOrInsertReplica(context.Background(), repl); it.item != nil { - return errors.Errorf("%s: cannot addToReplicasByKeyLocked; range for key %v already exists in replicasByKey btree", s, - it.item.key()) + return errors.Errorf( + "%s: cannot add to replicasByKey: key %v already exists in the btree", s, it.item.key()) } - return nil } @@ -319,16 +316,11 @@ func (s *Store) maybeMarkReplicaInitializedLockedReplLocked( } delete(s.mu.uninitReplicas, rangeID) - if it := s.getOverlappingKeyRangeLocked(desc); it.item != nil { - return errors.AssertionFailedf("%s: cannot initialize replica; %s has overlapping range %s", - s, desc, it.Desc()) - } - // Copy of the start key needs to be set before inserting into replicasByKey. lockedRepl.setStartKeyLocked(desc.StartKey) - if it := s.mu.replicasByKey.ReplaceOrInsertReplica(ctx, lockedRepl); it.item != nil { - return errors.AssertionFailedf("range for key %v already exists in replicasByKey btree: %+v", - it.item.key(), it) + + if err := s.addToReplicasByKeyLockedReplicaRLocked(lockedRepl); err != nil { + return err } // Unquiesce the replica. We don't allow uninitialized replicas to unquiesce, diff --git a/pkg/kv/kvserver/store_split.go b/pkg/kv/kvserver/store_split.go index 701d81debb2b..10ad2c7284a3 100644 --- a/pkg/kv/kvserver/store_split.go +++ b/pkg/kv/kvserver/store_split.go @@ -350,16 +350,20 @@ func (s *Store) SplitRange( // assumption that distribution across all tracked load stats is // identical. leftRepl.loadStats.Split(rightRepl.loadStats) - if err := s.addToReplicasByKeyLocked(rightRepl); err != nil { + rightRepl.mu.RLock() + if err := s.addToReplicasByKeyLockedReplicaRLocked(rightRepl); err != nil { + rightRepl.mu.RUnlock() return errors.Wrapf(err, "unable to add replica %v", rightRepl) } // Update the replica's cached byte thresholds. This is a no-op if the system // config is not available, in which case we rely on the next gossip update // to perform the update. - if err := rightRepl.updateRangeInfo(ctx, rightRepl.Desc()); err != nil { + if err := rightRepl.updateRangeInfo(ctx, rightRepl.descRLocked()); err != nil { return err } + rightRepl.mu.RUnlock() + // Add the range to metrics and maybe gossip on capacity change. s.metrics.ReplicaCount.Inc(1) s.storeGossip.MaybeGossipOnCapacityChange(ctx, RangeAddEvent)