Skip to content

Commit

Permalink
kv/kvserver/rangefeed: fix catchupIter construction synchronization
Browse files Browse the repository at this point in the history
The catchupIter could have been constructed after the state of the underlying
store has changed. In general this doesn't seem like a disaster unless the
range has been removed or the gc threshold has changed and data is gone.

So, maybe it is a disaster.

Release justification: fixes for high-priority or high-severity bugs in
existing functionality

Release note: None
  • Loading branch information
ajwerner committed Aug 31, 2021
1 parent e946415 commit 2e70853
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 20 deletions.
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,11 @@ func (p *Processor) run(
log.Fatalf(ctx, "registration %s not in Processor's key range %v", r, p.Span)
}

// Construct the catchupIter before notifying the registration that it
// has been registered. Note that if the catchUpScan is never run, then
// the iterator constructed here will be closed in disconnect.
r.maybeConstructCatchUpIter()

// Add the new registration to the registry.
p.reg.Register(&r)

Expand Down
54 changes: 47 additions & 7 deletions pkg/kv/kvserver/rangefeed/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,19 @@ type Stream interface {
// has finished.
type registration struct {
// Input.
span roachpb.Span
catchupTimestamp hlc.Timestamp
span roachpb.Span
catchupTimestamp hlc.Timestamp
withDiff bool
metrics *Metrics

// catchupIterConstructor is used to construct the catchupIter if necessary.
// The reason this constructor is plumbed down is to make sure that the
// iterator does not get constructed too late in server shutdown. However,
// it must also be stored in the struct to ensure that it is not constructed
// too late, after the raftMu has been dropped. Thus, this function, if
// non-nil, will be used to populate mu.catchupIter while the registration
// is being registered by the processor.
catchupIterConstructor func() storage.SimpleMVCCIterator
withDiff bool
metrics *Metrics

// Output.
stream Stream
Expand All @@ -80,6 +88,11 @@ type registration struct {
// Management of the output loop goroutine, used to ensure proper teardown.
outputLoopCancelFn func()
disconnected bool

// catchupIter is populated on the Processor's goroutine while the
// Replica.raftMu is still held. If it is non-nil at the time that
// disconnect is called, it is closed by disconnect.
catchupIter storage.SimpleMVCCIterator
}
}

Expand Down Expand Up @@ -210,6 +223,10 @@ func (r *registration) disconnect(pErr *roachpb.Error) {
r.mu.Lock()
defer r.mu.Unlock()
if !r.mu.disconnected {
if r.mu.catchupIter != nil {
r.mu.catchupIter.Close()
r.mu.catchupIter = nil
}
if r.mu.outputLoopCancelFn != nil {
r.mu.outputLoopCancelFn()
}
Expand Down Expand Up @@ -285,11 +302,10 @@ func (r *registration) runOutputLoop(ctx context.Context, _forStacks roachpb.Ran
// If the registration does not have a catch-up iterator attached (see
// maybeConstructCatchUpIter), this method is a no-op.
func (r *registration) maybeRunCatchupScan() error {
if r.catchupIterConstructor == nil {
catchupIter := r.detachCatchUpIter()
if catchupIter == nil {
return nil
}
catchupIter := r.catchupIterConstructor()
r.catchupIterConstructor = nil
start := timeutil.Now()
defer func() {
catchupIter.Close()
Expand Down Expand Up @@ -587,6 +603,30 @@ func (r *registration) waitForCaughtUp() error {
return errors.Errorf("registration %v failed to empty in time", r.Range())
}

// maybeConstructCatchUpIter builds the catch-up iterator, if the registration
// was given a catchupIterConstructor, and attaches it to the registration.
//
// When catchupIterConstructor is nil this is a no-op. Otherwise the constructor
// is invoked once, the constructor field is cleared so it cannot be invoked
// again, and the resulting iterator is stored in r.mu.catchupIter while holding
// r.mu. The attached iterator is later either taken over by detachCatchUpIter
// (used by maybeRunCatchupScan) or closed by disconnect.
func (r *registration) maybeConstructCatchUpIter() {
	if construct := r.catchupIterConstructor; construct != nil {
		// Build the iterator before taking r.mu; only the attachment itself
		// needs to happen under the lock.
		iter := construct()
		r.catchupIterConstructor = nil

		r.mu.Lock()
		defer r.mu.Unlock()
		r.mu.catchupIter = iter
	}
}

// detachCatchUpIter hands the currently attached catch-up iterator to the
// caller and clears r.mu.catchupIter, all while holding r.mu. It returns nil
// when no iterator is attached. Since the field is cleared here, disconnect
// will no longer close the returned iterator; closing it becomes the caller's
// job.
func (r *registration) detachCatchUpIter() storage.SimpleMVCCIterator {
	r.mu.Lock()
	defer r.mu.Unlock()

	detached := r.mu.catchupIter
	r.mu.catchupIter = nil
	return detached
}

// waitForCaughtUp waits for all registrations overlapping the given span to
// completely process their internal buffers.
func (reg *registry) waitForCaughtUp(span roachpb.Span) error {
Expand Down
26 changes: 14 additions & 12 deletions pkg/kv/kvserver/rangefeed/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,21 @@ func newTestRegistration(
) *testRegistration {
s := newTestStream()
errC := make(chan *roachpb.Error, 1)
r := newRegistration(
span,
ts,
makeIteratorConstructor(catchup),
withDiff,
5,
NewMetrics(),
s,
errC,
)
r.maybeConstructCatchUpIter()
return &testRegistration{
registration: newRegistration(
span,
ts,
makeIteratorConstructor(catchup),
withDiff,
5,
NewMetrics(),
s,
errC,
),
stream: s,
errC: errC,
registration: r,
stream: s,
errC: errC,
}
}

Expand Down
9 changes: 8 additions & 1 deletion pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,9 @@ func (r *Replica) rangeFeedWithRangeID(
var catchUpIterFunc rangefeed.IteratorConstructor
if usingCatchupIter {
catchUpIterFunc = func() storage.SimpleMVCCIterator {

// Assert that we still hold the raftMu when this is called to ensure
// that the catchUpIter reads from the current snapshot.
r.raftMu.AssertHeld()
innerIter := r.Engine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
UpperBound: args.Span.EndKey,
// RangeFeed originally intended to use the time-bound iterator
Expand Down Expand Up @@ -373,6 +375,11 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(

// Start it with an iterator to initialize the resolved timestamp.
rtsIter := func() storage.SimpleMVCCIterator {
// Assert that we still hold the raftMu when this is called to ensure
// that the rtsIter reads from the current snapshot. The replica
// synchronizes with the rangefeed Processor calling this function by
// waiting for the Register call below to return.
r.raftMu.AssertHeld()
return r.Engine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
UpperBound: desc.EndKey.AsRawKey(),
// TODO(nvanbenschoten): To facilitate fast restarts of rangefeed
Expand Down

0 comments on commit 2e70853

Please sign in to comment.