diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go
index 23b42d1dbe34..6aa0d1612d59 100644
--- a/pkg/kv/kvserver/replica.go
+++ b/pkg/kv/kvserver/replica.go
@@ -199,6 +199,11 @@ type Replica struct {
 	log.AmbientContext
 
 	RangeID roachpb.RangeID // Only set by the constructor
+	// The ID of the replica within the Raft group. Only set by the constructor,
+	// so it will not change over the lifetime of this replica. If addressed
+	// under a newer replicaID, the replica immediately replicaGCs itself to
+	// make way for the newer incarnation.
+	replicaID roachpb.ReplicaID
 
 	// The start key of a Range remains constant throughout its lifetime (it does
 	// not change through splits or merges). This field carries a copy of
@@ -516,13 +521,6 @@ type Replica struct {
 		applyingEntries bool
 		// The replica's Raft group "node".
 		internalRaftGroup *raft.RawNode
-		// The ID of the replica within the Raft group. This value may never be 0.
-		// It will not change over the lifetime of this replica. If addressed under
-		// a newer replicaID, the replica immediately replicaGCs itself to make
-		// way for the newer incarnation.
-		// TODO(sumeer): since this is initialized in newUnloadedReplica and never
-		// changed, lift this out of the mu struct.
-		replicaID roachpb.ReplicaID
 		// The minimum allowed ID for this replica. Initialized from
 		// RangeTombstone.NextReplicaID.
 		tombstoneMinReplicaID roachpb.ReplicaID
@@ -715,11 +713,7 @@ func (r *Replica) SafeFormat(w redact.SafePrinter, _ rune) {
 // ReplicaID returns the ID for the Replica. This value is fixed for the
 // lifetime of the Replica.
 func (r *Replica) ReplicaID() roachpb.ReplicaID {
-	// The locking of mu is unnecessary. It will be removed when we lift
-	// replicaID out of the mu struct.
-	r.mu.RLock()
-	defer r.mu.RUnlock()
-	return r.mu.replicaID
+	return r.replicaID
 }
 
 // cleanupFailedProposal cleans up after a proposal that has failed. It
@@ -1320,8 +1314,8 @@ func (r *Replica) assertStateRaftMuLockedReplicaMuRLocked(
 		if !ok {
 			log.Fatalf(ctx, "%+v does not contain local store s%d", r.mu.state.Desc, r.store.StoreID())
 		}
-		if replDesc.ReplicaID != r.mu.replicaID {
-			log.Fatalf(ctx, "replica's replicaID %d diverges from descriptor %+v", r.mu.replicaID, r.mu.state.Desc)
+		if replDesc.ReplicaID != r.replicaID {
+			log.Fatalf(ctx, "replica's replicaID %d diverges from descriptor %+v", r.replicaID, r.mu.state.Desc)
 		}
 	}
 }
@@ -1677,7 +1671,7 @@ func (r *Replica) isNewerThanSplitRLocked(split *roachpb.SplitTrigger) bool {
 		// ID which is above the replica ID of the split then we would not have
 		// written a tombstone but we will have a replica ID that will exceed the
 		// split replica ID.
-		r.mu.replicaID > rightDesc.ReplicaID
+		r.replicaID > rightDesc.ReplicaID
 }
 
 // WatchForMerge is like maybeWatchForMergeLocked, except it expects a merge to
diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go
index dd3bf9f591ed..06d779def727 100644
--- a/pkg/kv/kvserver/replica_init.go
+++ b/pkg/kv/kvserver/replica_init.go
@@ -72,6 +72,7 @@ func newUnloadedReplica(
 	r := &Replica{
 		AmbientContext: store.cfg.AmbientCtx,
 		RangeID:        desc.RangeID,
+		replicaID:      replicaID,
 		creationTime:   timeutil.Now(),
 		store:          store,
 		abortSpan:      abortspan.New(desc.RangeID),
@@ -93,7 +94,6 @@ func newUnloadedReplica(
 	r.mu.stateLoader = stateloader.Make(desc.RangeID)
 	r.mu.quiescent = true
 	r.mu.conf = store.cfg.DefaultSpanConfig
-	r.mu.replicaID = replicaID
 	split.Init(&r.loadBasedSplitter, rand.Intn, func() float64 {
 		return float64(SplitByLoadQPSThreshold.Get(&store.cfg.Settings.SV))
 	}, func() time.Duration {
@@ -179,7 +179,7 @@ func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor)
 	ctx := r.AnnotateCtx(context.TODO())
 	if r.mu.state.Desc != nil && r.isInitializedRLocked() {
 		log.Fatalf(ctx, "r%d: cannot reinitialize an initialized replica", desc.RangeID)
-	} else if r.mu.replicaID == 0 {
+	} else if r.replicaID == 0 {
 		// NB: This is just a defensive check as r.mu.replicaID should never be 0.
 		log.Fatalf(ctx, "r%d: cannot initialize replica without a replicaID", desc.RangeID)
 	}
@@ -204,15 +204,15 @@ func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor)
 
 	// Ensure that we're not trying to load a replica with a different ID than
 	// was used to construct this Replica.
-	replicaID := r.mu.replicaID
+	replicaID := r.replicaID
 	if replicaDesc, found := r.mu.state.Desc.GetReplicaDescriptor(r.StoreID()); found {
 		replicaID = replicaDesc.ReplicaID
 	} else if desc.IsInitialized() {
 		log.Fatalf(ctx, "r%d: cannot initialize replica which is not in descriptor %v", desc.RangeID, desc)
 	}
-	if r.mu.replicaID != replicaID {
+	if r.replicaID != replicaID {
 		log.Fatalf(ctx, "attempting to initialize a replica which has ID %d with ID %d",
-			r.mu.replicaID, replicaID)
+			r.replicaID, replicaID)
 	}
 
 	r.setDescLockedRaftMuLocked(ctx, desc)
@@ -346,9 +346,9 @@ func (r *Replica) setDescLockedRaftMuLocked(ctx context.Context, desc *roachpb.R
 	// store to exist on disk.
 	// 3) Various unit tests do not provide a valid descriptor.
 	replDesc, found := desc.GetReplicaDescriptor(r.StoreID())
-	if found && replDesc.ReplicaID != r.mu.replicaID {
+	if found && replDesc.ReplicaID != r.replicaID {
 		log.Fatalf(ctx, "attempted to change replica's ID from %d to %d",
-			r.mu.replicaID, replDesc.ReplicaID)
+			r.replicaID, replDesc.ReplicaID)
 	}
 
 	// Initialize the tenant. The must be the first time that the descriptor has
@@ -380,7 +380,7 @@ func (r *Replica) setDescLockedRaftMuLocked(ctx context.Context, desc *roachpb.R
 		r.mu.lastReplicaAddedTime = time.Time{}
 	}
 
-	r.rangeStr.store(r.mu.replicaID, desc)
+	r.rangeStr.store(r.replicaID, desc)
 	r.connectionClass.set(rpc.ConnectionClassForKey(desc.StartKey))
 	r.concMgr.OnRangeDescUpdated(desc)
 	r.mu.state.Desc = desc
diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go
index 67ce6ea085ce..1cb6cd6c1bcd 100644
--- a/pkg/kv/kvserver/replica_proposal.go
+++ b/pkg/kv/kvserver/replica_proposal.go
@@ -257,7 +257,7 @@ func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.Co
 
 	var shouldFatal bool
 	for _, rDesc := range cc.Terminate {
-		if rDesc.StoreID == r.store.StoreID() && rDesc.ReplicaID == r.mu.replicaID {
+		if rDesc.StoreID == r.store.StoreID() && rDesc.ReplicaID == r.replicaID {
 			shouldFatal = true
 		}
 	}
@@ -381,7 +381,7 @@ func (r *Replica) leasePostApplyLocked(
 		}
 	}
 
-	iAmTheLeaseHolder := newLease.Replica.ReplicaID == r.mu.replicaID
+	iAmTheLeaseHolder := newLease.Replica.ReplicaID == r.replicaID
 	// NB: in the case in which a node restarts, minLeaseProposedTS forces it to
 	// get a new lease and we make sure it gets a new sequence number, thus
 	// causing the right half of the disjunction to fire so that we update the
diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go
index c6b33ea7c459..99de480570fe 100644
--- a/pkg/kv/kvserver/replica_proposal_buf.go
+++ b/pkg/kv/kvserver/replica_proposal_buf.go
@@ -135,7 +135,7 @@ type proposer interface {
 	locker() sync.Locker
 	rlocker() sync.Locker
 	// The following require the proposer to hold (at least) a shared lock.
-	replicaID() roachpb.ReplicaID
+	getReplicaID() roachpb.ReplicaID
 	destroyed() destroyStatus
 	leaseAppliedIndex() uint64
 	enqueueUpdateCheck()
@@ -395,7 +395,8 @@ func (b *propBuf) FlushLockedWithRaftGroup(
 	if raftGroup != nil {
 		leaderInfo = b.p.leaderStatusRLocked(raftGroup)
 		// Sanity check.
-		if leaderInfo.leaderKnown && leaderInfo.leader == b.p.replicaID() && !leaderInfo.iAmTheLeader {
+		if leaderInfo.leaderKnown && leaderInfo.leader == b.p.getReplicaID() &&
+			!leaderInfo.iAmTheLeader {
 			log.Fatalf(ctx,
 				"inconsistent Raft state: state %s while the current replica is also the lead: %d",
 				raftGroup.BasicStatus().RaftState, leaderInfo.leader)
@@ -531,7 +532,7 @@ func (b *propBuf) FlushLockedWithRaftGroup(
 			// Flush any previously batched (non-conf change) proposals to
 			// preserve the correct ordering or proposals. Later proposals
 			// will start a new batch.
-			if err := proposeBatch(raftGroup, b.p.replicaID(), ents); err != nil {
+			if err := proposeBatch(raftGroup, b.p.getReplicaID(), ents); err != nil {
				firstErr = err
				continue
			}
@@ -581,7 +582,7 @@ func (b *propBuf) FlushLockedWithRaftGroup(
 	if firstErr != nil {
 		return 0, firstErr
 	}
-	return used, proposeBatch(raftGroup, b.p.replicaID(), ents)
+	return used, proposeBatch(raftGroup, b.p.getReplicaID(), ents)
 }
 
 // allocateLAIAndClosedTimestampLocked computes a LAI and closed timestamp to be
@@ -997,8 +998,8 @@ func (rp *replicaProposer) rlocker() sync.Locker {
 	return rp.mu.RWMutex.RLocker()
 }
 
-func (rp *replicaProposer) replicaID() roachpb.ReplicaID {
-	return rp.mu.replicaID
+func (rp *replicaProposer) getReplicaID() roachpb.ReplicaID {
+	return rp.replicaID
 }
 
 func (rp *replicaProposer) destroyed() destroyStatus {
diff --git a/pkg/kv/kvserver/replica_proposal_buf_test.go b/pkg/kv/kvserver/replica_proposal_buf_test.go
index 976e7c5cec12..1dac0ff03a43 100644
--- a/pkg/kv/kvserver/replica_proposal_buf_test.go
+++ b/pkg/kv/kvserver/replica_proposal_buf_test.go
@@ -117,7 +117,7 @@ func (t *testProposer) rlocker() sync.Locker {
 	return t.RWMutex.RLocker()
 }
 
-func (t *testProposer) replicaID() roachpb.ReplicaID {
+func (t *testProposer) getReplicaID() roachpb.ReplicaID {
 	return 1
 }
 
@@ -170,7 +170,7 @@ func (t *testProposer) leaderStatusRLocked(raftGroup proposerRaft) rangeLeaderIn
 	var iAmTheLeader, leaderEligibleForLease bool
 	if leaderKnown {
 		leaderRep = roachpb.ReplicaID(raftGroup.BasicStatus().Lead)
-		iAmTheLeader = leaderRep == t.replicaID()
+		iAmTheLeader = leaderRep == t.getReplicaID()
 		repDesc := roachpb.ReplicaDescriptor{
 			ReplicaID: leaderRep,
 			Type:      &t.leaderReplicaType,
@@ -521,9 +521,9 @@ func TestProposalBufferRejectLeaseAcqOnFollower(t *testing.T) {
 		t.Run(tc.name, func(t *testing.T) {
 			var p testProposer
 			var pc proposalCreator
-			// p.replicaID() is hardcoded; it'd better be hardcoded to what this test
-			// expects.
-			require.Equal(t, self, uint64(p.replicaID()))
+			// p.getReplicaID() is hardcoded; it'd better be hardcoded to what this
+			// test expects.
+			require.Equal(t, self, uint64(p.getReplicaID()))
 
 			var rejected roachpb.ReplicaID
 			if tc.expRejection {
diff --git a/pkg/kv/kvserver/replica_proposal_quota.go b/pkg/kv/kvserver/replica_proposal_quota.go
index 3cb495ae0bf4..a89ee1b805db 100644
--- a/pkg/kv/kvserver/replica_proposal_quota.go
+++ b/pkg/kv/kvserver/replica_proposal_quota.go
@@ -87,7 +87,7 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
 
 	status := r.mu.internalRaftGroup.BasicStatus()
 	if r.mu.leaderID != lastLeaderID {
-		if r.mu.replicaID == r.mu.leaderID {
+		if r.replicaID == r.mu.leaderID {
 			// We're becoming the leader.
 			// Initialize the proposalQuotaBaseIndex at the applied index.
 			// After the proposal quota is enabled all entries applied by this replica
@@ -125,7 +125,7 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
 		}
 		return
 	} else if r.mu.proposalQuota == nil {
-		if r.mu.replicaID == r.mu.leaderID {
+		if r.replicaID == r.mu.leaderID {
 			log.Fatal(ctx, "leader has uninitialized proposalQuota pool")
 		}
 		// We're a follower.
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index 9084a612a453..2bb9d5c2e6f5 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -868,7 +868,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 		r.mu.leaderID = leaderID
 		// Clear the remote proposal set. Would have been nil already if not
 		// previously the leader.
-		becameLeader = r.mu.leaderID == r.mu.replicaID
+		becameLeader = r.mu.leaderID == r.replicaID
 	}
 	r.mu.Unlock()
 
@@ -1040,8 +1040,8 @@ func (r *Replica) tick(ctx context.Context, livenessMap liveness.IsLiveMap) (boo
 	// into the local Raft group. The leader won't hit that path, so we update
 	// it whenever it ticks. In effect, this makes sure it always sees itself as
 	// alive.
-	if r.mu.replicaID == r.mu.leaderID {
-		r.mu.lastUpdateTimes.update(r.mu.replicaID, timeutil.Now())
+	if r.replicaID == r.mu.leaderID {
+		r.mu.lastUpdateTimes.update(r.replicaID, timeutil.Now())
 	}
 
 	r.mu.ticks++
@@ -1585,7 +1585,7 @@ func (r *Replica) withRaftGroupLocked(
 		ctx := r.AnnotateCtx(context.TODO())
 		raftGroup, err := raft.NewRawNode(newRaftConfig(
 			raft.Storage((*replicaRaftStorage)(r)),
-			uint64(r.mu.replicaID),
+			uint64(r.replicaID),
 			r.mu.state.RaftAppliedIndex,
 			r.store.cfg,
 			&raftLogger{ctx: ctx},
@@ -1709,7 +1709,7 @@ func (r *Replica) maybeCampaignOnWakeLocked(ctx context.Context) {
 	// method were to be called on an uninitialized replica (which
 	// has no state and thus an empty raft config), this might cause
 	// problems.
-	if _, currentMember := r.mu.state.Desc.GetReplicaDescriptorByID(r.mu.replicaID); !currentMember {
+	if _, currentMember := r.mu.state.Desc.GetReplicaDescriptorByID(r.replicaID); !currentMember {
 		return
 	}
 
diff --git a/pkg/kv/kvserver/replica_raft_quiesce.go b/pkg/kv/kvserver/replica_raft_quiesce.go
index c61a3be61042..6c8310987fc6 100644
--- a/pkg/kv/kvserver/replica_raft_quiesce.go
+++ b/pkg/kv/kvserver/replica_raft_quiesce.go
@@ -402,10 +402,10 @@ func shouldReplicaQuiesce(
 func (r *Replica) quiesceAndNotifyRaftMuLockedReplicaMuLocked(
 	ctx context.Context, status *raft.Status, lagging laggingReplicaSet,
 ) bool {
-	fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(r.mu.replicaID, r.raftMu.lastToReplica)
+	fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(r.replicaID, r.raftMu.lastToReplica)
 	if fromErr != nil {
 		if log.V(4) {
-			log.Infof(ctx, "not quiescing: cannot find from replica (%d)", r.mu.replicaID)
+			log.Infof(ctx, "not quiescing: cannot find from replica (%d)", r.replicaID)
 		}
 		return false
 	}
@@ -413,7 +413,7 @@ func (r *Replica) quiesceAndNotifyRaftMuLockedReplicaMuLocked(
 	r.quiesceLocked(ctx, lagging)
 
 	for id, prog := range status.Progress {
-		if roachpb.ReplicaID(id) == r.mu.replicaID {
+		if roachpb.ReplicaID(id) == r.replicaID {
 			continue
 		}
 		toReplica, toErr := r.getReplicaDescriptorByIDRLocked(
@@ -444,7 +444,7 @@ func (r *Replica) quiesceAndNotifyRaftMuLockedReplicaMuLocked(
 			curLagging = nil
 		}
 		msg := raftpb.Message{
-			From: uint64(r.mu.replicaID),
+			From: uint64(r.replicaID),
 			To:   id,
 			Type: raftpb.MsgHeartbeat,
 			Term: status.Term,
diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go
index 9a350f3d5dbd..5b52cfd4d186 100644
--- a/pkg/kv/kvserver/replica_raftstorage.go
+++ b/pkg/kv/kvserver/replica_raftstorage.go
@@ -895,7 +895,7 @@ func (r *Replica) applySnapshot(
 	// We've cleared all the raft state above, so we are forced to write the
 	// RaftReplicaID again here.
 	if err := r.raftMu.stateLoader.SetRaftReplicaID(
-		ctx, &unreplicatedSST, r.mu.replicaID); err != nil {
+		ctx, &unreplicatedSST, r.replicaID); err != nil {
 		return errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer")
 	}
 
@@ -1046,7 +1046,7 @@ func (r *Replica) applySnapshot(
 	// we missed the application of a merge) and we are the new leaseholder, we
 	// make sure to update the timestamp cache using the prior read summary to
 	// account for any reads that were served on the right-hand side range(s).
-	if len(subsumedRepls) > 0 && state.Lease.Replica.ReplicaID == r.mu.replicaID && prioReadSum != nil {
+	if len(subsumedRepls) > 0 && state.Lease.Replica.ReplicaID == r.replicaID && prioReadSum != nil {
 		applyReadSummaryToTimestampCache(r.store.tsCache, r.descRLocked(), *prioReadSum)
 	}
 
diff --git a/pkg/kv/kvserver/store_create_replica.go b/pkg/kv/kvserver/store_create_replica.go
index 6418941e88cd..60a46f361c08 100644
--- a/pkg/kv/kvserver/store_create_replica.go
+++ b/pkg/kv/kvserver/store_create_replica.go
@@ -98,7 +98,7 @@ func (s *Store) tryGetOrCreateReplica(
 	}
 
 	// The current replica needs to be removed, remove it and go back around.
-	if toTooOld := repl.mu.replicaID < replicaID; toTooOld {
+	if toTooOld := repl.replicaID < replicaID; toTooOld {
 		if shouldLog := log.V(1); shouldLog {
 			log.Infof(ctx, "found message for replica ID %d which is newer than %v",
 				replicaID, repl)
@@ -115,14 +115,14 @@ func (s *Store) tryGetOrCreateReplica(
 	}
 	defer repl.mu.RUnlock()
 
-	if repl.mu.replicaID > replicaID {
+	if repl.replicaID > replicaID {
 		// The sender is behind and is sending to an old replica.
 		// We could silently drop this message but this way we'll inform the
 		// sender that they may no longer exist.
 		repl.raftMu.Unlock()
 		return nil, false, &roachpb.RaftGroupDeletedError{}
 	}
-	if repl.mu.replicaID != replicaID {
+	if repl.replicaID != replicaID {
 		// This case should have been caught by handleToReplicaTooOld.
 		log.Fatalf(ctx, "intended replica id %d unexpectedly does not match the current replica %v",
 			replicaID, repl)
diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go
index 9a64ab97170f..95215b88bd4b 100644
--- a/pkg/kv/kvserver/store_raft.go
+++ b/pkg/kv/kvserver/store_raft.go
@@ -388,7 +388,7 @@ func (s *Store) HandleRaftResponse(
 			// that the replica has been removed and re-added quickly. In
 			// that case, we don't want to add it to the replicaGCQueue.
 			// If the replica is not alive then we also should ignore this error.
-			if tErr.ReplicaID != repl.mu.replicaID ||
+			if tErr.ReplicaID != repl.replicaID ||
 				!repl.mu.destroyStatus.IsAlive() ||
 				// Ignore if we want to test the replicaGC queue.
 				s.TestingKnobs().DisableEagerReplicaRemoval {
diff --git a/pkg/kv/kvserver/store_remove_replica.go b/pkg/kv/kvserver/store_remove_replica.go
index 171af032ef63..4520c0a567fb 100644
--- a/pkg/kv/kvserver/store_remove_replica.go
+++ b/pkg/kv/kvserver/store_remove_replica.go
@@ -90,7 +90,6 @@ func (s *Store) removeInitializedReplicaRaftMuLocked(
 	// Sanity checks before committing to the removal by setting the
 	// destroy status.
 	var desc *roachpb.RangeDescriptor
-	var replicaID roachpb.ReplicaID
 	{
 		rep.readOnlyCmdMu.Lock()
 		rep.mu.Lock()
@@ -132,11 +131,9 @@ func (s *Store) removeInitializedReplicaRaftMuLocked(
 		// Mark the replica as removed before deleting data.
 		rep.mu.destroyStatus.Set(roachpb.NewRangeNotFoundError(rep.RangeID, rep.StoreID()),
 			destroyReasonRemoved)
-		replicaID = rep.mu.replicaID
 		rep.mu.Unlock()
 		rep.readOnlyCmdMu.Unlock()
 	}
-
 	// Proceed with the removal, all errors encountered from here down are fatal.
 
 	// Another sanity check that this replica is a part of this store.
@@ -150,7 +147,7 @@ func (s *Store) removeInitializedReplicaRaftMuLocked(
 
 	// During merges, the context might have the subsuming range, so we explicitly
 	// log the replica to be removed.
-	log.Infof(ctx, "removing replica r%d/%d", rep.RangeID, replicaID)
+	log.Infof(ctx, "removing replica r%d/%d", rep.RangeID, rep.replicaID)
 
 	s.mu.Lock()
 	if it := s.getOverlappingKeyRangeLocked(desc); it.repl != rep {
diff --git a/pkg/kv/kvserver/store_send.go b/pkg/kv/kvserver/store_send.go
index d908beda3461..8d7429f3a8af 100644
--- a/pkg/kv/kvserver/store_send.go
+++ b/pkg/kv/kvserver/store_send.go
@@ -174,10 +174,6 @@ func (s *Store) Send(
 		return nil, roachpb.NewError(err)
 	}
 	if !repl.IsInitialized() {
-		repl.mu.RLock()
-		replicaID := repl.mu.replicaID
-		repl.mu.RUnlock()
-
 		// If we have an uninitialized copy of the range, then we are
 		// probably a valid member of the range, we're just in the
 		// process of getting our snapshot. If we returned
@@ -193,7 +189,7 @@ func (s *Store) Send(
 			Replica: roachpb.ReplicaDescriptor{
 				NodeID:    repl.store.nodeDesc.NodeID,
 				StoreID:   repl.store.StoreID(),
-				ReplicaID: replicaID,
+				ReplicaID: repl.replicaID,
 			},
 		})
 	}
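For readers outside the CockroachDB tree, the reason the patch can drop the RLock/RUnlock pair in Replica.ReplicaID() is that replicaID is assigned exactly once, in the constructor, before the *Replica pointer is ever shared; such a write-once field can live outside the mutex-protected struct and be read without locking. Below is a minimal standalone Go sketch of that pattern. It is an illustration only, not CockroachDB code, and names such as replica, newReplica, and leaderID are hypothetical.

// Illustrative sketch: a field set once in the constructor and never
// reassigned can sit outside the mutex-guarded struct, so readers need no lock.
package main

import (
	"fmt"
	"sync"
)

type ReplicaID int32

type replica struct {
	// replicaID is only set by the constructor and never changes afterwards,
	// so it can be read without holding mu.
	replicaID ReplicaID

	mu struct {
		sync.RWMutex
		// leaderID changes over time, so it stays under the lock.
		leaderID ReplicaID
	}
}

// newReplica assigns replicaID before the pointer is shared with anyone,
// which is what makes the lock-free reads safe.
func newReplica(id ReplicaID) *replica {
	return &replica{replicaID: id}
}

// ReplicaID needs no locking: the field is immutable after construction.
func (r *replica) ReplicaID() ReplicaID { return r.replicaID }

// isLeader still takes the read lock because leaderID is mutable state.
func (r *replica) isLeader() bool {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.mu.leaderID == r.replicaID
}

func main() {
	r := newReplica(3)
	r.mu.Lock()
	r.mu.leaderID = 3
	r.mu.Unlock()
	fmt.Println(r.ReplicaID(), r.isLeader()) // 3 true
}

Mutable per-replica state such as leaderID keeps its lock, which mirrors why call sites above like r.mu.lastUpdateTimes.update(r.replicaID, ...) still run under Replica.mu even though the replicaID argument itself no longer requires it.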