Skip to content

Commit

Permalink
rangefeed: create catchup iterators eagerly
Browse files Browse the repository at this point in the history
Previously, catchup iterators were created in the main rangefeed
processor work loop. This is negatively affecting scheduler based
processors as this operation could be slow.
This commit makes iterator creation eager, simplifying error handling
and making rangefeed times delays lower.

Epic: none

Release note: None
  • Loading branch information
aliher1911 committed Sep 22, 2023
1 parent f3497de commit 3257904
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 60 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/rangefeed/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) {
const withDiff = false
streams[i] = &noopStream{ctx: ctx}
futures[i] = &future.ErrorFuture{}
ok, _ := p.Register(span, hlc.MinTimestamp, nil, withDiff, streams[i], nil, futures[i])
ok, _ := p.Register(span, hlc.MinTimestamp, nilCatchUpIter, withDiff, streams[i], nil, futures[i])
require.True(b, ok)
}

Expand Down
47 changes: 40 additions & 7 deletions pkg/kv/kvserver/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
Expand Down Expand Up @@ -330,7 +331,41 @@ type IntentScannerConstructor func() IntentScanner
// for catchup-scans. Takes the key span and exclusive start time to run the
// catchup scan for. It should be called from underneath a stopper task to
// ensure that the engine has not been closed.
type CatchUpIteratorConstructor func(roachpb.Span, hlc.Timestamp) (*CatchUpIterator, error)
type CatchUpIteratorConstructor func() *CatchUpIterator

// CatchUpIteratorContainer manages catchup iterator lifecycle during
// rangefeed registration creation. Iterator is created by replica under the
// lock to get a consistent state. Container is then passed to registration.
// If registration is created successfully, iterator ownership is moved to
// registration. If registration creation fails on any stage before that,
// container could be closed to release iterator. Since ownership is moved
// out of container, it is safe to close it regardless of startup success or
// failure.
type CatchUpIteratorContainer struct {
iter atomic.Pointer[CatchUpIterator]
}

// NewCatchUpIteratorContainer constructs new container. It is safe to pass nil
// iterator.
func NewCatchUpIteratorContainer(iter *CatchUpIterator) *CatchUpIteratorContainer {
c := &CatchUpIteratorContainer{}
c.iter.Store(iter)
return c
}

// Detach moves iterator out of container. Calling Close on container won't close
// the iterator after that. Safe to call on empty container.
func (c *CatchUpIteratorContainer) Detach() (iter *CatchUpIterator) {
return c.iter.Swap(nil)
}

// Close closes underlying iterator if it was contained and was not moved out
// by Get.
func (c *CatchUpIteratorContainer) Close() {
if iter := c.Detach(); iter != nil {
iter.Close()
}
}

// Start implements Processor interface.
//
Expand Down Expand Up @@ -405,12 +440,10 @@ func (p *LegacyProcessor) run(
}

// Construct the catchUpIter before notifying the registration that it
// has been registered. Note that if the catchUpScan is never run, then
// the iterator constructed here will be closed in disconnect.
if err := r.maybeConstructCatchUpIter(); err != nil {
r.disconnect(kvpb.NewError(err))
return
}
// has been registered. This will move ownership of iterator to
// registration. Note that if the catchUpScan is never run, then
// the iterator obtained here will be closed in disconnect.
r.maybeConstructCatchUpIter()

// Add the new registration to the registry.
p.reg.Register(&r)
Expand Down
75 changes: 60 additions & 15 deletions pkg/kv/kvserver/rangefeed/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -331,6 +332,10 @@ func waitErrorFuture(f *future.ErrorFuture) error {
return resultErr
}

var nilCatchUpIter = func() *CatchUpIterator {
return nil
}

