diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 02cbc9cae902..3079743a5267 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -158,6 +158,10 @@ func NewProcessor(cfg Config) *Processor { } } +// IteratorConstructor is used to construct an iterator. It should be called +// from underneath a stopper task to ensure that the engine has not been closed. +type IteratorConstructor func() storage.SimpleIterator + // Start launches a goroutine to process rangefeed events and send them to // registrations. // @@ -167,10 +171,10 @@ func NewProcessor(cfg Config) *Processor { // calling its Close method when it is finished. If the iterator is nil then // no initialization scan will be performed and the resolved timestamp will // immediately be considered initialized. -func (p *Processor) Start(stopper *stop.Stopper, rtsIter storage.SimpleIterator) { +func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IteratorConstructor) { ctx := p.AnnotateCtx(context.Background()) if err := stopper.RunAsyncTask(ctx, "rangefeed.Processor", func(ctx context.Context) { - p.run(ctx, rtsIter, stopper) + p.run(ctx, rtsIterFunc, stopper) }); err != nil { pErr := roachpb.NewError(err) p.reg.DisconnectWithErr(all, pErr) @@ -180,7 +184,7 @@ func (p *Processor) Start(stopper *stop.Stopper, rtsIter storage.SimpleIterator) // run is called from Start and runs the rangefeed. func (p *Processor) run( - ctx context.Context, rtsIter storage.SimpleIterator, stopper *stop.Stopper, + ctx context.Context, rtsIterFunc IteratorConstructor, stopper *stop.Stopper, ) { defer close(p.stoppedC) ctx, cancelOutputLoops := context.WithCancel(ctx) @@ -188,7 +192,8 @@ func (p *Processor) run( // Launch an async task to scan over the resolved timestamp iterator and // initialize the unresolvedIntentQueue. Ignore error if quiescing. - if rtsIter != nil { + if rtsIterFunc != nil { + rtsIter := rtsIterFunc() initScan := newInitResolvedTSScan(p, rtsIter) err := stopper.RunAsyncTask(ctx, "rangefeed: init resolved ts", initScan.Run) if err != nil { @@ -239,9 +244,6 @@ func (p *Processor) run( } } if err := stopper.RunAsyncTask(ctx, "rangefeed: output loop", runOutputLoop); err != nil { - if r.catchupIter != nil { - r.catchupIter.Close() // clean up - } r.disconnect(roachpb.NewError(err)) p.reg.Unregister(&r) } @@ -368,7 +370,7 @@ func (p *Processor) sendStop(pErr *roachpb.Error) { func (p *Processor) Register( span roachpb.RSpan, startTS hlc.Timestamp, - catchupIter storage.SimpleIterator, + catchupIterConstructor IteratorConstructor, withDiff bool, stream Stream, errC chan<- *roachpb.Error, @@ -379,7 +381,7 @@ func (p *Processor) Register( p.syncEventC() r := newRegistration( - span.AsRawSpanWithNoLocals(), startTS, catchupIter, withDiff, + span.AsRawSpanWithNoLocals(), startTS, catchupIterConstructor, withDiff, p.Config.EventChanCap, p.Metrics, stream, errC, ) select { diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index 7fbc63531354..e4bf0062547b 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -152,10 +152,17 @@ func newTestProcessorWithTxnPusher( EventChanCap: testProcessorEventCCap, CheckStreamsInterval: 10 * time.Millisecond, }) - p.Start(stopper, rtsIter) + p.Start(stopper, makeIteratorConstructor(rtsIter)) return p, stopper } +func makeIteratorConstructor(rtsIter storage.SimpleIterator) IteratorConstructor { + if rtsIter == nil { + return nil + } + return func() storage.SimpleIterator { return rtsIter } +} + func newTestProcessor(rtsIter storage.SimpleIterator) (*Processor, *stop.Stopper) { return newTestProcessorWithTxnPusher(rtsIter, nil /* pusher */) } diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go index 3774860830bd..dbc66daf7fd9 100644 --- a/pkg/kv/kvserver/rangefeed/registry.go +++ b/pkg/kv/kvserver/rangefeed/registry.go @@ -53,11 +53,11 @@ type Stream interface { // has finished. type registration struct { // Input. - span roachpb.Span - catchupTimestamp hlc.Timestamp - catchupIter storage.SimpleIterator - withDiff bool - metrics *Metrics + span roachpb.Span + catchupTimestamp hlc.Timestamp + catchupIterConstructor func() storage.SimpleIterator + withDiff bool + metrics *Metrics // Output. stream Stream @@ -86,7 +86,7 @@ type registration struct { func newRegistration( span roachpb.Span, startTS hlc.Timestamp, - catchupIter storage.SimpleIterator, + catchupIterConstructor func() storage.SimpleIterator, withDiff bool, bufferSz int, metrics *Metrics, @@ -94,14 +94,14 @@ func newRegistration( errC chan<- *roachpb.Error, ) registration { r := registration{ - span: span, - catchupTimestamp: startTS, - catchupIter: catchupIter, - withDiff: withDiff, - metrics: metrics, - stream: stream, - errC: errC, - buf: make(chan *roachpb.RangeFeedEvent, bufferSz), + span: span, + catchupTimestamp: startTS, + catchupIterConstructor: catchupIterConstructor, + withDiff: withDiff, + metrics: metrics, + stream: stream, + errC: errC, + buf: make(chan *roachpb.RangeFeedEvent, bufferSz), } r.mu.Locker = &syncutil.Mutex{} r.mu.caughtUp = true @@ -231,13 +231,11 @@ func (r *registration) disconnect(pErr *roachpb.Error) { // canceled, or when the buffer has overflowed and all pre-overflow entries // have been emitted. func (r *registration) outputLoop(ctx context.Context) error { - // If the registration has a catch-up scan, - if r.catchupIter != nil { - if err := r.runCatchupScan(); err != nil { - err = errors.Wrap(err, "catch-up scan failed") - log.Errorf(ctx, "%v", err) - return err - } + // If the registration has a catch-up scan, run it. + if err := r.maybeRunCatchupScan(); err != nil { + err = errors.Wrap(err, "catch-up scan failed") + log.Errorf(ctx, "%v", err) + return err } // Normal buffered output loop. @@ -274,18 +272,22 @@ func (r *registration) runOutputLoop(ctx context.Context) { r.disconnect(roachpb.NewError(err)) } -// runCatchupScan starts a catchup scan which will output entries for all +// maybeRunCatchupScan starts a catchup scan which will output entries for all // recorded changes in the replica that are newer than the catchupTimestamp. // This uses the iterator provided when the registration was originally created; // after the scan completes, the iterator will be closed. -func (r *registration) runCatchupScan() error { - if r.catchupIter == nil { +// +// If the registration does not have a catchUpIteratorConstructor, this method +// is a no-op. +func (r *registration) maybeRunCatchupScan() error { + if r.catchupIterConstructor == nil { return nil } + catchupIter := r.catchupIterConstructor() + r.catchupIterConstructor = nil start := timeutil.Now() defer func() { - r.catchupIter.Close() - r.catchupIter = nil + catchupIter.Close() r.metrics.RangeFeedCatchupScanNanos.Inc(timeutil.Since(start).Nanoseconds()) }() @@ -323,16 +325,16 @@ func (r *registration) runCatchupScan() error { // versions of each key that are after the registration's startTS, so we // can't use NextKey. var meta enginepb.MVCCMetadata - r.catchupIter.SeekGE(startKey) + catchupIter.SeekGE(startKey) for { - if ok, err := r.catchupIter.Valid(); err != nil { + if ok, err := catchupIter.Valid(); err != nil { return err - } else if !ok || !r.catchupIter.UnsafeKey().Less(endKey) { + } else if !ok || !catchupIter.UnsafeKey().Less(endKey) { break } - unsafeKey := r.catchupIter.UnsafeKey() - unsafeVal := r.catchupIter.UnsafeValue() + unsafeKey := catchupIter.UnsafeKey() + unsafeVal := catchupIter.UnsafeValue() if !unsafeKey.IsValue() { // Found a metadata key. if err := protoutil.Unmarshal(unsafeVal, &meta); err != nil { @@ -344,7 +346,7 @@ func (r *registration) runCatchupScan() error { // past the corresponding provisional key-value. To do this, // scan to the timestamp immediately before (i.e. the key // immediately after) the provisional key. - r.catchupIter.SeekGE(storage.MVCCKey{ + catchupIter.SeekGE(storage.MVCCKey{ Key: unsafeKey.Key, Timestamp: hlc.Timestamp(meta.Timestamp).Prev(), }) @@ -375,7 +377,7 @@ func (r *registration) runCatchupScan() error { if ignore && !r.withDiff { // Skip all the way to the next key. // NB: fast-path to avoid value copy when !r.withDiff. - r.catchupIter.NextKey() + catchupIter.NextKey() continue } @@ -388,10 +390,10 @@ func (r *registration) runCatchupScan() error { if ignore { // Skip all the way to the next key. - r.catchupIter.NextKey() + catchupIter.NextKey() } else { // Move to the next version of this key. - r.catchupIter.Next() + catchupIter.Next() var event roachpb.RangeFeedEvent event.MustSetValue(&roachpb.RangeFeedValue{ diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go index d003b488030b..54bfc0c8d769 100644 --- a/pkg/kv/kvserver/rangefeed/registry_test.go +++ b/pkg/kv/kvserver/rangefeed/registry_test.go @@ -104,7 +104,7 @@ func newTestRegistration( registration: newRegistration( span, ts, - catchup, + makeIteratorConstructor(catchup), withDiff, 5, NewMetrics(), @@ -253,7 +253,7 @@ func TestRegistrationCatchUpScan(t *testing.T) { }, hlc.Timestamp{WallTime: 4}, iter, true /* withDiff */) require.Zero(t, r.metrics.RangeFeedCatchupScanNanos.Count()) - require.NoError(t, r.runCatchupScan()) + require.NoError(t, r.maybeRunCatchupScan()) require.True(t, iter.closed) require.NotZero(t, r.metrics.RangeFeedCatchupScanNanos.Count()) diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 2a43ed9fc21d..e6298f70720a 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -194,28 +194,32 @@ func (r *Replica) RangeFeed( } // Register the stream with a catch-up iterator. - var catchUpIter storage.SimpleIterator + var catchUpIterFunc rangefeed.IteratorConstructor if usingCatchupIter { - innerIter := r.Engine().NewIterator(storage.IterOptions{ - UpperBound: args.Span.EndKey, - // RangeFeed originally intended to use the time-bound iterator - // performance optimization. However, they've had correctness issues in - // the past (#28358, #34819) and no-one has the time for the due-diligence - // necessary to be confidant in their correctness going forward. Not using - // them causes the total time spent in RangeFeed catchup on changefeed - // over tpcc-1000 to go from 40s -> 4853s, which is quite large but still - // workable. See #35122 for details. - // MinTimestampHint: args.Timestamp, - }) - catchUpIter = iteratorWithCloser{ - SimpleIterator: innerIter, - close: iterSemRelease, + catchUpIterFunc = func() storage.SimpleIterator { + + innerIter := r.Engine().NewIterator(storage.IterOptions{ + UpperBound: args.Span.EndKey, + // RangeFeed originally intended to use the time-bound iterator + // performance optimization. However, they've had correctness issues in + // the past (#28358, #34819) and no-one has the time for the due-diligence + // necessary to be confidant in their correctness going forward. Not using + // them causes the total time spent in RangeFeed catchup on changefeed + // over tpcc-1000 to go from 40s -> 4853s, which is quite large but still + // workable. See #35122 for details. + // MinTimestampHint: args.Timestamp, + }) + catchUpIter := iteratorWithCloser{ + SimpleIterator: innerIter, + close: iterSemRelease, + } + // Responsibility for releasing the semaphore now passes to the iterator. + iterSemRelease = nil + return catchUpIter } - // Responsibility for releasing the semaphore now passes to the iterator. - iterSemRelease = nil } p := r.registerWithRangefeedRaftMuLocked( - ctx, rSpan, args.Timestamp, catchUpIter, args.WithDiff, lockedStream, errC, + ctx, rSpan, args.Timestamp, catchUpIterFunc, args.WithDiff, lockedStream, errC, ) r.raftMu.Unlock() @@ -296,7 +300,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( ctx context.Context, span roachpb.RSpan, startTS hlc.Timestamp, - catchupIter storage.SimpleIterator, + catchupIter rangefeed.IteratorConstructor, withDiff bool, stream rangefeed.Stream, errC chan<- *roachpb.Error, @@ -341,16 +345,18 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( p = rangefeed.NewProcessor(cfg) // Start it with an iterator to initialize the resolved timestamp. - rtsIter := r.Engine().NewIterator(storage.IterOptions{ - UpperBound: desc.EndKey.AsRawKey(), - // TODO(nvanbenschoten): To facilitate fast restarts of rangefeed - // we should periodically persist the resolved timestamp so that we - // can initialize the rangefeed using an iterator that only needs to - // observe timestamps back to the last recorded resolved timestamp. - // This is safe because we know that there are no unresolved intents - // at times before a resolved timestamp. - // MinTimestampHint: r.ResolvedTimestamp, - }) + rtsIter := func() storage.SimpleIterator { + return r.Engine().NewIterator(storage.IterOptions{ + UpperBound: desc.EndKey.AsRawKey(), + // TODO(nvanbenschoten): To facilitate fast restarts of rangefeed + // we should periodically persist the resolved timestamp so that we + // can initialize the rangefeed using an iterator that only needs to + // observe timestamps back to the last recorded resolved timestamp. + // This is safe because we know that there are no unresolved intents + // at times before a resolved timestamp. + // MinTimestampHint: r.ResolvedTimestamp, + }) + } p.Start(r.store.Stopper(), rtsIter) // Register with the processor *before* we attach its reference to the @@ -360,7 +366,6 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // server shutdown. reg, filter := p.Register(span, startTS, catchupIter, withDiff, stream, errC) if !reg { - catchupIter.Close() // clean up select { case <-r.store.Stopper().ShouldQuiesce(): errC <- roachpb.NewError(&roachpb.NodeUnavailableError{})