Skip to content

Commit

Permalink
kvflowdispatch: move down dispatch mutex to reduce contention
Browse files Browse the repository at this point in the history
In `kv0/enc=false/nodes=3/cpu=96`, we noticed mutex contention around
the `outbox` map. This patch tries to alleviate that by moving the mutex
down into each individual dispatch map (sharding by NodeID).

Informs: cockroachdb#104154.

Release note: None
  • Loading branch information
aadityasondhi committed Oct 2, 2023
1 parent d51a306 commit 0999d98
Showing 1 changed file with 44 additions and 24 deletions.
68 changes: 44 additions & 24 deletions pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package kvflowdispatch

import (
"context"
"sync"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
Expand All @@ -32,13 +33,9 @@ const AdmittedRaftLogEntriesBytes = 50
// entries to specific nodes, and (ii) to read pending dispatches.
type Dispatch struct {
mu struct {
// TODO(irfansharif,aaditya): On kv0/enc=false/nodes=3/cpu=96 this mutex
// is responsible for ~3.7% of the mutex contention. Look to address it
// as part of #104154. Perhaps shard this mutex by node ID? Or use a
// sync.Map instead?
syncutil.Mutex
// outbox maintains pending dispatches on a per-node basis.
outbox map[roachpb.NodeID]dispatches
outbox sync.Map // roachpb.NodeID -> *dispatches
}
metrics *metrics
// handles is used to dispatch tokens locally. Remote token dispatches are
Expand All @@ -57,7 +54,12 @@ type dispatchKey struct {
admissionpb.WorkPriority
}

type dispatches map[dispatchKey]kvflowcontrolpb.RaftLogPosition
type dispatches struct {
mu struct {
syncutil.Mutex
items map[dispatchKey]kvflowcontrolpb.RaftLogPosition
}
}

var _ kvflowcontrol.Dispatch = &Dispatch{}

Expand All @@ -69,7 +71,7 @@ func New(
handles: handles,
nodeID: nodeID,
}
d.mu.outbox = make(map[roachpb.NodeID]dispatches)
d.mu.outbox = sync.Map{}
d.metrics = newMetrics()
registry.AddMetricStruct(d.metrics)
return d
Expand Down Expand Up @@ -103,11 +105,12 @@ func (d *Dispatch) Dispatch(
}
d.metrics.RemoteDispatch[wc].Inc(1)

d.mu.Lock()
defer d.mu.Unlock()
dispatchMap := d.getDispatchMap(nodeID)

if _, ok := d.mu.outbox[nodeID]; !ok {
d.mu.outbox[nodeID] = dispatches{}
dispatchMap.mu.Lock()
defer dispatchMap.mu.Unlock()

if len(dispatchMap.mu.items) == 0 {
d.metrics.PendingNodes.Inc(1)
}

Expand All @@ -117,9 +120,9 @@ func (d *Dispatch) Dispatch(
pri,
}

existing, found := d.mu.outbox[nodeID][dk]
existing, found := dispatchMap.mu.items[dk]
if !found || existing.Less(entries.UpToRaftLogPosition) {
d.mu.outbox[nodeID][dk] = entries.UpToRaftLogPosition
dispatchMap.mu.items[dk] = entries.UpToRaftLogPosition

if !found {
d.metrics.PendingDispatches[wc].Inc(1)
Expand All @@ -141,27 +144,37 @@ func (d *Dispatch) PendingDispatch() []roachpb.NodeID {
d.mu.Lock()
defer d.mu.Unlock()

nodes := make([]roachpb.NodeID, 0, len(d.mu.outbox))
for node := range d.mu.outbox {
nodes = append(nodes, node)
}
var nodes []roachpb.NodeID
d.mu.outbox.Range(func(key, value any) bool {
dispatchMap := value.(*dispatches)
node := key.(roachpb.NodeID)
dispatchMap.mu.Lock()
defer dispatchMap.mu.Unlock()
if len(dispatchMap.mu.items) > 0 {
nodes = append(nodes, node)
}
return true
})

return nodes
}

// PendingDispatchFor is part of the kvflowcontrol.Dispatch interface.
func (d *Dispatch) PendingDispatchFor(
nodeID roachpb.NodeID, maxBytes int64,
) ([]kvflowcontrolpb.AdmittedRaftLogEntries, int) {
d.mu.Lock()
defer d.mu.Unlock()
dispatchMap := d.getDispatchMap(nodeID)

if _, ok := d.mu.outbox[nodeID]; !ok {
dispatchMap.mu.Lock()
defer dispatchMap.mu.Unlock()

if len(dispatchMap.mu.items) == 0 {
return nil, 0
}

var entries []kvflowcontrolpb.AdmittedRaftLogEntries
maxEntries := maxBytes / AdmittedRaftLogEntriesBytes
for key, dispatch := range d.mu.outbox[nodeID] {
for key, dispatch := range dispatchMap.mu.items {
if maxEntries == 0 {
break
}
Expand All @@ -178,18 +191,25 @@ func (d *Dispatch) PendingDispatchFor(
wc := admissionpb.WorkClassFromPri(key.WorkPriority)
d.metrics.PendingDispatches[wc].Dec(1)
maxEntries -= 1
delete(d.mu.outbox[nodeID], key)
delete(dispatchMap.mu.items, key)
}

remainingDispatches := len(d.mu.outbox[nodeID])
remainingDispatches := len(dispatchMap.mu.items)
if remainingDispatches == 0 {
delete(d.mu.outbox, nodeID)
d.metrics.PendingNodes.Dec(1)
}

return entries, remainingDispatches
}

func (d *Dispatch) getDispatchMap(nodeID roachpb.NodeID) *dispatches {
dispatchMap, loaded := d.mu.outbox.LoadOrStore(nodeID, &dispatches{})
if !loaded {
dispatchMap.(*dispatches).mu.items = make(map[dispatchKey]kvflowcontrolpb.RaftLogPosition)
}
return dispatchMap.(*dispatches)
}

// testingMetrics returns the underlying metrics struct for testing purposes.
func (d *Dispatch) testingMetrics() *metrics {
return d.metrics
Expand Down

0 comments on commit 0999d98

Please sign in to comment.