From cb21a6faf768c4859c33a3c939b78c275ace7044 Mon Sep 17 00:00:00 2001 From: Wenyi Hu Date: Thu, 29 Aug 2024 23:52:34 -0400 Subject: [PATCH] kvserver/rangefeed: add unbuffered registration This patch adds unbufferedRegistration. UnbufferedRegistration is like BufferedRegistration but uses BufferedSender to buffer live raft updates instead of using a fixed size channel and having a dedicated per-registration goroutine to volley events to the underlying gRPC stream. Instead, there is only one BufferedSender for each incoming node.MuxRangefeed gRPC call. BufferedSender is responsible for buffering and sending its updates to the underlying gRPC stream in a dedicated goroutine O(node). Resolved: #110432 Release note: none Co-authored-by: Steven Danna --- pkg/kv/kvserver/rangefeed/BUILD.bazel | 2 + .../rangefeed/buffered_registration.go | 12 + .../rangefeed/buffered_sender_test.go | 3 +- .../rangefeed/processor_helpers_test.go | 54 +- pkg/kv/kvserver/rangefeed/processor_test.go | 235 +++--- .../rangefeed/registry_helper_test.go | 176 ++++- pkg/kv/kvserver/rangefeed/registry_test.go | 725 ++++++++++-------- .../kvserver/rangefeed/scheduled_processor.go | 13 +- .../kvserver/rangefeed/sender_helper_test.go | 43 +- .../kvserver/rangefeed/stream_manager_test.go | 2 + .../rangefeed/unbuffered_registration.go | 400 ++++++++++ .../rangefeed/unbuffered_registration_test.go | 371 +++++++++ 12 files changed, 1566 insertions(+), 470 deletions(-) create mode 100644 pkg/kv/kvserver/rangefeed/unbuffered_registration.go create mode 100644 pkg/kv/kvserver/rangefeed/unbuffered_registration_test.go diff --git a/pkg/kv/kvserver/rangefeed/BUILD.bazel b/pkg/kv/kvserver/rangefeed/BUILD.bazel index 757d873a5b68..342dd5bf837e 100644 --- a/pkg/kv/kvserver/rangefeed/BUILD.bazel +++ b/pkg/kv/kvserver/rangefeed/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "stream_manager.go", "task.go", "test_helpers.go", + "unbuffered_registration.go", "unbuffered_sender.go", ], importpath = 
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed", @@ -77,6 +78,7 @@ go_test( "sender_helper_test.go", "stream_manager_test.go", "task_test.go", + "unbuffered_registration_test.go", "unbuffered_sender_test.go", ], embed = [":rangefeed"], diff --git a/pkg/kv/kvserver/rangefeed/buffered_registration.go b/pkg/kv/kvserver/rangefeed/buffered_registration.go index 909e9283fe12..d88176dea90d 100644 --- a/pkg/kv/kvserver/rangefeed/buffered_registration.go +++ b/pkg/kv/kvserver/rangefeed/buffered_registration.go @@ -347,3 +347,15 @@ var overflowLogEvery = log.Every(5 * time.Second) func (br *bufferedRegistration) shouldLogOverflow(checkpointSent bool) bool { return (!checkpointSent) || log.V(1) || overflowLogEvery.ShouldLog() } + +// Used for testing only. +func (br *bufferedRegistration) getBuf() chan *sharedEvent { + return br.buf +} + +// Used for testing only. +func (br *bufferedRegistration) getOverflowed() bool { + br.mu.Lock() + defer br.mu.Unlock() + return br.mu.overflowed +} diff --git a/pkg/kv/kvserver/rangefeed/buffered_sender_test.go b/pkg/kv/kvserver/rangefeed/buffered_sender_test.go index 5b8517c71994..ad77b399264a 100644 --- a/pkg/kv/kvserver/rangefeed/buffered_sender_test.go +++ b/pkg/kv/kvserver/rangefeed/buffered_sender_test.go @@ -71,7 +71,8 @@ func TestBufferedSenderDisconnectStream(t *testing.T) { require.NoError(t, bs.waitForEmptyBuffer(ctx)) sm.DisconnectStream(int64(streamID), err) require.NoError(t, bs.waitForEmptyBuffer(ctx)) - require.Equal(t, 1, testServerStream.totalEventsSent()) + require.Equalf(t, 1, testServerStream.totalEventsSent(), + "expected only 1 error event in %s", testServerStream.String()) testRangefeedCounter.waitForRangefeedCount(t, 0) }) } diff --git a/pkg/kv/kvserver/rangefeed/processor_helpers_test.go b/pkg/kv/kvserver/rangefeed/processor_helpers_test.go index 88ca84a8a3fc..6d843a6f7241 100644 --- a/pkg/kv/kvserver/rangefeed/processor_helpers_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_helpers_test.go @@ -23,6 
+23,24 @@ import ( "github.com/stretchr/testify/require" ) +type testBufferedStream struct { + Stream +} + +var _ BufferedStream = (*testBufferedStream)(nil) + +func (tb *testBufferedStream) SendBuffered( + e *kvpb.RangeFeedEvent, _ *SharedBudgetAllocation, +) error { + // In production code, buffered stream would be responsible for properly using + // and releasing the alloc. We just ignore memory accounting here. + return tb.SendUnbuffered(e) +} + +func (tb *testBufferedStream) Disconnect(err *kvpb.Error) { + tb.Stream.SendError(err) +} + func makeLogicalOp(val interface{}) enginepb.MVCCLogicalOp { var op enginepb.MVCCLogicalOp op.MustSetValue(val) @@ -189,11 +207,12 @@ const testProcessorEventCCap = 16 const testProcessorEventCTimeout = 10 * time.Millisecond type processorTestHelper struct { - span roachpb.RSpan - rts *resolvedTimestamp - syncEventC func() - sendSpanSync func(*roachpb.Span) - scheduler *ClientScheduler + span roachpb.RSpan + rts *resolvedTimestamp + syncEventC func() + sendSpanSync func(*roachpb.Span) + scheduler *ClientScheduler + toBufferedStreamIfNeeded func(s Stream) Stream } // syncEventAndRegistrations waits for all previously sent events to be @@ -237,16 +256,23 @@ type rangefeedTestType bool var ( scheduledProcessorWithUnbufferedSender rangefeedTestType + scheduledProcessorWithBufferedSender rangefeedTestType ) var testTypes = []rangefeedTestType{ scheduledProcessorWithUnbufferedSender, + scheduledProcessorWithBufferedSender, } -// NB: When adding new types, please keep make sure existing -// benchmarks will keep their old name. 
func (t rangefeedTestType) String() string { - return "scheduled" + switch t { + case scheduledProcessorWithUnbufferedSender: + return "scheduled" + case scheduledProcessorWithBufferedSender: + return "scheduled/registration=buffered" + default: + panic("unknown rangefeed test type") + } } type testConfig struct { @@ -426,6 +452,18 @@ func newTestProcessor( p.syncSendAndWait(&syncEvent{c: make(chan struct{}), testRegCatchupSpan: span}) } h.scheduler = &p.scheduler + switch cfg.feedType { + case scheduledProcessorWithUnbufferedSender: + h.toBufferedStreamIfNeeded = func(s Stream) Stream { + return s + } + case scheduledProcessorWithBufferedSender: + h.toBufferedStreamIfNeeded = func(s Stream) Stream { + return &testBufferedStream{Stream: s} + } + default: + panic("unknown rangefeed test type") + } default: panic("unknown processor type") } diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index 32d0e86cd4f2..8b7f3cfacc2b 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -61,7 +61,7 @@ func TestProcessorBasic(t *testing.T) { // Add a registration. 
r1Stream := newTestStream() - r1OK, _, r1Filter := p.Register( + r1OK, r1Disconnector, r1Filter := p.Register( r1Stream.ctx, roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, @@ -69,7 +69,7 @@ func TestProcessorBasic(t *testing.T) { false, /* withDiff */ false, /* withFiltering */ false, /* withOmitRemote */ - r1Stream, + h.toBufferedStreamIfNeeded(r1Stream), ) require.True(t, r1OK) h.syncEventAndRegistrations() @@ -203,7 +203,7 @@ func TestProcessorBasic(t *testing.T) { true, /* withDiff */ true, /* withFiltering */ false, /* withOmitRemote */ - r2Stream, + h.toBufferedStreamIfNeeded(r2Stream), ) require.True(t, r2OK) h.syncEventAndRegistrations() @@ -298,7 +298,11 @@ func TestProcessorBasic(t *testing.T) { require.Equal(t, valEvent3, r1Stream.GetAndClearEvents()) // r2Stream should not see the event. // Cancel the first registration. - r1Stream.Cancel() + if rt == scheduledProcessorWithUnbufferedSender { + r1Stream.Cancel() + } else { + r1Disconnector.Disconnect(kvpb.NewError(context.Canceled)) + } require.NotNil(t, r1Stream.WaitForError(t)) // Disconnect the registration via SendError should work. @@ -311,7 +315,7 @@ func TestProcessorBasic(t *testing.T) { false, /* withDiff */ false, /* withFiltering */ false, /* withOmitRemote */ - r3Stream, + h.toBufferedStreamIfNeeded(r3Stream), ) require.True(t, r30K) r3Stream.SendError(kvpb.NewError(fmt.Errorf("disconnection error"))) @@ -325,14 +329,14 @@ func TestProcessorBasic(t *testing.T) { // Adding another registration should fail. 
r4Stream := newTestStream() r4OK, _, _ := p.Register( - r3Stream.ctx, + r4Stream.ctx, roachpb.RSpan{Key: roachpb.RKey("c"), EndKey: roachpb.RKey("z")}, hlc.Timestamp{WallTime: 1}, nil, /* catchUpIter */ false, /* withDiff */ false, /* withFiltering */ false, /* withOmitRemote */ - r4Stream, + h.toBufferedStreamIfNeeded(r4Stream), ) require.False(t, r4OK) }) @@ -357,7 +361,7 @@ func TestProcessorOmitRemote(t *testing.T) { false, /* withDiff */ false, /* withFiltering */ false, /* withOmitRemote */ - r1Stream, + h.toBufferedStreamIfNeeded(r1Stream), ) require.True(t, r1OK) h.syncEventAndRegistrations() @@ -382,7 +386,7 @@ func TestProcessorOmitRemote(t *testing.T) { false, /* withDiff */ false, /* withFiltering */ true, /* withOmitRemote */ - r2Stream, + h.toBufferedStreamIfNeeded(r2Stream), ) require.True(t, r2OK) h.syncEventAndRegistrations() @@ -418,104 +422,108 @@ func TestProcessorOmitRemote(t *testing.T) { }) } +// TestProcessorSlowConsumer tests that buffered registration will drop events +// and properly disconnect the stream when the buffer capacity is exceeded. This +// doesn't apply to unbuffered registrations. func TestProcessorSlowConsumer(t *testing.T) { defer leaktest.AfterTest(t)() - testutils.RunValues(t, "feed type", testTypes, func(t *testing.T, rt rangefeedTestType) { - p, h, stopper := newTestProcessor(t, withRangefeedTestType(rt)) - ctx := context.Background() - defer stopper.Stop(ctx) - - // Add a registration. 
- r1Stream := newTestStream() - p.Register( - r1Stream.ctx, - roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, - hlc.Timestamp{WallTime: 1}, - nil, /* catchUpIter */ - false, /* withDiff */ - false, /* withFiltering */ - false, /* withOmitRemote */ - r1Stream, - ) - r2Stream := newTestStream() - p.Register( - r2Stream.ctx, - roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")}, - hlc.Timestamp{WallTime: 1}, - nil, /* catchUpIter */ - false, /* withDiff */ - false, /* withFiltering */ - false, /* withOmitRemote */ - r2Stream, - ) - h.syncEventAndRegistrations() - require.Equal(t, 2, p.Len()) - require.Equal(t, - []*kvpb.RangeFeedEvent{ - rangeFeedCheckpoint( - roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("m")}, - hlc.Timestamp{WallTime: 0}, - ), - }, - r1Stream.GetAndClearEvents(), - ) - require.Equal(t, - []*kvpb.RangeFeedEvent{ - rangeFeedCheckpoint( - roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")}, - hlc.Timestamp{WallTime: 0}, - ), - }, - r2Stream.GetAndClearEvents(), - ) - - // Block its Send method and fill up the registration's input channel. - unblock := r1Stream.BlockSend() - defer func() { - if unblock != nil { - unblock() + testutils.RunValues(t, "feed type", []rangefeedTestType{scheduledProcessorWithUnbufferedSender}, + func(t *testing.T, rt rangefeedTestType) { + p, h, stopper := newTestProcessor(t, withRangefeedTestType(rt)) + ctx := context.Background() + defer stopper.Stop(ctx) + + // Add a registration. 
+ r1Stream := newTestStream() + p.Register( + r1Stream.ctx, + roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, + hlc.Timestamp{WallTime: 1}, + nil, /* catchUpIter */ + false, /* withDiff */ + false, /* withFiltering */ + false, /* withOmitRemote */ + h.toBufferedStreamIfNeeded(r1Stream), + ) + r2Stream := newTestStream() + p.Register( + r2Stream.ctx, + roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")}, + hlc.Timestamp{WallTime: 1}, + nil, /* catchUpIter */ + false, /* withDiff */ + false, /* withFiltering */ + false, /* withOmitRemote */ + h.toBufferedStreamIfNeeded(r2Stream), + ) + h.syncEventAndRegistrations() + require.Equal(t, 2, p.Len()) + require.Equal(t, + []*kvpb.RangeFeedEvent{ + rangeFeedCheckpoint( + roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("m")}, + hlc.Timestamp{WallTime: 0}, + ), + }, + r1Stream.GetAndClearEvents(), + ) + require.Equal(t, + []*kvpb.RangeFeedEvent{ + rangeFeedCheckpoint( + roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")}, + hlc.Timestamp{WallTime: 0}, + ), + }, + r2Stream.GetAndClearEvents(), + ) + + // Block its Send method and fill up the registration's input channel. + unblock := r1Stream.BlockSend() + defer func() { + if unblock != nil { + unblock() + } + }() + // Need one more message to fill the channel because the first one will be + // sent to the stream and block the registration outputLoop goroutine. + toFill := testProcessorEventCCap + 1 + for i := 0; i < toFill; i++ { + ts := hlc.Timestamp{WallTime: int64(i + 2)} + p.ConsumeLogicalOps(ctx, writeValueOpWithKV(roachpb.Key("k"), ts, []byte("val"))) + + // Wait for just the unblocked registration to catch up. This prevents + // the race condition where this registration overflows anyway due to + // the rapid event consumption and small buffer size. 
+ h.syncEventAndRegistrationsSpan(spXY) } - }() - // Need one more message to fill the channel because the first one will be - // sent to the stream and block the registration outputLoop goroutine. - toFill := testProcessorEventCCap + 1 - for i := 0; i < toFill; i++ { - ts := hlc.Timestamp{WallTime: int64(i + 2)} - p.ConsumeLogicalOps(ctx, writeValueOpWithKV(roachpb.Key("k"), ts, []byte("val"))) - - // Wait for just the unblocked registration to catch up. This prevents - // the race condition where this registration overflows anyway due to - // the rapid event consumption and small buffer size. - h.syncEventAndRegistrationsSpan(spXY) - } - - // Consume one more event. Should not block, but should cause r1 to overflow - // its registration buffer and drop the event. - p.ConsumeLogicalOps(ctx, - writeValueOpWithKV(roachpb.Key("k"), hlc.Timestamp{WallTime: 18}, []byte("val"))) - // Wait for just the unblocked registration to catch up. - h.syncEventAndRegistrationsSpan(spXY) - require.Equal(t, toFill+1, len(r2Stream.GetAndClearEvents())) - require.Equal(t, 2, p.Len()) + // Consume one more event. Should not block, but should cause r1 to overflow + // its registration buffer and drop the event. + p.ConsumeLogicalOps(ctx, + writeValueOpWithKV(roachpb.Key("k"), hlc.Timestamp{WallTime: 18}, []byte("val"))) - // Unblock the send channel. The events should quickly be consumed. - unblock() - unblock = nil - h.syncEventAndRegistrations() - // At least one event should have been dropped due to overflow. We expect - // exactly one event to be dropped, but it is possible that multiple events - // were dropped due to rapid event consumption before the r1's outputLoop - // began consuming from its event buffer. 
- require.LessOrEqual(t, len(r1Stream.GetAndClearEvents()), toFill) - require.Equal(t, newErrBufferCapacityExceeded().GoError(), r1Stream.WaitForError(t)) - testutils.SucceedsSoon(t, func() error { - if act, exp := p.Len(), 1; exp != act { - return fmt.Errorf("processor had %d regs, wanted %d", act, exp) - } - return nil + // Wait for just the unblocked registration to catch up. + h.syncEventAndRegistrationsSpan(spXY) + require.Equal(t, toFill+1, len(r2Stream.GetAndClearEvents())) + require.Equal(t, 2, p.Len()) + + // Unblock the send channel. The events should quickly be consumed. + unblock() + unblock = nil + h.syncEventAndRegistrations() + // At least one event should have been dropped due to overflow. We expect + // exactly one event to be dropped, but it is possible that multiple events + // were dropped due to rapid event consumption before the r1's outputLoop + // began consuming from its event buffer. + require.LessOrEqual(t, len(r1Stream.GetAndClearEvents()), toFill) + require.Equal(t, newErrBufferCapacityExceeded().GoError(), r1Stream.WaitForError(t)) + testutils.SucceedsSoon(t, func() error { + if act, exp := p.Len(), 1; exp != act { + return fmt.Errorf("processor had %d regs, wanted %d", act, exp) + } + return nil + }) }) - }) } // TestProcessorMemoryBudgetExceeded tests that memory budget will limit amount @@ -524,7 +532,6 @@ func TestProcessorSlowConsumer(t *testing.T) { func TestProcessorMemoryBudgetExceeded(t *testing.T) { defer leaktest.AfterTest(t)() testutils.RunValues(t, "feed type", testTypes, func(t *testing.T, rt rangefeedTestType) { - fb := newTestBudget(40) m := NewMetrics() p, h, stopper := newTestProcessor(t, withBudget(fb), withChanTimeout(time.Millisecond), @@ -542,7 +549,7 @@ func TestProcessorMemoryBudgetExceeded(t *testing.T) { false, /* withDiff */ false, /* withFiltering */ false, /* withOmitRemote */ - r1Stream, + h.toBufferedStreamIfNeeded(r1Stream), ) h.syncEventAndRegistrations() @@ -597,7 +604,7 @@ func 
TestProcessorMemoryBudgetReleased(t *testing.T) { false, /* withDiff */ false, /* withFiltering */ false, /* withOmitRemote */ - r1Stream, + h.toBufferedStreamIfNeeded(r1Stream), ) h.syncEventAndRegistrations() @@ -678,7 +685,7 @@ func TestProcessorInitializeResolvedTimestamp(t *testing.T) { false, /* withDiff */ false, /* withFiltering */ false, /* withOmitRemote */ - r1Stream, + h.toBufferedStreamIfNeeded(r1Stream), ) h.syncEventAndRegistrations() require.Equal(t, 1, p.Len()) @@ -985,7 +992,8 @@ func TestProcessorConcurrentStop(t *testing.T) { runtime.Gosched() s := newTestStream() p.Register(s.ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */ - false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, s) + false /* withDiff */, false /* withFiltering */, false, /* withOmitRemote */ + h.toBufferedStreamIfNeeded(s)) }() go func() { defer wg.Done() @@ -1057,7 +1065,8 @@ func TestProcessorRegistrationObservesOnlyNewEvents(t *testing.T) { s := newTestStream() regs[s] = firstIdx p.Register(s.ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */ - false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, s) + false /* withDiff */, false /* withFiltering */, false, /* withOmitRemote */ + h.toBufferedStreamIfNeeded(s)) regDone <- struct{}{} } }() @@ -1117,7 +1126,7 @@ func TestBudgetReleaseOnProcessorStop(t *testing.T) { false, /* withDiff */ false, /* withFiltering */ false, /* withOmitRemote */ - rStream, + h.toBufferedStreamIfNeeded(rStream), ) h.syncEventAndRegistrations() @@ -1197,7 +1206,7 @@ func TestBudgetReleaseOnLastStreamError(t *testing.T) { false, /* withDiff */ false, /* withFiltering */ false, /* withOmitRemote */ - rStream, + h.toBufferedStreamIfNeeded(rStream), ) h.syncEventAndRegistrations() @@ -1267,7 +1276,7 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) { false, /* withDiff */ false, /* withFiltering */ false, /* withOmitRemote */ - r1Stream, + h.toBufferedStreamIfNeeded(r1Stream), ) // Non-blocking 
registration that would consume all events. @@ -1280,7 +1289,7 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) { false, /* withDiff */ false, /* withFiltering */ false, /* withOmitRemote */ - r2Stream, + h.toBufferedStreamIfNeeded(r2Stream), ) h.syncEventAndRegistrations() @@ -1448,7 +1457,8 @@ func TestProcessorBackpressure(t *testing.T) { // Add a registration. stream := newTestStream() ok, _, _ := p.Register(stream.ctx, span, hlc.MinTimestamp, nil, /* catchUpIter */ - false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, stream) + false /* withDiff */, false /* withFiltering */, false, /* withOmitRemote */ + h.toBufferedStreamIfNeeded(stream)) require.True(t, ok) // Wait for the initial checkpoint. @@ -1508,7 +1518,6 @@ func TestProcessorContextCancellation(t *testing.T) { // Try stopping both via the stopper and via Processor.Stop(). testutils.RunTrueAndFalse(t, "stopper", func(t *testing.T, useStopper bool) { - // Set up a transaction to push. txnTS := hlc.Timestamp{WallTime: 10} // after resolved timestamp txnMeta := enginepb.TxnMeta{ diff --git a/pkg/kv/kvserver/rangefeed/registry_helper_test.go b/pkg/kv/kvserver/rangefeed/registry_helper_test.go index 56d9780f3593..e6a5c1da7a3a 100644 --- a/pkg/kv/kvserver/rangefeed/registry_helper_test.go +++ b/pkg/kv/kvserver/rangefeed/registry_helper_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" ) var ( @@ -190,10 +191,18 @@ func (s *testStream) GetAndClearEvents() []*kvpb.RangeFeedEvent { return es } +func (s *testStream) GetEvents() []*kvpb.RangeFeedEvent { + s.mu.Lock() + defer s.mu.Unlock() + es := s.mu.events + return es +} + func (s *testStream) BlockSend() func() { s.mu.Lock() var once sync.Once return func() { + // nolint:deferunlockcheck once.Do(s.mu.Unlock) // safe to call multiple times, e.g. 
defer and explicit } } @@ -228,9 +237,16 @@ func (s *testStream) WaitForError(t *testing.T) error { } } -type testRegistration struct { - *bufferedRegistration - *testStream +func (s *testStream) waitForEventCount(t *testing.T, expected int) { + testutils.SucceedsSoon(t, func() error { + s.mu.Lock() + defer s.mu.Unlock() + actual := len(s.mu.events) + if actual != expected { + return errors.Errorf("expected %d events, got %d", expected, actual) + } + return nil + }) } func makeCatchUpIterator( @@ -246,31 +262,133 @@ func makeCatchUpIterator( } } -func newTestRegistration( - span roachpb.Span, - ts hlc.Timestamp, - catchup storage.SimpleMVCCIterator, - withDiff bool, - withFiltering bool, - withOmitRemote bool, -) *testRegistration { - s := newTestStream() - r := newBufferedRegistration( - s.ctx, - span, - ts, - makeCatchUpIterator(catchup, span, ts), - withDiff, - withFiltering, - withOmitRemote, - 5, - false, /* blockWhenFull */ - NewMetrics(), - s, - func(registration) {}, - ) - return &testRegistration{ - bufferedRegistration: r, - testStream: s, +type registrationOption func(*testRegistrationConfig) + +func withCatchUpIter(iter storage.SimpleMVCCIterator) registrationOption { + return func(cfg *testRegistrationConfig) { + cfg.catchup = iter + } +} + +func withDiff(opt bool) registrationOption { + return func(cfg *testRegistrationConfig) { + cfg.withDiff = opt + } +} + +func withFiltering(opt bool) registrationOption { + return func(cfg *testRegistrationConfig) { + cfg.withFiltering = opt + } +} + +func withRMetrics(metrics *Metrics) registrationOption { + return func(cfg *testRegistrationConfig) { + cfg.metrics = metrics + } +} + +func withOmitRemote(opt bool) registrationOption { + return func(cfg *testRegistrationConfig) { + cfg.withOmitRemote = opt + } +} + +func withRegistrationType(regType registrationType) registrationOption { + return func(cfg *testRegistrationConfig) { + cfg.withRegistrationTestTypes = regType } } + +func withRSpan(span roachpb.Span) 
registrationOption { + return func(cfg *testRegistrationConfig) { + cfg.span = span + } +} + +func withStartTs(ts hlc.Timestamp) registrationOption { + return func(cfg *testRegistrationConfig) { + cfg.ts = ts + } +} + +type registrationType bool + +const ( + buffered registrationType = false + unbuffered = true +) + +func (t registrationType) String() string { + switch t { + case buffered: + return "buffered registration" + case unbuffered: + return "unbuffered registration" + } + panic("unknown processor type") +} + +var registrationTestTypes = []registrationType{buffered, unbuffered} + +type testRegistrationConfig struct { + span roachpb.Span + ts hlc.Timestamp + catchup storage.SimpleMVCCIterator + withDiff bool + withFiltering bool + withOmitRemote bool + withRegistrationTestTypes registrationType + metrics *Metrics +} + +func newTestRegistration(s *testStream, opts ...registrationOption) testRegistration { + cfg := testRegistrationConfig{} + for _, opt := range opts { + opt(&cfg) + } + if cfg.metrics == nil { + cfg.metrics = NewMetrics() + } + + switch cfg.withRegistrationTestTypes { + case buffered: + return newBufferedRegistration( + s.ctx, + cfg.span, + cfg.ts, + makeCatchUpIterator(cfg.catchup, cfg.span, cfg.ts), + cfg.withDiff, + cfg.withFiltering, + cfg.withOmitRemote, + 5, + false, /* blockWhenFull */ + cfg.metrics, + s, + func(registration) {}, + ) + case unbuffered: + return newUnbufferedRegistration( + s.ctx, + cfg.span, + cfg.ts, + makeCatchUpIterator(cfg.catchup, cfg.span, cfg.ts), + cfg.withDiff, + cfg.withFiltering, + cfg.withOmitRemote, + 5, + cfg.metrics, + &testBufferedStream{Stream: s}, + func(registration) {}, + ) + default: + panic("unknown registration type") + } +} + +type testRegistration interface { + registration + getBuf() chan *sharedEvent + getOverflowed() bool + maybeRunCatchUpScan(ctx context.Context) error +} diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go index 
ba850f7cf571..554e4ff9a99e 100644 --- a/pkg/kv/kvserver/rangefeed/registry_test.go +++ b/pkg/kv/kvserver/rangefeed/registry_test.go @@ -29,106 +29,158 @@ func TestRegistrationBasic(t *testing.T) { ev1.MustSetValue(&kvpb.RangeFeedValue{Key: keyA, Value: val}) ev2.MustSetValue(&kvpb.RangeFeedValue{Key: keyB, Value: val}) - // Registration with no catchup scan specified. - noCatchupReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, /* catchup */ - false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */) - noCatchupReg.publish(ctx, ev1, nil /* alloc */) - noCatchupReg.publish(ctx, ev2, nil /* alloc */) - require.Equal(t, len(noCatchupReg.buf), 2) - go noCatchupReg.runOutputLoop(ctx, 0) - require.NoError(t, noCatchupReg.waitForCaughtUp(ctx)) - require.Equal(t, []*kvpb.RangeFeedEvent{ev1, ev2}, noCatchupReg.GetAndClearEvents()) - noCatchupReg.Disconnect(nil) - - // Registration with catchup scan. - catchupReg := newTestRegistration(spBC, hlc.Timestamp{WallTime: 1}, - newTestIterator([]storage.MVCCKeyValue{ - makeKV("b", "val1", 10), - makeKV("bc", "val3", 11), - makeKV("bd", "val4", 9), - }, nil), - false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */) - catchupReg.publish(ctx, ev1, nil /* alloc */) - catchupReg.publish(ctx, ev2, nil /* alloc */) - require.Equal(t, len(catchupReg.buf), 2) - go catchupReg.runOutputLoop(ctx, 0) - require.NoError(t, catchupReg.waitForCaughtUp(ctx)) - events := catchupReg.GetAndClearEvents() - require.Equal(t, 5, len(events)) - require.Equal(t, []*kvpb.RangeFeedEvent{ev1, ev2}, events[3:]) - catchupReg.Disconnect(nil) - - // EXIT CONDITIONS - // External Disconnect. 
- disconnectReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, /* catchup */ - false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */) - disconnectReg.publish(ctx, ev1, nil /* alloc */) - disconnectReg.publish(ctx, ev2, nil /* alloc */) - go disconnectReg.runOutputLoop(ctx, 0) - require.NoError(t, disconnectReg.waitForCaughtUp(ctx)) - discErr := kvpb.NewError(fmt.Errorf("disconnection error")) - disconnectReg.Disconnect(discErr) - require.Equal(t, discErr.GoError(), disconnectReg.WaitForError(t)) - require.Equal(t, 2, len(disconnectReg.GetAndClearEvents())) - - // External Disconnect before output loop. - disconnectEarlyReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, /* catchup */ - false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */) - disconnectEarlyReg.publish(ctx, ev1, nil /* alloc */) - disconnectEarlyReg.publish(ctx, ev2, nil /* alloc */) - disconnectEarlyReg.Disconnect(discErr) - go disconnectEarlyReg.runOutputLoop(ctx, 0) - require.Equal(t, discErr.GoError(), disconnectEarlyReg.WaitForError(t)) - require.Equal(t, 0, len(disconnectEarlyReg.GetAndClearEvents())) - - // Overflow. - overflowReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, /* catchup */ - false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */) - for i := 0; i < cap(overflowReg.buf)+3; i++ { - overflowReg.publish(ctx, ev1, nil /* alloc */) - } - go overflowReg.runOutputLoop(ctx, 0) - require.Equal(t, newErrBufferCapacityExceeded().GoError(), overflowReg.WaitForError(t)) - require.Equal(t, cap(overflowReg.buf), len(overflowReg.GetAndClearEvents())) - - // Stream Error. 
- streamErrReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, /* catchup */ - false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */) - streamErr := fmt.Errorf("stream error") - streamErrReg.SetSendErr(streamErr) - go streamErrReg.runOutputLoop(ctx, 0) - streamErrReg.publish(ctx, ev1, nil /* alloc */) - require.Equal(t, streamErr, streamErrReg.WaitForError(t)) - - // Stream Context Canceled. - streamCancelReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, /* catchup */ - false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */) - - streamCancelReg.Cancel() - go streamCancelReg.runOutputLoop(streamCancelReg.ctx, 0) - require.NoError(t, streamCancelReg.waitForCaughtUp(ctx)) - require.Equal(t, streamCancelReg.ctx.Err(), streamCancelReg.WaitForError(t)) + testutils.RunValues(t, "registration type=", registrationTestTypes, func(t *testing.T, rt registrationType) { + t.Run("registration with no catchup scan specified", func(t *testing.T) { + s := newTestStream() + noCatchupReg := newTestRegistration(s, withRSpan(spAB), withRegistrationType(rt)) + noCatchupReg.publish(ctx, ev1, nil /* alloc */) + noCatchupReg.publish(ctx, ev2, nil /* alloc */) + eventSent := len(noCatchupReg.getBuf()) + switch rt { + case unbuffered: + eventSent += len(s.GetEvents()) + require.Nil(t, noCatchupReg.getBuf()) + } + go noCatchupReg.runOutputLoop(ctx, 0) + require.NoError(t, noCatchupReg.waitForCaughtUp(ctx)) + require.Equal(t, []*kvpb.RangeFeedEvent{ev1, ev2}, s.GetAndClearEvents()) + noCatchupReg.Disconnect(nil) + require.Nil(t, s.GetAndClearEvents()) + }) + t.Run("registration with catchup scan", func(t *testing.T) { + s := newTestStream() + catchupReg := newTestRegistration(s, withRSpan(spBC), + withStartTs(hlc.Timestamp{WallTime: 1}), + withCatchUpIter(newTestIterator([]storage.MVCCKeyValue{ + makeKV("b", "val1", 10), + makeKV("bc", "val3", 11), + makeKV("bd", "val4", 9), + }, nil)), withRegistrationType(rt)) + catchupReg.publish(ctx, ev1, 
nil /* alloc */) + catchupReg.publish(ctx, ev2, nil /* alloc */) + require.Equal(t, 2, len(catchupReg.getBuf())) + go catchupReg.runOutputLoop(ctx, 0) + require.NoError(t, catchupReg.waitForCaughtUp(ctx)) + events := s.GetAndClearEvents() + require.Equal(t, 5, len(events)) + require.Equal(t, []*kvpb.RangeFeedEvent{ev1, ev2}, events[3:]) + catchupReg.Disconnect(nil) + require.Nil(t, s.GetAndClearEvents()) + }) + t.Run("external disconnect after output loop", func(t *testing.T) { + s := newTestStream() + disconnectReg := newTestRegistration(s, withRSpan(spAB), withRegistrationType(rt)) + disconnectReg.publish(ctx, ev1, nil /* alloc */) + disconnectReg.publish(ctx, ev2, nil /* alloc */) + go disconnectReg.runOutputLoop(ctx, 0) + require.NoError(t, disconnectReg.waitForCaughtUp(ctx)) + discErr := kvpb.NewError(fmt.Errorf("disconnection error")) + disconnectReg.Disconnect(discErr) + require.Equal(t, discErr.GoError(), s.WaitForError(t)) + require.Equal(t, 2, len(s.GetAndClearEvents())) + disconnectReg.publish(ctx, ev1, nil /* alloc */) + require.Equal(t, 0, len(s.GetAndClearEvents())) + }) + t.Run("external disconnect before output loop without catch up iter", func(t *testing.T) { + s := newTestStream() + disconnectEarlyReg := newTestRegistration(s, withRSpan(spAB), withRegistrationType(rt)) + disconnectEarlyReg.publish(ctx, ev1, nil /* alloc */) + disconnectEarlyReg.publish(ctx, ev2, nil /* alloc */) + discErr := kvpb.NewError(fmt.Errorf("disconnection error")) + disconnectEarlyReg.Disconnect(discErr) + go disconnectEarlyReg.runOutputLoop(ctx, 0) + require.Equal(t, discErr.GoError(), s.WaitForError(t)) + // This is the main behaviour change between buffered and unbuffered + // registration. For buffered registration, events buffered are + // discarded after disconnection. For unbuffered registration, + // events buffered already are still sent to the stream after + // disconnection. 
+ if rt == buffered { + require.Equal(t, 0, len(s.GetAndClearEvents())) + } else { + require.Equal(t, []*kvpb.RangeFeedEvent{ev1, ev2}, s.GetAndClearEvents()) + } + // Repeatedly disconnect should be idempotent. + disconnectEarlyReg.Disconnect(kvpb.NewError(nil)) + require.Nil(t, s.GetAndClearEvents()) + }) + t.Run("external disconnect before output loop with catch up iter", func(t *testing.T) { + s := newTestStream() + iter := newTestIterator(keyValues, roachpb.Key("w")) + disconnectEarlyReg := newTestRegistration(s, withRSpan(spAB), withRegistrationType(rt), withCatchUpIter(iter)) + disconnectEarlyReg.publish(ctx, ev1, nil /* alloc */) + disconnectEarlyReg.publish(ctx, ev2, nil /* alloc */) + discErr := kvpb.NewError(fmt.Errorf("disconnection error")) + disconnectEarlyReg.Disconnect(discErr) + require.Equal(t, discErr.GoError(), s.WaitForError(t)) + disconnectEarlyReg.runOutputLoop(ctx, 0) + // If there is a catch up iterator, the buffered events should not + // be sent to the stream. + // Test the effects rather than the implementation + require.Nil(t, s.GetAndClearEvents()) + }) + t.Run("overflow", func(t *testing.T) { + s := newTestStream() + // Unbuffered registration can only overflow during a catch up scan, + // so we initialize a catch up scan. 
+ overflowReg := newTestRegistration(s, + withRegistrationType(rt), + withRSpan(spBC), + withStartTs(hlc.Timestamp{WallTime: 1}), + withCatchUpIter(newTestIterator([]storage.MVCCKeyValue{}, nil)), + withRegistrationType(rt)) + capOfBuf := cap(overflowReg.getBuf()) + for i := 0; i < capOfBuf+3; i++ { + overflowReg.publish(ctx, ev1, nil /* alloc */) + } + go overflowReg.runOutputLoop(ctx, 0) + require.Equal(t, kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_SLOW_CONSUMER), s.WaitForError(t)) + require.Equal(t, capOfBuf, len(s.GetAndClearEvents())) + require.NoError(t, overflowReg.waitForCaughtUp(ctx)) + require.True(t, overflowReg.getOverflowed()) + }) + t.Run("stream error", func(t *testing.T) { + s := newTestStream() + streamErrReg := newTestRegistration(s, withRSpan(spAB), withRegistrationType(rt)) + streamErr := fmt.Errorf("stream error") + s.SetSendErr(streamErr) + go streamErrReg.runOutputLoop(ctx, 0) + streamErrReg.publish(ctx, ev1, nil /* alloc */) + require.Equal(t, streamErr, s.WaitForError(t)) + }) + t.Run("stream context canceled", func(t *testing.T) { + s := newTestStream() + streamCancelReg := newTestRegistration(s, withRSpan(spAB), withRegistrationType(rt)) + streamCancelReg.Disconnect(kvpb.NewError(context.Canceled)) + go streamCancelReg.runOutputLoop(ctx, 0) + require.NoError(t, streamCancelReg.waitForCaughtUp(ctx)) + require.Equal(t, context.Canceled, s.WaitForError(t)) + }) + }) } func TestRegistrationCatchUpScan(t *testing.T) { defer leaktest.AfterTest(t)() - - testutils.RunTrueAndFalse(t, "withFiltering", func(t *testing.T, withFiltering bool) { - // Run a catch-up scan for a registration over a test - // iterator with the following keys. 
- iter := newTestIterator(keyValues, roachpb.Key("w")) - r := newTestRegistration(roachpb.Span{ - Key: roachpb.Key("d"), - EndKey: roachpb.Key("w"), - }, hlc.Timestamp{WallTime: 4}, iter, true /* withDiff */, withFiltering, false /* withOmitRemote */) - - require.Zero(t, r.metrics.RangeFeedCatchUpScanNanos.Count()) - require.NoError(t, r.maybeRunCatchUpScan(context.Background())) - require.True(t, iter.closed) - require.NotZero(t, r.metrics.RangeFeedCatchUpScanNanos.Count()) - - // Compare the events sent on the registration's Stream to the expected events. - require.Equal(t, expEvents(withFiltering), r.GetAndClearEvents()) + testutils.RunValues(t, "registration type=", registrationTestTypes, func(t *testing.T, rt registrationType) { + testutils.RunTrueAndFalse(t, "filtering", func(t *testing.T, filtering bool) { + // Run a catch-up scan for a registration over a test + // iterator with the following keys. + iter := newTestIterator(keyValues, roachpb.Key("w")) + metrics := NewMetrics() + s := newTestStream() + r := newTestRegistration(s, withRSpan(roachpb.Span{ + Key: roachpb.Key("d"), + EndKey: roachpb.Key("w"), + }), withStartTs(hlc.Timestamp{WallTime: 4}), withCatchUpIter(iter), withDiff(true), + withFiltering(filtering), withRMetrics(metrics), withRegistrationType(rt)) + require.Zero(t, metrics.RangeFeedCatchUpScanNanos.Count()) + require.NoError(t, r.maybeRunCatchUpScan(context.Background())) + require.True(t, iter.closed) + require.NotZero(t, metrics.RangeFeedCatchUpScanNanos.Count()) + // Compare the events sent on the registration's Stream to the + // expected events. 
+ require.Equal(t, expEvents(filtering), s.GetAndClearEvents()) + }) }) } @@ -138,227 +190,242 @@ func TestRegistryWithOmitOrigin(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() - noPrev := func(ev *kvpb.RangeFeedEvent) *kvpb.RangeFeedEvent { - ev = ev.ShallowCopy() - ev.GetValue().(*kvpb.RangeFeedValue).PrevValue = roachpb.Value{} - return ev - } + testutils.RunValues(t, "registration type=", registrationTestTypes, func(t *testing.T, rt registrationType) { + noPrev := func(ev *kvpb.RangeFeedEvent) *kvpb.RangeFeedEvent { + ev = ev.ShallowCopy() + ev.GetValue().(*kvpb.RangeFeedValue).PrevValue = roachpb.Value{} + return ev + } - val := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}} - ev1, ev2 := new(kvpb.RangeFeedEvent), new(kvpb.RangeFeedEvent) - ev1.MustSetValue(&kvpb.RangeFeedValue{Key: keyA, Value: val, PrevValue: val}) - ev2.MustSetValue(&kvpb.RangeFeedValue{Key: keyB, Value: val, PrevValue: val}) + val := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}} + ev1, ev2 := new(kvpb.RangeFeedEvent), new(kvpb.RangeFeedEvent) + ev1.MustSetValue(&kvpb.RangeFeedValue{Key: keyA, Value: val, PrevValue: val}) + ev2.MustSetValue(&kvpb.RangeFeedValue{Key: keyB, Value: val, PrevValue: val}) + + reg := makeRegistry(NewMetrics()) - reg := makeRegistry(NewMetrics()) - rAC := newTestRegistration(spAC, hlc.Timestamp{}, nil, false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */) - originFiltering := newTestRegistration(spAC, hlc.Timestamp{}, nil, false /* withDiff */, false /* withFiltering */, true /* withOmitRemote */) + sAC := newTestStream() + rAC := newTestRegistration(sAC, withRSpan(spAC), withRegistrationType(rt)) + originFilteringStream := newTestStream() + originFiltering := newTestRegistration(originFilteringStream, withRSpan(spAC), + withOmitRemote(true), withRegistrationType(rt)) - go rAC.runOutputLoop(ctx, 0) - go originFiltering.runOutputLoop(ctx, 0) + go 
rAC.runOutputLoop(ctx, 0) + go originFiltering.runOutputLoop(ctx, 0) - defer rAC.Disconnect(nil) - defer originFiltering.Disconnect(nil) + defer rAC.Disconnect(nil) + defer originFiltering.Disconnect(nil) - reg.Register(ctx, rAC.bufferedRegistration) - reg.Register(ctx, originFiltering.bufferedRegistration) + reg.Register(ctx, rAC) + reg.Register(ctx, originFiltering) - reg.PublishToOverlapping(ctx, spAC, ev1, logicalOpMetadata{}, nil /* alloc */) - reg.PublishToOverlapping(ctx, spAC, ev2, logicalOpMetadata{originID: 1}, nil /* alloc */) + reg.PublishToOverlapping(ctx, spAC, ev1, logicalOpMetadata{}, nil /* alloc */) + reg.PublishToOverlapping(ctx, spAC, ev2, logicalOpMetadata{originID: 1}, nil /* alloc */) - require.NoError(t, reg.waitForCaughtUp(ctx, all)) + require.NoError(t, reg.waitForCaughtUp(ctx, all)) - require.Equal(t, []*kvpb.RangeFeedEvent{noPrev(ev1), noPrev(ev2)}, rAC.GetAndClearEvents()) - require.Equal(t, []*kvpb.RangeFeedEvent{noPrev(ev1)}, originFiltering.GetAndClearEvents()) - require.Nil(t, rAC.Error()) - require.Nil(t, originFiltering.Error()) + require.Equal(t, []*kvpb.RangeFeedEvent{noPrev(ev1), noPrev(ev2)}, sAC.GetAndClearEvents()) + require.Equal(t, []*kvpb.RangeFeedEvent{noPrev(ev1)}, originFilteringStream.GetAndClearEvents()) + require.Nil(t, sAC.Error()) + require.Nil(t, originFilteringStream.Error()) + }) } func TestRegistryBasic(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() - val := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}} - ev1, ev2 := new(kvpb.RangeFeedEvent), new(kvpb.RangeFeedEvent) - ev3, ev4, ev5 := new(kvpb.RangeFeedEvent), new(kvpb.RangeFeedEvent), new(kvpb.RangeFeedEvent) - ev1.MustSetValue(&kvpb.RangeFeedValue{Key: keyA, Value: val, PrevValue: val}) - ev2.MustSetValue(&kvpb.RangeFeedValue{Key: keyB, Value: val, PrevValue: val}) - ev3.MustSetValue(&kvpb.RangeFeedValue{Key: keyC, Value: val, PrevValue: val}) - ev4.MustSetValue(&kvpb.RangeFeedValue{Key: keyD, 
Value: val, PrevValue: val}) - ev5.MustSetValue(&kvpb.RangeFeedValue{Key: keyD, Value: val, PrevValue: val}) - err1 := kvpb.NewErrorf("error1") - noPrev := func(ev *kvpb.RangeFeedEvent) *kvpb.RangeFeedEvent { - ev = ev.ShallowCopy() - ev.GetValue().(*kvpb.RangeFeedValue).PrevValue = roachpb.Value{} - return ev - } - - reg := makeRegistry(NewMetrics()) - require.Equal(t, 0, reg.Len()) - reg.PublishToOverlapping(ctx, spAB, ev1, logicalOpMetadata{}, nil /* alloc */) - reg.DisconnectWithErr(ctx, spAB, err1) - - rAB := newTestRegistration(spAB, hlc.Timestamp{}, nil, false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */) - rBC := newTestRegistration(spBC, hlc.Timestamp{}, nil, true /* withDiff */, false /* withFiltering */, false /* withOmitRemote */) - rCD := newTestRegistration(spCD, hlc.Timestamp{}, nil, true /* withDiff */, false /* withFiltering */, false /* withOmitRemote */) - rAC := newTestRegistration(spAC, hlc.Timestamp{}, nil, false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */) - rACFiltering := newTestRegistration(spAC, hlc.Timestamp{}, nil, false /* withDiff */, true /* withFiltering */, false /* withOmitRemote */) - go rAB.runOutputLoop(ctx, 0) - go rBC.runOutputLoop(ctx, 0) - go rCD.runOutputLoop(ctx, 0) - go rAC.runOutputLoop(ctx, 0) - go rACFiltering.runOutputLoop(ctx, 0) - defer rAB.Disconnect(nil) - defer rBC.Disconnect(nil) - defer rCD.Disconnect(nil) - defer rAC.Disconnect(nil) - defer rACFiltering.Disconnect(nil) - - // Register 6 registrations. - reg.Register(ctx, rAB.bufferedRegistration) - require.Equal(t, 1, reg.Len()) - reg.Register(ctx, rBC.bufferedRegistration) - require.Equal(t, 2, reg.Len()) - reg.Register(ctx, rCD.bufferedRegistration) - require.Equal(t, 3, reg.Len()) - reg.Register(ctx, rAC.bufferedRegistration) - require.Equal(t, 4, reg.Len()) - reg.Register(ctx, rACFiltering.bufferedRegistration) - require.Equal(t, 5, reg.Len()) - - // Publish to different spans. 
- reg.PublishToOverlapping(ctx, spAB, ev1, logicalOpMetadata{}, nil /* alloc */) - reg.PublishToOverlapping(ctx, spBC, ev2, logicalOpMetadata{}, nil /* alloc */) - reg.PublishToOverlapping(ctx, spCD, ev3, logicalOpMetadata{}, nil /* alloc */) - reg.PublishToOverlapping(ctx, spAC, ev4, logicalOpMetadata{}, nil /* alloc */) - reg.PublishToOverlapping(ctx, spAC, ev5, logicalOpMetadata{omitInRangefeeds: true}, nil /* alloc */) - - require.NoError(t, reg.waitForCaughtUp(ctx, all)) - require.Equal(t, []*kvpb.RangeFeedEvent{noPrev(ev1), noPrev(ev4), noPrev(ev5)}, rAB.GetAndClearEvents()) - require.Equal(t, []*kvpb.RangeFeedEvent{ev2, ev4, ev5}, rBC.GetAndClearEvents()) - require.Equal(t, []*kvpb.RangeFeedEvent{ev3}, rCD.GetAndClearEvents()) - require.Equal(t, []*kvpb.RangeFeedEvent{noPrev(ev1), noPrev(ev2), noPrev(ev4), noPrev(ev5)}, rAC.GetAndClearEvents()) - // Registration rACFiltering doesn't receive ev5 because both withFiltering - // (for the registration) and OmitInRangefeeds (for the event) are true. - require.Equal(t, []*kvpb.RangeFeedEvent{noPrev(ev1), noPrev(ev2), noPrev(ev4)}, rACFiltering.GetAndClearEvents()) - require.Nil(t, rAB.Error()) - require.Nil(t, rBC.Error()) - require.Nil(t, rCD.Error()) - require.Nil(t, rAC.Error()) - require.Nil(t, rACFiltering.Error()) - - // Check the registry's operation filter. - f := reg.NewFilter() - // Testing NeedVal. - require.True(t, f.NeedVal(spAB)) - require.True(t, f.NeedVal(spBC)) - require.True(t, f.NeedVal(spCD)) - require.True(t, f.NeedVal(spAC)) - require.False(t, f.NeedVal(spXY)) - require.True(t, f.NeedVal(roachpb.Span{Key: keyA})) - require.True(t, f.NeedVal(roachpb.Span{Key: keyB})) - require.True(t, f.NeedVal(roachpb.Span{Key: keyC})) - require.False(t, f.NeedVal(roachpb.Span{Key: keyX})) - // Testing NeedPrevVal. 
- require.False(t, f.NeedPrevVal(spAB)) - require.True(t, f.NeedPrevVal(spBC)) - require.True(t, f.NeedPrevVal(spCD)) - require.True(t, f.NeedPrevVal(spAC)) - require.False(t, f.NeedPrevVal(spXY)) - require.False(t, f.NeedPrevVal(roachpb.Span{Key: keyA})) - require.True(t, f.NeedPrevVal(roachpb.Span{Key: keyB})) - require.True(t, f.NeedPrevVal(roachpb.Span{Key: keyC})) - require.False(t, f.NeedPrevVal(roachpb.Span{Key: keyX})) - - // Disconnect span that overlaps with rCD. - reg.DisconnectWithErr(ctx, spCD, err1) - require.Equal(t, 4, reg.Len()) - require.Equal(t, err1.GoError(), rCD.WaitForError(t)) - - // Can still publish to rAB. - reg.PublishToOverlapping(ctx, spAB, ev4, logicalOpMetadata{}, nil /* alloc */) - reg.PublishToOverlapping(ctx, spBC, ev3, logicalOpMetadata{}, nil /* alloc */) - reg.PublishToOverlapping(ctx, spCD, ev2, logicalOpMetadata{}, nil /* alloc */) - reg.PublishToOverlapping(ctx, spAC, ev1, logicalOpMetadata{}, nil /* alloc */) - require.NoError(t, reg.waitForCaughtUp(ctx, all)) - require.Equal(t, []*kvpb.RangeFeedEvent{noPrev(ev4), noPrev(ev1)}, rAB.GetAndClearEvents()) - - // Disconnect from rAB without error. - reg.DisconnectWithErr(ctx, spAB, nil) - require.Nil(t, rAC.WaitForError(t)) - require.Nil(t, rAB.WaitForError(t)) - require.Equal(t, 1, reg.Len()) - - // Check the registry's operation filter again. - f = reg.NewFilter() - // Testing NeedVal. - require.False(t, f.NeedVal(spAB)) - require.True(t, f.NeedVal(spBC)) - require.False(t, f.NeedVal(spCD)) - require.True(t, f.NeedVal(spAC)) - require.False(t, f.NeedVal(spXY)) - require.False(t, f.NeedVal(roachpb.Span{Key: keyA})) - require.True(t, f.NeedVal(roachpb.Span{Key: keyB})) - require.False(t, f.NeedVal(roachpb.Span{Key: keyC})) - require.False(t, f.NeedVal(roachpb.Span{Key: keyX})) - // Testing NeedPrevVal. 
- require.False(t, f.NeedPrevVal(spAB)) - require.True(t, f.NeedPrevVal(spBC)) - require.False(t, f.NeedPrevVal(spCD)) - require.True(t, f.NeedPrevVal(spAC)) - require.False(t, f.NeedPrevVal(spXY)) - require.False(t, f.NeedPrevVal(roachpb.Span{Key: keyA})) - require.True(t, f.NeedPrevVal(roachpb.Span{Key: keyB})) - require.False(t, f.NeedPrevVal(roachpb.Span{Key: keyC})) - require.False(t, f.NeedPrevVal(roachpb.Span{Key: keyX})) - - // Unregister the rBC registration as if it was being unregistered via the - // processor. - rBC.setShouldUnregister() - reg.unregisterMarkedRegistrations(ctx) - require.Equal(t, 0, reg.Len()) - require.Equal(t, 0, int(reg.metrics.RangeFeedRegistrations.Value()), - "RangefeedRegistrations metric not zero after all registrations have been removed") + testutils.RunValues(t, "registration type=", registrationTestTypes, func(t *testing.T, rt registrationType) { + val := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}} + ev1, ev2 := new(kvpb.RangeFeedEvent), new(kvpb.RangeFeedEvent) + ev3, ev4, ev5 := new(kvpb.RangeFeedEvent), new(kvpb.RangeFeedEvent), new(kvpb.RangeFeedEvent) + ev1.MustSetValue(&kvpb.RangeFeedValue{Key: keyA, Value: val, PrevValue: val}) + ev2.MustSetValue(&kvpb.RangeFeedValue{Key: keyB, Value: val, PrevValue: val}) + ev3.MustSetValue(&kvpb.RangeFeedValue{Key: keyC, Value: val, PrevValue: val}) + ev4.MustSetValue(&kvpb.RangeFeedValue{Key: keyD, Value: val, PrevValue: val}) + ev5.MustSetValue(&kvpb.RangeFeedValue{Key: keyD, Value: val, PrevValue: val}) + err1 := kvpb.NewErrorf("error1") + noPrev := func(ev *kvpb.RangeFeedEvent) *kvpb.RangeFeedEvent { + ev = ev.ShallowCopy() + ev.GetValue().(*kvpb.RangeFeedValue).PrevValue = roachpb.Value{} + return ev + } + + reg := makeRegistry(NewMetrics()) + require.Equal(t, 0, reg.Len()) + reg.PublishToOverlapping(ctx, spAB, ev1, logicalOpMetadata{}, nil /* alloc */) + reg.DisconnectWithErr(ctx, spAB, err1) + + sAB := newTestStream() + rAB := 
newTestRegistration(sAB, withRSpan(spAB), withRegistrationType(rt)) + sBC := newTestStream() + rBC := newTestRegistration(sBC, withRSpan(spBC), withDiff(true), withRegistrationType(rt)) + sCD := newTestStream() + rCD := newTestRegistration(sCD, withRSpan(spCD), withDiff(true), withRegistrationType(rt)) + sAC := newTestStream() + rAC := newTestRegistration(sAC, withRSpan(spAC), withRegistrationType(rt)) + sACFiltering := newTestStream() + rACFiltering := newTestRegistration(sACFiltering, withRSpan(spAC), withFiltering(true), withRegistrationType(rt)) + go rAB.runOutputLoop(ctx, 0) + go rBC.runOutputLoop(ctx, 0) + go rCD.runOutputLoop(ctx, 0) + go rAC.runOutputLoop(ctx, 0) + go rACFiltering.runOutputLoop(ctx, 0) + defer rAB.Disconnect(nil) + defer rBC.Disconnect(nil) + defer rCD.Disconnect(nil) + defer rAC.Disconnect(nil) + defer rACFiltering.Disconnect(nil) + + // Register 5 registrations. + reg.Register(ctx, rAB) + require.Equal(t, 1, reg.Len()) + reg.Register(ctx, rBC) + require.Equal(t, 2, reg.Len()) + reg.Register(ctx, rCD) + require.Equal(t, 3, reg.Len()) + reg.Register(ctx, rAC) + require.Equal(t, 4, reg.Len()) + reg.Register(ctx, rACFiltering) + require.Equal(t, 5, reg.Len()) + + // Publish to different spans. 
+ reg.PublishToOverlapping(ctx, spAB, ev1, logicalOpMetadata{}, nil /* alloc */) + reg.PublishToOverlapping(ctx, spBC, ev2, logicalOpMetadata{}, nil /* alloc */) + reg.PublishToOverlapping(ctx, spCD, ev3, logicalOpMetadata{}, nil /* alloc */) + reg.PublishToOverlapping(ctx, spAC, ev4, logicalOpMetadata{}, nil /* alloc */) + reg.PublishToOverlapping(ctx, spAC, ev5, logicalOpMetadata{omitInRangefeeds: true}, nil /* alloc */) + + require.NoError(t, reg.waitForCaughtUp(ctx, all)) + require.Equal(t, []*kvpb.RangeFeedEvent{noPrev(ev1), noPrev(ev4), noPrev(ev5)}, sAB.GetAndClearEvents()) + require.Equal(t, []*kvpb.RangeFeedEvent{ev2, ev4, ev5}, sBC.GetAndClearEvents()) + require.Equal(t, []*kvpb.RangeFeedEvent{ev3}, sCD.GetAndClearEvents()) + require.Equal(t, []*kvpb.RangeFeedEvent{noPrev(ev1), noPrev(ev2), noPrev(ev4), noPrev(ev5)}, sAC.GetAndClearEvents()) + // Registration rACFiltering doesn't receive ev5 because both withFiltering + // (for the registration) and OmitInRangefeeds (for the event) are true. + require.Equal(t, []*kvpb.RangeFeedEvent{noPrev(ev1), noPrev(ev2), noPrev(ev4)}, sACFiltering.GetAndClearEvents()) + require.Nil(t, sAB.Error()) + require.Nil(t, sBC.Error()) + require.Nil(t, sCD.Error()) + require.Nil(t, sAC.Error()) + require.Nil(t, sACFiltering.Error()) + + // Check the registry's operation filter. + f := reg.NewFilter() + // Testing NeedVal. + require.True(t, f.NeedVal(spAB)) + require.True(t, f.NeedVal(spBC)) + require.True(t, f.NeedVal(spCD)) + require.True(t, f.NeedVal(spAC)) + require.False(t, f.NeedVal(spXY)) + require.True(t, f.NeedVal(roachpb.Span{Key: keyA})) + require.True(t, f.NeedVal(roachpb.Span{Key: keyB})) + require.True(t, f.NeedVal(roachpb.Span{Key: keyC})) + require.False(t, f.NeedVal(roachpb.Span{Key: keyX})) + // Testing NeedPrevVal. 
+ require.False(t, f.NeedPrevVal(spAB)) + require.True(t, f.NeedPrevVal(spBC)) + require.True(t, f.NeedPrevVal(spCD)) + require.True(t, f.NeedPrevVal(spAC)) + require.False(t, f.NeedPrevVal(spXY)) + require.False(t, f.NeedPrevVal(roachpb.Span{Key: keyA})) + require.True(t, f.NeedPrevVal(roachpb.Span{Key: keyB})) + require.True(t, f.NeedPrevVal(roachpb.Span{Key: keyC})) + require.False(t, f.NeedPrevVal(roachpb.Span{Key: keyX})) + + // Disconnect span that overlaps with rCD. + reg.DisconnectWithErr(ctx, spCD, err1) + require.Equal(t, 4, reg.Len()) + require.Equal(t, err1.GoError(), sCD.WaitForError(t)) + + // Can still publish to rAB. + reg.PublishToOverlapping(ctx, spAB, ev4, logicalOpMetadata{}, nil /* alloc */) + reg.PublishToOverlapping(ctx, spBC, ev3, logicalOpMetadata{}, nil /* alloc */) + reg.PublishToOverlapping(ctx, spCD, ev2, logicalOpMetadata{}, nil /* alloc */) + reg.PublishToOverlapping(ctx, spAC, ev1, logicalOpMetadata{}, nil /* alloc */) + require.NoError(t, reg.waitForCaughtUp(ctx, all)) + require.Equal(t, []*kvpb.RangeFeedEvent{noPrev(ev4), noPrev(ev1)}, sAB.GetAndClearEvents()) + + // Disconnect from rAB without error. + reg.DisconnectWithErr(ctx, spAB, nil) + require.Nil(t, sAC.WaitForError(t)) + require.Nil(t, sAB.WaitForError(t)) + require.Equal(t, 1, reg.Len()) + + // Check the registry's operation filter again. + f = reg.NewFilter() + // Testing NeedVal. + require.False(t, f.NeedVal(spAB)) + require.True(t, f.NeedVal(spBC)) + require.False(t, f.NeedVal(spCD)) + require.True(t, f.NeedVal(spAC)) + require.False(t, f.NeedVal(spXY)) + require.False(t, f.NeedVal(roachpb.Span{Key: keyA})) + require.True(t, f.NeedVal(roachpb.Span{Key: keyB})) + require.False(t, f.NeedVal(roachpb.Span{Key: keyC})) + require.False(t, f.NeedVal(roachpb.Span{Key: keyX})) + // Testing NeedPrevVal. 
+ require.False(t, f.NeedPrevVal(spAB)) + require.True(t, f.NeedPrevVal(spBC)) + require.False(t, f.NeedPrevVal(spCD)) + require.True(t, f.NeedPrevVal(spAC)) + require.False(t, f.NeedPrevVal(spXY)) + require.False(t, f.NeedPrevVal(roachpb.Span{Key: keyA})) + require.True(t, f.NeedPrevVal(roachpb.Span{Key: keyB})) + require.False(t, f.NeedPrevVal(roachpb.Span{Key: keyC})) + require.False(t, f.NeedPrevVal(roachpb.Span{Key: keyX})) + + // Unregister the rBC registration as if it was being unregistered via the + // processor. + rBC.setShouldUnregister() + reg.unregisterMarkedRegistrations(ctx) + require.Equal(t, 0, reg.Len()) + require.Equal(t, 0, int(reg.metrics.RangeFeedRegistrations.Value()), + "RangefeedRegistrations metric not zero after all registrations have been removed") + }) } func TestRegistryPublishBeneathStartTimestamp(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() - reg := makeRegistry(NewMetrics()) - - r := newTestRegistration(spAB, hlc.Timestamp{WallTime: 10}, nil, /* catchup */ - false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */) - go r.runOutputLoop(ctx, 0) - reg.Register(ctx, r.bufferedRegistration) - - // Publish a value with a timestamp beneath the registration's start - // timestamp. Should be ignored. - ev := new(kvpb.RangeFeedEvent) - ev.MustSetValue(&kvpb.RangeFeedValue{ - Value: roachpb.Value{Timestamp: hlc.Timestamp{WallTime: 5}}, - }) - reg.PublishToOverlapping(ctx, spAB, ev, logicalOpMetadata{}, nil /* alloc */) - require.NoError(t, reg.waitForCaughtUp(ctx, all)) - require.Nil(t, r.GetAndClearEvents()) - - // Publish a value with a timestamp equal to the registration's start - // timestamp. Should be ignored. 
- ev.MustSetValue(&kvpb.RangeFeedValue{ - Value: roachpb.Value{Timestamp: hlc.Timestamp{WallTime: 10}}, - }) - reg.PublishToOverlapping(ctx, spAB, ev, logicalOpMetadata{}, nil /* alloc */) - require.NoError(t, reg.waitForCaughtUp(ctx, all)) - require.Nil(t, r.GetAndClearEvents()) - - // Publish a checkpoint with a timestamp beneath the registration's. Should - // be delivered. - ev.MustSetValue(&kvpb.RangeFeedCheckpoint{ - Span: spAB, ResolvedTS: hlc.Timestamp{WallTime: 5}, - }) - reg.PublishToOverlapping(ctx, spAB, ev, logicalOpMetadata{}, nil /* alloc */) - require.NoError(t, reg.waitForCaughtUp(ctx, all)) - require.Equal(t, []*kvpb.RangeFeedEvent{ev}, r.GetAndClearEvents()) - r.Disconnect(nil) + testutils.RunValues(t, "registration type=", registrationTestTypes, func(t *testing.T, rt registrationType) { + reg := makeRegistry(NewMetrics()) + s := newTestStream() + r := newTestRegistration(s, withRSpan(spAB), withStartTs(hlc.Timestamp{WallTime: 10}), withRegistrationType(rt)) + go r.runOutputLoop(ctx, 0) + reg.Register(ctx, r) + + // Publish a value with a timestamp beneath the registration's start + // timestamp. Should be ignored. + ev := new(kvpb.RangeFeedEvent) + ev.MustSetValue(&kvpb.RangeFeedValue{ + Value: roachpb.Value{Timestamp: hlc.Timestamp{WallTime: 5}}, + }) + reg.PublishToOverlapping(ctx, spAB, ev, logicalOpMetadata{}, nil /* alloc */) + require.NoError(t, reg.waitForCaughtUp(ctx, all)) + require.Nil(t, s.GetAndClearEvents()) + + // Publish a value with a timestamp equal to the registration's start + // timestamp. Should be ignored. + ev.MustSetValue(&kvpb.RangeFeedValue{ + Value: roachpb.Value{Timestamp: hlc.Timestamp{WallTime: 10}}, + }) + reg.PublishToOverlapping(ctx, spAB, ev, logicalOpMetadata{}, nil /* alloc */) + require.NoError(t, reg.waitForCaughtUp(ctx, all)) + require.Nil(t, s.GetAndClearEvents()) + + // Publish a checkpoint with a timestamp beneath the registration's. Should + // be delivered. 
+ ev.MustSetValue(&kvpb.RangeFeedCheckpoint{ + Span: spAB, ResolvedTS: hlc.Timestamp{WallTime: 5}, + }) + reg.PublishToOverlapping(ctx, spAB, ev, logicalOpMetadata{}, nil /* alloc */) + require.NoError(t, reg.waitForCaughtUp(ctx, all)) + require.Equal(t, []*kvpb.RangeFeedEvent{ev}, s.GetAndClearEvents()) + + r.Disconnect(nil) + }) } func TestRegistrationString(t *testing.T) { @@ -404,35 +471,63 @@ func TestRegistrationString(t *testing.T) { func TestRegistryShutdownMetrics(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() - reg := makeRegistry(NewMetrics()) - - regDoneC := make(chan interface{}) - r := newTestRegistration(spAB, hlc.Timestamp{WallTime: 10}, nil, /*catchup */ - false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */) - go func() { - r.runOutputLoop(ctx, 0) - close(regDoneC) - }() - reg.Register(ctx, r.bufferedRegistration) - - reg.DisconnectAllOnShutdown(ctx, nil) - <-regDoneC - require.Zero(t, reg.metrics.RangeFeedRegistrations.Value(), "metric is not zero on stop") + + testutils.RunValues(t, "registration type=", registrationTestTypes, func(t *testing.T, rt registrationType) { + reg := makeRegistry(NewMetrics()) + regDoneC := make(chan interface{}) + r := newTestRegistration(newTestStream(), withRSpan(spAB), + withStartTs(hlc.Timestamp{WallTime: 10}), withRegistrationType(rt)) + go func() { + r.runOutputLoop(ctx, 0) + close(regDoneC) + }() + reg.Register(ctx, r) + + reg.DisconnectAllOnShutdown(ctx, nil) + <-regDoneC + require.Zero(t, reg.metrics.RangeFeedRegistrations.Value(), "metric is not zero on stop") + }) } // TestBaseRegistration tests base registration implementation methods. 
func TestBaseRegistration(t *testing.T) { defer leaktest.AfterTest(t)() - r := newTestRegistration(spAB, hlc.Timestamp{WallTime: 10}, nil, /*catchup */ - true /* withDiff */, true /* withFiltering */, false /* withOmitRemote */) + r := newTestRegistration(newTestStream(), withRSpan(spAB), withStartTs(hlc.Timestamp{WallTime: 10}), withDiff(true), withFiltering(true)) require.Equal(t, spAB, r.getSpan()) require.Equal(t, hlc.Timestamp{WallTime: 10}, r.getCatchUpTimestamp()) r.setSpanAsKeys() - require.Equal(t, r.keys, spAB.AsRange()) - require.Equal(t, r.keys, r.Range()) + require.Equal(t, r.Range(), spAB.AsRange()) + require.Equal(t, r.Range(), r.Range()) r.setID(10) require.Equal(t, uintptr(10), r.ID()) require.True(t, r.getWithDiff()) require.True(t, r.getWithFiltering()) require.False(t, r.getWithOmitRemote()) } + +// TODO(wenyihu6): Should catch-up events also get stripped? They currently don't. +func TestPublishStrippedEvents(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + val := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}} + prevVal := roachpb.Value{RawBytes: []byte("prevVal"), Timestamp: hlc.Timestamp{WallTime: 1}} + ev1 := new(kvpb.RangeFeedEvent) + ev1.MustSetValue(&kvpb.RangeFeedValue{Key: keyA, Value: val, PrevValue: prevVal}) + expectedEv := new(kvpb.RangeFeedEvent) + // maybeStripEvent should strip the PrevValue from the event since withDiff is + // false. 
+ expectedEv.MustSetValue(&kvpb.RangeFeedValue{Key: keyA, Value: val}) + + err := kvpb.NewError(kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER)) + testutils.RunValues(t, "registration type=", registrationTestTypes, func(t *testing.T, rt registrationType) { + s := newTestStream() + noCatchupReg := newTestRegistration(s, withRSpan(spAB), withRegistrationType(rt), withDiff(false)) + noCatchupReg.publish(ctx, ev1, nil /* alloc */) + go noCatchupReg.runOutputLoop(ctx, 0) + require.NoError(t, noCatchupReg.waitForCaughtUp(ctx)) + require.Equal(t, []*kvpb.RangeFeedEvent{expectedEv}, s.GetAndClearEvents()) + noCatchupReg.Disconnect(err) + require.Equal(t, err, kvpb.NewError(s.Error())) + }) +} diff --git a/pkg/kv/kvserver/rangefeed/scheduled_processor.go b/pkg/kv/kvserver/rangefeed/scheduled_processor.go index e8d01d3a569d..29e25f98ea4e 100644 --- a/pkg/kv/kvserver/rangefeed/scheduled_processor.go +++ b/pkg/kv/kvserver/rangefeed/scheduled_processor.go @@ -335,10 +335,13 @@ func (p *ScheduledProcessor) Register( p.syncEventC() blockWhenFull := p.Config.EventChanTimeout == 0 // for testing + var r registration - if _, ok := stream.(BufferedStream); ok { - log.Fatalf(context.Background(), - "unimplemented: unbuffered registrations for rangefeed, see #126560") + bufferedStream, isBufferedStream := stream.(BufferedStream) + if isBufferedStream { + r = newUnbufferedRegistration( + streamCtx, span.AsRawSpanWithNoLocals(), startTS, catchUpIter, withDiff, withFiltering, withOmitRemote, + p.Config.EventChanCap, p.Metrics, bufferedStream, p.unregisterClientAsync) } else { r = newBufferedRegistration( streamCtx, span.AsRawSpanWithNoLocals(), startTS, catchUpIter, withDiff, withFiltering, withOmitRemote, @@ -374,6 +377,10 @@ func (p *ScheduledProcessor) Register( // could only happen on shutdown. Disconnect stream and just remove // registration. 
r.Disconnect(kvpb.NewError(err)) + // Normally, ubr.runOutputLoop is responsible for draining catch up + // buffer. If it fails to start, we should drain it here. + // TODO(wenyihu6 during code review): Double check if runOutputLoop is + // guaranteed to be invoked if err == nil here. Seems true. r.drainAllocations(ctx) } return f diff --git a/pkg/kv/kvserver/rangefeed/sender_helper_test.go b/pkg/kv/kvserver/rangefeed/sender_helper_test.go index 113f1605c0f7..1962bf195289 100644 --- a/pkg/kv/kvserver/rangefeed/sender_helper_test.go +++ b/pkg/kv/kvserver/rangefeed/sender_helper_test.go @@ -64,6 +64,8 @@ type testServerStream struct { streamEvents map[int64][]*kvpb.MuxRangeFeedEvent } +var _ ServerStreamSender = &testServerStream{} + func newTestServerStream() *testServerStream { return &testServerStream{ streamEvents: make(map[int64][]*kvpb.MuxRangeFeedEvent), @@ -76,6 +78,15 @@ func (s *testServerStream) totalEventsSent() int { return s.eventsSent } +func (s *testServerStream) getEventsByStreamID(streamID int64) (res []*kvpb.RangeFeedEvent) { + s.Lock() + defer s.Unlock() + for _, ev := range s.streamEvents[streamID] { + res = append(res, &ev.RangeFeedEvent) + } + return res +} + func (s *testServerStream) waitForEvent(t *testing.T, ev *kvpb.MuxRangeFeedEvent) { testutils.SucceedsSoon(t, func() error { if s.hasEvent(ev) { @@ -106,8 +117,28 @@ func (s *testServerStream) String() string { s.Lock() defer s.Unlock() var str strings.Builder + fmt.Fprintf(&str, "Total Streams Sent: %d\n", len(s.streamEvents)) for streamID, eventList := range s.streamEvents { - fmt.Fprintf(&str, "StreamID:%d, Len:%d\n", streamID, len(eventList)) + fmt.Fprintf(&str, "\tStreamID:%d, Len:%d", streamID, len(eventList)) + for _, ev := range eventList { + switch { + case ev.Val != nil: + fmt.Fprintf(&str, "\t\tvalue") + case ev.Checkpoint != nil: + fmt.Fprintf(&str, "\t\tcheckpoint") + case ev.SST != nil: + fmt.Fprintf(&str, "\t\tsst") + case ev.DeleteRange != nil: + fmt.Fprintf(&str, 
"\t\tdelete") + case ev.Metadata != nil: + fmt.Fprintf(&str, "\t\tmetadata") + case ev.Error != nil: + fmt.Fprintf(&str, "\t\terror") + default: + panic("unknown event type") + } + } + fmt.Fprintf(&str, "\n") } return str.String() } @@ -165,6 +196,16 @@ func (s *testServerStream) waitForEventCount(t *testing.T, count int) { }) } +func (s *testServerStream) iterateEventsByStreamID( + f func(id int64, events []*kvpb.MuxRangeFeedEvent), +) { + s.Lock() + defer s.Unlock() + for id, v := range s.streamEvents { + f(id, v) + } +} + type cancelCtxDisconnector struct { mu struct { syncutil.Mutex diff --git a/pkg/kv/kvserver/rangefeed/stream_manager_test.go b/pkg/kv/kvserver/rangefeed/stream_manager_test.go index 7387ba42693b..201d3323476c 100644 --- a/pkg/kv/kvserver/rangefeed/stream_manager_test.go +++ b/pkg/kv/kvserver/rangefeed/stream_manager_test.go @@ -179,6 +179,8 @@ func TestStreamManagerErrorHandling(t *testing.T) { switch rt { case scheduledProcessorWithUnbufferedSender: s = NewUnbufferedSender(testServerStream) + case scheduledProcessorWithBufferedSender: + s = NewBufferedSender(testServerStream) default: t.Fatalf("unknown rangefeed test type %v", rt) } diff --git a/pkg/kv/kvserver/rangefeed/unbuffered_registration.go b/pkg/kv/kvserver/rangefeed/unbuffered_registration.go new file mode 100644 index 000000000000..e10fb42bdf0b --- /dev/null +++ b/pkg/kv/kvserver/rangefeed/unbuffered_registration.go @@ -0,0 +1,400 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. 
+ +package rangefeed + +import ( + "context" + "sync" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" +) + +// unbufferedRegistration is similar to bufferedRegistration but does not +// internally buffer events delivered via publish(). Rather, when caught up, it +// forwards events directly to the BufferedStream. It assumes that +// BufferedStream's SendBuffered method is non-blocking. As a result, it does +// not require a long-lived goroutine to move events between an internal buffer +// and the sender. +// +// Note that unbufferedRegistration needs to ensure that events sent to +// BufferedStream from the processor during a running catch-up scan are sent to +// the client in order. To achieve this, events from catch-up scans are sent +// using SendUnbuffered. While the catch-up scan is ongoing, live updates +// delivered via publish() are temporarily buffered in catchUpBuf to hold them +// until the catch-up scan is complete. After the catch-up scan is done, we know +// that catch-up scan events have been sent to the gRPC stream successfully. +// The catch-up buffer is then drained by our hopefully short-lived output loop +// goroutine. Once the catch-up buffer is fully drained, publish begins sending to +// the buffered sender directly. unbufferedRegistration is responsible for +// correctly handling memory reservations for data stored in the catchUpBuf. +// +// All errors are delivered to this registration via Disconnect() which is +// non-blocking. Disconnect sends an error to the stream and invokes any +// necessary cleanup. +// +// unbufferedRegistration is responsible for allocating and draining the +// catch-up buffer.
+type unbufferedRegistration struct { + // Input. + baseRegistration + metrics *Metrics + + // Output. + stream BufferedStream + + // Internal. + mu struct { + sync.Locker + + // True if this catchUpBuf has overflowed, live raft events are dropped. + // This will cause the registration to disconnect with an error once + // catch-up scan is done. Once catchUpOverflowed is true, it will always be + // true. + catchUpOverflowed bool + + // catchUpBuf hold events published to this registration while the catch up + // scan is running. It is set to nil once it has been drained (because + // either catch-up scan succeeded or failed). If no catch-up iter is + // provided, catchUpBuf will be nil from the point of initialization. + catchUpBuf chan *sharedEvent + + // catchUpScanCancelFn is called to tear down the goroutine responsible for + // the catch-up scan. Can be called more than once. + catchUpScanCancelFn func() + + // disconnected indicates that the registration is marked for disconnection + // and disconnection is taking place. Once disconnected is true, events sent + // via publish() are ignored. + disconnected bool + + // catchUpIter is created by replica under raftMu lock when registration is + // created. It is set to nil by output loop for processing and closed when + // done. + catchUpIter *CatchUpIterator + + // Used for testing only. Indicates that all events in catchUpBuf have been + // sent to BufferedStream. 
+ caughtUp bool + } +} + +var _ registration = (*unbufferedRegistration)(nil) + +func newUnbufferedRegistration( + streamCtx context.Context, + span roachpb.Span, + startTS hlc.Timestamp, + catchUpIter *CatchUpIterator, + withDiff bool, + withFiltering bool, + withOmitRemote bool, + bufferSz int, + metrics *Metrics, + stream BufferedStream, + removeRegFromProcessor func(registration), +) *unbufferedRegistration { + br := &unbufferedRegistration{ + baseRegistration: baseRegistration{ + streamCtx: streamCtx, + span: span, + catchUpTimestamp: startTS, + withDiff: withDiff, + withFiltering: withFiltering, + withOmitRemote: withOmitRemote, + removeRegFromProcessor: removeRegFromProcessor, + }, + metrics: metrics, + stream: stream, + } + br.mu.Locker = &syncutil.Mutex{} + br.mu.catchUpIter = catchUpIter + br.mu.caughtUp = true + if br.mu.catchUpIter != nil { + // A nil catchUpIter indicates we don't need a catch-up scan. We avoid + // initializing catchUpBuf in this case, which will result in publish() + // sending all events to the underlying stream immediately. + br.mu.catchUpBuf = make(chan *sharedEvent, bufferSz) + } + return br +} + +// publish sends a single event to this registration. It is called by the +// processor if the event overlaps the span this registration is interested in. +// Events are either stored in catchUpBuf or sent to BufferedStream directly, +// depending on whether catch-up scan is done. publish is responsible for using +// and releasing alloc. +func (ubr *unbufferedRegistration) publish( + ctx context.Context, event *kvpb.RangeFeedEvent, alloc *SharedBudgetAllocation, +) { + ubr.mu.Lock() + defer ubr.mu.Unlock() + if ubr.mu.disconnected || ubr.mu.catchUpOverflowed { + return + } + + ubr.assertEvent(ctx, event) + strippedEvent := ubr.maybeStripEvent(ctx, event) + + // Disconnected or catchUpOverflowed is not set and catchUpBuf + // is nil. Safe to send to underlying stream. 
+ if ubr.mu.catchUpBuf == nil { + if err := ubr.stream.SendBuffered(strippedEvent, alloc); err != nil { + // Disconnect here for testing purposes only: there are test stream + // implementations that inject errors without calling disconnect. For + // production code, we expect buffered sender to shut down all + // registrations. + ubr.disconnectLocked(kvpb.NewError(err)) + } + } else { + // catchUpBuf is set, put event into the catchUpBuf if + // there is room. + e := getPooledSharedEvent(sharedEvent{event: strippedEvent, alloc: alloc}) + alloc.Use(ctx) + select { + case ubr.mu.catchUpBuf <- e: + ubr.mu.caughtUp = false + default: + // catchUpBuf exceeded and we are dropping this event. A catch up scan is + // needed later. + ubr.mu.catchUpOverflowed = true + e.alloc.Release(ctx) + putPooledSharedEvent(e) + } + } +} + +// Disconnect is called to shut down the registration. It is safe to run +// multiple times, but subsequent errors are discarded. It is invoked by both +// the processor in response to errors from the replica and by the StreamManager +// in response to shutdowns. +func (ubr *unbufferedRegistration) Disconnect(pErr *kvpb.Error) { + ubr.mu.Lock() + defer ubr.mu.Unlock() + ubr.disconnectLocked(pErr) +} + +func (ubr *unbufferedRegistration) disconnectLocked(pErr *kvpb.Error) { + if ubr.mu.disconnected { + return + } + if ubr.mu.catchUpIter != nil { + // Catch-up scan hasn't started yet. + ubr.mu.catchUpIter.Close() + ubr.mu.catchUpIter = nil + } + if ubr.mu.catchUpScanCancelFn != nil { + ubr.mu.catchUpScanCancelFn() + } + ubr.mu.disconnected = true + + // SendError cleans up metrics and sends error back to client without + // blocking. + ubr.stream.SendError(pErr) + // Clean up unregisters registration from processor async. + ubr.removeRegFromProcessor(ubr) +} + +// IsDisconnected returns true if the registration is disconnected. 
+func (ubr *unbufferedRegistration) IsDisconnected() bool { + ubr.mu.Lock() + defer ubr.mu.Unlock() + return ubr.mu.disconnected +} + +// runOutputLoop is run in a goroutine. It is short-lived and responsible for +// 1. running the catch-up scan +// 2. publishing/discarding the catch-up buffer after the catch-up scan is done. +// +// runOutputLoop runs a catch-up scan if required and then moves any events in +// catchUpBuf from the buffer into the sender. +// +// It is expected to be relatively short-lived. +// +// Once the catchUpBuf is drained, it will be set to nil, indicating to publish +// that it is now safe to deliver events directly to the sender. +func (ubr *unbufferedRegistration) runOutputLoop(ctx context.Context, forStacks roachpb.RangeID) { + ubr.mu.Lock() + // Noop if publishCatchUpBuffer below returns no error. + defer ubr.drainAllocations(ctx) + + if ubr.mu.disconnected { + ubr.mu.Unlock() + return + } + + ctx, ubr.mu.catchUpScanCancelFn = context.WithCancel(ctx) + ubr.mu.Unlock() + + if err := ubr.maybeRunCatchUpScan(ctx); err != nil { + // Important to disconnect before draining; otherwise publish() might + // interpret this as a successful catch-up scan and send events to the + // underlying stream directly. + ubr.Disconnect(kvpb.NewError(errors.Wrap(err, "catch-up scan failed"))) + return + } + + if err := ubr.publishCatchUpBuffer(ctx); err != nil { + // Important to disconnect before draining; otherwise publish() might + // interpret this as a successful catch-up scan and send events to the + // underlying stream directly. + ubr.Disconnect(kvpb.NewError(err)) + return + } + // Success: publishCatchUpBuffer should have drained and set catchUpBuf to nil + // when it succeeds. +} + +// drainAllocations drains catchUpBuf and releases all memory held by the events +// in the catch-up buffer without publishing them. It is safe to assume that +// the catch-up buffer is empty and nil after this.
+func (ubr *unbufferedRegistration) drainAllocations(ctx context.Context) { + ubr.mu.Lock() + defer ubr.mu.Unlock() + // TODO(wenyihu6): Check if we can just discard without holding the lock + // first. We shouldn't be reading from the buffer at the same time. + if ubr.mu.catchUpBuf == nil { + // Already drained. + return + } + + func() { + for { + select { + case e := <-ubr.mu.catchUpBuf: + e.alloc.Release(ctx) + putPooledSharedEvent(e) + default: + // Done. + return + } + } + }() + + ubr.mu.catchUpBuf = nil + ubr.mu.caughtUp = true +} + +// publishCatchUpBuffer drains catchUpBuf and releases all memory held by the +// events in the catch-up buffer while publishing them. Note that +// drainAllocations should never be called concurrently with this function. +// The caller is responsible for draining it again if an error is returned. +func (ubr *unbufferedRegistration) publishCatchUpBuffer(ctx context.Context) error { + publish := func() error { + for { + select { + case e := <-ubr.mu.catchUpBuf: + err := ubr.stream.SendBuffered(e.event, e.alloc) + e.alloc.Release(ctx) + putPooledSharedEvent(e) + if err != nil { + return err + } + case <-ctx.Done(): + return ctx.Err() + default: + // Done. + return nil + } + } + } + + // Drain without holding locks first to avoid unnecessary blocking on publish(). + if err := publish(); err != nil { + return err + } + + ubr.mu.Lock() + defer ubr.mu.Unlock() + + // Drain again with lock held to ensure that events added to the buffer while + // draining took place are also published. + if err := publish(); err != nil { + return err + } + + // Even if the catch-up buffer has overflowed, all events in it should still + // be published. Important to disconnect before setting catchUpBuf to nil. + if ubr.mu.catchUpOverflowed { + return newRetryErrBufferCapacityExceeded() + } + + // Success. + ubr.mu.catchUpBuf = nil + ubr.mu.caughtUp = true + return nil +} + +// detachCatchUpIter detaches the catchUpIter that was previously attached.
+func (ubr *unbufferedRegistration) detachCatchUpIter() *CatchUpIterator { + ubr.mu.Lock() + defer ubr.mu.Unlock() + catchUpIter := ubr.mu.catchUpIter + ubr.mu.catchUpIter = nil + return catchUpIter +} + +// maybeRunCatchUpScan runs the catch-up scan if catchUpIter is not nil. It +// promises to close catchUpIter once detached. It returns an error if the +// catch-up scan fails. Note that catch-up scan events are sent with +// SendUnbuffered rather than SendBuffered. +func (ubr *unbufferedRegistration) maybeRunCatchUpScan(ctx context.Context) error { + catchUpIter := ubr.detachCatchUpIter() + if catchUpIter == nil { + return nil + } + start := timeutil.Now() + defer func() { + catchUpIter.Close() + ubr.metrics.RangeFeedCatchUpScanNanos.Inc(timeutil.Since(start).Nanoseconds()) + }() + + return catchUpIter.CatchUpScan(ctx, ubr.stream.SendUnbuffered, ubr.withDiff, ubr.withFiltering, + ubr.withOmitRemote) +} + +// Used for testing only. +func (ubr *unbufferedRegistration) getBuf() chan *sharedEvent { + ubr.mu.Lock() + defer ubr.mu.Unlock() + return ubr.mu.catchUpBuf +} + +// Used for testing only. +func (ubr *unbufferedRegistration) getOverflowed() bool { + ubr.mu.Lock() + defer ubr.mu.Unlock() + return ubr.mu.catchUpOverflowed +} + +// Used for testing only. Waits for this registration to completely drain its +// catchUpBuf.
+func (ubr *unbufferedRegistration) waitForCaughtUp(ctx context.Context) error { + opts := retry.Options{ + InitialBackoff: 5 * time.Millisecond, + Multiplier: 2, + MaxBackoff: 10 * time.Second, + MaxRetries: 50, + } + for re := retry.StartWithCtx(ctx, opts); re.Next(); { + ubr.mu.Lock() + caughtUp := len(ubr.mu.catchUpBuf) == 0 && ubr.mu.caughtUp + ubr.mu.Unlock() + if caughtUp { + return nil + } + } + if err := ctx.Err(); err != nil { + return err + } + return errors.Errorf("unbufferedRegistration %v failed to empty in time", ubr.Range()) +} diff --git a/pkg/kv/kvserver/rangefeed/unbuffered_registration_test.go b/pkg/kv/kvserver/rangefeed/unbuffered_registration_test.go new file mode 100644 index 000000000000..5d439312f8c4 --- /dev/null +++ b/pkg/kv/kvserver/rangefeed/unbuffered_registration_test.go @@ -0,0 +1,371 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package rangefeed + +import ( + "context" + "fmt" + "sync" + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// TestUnbufferedRegWithStreamManager tests that BufferedSender can handle +// concurrent stream disconnects.
+func TestUnbufferedRegWithStreamManager(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + p, h, stopper := newTestProcessor(t, withRangefeedTestType(scheduledProcessorWithBufferedSender)) + defer stopper.Stop(ctx) + testServerStream := newTestServerStream() + testRangefeedCounter := newTestRangefeedCounter() + bs := NewBufferedSender(testServerStream) + sm := NewStreamManager(bs, testRangefeedCounter) + require.NoError(t, sm.Start(ctx, stopper)) + + const r1 = 1 + t.Run("no events sent before registering streams", func(t *testing.T) { + p.ConsumeLogicalOps(ctx, writeValueOp(hlc.Timestamp{WallTime: 1})) + require.Equal(t, 0, testServerStream.totalEventsSent()) + }) + t.Run("register 50 streams", func(t *testing.T) { + for id := int64(0); id < 50; id++ { + registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */ + false /* withDiff */, false /* withFiltering */, false, /* withOmitRemote */ + sm.NewStream(id, r1)) + require.True(t, registered) + sm.AddStream(id, d) + } + require.Equal(t, 50, testRangefeedCounter.get()) + require.Equal(t, 50, p.Len()) + }) + t.Run("publish a 0-valued checkpoint to signal catch-up completion", func(t *testing.T) { + testServerStream.waitForEventCount(t, 50) + testServerStream.iterateEventsByStreamID(func(_ int64, events []*kvpb.MuxRangeFeedEvent) { + require.Equal(t, 1, len(events)) + require.NotNil(t, events[0].Checkpoint) + }) + }) + testServerStream.reset() + t.Run("publish 20 logical ops to 50 registrations", func(t *testing.T) { + for i := 0; i < 20; i++ { + p.ConsumeLogicalOps(ctx, writeValueOp(hlc.Timestamp{WallTime: 1})) + } + testServerStream.waitForEventCount(t, 20*50) + testServerStream.iterateEventsByStreamID(func(_ int64, events []*kvpb.MuxRangeFeedEvent) { + require.Equal(t, 20, len(events)) + require.NotNil(t, events[0].RangeFeedEvent.Val) + }) + }) + + t.Run("disconnect 50 streams concurrently", func(t *testing.T) { + var wg 
sync.WaitGroup + for id := int64(0); id < 50; id++ { + wg.Add(1) + go func(id int64) { + defer wg.Done() + sm.DisconnectStream(id, kvpb.NewError(fmt.Errorf("disconnection error"))) + }(id) + } + wg.Wait() + require.NoError(t, bs.waitForEmptyBuffer(ctx)) + testRangefeedCounter.waitForRangefeedCount(t, 0) + testutils.SucceedsSoon(t, func() error { + if p.Len() == 0 { + return nil + } + return errors.Newf("expected 0 registrations, found %d", p.Len()) + }) + }) +} + +// TestUnbufferedRegCorrectnessOnDisconnect tests that BufferedSender handles +// disconnects properly and sends events in the correct order. +func TestUnbufferedRegCorrectnessOnDisconnect(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + p, h, stopper := newTestProcessor(t, withRangefeedTestType(scheduledProcessorWithBufferedSender)) + defer stopper.Stop(ctx) + testServerStream := newTestServerStream() + testRangefeedCounter := newTestRangefeedCounter() + bs := NewBufferedSender(testServerStream) + sm := NewStreamManager(bs, testRangefeedCounter) + require.NoError(t, sm.Start(ctx, stopper)) + defer sm.Stop(ctx) + + startTs := hlc.Timestamp{WallTime: 4} + span := roachpb.Span{Key: roachpb.Key("d"), EndKey: roachpb.Key("w")} + catchUpIter := newTestIterator(keyValues, roachpb.Key("w")) + const r1 = 1 + const s1 = 1 + + key := roachpb.Key("d") + val1 := roachpb.Value{RawBytes: []byte("val1"), Timestamp: hlc.Timestamp{WallTime: 5}} + val2 := roachpb.Value{RawBytes: []byte("val2"), Timestamp: hlc.Timestamp{WallTime: 6}} + op1 := writeValueOpWithKV(key, val1.Timestamp, val1.RawBytes) + op2 := writeValueOpWithKV(key, val2.Timestamp, val2.RawBytes) + ev1, ev2 := new(kvpb.RangeFeedEvent), new(kvpb.RangeFeedEvent) + ev1.MustSetValue(&kvpb.RangeFeedValue{Key: key, Value: val1}) + ev2.MustSetValue(&kvpb.RangeFeedValue{Key: key, Value: val2}) + + // Add checkpoint event. + checkpointEvent := &kvpb.RangeFeedEvent{}
+ checkpointEvent.MustSetValue(&kvpb.RangeFeedCheckpoint{ + Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")}, + }) + + discErr := kvpb.NewError(fmt.Errorf("disconnection error")) + evErr := &kvpb.RangeFeedEvent{} + evErr.MustSetValue(&kvpb.RangeFeedError{Error: *discErr}) + + // Register one stream. + registered, d, _ := p.Register(ctx, h.span, startTs, + makeCatchUpIterator(catchUpIter, span, startTs), /* catchUpIter */ + true /* withDiff */, false /* withFiltering */, false, /* withOmitRemote */ + sm.NewStream(s1, r1)) + sm.AddStream(s1, d) + require.True(t, registered) + require.Equal(t, 1, testRangefeedCounter.get()) + + // Publish two real live events to the stream. + p.ConsumeLogicalOps(ctx, op1) + p.ConsumeLogicalOps(ctx, op2) + + catchUpEvents := expEvents(false) + // catch-up events + checkpoint + ev1 + ev2 + testServerStream.waitForEventCount(t, len(catchUpEvents)+1+2) + + // Disconnection error should be buffered and sent after the events. + p.DisconnectSpanWithErr(spBC, discErr) + testServerStream.waitForEventCount(t, len(catchUpEvents)+1+2+1) + + numExpectedEvents := len(catchUpEvents) + 4 + expectedEvents := make([]*kvpb.RangeFeedEvent, numExpectedEvents) + // Copy catch-up events into expected events. + copy(expectedEvents, catchUpEvents) + // Add checkpoint event. + expectedEvents[len(catchUpEvents)] = checkpointEvent + // Add the two live events followed by the disconnection error. + expectedEvents[len(catchUpEvents)+1] = ev1 + expectedEvents[len(catchUpEvents)+2] = ev2 + expectedEvents[len(catchUpEvents)+3] = evErr + + require.Equal(t, testServerStream.getEventsByStreamID(s1), expectedEvents) + testRangefeedCounter.waitForRangefeedCount(t, 0) +} + +// TestUnbufferedRegWithCatchUpBufCleanUpAfterRunOutputLoop tests that the +// catchUpBuf is drained after all events are sent or after being disconnected.
+func TestUnbufferedRegWithCatchUpBufCleanUpAfterRunOutputLoop(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + rng, _ := randutil.NewTestRand() + ev1 := new(kvpb.RangeFeedEvent) + val := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}} + ev1.MustSetValue(&kvpb.RangeFeedValue{Key: keyA, Value: val}) + numReg := rng.Intn(1000) + regs := make([]*unbufferedRegistration, numReg) + + for i := 0; i < numReg; i++ { + s := newTestStream() + iter := newTestIterator(keyValues, roachpb.Key("w")) + catchUpReg := newTestRegistration(s, withRSpan(spAB), withRegistrationType(unbuffered), + withDiff(false), withCatchUpIter(iter)).(*unbufferedRegistration) + catchUpReg.publish(ctx, ev1, nil /* alloc */) + go catchUpReg.runOutputLoop(ctx, 0) + regs[i] = catchUpReg + } + + // For each registration, publish events (higher chance) and disconnect + // randomly. + for j := 0; j < numReg; j++ { + if rng.Intn(5) < 4 { + for i := 0; i < 100; i++ { + regs[j].publish(ctx, ev1, nil /* alloc */) + } + } else { + regs[j].Disconnect(kvpb.NewError(nil)) + } + } + + // All registrations should be caught up or disconnected. CatchUpBuf should + // be emptied. + for _, reg := range regs { + require.NoError(t, reg.waitForCaughtUp(ctx)) + require.Nil(t, reg.getBuf()) + } +} + +// TestUnbufferedReg tests for unbuffered registrations specifically. Note that +// a lot of tests are already covered in registry_test.go. 
+// TODO(wenyihu6): figure out better tests for this (add memory accounting +// tests here as well, add specific tests for draining, add a testing hook +// specific to catch up switch over point) +func TestUnbufferedRegOnCatchUpSwitchOver(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + val1 := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}} + val2 := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 5}} + ev1, ev2, ev3, ev4, ev5 := new(kvpb.RangeFeedEvent), new(kvpb.RangeFeedEvent), + new(kvpb.RangeFeedEvent), new(kvpb.RangeFeedEvent), new(kvpb.RangeFeedEvent) + ev1.MustSetValue(&kvpb.RangeFeedValue{Key: keyA, Value: val1}) + ev2.MustSetValue(&kvpb.RangeFeedValue{Key: keyB, Value: val1}) + ev3.MustSetValue(&kvpb.RangeFeedValue{Key: keyC, Value: val2}) + ev4.MustSetValue(&kvpb.RangeFeedCheckpoint{ + Span: roachpb.Span{ + Key: roachpb.Key("d"), + EndKey: roachpb.Key("w")}, + ResolvedTS: hlc.Timestamp{WallTime: 5}, + }) + ev5.MustSetValue(&kvpb.RangeFeedDeleteRange{ + Span: roachpb.Span{ + Key: roachpb.Key("d"), + EndKey: roachpb.Key("w")}, + Timestamp: hlc.Timestamp{WallTime: 6}, + }) + t.Run("disconnect before catch up scan starts", func(t *testing.T) { + s := newTestStream() + iter := newTestIterator(keyValues, roachpb.Key("w")) + catchUpReg := newTestRegistration(s, withRSpan(spAB), withRegistrationType(unbuffered), + withCatchUpIter(iter)).(*unbufferedRegistration) + catchUpReg.publish(ctx, ev1, nil /* alloc */) + catchUpReg.Disconnect(kvpb.NewError(nil)) + require.Nil(t, catchUpReg.mu.catchUpIter) + // Catch up scan should not be initiated. + go catchUpReg.runOutputLoop(ctx, 0) + require.NoError(t, catchUpReg.waitForCaughtUp(ctx)) + require.Nil(t, catchUpReg.mu.catchUpIter) + // No events should be sent since the registration has catch up buffer and + // is disconnected before catch up scan starts. 
+ require.Nil(t, s.GetAndClearEvents()) + // Repeatedly disconnect should be fine. + catchUpReg.Disconnect(kvpb.NewError(nil)) + }) + t.Run("disconnect after maybeRunCatchUpScan and before publishCatchUpBuffer", + func(t *testing.T) { + s := newTestStream() + iter := newTestIterator(keyValues, roachpb.Key("w")) + catchUpReg := newTestRegistration(s, withRSpan(roachpb.Span{ + Key: roachpb.Key("d"), + EndKey: roachpb.Key("w"), + }), withStartTs(hlc.Timestamp{WallTime: 4}), withCatchUpIter(iter), withDiff(true), + withRegistrationType(unbuffered)).(*unbufferedRegistration) + for i := 0; i < 10000; i++ { + catchUpReg.publish(ctx, ev1, nil /* alloc */) + } + // No events should be sent since the registration has catch up buffer. + require.Nil(t, s.GetAndClearEvents()) + ctx, catchUpReg.mu.catchUpScanCancelFn = context.WithCancel(ctx) + require.NoError(t, catchUpReg.maybeRunCatchUpScan(ctx)) + // Disconnected before catch up overflowed. + catchUpReg.Disconnect(kvpb.NewError(nil)) + require.Equal(t, context.Canceled, catchUpReg.publishCatchUpBuffer(ctx)) + require.NotNil(t, catchUpReg.mu.catchUpBuf) + require.True(t, catchUpReg.mu.catchUpOverflowed) + catchUpReg.drainAllocations(ctx) + require.Nil(t, catchUpReg.mu.catchUpBuf) + }) + t.Run("disconnect during publishCatchUpBuffer", + func(t *testing.T) { + s := newTestStream() + iter := newTestIterator(keyValues, roachpb.Key("w")) + catchUpReg := newTestRegistration(s, withRSpan(roachpb.Span{ + Key: roachpb.Key("d"), + EndKey: roachpb.Key("w"), + }), withStartTs(hlc.Timestamp{WallTime: 4}), withCatchUpIter(iter), withDiff(true), + withRegistrationType(unbuffered)).(*unbufferedRegistration) + for i := 0; i < 100000; i++ { + catchUpReg.publish(ctx, ev1, nil /* alloc */) + } + // No events should be sent since the registration has catch up buffer. 
+ require.Nil(t, s.GetAndClearEvents()) + ctx, catchUpReg.mu.catchUpScanCancelFn = context.WithCancel(ctx) + require.NoError(t, catchUpReg.maybeRunCatchUpScan(ctx)) + go func() { + _ = catchUpReg.publishCatchUpBuffer(ctx) + }() + catchUpReg.Disconnect(kvpb.NewError(nil)) + require.NotNil(t, catchUpReg.mu.catchUpBuf) + require.True(t, catchUpReg.mu.catchUpOverflowed) + catchUpReg.drainAllocations(ctx) + require.Nil(t, catchUpReg.mu.catchUpBuf) + }) + + t.Run("disconnect after publishCatchUpBuffer", + func(t *testing.T) { + s := newTestStream() + iter := newTestIterator(keyValues, roachpb.Key("w")) + catchUpReg := newTestRegistration(s, withRSpan(roachpb.Span{ + Key: roachpb.Key("d"), + EndKey: roachpb.Key("w"), + }), withStartTs(hlc.Timestamp{WallTime: 4}), withCatchUpIter(iter), withDiff(true), + withRegistrationType(unbuffered)).(*unbufferedRegistration) + for i := 0; i < 100000; i++ { + catchUpReg.publish(ctx, ev1, nil /* alloc */) + } + // No events should be sent since the registration has catch up buffer. + require.Nil(t, s.GetAndClearEvents()) + ctx, catchUpReg.mu.catchUpScanCancelFn = context.WithCancel(ctx) + require.NoError(t, catchUpReg.maybeRunCatchUpScan(ctx)) + require.Error(t, newRetryErrBufferCapacityExceeded(), catchUpReg.publishCatchUpBuffer(ctx)) + catchUpReg.Disconnect(kvpb.NewError(newRetryErrBufferCapacityExceeded())) + require.NotNil(t, catchUpReg.mu.catchUpBuf) + require.True(t, catchUpReg.mu.catchUpOverflowed) + catchUpReg.drainAllocations(ctx) + require.Nil(t, catchUpReg.mu.catchUpBuf) + }) + t.Run("disconnect after runOutputLoop", func(t *testing.T) { + // Run a catch-up scan for a registration over a test + // iterator with the following keys. 
+ s := newTestStream() + ctx := context.Background() + iter := newTestIterator(keyValues, roachpb.Key("w")) + r := newTestRegistration(s, withRSpan(roachpb.Span{ + Key: roachpb.Key("d"), + EndKey: roachpb.Key("w"), + }), withStartTs(hlc.Timestamp{WallTime: 4}), withCatchUpIter(iter), withDiff(true), + withRegistrationType(unbuffered)).(*unbufferedRegistration) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + r.runOutputLoop(ctx, 0) + }() + capOfBuf := cap(r.mu.catchUpBuf) + r.publish(ctx, ev1, &SharedBudgetAllocation{refCount: 1}) + r.publish(ctx, ev2, &SharedBudgetAllocation{refCount: 1}) + r.publish(ctx, ev3, &SharedBudgetAllocation{refCount: 1}) + r.publish(ctx, ev4, &SharedBudgetAllocation{refCount: 1}) + r.publish(ctx, ev5, &SharedBudgetAllocation{refCount: 1}) + catchUpEvents := expEvents(false) + wg.Wait() + + require.False(t, r.getOverflowed()) + require.Nil(t, r.getBuf()) + s.waitForEventCount(t, capOfBuf+len(catchUpEvents)) + require.Equal(t, + []*kvpb.RangeFeedEvent{ev1, ev2, ev3, ev4, ev5}, s.mu.events[len(catchUpEvents):]) + }) +}