diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index ef0673da2981..a96758a9e66c 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -911,12 +911,12 @@ type Store struct { m map[roachpb.RangeID]struct{} } - // replicaQueues is a map of per-Replica incoming request queues. These + // raftRecvQueues is a map of per-Replica incoming request queues. These // queues might more naturally belong in Replica, but are kept separate to // avoid reworking the locking in getOrCreateReplica which requires // Replica.raftMu to be held while a replica is being inserted into // Store.mu.replicas. - replicaQueues syncutil.IntMap // map[roachpb.RangeID]*raftRequestQueue + raftRecvQueues syncutil.IntMap // map[roachpb.RangeID]*raftReceiveQueue scheduler *raftScheduler diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 3c930cddefc7..311c00cc4fbc 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -33,12 +33,12 @@ type raftRequestInfo struct { respStream RaftMessageResponseStream } -type raftRequestQueue struct { +type raftReceiveQueue struct { syncutil.Mutex infos []raftRequestInfo } -func (q *raftRequestQueue) drain() ([]raftRequestInfo, bool) { +func (q *raftReceiveQueue) drain() ([]raftRequestInfo, bool) { q.Lock() defer q.Unlock() if len(q.infos) == 0 { @@ -49,7 +49,7 @@ func (q *raftRequestQueue) drain() ([]raftRequestInfo, bool) { return infos, true } -func (q *raftRequestQueue) recycle(processed []raftRequestInfo) { +func (q *raftReceiveQueue) recycle(processed []raftRequestInfo) { if cap(processed) > 4 { return // cap recycled slice lengths } @@ -166,11 +166,11 @@ func (s *Store) HandleRaftUncoalescedRequest( // count them. s.metrics.RaftRcvdMessages[req.Message.Type].Inc(1) - value, ok := s.replicaQueues.Load(int64(req.RangeID)) + value, ok := s.raftRecvQueues.Load(int64(req.RangeID)) if !ok { - value, _ = s.replicaQueues.LoadOrStore(int64(req.RangeID), unsafe.Pointer(&raftRequestQueue{})) + value, _ = s.raftRecvQueues.LoadOrStore(int64(req.RangeID), unsafe.Pointer(&raftReceiveQueue{})) } - q := (*raftRequestQueue)(value) + q := (*raftReceiveQueue)(value) q.Lock() defer q.Unlock() if len(q.infos) >= replicaRequestQueueSize { @@ -440,11 +440,11 @@ func (s *Store) enqueueRaftUpdateCheck(rangeID roachpb.RangeID) { } func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID) bool { - value, ok := s.replicaQueues.Load(int64(rangeID)) + value, ok := s.raftRecvQueues.Load(int64(rangeID)) if !ok { return false } - q := (*raftRequestQueue)(value) + q := (*raftReceiveQueue)(value) infos, ok := q.drain() if !ok { return false @@ -482,7 +482,7 @@ func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID if _, exists := s.mu.replicasByRangeID.Load(rangeID); !exists { q.Lock() if len(q.infos) == 0 { - s.replicaQueues.Delete(int64(rangeID)) + s.raftRecvQueues.Delete(int64(rangeID)) } q.Unlock() } diff --git a/pkg/kv/kvserver/store_remove_replica.go b/pkg/kv/kvserver/store_remove_replica.go index 4520c0a567fb..eb8dd4ef7de8 100644 --- a/pkg/kv/kvserver/store_remove_replica.go +++ b/pkg/kv/kvserver/store_remove_replica.go @@ -299,7 +299,7 @@ func (s *Store) unlinkReplicaByRangeIDLocked(ctx context.Context, rangeID roachp delete(s.unquiescedReplicas.m, rangeID) s.unquiescedReplicas.Unlock() delete(s.mu.uninitReplicas, rangeID) - s.replicaQueues.Delete(int64(rangeID)) + s.raftRecvQueues.Delete(int64(rangeID)) s.mu.replicasByRangeID.Delete(rangeID) s.unregisterLeaseholderByID(ctx, rangeID) }