func TestProcessorBasic(t *testing.T) {
defer leaktest.AfterTest(t)()
testutils.RunValues(t, "proc type", testTypes, func(t *testing.T, pt procType) {
Expand Down Expand Up @@ -363,7 +368,7 @@ func TestProcessorBasic(t *testing.T) {
r1OK, r1Filter := p.Register(
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
hlc.Timestamp{WallTime: 1},
nil, /* catchUpIter */
nilCatchUpIter,
false, /* withDiff */
r1Stream,
func() {},
Expand Down Expand Up @@ -496,7 +501,7 @@ func TestProcessorBasic(t *testing.T) {
r2OK, r1And2Filter := p.Register(
roachpb.RSpan{Key: roachpb.RKey("c"), EndKey: roachpb.RKey("z")},
hlc.Timestamp{WallTime: 1},
nil, /* catchUpIter */
nilCatchUpIter,
true, /* withDiff */
r2Stream,
func() {},
Expand Down Expand Up @@ -592,7 +597,7 @@ func TestProcessorBasic(t *testing.T) {
r3OK, _ := p.Register(
roachpb.RSpan{Key: roachpb.RKey("c"), EndKey: roachpb.RKey("z")},
hlc.Timestamp{WallTime: 1},
nil, /* catchUpIter */
nilCatchUpIter,
false, /* withDiff */
r3Stream,
func() {},
Expand All @@ -615,7 +620,7 @@ func TestProcessorSlowConsumer(t *testing.T) {
_, _ = p.Register(
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
hlc.Timestamp{WallTime: 1},
nil, /* catchUpIter */
nilCatchUpIter,
false, /* withDiff */
r1Stream,
func() {},
Expand All @@ -626,7 +631,7 @@ func TestProcessorSlowConsumer(t *testing.T) {
p.Register(
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")},
hlc.Timestamp{WallTime: 1},
nil, /* catchUpIter */
nilCatchUpIter,
false, /* withDiff */
r2Stream,
func() {},
Expand Down Expand Up @@ -722,7 +727,7 @@ func TestProcessorMemoryBudgetExceeded(t *testing.T) {
_, _ = p.Register(
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
hlc.Timestamp{WallTime: 1},
nil, /* catchUpIter */
nilCatchUpIter,
false, /* withDiff */
r1Stream,
func() {},
Expand Down Expand Up @@ -777,7 +782,7 @@ func TestProcessorMemoryBudgetReleased(t *testing.T) {
p.Register(
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
hlc.Timestamp{WallTime: 1},
nil, /* catchUpIter */
nilCatchUpIter,
false, /* withDiff */
r1Stream,
func() {},
Expand Down Expand Up @@ -854,7 +859,7 @@ func TestProcessorInitializeResolvedTimestamp(t *testing.T) {
p.Register(
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
hlc.Timestamp{WallTime: 1},
nil, /* catchUpIter */
nilCatchUpIter,
false, /* withDiff */
r1Stream,
func() {},
Expand Down Expand Up @@ -1110,7 +1115,7 @@ func TestProcessorConcurrentStop(t *testing.T) {
runtime.Gosched()
s := newTestStream()
var done future.ErrorFuture
p.Register(h.span, hlc.Timestamp{}, nil, false, s,
p.Register(h.span, hlc.Timestamp{}, nilCatchUpIter, false, s,
func() {}, &done)
}()
go func() {
Expand Down Expand Up @@ -1183,7 +1188,7 @@ func TestProcessorRegistrationObservesOnlyNewEvents(t *testing.T) {
s := newTestStream()
regs[s] = firstIdx
var done future.ErrorFuture
p.Register(h.span, hlc.Timestamp{}, nil, false,
p.Register(h.span, hlc.Timestamp{}, nilCatchUpIter, false,
s, func() {}, &done)
regDone <- struct{}{}
}
Expand Down Expand Up @@ -1243,7 +1248,7 @@ func TestBudgetReleaseOnProcessorStop(t *testing.T) {
_, _ = p.Register(
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
hlc.Timestamp{WallTime: 1},
nil, /* catchUpIter */
nilCatchUpIter,
false, /* withDiff */
rStream,
func() {},
Expand Down Expand Up @@ -1324,7 +1329,7 @@ func TestBudgetReleaseOnLastStreamError(t *testing.T) {
_, _ = p.Register(
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
hlc.Timestamp{WallTime: 1},
nil, /* catchUpIter */
nilCatchUpIter,
false, /* withDiff */
rStream,
func() {},
Expand Down Expand Up @@ -1395,7 +1400,7 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) {
_, _ = p.Register(
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
hlc.Timestamp{WallTime: 1},
nil, /* catchUpIter */
nilCatchUpIter,
false, /* withDiff */
r1Stream,
func() {},
Expand All @@ -1409,7 +1414,7 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) {
p.Register(
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
hlc.Timestamp{WallTime: 1},
nil, /* catchUpIter */
nilCatchUpIter,
false, /* withDiff */
r2Stream,
func() {},
Expand Down Expand Up @@ -1561,7 +1566,7 @@ func TestProcessorBackpressure(t *testing.T) {
// Add a registration.
stream := newTestStream()
done := &future.ErrorFuture{}
ok, _ := p.Register(span, hlc.MinTimestamp, nil, false, stream, nil, done)
ok, _ := p.Register(span, hlc.MinTimestamp, nilCatchUpIter, false, stream, nil, done)
require.True(t, ok)

// Wait for the initial checkpoint.
Expand Down Expand Up @@ -1610,3 +1615,43 @@ func TestProcessorBackpressure(t *testing.T) {
},
}, events[len(events)-1])
}

func TestIteratorContainer(t *testing.T) {
defer leaktest.AfterTest(t)()

eng := storage.NewDefaultInMemForTesting(storage.If(smallEngineBlocks, storage.BlockSize(1)))
defer eng.Close()
span := roachpb.Span{
Key: keys.LocalMax,
EndKey: roachpb.KeyMax,
}

t.Run("empty container", func(t *testing.T) {
container := NewCatchUpIteratorContainer(nil)
require.NotPanics(t, container.Close, "close empty container")
})

t.Run("close attached", func(t *testing.T) {
var closed bool
iter, err := NewCatchUpIterator(eng, span, hlc.Timestamp{WallTime: 1}, func() {
closed = true
}, nil)
require.NoError(t, err, "failed to create iterator for test")
container := NewCatchUpIteratorContainer(iter)
container.Close()
require.True(t, closed, "iterator in container is not closed")
})

t.Run("closed detached", func(t *testing.T) {
var closed bool
iter, err := NewCatchUpIterator(eng, span, hlc.Timestamp{WallTime: 1}, func() {
closed = true
}, nil)
require.NoError(t, err, "failed to create iterator for test")
container := NewCatchUpIteratorContainer(iter)
detached := container.Detach()
defer detached.Close()
container.Close()
require.False(t, closed, "iterator in container is not closed")
})
}
13 changes: 2 additions & 11 deletions pkg/kv/kvserver/rangefeed/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,21 +589,12 @@ func (r *registration) waitForCaughtUp() error {

// maybeConstructCatchUpIter calls the catchUpIterConstructor and attaches
// the catchUpIter to be detached in the catchUpScan or closed on disconnect.
func (r *registration) maybeConstructCatchUpIter() error {
if r.catchUpIterConstructor == nil {
return nil
}

catchUpIter, err := r.catchUpIterConstructor(r.span, r.catchUpTimestamp)
if err != nil {
return err
}
r.catchUpIterConstructor = nil
func (r *registration) maybeConstructCatchUpIter() {
catchUpIter := r.catchUpIterConstructor()

r.mu.Lock()
defer r.mu.Unlock()
r.mu.catchUpIter = catchUpIter
return nil
}

// detachCatchUpIter detaches the catchUpIter that was previously attached.
Expand Down
16 changes: 8 additions & 8 deletions pkg/kv/kvserver/rangefeed/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,18 @@ type testRegistration struct {
stream *testStream
}

func makeCatchUpIteratorConstructor(iter storage.SimpleMVCCIterator) CatchUpIteratorConstructor {
func makeCatchUpIteratorConstructor(iter storage.SimpleMVCCIterator, span roachpb.Span, startTime hlc.Timestamp) CatchUpIteratorConstructor {
if iter == nil {
return nil
return func() *CatchUpIterator {
return nil
}
}
return func(span roachpb.Span, startTime hlc.Timestamp) (*CatchUpIterator, error) {
return func() *CatchUpIterator {
return &CatchUpIterator{
simpleCatchupIter: simpleCatchupIterAdapter{iter},
span: span,
startTime: startTime,
}, nil
}
}
}

Expand All @@ -120,7 +122,7 @@ func newTestRegistration(
r := newRegistration(
span,
ts,
makeCatchUpIteratorConstructor(catchup),
makeCatchUpIteratorConstructor(catchup, span, ts),
withDiff,
5,
false, /* blockWhenFull */
Expand All @@ -129,9 +131,7 @@ func newTestRegistration(
func() {},
&future.ErrorFuture{},
)
if err := r.maybeConstructCatchUpIter(); err != nil {
panic(err)
}
r.maybeConstructCatchUpIter()
return &testRegistration{
registration: r,
stream: s,
Expand Down
5 changes: 1 addition & 4 deletions pkg/kv/kvserver/rangefeed/scheduled_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,10 +322,7 @@ func (p *ScheduledProcessor) Register(
// Construct the catchUpIter before notifying the registration that it
// has been registered. Note that if the catchUpScan is never run, then
// the iterator constructed here will be closed in disconnect.
if err := r.maybeConstructCatchUpIter(); err != nil {
r.disconnect(kvpb.NewError(err))
return nil
}
r.maybeConstructCatchUpIter()

// Add the new registration to the registry.
p.reg.Register(&r)
Expand Down
27 changes: 13 additions & 14 deletions pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,25 +271,24 @@ func (r *Replica) RangeFeed(
}

// Register the stream with a catch-up iterator.
var catchUpIterFunc rangefeed.CatchUpIteratorConstructor
var catchUpIter *rangefeed.CatchUpIterator
if usingCatchUpIter {
catchUpIterFunc = func(span roachpb.Span, startTime hlc.Timestamp) (*rangefeed.CatchUpIterator, error) {
// Assert that we still hold the raftMu when this is called to ensure
// that the catchUpIter reads from the current snapshot.
r.raftMu.AssertHeld()
i, err := rangefeed.NewCatchUpIterator(r.store.TODOEngine(), span, startTime, iterSemRelease, pacer)
if err != nil {
return nil, err
}
if f := r.store.TestingKnobs().RangefeedValueHeaderFilter; f != nil {
i.OnEmit = f
}
return i, nil
catchUpIter, err = rangefeed.NewCatchUpIterator(r.store.TODOEngine(), rSpan.AsRawSpanWithNoLocals(),
args.Timestamp, iterSemRelease, pacer)
if err != nil {
r.raftMu.Unlock()
iterSemRelease()
return future.MakeCompletedErrorFuture(err)
}
if f := r.store.TestingKnobs().RangefeedValueHeaderFilter; f != nil {
catchUpIter.OnEmit = f
}
}
catchUpIterContainer := rangefeed.NewCatchUpIteratorContainer(catchUpIter)
defer catchUpIterContainer.Close()
var done future.ErrorFuture
p := r.registerWithRangefeedRaftMuLocked(
ctx, rSpan, args.Timestamp, catchUpIterFunc, args.WithDiff, lockedStream, &done,
ctx, rSpan, args.Timestamp, catchUpIterContainer.Detach, args.WithDiff, lockedStream, &done,
)
r.raftMu.Unlock()

Expand Down

0 comments on commit 3257904

Please sign in to comment.