Skip to content

Commit

Permalink
Merge #110942
Browse files Browse the repository at this point in the history
110942: rangefeed: Ensure Close is safe even if Start failed r=miretskiy a=miretskiy

Rangefeed Start may fail if the attempt to start async task (the rangefeed) fails due to server shutdown. If that happens, Close call would block indefinitely, waiting for the rangefeed tasks that was never started, to terminate.

Fixes #110350

Release note: None

Co-authored-by: Yevgeniy Miretskiy <[email protected]>
  • Loading branch information
craig[bot] and Yevgeniy Miretskiy committed Sep 21, 2023
2 parents a3dccc6 + ae8106e commit f3497de
Showing 1 changed file with 7 additions and 11 deletions.
18 changes: 7 additions & 11 deletions pkg/kv/kvclient/rangefeed/rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,6 @@ func (f *Factory) New(
initialTimestamp: initialTimestamp,
name: name,
onValue: onValue,

stopped: make(chan struct{}),
}
initConfig(&r.config, options)
return &r
Expand All @@ -181,10 +179,8 @@ type RangeFeed struct {

onValue OnValue

closeOnce sync.Once
cancel context.CancelFunc
stopped chan struct{}

cancel context.CancelFunc
running sync.WaitGroup
started int32 // accessed atomically
}

Expand Down Expand Up @@ -242,8 +238,10 @@ func (f *RangeFeed) Start(ctx context.Context, spans []roachpb.Span) error {

ctx = logtags.AddTag(ctx, "rangefeed", f.name)
ctx, f.cancel = f.stopper.WithCancelOnQuiesce(ctx)
f.running.Add(1)
if err := f.stopper.RunAsyncTask(ctx, "rangefeed", runWithFrontier); err != nil {
f.cancel()
f.running.Done()
return err
}
return nil
Expand All @@ -253,10 +251,8 @@ func (f *RangeFeed) Start(ctx context.Context, spans []roachpb.Span) error {
// idempotently. It waits for the currently running handler, if any, to complete
// and guarantees that no future handlers will be invoked after this point.
func (f *RangeFeed) Close() {
f.closeOnce.Do(func() {
f.cancel()
<-f.stopped
})
f.cancel()
f.running.Wait()
}

// Run the rangefeed in a loop in the case of failure, likely due to node
Expand All @@ -272,7 +268,7 @@ var useMuxRangeFeed = util.ConstantWithMetamorphicTestBool("use-mux-rangefeed",
// run will run the RangeFeed until the context is canceled or if the client
// indicates that an initial scan error is non-recoverable.
func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) {
defer close(f.stopped)
defer f.running.Done()
r := retry.StartWithCtx(ctx, f.retryOptions)
restartLogEvery := log.Every(10 * time.Second)

Expand Down

0 comments on commit f3497de

Please sign in to comment.