diff --git a/pkg/kv/kvclient/rangefeed/rangefeed.go b/pkg/kv/kvclient/rangefeed/rangefeed.go index d05588e46a0a..811bd4848016 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed.go @@ -157,8 +157,6 @@ func (f *Factory) New( initialTimestamp: initialTimestamp, name: name, onValue: onValue, - - stopped: make(chan struct{}), } initConfig(&r.config, options) return &r @@ -181,10 +179,8 @@ type RangeFeed struct { onValue OnValue - closeOnce sync.Once - cancel context.CancelFunc - stopped chan struct{} - + cancel context.CancelFunc + running sync.WaitGroup started int32 // accessed atomically } @@ -242,8 +238,10 @@ func (f *RangeFeed) Start(ctx context.Context, spans []roachpb.Span) error { ctx = logtags.AddTag(ctx, "rangefeed", f.name) ctx, f.cancel = f.stopper.WithCancelOnQuiesce(ctx) + f.running.Add(1) if err := f.stopper.RunAsyncTask(ctx, "rangefeed", runWithFrontier); err != nil { f.cancel() + f.running.Done() return err } return nil @@ -253,10 +251,8 @@ func (f *RangeFeed) Start(ctx context.Context, spans []roachpb.Span) error { // idempotently. It waits for the currently running handler, if any, to complete // and guarantees that no future handlers will be invoked after this point. func (f *RangeFeed) Close() { - f.closeOnce.Do(func() { - f.cancel() - <-f.stopped - }) + f.cancel() + f.running.Wait() } // Run the rangefeed in a loop in the case of failure, likely due to node @@ -272,7 +268,7 @@ var useMuxRangeFeed = util.ConstantWithMetamorphicTestBool("use-mux-rangefeed", // run will run the RangeFeed until the context is canceled or if the client // indicates that an initial scan error is non-recoverable. func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) { - defer close(f.stopped) + defer f.running.Done() r := retry.StartWithCtx(ctx, f.retryOptions) restartLogEvery := log.Every(10 * time.Second)