Skip to content

Commit

Permalink
Merge #104855
Browse files Browse the repository at this point in the history
104855: kvflowcontrol: squash data race r=irfansharif a=irfansharif

Fixes #104837.
Fixes #105762.

Release note: None

Co-authored-by: irfan sharif <[email protected]>
  • Loading branch information
craig[bot] and irfansharif committed Jul 4, 2023
2 parents ee1d5cc + edd1e1b commit c77d127
Showing 1 changed file with 24 additions and 9 deletions.
33 changes: 24 additions & 9 deletions pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct tim
log.Errorf(ctx, "operating on a closed handle")
return nil
}

// NB: We're using a copy-on-write scheme elsewhere to maintain this slice
// of sorted connections. Here (the performance critical read path) we
// simply grab a reference to the connections slice under the mutex before
// iterating through them further below and invoking the blocking
// kvflowcontrol.Controller.Admit() for each one, something we want to do
// without holding the mutex.
connections := h.mu.connections
h.mu.Unlock()

Expand Down Expand Up @@ -234,16 +241,23 @@ func (h *Handle) connectStreamLocked(
if _, ok := h.mu.perStreamTokenTracker[stream]; ok {
log.Fatalf(ctx, "reconnecting already connected stream: %s", stream)
}
h.mu.connections = append(h.mu.connections, newConnectedStream(stream))
sort.Slice(h.mu.connections, func(i, j int) bool {

connections := make([]*connectedStream, len(h.mu.connections)+1)
copy(connections, h.mu.connections)
connections[len(connections)-1] = newConnectedStream(stream)
sort.Slice(connections, func(i, j int) bool {
// Sort connections based on store IDs (this is the order in which we
// invoke Controller.Admit) for predictability. If in the future we use
// flow tokens for raft log catchup (see I11 and [^9] in
// kvflowcontrol/doc.go), we may want to introduce an Admit-variant that
// both blocks and deducts tokens before sending catchup MsgApps. In
// that case, this sorting will help avoid deadlocks.
return h.mu.connections[i].Stream().StoreID < h.mu.connections[j].Stream().StoreID
return connections[i].Stream().StoreID < connections[j].Stream().StoreID
})
// NB: We use a copy-on-write scheme when appending to the connections slice
// -- the read path is what's performance critical.
h.mu.connections = connections

h.mu.perStreamTokenTracker[stream] = kvflowtokentracker.New(pos, stream, h.knobs)
h.metrics.StreamsConnected.Inc(1)
log.VInfof(ctx, 1, "connected to stream: %s", stream)
Expand Down Expand Up @@ -319,16 +333,17 @@ func (h *Handle) disconnectStreamLocked(ctx context.Context, stream kvflowcontro
)
delete(h.mu.perStreamTokenTracker, stream)

streamIdx := -1
connections := make([]*connectedStream, 0, len(h.mu.connections)-1)
for i := range h.mu.connections {
if h.mu.connections[i].Stream() == stream {
streamIdx = i
break
h.mu.connections[i].Disconnect()
} else {
connections = append(connections, h.mu.connections[i])
}
}
connection := h.mu.connections[streamIdx]
connection.Disconnect()
h.mu.connections = append(h.mu.connections[:streamIdx], h.mu.connections[streamIdx+1:]...)
// NB: We use a copy-on-write scheme when splicing the connections slice --
// the read path is what's performance critical.
h.mu.connections = connections

log.VInfof(ctx, 1, "disconnected stream: %s", stream)
h.metrics.StreamsDisconnected.Inc(1)
Expand Down

0 comments on commit c77d127

Please sign in to comment.