diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go
index 141a838aff6c..aac0bed70040 100644
--- a/pkg/kv/kvserver/replica.go
+++ b/pkg/kv/kvserver/replica.go
@@ -279,6 +279,47 @@ type Replica struct {
 		stateMachine replicaStateMachine
 		// decoder is used to decode committed raft entries.
 		decoder replicaDecoder
+
+		// The last seen replica descriptors from incoming Raft messages. These are
+		// stored so that the replica still knows the replica descriptors for itself
+		// and for its message recipients in the circumstances when its RangeDescriptor
+		// is out of date.
+		//
+		// Normally, a replica knows about the other replica descriptors for a
+		// range via the RangeDescriptor stored in Replica.mu.state.Desc. But that
+		// descriptor is only updated during a Split or ChangeReplicas operation.
+		// There are periods during a Replica's lifetime when that information is
+		// out of date:
+		//
+		// 1. When a replica is being newly created as the result of an incoming
+		// Raft message for it. This is the common case for ChangeReplicas and an
+		// uncommon case for Splits. The leader will be sending the replica
+		// messages and the replica needs to be able to respond before it can
+		// receive an updated range descriptor (via a snapshot,
+		// changeReplicasTrigger, or splitTrigger).
+		//
+		// 2. If the node containing a replica is partitioned or down while the
+		// replicas for the range are updated. When the node comes back up, other
+		// replicas may begin communicating with it and it needs to be able to
+		// respond. Unlike 1 where there is no range descriptor, in this situation
+		// the replica has a range descriptor but it is out of date. Note that a
+		// replica being removed from a node and then quickly re-added before the
+		// replica has been GC'd will also use the last seen descriptors. In
+		// effect, this is another path for which the replica's local range
+		// descriptor is out of date.
+		//
+		// The last seen replica descriptors are updated on receipt of every raft
+		// message via Replica.setLastReplicaDescriptorsRaftMuLocked (see
+		// Store.HandleRaftRequest). These last seen descriptors are used when
+		// the replica's RangeDescriptor contains missing or out of date descriptors
+		// for a replica (see Replica.sendRaftMessageRaftMuLocked).
+		//
+		// Removing a replica from Store.mu.replicas is not a problem because
+		// when a replica is completely removed, it won't be recreated until
+		// there is another event that will repopulate the replicas map in the
+		// range descriptor. When it is temporarily dropped and recreated, the
+		// newly recreated replica will have a complete range descriptor.
+		lastToReplica, lastFromReplica roachpb.ReplicaDescriptor
 	}
 
 	// Contains the lease history when enabled.
@@ -499,47 +540,6 @@ type Replica struct {
 		// live node will not lose leaseholdership.
 		lastUpdateTimes lastUpdateTimesMap
 
-		// The last seen replica descriptors from incoming Raft messages. These are
-		// stored so that the replica still knows the replica descriptors for itself
-		// and for its message recipients in the circumstances when its RangeDescriptor
-		// is out of date.
-		//
-		// Normally, a replica knows about the other replica descriptors for a
-		// range via the RangeDescriptor stored in Replica.mu.state.Desc. But that
-		// descriptor is only updated during a Split or ChangeReplicas operation.
-		// There are periods during a Replica's lifetime when that information is
-		// out of date:
-		//
-		// 1. When a replica is being newly created as the result of an incoming
-		// Raft message for it. This is the common case for ChangeReplicas and an
-		// uncommon case for Splits. The leader will be sending the replica
-		// messages and the replica needs to be able to respond before it can
-		// receive an updated range descriptor (via a snapshot,
-		// changeReplicasTrigger, or splitTrigger).
-		//
-		// 2. If the node containing a replica is partitioned or down while the
-		// replicas for the range are updated. When the node comes back up, other
-		// replicas may begin communicating with it and it needs to be able to
-		// respond. Unlike 1 where there is no range descriptor, in this situation
-		// the replica has a range descriptor but it is out of date. Note that a
-		// replica being removed from a node and then quickly re-added before the
-		// replica has been GC'd will also use the last seen descriptors. In
-		// effect, this is another path for which the replica's local range
-		// descriptor is out of date.
-		//
-		// The last seen replica descriptors are updated on receipt of every raft
-		// message via Replica.setLastReplicaDescriptors (see
-		// Store.HandleRaftRequest). These last seen descriptors are used when
-		// the replica's RangeDescriptor contains missing or out of date descriptors
-		// for a replica (see Replica.sendRaftMessage).
-		//
-		// Removing a replica from Store.mu.replicas is not a problem because
-		// when a replica is completely removed, it won't be recreated until
-		// there is another event that will repopulate the replicas map in the
-		// range descriptor. When it is temporarily dropped and recreated, the
-		// newly recreated replica will have a complete range descriptor.
-		lastToReplica, lastFromReplica roachpb.ReplicaDescriptor
-
 		// Computed checksum at a snapshot UUID.
 		checksums map[uuid.UUID]ReplicaChecksum
@@ -1063,13 +1063,11 @@ func (r *Replica) mergeInProgressRLocked() bool {
 }
 
-// setLastReplicaDescriptors sets the most recently seen replica
-// descriptors to those contained in the *RaftMessageRequest, acquiring r.mu
-// to do so.
-func (r *Replica) setLastReplicaDescriptors(req *RaftMessageRequest) {
-	r.mu.Lock()
-	r.mu.lastFromReplica = req.FromReplica
-	r.mu.lastToReplica = req.ToReplica
-	r.mu.Unlock()
+// setLastReplicaDescriptorsRaftMuLocked sets the most recently seen replica
+// descriptors to those contained in the *RaftMessageRequest.
+func (r *Replica) setLastReplicaDescriptorsRaftMuLocked(req *RaftMessageRequest) {
+	r.raftMu.AssertHeld()
+	r.raftMu.lastFromReplica = req.FromReplica
+	r.raftMu.lastToReplica = req.ToReplica
 }
 
 // GetMVCCStats returns a copy of the MVCC stats object for this range.
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index 7f6166730630..b7717203bdc2 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -752,7 +752,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 
 	msgApps, otherMsgs := splitMsgApps(rd.Messages)
 	r.traceMessageSends(msgApps, "sending msgApp")
-	r.sendRaftMessages(ctx, msgApps)
+	r.sendRaftMessagesRaftMuLocked(ctx, msgApps)
 
 	// Use a more efficient write-only batch because we don't need to do any
 	// reads from the batch. Any reads are performed on the underlying DB.
@@ -862,7 +862,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 	// Update raft log entry cache. We clear any older, uncommitted log entries
 	// and cache the latest ones.
 	r.store.raftEntryCache.Add(r.RangeID, rd.Entries, true /* truncate */)
-	r.sendRaftMessages(ctx, otherMsgs)
+	r.sendRaftMessagesRaftMuLocked(ctx, otherMsgs)
 	r.traceEntries(rd.CommittedEntries, "committed, before applying any entries")
 
 	applicationStart := timeutil.Now()
@@ -1010,7 +1010,7 @@ func (r *Replica) tick(ctx context.Context, livenessMap liveness.IsLiveMap) (boo
 	}
 
 	now := r.store.Clock().NowAsClockTimestamp()
-	if r.maybeQuiesceLocked(ctx, now, livenessMap) {
+	if r.maybeQuiesceRaftMuLockedReplicaMuLocked(ctx, now, livenessMap) {
 		return false, nil
 	}
 
@@ -1207,7 +1207,7 @@ func (r *Replica) maybeCoalesceHeartbeat(
 	return true
 }
 
-func (r *Replica) sendRaftMessages(ctx context.Context, messages []raftpb.Message) {
+func (r *Replica) sendRaftMessagesRaftMuLocked(ctx context.Context, messages []raftpb.Message) {
 	var lastAppResp raftpb.Message
 	for _, message := range messages {
 		drop := false
@@ -1275,19 +1275,19 @@ func (r *Replica) sendRaftMessages(ctx context.Context, messages []raftpb.Messag
 		}
 
 		if !drop {
-			r.sendRaftMessage(ctx, message)
+			r.sendRaftMessageRaftMuLocked(ctx, message)
 		}
 	}
 	if lastAppResp.Index > 0 {
-		r.sendRaftMessage(ctx, lastAppResp)
+		r.sendRaftMessageRaftMuLocked(ctx, lastAppResp)
 	}
 }
 
-// sendRaftMessage sends a Raft message.
-func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) {
+// sendRaftMessageRaftMuLocked sends a Raft message.
+func (r *Replica) sendRaftMessageRaftMuLocked(ctx context.Context, msg raftpb.Message) {
 	r.mu.RLock()
-	fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.From), r.mu.lastToReplica)
-	toReplica, toErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.To), r.mu.lastFromReplica)
+	fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.From), r.raftMu.lastToReplica)
+	toReplica, toErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.To), r.raftMu.lastFromReplica)
 	var startKey roachpb.RKey
 	if msg.Type == raftpb.MsgApp && r.mu.internalRaftGroup != nil {
 		// When the follower is potentially an uninitialized replica waiting for
diff --git a/pkg/kv/kvserver/replica_raft_quiesce.go b/pkg/kv/kvserver/replica_raft_quiesce.go
index 63ae153c2a2a..258665ef14df 100644
--- a/pkg/kv/kvserver/replica_raft_quiesce.go
+++ b/pkg/kv/kvserver/replica_raft_quiesce.go
@@ -121,9 +121,9 @@ func (r *Replica) canUnquiesceRLocked() bool {
 		r.mu.internalRaftGroup != nil
 }
 
-// maybeQuiesceLocked checks to see if the replica is quiescable and initiates
-// quiescence if it is. Returns true if the replica has been quiesced and false
-// otherwise.
+// maybeQuiesceRaftMuLockedReplicaMuLocked checks to see if the replica is
+// quiescable and initiates quiescence if it is. Returns true if the replica has
+// been quiesced and false otherwise.
 //
 // A quiesced range is not ticked and thus doesn't create MsgHeartbeat requests
 // or cause elections. The Raft leader for a range checks various
@@ -178,14 +178,14 @@ func (r *Replica) canUnquiesceRLocked() bool {
 // would quiesce. The fallout from this situation are undesirable raft
 // elections which will cause throughput hiccups to the range, but not
 // correctness issues.
-func (r *Replica) maybeQuiesceLocked(
+func (r *Replica) maybeQuiesceRaftMuLockedReplicaMuLocked(
 	ctx context.Context, now hlc.ClockTimestamp, livenessMap liveness.IsLiveMap,
 ) bool {
 	status, lagging, ok := shouldReplicaQuiesce(ctx, r, now, livenessMap)
 	if !ok {
 		return false
 	}
-	return r.quiesceAndNotifyLocked(ctx, status, lagging)
+	return r.quiesceAndNotifyRaftMuLockedReplicaMuLocked(ctx, status, lagging)
 }
 
 type quiescer interface {
@@ -398,10 +398,10 @@ func shouldReplicaQuiesce(
 	return status, lagging, true
 }
 
-func (r *Replica) quiesceAndNotifyLocked(
+func (r *Replica) quiesceAndNotifyRaftMuLockedReplicaMuLocked(
 	ctx context.Context, status *raft.Status, lagging laggingReplicaSet,
 ) bool {
-	fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(r.mu.replicaID, r.mu.lastToReplica)
+	fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(r.mu.replicaID, r.raftMu.lastToReplica)
 	if fromErr != nil {
 		if log.V(4) {
 			log.Infof(ctx, "not quiescing: cannot find from replica (%d)", r.mu.replicaID)
@@ -416,7 +416,7 @@ func (r *Replica) quiesceAndNotifyLocked(
 			continue
 		}
 		toReplica, toErr := r.getReplicaDescriptorByIDRLocked(
-			roachpb.ReplicaID(id), r.mu.lastFromReplica)
+			roachpb.ReplicaID(id), r.raftMu.lastFromReplica)
 		if toErr != nil {
 			if log.V(4) {
 				log.Infof(ctx, "failed to quiesce: cannot find to replica (%d)", id)
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index b80a122d611b..6c19235cdc05 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -516,7 +516,7 @@ Store.HandleRaftRequest (which is part of the RaftMessageHandler interface),
 ultimately resulting in a call to Replica.handleRaftReadyRaftMuLocked, which
 houses the integration with the etcd/raft library (raft.RawNode). This may
 generate Raft messages to be sent to other Stores; these are handed to
-Replica.sendRaftMessages which ultimately hands them to the Store's
+Replica.sendRaftMessagesRaftMuLocked which ultimately hands them to the Store's
 RaftTransport.SendAsync method. Raft uses message passing (not
 request-response), and outgoing messages will use a gRPC stream that differs
 from that used for incoming messages (which makes asymmetric partitions more
diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go
index 89a70537990a..e7cc24dba5f1 100644
--- a/pkg/kv/kvserver/store_raft.go
+++ b/pkg/kv/kvserver/store_raft.go
@@ -213,7 +213,7 @@ func (s *Store) withReplicaForRequest(
 		return roachpb.NewError(err)
 	}
 	defer r.raftMu.Unlock()
-	r.setLastReplicaDescriptors(req)
+	r.setLastReplicaDescriptorsRaftMuLocked(req)
 	return f(ctx, r)
 }
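
The recurring pattern in this diff — fields guarded by Replica.raftMu rather than Replica.mu, and methods whose RaftMuLocked suffix documents a lock the caller must already hold — can be illustrated in isolation. The sketch below is not CockroachDB code: replica, replicaDescriptor, raftMessageRequest, and assertMutex are simplified stand-ins (CockroachDB's syncutil.Mutex supplies AssertHeld; the stand-in emulates it with an atomic flag).

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// assertMutex is a simplified stand-in for syncutil.Mutex: it tracks whether
// the lock is held so that AssertHeld can panic on misuse in this sketch.
// (The flag is approximate — it cannot tell *which* goroutine holds the lock —
// which is fine for illustration.)
type assertMutex struct {
	mu   sync.Mutex
	held int32
}

func (m *assertMutex) Lock()   { m.mu.Lock(); atomic.StoreInt32(&m.held, 1) }
func (m *assertMutex) Unlock() { atomic.StoreInt32(&m.held, 0); m.mu.Unlock() }
func (m *assertMutex) AssertHeld() {
	if atomic.LoadInt32(&m.held) == 0 {
		panic("mutex not held")
	}
}

type replicaDescriptor struct{ NodeID, StoreID, ReplicaID int }

type raftMessageRequest struct {
	FromReplica, ToReplica replicaDescriptor
}

type replica struct {
	// raftMu serializes Raft processing and is ordered before mu.
	raftMu struct {
		assertMutex
		// Guarded by raftMu, not mu: both the receive path
		// (Store.withReplicaForRequest) and the send path
		// (handleRaftReadyRaftMuLocked) already hold raftMu, so keeping
		// the fields here avoids taking mu just to touch them.
		lastToReplica, lastFromReplica replicaDescriptor
	}
	mu struct {
		sync.RWMutex
		// ...the rest of the replica's state would normally live here...
	}
}

// setLastReplicaDescriptorsRaftMuLocked mirrors the renamed method in the
// diff: the suffix documents the precondition, and AssertHeld turns a
// violation into a loud failure instead of a silent data race.
func (r *replica) setLastReplicaDescriptorsRaftMuLocked(req *raftMessageRequest) {
	r.raftMu.AssertHeld()
	r.raftMu.lastFromReplica = req.FromReplica
	r.raftMu.lastToReplica = req.ToReplica
}

func main() {
	r := &replica{}
	req := &raftMessageRequest{
		FromReplica: replicaDescriptor{NodeID: 1, StoreID: 1, ReplicaID: 1},
		ToReplica:   replicaDescriptor{NodeID: 2, StoreID: 2, ReplicaID: 2},
	}

	// As in Store.withReplicaForRequest: hold raftMu across the whole
	// receive path, then call the RaftMuLocked method.
	r.raftMu.Lock()
	r.setLastReplicaDescriptorsRaftMuLocked(req)
	r.raftMu.Unlock()

	fmt.Printf("lastFrom=%+v lastTo=%+v\n", r.raftMu.lastFromReplica, r.raftMu.lastToReplica)
}

Under this convention the locking obligation is visible at every call site. Because the paths that read and write lastToReplica/lastFromReplica already hold raftMu, moving the fields out from under mu also removes the per-message r.mu.Lock/Unlock round trip that the old setLastReplicaDescriptors performed.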