Skip to content

Commit

Permalink
kvserver,rangefeed: ensure that iterators are only constructed under …
Browse files Browse the repository at this point in the history
…tasks

Prior to this change, it was possible for a rangefeed request to be issued
concurrently with shutting down which could lead to an iterator being
constructed after the engine has been closed.

Touches cockroachdb#51544

Release note: None
  • Loading branch information
ajwerner committed Aug 17, 2020
1 parent 8324ed3 commit 571f4be
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 77 deletions.
20 changes: 11 additions & 9 deletions pkg/kv/kvserver/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ func NewProcessor(cfg Config) *Processor {
}
}

// IteratorConstructor is used to construct an iterator. It should be called
// from underneath a stopper task to ensure that the engine has not been closed.
type IteratorConstructor func() storage.SimpleIterator

// Start launches a goroutine to process rangefeed events and send them to
// registrations.
//
Expand All @@ -167,10 +171,10 @@ func NewProcessor(cfg Config) *Processor {
// calling its Close method when it is finished. If the iterator is nil then
// no initialization scan will be performed and the resolved timestamp will
// immediately be considered initialized.
func (p *Processor) Start(stopper *stop.Stopper, rtsIter storage.SimpleIterator) {
func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IteratorConstructor) {
ctx := p.AnnotateCtx(context.Background())
if err := stopper.RunAsyncTask(ctx, "rangefeed.Processor", func(ctx context.Context) {
p.run(ctx, rtsIter, stopper)
p.run(ctx, rtsIterFunc, stopper)
}); err != nil {
pErr := roachpb.NewError(err)
p.reg.DisconnectWithErr(all, pErr)
Expand All @@ -180,15 +184,16 @@ func (p *Processor) Start(stopper *stop.Stopper, rtsIter storage.SimpleIterator)

// run is called from Start and runs the rangefeed.
func (p *Processor) run(
ctx context.Context, rtsIter storage.SimpleIterator, stopper *stop.Stopper,
ctx context.Context, rtsIterFunc IteratorConstructor, stopper *stop.Stopper,
) {
defer close(p.stoppedC)
ctx, cancelOutputLoops := context.WithCancel(ctx)
defer cancelOutputLoops()

// Launch an async task to scan over the resolved timestamp iterator and
// initialize the unresolvedIntentQueue. Ignore error if quiescing.
if rtsIter != nil {
if rtsIterFunc != nil {
rtsIter := rtsIterFunc()
initScan := newInitResolvedTSScan(p, rtsIter)
err := stopper.RunAsyncTask(ctx, "rangefeed: init resolved ts", initScan.Run)
if err != nil {
Expand Down Expand Up @@ -239,9 +244,6 @@ func (p *Processor) run(
}
}
if err := stopper.RunAsyncTask(ctx, "rangefeed: output loop", runOutputLoop); err != nil {
if r.catchupIter != nil {
r.catchupIter.Close() // clean up
}
r.disconnect(roachpb.NewError(err))
p.reg.Unregister(&r)
}
Expand Down Expand Up @@ -368,7 +370,7 @@ func (p *Processor) sendStop(pErr *roachpb.Error) {
func (p *Processor) Register(
span roachpb.RSpan,
startTS hlc.Timestamp,
catchupIter storage.SimpleIterator,
catchupIterConstructor IteratorConstructor,
withDiff bool,
stream Stream,
errC chan<- *roachpb.Error,
Expand All @@ -379,7 +381,7 @@ func (p *Processor) Register(
p.syncEventC()

r := newRegistration(
span.AsRawSpanWithNoLocals(), startTS, catchupIter, withDiff,
span.AsRawSpanWithNoLocals(), startTS, catchupIterConstructor, withDiff,
p.Config.EventChanCap, p.Metrics, stream, errC,
)
select {
Expand Down
9 changes: 8 additions & 1 deletion pkg/kv/kvserver/rangefeed/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,17 @@ func newTestProcessorWithTxnPusher(
EventChanCap: testProcessorEventCCap,
CheckStreamsInterval: 10 * time.Millisecond,
})
p.Start(stopper, rtsIter)
p.Start(stopper, makeIteratorConstructor(rtsIter))
return p, stopper
}

func makeIteratorConstructor(rtsIter storage.SimpleIterator) IteratorConstructor {
if rtsIter == nil {
return nil
}
return func() storage.SimpleIterator { return rtsIter }
}

func newTestProcessor(rtsIter storage.SimpleIterator) (*Processor, *stop.Stopper) {
return newTestProcessorWithTxnPusher(rtsIter, nil /* pusher */)
}
Expand Down
72 changes: 37 additions & 35 deletions pkg/kv/kvserver/rangefeed/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ type Stream interface {
// has finished.
type registration struct {
// Input.
span roachpb.Span
catchupTimestamp hlc.Timestamp
catchupIter storage.SimpleIterator
withDiff bool
metrics *Metrics
span roachpb.Span
catchupTimestamp hlc.Timestamp
catchupIterConstructor func() storage.SimpleIterator
withDiff bool
metrics *Metrics

// Output.
stream Stream
Expand Down Expand Up @@ -86,22 +86,22 @@ type registration struct {
func newRegistration(
span roachpb.Span,
startTS hlc.Timestamp,
catchupIter storage.SimpleIterator,
catchupIterConstructor func() storage.SimpleIterator,
withDiff bool,
bufferSz int,
metrics *Metrics,
stream Stream,
errC chan<- *roachpb.Error,
) registration {
r := registration{
span: span,
catchupTimestamp: startTS,
catchupIter: catchupIter,
withDiff: withDiff,
metrics: metrics,
stream: stream,
errC: errC,
buf: make(chan *roachpb.RangeFeedEvent, bufferSz),
span: span,
catchupTimestamp: startTS,
catchupIterConstructor: catchupIterConstructor,
withDiff: withDiff,
metrics: metrics,
stream: stream,
errC: errC,
buf: make(chan *roachpb.RangeFeedEvent, bufferSz),
}
r.mu.Locker = &syncutil.Mutex{}
r.mu.caughtUp = true
Expand Down Expand Up @@ -231,13 +231,11 @@ func (r *registration) disconnect(pErr *roachpb.Error) {
// canceled, or when the buffer has overflowed and all pre-overflow entries
// have been emitted.
func (r *registration) outputLoop(ctx context.Context) error {
// If the registration has a catch-up scan,
if r.catchupIter != nil {
if err := r.runCatchupScan(); err != nil {
err = errors.Wrap(err, "catch-up scan failed")
log.Errorf(ctx, "%v", err)
return err
}
// If the registration has a catch-up scan, run it.
if err := r.maybeRunCatchupScan(); err != nil {
err = errors.Wrap(err, "catch-up scan failed")
log.Errorf(ctx, "%v", err)
return err
}

// Normal buffered output loop.
Expand Down Expand Up @@ -274,18 +272,22 @@ func (r *registration) runOutputLoop(ctx context.Context) {
r.disconnect(roachpb.NewError(err))
}

// runCatchupScan starts a catchup scan which will output entries for all
// maybeRunCatchupScan starts a catchup scan which will output entries for all
// recorded changes in the replica that are newer than the catchupTimestamp.
// This uses the iterator provided when the registration was originally created;
// after the scan completes, the iterator will be closed.
func (r *registration) runCatchupScan() error {
if r.catchupIter == nil {
//
// If the registration does not have a catchUpIteratorConstructor, this method
// is a no-op.
func (r *registration) maybeRunCatchupScan() error {
if r.catchupIterConstructor == nil {
return nil
}
catchupIter := r.catchupIterConstructor()
r.catchupIterConstructor = nil
start := timeutil.Now()
defer func() {
r.catchupIter.Close()
r.catchupIter = nil
catchupIter.Close()
r.metrics.RangeFeedCatchupScanNanos.Inc(timeutil.Since(start).Nanoseconds())
}()

Expand Down Expand Up @@ -323,16 +325,16 @@ func (r *registration) runCatchupScan() error {
// versions of each key that are after the registration's startTS, so we
// can't use NextKey.
var meta enginepb.MVCCMetadata
r.catchupIter.SeekGE(startKey)
catchupIter.SeekGE(startKey)
for {
if ok, err := r.catchupIter.Valid(); err != nil {
if ok, err := catchupIter.Valid(); err != nil {
return err
} else if !ok || !r.catchupIter.UnsafeKey().Less(endKey) {
} else if !ok || !catchupIter.UnsafeKey().Less(endKey) {
break
}

unsafeKey := r.catchupIter.UnsafeKey()
unsafeVal := r.catchupIter.UnsafeValue()
unsafeKey := catchupIter.UnsafeKey()
unsafeVal := catchupIter.UnsafeValue()
if !unsafeKey.IsValue() {
// Found a metadata key.
if err := protoutil.Unmarshal(unsafeVal, &meta); err != nil {
Expand All @@ -344,7 +346,7 @@ func (r *registration) runCatchupScan() error {
// past the corresponding provisional key-value. To do this,
// scan to the timestamp immediately before (i.e. the key
// immediately after) the provisional key.
r.catchupIter.SeekGE(storage.MVCCKey{
catchupIter.SeekGE(storage.MVCCKey{
Key: unsafeKey.Key,
Timestamp: hlc.Timestamp(meta.Timestamp).Prev(),
})
Expand Down Expand Up @@ -375,7 +377,7 @@ func (r *registration) runCatchupScan() error {
if ignore && !r.withDiff {
// Skip all the way to the next key.
// NB: fast-path to avoid value copy when !r.withDiff.
r.catchupIter.NextKey()
catchupIter.NextKey()
continue
}

Expand All @@ -388,10 +390,10 @@ func (r *registration) runCatchupScan() error {

if ignore {
// Skip all the way to the next key.
r.catchupIter.NextKey()
catchupIter.NextKey()
} else {
// Move to the next version of this key.
r.catchupIter.Next()
catchupIter.Next()

var event roachpb.RangeFeedEvent
event.MustSetValue(&roachpb.RangeFeedValue{
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/rangefeed/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func newTestRegistration(
registration: newRegistration(
span,
ts,
catchup,
makeIteratorConstructor(catchup),
withDiff,
5,
NewMetrics(),
Expand Down Expand Up @@ -253,7 +253,7 @@ func TestRegistrationCatchUpScan(t *testing.T) {
}, hlc.Timestamp{WallTime: 4}, iter, true /* withDiff */)

require.Zero(t, r.metrics.RangeFeedCatchupScanNanos.Count())
require.NoError(t, r.runCatchupScan())
require.NoError(t, r.maybeRunCatchupScan())
require.True(t, iter.closed)
require.NotZero(t, r.metrics.RangeFeedCatchupScanNanos.Count())

Expand Down
65 changes: 35 additions & 30 deletions pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,28 +194,32 @@ func (r *Replica) RangeFeed(
}

// Register the stream with a catch-up iterator.
var catchUpIter storage.SimpleIterator
var catchUpIterFunc rangefeed.IteratorConstructor
if usingCatchupIter {
innerIter := r.Engine().NewIterator(storage.IterOptions{
UpperBound: args.Span.EndKey,
// RangeFeed originally intended to use the time-bound iterator
// performance optimization. However, they've had correctness issues in
// the past (#28358, #34819) and no-one has the time for the due-diligence
// necessary to be confidant in their correctness going forward. Not using
// them causes the total time spent in RangeFeed catchup on changefeed
// over tpcc-1000 to go from 40s -> 4853s, which is quite large but still
// workable. See #35122 for details.
// MinTimestampHint: args.Timestamp,
})
catchUpIter = iteratorWithCloser{
SimpleIterator: innerIter,
close: iterSemRelease,
catchUpIterFunc = func() storage.SimpleIterator {

innerIter := r.Engine().NewIterator(storage.IterOptions{
UpperBound: args.Span.EndKey,
// RangeFeed originally intended to use the time-bound iterator
// performance optimization. However, they've had correctness issues in
// the past (#28358, #34819) and no-one has the time for the due-diligence
// necessary to be confidant in their correctness going forward. Not using
// them causes the total time spent in RangeFeed catchup on changefeed
// over tpcc-1000 to go from 40s -> 4853s, which is quite large but still
// workable. See #35122 for details.
// MinTimestampHint: args.Timestamp,
})
catchUpIter := iteratorWithCloser{
SimpleIterator: innerIter,
close: iterSemRelease,
}
// Responsibility for releasing the semaphore now passes to the iterator.
iterSemRelease = nil
return catchUpIter
}
// Responsibility for releasing the semaphore now passes to the iterator.
iterSemRelease = nil
}
p := r.registerWithRangefeedRaftMuLocked(
ctx, rSpan, args.Timestamp, catchUpIter, args.WithDiff, lockedStream, errC,
ctx, rSpan, args.Timestamp, catchUpIterFunc, args.WithDiff, lockedStream, errC,
)
r.raftMu.Unlock()

Expand Down Expand Up @@ -296,7 +300,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
ctx context.Context,
span roachpb.RSpan,
startTS hlc.Timestamp,
catchupIter storage.SimpleIterator,
catchupIter rangefeed.IteratorConstructor,
withDiff bool,
stream rangefeed.Stream,
errC chan<- *roachpb.Error,
Expand Down Expand Up @@ -341,16 +345,18 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
p = rangefeed.NewProcessor(cfg)

// Start it with an iterator to initialize the resolved timestamp.
rtsIter := r.Engine().NewIterator(storage.IterOptions{
UpperBound: desc.EndKey.AsRawKey(),
// TODO(nvanbenschoten): To facilitate fast restarts of rangefeed
// we should periodically persist the resolved timestamp so that we
// can initialize the rangefeed using an iterator that only needs to
// observe timestamps back to the last recorded resolved timestamp.
// This is safe because we know that there are no unresolved intents
// at times before a resolved timestamp.
// MinTimestampHint: r.ResolvedTimestamp,
})
rtsIter := func() storage.SimpleIterator {
return r.Engine().NewIterator(storage.IterOptions{
UpperBound: desc.EndKey.AsRawKey(),
// TODO(nvanbenschoten): To facilitate fast restarts of rangefeed
// we should periodically persist the resolved timestamp so that we
// can initialize the rangefeed using an iterator that only needs to
// observe timestamps back to the last recorded resolved timestamp.
// This is safe because we know that there are no unresolved intents
// at times before a resolved timestamp.
// MinTimestampHint: r.ResolvedTimestamp,
})
}
p.Start(r.store.Stopper(), rtsIter)

// Register with the processor *before* we attach its reference to the
Expand All @@ -360,7 +366,6 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
// server shutdown.
reg, filter := p.Register(span, startTS, catchupIter, withDiff, stream, errC)
if !reg {
catchupIter.Close() // clean up
select {
case <-r.store.Stopper().ShouldQuiesce():
errC <- roachpb.NewError(&roachpb.NodeUnavailableError{})
Expand Down

0 comments on commit 571f4be

Please sign in to comment.