diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 5a8171940b37..9e6053192dc8 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -83,6 +83,7 @@ go_library( "store_rebalancer.go", "store_remove_replica.go", "store_replica_btree.go", + "store_replicas_by_rangeid.go", "store_send.go", "store_snapshot.go", "store_split.go", diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index e0e34040518d..70fefa4ea431 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -21,7 +21,6 @@ import ( "math/rand" "testing" "time" - "unsafe" circuit "github.com/cockroachdb/circuitbreaker" "github.com/cockroachdb/cockroach/pkg/kv" @@ -515,7 +514,7 @@ func WriteRandomDataToRange( } func WatchForDisappearingReplicas(t testing.TB, store *Store) { - m := make(map[int64]struct{}) + m := make(map[roachpb.RangeID]struct{}) for { select { case <-store.Stopper().ShouldQuiesce(): @@ -523,9 +522,9 @@ func WatchForDisappearingReplicas(t testing.TB, store *Store) { default: } - store.mu.replicas.Range(func(k int64, v unsafe.Pointer) bool { - m[k] = struct{}{} - return true + _ = store.mu.replicas.Range(func(repl *Replica) error { + m[repl.RangeID] = struct{}{} + return nil }) for k := range m { diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 7c80312f6930..dbe78e872c58 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -364,9 +364,9 @@ func (rs *storeReplicaVisitor) Visit(visitor func(*Replica) bool) { // stale) view of all Replicas without holding the Store lock. In particular, // no locks are acquired during the copy process. rs.repls = nil - rs.store.mu.replicas.Range(func(k int64, v unsafe.Pointer) bool { - rs.repls = append(rs.repls, (*Replica)(v)) - return true + _ = rs.store.mu.replicas.Range(func(repl *Replica) error { + rs.repls = append(rs.repls, repl) + return nil }) if rs.ordered { @@ -586,7 +586,7 @@ type Store struct { syncutil.RWMutex // Map of replicas by Range ID (map[roachpb.RangeID]*Replica). This // includes `uninitReplicas`. May be read without holding Store.mu. - replicas syncutil.IntMap + replicas rangeIDReplicaMap // A btree key containing objects of type *Replica or *ReplicaPlaceholder. // Both types have an associated key range; the btree is keyed on their // start keys. @@ -2411,8 +2411,8 @@ func (s *Store) GetReplica(rangeID roachpb.RangeID) (*Replica, error) { // GetReplicaIfExists returns the replica with the given RangeID or nil. func (s *Store) GetReplicaIfExists(rangeID roachpb.RangeID) *Replica { - if value, ok := s.mu.replicas.Load(int64(rangeID)); ok { - return (*Replica)(value) + if repl, ok := s.mu.replicas.Load(rangeID); ok { + return repl } return nil } @@ -2459,8 +2459,8 @@ func (s *Store) getOverlappingKeyRangeLocked( // RaftStatus returns the current raft status of the local replica of // the given range. func (s *Store) RaftStatus(rangeID roachpb.RangeID) *raft.Status { - if value, ok := s.mu.replicas.Load(int64(rangeID)); ok { - return (*Replica)(value).RaftStatus() + if repl, ok := s.mu.replicas.Load(rangeID); ok { + return repl.RaftStatus() } return nil } @@ -2590,9 +2590,9 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa // performance critical code. func (s *Store) ReplicaCount() int { var count int - s.mu.replicas.Range(func(_ int64, _ unsafe.Pointer) bool { + _ = s.mu.replicas.Range(func(*Replica) error { count++ - return true + return nil }) return count } diff --git a/pkg/kv/kvserver/store_create_replica.go b/pkg/kv/kvserver/store_create_replica.go index a3ed8b7f2a81..910453ba6c0a 100644 --- a/pkg/kv/kvserver/store_create_replica.go +++ b/pkg/kv/kvserver/store_create_replica.go @@ -13,7 +13,6 @@ package kvserver import ( "context" "time" - "unsafe" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -80,8 +79,7 @@ func (s *Store) tryGetOrCreateReplica( creatingReplica *roachpb.ReplicaDescriptor, ) (_ *Replica, created bool, _ error) { // The common case: look up an existing (initialized) replica. - if value, ok := s.mu.replicas.Load(int64(rangeID)); ok { - repl := (*Replica)(value) + if repl, ok := s.mu.replicas.Load(rangeID); ok { repl.raftMu.Lock() // not unlocked on success repl.mu.Lock() @@ -298,7 +296,7 @@ func (s *Store) addReplicaToRangeMapLocked(repl *Replica) error { // same replica object. This occurs during splits where the right-hand side // is added to the replicas map before it is initialized. if existing, loaded := s.mu.replicas.LoadOrStore( - int64(repl.RangeID), unsafe.Pointer(repl)); loaded && (*Replica)(existing) != repl { + repl.RangeID, repl); loaded && existing != repl { return errors.Errorf("%s: replica already exists", repl) } // Check whether the replica is unquiesced but not in the map. This @@ -314,7 +312,7 @@ func (s *Store) addReplicaToRangeMapLocked(repl *Replica) error { } // maybeMarkReplicaInitializedLocked should be called whenever a previously -// unintialized replica has become initialized so that the store can update its +// uninitialized replica has become initialized so that the store can update its // internal bookkeeping. It requires that Store.mu and Replica.raftMu // are locked. func (s *Store) maybeMarkReplicaInitializedLockedReplLocked( diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index c1c06cc8f491..cfe741f2df50 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -484,7 +484,7 @@ func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID // forgiving. // // See https://github.com/cockroachdb/cockroach/issues/30951#issuecomment-428010411. - if _, exists := s.mu.replicas.Load(int64(rangeID)); !exists { + if _, exists := s.mu.replicas.Load(rangeID); !exists { q.Lock() if len(q.infos) == 0 { s.replicaQueues.Delete(int64(rangeID)) @@ -500,12 +500,11 @@ func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID } func (s *Store) processReady(ctx context.Context, rangeID roachpb.RangeID) { - value, ok := s.mu.replicas.Load(int64(rangeID)) + r, ok := s.mu.replicas.Load(rangeID) if !ok { return } - r := (*Replica)(value) ctx = r.raftSchedulerCtx(ctx) start := timeutil.Now() stats, expl, err := r.handleRaftReady(ctx, noSnap) @@ -524,14 +523,13 @@ func (s *Store) processReady(ctx context.Context, rangeID roachpb.RangeID) { } func (s *Store) processTick(ctx context.Context, rangeID roachpb.RangeID) bool { - value, ok := s.mu.replicas.Load(int64(rangeID)) + r, ok := s.mu.replicas.Load(rangeID) if !ok { return false } livenessMap, _ := s.livenessMap.Load().(liveness.IsLiveMap) start := timeutil.Now() - r := (*Replica)(value) ctx = r.raftSchedulerCtx(ctx) exists, err := r.tick(ctx, livenessMap) if err != nil { @@ -560,8 +558,7 @@ func (s *Store) processTick(ctx context.Context, rangeID roachpb.RangeID) bool { func (s *Store) nodeIsLiveCallback(l livenesspb.Liveness) { s.updateLivenessMap() - s.mu.replicas.Range(func(k int64, v unsafe.Pointer) bool { - r := (*Replica)(v) + _ = s.mu.replicas.Range(func(r *Replica) error { r.mu.RLock() quiescent := r.mu.quiescent lagging := r.mu.laggingFollowersOnQuiesce @@ -570,7 +567,7 @@ func (s *Store) nodeIsLiveCallback(l livenesspb.Liveness) { if quiescent && (lagging.MemberStale(l) || !laggingAccurate) { r.unquiesce() } - return true + return nil }) } @@ -730,13 +727,13 @@ func (s *Store) sendQueuedHeartbeatsToNode( if !s.cfg.Transport.SendAsync(chReq, rpc.SystemClass) { for _, beat := range beats { - if value, ok := s.mu.replicas.Load(int64(beat.RangeID)); ok { - (*Replica)(value).addUnreachableRemoteReplica(beat.ToReplicaID) + if repl, ok := s.mu.replicas.Load(beat.RangeID); ok { + repl.addUnreachableRemoteReplica(beat.ToReplicaID) } } for _, resp := range resps { - if value, ok := s.mu.replicas.Load(int64(resp.RangeID)); ok { - (*Replica)(value).addUnreachableRemoteReplica(resp.ToReplicaID) + if repl, ok := s.mu.replicas.Load(resp.RangeID); ok { + repl.addUnreachableRemoteReplica(resp.ToReplicaID) } } return 0 diff --git a/pkg/kv/kvserver/store_remove_replica.go b/pkg/kv/kvserver/store_remove_replica.go index d919f29abf67..cb8cd493d2f5 100644 --- a/pkg/kv/kvserver/store_remove_replica.go +++ b/pkg/kv/kvserver/store_remove_replica.go @@ -238,11 +238,10 @@ func (s *Store) removeUninitializedReplicaRaftMuLocked( defer s.mu.Unlock() // Sanity check, could be removed. - value, stillExists := s.mu.replicas.Load(int64(rep.RangeID)) + existing, stillExists := s.mu.replicas.Load(rep.RangeID) if !stillExists { log.Fatalf(ctx, "uninitialized replica was removed in the meantime") } - existing := (*Replica)(value) if existing == rep { log.Infof(ctx, "removing uninitialized replica %v", rep) } else { @@ -264,7 +263,7 @@ func (s *Store) unlinkReplicaByRangeIDLocked(ctx context.Context, rangeID roachp s.unquiescedReplicas.Unlock() delete(s.mu.uninitReplicas, rangeID) s.replicaQueues.Delete(int64(rangeID)) - s.mu.replicas.Delete(int64(rangeID)) + s.mu.replicas.Delete(rangeID) s.unregisterLeaseholderByID(ctx, rangeID) } diff --git a/pkg/kv/kvserver/store_replicas_by_rangeid.go b/pkg/kv/kvserver/store_replicas_by_rangeid.go new file mode 100644 index 000000000000..c98bd471e5ac --- /dev/null +++ b/pkg/kv/kvserver/store_replicas_by_rangeid.go @@ -0,0 +1,63 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver + +import ( + "unsafe" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/iterutil" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" +) + +type rangeIDReplicaMap struct { + m syncutil.IntMap +} + +// Load loads the Replica for the RangeID. If not found, returns +// (nil, false), otherwise the Replica and true. +func (m *rangeIDReplicaMap) Load(rangeID roachpb.RangeID) (*Replica, bool) { + val, ok := m.m.Load(int64(rangeID)) + return (*Replica)(val), ok +} + +// LoadOrStore loads the replica and returns it (and `true`). If it does not +// exist, atomically inserts the provided Replica and returns it along with +// `false`. +func (m *rangeIDReplicaMap) LoadOrStore( + rangeID roachpb.RangeID, repl *Replica, +) (_ *Replica, loaded bool) { + val, loaded := m.m.LoadOrStore(int64(rangeID), unsafe.Pointer(repl)) + return (*Replica)(val), loaded +} + +// Delete drops the Replica if it existed in the map. +func (m *rangeIDReplicaMap) Delete(rangeID roachpb.RangeID) { + m.m.Delete(int64(rangeID)) +} + +// Range invokes the provided function with each Replica in the map. +// Iteration stops on any error. `iterutil.StopIteration()` can be +// returned from the closure to stop iteration without an error +// resulting from Range(). +func (m *rangeIDReplicaMap) Range(f func(*Replica) error) error { + var err error + v := func(k int64, v unsafe.Pointer) (wantMore bool) { + err = f((*Replica)(v)) + return err == nil + } + m.m.Range(v) + if errors.Is(err, iterutil.StopIteration()) { + return nil + } + return nil +} diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index d5bf8264c207..7ba412895112 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -450,13 +450,10 @@ func (s *Store) canAcceptSnapshotLocked( desc := *snapHeader.State.Desc // First, check for an existing Replica. - v, ok := s.mu.replicas.Load( - int64(desc.RangeID), - ) + existingRepl, ok := s.mu.replicas.Load(desc.RangeID) if !ok { return nil, errors.Errorf("canAcceptSnapshotLocked requires a replica present") } - existingRepl := (*Replica)(v) // The raftMu is held which allows us to use the existing replica as a // placeholder when we decide that the snapshot can be applied. As long // as the caller releases the raftMu only after feeding the snapshot