Merge #80619
80619: kvserver: raft{Request->Receive}Queue r=erikgrinaker a=tbg

This clarifies that this queue holds received raft messages (not
messages waiting to be sent).

These changes were mechanical, via GoLand.

Release note: None


Co-authored-by: Tobias Grieger <[email protected]>
craig[bot] and tbg committed Apr 27, 2022
2 parents ce57830 + f3837c2 commit 70d1c10
Showing 3 changed files with 12 additions and 12 deletions.
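
To make the intent of the rename concrete, here is a toy Go sketch (not CockroachDB code; every name in it is a hypothetical stand-in) of what a "receive queue" means here: messages that have already arrived from the transport are buffered per range until the raft scheduler drains them, as opposed to a queue of outgoing messages waiting to be sent.

```go
// Toy illustration only: rangeID, inboundMsg, and receiveQueue are
// hypothetical stand-ins, not CockroachDB types.
package main

import (
	"fmt"
	"sync"
)

type rangeID int64

// inboundMsg stands in for a raft message already received from the transport.
type inboundMsg struct {
	from, to uint64
}

// receiveQueue buffers received messages for one range until the raft
// scheduler gets around to processing that range.
type receiveQueue struct {
	mu    sync.Mutex
	infos []inboundMsg
}

// enqueue is called on the transport side when a message arrives.
func (q *receiveQueue) enqueue(m inboundMsg) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.infos = append(q.infos, m)
}

// drain is called by the per-range processing loop.
func (q *receiveQueue) drain() []inboundMsg {
	q.mu.Lock()
	defer q.mu.Unlock()
	msgs := q.infos
	q.infos = nil
	return msgs
}

func main() {
	queues := map[rangeID]*receiveQueue{7: {}}
	queues[7].enqueue(inboundMsg{from: 2, to: 1})
	fmt.Println(len(queues[7].drain()), "received message(s) for r7") // 1
}
```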
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/store.go
@@ -911,12 +911,12 @@ type Store struct {
 		m map[roachpb.RangeID]struct{}
 	}
 
-	// replicaQueues is a map of per-Replica incoming request queues. These
+	// raftRecvQueues is a map of per-Replica incoming request queues. These
 	// queues might more naturally belong in Replica, but are kept separate to
 	// avoid reworking the locking in getOrCreateReplica which requires
 	// Replica.raftMu to be held while a replica is being inserted into
 	// Store.mu.replicas.
-	replicaQueues syncutil.IntMap // map[roachpb.RangeID]*raftRequestQueue
+	raftRecvQueues syncutil.IntMap // map[roachpb.RangeID]*raftReceiveQueue
 
 	scheduler *raftScheduler
 
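
As the comment above notes, the queues live in a Store-level map rather than on Replica, so a queue can be found or created for a range without the getOrCreateReplica locking. A minimal sketch of that lookup-or-create pattern, using the standard library's sync.Map as a stand-in for syncutil.IntMap (the real field stores unsafe.Pointer values, as the next file shows):

```go
// Minimal sketch, assuming sync.Map as a stand-in for syncutil.IntMap; the
// real field is keyed by roachpb.RangeID and holds unsafe.Pointer values.
package main

import (
	"fmt"
	"sync"
)

type raftReceiveQueue struct {
	sync.Mutex
	infos []int // stand-in for []raftRequestInfo
}

// raftRecvQueues plays the role of the Store field: per-range receive
// queues kept outside Replica.
var raftRecvQueues sync.Map // map[int64]*raftReceiveQueue

// getOrCreate mirrors the Load-then-LoadOrStore dance in
// HandleRaftUncoalescedRequest: the queue for a range is created lazily on
// first use, and concurrent callers converge on the same instance.
func getOrCreate(rangeID int64) *raftReceiveQueue {
	if v, ok := raftRecvQueues.Load(rangeID); ok {
		return v.(*raftReceiveQueue)
	}
	v, _ := raftRecvQueues.LoadOrStore(rangeID, &raftReceiveQueue{})
	return v.(*raftReceiveQueue)
}

func main() {
	q1, q2 := getOrCreate(42), getOrCreate(42)
	fmt.Println(q1 == q2) // true
}
```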
18 changes: 9 additions & 9 deletions pkg/kv/kvserver/store_raft.go
@@ -33,12 +33,12 @@ type raftRequestInfo struct {
 	respStream RaftMessageResponseStream
 }
 
-type raftRequestQueue struct {
+type raftReceiveQueue struct {
 	syncutil.Mutex
 	infos []raftRequestInfo
 }
 
-func (q *raftRequestQueue) drain() ([]raftRequestInfo, bool) {
+func (q *raftReceiveQueue) drain() ([]raftRequestInfo, bool) {
 	q.Lock()
 	defer q.Unlock()
 	if len(q.infos) == 0 {
@@ -49,7 +49,7 @@ func (q *raftRequestQueue) drain() ([]raftRequestInfo, bool) {
 	return infos, true
 }
 
-func (q *raftRequestQueue) recycle(processed []raftRequestInfo) {
+func (q *raftReceiveQueue) recycle(processed []raftRequestInfo) {
 	if cap(processed) > 4 {
 		return // cap recycled slice lengths
 	}
@@ -166,11 +166,11 @@ func (s *Store) HandleRaftUncoalescedRequest(
 	// count them.
 	s.metrics.RaftRcvdMessages[req.Message.Type].Inc(1)
 
-	value, ok := s.replicaQueues.Load(int64(req.RangeID))
+	value, ok := s.raftRecvQueues.Load(int64(req.RangeID))
 	if !ok {
-		value, _ = s.replicaQueues.LoadOrStore(int64(req.RangeID), unsafe.Pointer(&raftRequestQueue{}))
+		value, _ = s.raftRecvQueues.LoadOrStore(int64(req.RangeID), unsafe.Pointer(&raftReceiveQueue{}))
 	}
-	q := (*raftRequestQueue)(value)
+	q := (*raftReceiveQueue)(value)
 	q.Lock()
 	defer q.Unlock()
 	if len(q.infos) >= replicaRequestQueueSize {
@@ -440,11 +440,11 @@ func (s *Store) enqueueRaftUpdateCheck(rangeID roachpb.RangeID) {
 }
 
 func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID) bool {
-	value, ok := s.replicaQueues.Load(int64(rangeID))
+	value, ok := s.raftRecvQueues.Load(int64(rangeID))
 	if !ok {
 		return false
 	}
-	q := (*raftRequestQueue)(value)
+	q := (*raftReceiveQueue)(value)
 	infos, ok := q.drain()
 	if !ok {
 		return false
@@ -482,7 +482,7 @@ func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID
 	if _, exists := s.mu.replicasByRangeID.Load(rangeID); !exists {
 		q.Lock()
 		if len(q.infos) == 0 {
-			s.replicaQueues.Delete(int64(rangeID))
+			s.raftRecvQueues.Delete(int64(rangeID))
 		}
 		q.Unlock()
 	}
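
The drain/recycle pair on the renamed type follows a common slice-reuse pattern: drain hands the entire backing slice to the processor and leaves the queue empty, while recycle returns the slice for reuse only if its capacity is still small, so one large burst does not pin a large allocation. A hedged sketch of that pattern; the part of recycle hidden behind the fold above is an assumption, simplified rather than copied:

```go
// Sketch only: recvQueue and requestInfo are simplified stand-ins for
// raftReceiveQueue and raftRequestInfo.
package main

import (
	"fmt"
	"sync"
)

type requestInfo struct{ seq int }

// recvQueue mirrors the shape of raftReceiveQueue: a mutex plus a slice of
// pending, already-received requests.
type recvQueue struct {
	sync.Mutex
	infos []requestInfo
}

// drain hands the whole backing slice to the caller and empties the queue.
func (q *recvQueue) drain() ([]requestInfo, bool) {
	q.Lock()
	defer q.Unlock()
	if len(q.infos) == 0 {
		return nil, false
	}
	infos := q.infos
	q.infos = nil
	return infos, true
}

// recycle offers the processed slice back for reuse, but only if it is small;
// the body below the early return is an assumption, not copied from the diff.
func (q *recvQueue) recycle(processed []requestInfo) {
	if cap(processed) > 4 {
		return // cap recycled slice lengths
	}
	q.Lock()
	defer q.Unlock()
	if q.infos == nil {
		q.infos = processed[:0] // keep the allocation for future appends
	}
}

func main() {
	q := &recvQueue{}
	q.Lock()
	q.infos = append(q.infos, requestInfo{seq: 1}, requestInfo{seq: 2})
	q.Unlock()
	infos, ok := q.drain()
	fmt.Println(len(infos), ok) // 2 true
	q.recycle(infos)
}
```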
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_remove_replica.go
@@ -299,7 +299,7 @@ func (s *Store) unlinkReplicaByRangeIDLocked(ctx context.Context, rangeID roachp
 	delete(s.unquiescedReplicas.m, rangeID)
 	s.unquiescedReplicas.Unlock()
 	delete(s.mu.uninitReplicas, rangeID)
-	s.replicaQueues.Delete(int64(rangeID))
+	s.raftRecvQueues.Delete(int64(rangeID))
 	s.mu.replicasByRangeID.Delete(rangeID)
 	s.unregisterLeaseholderByID(ctx, rangeID)
 }
