Skip to content

Commit

Permalink
rangefeed: Ensure Close is safe even if Start failed
Browse files Browse the repository at this point in the history
Rangefeed Start may fail if the attempt to start async
task (the rangefeed) fails due to server shutdown.
If that happens, Close call would block indefinitely,
waiting for the rangefeed tasks that was never started,
to terminate.

Fixes #110350

Release note: None
  • Loading branch information
Yevgeniy Miretskiy committed Sep 20, 2023
1 parent d414347 commit ae8106e
Showing 1 changed file with 7 additions and 11 deletions.
18 changes: 7 additions & 11 deletions pkg/kv/kvclient/rangefeed/rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,6 @@ func (f *Factory) New(
initialTimestamp: initialTimestamp,
name: name,
onValue: onValue,

stopped: make(chan struct{}),
}
initConfig(&r.config, options)
return &r
Expand All @@ -181,10 +179,8 @@ type RangeFeed struct {

onValue OnValue

closeOnce sync.Once
cancel context.CancelFunc
stopped chan struct{}

cancel context.CancelFunc
running sync.WaitGroup
started int32 // accessed atomically
}

Expand Down Expand Up @@ -242,8 +238,10 @@ func (f *RangeFeed) Start(ctx context.Context, spans []roachpb.Span) error {

ctx = logtags.AddTag(ctx, "rangefeed", f.name)
ctx, f.cancel = f.stopper.WithCancelOnQuiesce(ctx)
f.running.Add(1)
if err := f.stopper.RunAsyncTask(ctx, "rangefeed", runWithFrontier); err != nil {
f.cancel()
f.running.Done()
return err
}
return nil
Expand All @@ -253,10 +251,8 @@ func (f *RangeFeed) Start(ctx context.Context, spans []roachpb.Span) error {
// idempotently. It waits for the currently running handler, if any, to complete
// and guarantees that no future handlers will be invoked after this point.
func (f *RangeFeed) Close() {
f.closeOnce.Do(func() {
f.cancel()
<-f.stopped
})
f.cancel()
f.running.Wait()
}

// Run the rangefeed in a loop in the case of failure, likely due to node
Expand All @@ -272,7 +268,7 @@ var useMuxRangeFeed = util.ConstantWithMetamorphicTestBool("use-mux-rangefeed",
// run will run the RangeFeed until the context is canceled or if the client
// indicates that an initial scan error is non-recoverable.
func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) {
defer close(f.stopped)
defer f.running.Done()
r := retry.StartWithCtx(ctx, f.retryOptions)
restartLogEvery := log.Every(10 * time.Second)

Expand Down

0 comments on commit ae8106e

Please sign in to comment.