diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go index e14b3e9b343f..45726924519f 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go @@ -100,6 +100,13 @@ func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct tim log.Errorf(ctx, "operating on a closed handle") return nil } + + // NB: We're using a copy-on-write scheme elsewhere to maintain this slice + // of sorted connections. Here (the performance critical read path) we + // simply grab a reference to the connections slice under the mutex before + // iterating through them further below and invoking the blocking + // kvflowcontrol.Controller.Admit() for each one, something we want to do + // without holding the mutex. connections := h.mu.connections h.mu.Unlock() @@ -234,16 +241,23 @@ func (h *Handle) connectStreamLocked( if _, ok := h.mu.perStreamTokenTracker[stream]; ok { log.Fatalf(ctx, "reconnecting already connected stream: %s", stream) } - h.mu.connections = append(h.mu.connections, newConnectedStream(stream)) - sort.Slice(h.mu.connections, func(i, j int) bool { + + connections := make([]*connectedStream, len(h.mu.connections)+1) + copy(connections, h.mu.connections) + connections[len(connections)-1] = newConnectedStream(stream) + sort.Slice(connections, func(i, j int) bool { // Sort connections based on store IDs (this is the order in which we // invoke Controller.Admit) for predictability. If in the future we use // flow tokens for raft log catchup (see I11 and [^9] in // kvflowcontrol/doc.go), we may want to introduce an Admit-variant that // both blocks and deducts tokens before sending catchup MsgApps. In // that case, this sorting will help avoid deadlocks. - return h.mu.connections[i].Stream().StoreID < h.mu.connections[j].Stream().StoreID + return connections[i].Stream().StoreID < connections[j].Stream().StoreID }) + // NB: We use a copy-on-write scheme when appending to the connections slice + // -- the read path is what's performance critical. + h.mu.connections = connections + h.mu.perStreamTokenTracker[stream] = kvflowtokentracker.New(pos, stream, h.knobs) h.metrics.StreamsConnected.Inc(1) log.VInfof(ctx, 1, "connected to stream: %s", stream) @@ -319,16 +333,17 @@ func (h *Handle) disconnectStreamLocked(ctx context.Context, stream kvflowcontro ) delete(h.mu.perStreamTokenTracker, stream) - streamIdx := -1 + connections := make([]*connectedStream, 0, len(h.mu.connections)-1) for i := range h.mu.connections { if h.mu.connections[i].Stream() == stream { - streamIdx = i - break + h.mu.connections[i].Disconnect() + } else { + connections = append(connections, h.mu.connections[i]) } } - connection := h.mu.connections[streamIdx] - connection.Disconnect() - h.mu.connections = append(h.mu.connections[:streamIdx], h.mu.connections[streamIdx+1:]...) + // NB: We use a copy-on-write scheme when splicing the connections slice -- + // the read path is what's performance critical. + h.mu.connections = connections log.VInfof(ctx, 1, "disconnected stream: %s", stream) h.metrics.StreamsDisconnected.Inc(1)