Skip to content

Commit

Permalink
kvstreamer: remove a memory leak
Browse files Browse the repository at this point in the history
At the moment, we have a memory leak of `Streamer` objects (although
nil-ed out) because of `SetOnChange` handler of the streamer concurrency
limit cluster setting and passing in a closure into `Stopper.AddCloser`.
This was copied over from the `DistSender` code, but a crucial difference
wasn't appreciated - we have a single global `DistSender` that lives
throughout the uptime of the server whereas each `Streamer` object lives
only during the query execution.

We don't need to dynamically react to changes in the streamer concurrency
limits, so this commit removes the handler. The closure has been
refactored too.

Release note: None
  • Loading branch information
yuzefovich committed Feb 14, 2022
1 parent de283bf commit da43e5b
Showing 1 changed file with 1 addition and 5 deletions.
6 changes: 1 addition & 5 deletions pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,10 +335,6 @@ func NewStreamer(
"single Streamer async concurrency",
uint64(streamerConcurrencyLimit.Get(&st.SV)),
)
streamerConcurrencyLimit.SetOnChange(&st.SV, func(ctx context.Context) {
s.coordinator.asyncSem.UpdateCapacity(uint64(streamerConcurrencyLimit.Get(&st.SV)))
})
stopper.AddCloser(s.coordinator.asyncSem.Closer("stopper"))
return s
}

Expand Down Expand Up @@ -398,7 +394,7 @@ func (s *Streamer) Enqueue(
) (retErr error) {
if !s.coordinatorStarted {
var coordinatorCtx context.Context
coordinatorCtx, s.coordinatorCtxCancel = context.WithCancel(ctx)
coordinatorCtx, s.coordinatorCtxCancel = s.stopper.WithCancelOnQuiesce(ctx)
s.waitGroup.Add(1)
if err := s.stopper.RunAsyncTaskEx(
coordinatorCtx,
Expand Down

0 comments on commit da43e5b

Please sign in to comment.