rangefeed: create catchup iterators eagerly
Previously, catch-up iterators were created in the main rangefeed
processor work loop. This negatively affected scheduler-based
processors, since creating the iterator can be slow.
This commit makes iterator creation eager, which simplifies error
handling and reduces rangefeed registration delays.

Epic: none

Release note: None
aliher1911 committed Sep 22, 2023
1 parent f3497de commit 54b8a73
Showing 5 changed files with 61 additions and 97 deletions.
25 changes: 7 additions & 18 deletions pkg/kv/kvserver/rangefeed/processor.go
@@ -157,7 +157,10 @@ type Processor interface {
// provided an error when the registration closes.
//
// The optionally provided "catch-up" iterator is used to read changes from the
// engine which occurred after the provided start timestamp (exclusive).
// engine which occurred after the provided start timestamp (exclusive). If
// this method succeeds, the registration takes ownership of the iterator and
// will subsequently close it. If the method fails, the iterator is left
// intact and must be closed by the caller.
//
// If the method returns false, the processor will have been stopped, so calling
// Stop is not necessary. If the method returns true, it will also return an
@@ -168,7 +171,7 @@ type Processor interface {
Register(
span roachpb.RSpan,
startTS hlc.Timestamp, // exclusive
catchUpIterConstructor CatchUpIteratorConstructor,
catchUpIter *CatchUpIterator,
withDiff bool,
stream Stream,
disconnectFn func(),
@@ -326,12 +329,6 @@ func NewLegacyProcessor(cfg Config) *LegacyProcessor {
// engine has not been closed.
type IntentScannerConstructor func() IntentScanner

// CatchUpIteratorConstructor is used to construct an iterator that can be used
// for catchup-scans. Takes the key span and exclusive start time to run the
// catchup scan for. It should be called from underneath a stopper task to
// ensure that the engine has not been closed.
type CatchUpIteratorConstructor func(roachpb.Span, hlc.Timestamp) (*CatchUpIterator, error)

// Start implements Processor interface.
//
// LegacyProcessor launches a goroutine to process rangefeed events and send
@@ -404,14 +401,6 @@ func (p *LegacyProcessor) run(
log.Fatalf(ctx, "registration %s not in Processor's key range %v", r, p.Span)
}

// Construct the catchUpIter before notifying the registration that it
// has been registered. Note that if the catchUpScan is never run, then
// the iterator constructed here will be closed in disconnect.
if err := r.maybeConstructCatchUpIter(); err != nil {
r.disconnect(kvpb.NewError(err))
return
}

// Add the new registration to the registry.
p.reg.Register(&r)

@@ -559,7 +548,7 @@ func (p *LegacyProcessor) sendStop(pErr *kvpb.Error) {
func (p *LegacyProcessor) Register(
span roachpb.RSpan,
startTS hlc.Timestamp,
catchUpIterConstructor CatchUpIteratorConstructor,
catchUpIter *CatchUpIterator,
withDiff bool,
stream Stream,
disconnectFn func(),
@@ -572,7 +561,7 @@ func (p *LegacyProcessor) Register(

blockWhenFull := p.Config.EventChanTimeout == 0 // for testing
r := newRegistration(
span.AsRawSpanWithNoLocals(), startTS, catchUpIterConstructor, withDiff,
span.AsRawSpanWithNoLocals(), startTS, catchUpIter, withDiff,
p.Config.EventChanCap, blockWhenFull, p.Metrics, stream, disconnectFn, done,
)
select {
57 changes: 15 additions & 42 deletions pkg/kv/kvserver/rangefeed/registry.go
@@ -81,15 +81,6 @@ type registration struct {
withDiff bool
metrics *Metrics

// catchUpIterConstructor is used to construct the catchUpIter if necessary.
// The reason this constructor is plumbed down is to make sure that the
// iterator does not get constructed too late in server shutdown. However,
// it must also be stored in the struct to ensure that it is not constructed
// too late, after the raftMu has been dropped. Thus, this function, if
// non-nil, will be used to populate mu.catchUpIter while the registration
// is being registered by the processor.
catchUpIterConstructor CatchUpIteratorConstructor

// Output.
stream Stream
done *future.ErrorFuture
@@ -113,17 +104,18 @@ type registration struct {
outputLoopCancelFn func()
disconnected bool

// catchUpIter is populated on the Processor's goroutine while the
// Replica.raftMu is still held. If it is non-nil at the time that
// disconnect is called, it is closed by disconnect.
// catchUpIter is created by the replica under the raftMu lock when the
// registration is created. It is detached by the output loop for processing
// and closed there. If the output loop was never started and catchUpIter is
// non-nil at the time that disconnect is called, it is closed by disconnect.
catchUpIter *CatchUpIterator
}
}

func newRegistration(
span roachpb.Span,
startTS hlc.Timestamp,
catchUpIterConstructor CatchUpIteratorConstructor,
catchUpIter *CatchUpIterator,
withDiff bool,
bufferSz int,
blockWhenFull bool,
@@ -133,19 +125,19 @@ func newRegistration(
done *future.ErrorFuture,
) registration {
r := registration{
span: span,
catchUpTimestamp: startTS,
catchUpIterConstructor: catchUpIterConstructor,
withDiff: withDiff,
metrics: metrics,
stream: stream,
done: done,
unreg: unregisterFn,
buf: make(chan *sharedEvent, bufferSz),
blockWhenFull: blockWhenFull,
span: span,
catchUpTimestamp: startTS,
withDiff: withDiff,
metrics: metrics,
stream: stream,
done: done,
unreg: unregisterFn,
buf: make(chan *sharedEvent, bufferSz),
blockWhenFull: blockWhenFull,
}
r.mu.Locker = &syncutil.Mutex{}
r.mu.caughtUp = true
r.mu.catchUpIter = catchUpIter
return r
}

@@ -587,25 +579,6 @@ func (r *registration) waitForCaughtUp() error {
return errors.Errorf("registration %v failed to empty in time", r.Range())
}

// maybeConstructCatchUpIter calls the catchUpIterConstructor and attaches
// the catchUpIter to be detached in the catchUpScan or closed on disconnect.
func (r *registration) maybeConstructCatchUpIter() error {
if r.catchUpIterConstructor == nil {
return nil
}

catchUpIter, err := r.catchUpIterConstructor(r.span, r.catchUpTimestamp)
if err != nil {
return err
}
r.catchUpIterConstructor = nil

r.mu.Lock()
defer r.mu.Unlock()
r.mu.catchUpIter = catchUpIter
return nil
}

// detachCatchUpIter detaches the catchUpIter that was previously attached.
func (r *registration) detachCatchUpIter() *CatchUpIterator {
r.mu.Lock()
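
The registration comment above describes a detach-or-close handoff for the catch-up iterator. The following is a minimal, self-contained Go sketch of that lifecycle; the types, field names, and method bodies are illustrative stand-ins under stated assumptions, not the actual CockroachDB registry code.

package main

import (
	"fmt"
	"sync"
)

// catchUpIter stands in for rangefeed.CatchUpIterator.
type catchUpIter struct{ name string }

func (i *catchUpIter) Close() { fmt.Println("closed", i.name) }

// registration sketches only the mu.catchUpIter part of the real struct.
type registration struct {
	mu struct {
		sync.Mutex
		catchUpIter *catchUpIter
	}
}

// detachCatchUpIter hands the iterator to the output loop; once detached,
// disconnect no longer closes it.
func (r *registration) detachCatchUpIter() *catchUpIter {
	r.mu.Lock()
	defer r.mu.Unlock()
	it := r.mu.catchUpIter
	r.mu.catchUpIter = nil
	return it
}

// disconnect closes the iterator only if the output loop never detached it.
func (r *registration) disconnect() {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.mu.catchUpIter != nil {
		r.mu.catchUpIter.Close()
		r.mu.catchUpIter = nil
	}
}

func main() {
	r := &registration{}
	r.mu.catchUpIter = &catchUpIter{name: "eager"} // set at newRegistration time

	// Output-loop path: detach, run the catch-up scan, then close.
	if it := r.detachCatchUpIter(); it != nil {
		it.Close()
	}

	// disconnect is now a no-op with respect to the iterator.
	r.disconnect()
}
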
19 changes: 8 additions & 11 deletions pkg/kv/kvserver/rangefeed/registry_test.go
@@ -100,16 +100,16 @@ type testRegistration struct {
stream *testStream
}

func makeCatchUpIteratorConstructor(iter storage.SimpleMVCCIterator) CatchUpIteratorConstructor {
func makeCatchUpIterator(
iter storage.SimpleMVCCIterator, span roachpb.Span, startTime hlc.Timestamp,
) *CatchUpIterator {
if iter == nil {
return nil
}
return func(span roachpb.Span, startTime hlc.Timestamp) (*CatchUpIterator, error) {
return &CatchUpIterator{
simpleCatchupIter: simpleCatchupIterAdapter{iter},
span: span,
startTime: startTime,
}, nil
return &CatchUpIterator{
simpleCatchupIter: simpleCatchupIterAdapter{iter},
span: span,
startTime: startTime,
}
}

@@ -120,7 +120,7 @@ func newTestRegistration(
r := newRegistration(
span,
ts,
makeCatchUpIteratorConstructor(catchup),
makeCatchUpIterator(catchup, span, ts),
withDiff,
5,
false, /* blockWhenFull */
@@ -129,9 +129,6 @@ func newTestRegistration(
func() {},
&future.ErrorFuture{},
)
if err := r.maybeConstructCatchUpIter(); err != nil {
panic(err)
}
return &testRegistration{
registration: r,
stream: s,
12 changes: 2 additions & 10 deletions pkg/kv/kvserver/rangefeed/scheduled_processor.go
@@ -294,7 +294,7 @@ func (p *ScheduledProcessor) sendStop(pErr *kvpb.Error) {
func (p *ScheduledProcessor) Register(
span roachpb.RSpan,
startTS hlc.Timestamp,
catchUpIterConstructor CatchUpIteratorConstructor,
catchUpIter *CatchUpIterator,
withDiff bool,
stream Stream,
disconnectFn func(),
@@ -307,7 +307,7 @@ func (p *ScheduledProcessor) Register(

blockWhenFull := p.Config.EventChanTimeout == 0 // for testing
r := newRegistration(
span.AsRawSpanWithNoLocals(), startTS, catchUpIterConstructor, withDiff,
span.AsRawSpanWithNoLocals(), startTS, catchUpIter, withDiff,
p.Config.EventChanCap, blockWhenFull, p.Metrics, stream, disconnectFn, done,
)

@@ -319,14 +319,6 @@ func (p *ScheduledProcessor) Register(
log.Fatalf(ctx, "registration %s not in Processor's key range %v", r, p.Span)
}

// Construct the catchUpIter before notifying the registration that it
// has been registered. Note that if the catchUpScan is never run, then
// the iterator constructed here will be closed in disconnect.
if err := r.maybeConstructCatchUpIter(); err != nil {
r.disconnect(kvpb.NewError(err))
return nil
}

// Add the new registration to the registry.
p.reg.Register(&r)

45 changes: 29 additions & 16 deletions pkg/kv/kvserver/replica_rangefeed.go
@@ -271,25 +271,22 @@ func (r *Replica) RangeFeed(
}

// Register the stream with a catch-up iterator.
var catchUpIterFunc rangefeed.CatchUpIteratorConstructor
var catchUpIter *rangefeed.CatchUpIterator
if usingCatchUpIter {
catchUpIterFunc = func(span roachpb.Span, startTime hlc.Timestamp) (*rangefeed.CatchUpIterator, error) {
// Assert that we still hold the raftMu when this is called to ensure
// that the catchUpIter reads from the current snapshot.
r.raftMu.AssertHeld()
i, err := rangefeed.NewCatchUpIterator(r.store.TODOEngine(), span, startTime, iterSemRelease, pacer)
if err != nil {
return nil, err
}
if f := r.store.TestingKnobs().RangefeedValueHeaderFilter; f != nil {
i.OnEmit = f
}
return i, nil
catchUpIter, err = rangefeed.NewCatchUpIterator(r.store.TODOEngine(), rSpan.AsRawSpanWithNoLocals(),
args.Timestamp, iterSemRelease, pacer)
if err != nil {
r.raftMu.Unlock()
iterSemRelease()
return future.MakeCompletedErrorFuture(err)
}
if f := r.store.TestingKnobs().RangefeedValueHeaderFilter; f != nil {
catchUpIter.OnEmit = f
}
}
var done future.ErrorFuture
p := r.registerWithRangefeedRaftMuLocked(
ctx, rSpan, args.Timestamp, catchUpIterFunc, args.WithDiff, lockedStream, &done,
ctx, rSpan, args.Timestamp, catchUpIter, args.WithDiff, lockedStream, &done,
)
r.raftMu.Unlock()

@@ -373,18 +370,32 @@ func logSlowRangefeedRegistration(ctx context.Context) func() {
// registerWithRangefeedRaftMuLocked sets up a Rangefeed registration over the
// provided span. It initializes a rangefeed for the Replica if one is not
// already running. Requires raftMu be locked.
// Returns Future[*roachpb.Error] which will return an error once rangefeed completes.
// Returns Future[*roachpb.Error] which will return an error once rangefeed
// completes.
// Note that the caller delegates ownership of catchUpIter to this method in
// both the success and failure cases. It is therefore important that this
// method closes the iterator if registration fails. A successful registration
// takes ownership of the iterator and ensures it is closed once the catch-up
// scan completes or is aborted.
func (r *Replica) registerWithRangefeedRaftMuLocked(
ctx context.Context,
span roachpb.RSpan,
startTS hlc.Timestamp, // exclusive
catchUpIter rangefeed.CatchUpIteratorConstructor,
catchUpIter *rangefeed.CatchUpIterator,
withDiff bool,
stream rangefeed.Stream,
done *future.ErrorFuture,
) rangefeed.Processor {
defer logSlowRangefeedRegistration(ctx)()

// Always defer closing the iterator to cover both old and new failure cases.
// On the success path, where registration succeeds, catchUpIter is reset to
// nil so that the deferred Close becomes a no-op.
defer func() {
if catchUpIter != nil {
catchUpIter.Close()
}
}()

// Attempt to register with an existing Rangefeed processor, if one exists.
// The locking here is a little tricky because we need to handle the case
// of concurrent processor shutdowns (see maybeDisconnectEmptyRangefeed).
@@ -399,6 +410,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
// that this new registration might be interested in.
r.setRangefeedFilterLocked(filter)
r.rangefeedMu.Unlock()
catchUpIter = nil
return p
}
// If the registration failed, the processor was already being shut
@@ -477,6 +489,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
panic("unexpected Stopped processor")
}
}
catchUpIter = nil

// Set the rangefeed processor and filter reference.
r.setRangefeedProcessor(p)
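
For reference, here is a minimal sketch of the caller-side ownership handoff that registerWithRangefeedRaftMuLocked uses above: the iterator is created eagerly, a deferred Close covers every failure return, and the success path disarms that deferred Close by setting the local variable to nil. The function and type names below are hypothetical stand-ins, not the actual Replica or Processor code.

package main

import (
	"errors"
	"fmt"
)

type catchUpIter struct{}

func (i *catchUpIter) Close() { fmt.Println("iterator closed") }

// registerWithProcessor stands in for Processor.Register: on success it takes
// ownership of iter; on failure the caller remains responsible for it.
func registerWithProcessor(iter *catchUpIter, fail bool) error {
	if fail {
		return errors.New("processor already stopped")
	}
	// The real registration stashes iter and closes it after the catch-up
	// scan; the sketch simply closes it here.
	iter.Close()
	return nil
}

// register mirrors the shape of registerWithRangefeedRaftMuLocked: the defer
// closes the iterator on any failure return and is disarmed on success.
func register(iter *catchUpIter) error {
	defer func() {
		if iter != nil {
			iter.Close() // registration failed: close it here
		}
	}()

	if err := registerWithProcessor(iter, false); err != nil {
		return err // deferred Close cleans up
	}
	iter = nil // ownership transferred: disarm the deferred Close
	return nil
}

func main() {
	// Created eagerly by the caller (under raftMu in the real code) before
	// being handed to the processor.
	if err := register(&catchUpIter{}); err != nil {
		fmt.Println("registration failed:", err)
	}
}
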
