diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 3dce83b117af..7bca93c7569e 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -258,47 +258,52 @@ type Replica struct { stateMachine replicaStateMachine // decoder is used to decode committed raft entries. decoder replicaDecoder + } - // The last seen replica descriptors from incoming Raft messages. These are - // stored so that the replica still knows the replica descriptors for itself - // and for its message recipients in the circumstances when its RangeDescriptor - // is out of date. - // - // Normally, a replica knows about the other replica descriptors for a - // range via the RangeDescriptor stored in Replica.mu.state.Desc. But that - // descriptor is only updated during a Split or ChangeReplicas operation. - // There are periods during a Replica's lifetime when that information is - // out of date: - // - // 1. When a replica is being newly created as the result of an incoming - // Raft message for it. This is the common case for ChangeReplicas and an - // uncommon case for Splits. The leader will be sending the replica - // messages and the replica needs to be able to respond before it can - // receive an updated range descriptor (via a snapshot, - // changeReplicasTrigger, or splitTrigger). - // - // 2. If the node containing a replica is partitioned or down while the - // replicas for the range are updated. When the node comes back up, other - // replicas may begin communicating with it and it needs to be able to - // respond. Unlike 1 where there is no range descriptor, in this situation - // the replica has a range descriptor but it is out of date. Note that a - // replica being removed from a node and then quickly re-added before the - // replica has been GC'd will also use the last seen descriptors. In - // effect, this is another path for which the replica's local range - // descriptor is out of date. 
- // - // The last seen replica descriptors are updated on receipt of every raft - // message via Replica.setLastReplicaDescriptors (see - // Store.HandleRaftRequest). These last seen descriptors are used when - // the replica's RangeDescriptor contains missing or out of date descriptors - // for a replica (see Replica.sendRaftMessageRaftMuLocked). - // - // Removing a replica from Store.mu.replicas is not a problem because - // when a replica is completely removed, it won't be recreated until - // there is another event that will repopulate the replicas map in the - // range descriptor. When it is temporarily dropped and recreated, the - // newly recreated replica will have a complete range descriptor. - lastToReplica, lastFromReplica roachpb.ReplicaDescriptor + // The last seen replica descriptors from incoming Raft messages. These are + // stored so that the replica still knows the replica descriptors for itself + // and for its message recipients in the circumstances when its RangeDescriptor + // is out of date. + // + // Normally, a replica knows about the other replica descriptors for a + // range via the RangeDescriptor stored in Replica.mu.state.Desc. But that + // descriptor is only updated during a Split or ChangeReplicas operation. + // There are periods during a Replica's lifetime when that information is + // out of date: + // + // 1. When a replica is being newly created as the result of an incoming + // Raft message for it. This is the common case for ChangeReplicas and an + // uncommon case for Splits. The leader will be sending the replica + // messages and the replica needs to be able to respond before it can + // receive an updated range descriptor (via a snapshot, + // changeReplicasTrigger, or splitTrigger). + // + // 2. If the node containing a replica is partitioned or down while the + // replicas for the range are updated. 
When the node comes back up, other + // replicas may begin communicating with it and it needs to be able to + // respond. Unlike 1 where there is no range descriptor, in this situation + // the replica has a range descriptor but it is out of date. Note that a + // replica being removed from a node and then quickly re-added before the + // replica has been GC'd will also use the last seen descriptors. In + // effect, this is another path for which the replica's local range + // descriptor is out of date. + // + // The last seen replica descriptors are updated on receipt of every raft + // message via Replica.setLastReplicaDescriptors (see + // Store.HandleRaftRequest). These last seen descriptors are used when + // the replica's RangeDescriptor contains missing or out of date descriptors + // for a replica (see Replica.sendRaftMessageRaftMuLocked). + // + // Removing a replica from Store.mu.replicas is not a problem because + // when a replica is completely removed, it won't be recreated until + // there is another event that will repopulate the replicas map in the + // range descriptor. When it is temporarily dropped and recreated, the + // newly recreated replica will have a complete range descriptor. + // + // Locking notes: Replica.raftMu < Replica.mu < Replica.lastSeenReplicas + lastSeenReplicas struct { + syncutil.Mutex + to, from roachpb.ReplicaDescriptor } // Contains the lease history when enabled. @@ -654,11 +659,8 @@ type Replica struct { // loadBasedSplitter keeps information about load-based splitting. loadBasedSplitter split.Decider - // TODO(tbg): this is effectively unused, we only use it to call ReportUnreachable - // when a heartbeat gets dropped but it's unclear whether a) that ever fires in - // practice b) if it provides any benefit. - // - // See: https://github.com/cockroachdb/cockroach/issues/84246 + // unreachablesMu contains a set of remote ReplicaIDs that are to be reported + // as unreachable on the next raft tick. 
unreachablesMu struct { syncutil.Mutex remotes map[roachpb.ReplicaID]struct{} @@ -1087,12 +1089,24 @@ func (r *Replica) mergeInProgressRLocked() bool { return r.mu.mergeComplete != nil } -// setLastReplicaDescriptors sets the most recently seen replica -// descriptors to those contained in the *RaftMessageRequest. -func (r *Replica) setLastReplicaDescriptorsRaftMuLocked(req *kvserverpb.RaftMessageRequest) { - r.raftMu.AssertHeld() - r.raftMu.lastFromReplica = req.FromReplica - r.raftMu.lastToReplica = req.ToReplica +// setLastReplicaDescriptors records req.ToReplica and req.FromReplica as the +// most recently seen replica descriptors, holding the lastSeenReplicas mutex. +// See the comment on Replica.lastSeenReplicas. +func (r *Replica) setLastReplicaDescriptors(req *kvserverpb.RaftMessageRequest) { + lsr := &r.lastSeenReplicas + lsr.Lock() + defer lsr.Unlock() + lsr.to = req.ToReplica + lsr.from = req.FromReplica +} + +// getLastReplicaDescriptors returns the most recently seen (to, from) replica +// descriptors under the lastSeenReplicas mutex; see Replica.lastSeenReplicas. +func (r *Replica) getLastReplicaDescriptors() (to, from roachpb.ReplicaDescriptor) { + lsr := &r.lastSeenReplicas + lsr.Lock() + defer lsr.Unlock() + return lsr.to, lsr.from } // GetMVCCStats returns a copy of the MVCC stats object for this range. diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index ea9ed9da0700..287425136e61 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -928,7 +928,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( msgApps, otherMsgs := splitMsgApps(rd.Messages) r.traceMessageSends(msgApps, "sending msgApp") - r.sendRaftMessagesRaftMuLocked(ctx, msgApps, pausedFollowers) + r.sendRaftMessages(ctx, msgApps, pausedFollowers) // TODO(pavelkalinnikov): find a way to move it to storeEntries. 
if !raft.IsEmptyHardState(rd.HardState) { @@ -977,7 +977,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( r.store.replicateQueue.MaybeAddAsync(ctx, r, r.store.Clock().NowAsClockTimestamp()) } - r.sendRaftMessagesRaftMuLocked(ctx, otherMsgs, nil /* blocked */) + r.sendRaftMessages(ctx, otherMsgs, nil /* blocked */) r.traceEntries(rd.CommittedEntries, "committed, before applying any entries") stats.tApplicationBegin = timeutil.Now() @@ -1414,7 +1414,7 @@ func (r *Replica) maybeCoalesceHeartbeat( return true } -func (r *Replica) sendRaftMessagesRaftMuLocked( +func (r *Replica) sendRaftMessages( ctx context.Context, messages []raftpb.Message, blocked map[roachpb.ReplicaID]struct{}, ) { var lastAppResp raftpb.Message @@ -1487,19 +1487,24 @@ func (r *Replica) sendRaftMessagesRaftMuLocked( } if !drop { - r.sendRaftMessageRaftMuLocked(ctx, message) + r.sendRaftMessage(ctx, message) } } if lastAppResp.Index > 0 { - r.sendRaftMessageRaftMuLocked(ctx, lastAppResp) + r.sendRaftMessage(ctx, lastAppResp) } } -// sendRaftMessageRaftMuLocked sends a Raft message. -func (r *Replica) sendRaftMessageRaftMuLocked(ctx context.Context, msg raftpb.Message) { +// sendRaftMessage sends a Raft message. +// +// When calling this method, the raftMu may be held, but it does not need to be. +// The Replica mu must not be held. 
+func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) { + lastToReplica, lastFromReplica := r.getLastReplicaDescriptors() + r.mu.RLock() - fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.From), r.raftMu.lastToReplica) - toReplica, toErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.To), r.raftMu.lastFromReplica) + fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.From), lastToReplica) + toReplica, toErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.To), lastFromReplica) var startKey roachpb.RKey if msg.Type == raftpb.MsgApp && r.mu.internalRaftGroup != nil { // When the follower is potentially an uninitialized replica waiting for @@ -1550,13 +1555,10 @@ func (r *Replica) sendRaftMessageRaftMuLocked(ctx context.Context, msg raftpb.Me RangeStartKey: startKey, // usually nil } if !r.sendRaftMessageRequest(ctx, req) { - if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) { - r.mu.droppedMessages++ - raftGroup.ReportUnreachable(msg.To) - return true, nil - }); err != nil && !errors.Is(err, errRemoved) { - log.Fatalf(ctx, "%v", err) - } + r.mu.Lock() + r.mu.droppedMessages++ + r.mu.Unlock() + r.addUnreachableRemoteReplica(toReplica.ReplicaID) } } diff --git a/pkg/kv/kvserver/replica_raft_quiesce.go b/pkg/kv/kvserver/replica_raft_quiesce.go index 1fb80c595813..e2d2adccac56 100644 --- a/pkg/kv/kvserver/replica_raft_quiesce.go +++ b/pkg/kv/kvserver/replica_raft_quiesce.go @@ -421,7 +421,8 @@ func shouldReplicaQuiesce( func (r *Replica) quiesceAndNotifyRaftMuLockedReplicaMuLocked( ctx context.Context, status *raftSparseStatus, lagging laggingReplicaSet, ) bool { - fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(r.replicaID, r.raftMu.lastToReplica) + lastToReplica, lastFromReplica := r.getLastReplicaDescriptors() + fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(r.replicaID, lastToReplica) if fromErr != nil { if log.V(4) { 
log.Infof(ctx, "not quiescing: cannot find from replica (%d)", r.replicaID) @@ -436,7 +437,7 @@ func (r *Replica) quiesceAndNotifyRaftMuLockedReplicaMuLocked( continue } toReplica, toErr := r.getReplicaDescriptorByIDRLocked( - roachpb.ReplicaID(id), r.raftMu.lastFromReplica) + roachpb.ReplicaID(id), lastFromReplica) if toErr != nil { if log.V(4) { log.Infof(ctx, "failed to quiesce: cannot find to replica (%d)", id) diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 0958e9adb6f3..af0b719b0141 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -538,7 +538,7 @@ Store.HandleRaftRequest (which is part of the RaftMessageHandler interface), ultimately resulting in a call to Replica.handleRaftReadyRaftMuLocked, which houses the integration with the etcd/raft library (raft.RawNode). This may generate Raft messages to be sent to other Stores; these are handed to -Replica.sendRaftMessagesRaftMuLocked which ultimately hands them to the Store's +Replica.sendRaftMessages which ultimately hands them to the Store's RaftTransport.SendAsync method. Raft uses message passing (not request-response), and outgoing messages will use a gRPC stream that differs from that used for incoming messages (which makes asymmetric partitions more diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 1615a2136ab5..60d543d71685 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -337,7 +337,7 @@ func (s *Store) withReplicaForRequest( return roachpb.NewError(err) } defer r.raftMu.Unlock() - r.setLastReplicaDescriptorsRaftMuLocked(req) + r.setLastReplicaDescriptors(req) return f(ctx, r) }