Skip to content

Commit

Permalink
kv: remove lastToReplica and lastFromReplica from raftMu
Browse files Browse the repository at this point in the history
In 410ef29, we moved these fields from under the Replica.mu to under the
Replica.raftMu. This was done to avoid lock contention.

In this commit, we move these fields under their own mutex so that they
can be accessed without holding the raftMu. This allows us to send Raft
messages from other goroutines.

The commit also switches from calling RawNode.ReportUnreachable directly
in sendRaftMessage to using the more flexible unreachablesMu set, which
defers the call to ReportUnreachable until the next Raft tick. As a result,
the commit closes #84246.

Release note: None
Epic: None
  • Loading branch information
nvanbenschoten committed Jan 26, 2023
1 parent 6819d87 commit 1fff781
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 71 deletions.
116 changes: 65 additions & 51 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,47 +258,52 @@ type Replica struct {
stateMachine replicaStateMachine
// decoder is used to decode committed raft entries.
decoder replicaDecoder
}

// The last seen replica descriptors from incoming Raft messages. These are
// stored so that the replica still knows the replica descriptors for itself
// and for its message recipients in the circumstances when its RangeDescriptor
// is out of date.
//
// Normally, a replica knows about the other replica descriptors for a
// range via the RangeDescriptor stored in Replica.mu.state.Desc. But that
// descriptor is only updated during a Split or ChangeReplicas operation.
// There are periods during a Replica's lifetime when that information is
// out of date:
//
// 1. When a replica is being newly created as the result of an incoming
// Raft message for it. This is the common case for ChangeReplicas and an
// uncommon case for Splits. The leader will be sending the replica
// messages and the replica needs to be able to respond before it can
// receive an updated range descriptor (via a snapshot,
// changeReplicasTrigger, or splitTrigger).
//
// 2. If the node containing a replica is partitioned or down while the
// replicas for the range are updated. When the node comes back up, other
// replicas may begin communicating with it and it needs to be able to
// respond. Unlike 1 where there is no range descriptor, in this situation
// the replica has a range descriptor but it is out of date. Note that a
// replica being removed from a node and then quickly re-added before the
// replica has been GC'd will also use the last seen descriptors. In
// effect, this is another path for which the replica's local range
// descriptor is out of date.
//
// The last seen replica descriptors are updated on receipt of every raft
// message via Replica.setLastReplicaDescriptors (see
// Store.HandleRaftRequest). These last seen descriptors are used when
// the replica's RangeDescriptor contains missing or out of date descriptors
// for a replica (see Replica.sendRaftMessageRaftMuLocked).
//
// Removing a replica from Store.mu.replicas is not a problem because
// when a replica is completely removed, it won't be recreated until
// there is another event that will repopulate the replicas map in the
// range descriptor. When it is temporarily dropped and recreated, the
// newly recreated replica will have a complete range descriptor.
lastToReplica, lastFromReplica roachpb.ReplicaDescriptor
// The last seen replica descriptors from incoming Raft messages. These are
// stored so that the replica still knows the replica descriptors for itself
// and for its message recipients in the circumstances when its RangeDescriptor
// is out of date.
//
// Normally, a replica knows about the other replica descriptors for a
// range via the RangeDescriptor stored in Replica.mu.state.Desc. But that
// descriptor is only updated during a Split or ChangeReplicas operation.
// There are periods during a Replica's lifetime when that information is
// out of date:
//
// 1. When a replica is being newly created as the result of an incoming
// Raft message for it. This is the common case for ChangeReplicas and an
// uncommon case for Splits. The leader will be sending the replica
// messages and the replica needs to be able to respond before it can
// receive an updated range descriptor (via a snapshot,
// changeReplicasTrigger, or splitTrigger).
//
// 2. If the node containing a replica is partitioned or down while the
// replicas for the range are updated. When the node comes back up, other
// replicas may begin communicating with it and it needs to be able to
// respond. Unlike 1 where there is no range descriptor, in this situation
// the replica has a range descriptor but it is out of date. Note that a
// replica being removed from a node and then quickly re-added before the
// replica has been GC'd will also use the last seen descriptors. In
// effect, this is another path for which the replica's local range
// descriptor is out of date.
//
// The last seen replica descriptors are updated on receipt of every raft
// message via Replica.setLastReplicaDescriptors (see
// Store.HandleRaftRequest). These last seen descriptors are used when
// the replica's RangeDescriptor contains missing or out of date descriptors
// for a replica (see Replica.sendRaftMessageRaftMuLocked).
//
// Removing a replica from Store.mu.replicas is not a problem because
// when a replica is completely removed, it won't be recreated until
// there is another event that will repopulate the replicas map in the
// range descriptor. When it is temporarily dropped and recreated, the
// newly recreated replica will have a complete range descriptor.
//
// Locking notes: Replica.raftMu < Replica.mu < Replica.lastSeenReplicas
lastSeenReplicas struct {
syncutil.Mutex
to, from roachpb.ReplicaDescriptor
}

