From da43e5bb147c93eeca8e6837ea56e6a05d672cf0 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Sun, 13 Feb 2022 13:01:14 -0800 Subject: [PATCH] kvstreamer: remove a memory leak At the moment, we have a memory leak of `Streamer` objects (although nil-ed out) because of `SetOnChange` handler of the streamer concurrency limit cluster setting and passing in a closure into `Stopper.AddCloser`. This was copied over from the `DistSender` code, but a crucial difference wasn't appreciated - we have a single global `DistSender` that lives throughout the uptime of the server whereas each `Streamer` object lives only during the query execution. We don't need to dynamically react to changes in the streamer concurrency limits, so this commit removes the handler. The closure has been refactored too. Release note: None --- pkg/kv/kvclient/kvstreamer/streamer.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go index 0cdca45eca07..822a90574c0d 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer.go +++ b/pkg/kv/kvclient/kvstreamer/streamer.go @@ -335,10 +335,6 @@ func NewStreamer( "single Streamer async concurrency", uint64(streamerConcurrencyLimit.Get(&st.SV)), ) - streamerConcurrencyLimit.SetOnChange(&st.SV, func(ctx context.Context) { - s.coordinator.asyncSem.UpdateCapacity(uint64(streamerConcurrencyLimit.Get(&st.SV))) - }) - stopper.AddCloser(s.coordinator.asyncSem.Closer("stopper")) return s } @@ -398,7 +394,7 @@ func (s *Streamer) Enqueue( ) (retErr error) { if !s.coordinatorStarted { var coordinatorCtx context.Context - coordinatorCtx, s.coordinatorCtxCancel = context.WithCancel(ctx) + coordinatorCtx, s.coordinatorCtxCancel = s.stopper.WithCancelOnQuiesce(ctx) s.waitGroup.Add(1) if err := s.stopper.RunAsyncTaskEx( coordinatorCtx,