Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver,rangefeed: ensure that iterators are only constructed under tasks #52844

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions pkg/kv/kvserver/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ func NewProcessor(cfg Config) *Processor {
}
}

// IteratorConstructor is used to construct an iterator. It should be called
// from underneath a stopper task to ensure that the engine has not been closed.
//
// The iterator is built lazily, at the point where it is needed, rather than
// being handed over pre-constructed. Callers in this package treat a nil
// IteratorConstructor as "no iterator": the scan that would have consumed the
// iterator is skipped.
type IteratorConstructor func() storage.SimpleIterator

// Start launches a goroutine to process rangefeed events and send them to
// registrations.
//
Expand All @@ -167,10 +171,10 @@ func NewProcessor(cfg Config) *Processor {
// calling its Close method when it is finished. If the iterator constructor is
// nil then no initialization scan will be performed and the resolved timestamp
// will immediately be considered initialized.
func (p *Processor) Start(stopper *stop.Stopper, rtsIter storage.SimpleIterator) {
func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IteratorConstructor) {
ctx := p.AnnotateCtx(context.Background())
if err := stopper.RunAsyncTask(ctx, "rangefeed.Processor", func(ctx context.Context) {
p.run(ctx, rtsIter, stopper)
p.run(ctx, rtsIterFunc, stopper)
}); err != nil {
pErr := roachpb.NewError(err)
p.reg.DisconnectWithErr(all, pErr)
Expand All @@ -180,15 +184,16 @@ func (p *Processor) Start(stopper *stop.Stopper, rtsIter storage.SimpleIterator)

// run is called from Start and runs the rangefeed.
func (p *Processor) run(
ctx context.Context, rtsIter storage.SimpleIterator, stopper *stop.Stopper,
ctx context.Context, rtsIterFunc IteratorConstructor, stopper *stop.Stopper,
) {
defer close(p.stoppedC)
ctx, cancelOutputLoops := context.WithCancel(ctx)
defer cancelOutputLoops()

// Launch an async task to scan over the resolved timestamp iterator and
// initialize the unresolvedIntentQueue. Ignore error if quiescing.
if rtsIter != nil {
if rtsIterFunc != nil {
rtsIter := rtsIterFunc()
initScan := newInitResolvedTSScan(p, rtsIter)
err := stopper.RunAsyncTask(ctx, "rangefeed: init resolved ts", initScan.Run)
if err != nil {
Expand Down Expand Up @@ -239,9 +244,6 @@ func (p *Processor) run(
}
}
if err := stopper.RunAsyncTask(ctx, "rangefeed: output loop", runOutputLoop); err != nil {
if r.catchupIter != nil {
r.catchupIter.Close() // clean up
}
r.disconnect(roachpb.NewError(err))
p.reg.Unregister(&r)
}
Expand Down Expand Up @@ -368,7 +370,7 @@ func (p *Processor) sendStop(pErr *roachpb.Error) {
func (p *Processor) Register(
span roachpb.RSpan,
startTS hlc.Timestamp,
catchupIter storage.SimpleIterator,
catchupIterConstructor IteratorConstructor,
withDiff bool,
stream Stream,
errC chan<- *roachpb.Error,
Expand All @@ -379,7 +381,7 @@ func (p *Processor) Register(
p.syncEventC()

r := newRegistration(
span.AsRawSpanWithNoLocals(), startTS, catchupIter, withDiff,
span.AsRawSpanWithNoLocals(), startTS, catchupIterConstructor, withDiff,
p.Config.EventChanCap, p.Metrics, stream, errC,
)
select {
Expand Down
9 changes: 8 additions & 1 deletion pkg/kv/kvserver/rangefeed/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,17 @@ func newTestProcessorWithTxnPusher(
EventChanCap: testProcessorEventCCap,
CheckStreamsInterval: 10 * time.Millisecond,
})
p.Start(stopper, rtsIter)
p.Start(stopper, makeIteratorConstructor(rtsIter))
return p, stopper
}

// makeIteratorConstructor adapts an already-built iterator to the
// IteratorConstructor signature for use in tests. A nil iterator yields a nil
// constructor; otherwise the returned constructor hands back rtsIter itself on
// every call.
func makeIteratorConstructor(rtsIter storage.SimpleIterator) IteratorConstructor {
	var ctor IteratorConstructor
	if rtsIter != nil {
		ctor = func() storage.SimpleIterator {
			return rtsIter
		}
	}
	return ctor
}

// newTestProcessor builds a test Processor via newTestProcessorWithTxnPusher,
// passing a nil pusher, and returns the processor together with its stopper.
func newTestProcessor(rtsIter storage.SimpleIterator) (*Processor, *stop.Stopper) {
	p, stopper := newTestProcessorWithTxnPusher(rtsIter, nil /* pusher */)
	return p, stopper
}
Expand Down
72 changes: 37 additions & 35 deletions pkg/kv/kvserver/rangefeed/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ type Stream interface {
// has finished.
type registration struct {
// Input.
span roachpb.Span
catchupTimestamp hlc.Timestamp
catchupIter storage.SimpleIterator
withDiff bool
metrics *Metrics
span roachpb.Span
catchupTimestamp hlc.Timestamp
catchupIterConstructor func() storage.SimpleIterator
withDiff bool
metrics *Metrics

// Output.
stream Stream
Expand Down Expand Up @@ -86,22 +86,22 @@ type registration struct {
func newRegistration(
span roachpb.Span,
startTS hlc.Timestamp,
catchupIter storage.SimpleIterator,
catchupIterConstructor func() storage.SimpleIterator,
withDiff bool,
bufferSz int,
metrics *Metrics,
stream Stream,
errC chan<- *roachpb.Error,
) registration {
r := registration{
span: span,
catchupTimestamp: startTS,
catchupIter: catchupIter,
withDiff: withDiff,
metrics: metrics,
stream: stream,
errC: errC,
buf: make(chan *roachpb.RangeFeedEvent, bufferSz),
span: span,
catchupTimestamp: startTS,
catchupIterConstructor: catchupIterConstructor,
withDiff: withDiff,
metrics: metrics,
stream: stream,
errC: errC,
buf: make(chan *roachpb.RangeFeedEvent, bufferSz),
}
r.mu.Locker = &syncutil.Mutex{}
r.mu.caughtUp = true
Expand Down Expand Up @@ -231,13 +231,11 @@ func (r *registration) disconnect(pErr *roachpb.Error) {
// canceled, or when the buffer has overflowed and all pre-overflow entries
// have been emitted.
func (r *registration) outputLoop(ctx context.Context) error {
// If the registration has a catch-up scan,
if r.catchupIter != nil {
if err := r.runCatchupScan(); err != nil {
err = errors.Wrap(err, "catch-up scan failed")
log.Errorf(ctx, "%v", err)
return err
}
// If the registration has a catch-up scan, run it.
if err := r.maybeRunCatchupScan(); err != nil {
err = errors.Wrap(err, "catch-up scan failed")
log.Errorf(ctx, "%v", err)
return err
}

// Normal buffered output loop.
Expand Down Expand Up @@ -274,18 +272,22 @@ func (r *registration) runOutputLoop(ctx context.Context) {
r.disconnect(roachpb.NewError(err))
}

// runCatchupScan starts a catchup scan which will output entries for all
// maybeRunCatchupScan starts a catchup scan which will output entries for all
// recorded changes in the replica that are newer than the catchupTimestamp.
// This uses an iterator built by the constructor provided when the registration
// was originally created; after the scan completes, the iterator will be closed.
func (r *registration) runCatchupScan() error {
if r.catchupIter == nil {
//
// If the registration does not have a catchupIterConstructor, this method
// is a no-op.
func (r *registration) maybeRunCatchupScan() error {
if r.catchupIterConstructor == nil {
return nil
}
catchupIter := r.catchupIterConstructor()
r.catchupIterConstructor = nil
start := timeutil.Now()
defer func() {
r.catchupIter.Close()
r.catchupIter = nil
catchupIter.Close()
r.metrics.RangeFeedCatchupScanNanos.Inc(timeutil.Since(start).Nanoseconds())
}()

Expand Down Expand Up @@ -323,16 +325,16 @@ func (r *registration) runCatchupScan() error {
// versions of each key that are after the registration's startTS, so we
// can't use NextKey.
var meta enginepb.MVCCMetadata
r.catchupIter.SeekGE(startKey)
catchupIter.SeekGE(startKey)
for {
if ok, err := r.catchupIter.Valid(); err != nil {
if ok, err := catchupIter.Valid(); err != nil {
return err
} else if !ok || !r.catchupIter.UnsafeKey().Less(endKey) {
} else if !ok || !catchupIter.UnsafeKey().Less(endKey) {
break
}

unsafeKey := r.catchupIter.UnsafeKey()
unsafeVal := r.catchupIter.UnsafeValue()
unsafeKey := catchupIter.UnsafeKey()
unsafeVal := catchupIter.UnsafeValue()
if !unsafeKey.IsValue() {
// Found a metadata key.
if err := protoutil.Unmarshal(unsafeVal, &meta); err != nil {
Expand All @@ -344,7 +346,7 @@ func (r *registration) runCatchupScan() error {
// past the corresponding provisional key-value. To do this,
// scan to the timestamp immediately before (i.e. the key
// immediately after) the provisional key.
r.catchupIter.SeekGE(storage.MVCCKey{
catchupIter.SeekGE(storage.MVCCKey{
Key: unsafeKey.Key,
Timestamp: hlc.Timestamp(meta.Timestamp).Prev(),
})
Expand Down Expand Up @@ -375,7 +377,7 @@ func (r *registration) runCatchupScan() error {
if ignore && !r.withDiff {
// Skip all the way to the next key.
// NB: fast-path to avoid value copy when !r.withDiff.
r.catchupIter.NextKey()
catchupIter.NextKey()
continue
}

Expand All @@ -388,10 +390,10 @@ func (r *registration) runCatchupScan() error {

if ignore {
// Skip all the way to the next key.
r.catchupIter.NextKey()
catchupIter.NextKey()
} else {
// Move to the next version of this key.
r.catchupIter.Next()
catchupIter.Next()

var event roachpb.RangeFeedEvent
event.MustSetValue(&roachpb.RangeFeedValue{
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/rangefeed/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func newTestRegistration(
registration: newRegistration(
span,
ts,
catchup,
makeIteratorConstructor(catchup),
withDiff,
5,
NewMetrics(),
Expand Down Expand Up @@ -253,7 +253,7 @@ func TestRegistrationCatchUpScan(t *testing.T) {
}, hlc.Timestamp{WallTime: 4}, iter, true /* withDiff */)

require.Zero(t, r.metrics.RangeFeedCatchupScanNanos.Count())
require.NoError(t, r.runCatchupScan())
require.NoError(t, r.maybeRunCatchupScan())
require.True(t, iter.closed)
require.NotZero(t, r.metrics.RangeFeedCatchupScanNanos.Count())

Expand Down
65 changes: 35 additions & 30 deletions pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,28 +194,32 @@ func (r *Replica) RangeFeed(
}

// Register the stream with a catch-up iterator.
var catchUpIter storage.SimpleIterator
var catchUpIterFunc rangefeed.IteratorConstructor
if usingCatchupIter {
innerIter := r.Engine().NewIterator(storage.IterOptions{
UpperBound: args.Span.EndKey,
// RangeFeed originally intended to use the time-bound iterator
// performance optimization. However, they've had correctness issues in
// the past (#28358, #34819) and no-one has the time for the due-diligence
// necessary to be confident in their correctness going forward. Not using
// them causes the total time spent in RangeFeed catchup on changefeed
// over tpcc-1000 to go from 40s -> 4853s, which is quite large but still
// workable. See #35122 for details.
// MinTimestampHint: args.Timestamp,
})
catchUpIter = iteratorWithCloser{
SimpleIterator: innerIter,
close: iterSemRelease,
catchUpIterFunc = func() storage.SimpleIterator {

innerIter := r.Engine().NewIterator(storage.IterOptions{
UpperBound: args.Span.EndKey,
// RangeFeed originally intended to use the time-bound iterator
// performance optimization. However, they've had correctness issues in
// the past (#28358, #34819) and no-one has the time for the due-diligence
// necessary to be confident in their correctness going forward. Not using
// them causes the total time spent in RangeFeed catchup on changefeed
// over tpcc-1000 to go from 40s -> 4853s, which is quite large but still
// workable. See #35122 for details.
// MinTimestampHint: args.Timestamp,
})
catchUpIter := iteratorWithCloser{
SimpleIterator: innerIter,
close: iterSemRelease,
}
// Responsibility for releasing the semaphore now passes to the iterator.
iterSemRelease = nil
return catchUpIter
}
// Responsibility for releasing the semaphore now passes to the iterator.
iterSemRelease = nil
}
p := r.registerWithRangefeedRaftMuLocked(
ctx, rSpan, args.Timestamp, catchUpIter, args.WithDiff, lockedStream, errC,
ctx, rSpan, args.Timestamp, catchUpIterFunc, args.WithDiff, lockedStream, errC,
)
r.raftMu.Unlock()

Expand Down Expand Up @@ -296,7 +300,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
ctx context.Context,
span roachpb.RSpan,
startTS hlc.Timestamp,
catchupIter storage.SimpleIterator,
catchupIter rangefeed.IteratorConstructor,
withDiff bool,
stream rangefeed.Stream,
errC chan<- *roachpb.Error,
Expand Down Expand Up @@ -341,16 +345,18 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
p = rangefeed.NewProcessor(cfg)

// Start it with an iterator to initialize the resolved timestamp.
rtsIter := r.Engine().NewIterator(storage.IterOptions{
UpperBound: desc.EndKey.AsRawKey(),
// TODO(nvanbenschoten): To facilitate fast restarts of rangefeed
// we should periodically persist the resolved timestamp so that we
// can initialize the rangefeed using an iterator that only needs to
// observe timestamps back to the last recorded resolved timestamp.
// This is safe because we know that there are no unresolved intents
// at times before a resolved timestamp.
// MinTimestampHint: r.ResolvedTimestamp,
})
rtsIter := func() storage.SimpleIterator {
return r.Engine().NewIterator(storage.IterOptions{
UpperBound: desc.EndKey.AsRawKey(),
// TODO(nvanbenschoten): To facilitate fast restarts of rangefeed
// we should periodically persist the resolved timestamp so that we
// can initialize the rangefeed using an iterator that only needs to
// observe timestamps back to the last recorded resolved timestamp.
// This is safe because we know that there are no unresolved intents
// at times before a resolved timestamp.
// MinTimestampHint: r.ResolvedTimestamp,
})
}
p.Start(r.store.Stopper(), rtsIter)

// Register with the processor *before* we attach its reference to the
Expand All @@ -360,7 +366,6 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
// server shutdown.
reg, filter := p.Register(span, startTS, catchupIter, withDiff, stream, errC)
if !reg {
catchupIter.Close() // clean up
select {
case <-r.store.Stopper().ShouldQuiesce():
errC <- roachpb.NewError(&roachpb.NodeUnavailableError{})
Expand Down