Skip to content

Commit

Permalink
Merge #34066
Browse files Browse the repository at this point in the history
34066: rangefeed: Fix iterator leak when stopping cluster r=danhhz a=mrtracy

When converting rangefeeds to use a per-registration output loop in
issue #33557, I accidentally removed two places where the passed
"catchupIter" for a registration was being closed if the processor is
terminated before the registration is actually completed. This happened
due to some work-in-progress confusion where the catchupIter was
removed, but eventually restored. This was caught promptly by our
regular stress tests.

Fixes #34051

Release note: None

Co-authored-by: Matt Tracy <[email protected]>
  • Loading branch information
craig[bot] and Matt Tracy committed Jan 17, 2019
2 parents 41186e8 + 90993c6 commit 6885730
Showing 1 changed file with 6 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pkg/storage/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ func (p *Processor) Start(stopper *stop.Stopper, rtsIter engine.SimpleIterator)
}
}
if err := stopper.RunAsyncTask(ctx, "rangefeed: output loop", runOutputLoop); err != nil {
if r.catchupIter != nil {
r.catchupIter.Close() // clean up
}
r.disconnect(roachpb.NewError(err))
p.reg.Unregister(&r)
}
Expand Down Expand Up @@ -354,6 +357,9 @@ func (p *Processor) Register(
select {
case p.regC <- r:
case <-p.stoppedC:
if catchupIter != nil {
catchupIter.Close() // clean up
}
// errC has a capacity of 1. If it is already full, we don't need to send
// another error.
select {
Expand Down

0 comments on commit 6885730

Please sign in to comment.