From ae8106efc4b1563a5df421de92fc8bdd1a1e6861 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Tue, 19 Sep 2023 19:21:30 -0400 Subject: [PATCH] rangefeed: Ensure Close is safe even if Start failed Rangefeed Start may fail if the attempt to start the async task (the rangefeed) fails due to server shutdown. If that happens, a Close call would block indefinitely, waiting for the rangefeed task, which was never started, to terminate. Fixes #110350 Release note: None --- pkg/kv/kvclient/rangefeed/rangefeed.go | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/pkg/kv/kvclient/rangefeed/rangefeed.go b/pkg/kv/kvclient/rangefeed/rangefeed.go index d05588e46a0a..811bd4848016 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed.go @@ -157,8 +157,6 @@ func (f *Factory) New( initialTimestamp: initialTimestamp, name: name, onValue: onValue, - - stopped: make(chan struct{}), } initConfig(&r.config, options) return &r @@ -181,10 +179,8 @@ type RangeFeed struct { onValue OnValue - closeOnce sync.Once - cancel context.CancelFunc - stopped chan struct{} - + cancel context.CancelFunc + running sync.WaitGroup started int32 // accessed atomically } @@ -242,8 +238,10 @@ func (f *RangeFeed) Start(ctx context.Context, spans []roachpb.Span) error { ctx = logtags.AddTag(ctx, "rangefeed", f.name) ctx, f.cancel = f.stopper.WithCancelOnQuiesce(ctx) + f.running.Add(1) if err := f.stopper.RunAsyncTask(ctx, "rangefeed", runWithFrontier); err != nil { f.cancel() + f.running.Done() return err } return nil @@ -253,10 +251,8 @@ func (f *RangeFeed) Start(ctx context.Context, spans []roachpb.Span) error { // idempotently. It waits for the currently running handler, if any, to complete // and guarantees that no future handlers will be invoked after this point. 
func (f *RangeFeed) Close() { - f.closeOnce.Do(func() { - f.cancel() - <-f.stopped - }) + f.cancel() + f.running.Wait() } // Run the rangefeed in a loop in the case of failure, likely due to node @@ -272,7 +268,7 @@ var useMuxRangeFeed = util.ConstantWithMetamorphicTestBool("use-mux-rangefeed", // run will run the RangeFeed until the context is canceled or if the client // indicates that an initial scan error is non-recoverable. func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) { - defer close(f.stopped) + defer f.running.Done() r := retry.StartWithCtx(ctx, f.retryOptions) restartLogEvery := log.Every(10 * time.Second)