Skip to content

Commit

Permalink
kvserver: use replicasByKey addition func in snapshot path
Browse files Browse the repository at this point in the history
This commit makes one step towards better code sharing between Replica
initialization paths: split trigger and snapshot application. It makes both to
use the same method to check and insert the initialized Replica to
replicasByKey map.

Release note: none
Epic: none
  • Loading branch information
pav-kv committed Feb 9, 2023
1 parent 5f2bfd4 commit ec91985
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 29 deletions.
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@ func (s *Store) FindTargetAndTransferLease(
func (s *Store) AddReplica(repl *Replica) error {
s.mu.Lock()
defer s.mu.Unlock()
repl.mu.RLock()
defer repl.mu.RUnlock()
if err := s.addToReplicasByRangeIDLocked(repl); err != nil {
return err
}
if err := s.addToReplicasByKeyLocked(repl); err != nil {
} else if err := s.addToReplicasByKeyLockedReplicaRLocked(repl); err != nil {
return err
}
s.metrics.ReplicaCount.Inc(1)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_consistency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func TestStoreCheckpointSpans(t *testing.T) {
r.isInitialized.Set(desc.IsInitialized())
require.NoError(t, s.addToReplicasByRangeIDLocked(r))
if r.IsInitialized() {
require.NoError(t, s.addToReplicasByKeyLocked(r))
require.NoError(t, s.addToReplicasByKeyLockedReplicaRLocked(r))
descs = append(descs, desc)
}
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1879,7 +1879,11 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
// TODO(pavelkalinnikov): hide these in Store's replica create functions.
err = s.addToReplicasByRangeIDLocked(rep)
if err == nil {
err = s.addToReplicasByKeyLocked(rep)
// NB: no locking of the Replica is needed since it's being created, but
// just in case.
rep.mu.RLock()
err = s.addToReplicasByKeyLockedReplicaRLocked(rep)
rep.mu.RUnlock()
}
s.mu.Unlock()
if err != nil {
Expand Down
38 changes: 15 additions & 23 deletions pkg/kv/kvserver/store_create_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,28 +249,25 @@ func fromReplicaIsTooOldRLocked(toReplica *Replica, fromReplica *roachpb.Replica
return !found && fromReplica.ReplicaID < desc.NextReplicaID
}

// addToReplicasByKeyLocked adds the replica to the replicasByKey btree. The
// replica must already be in replicasByRangeID. Requires that Store.mu is held.
//
// Returns an error if a different replica with the same range ID, or an
// overlapping replica or placeholder exists in this Store.
func (s *Store) addToReplicasByKeyLocked(repl *Replica) error {
// addToReplicasByKeyLockedReplicaRLocked adds the replica to the replicasByKey
// btree. The replica must already be in replicasByRangeID. Returns an error if
// a different replica with the same range ID, or an overlapping replica or
// placeholder exists in this Store. Replica.mu must be at least read-locked.
func (s *Store) addToReplicasByKeyLockedReplicaRLocked(repl *Replica) error {
if !repl.IsInitialized() {
return errors.Errorf("attempted to add uninitialized replica %s", repl)
return errors.Errorf("%s: attempted to add uninitialized replica %s", s, repl)
}
if got := s.GetReplicaIfExists(repl.RangeID); got != repl { // NB: got can be nil too
return errors.Errorf("replica %s not in replicasByRangeID; got %s", repl, got)
return errors.Errorf("%s: replica %s not in replicasByRangeID; got %s", s, repl, got)
}

if it := s.getOverlappingKeyRangeLocked(repl.Desc()); it.item != nil {
return errors.Errorf("%s: cannot addToReplicasByKeyLocked; range %s has overlapping range %s", s, repl, it.Desc())
if it := s.getOverlappingKeyRangeLocked(repl.descRLocked()); it.item != nil {
return errors.Errorf(
"%s: cannot add to replicasByKey: range %s overlaps with %s", s, repl, it.Desc())
}

if it := s.mu.replicasByKey.ReplaceOrInsertReplica(context.Background(), repl); it.item != nil {
return errors.Errorf("%s: cannot addToReplicasByKeyLocked; range for key %v already exists in replicasByKey btree", s,
it.item.key())
return errors.Errorf(
"%s: cannot add to replicasByKey: key %v already exists in the btree", s, it.item.key())
}

return nil
}

Expand Down Expand Up @@ -319,16 +316,11 @@ func (s *Store) maybeMarkReplicaInitializedLockedReplLocked(
}
delete(s.mu.uninitReplicas, rangeID)

if it := s.getOverlappingKeyRangeLocked(desc); it.item != nil {
return errors.AssertionFailedf("%s: cannot initialize replica; %s has overlapping range %s",
s, desc, it.Desc())
}

// Copy of the start key needs to be set before inserting into replicasByKey.
lockedRepl.setStartKeyLocked(desc.StartKey)
if it := s.mu.replicasByKey.ReplaceOrInsertReplica(ctx, lockedRepl); it.item != nil {
return errors.AssertionFailedf("range for key %v already exists in replicasByKey btree: %+v",
it.item.key(), it)

if err := s.addToReplicasByKeyLockedReplicaRLocked(lockedRepl); err != nil {
return err
}

// Unquiesce the replica. We don't allow uninitialized replicas to unquiesce,
Expand Down
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/store_split.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,16 +350,20 @@ func (s *Store) SplitRange(
// assumption that distribution across all tracked load stats is
// identical.
leftRepl.loadStats.Split(rightRepl.loadStats)
if err := s.addToReplicasByKeyLocked(rightRepl); err != nil {
rightRepl.mu.RLock()
if err := s.addToReplicasByKeyLockedReplicaRLocked(rightRepl); err != nil {
rightRepl.mu.RUnlock()
return errors.Wrapf(err, "unable to add replica %v", rightRepl)
}

// Update the replica's cached byte thresholds. This is a no-op if the system
// config is not available, in which case we rely on the next gossip update
// to perform the update.
if err := rightRepl.updateRangeInfo(ctx, rightRepl.Desc()); err != nil {
if err := rightRepl.updateRangeInfo(ctx, rightRepl.descRLocked()); err != nil {
return err
}
rightRepl.mu.RUnlock()

// Add the range to metrics and maybe gossip on capacity change.
s.metrics.ReplicaCount.Inc(1)
s.storeGossip.MaybeGossipOnCapacityChange(ctx, RangeAddEvent)
Expand Down

0 comments on commit ec91985

Please sign in to comment.