kvserver: rename Replica.mu.replicaID to Replica.replicaID
The existing behavior is that replicaID is set when a Replica object is
created and does not change over its lifetime.

Release note: None
sumeerbhola committed Feb 8, 2022
1 parent 844ac13 commit d35cf75
Showing 13 changed files with 50 additions and 62 deletions.
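
The shape of the change, as a minimal Go sketch (simplified names and types; this illustrates the pattern, not the actual CockroachDB definitions): a value that is fixed at construction time is lifted out of the mutex-protected mu sub-struct so readers no longer need to take the lock.

package replicasketch

import "sync"

type ReplicaID int32

type Replica struct {
	// Set by the constructor and never reassigned, so it can live outside mu
	// and be read without locking.
	replicaID ReplicaID

	mu struct {
		sync.RWMutex
		// ... mutable, lock-protected state stays in here ...
	}
}

// ReplicaID returns the fixed ID; no r.mu.RLock() is needed anymore.
func (r *Replica) ReplicaID() ReplicaID {
	return r.replicaID
}

The diff below is essentially this transformation applied mechanically at every r.mu.replicaID call site.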
24 changes: 9 additions & 15 deletions pkg/kv/kvserver/replica.go
@@ -199,6 +199,11 @@ type Replica struct {
log.AmbientContext

RangeID roachpb.RangeID // Only set by the constructor
// The ID of the replica within the Raft group. Only set by the constructor,
// so it will not change over the lifetime of this replica. If addressed
// under a newer replicaID, the replica immediately replicaGCs itself to
// make way for the newer incarnation.
replicaID roachpb.ReplicaID

// The start key of a Range remains constant throughout its lifetime (it does
// not change through splits or merges). This field carries a copy of
@@ -516,13 +521,6 @@ type Replica struct {
applyingEntries bool
// The replica's Raft group "node".
internalRaftGroup *raft.RawNode
// The ID of the replica within the Raft group. This value may never be 0.
// It will not change over the lifetime of this replica. If addressed under
// a newer replicaID, the replica immediately replicaGCs itself to make
// way for the newer incarnation.
// TODO(sumeer): since this is initialized in newUnloadedReplica and never
// changed, lift this out of the mu struct.
replicaID roachpb.ReplicaID
// The minimum allowed ID for this replica. Initialized from
// RangeTombstone.NextReplicaID.
tombstoneMinReplicaID roachpb.ReplicaID
@@ -715,11 +713,7 @@ func (r *Replica) SafeFormat(w redact.SafePrinter, _ rune) {
// ReplicaID returns the ID for the Replica. This value is fixed for the
// lifetime of the Replica.
func (r *Replica) ReplicaID() roachpb.ReplicaID {
// The locking of mu is unnecessary. It will be removed when we lift
// replicaID out of the mu struct.
r.mu.RLock()
defer r.mu.RUnlock()
return r.mu.replicaID
return r.replicaID
}

// cleanupFailedProposal cleans up after a proposal that has failed. It
@@ -1320,8 +1314,8 @@ func (r *Replica) assertStateRaftMuLockedReplicaMuRLocked(
if !ok {
log.Fatalf(ctx, "%+v does not contain local store s%d", r.mu.state.Desc, r.store.StoreID())
}
if replDesc.ReplicaID != r.mu.replicaID {
log.Fatalf(ctx, "replica's replicaID %d diverges from descriptor %+v", r.mu.replicaID, r.mu.state.Desc)
if replDesc.ReplicaID != r.replicaID {
log.Fatalf(ctx, "replica's replicaID %d diverges from descriptor %+v", r.replicaID, r.mu.state.Desc)
}
}
}
@@ -1677,7 +1671,7 @@ func (r *Replica) isNewerThanSplitRLocked(split *roachpb.SplitTrigger) bool {
// ID which is above the replica ID of the split then we would not have
// written a tombstone but we will have a replica ID that will exceed the
// split replica ID.
r.mu.replicaID > rightDesc.ReplicaID
r.replicaID > rightDesc.ReplicaID
}

// WatchForMerge is like maybeWatchForMergeLocked, except it expects a merge to
16 changes: 8 additions & 8 deletions pkg/kv/kvserver/replica_init.go
@@ -72,6 +72,7 @@ func newUnloadedReplica(
r := &Replica{
AmbientContext: store.cfg.AmbientCtx,
RangeID: desc.RangeID,
replicaID: replicaID,
creationTime: timeutil.Now(),
store: store,
abortSpan: abortspan.New(desc.RangeID),
@@ -93,7 +94,6 @@
r.mu.stateLoader = stateloader.Make(desc.RangeID)
r.mu.quiescent = true
r.mu.conf = store.cfg.DefaultSpanConfig
r.mu.replicaID = replicaID
split.Init(&r.loadBasedSplitter, rand.Intn, func() float64 {
return float64(SplitByLoadQPSThreshold.Get(&store.cfg.Settings.SV))
}, func() time.Duration {
@@ -179,7 +179,7 @@ func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor)
ctx := r.AnnotateCtx(context.TODO())
if r.mu.state.Desc != nil && r.isInitializedRLocked() {
log.Fatalf(ctx, "r%d: cannot reinitialize an initialized replica", desc.RangeID)
} else if r.mu.replicaID == 0 {
} else if r.replicaID == 0 {
// NB: This is just a defensive check as r.mu.replicaID should never be 0.
log.Fatalf(ctx, "r%d: cannot initialize replica without a replicaID", desc.RangeID)
}
@@ -204,15 +204,15 @@ func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor)

// Ensure that we're not trying to load a replica with a different ID than
// was used to construct this Replica.
replicaID := r.mu.replicaID
replicaID := r.replicaID
if replicaDesc, found := r.mu.state.Desc.GetReplicaDescriptor(r.StoreID()); found {
replicaID = replicaDesc.ReplicaID
} else if desc.IsInitialized() {
log.Fatalf(ctx, "r%d: cannot initialize replica which is not in descriptor %v", desc.RangeID, desc)
}
if r.mu.replicaID != replicaID {
if r.replicaID != replicaID {
log.Fatalf(ctx, "attempting to initialize a replica which has ID %d with ID %d",
r.mu.replicaID, replicaID)
r.replicaID, replicaID)
}

r.setDescLockedRaftMuLocked(ctx, desc)
@@ -346,9 +346,9 @@ func (r *Replica) setDescLockedRaftMuLocked(ctx context.Context, desc *roachpb.R
// store to exist on disk.
// 3) Various unit tests do not provide a valid descriptor.
replDesc, found := desc.GetReplicaDescriptor(r.StoreID())
if found && replDesc.ReplicaID != r.mu.replicaID {
if found && replDesc.ReplicaID != r.replicaID {
log.Fatalf(ctx, "attempted to change replica's ID from %d to %d",
r.mu.replicaID, replDesc.ReplicaID)
r.replicaID, replDesc.ReplicaID)
}

// Initialize the tenant. The must be the first time that the descriptor has
@@ -380,7 +380,7 @@ func (r *Replica) setDescLockedRaftMuLocked(ctx context.Context, desc *roachpb.R
r.mu.lastReplicaAddedTime = time.Time{}
}

r.rangeStr.store(r.mu.replicaID, desc)
r.rangeStr.store(r.replicaID, desc)
r.connectionClass.set(rpc.ConnectionClassForKey(desc.StartKey))
r.concMgr.OnRangeDescUpdated(desc)
r.mu.state.Desc = desc
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_proposal.go
@@ -257,7 +257,7 @@ func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.Co

var shouldFatal bool
for _, rDesc := range cc.Terminate {
if rDesc.StoreID == r.store.StoreID() && rDesc.ReplicaID == r.mu.replicaID {
if rDesc.StoreID == r.store.StoreID() && rDesc.ReplicaID == r.replicaID {
shouldFatal = true
}
}
@@ -381,7 +381,7 @@ func (r *Replica) leasePostApplyLocked(
}
}

iAmTheLeaseHolder := newLease.Replica.ReplicaID == r.mu.replicaID
iAmTheLeaseHolder := newLease.Replica.ReplicaID == r.replicaID
// NB: in the case in which a node restarts, minLeaseProposedTS forces it to
// get a new lease and we make sure it gets a new sequence number, thus
// causing the right half of the disjunction to fire so that we update the
13 changes: 7 additions & 6 deletions pkg/kv/kvserver/replica_proposal_buf.go
@@ -135,7 +135,7 @@ type proposer interface {
locker() sync.Locker
rlocker() sync.Locker
// The following require the proposer to hold (at least) a shared lock.
replicaID() roachpb.ReplicaID
getReplicaID() roachpb.ReplicaID
destroyed() destroyStatus
leaseAppliedIndex() uint64
enqueueUpdateCheck()
@@ -395,7 +395,8 @@ func (b *propBuf) FlushLockedWithRaftGroup(
if raftGroup != nil {
leaderInfo = b.p.leaderStatusRLocked(raftGroup)
// Sanity check.
if leaderInfo.leaderKnown && leaderInfo.leader == b.p.replicaID() && !leaderInfo.iAmTheLeader {
if leaderInfo.leaderKnown && leaderInfo.leader == b.p.getReplicaID() &&
!leaderInfo.iAmTheLeader {
log.Fatalf(ctx,
"inconsistent Raft state: state %s while the current replica is also the lead: %d",
raftGroup.BasicStatus().RaftState, leaderInfo.leader)
@@ -531,7 +532,7 @@ func (b *propBuf) FlushLockedWithRaftGroup(
// Flush any previously batched (non-conf change) proposals to
// preserve the correct ordering or proposals. Later proposals
// will start a new batch.
if err := proposeBatch(raftGroup, b.p.replicaID(), ents); err != nil {
if err := proposeBatch(raftGroup, b.p.getReplicaID(), ents); err != nil {
firstErr = err
continue
}
@@ -581,7 +582,7 @@ func (b *propBuf) FlushLockedWithRaftGroup(
if firstErr != nil {
return 0, firstErr
}
return used, proposeBatch(raftGroup, b.p.replicaID(), ents)
return used, proposeBatch(raftGroup, b.p.getReplicaID(), ents)
}

// allocateLAIAndClosedTimestampLocked computes a LAI and closed timestamp to be
Expand Down Expand Up @@ -997,8 +998,8 @@ func (rp *replicaProposer) rlocker() sync.Locker {
return rp.mu.RWMutex.RLocker()
}

func (rp *replicaProposer) replicaID() roachpb.ReplicaID {
return rp.mu.replicaID
func (rp *replicaProposer) getReplicaID() roachpb.ReplicaID {
return rp.replicaID
}

func (rp *replicaProposer) destroyed() destroyStatus {
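
A detail worth noting about the hunk above (an inference from the diff, not something stated in the commit message): the proposer interface method is renamed from replicaID() to getReplicaID(), presumably because replicaProposer shares Replica's underlying struct and Go rejects a type that has a field and a method with the same name. A reduced, hypothetical example of the conflict:

package proposersketch

type replica struct {
	replicaID int32 // the field this commit promotes out of the mu struct
}

// replicaProposer reuses replica's underlying struct, so it carries the
// replicaID field.
type replicaProposer replica

// Declaring this would no longer compile once replicaID is a field, because
// the method name would collide with the field name:
//
//	func (rp *replicaProposer) replicaID() int32 { return rp.replicaID }

// Renaming the accessor avoids the collision.
func (rp *replicaProposer) getReplicaID() int32 {
	return rp.replicaID
}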
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/replica_proposal_buf_test.go
@@ -117,7 +117,7 @@ func (t *testProposer) rlocker() sync.Locker {
return t.RWMutex.RLocker()
}

func (t *testProposer) replicaID() roachpb.ReplicaID {
func (t *testProposer) getReplicaID() roachpb.ReplicaID {
return 1
}

@@ -170,7 +170,7 @@ func (t *testProposer) leaderStatusRLocked(raftGroup proposerRaft) rangeLeaderIn
var iAmTheLeader, leaderEligibleForLease bool
if leaderKnown {
leaderRep = roachpb.ReplicaID(raftGroup.BasicStatus().Lead)
iAmTheLeader = leaderRep == t.replicaID()
iAmTheLeader = leaderRep == t.getReplicaID()
repDesc := roachpb.ReplicaDescriptor{
ReplicaID: leaderRep,
Type: &t.leaderReplicaType,
@@ -521,9 +521,9 @@ func TestProposalBufferRejectLeaseAcqOnFollower(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
var p testProposer
var pc proposalCreator
// p.replicaID() is hardcoded; it'd better be hardcoded to what this test
// expects.
require.Equal(t, self, uint64(p.replicaID()))
// p.getReplicaID() is hardcoded; it'd better be hardcoded to what this
// test expects.
require.Equal(t, self, uint64(p.getReplicaID()))

var rejected roachpb.ReplicaID
if tc.expRejection {
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_proposal_quota.go
@@ -87,7 +87,7 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(

status := r.mu.internalRaftGroup.BasicStatus()
if r.mu.leaderID != lastLeaderID {
if r.mu.replicaID == r.mu.leaderID {
if r.replicaID == r.mu.leaderID {
// We're becoming the leader.
// Initialize the proposalQuotaBaseIndex at the applied index.
// After the proposal quota is enabled all entries applied by this replica
@@ -125,7 +125,7 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
}
return
} else if r.mu.proposalQuota == nil {
if r.mu.replicaID == r.mu.leaderID {
if r.replicaID == r.mu.leaderID {
log.Fatal(ctx, "leader has uninitialized proposalQuota pool")
}
// We're a follower.
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/replica_raft.go
@@ -868,7 +868,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
r.mu.leaderID = leaderID
// Clear the remote proposal set. Would have been nil already if not
// previously the leader.
becameLeader = r.mu.leaderID == r.mu.replicaID
becameLeader = r.mu.leaderID == r.replicaID
}
r.mu.Unlock()

@@ -1040,8 +1040,8 @@ func (r *Replica) tick(ctx context.Context, livenessMap liveness.IsLiveMap) (boo
// into the local Raft group. The leader won't hit that path, so we update
// it whenever it ticks. In effect, this makes sure it always sees itself as
// alive.
if r.mu.replicaID == r.mu.leaderID {
r.mu.lastUpdateTimes.update(r.mu.replicaID, timeutil.Now())
if r.replicaID == r.mu.leaderID {
r.mu.lastUpdateTimes.update(r.replicaID, timeutil.Now())
}

r.mu.ticks++
@@ -1585,7 +1585,7 @@ func (r *Replica) withRaftGroupLocked(
ctx := r.AnnotateCtx(context.TODO())
raftGroup, err := raft.NewRawNode(newRaftConfig(
raft.Storage((*replicaRaftStorage)(r)),
uint64(r.mu.replicaID),
uint64(r.replicaID),
r.mu.state.RaftAppliedIndex,
r.store.cfg,
&raftLogger{ctx: ctx},
@@ -1709,7 +1709,7 @@ func (r *Replica) maybeCampaignOnWakeLocked(ctx context.Context) {
// method were to be called on an uninitialized replica (which
// has no state and thus an empty raft config), this might cause
// problems.
if _, currentMember := r.mu.state.Desc.GetReplicaDescriptorByID(r.mu.replicaID); !currentMember {
if _, currentMember := r.mu.state.Desc.GetReplicaDescriptorByID(r.replicaID); !currentMember {
return
}

8 changes: 4 additions & 4 deletions pkg/kv/kvserver/replica_raft_quiesce.go
@@ -402,18 +402,18 @@ func shouldReplicaQuiesce(
func (r *Replica) quiesceAndNotifyRaftMuLockedReplicaMuLocked(
ctx context.Context, status *raft.Status, lagging laggingReplicaSet,
) bool {
fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(r.mu.replicaID, r.raftMu.lastToReplica)
fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(r.replicaID, r.raftMu.lastToReplica)
if fromErr != nil {
if log.V(4) {
log.Infof(ctx, "not quiescing: cannot find from replica (%d)", r.mu.replicaID)
log.Infof(ctx, "not quiescing: cannot find from replica (%d)", r.replicaID)
}
return false
}

r.quiesceLocked(ctx, lagging)

for id, prog := range status.Progress {
if roachpb.ReplicaID(id) == r.mu.replicaID {
if roachpb.ReplicaID(id) == r.replicaID {
continue
}
toReplica, toErr := r.getReplicaDescriptorByIDRLocked(
@@ -444,7 +444,7 @@ func (r *Replica) quiesceAndNotifyRaftMuLockedReplicaMuLocked(
curLagging = nil
}
msg := raftpb.Message{
From: uint64(r.mu.replicaID),
From: uint64(r.replicaID),
To: id,
Type: raftpb.MsgHeartbeat,
Term: status.Term,
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_raftstorage.go
@@ -895,7 +895,7 @@ func (r *Replica) applySnapshot(
// We've cleared all the raft state above, so we are forced to write the
// RaftReplicaID again here.
if err := r.raftMu.stateLoader.SetRaftReplicaID(
ctx, &unreplicatedSST, r.mu.replicaID); err != nil {
ctx, &unreplicatedSST, r.replicaID); err != nil {
return errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer")
}

@@ -1046,7 +1046,7 @@ func (r *Replica) applySnapshot(
// we missed the application of a merge) and we are the new leaseholder, we
// make sure to update the timestamp cache using the prior read summary to
// account for any reads that were served on the right-hand side range(s).
if len(subsumedRepls) > 0 && state.Lease.Replica.ReplicaID == r.mu.replicaID && prioReadSum != nil {
if len(subsumedRepls) > 0 && state.Lease.Replica.ReplicaID == r.replicaID && prioReadSum != nil {
applyReadSummaryToTimestampCache(r.store.tsCache, r.descRLocked(), *prioReadSum)
}

6 changes: 3 additions & 3 deletions pkg/kv/kvserver/store_create_replica.go
@@ -98,7 +98,7 @@ func (s *Store) tryGetOrCreateReplica(
}

// The current replica needs to be removed, remove it and go back around.
if toTooOld := repl.mu.replicaID < replicaID; toTooOld {
if toTooOld := repl.replicaID < replicaID; toTooOld {
if shouldLog := log.V(1); shouldLog {
log.Infof(ctx, "found message for replica ID %d which is newer than %v",
replicaID, repl)
@@ -115,14 +115,14 @@
}
defer repl.mu.RUnlock()

if repl.mu.replicaID > replicaID {
if repl.replicaID > replicaID {
// The sender is behind and is sending to an old replica.
// We could silently drop this message but this way we'll inform the
// sender that they may no longer exist.
repl.raftMu.Unlock()
return nil, false, &roachpb.RaftGroupDeletedError{}
}
if repl.mu.replicaID != replicaID {
if repl.replicaID != replicaID {
// This case should have been caught by handleToReplicaTooOld.
log.Fatalf(ctx, "intended replica id %d unexpectedly does not match the current replica %v",
replicaID, repl)
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_raft.go
@@ -388,7 +388,7 @@ func (s *Store) HandleRaftResponse(
// that the replica has been removed and re-added quickly. In
// that case, we don't want to add it to the replicaGCQueue.
// If the replica is not alive then we also should ignore this error.
if tErr.ReplicaID != repl.mu.replicaID ||
if tErr.ReplicaID != repl.replicaID ||
!repl.mu.destroyStatus.IsAlive() ||
// Ignore if we want to test the replicaGC queue.
s.TestingKnobs().DisableEagerReplicaRemoval {
5 changes: 1 addition & 4 deletions pkg/kv/kvserver/store_remove_replica.go
@@ -90,7 +90,6 @@ func (s *Store) removeInitializedReplicaRaftMuLocked(
// Sanity checks before committing to the removal by setting the
// destroy status.
var desc *roachpb.RangeDescriptor
var replicaID roachpb.ReplicaID
{
rep.readOnlyCmdMu.Lock()
rep.mu.Lock()
@@ -132,11 +131,9 @@ func (s *Store) removeInitializedReplicaRaftMuLocked(
// Mark the replica as removed before deleting data.
rep.mu.destroyStatus.Set(roachpb.NewRangeNotFoundError(rep.RangeID, rep.StoreID()),
destroyReasonRemoved)
replicaID = rep.mu.replicaID
rep.mu.Unlock()
rep.readOnlyCmdMu.Unlock()
}

// Proceed with the removal, all errors encountered from here down are fatal.

// Another sanity check that this replica is a part of this store.
@@ -150,7 +147,7 @@

// During merges, the context might have the subsuming range, so we explicitly
// log the replica to be removed.
log.Infof(ctx, "removing replica r%d/%d", rep.RangeID, replicaID)
log.Infof(ctx, "removing replica r%d/%d", rep.RangeID, rep.replicaID)

s.mu.Lock()
if it := s.getOverlappingKeyRangeLocked(desc); it.repl != rep {
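
One consequence visible in store_remove_replica.go above: the local replicaID copy that used to be captured while holding rep.mu can be dropped, because an immutable field may be read after the lock is released. A minimal sketch of why that is safe (hypothetical helper and types, not the CockroachDB function):

package removesketch

import (
	"log"
	"sync"
)

type replica struct {
	mu        sync.Mutex
	rangeID   int64
	replicaID int64 // written only by the constructor
}

func removeReplica(rep *replica) {
	rep.mu.Lock()
	// ... destroy-status bookkeeping that genuinely needs the lock ...
	rep.mu.Unlock()

	// Safe without the lock: replicaID never changes after construction, so
	// there is no need to copy it into a local variable while locked.
	log.Printf("removing replica r%d/%d", rep.rangeID, rep.replicaID)
}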
(The diff for the remaining changed file was not loaded.)
