Skip to content

Commit

Permalink
rangefeed: create catchup iterators eagerly
Browse files Browse the repository at this point in the history
Previously, catchup iterators were created in the main rangefeed
processor work loop. This negatively affected scheduler-based
processors, as this operation could be slow.
This commit makes iterator creation eager, simplifying error handling
and reducing rangefeed delays.

Epic: none

Release note: None
  • Loading branch information
aliher1911 committed Sep 22, 2023
1 parent f3497de commit cf35678
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 83 deletions.
20 changes: 3 additions & 17 deletions pkg/kv/kvserver/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ type Processor interface {
Register(
span roachpb.RSpan,
startTS hlc.Timestamp, // exclusive
catchUpIterConstructor CatchUpIteratorConstructor,
catchUpIter *CatchUpIterator,
withDiff bool,
stream Stream,
disconnectFn func(),
Expand Down Expand Up @@ -326,12 +326,6 @@ func NewLegacyProcessor(cfg Config) *LegacyProcessor {
// engine has not been closed.
type IntentScannerConstructor func() IntentScanner

// CatchUpIteratorConstructor is used to construct an iterator that can be used
// for catchup-scans. Takes the key span and exclusive start time to run the
// catchup scan for. It should be called from underneath a stopper task to
// ensure that the engine has not been closed.
type CatchUpIteratorConstructor func(roachpb.Span, hlc.Timestamp) (*CatchUpIterator, error)

// Start implements Processor interface.
//
// LegacyProcessor launches a goroutine to process rangefeed events and send
Expand Down Expand Up @@ -404,14 +398,6 @@ func (p *LegacyProcessor) run(
log.Fatalf(ctx, "registration %s not in Processor's key range %v", r, p.Span)
}

// Construct the catchUpIter before notifying the registration that it
// has been registered. Note that if the catchUpScan is never run, then
// the iterator constructed here will be closed in disconnect.
if err := r.maybeConstructCatchUpIter(); err != nil {
r.disconnect(kvpb.NewError(err))
return
}

// Add the new registration to the registry.
p.reg.Register(&r)

Expand Down Expand Up @@ -559,7 +545,7 @@ func (p *LegacyProcessor) sendStop(pErr *kvpb.Error) {
func (p *LegacyProcessor) Register(
span roachpb.RSpan,
startTS hlc.Timestamp,
catchUpIterConstructor CatchUpIteratorConstructor,
catchUpIter *CatchUpIterator,
withDiff bool,
stream Stream,
disconnectFn func(),
Expand All @@ -572,7 +558,7 @@ func (p *LegacyProcessor) Register(

blockWhenFull := p.Config.EventChanTimeout == 0 // for testing
r := newRegistration(
span.AsRawSpanWithNoLocals(), startTS, catchUpIterConstructor, withDiff,
span.AsRawSpanWithNoLocals(), startTS, catchUpIter, withDiff,
p.Config.EventChanCap, blockWhenFull, p.Metrics, stream, disconnectFn, done,
)
select {
Expand Down
32 changes: 2 additions & 30 deletions pkg/kv/kvserver/rangefeed/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,6 @@ type registration struct {
withDiff bool
metrics *Metrics

// catchUpIterConstructor is used to construct the catchUpIter if necessary.
// The reason this constructor is plumbed down is to make sure that the
// iterator does not get constructed too late in server shutdown. However,
// it must also be stored in the struct to ensure that it is not constructed
// too late, after the raftMu has been dropped. Thus, this function, if
// non-nil, will be used to populate mu.catchUpIter while the registration
// is being registered by the processor.
catchUpIterConstructor CatchUpIteratorConstructor

// Output.
stream Stream
done *future.ErrorFuture
Expand Down Expand Up @@ -123,7 +114,7 @@ type registration struct {
func newRegistration(
span roachpb.Span,
startTS hlc.Timestamp,
catchUpIterConstructor CatchUpIteratorConstructor,
catchUpIter *CatchUpIterator,
withDiff bool,
bufferSz int,
blockWhenFull bool,
Expand All @@ -135,7 +126,6 @@ func newRegistration(
r := registration{
span: span,
catchUpTimestamp: startTS,
catchUpIterConstructor: catchUpIterConstructor,
withDiff: withDiff,
metrics: metrics,
stream: stream,
Expand All @@ -146,6 +136,7 @@ func newRegistration(
}
r.mu.Locker = &syncutil.Mutex{}
r.mu.caughtUp = true
r.mu.catchUpIter = catchUpIter
return r
}

Expand Down Expand Up @@ -587,25 +578,6 @@ func (r *registration) waitForCaughtUp() error {
return errors.Errorf("registration %v failed to empty in time", r.Range())
}

// maybeConstructCatchUpIter calls the catchUpIterConstructor and attaches
// the catchUpIter to be detached in the catchUpScan or closed on disconnect.
func (r *registration) maybeConstructCatchUpIter() error {
if r.catchUpIterConstructor == nil {
return nil
}

catchUpIter, err := r.catchUpIterConstructor(r.span, r.catchUpTimestamp)
if err != nil {
return err
}
r.catchUpIterConstructor = nil

r.mu.Lock()
defer r.mu.Unlock()
r.mu.catchUpIter = catchUpIter
return nil
}

// detachCatchUpIter detaches the catchUpIter that was previously attached.
func (r *registration) detachCatchUpIter() *CatchUpIterator {
r.mu.Lock()
Expand Down
19 changes: 8 additions & 11 deletions pkg/kv/kvserver/rangefeed/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,16 @@ type testRegistration struct {
stream *testStream
}

func makeCatchUpIteratorConstructor(iter storage.SimpleMVCCIterator) CatchUpIteratorConstructor {
func makeCatchUpIteratorConstructor(
iter storage.SimpleMVCCIterator, span roachpb.Span, startTime hlc.Timestamp,
) *CatchUpIterator {
if iter == nil {
return nil
}
return func(span roachpb.Span, startTime hlc.Timestamp) (*CatchUpIterator, error) {
return &CatchUpIterator{
simpleCatchupIter: simpleCatchupIterAdapter{iter},
span: span,
startTime: startTime,
}, nil
return &CatchUpIterator{
simpleCatchupIter: simpleCatchupIterAdapter{iter},
span: span,
startTime: startTime,
}
}

Expand All @@ -120,7 +120,7 @@ func newTestRegistration(
r := newRegistration(
span,
ts,
makeCatchUpIteratorConstructor(catchup),
makeCatchUpIteratorConstructor(catchup, span, ts),
withDiff,
5,
false, /* blockWhenFull */
Expand All @@ -129,9 +129,6 @@ func newTestRegistration(
func() {},
&future.ErrorFuture{},
)
if err := r.maybeConstructCatchUpIter(); err != nil {
panic(err)
}
return &testRegistration{
registration: r,
stream: s,
Expand Down
12 changes: 2 additions & 10 deletions pkg/kv/kvserver/rangefeed/scheduled_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (p *ScheduledProcessor) sendStop(pErr *kvpb.Error) {
func (p *ScheduledProcessor) Register(
span roachpb.RSpan,
startTS hlc.Timestamp,
catchUpIterConstructor CatchUpIteratorConstructor,
catchUpIter *CatchUpIterator,
withDiff bool,
stream Stream,
disconnectFn func(),
Expand All @@ -307,7 +307,7 @@ func (p *ScheduledProcessor) Register(

blockWhenFull := p.Config.EventChanTimeout == 0 // for testing
r := newRegistration(
span.AsRawSpanWithNoLocals(), startTS, catchUpIterConstructor, withDiff,
span.AsRawSpanWithNoLocals(), startTS, catchUpIter, withDiff,
p.Config.EventChanCap, blockWhenFull, p.Metrics, stream, disconnectFn, done,
)

Expand All @@ -319,14 +319,6 @@ func (p *ScheduledProcessor) Register(
log.Fatalf(ctx, "registration %s not in Processor's key range %v", r, p.Span)
}

// Construct the catchUpIter before notifying the registration that it
// has been registered. Note that if the catchUpScan is never run, then
// the iterator constructed here will be closed in disconnect.
if err := r.maybeConstructCatchUpIter(); err != nil {
r.disconnect(kvpb.NewError(err))
return nil
}

// Add the new registration to the registry.
p.reg.Register(&r)

Expand Down
35 changes: 20 additions & 15 deletions pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,25 +271,22 @@ func (r *Replica) RangeFeed(
}

// Register the stream with a catch-up iterator.
var catchUpIterFunc rangefeed.CatchUpIteratorConstructor
var catchUpIter *rangefeed.CatchUpIterator
if usingCatchUpIter {
catchUpIterFunc = func(span roachpb.Span, startTime hlc.Timestamp) (*rangefeed.CatchUpIterator, error) {
// Assert that we still hold the raftMu when this is called to ensure
// that the catchUpIter reads from the current snapshot.
r.raftMu.AssertHeld()
i, err := rangefeed.NewCatchUpIterator(r.store.TODOEngine(), span, startTime, iterSemRelease, pacer)
if err != nil {
return nil, err
}
if f := r.store.TestingKnobs().RangefeedValueHeaderFilter; f != nil {
i.OnEmit = f
}
return i, nil
catchUpIter, err = rangefeed.NewCatchUpIterator(r.store.TODOEngine(), rSpan.AsRawSpanWithNoLocals(),
args.Timestamp, iterSemRelease, pacer)
if err != nil {
r.raftMu.Unlock()
iterSemRelease()
return future.MakeCompletedErrorFuture(err)
}
if f := r.store.TestingKnobs().RangefeedValueHeaderFilter; f != nil {
catchUpIter.OnEmit = f
}
}
var done future.ErrorFuture
p := r.registerWithRangefeedRaftMuLocked(
ctx, rSpan, args.Timestamp, catchUpIterFunc, args.WithDiff, lockedStream, &done,
ctx, rSpan, args.Timestamp, catchUpIter, args.WithDiff, lockedStream, &done,
)
r.raftMu.Unlock()

Expand Down Expand Up @@ -378,13 +375,19 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
ctx context.Context,
span roachpb.RSpan,
startTS hlc.Timestamp, // exclusive
catchUpIter rangefeed.CatchUpIteratorConstructor,
catchUpIter *rangefeed.CatchUpIterator,
withDiff bool,
stream rangefeed.Stream,
done *future.ErrorFuture,
) rangefeed.Processor {
defer logSlowRangefeedRegistration(ctx)()

cleanupCatchUpIter := func() {
if catchUpIter != nil {
catchUpIter.Close()
}
}

// Attempt to register with an existing Rangefeed processor, if one exists.
// The locking here is a little tricky because we need to handle the case
// of concurrent processor shutdowns (see maybeDisconnectEmptyRangefeed).
Expand Down Expand Up @@ -459,6 +462,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
// below will fall through to the panic.
if err := p.Start(r.store.Stopper(), rtsIter); err != nil {
done.Set(err)
cleanupCatchUpIter()
return nil
}

Expand All @@ -469,6 +473,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
// server shutdown.
reg, filter := p.Register(span, startTS, catchUpIter, withDiff, stream, func() { r.maybeDisconnectEmptyRangefeed(p) }, done)
if !reg {
cleanupCatchUpIter()
select {
case <-r.store.Stopper().ShouldQuiesce():
done.Set(&kvpb.NodeUnavailableError{})
Expand Down

0 comments on commit cf35678

Please sign in to comment.