diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 1d85069a62b9..3c6ccfc84485 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -13,6 +13,7 @@ package kvserver import ( "context" "fmt" + "sync" "time" "github.com/cockroachdb/cockroach/pkg/base" @@ -170,14 +171,19 @@ func (r *Replica) RangeFeed( if err := lim.Begin(ctx); err != nil { return roachpb.NewError(err) } - // Finish the iterator limit, but only if we exit before - // creating the iterator itself. - iterSemRelease = lim.Finish - defer func() { - if iterSemRelease != nil { - iterSemRelease() - } - }() + // Finish the iterator limit if we exit before the iterator finishes. + // The release function will be hooked into the Close method on the + // iterator below. The sync.Once prevents any races between exiting early + // from this call and finishing the catchup scan underneath the + // rangefeed.Processor. We need to release here in case we fail to + // register the processor, or, more perniciously, in the case where the + // processor gets registered by shut down before starting the catchup + // scan. + var iterSemReleaseOnce sync.Once + iterSemRelease = func() { + iterSemReleaseOnce.Do(lim.Finish) + } + defer iterSemRelease() } // Lock the raftMu, then register the stream as a new rangefeed registration. @@ -210,8 +216,6 @@ func (r *Replica) RangeFeed( SimpleIterator: innerIter, close: iterSemRelease, } - // Responsibility for releasing the semaphore now passes to the iterator. - iterSemRelease = nil return catchUpIter } }