Skip to content

Commit

Permalink
kv/kvserver/rangefeed: fix catchupIter construction synchronization
Browse files Browse the repository at this point in the history
The catchupIter could have been constructed after the state of the underlying
store has changed. In general this doesn't seem like a disaster unless the
range has been removed or the gc threshold has changed and data is gone.

So, maybe it is a disaster.

Release justification: fixes for high-priority or high-severity bugs in
existing functionality

Release note: None
  • Loading branch information
ajwerner committed Aug 30, 2021
1 parent e946415 commit 72ef548
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 8 deletions.
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,13 @@ func (p *Processor) run(
log.Fatalf(ctx, "registration %s not in Processor's key range %v", r, p.Span)
}

// Construct the catchupIter before notifying the registration that it
// has been registered.
if r.catchupIterConstructor != nil {
r.catchupIter = r.catchupIterConstructor()
r.catchupIterConstructor = nil
}

// Add the new registration to the registry.
p.reg.Register(&r)

Expand Down
24 changes: 17 additions & 7 deletions pkg/kv/kvserver/rangefeed/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,22 @@ type Stream interface {
// has finished.
type registration struct {
// Input.
span roachpb.Span
catchupTimestamp hlc.Timestamp
span roachpb.Span
catchupTimestamp hlc.Timestamp
withDiff bool
metrics *Metrics

// catchupIterConstructor is used to construct the catchupIter if necessary.
// The reason this constructor is plumbed down is to make sure that the
// iterator does not get constructed too late in server shutdown. However,
// it must also be stored in the struct to ensure that it is not constructed
// too late, after the raftMu has been dropped. Thus, this function, if
// non-nil, will be used to populated catchupIter while the registration
// is being registered by the processor.
catchupIterConstructor func() storage.SimpleMVCCIterator
withDiff bool
metrics *Metrics
// catchupIter is populated on the Processor's goroutine while the
// Replica.raftMu is still held.
catchupIter storage.SimpleMVCCIterator

// Output.
stream Stream
Expand Down Expand Up @@ -285,11 +296,10 @@ func (r *registration) runOutputLoop(ctx context.Context, _forStacks roachpb.Ran
// If the registration does not have a catchUpIteratorConstructor, this method
// is a no-op.
func (r *registration) maybeRunCatchupScan() error {
if r.catchupIterConstructor == nil {
if r.catchupIter == nil {
return nil
}
catchupIter := r.catchupIterConstructor()
r.catchupIterConstructor = nil
catchupIter := r.catchupIter
start := timeutil.Now()
defer func() {
catchupIter.Close()
Expand Down
9 changes: 8 additions & 1 deletion pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,9 @@ func (r *Replica) rangeFeedWithRangeID(
var catchUpIterFunc rangefeed.IteratorConstructor
if usingCatchupIter {
catchUpIterFunc = func() storage.SimpleMVCCIterator {

// Assert that we still hold the raftMu when this is called to ensure
// that the catchUpIter reads from the current snapshot.
r.raftMu.AssertHeld()
innerIter := r.Engine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
UpperBound: args.Span.EndKey,
// RangeFeed originally intended to use the time-bound iterator
Expand Down Expand Up @@ -373,6 +375,11 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(

// Start it with an iterator to initialize the resolved timestamp.
rtsIter := func() storage.SimpleMVCCIterator {
// Assert that we still hold the raftMu when this is called to ensure
// that the catchUpIter reads from the current snapshot. The replica
// synchronizes with the rangefeed Processor calling this function by
// waiting for the Register call below to return.
r.raftMu.AssertHeld()
return r.Engine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
UpperBound: desc.EndKey.AsRawKey(),
// TODO(nvanbenschoten): To facilitate fast restarts of rangefeed
Expand Down

0 comments on commit 72ef548

Please sign in to comment.