// Contains the lease history when enabled.
Expand Down Expand Up @@ -654,11 +659,8 @@ type Replica struct {
// loadBasedSplitter keeps information about load-based splitting.
loadBasedSplitter split.Decider

// TODO(tbg): this is effectively unused, we only use it to call ReportUnreachable
// when a heartbeat gets dropped but it's unclear whether a) that ever fires in
// practice b) if it provides any benefit.
//
// See: https://github.com/cockroachdb/cockroach/issues/84246
// unreachablesMu contains a set of remote ReplicaIDs that are to be reported
// as unreachable on the next raft tick.
unreachablesMu struct {
syncutil.Mutex
remotes map[roachpb.ReplicaID]struct{}
Expand Down Expand Up @@ -1087,12 +1089,24 @@ func (r *Replica) mergeInProgressRLocked() bool {
return r.mu.mergeComplete != nil
}

// setLastReplicaDescriptors sets the most recently seen replica
// descriptors to those contained in the *RaftMessageRequest.
func (r *Replica) setLastReplicaDescriptorsRaftMuLocked(req *kvserverpb.RaftMessageRequest) {
r.raftMu.AssertHeld()
r.raftMu.lastFromReplica = req.FromReplica
r.raftMu.lastToReplica = req.ToReplica
// setLastReplicaDescriptors sets the most recently seen replica descriptors to
// those contained in the *RaftMessageRequest.
// See the comment on Replica.lastSeenReplicas.
func (r *Replica) setLastReplicaDescriptors(req *kvserverpb.RaftMessageRequest) {
lsr := &r.lastSeenReplicas
lsr.Lock()
defer lsr.Unlock()
lsr.to = req.ToReplica
lsr.from = req.FromReplica
}

// getLastReplicaDescriptors gets the most recently seen replica descriptors.
// See the comment on Replica.lastSeenReplicas.
func (r *Replica) getLastReplicaDescriptors() (to, from roachpb.ReplicaDescriptor) {
lsr := &r.lastSeenReplicas
lsr.Lock()
defer lsr.Unlock()
return lsr.to, lsr.from
}

// GetMVCCStats returns a copy of the MVCC stats object for this range.
Expand Down
34 changes: 18 additions & 16 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(

msgApps, otherMsgs := splitMsgApps(rd.Messages)
r.traceMessageSends(msgApps, "sending msgApp")
r.sendRaftMessagesRaftMuLocked(ctx, msgApps, pausedFollowers)
r.sendRaftMessages(ctx, msgApps, pausedFollowers)

// TODO(pavelkalinnikov): find a way to move it to storeEntries.
if !raft.IsEmptyHardState(rd.HardState) {
Expand Down Expand Up @@ -977,7 +977,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
r.store.replicateQueue.MaybeAddAsync(ctx, r, r.store.Clock().NowAsClockTimestamp())
}

r.sendRaftMessagesRaftMuLocked(ctx, otherMsgs, nil /* blocked */)
r.sendRaftMessages(ctx, otherMsgs, nil /* blocked */)
r.traceEntries(rd.CommittedEntries, "committed, before applying any entries")

stats.tApplicationBegin = timeutil.Now()
Expand Down Expand Up @@ -1414,7 +1414,7 @@ func (r *Replica) maybeCoalesceHeartbeat(
return true
}

func (r *Replica) sendRaftMessagesRaftMuLocked(
func (r *Replica) sendRaftMessages(
ctx context.Context, messages []raftpb.Message, blocked map[roachpb.ReplicaID]struct{},
) {
var lastAppResp raftpb.Message
Expand Down Expand Up @@ -1487,19 +1487,24 @@ func (r *Replica) sendRaftMessagesRaftMuLocked(
}

if !drop {
r.sendRaftMessageRaftMuLocked(ctx, message)
r.sendRaftMessage(ctx, message)
}
}
if lastAppResp.Index > 0 {
r.sendRaftMessageRaftMuLocked(ctx, lastAppResp)
r.sendRaftMessage(ctx, lastAppResp)
}
}

// sendRaftMessageRaftMuLocked sends a Raft message.
func (r *Replica) sendRaftMessageRaftMuLocked(ctx context.Context, msg raftpb.Message) {
// sendRaftMessage sends a Raft message.
//
// When calling this method, the raftMu may be held, but it does not need to be.
// The Replica mu must not be held.
func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) {
lastToReplica, lastFromReplica := r.getLastReplicaDescriptors()

r.mu.RLock()
fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.From), r.raftMu.lastToReplica)
toReplica, toErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.To), r.raftMu.lastFromReplica)
fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.From), lastToReplica)
toReplica, toErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.To), lastFromReplica)
var startKey roachpb.RKey
if msg.Type == raftpb.MsgApp && r.mu.internalRaftGroup != nil {
// When the follower is potentially an uninitialized replica waiting for
Expand Down Expand Up @@ -1550,13 +1555,10 @@ func (r *Replica) sendRaftMessageRaftMuLocked(ctx context.Context, msg raftpb.Me
RangeStartKey: startKey, // usually nil
}
if !r.sendRaftMessageRequest(ctx, req) {
if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) {
r.mu.droppedMessages++
raftGroup.ReportUnreachable(msg.To)
return true, nil
}); err != nil && !errors.Is(err, errRemoved) {
log.Fatalf(ctx, "%v", err)
}
r.mu.Lock()
r.mu.droppedMessages++
r.mu.Unlock()
r.addUnreachableRemoteReplica(toReplica.ReplicaID)
}
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/replica_raft_quiesce.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,8 @@ func shouldReplicaQuiesce(
func (r *Replica) quiesceAndNotifyRaftMuLockedReplicaMuLocked(
ctx context.Context, status *raftSparseStatus, lagging laggingReplicaSet,
) bool {
fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(r.replicaID, r.raftMu.lastToReplica)
lastToReplica, lastFromReplica := r.getLastReplicaDescriptors()
fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(r.replicaID, lastToReplica)
if fromErr != nil {
if log.V(4) {
log.Infof(ctx, "not quiescing: cannot find from replica (%d)", r.replicaID)
Expand All @@ -436,7 +437,7 @@ func (r *Replica) quiesceAndNotifyRaftMuLockedReplicaMuLocked(
continue
}
toReplica, toErr := r.getReplicaDescriptorByIDRLocked(
roachpb.ReplicaID(id), r.raftMu.lastFromReplica)
roachpb.ReplicaID(id), lastFromReplica)
if toErr != nil {
if log.V(4) {
log.Infof(ctx, "failed to quiesce: cannot find to replica (%d)", id)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ Store.HandleRaftRequest (which is part of the RaftMessageHandler interface),
ultimately resulting in a call to Replica.handleRaftReadyRaftMuLocked, which
houses the integration with the etcd/raft library (raft.RawNode). This may
generate Raft messages to be sent to other Stores; these are handed to
Replica.sendRaftMessagesRaftMuLocked which ultimately hands them to the Store's
Replica.sendRaftMessages which ultimately hands them to the Store's
RaftTransport.SendAsync method. Raft uses message passing (not
request-response), and outgoing messages will use a gRPC stream that differs
from that used for incoming messages (which makes asymmetric partitions more
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func (s *Store) withReplicaForRequest(
return roachpb.NewError(err)
}
defer r.raftMu.Unlock()
r.setLastReplicaDescriptorsRaftMuLocked(req)
r.setLastReplicaDescriptors(req)
return f(ctx, r)
}

Expand Down

0 comments on commit 1fff781

Please sign in to comment.