diff --git a/pkg/storage/rangefeed/processor.go b/pkg/storage/rangefeed/processor.go index d26f07137b73..d46ca9ac7b1c 100644 --- a/pkg/storage/rangefeed/processor.go +++ b/pkg/storage/rangefeed/processor.go @@ -42,14 +42,7 @@ const ( // newErrBufferCapacityExceeded creates an error that is returned to subscribers // if the rangefeed processor is not able to keep up with the flow of incoming -// events and is forced to drop events in order to not block. The error is -// usually associated with a slow consumer. -// TODO(nvanbenschoten): currently a single slow consumer can cause all -// rangefeeds on a Range to be shut down with this error. We should work -// on isolating buffering for individual consumers so that a slow consumer -// only affects itself. One idea for this is to replace Stream.Send with -// Stream.SendAsync and give each stream its own individual buffer. If -// an individual stream is unable to keep up, to should fail on its own. +// events and is forced to drop events in order to not block. func newErrBufferCapacityExceeded() *roachpb.Error { return roachpb.NewErrorf("rangefeed: buffer capacity exceeded due to slow consumer") } @@ -117,7 +110,7 @@ type Processor struct { rts resolvedTimestamp regC chan registration - catchUpC chan catchUpResult + unregC chan *registration lenReqC chan struct{} lenResC chan int eventC chan event @@ -125,13 +118,6 @@ type Processor struct { stoppedC chan struct{} } -// catchUpResult is delivered to the Processor goroutine when a catch up scan -// for a new registration has completed. -type catchUpResult struct { - r *registration - pErr *roachpb.Error -} - // event is a union of different event types that the Processor goroutine needs // to be informed of. It is used so that all events can be sent over the same // channel, which is necessary to prevent reordering. @@ -140,6 +126,11 @@ type event struct { ct hlc.Timestamp initRTS bool syncC chan struct{} + // This setting is used in conjunction with syncC in tests in order to ensure + // that all registrations have fully finished outputting their buffers. This + // has to be done by the processor in order to avoid race conditions with the + // registry. Should be used only in tests. + testRegCatchupSpan roachpb.Span } // NewProcessor creates a new rangefeed Processor. The corresponding goroutine @@ -153,7 +144,7 @@ func NewProcessor(cfg Config) *Processor { rts: makeResolvedTimestamp(), regC: make(chan registration), - catchUpC: make(chan catchUpResult), + unregC: make(chan *registration), lenReqC: make(chan struct{}), lenResC: make(chan int), eventC: make(chan event, cfg.EventChanCap), @@ -200,13 +191,9 @@ func (p *Processor) Start(stopper *stop.Stopper, rtsIter engine.SimpleIterator) defer txnPushTicker.Stop() } - // checkStreamsTicker periodically checks whether any streams have - // disconnected. If so, the registration is unregistered. - checkStreamsTicker := time.NewTicker(p.CheckStreamsInterval) - defer checkStreamsTicker.Stop() - for { select { + // Handle new registrations. case r := <-p.regC: if !p.Span.AsRawSpanWithNoLocals().Contains(r.span) { @@ -216,21 +203,28 @@ func (p *Processor) Start(stopper *stop.Stopper, rtsIter engine.SimpleIterator) // Add the new registration to the registry. p.reg.Register(&r) - // Launch an async catch-up scan for the new registration. - // Ignore error if quiescing. - if r.catchUpIter != nil { - catchUp := newCatchUpScan(p, &r) - err := stopper.RunAsyncTask(ctx, "rangefeed: catch-up scan", catchUp.Run) - if err != nil { - catchUp.Cancel() + // Immediately publish a checkpoint event to the registry. This will be + // the first event published to this registration after its initial + // catch-up scan completes. + r.publish(p.newCheckpointEvent()) + + // Run an output loop for the registry. + runOutputLoop := func(ctx context.Context) { + r.runOutputLoop(ctx) + select { + case p.unregC <- &r: + case <-p.stoppedC: } - } else { - p.handleCatchUpScanRes(ctx, catchUpResult{r: &r}) + } + if err := stopper.RunAsyncTask(ctx, "rangefeed: output loop", runOutputLoop); err != nil { + r.disconnect(roachpb.NewError(err)) + p.reg.Unregister(&r) } - // React to registrations finishing their catch up scan. - case res := <-p.catchUpC: - p.handleCatchUpScanRes(ctx, res) + // Respond to unregistration requests; these come from registrations that + // encounter an error during their output loop. + case r := <-p.unregC: + p.reg.Unregister(r) // Respond to answers about the processor goroutine state. case <-p.lenReqC: @@ -285,10 +279,6 @@ func (p *Processor) Start(stopper *stop.Stopper, rtsIter engine.SimpleIterator) txnPushTickerC = txnPushTicker.C txnPushAttemptC = nil - // Check whether any streams have disconnected. - case <-checkStreamsTicker.C: - p.reg.CheckStreams() - // Close registrations and exit when signaled. case pErr := <-p.stopC: p.reg.DisconnectWithErr(all, pErr) @@ -340,17 +330,14 @@ func (p *Processor) sendStop(pErr *roachpb.Error) { // events that are consumed concurrently with this call. The channel will be // provided an error when the registration closes. // -// The provided iterator is used to catch the registration up from its starting -// timestamp with value events for all committed values. It must obey the -// contract of an iterator used for a catchUpScan. The Processor promises to -// clean up the iterator by calling its Close method when it is finished. If the -// iterator is nil then no catch-up scan will be performed. +// The optionally provided "catch-up" iterator is used to read changes from the +// engine which occurred after the provided start timestamp. // // NOT safe to call on nil Processor. func (p *Processor) Register( span roachpb.RSpan, startTS hlc.Timestamp, - catchUpIter engine.SimpleIterator, + catchupIter engine.SimpleIterator, stream Stream, errC chan<- *roachpb.Error, ) { @@ -359,19 +346,12 @@ func (p *Processor) Register( // it should see these events during its catch up scan. p.syncEventC() - r := registration{ - span: span.AsRawSpanWithNoLocals(), - startTS: startTS, - catchUpIter: catchUpIter, - stream: stream, - errC: errC, - } + r := newRegistration( + span.AsRawSpanWithNoLocals(), startTS, catchupIter, p.Config.EventChanCap, stream, errC, + ) select { case p.regC <- r: case <-p.stoppedC: - if catchUpIter != nil { - catchUpIter.Close() // clean up - } // errC has a capacity of 1. If it is already full, we don't need to send // another error. select { @@ -397,16 +377,6 @@ func (p *Processor) Len() int { } } -// deliverCatchUpScanRes informs the Processor of the results of a catch-up scan -// for a given registration. -func (p *Processor) deliverCatchUpScanRes(r *registration, pErr *roachpb.Error) { - select { - case p.catchUpC <- catchUpResult{r: r, pErr: pErr}: - case <-p.stoppedC: - // Already stopped. Do nothing. - } -} - // ConsumeLogicalOps informs the rangefeed processor of the set of logical // operations. It returns false if consuming the operations hit a timeout, as // specified by the EventChanTimeout configuration. If the method returns false, @@ -497,25 +467,6 @@ func (p *Processor) syncEventC() { } } -// -// Methods called from Processor goroutine. -// - -func (p *Processor) handleCatchUpScanRes(ctx context.Context, res catchUpResult) { - if res.pErr == nil { - res.r.SetCaughtUp() - - // Publish checkpoint to processor even if the resolved timestamp is - // not initialized. In that case, the timestamp will be empty but the - // checkpoint event is still useful to indicate that the catch-up scan - // has completed. This allows clients to rely on stronger ordering - // semantics once they observe the first checkpoint event. - p.reg.PublishToReg(res.r, p.newCheckpointEvent()) - } else { - p.reg.DisconnectRegWithError(res.r, res.pErr) - } -} - func (p *Processor) consumeEvent(ctx context.Context, e event) { switch { case len(e.ops) > 0: @@ -525,6 +476,15 @@ func (p *Processor) consumeEvent(ctx context.Context, e event) { case e.initRTS: p.initResolvedTS(ctx) case e.syncC != nil: + if e.testRegCatchupSpan.Valid() { + if err := p.reg.waitForCaughtUp(e.testRegCatchupSpan); err != nil { + log.Errorf( + ctx, + "error waiting for registries to catch up during test, results might be impacted: %s", + err, + ) + } + } close(e.syncC) default: panic("missing event variant") diff --git a/pkg/storage/rangefeed/processor_test.go b/pkg/storage/rangefeed/processor_test.go index 396c77bb804f..d7f90c5bb4b0 100644 --- a/pkg/storage/rangefeed/processor_test.go +++ b/pkg/storage/rangefeed/processor_test.go @@ -17,6 +17,7 @@ package rangefeed import ( "bytes" "context" + "fmt" "runtime" "sort" "sync" @@ -26,13 +27,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" - "github.com/pkg/errors" "github.com/stretchr/testify/require" ) @@ -145,7 +146,7 @@ func newTestProcessor(rtsIter engine.SimpleIterator) (*Processor, *stop.Stopper) return newTestProcessorWithTxnPusher(rtsIter, nil /* pusher */) } -func TestProcessor(t *testing.T) { +func TestProcessorBasic(t *testing.T) { defer leaktest.AfterTest(t)() p, stopper := newTestProcessor(nil /* rtsIter */) defer stopper.Stop(context.Background()) @@ -180,6 +181,7 @@ func TestProcessor(t *testing.T) { r1Stream, r1ErrC, ) + p.syncEventAndRegistrations() require.Equal(t, 1, p.Len()) require.Equal(t, []*roachpb.RangeFeedEvent{rangeFeedCheckpoint( @@ -191,7 +193,7 @@ func TestProcessor(t *testing.T) { // Test checkpoint with one registration. p.ForwardClosedTS(hlc.Timestamp{WallTime: 5}) - p.syncEventC() + p.syncEventAndRegistrations() require.Equal(t, []*roachpb.RangeFeedEvent{rangeFeedCheckpoint( roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")}, @@ -204,7 +206,7 @@ func TestProcessor(t *testing.T) { p.ConsumeLogicalOps( writeValueOpWithKV(roachpb.Key("c"), hlc.Timestamp{WallTime: 6}, []byte("val")), ) - p.syncEventC() + p.syncEventAndRegistrations() require.Equal(t, []*roachpb.RangeFeedEvent{rangeFeedValue( roachpb.Key("c"), @@ -220,14 +222,14 @@ func TestProcessor(t *testing.T) { p.ConsumeLogicalOps( writeValueOpWithKV(roachpb.Key("s"), hlc.Timestamp{WallTime: 6}, []byte("val")), ) - p.syncEventC() + p.syncEventAndRegistrations() require.Equal(t, []*roachpb.RangeFeedEvent(nil), r1Stream.Events()) // Test intent that is aborted with one registration. txn1 := uuid.MakeV4() // Write intent. p.ConsumeLogicalOps(writeIntentOp(txn1, hlc.Timestamp{WallTime: 6})) - p.syncEventC() + p.syncEventAndRegistrations() require.Equal(t, []*roachpb.RangeFeedEvent(nil), r1Stream.Events()) // Abort. p.ConsumeLogicalOps(abortIntentOp(txn1)) @@ -239,11 +241,11 @@ func TestProcessor(t *testing.T) { txn2 := uuid.MakeV4() // Write intent. p.ConsumeLogicalOps(writeIntentOp(txn2, hlc.Timestamp{WallTime: 10})) - p.syncEventC() + p.syncEventAndRegistrations() require.Equal(t, []*roachpb.RangeFeedEvent(nil), r1Stream.Events()) // Forward closed timestamp. Should now be stuck on intent. p.ForwardClosedTS(hlc.Timestamp{WallTime: 15}) - p.syncEventC() + p.syncEventAndRegistrations() require.Equal(t, []*roachpb.RangeFeedEvent{rangeFeedCheckpoint( roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")}, @@ -253,7 +255,7 @@ func TestProcessor(t *testing.T) { ) // Update the intent. Should forward resolved timestamp. p.ConsumeLogicalOps(updateIntentOp(txn2, hlc.Timestamp{WallTime: 12})) - p.syncEventC() + p.syncEventAndRegistrations() require.Equal(t, []*roachpb.RangeFeedEvent{rangeFeedCheckpoint( roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")}, @@ -265,7 +267,7 @@ func TestProcessor(t *testing.T) { p.ConsumeLogicalOps( commitIntentOpWithKV(txn2, roachpb.Key("e"), hlc.Timestamp{WallTime: 13}, []byte("ival")), ) - p.syncEventC() + p.syncEventAndRegistrations() require.Equal(t, []*roachpb.RangeFeedEvent{ rangeFeedValue( @@ -293,6 +295,7 @@ func TestProcessor(t *testing.T) { r2Stream, r2ErrC, ) + p.syncEventAndRegistrations() require.Equal(t, 2, p.Len()) require.Equal(t, []*roachpb.RangeFeedEvent{rangeFeedCheckpoint( @@ -304,7 +307,7 @@ func TestProcessor(t *testing.T) { // Both registrations should see checkpoint. p.ForwardClosedTS(hlc.Timestamp{WallTime: 20}) - p.syncEventC() + p.syncEventAndRegistrations() chEvent := []*roachpb.RangeFeedEvent{rangeFeedCheckpoint( roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")}, hlc.Timestamp{WallTime: 20}, @@ -316,7 +319,7 @@ func TestProcessor(t *testing.T) { p.ConsumeLogicalOps( writeValueOpWithKV(roachpb.Key("k"), hlc.Timestamp{WallTime: 22}, []byte("val2")), ) - p.syncEventC() + p.syncEventAndRegistrations() valEvent := []*roachpb.RangeFeedEvent{rangeFeedValue( roachpb.Key("k"), roachpb.Value{ @@ -331,7 +334,7 @@ func TestProcessor(t *testing.T) { p.ConsumeLogicalOps( writeValueOpWithKV(roachpb.Key("v"), hlc.Timestamp{WallTime: 23}, []byte("val3")), ) - p.syncEventC() + p.syncEventAndRegistrations() valEvent2 := []*roachpb.RangeFeedEvent{rangeFeedValue( roachpb.Key("v"), roachpb.Value{ @@ -389,7 +392,17 @@ func TestProcessorSlowConsumer(t *testing.T) { r1Stream, r1ErrC, ) - require.Equal(t, 1, p.Len()) + r2Stream := newTestStream() + r2ErrC := make(chan *roachpb.Error, 1) + p.Register( + roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")}, + hlc.Timestamp{WallTime: 1}, + nil, /* catchUpIter */ + r2Stream, + r2ErrC, + ) + p.syncEventAndRegistrations() + require.Equal(t, 2, p.Len()) require.Equal(t, []*roachpb.RangeFeedEvent{rangeFeedCheckpoint( roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")}, @@ -400,6 +413,11 @@ func TestProcessorSlowConsumer(t *testing.T) { // Block its Send method and fill up the processor's input channel. unblock := r1Stream.BlockSend() + defer func() { + if unblock != nil { + unblock() + } + }() fillEventC := func() { // Need one more message to fill the channel because the first one // will be Sent to the stream and block the processor goroutine. @@ -412,8 +430,14 @@ func TestProcessorSlowConsumer(t *testing.T) { } } fillEventC() + p.syncEventC() + + // Wait for just the unblocked registration to catch up. This prevents the + // race condition where this registration overflows anyway due to the rapid + // event consumption and small buffer size. + p.syncEventAndRegistrationSpan(spXY) - // Consume one more event. Should block. + // Consume one more event. Should not block. consumedC := make(chan struct{}) go func() { p.ConsumeLogicalOps( @@ -421,31 +445,29 @@ func TestProcessorSlowConsumer(t *testing.T) { ) close(consumedC) }() - select { - case <-consumedC: - t.Errorf("ConsumeLogicalOps should have blocked") - case <-time.After(p.EventChanTimeout / 2): - } - - // Unblock the send channel. The events should quickly be consumed. - unblock() <-consumedC p.syncEventC() - require.Equal(t, testProcessorEventCCap+2, len(r1Stream.Events())) - // Block the Send method again and fill up the processor's input channel. - unblock = r1Stream.BlockSend() - fillEventC() - - // Consume one more event. Should tear down processor after timeout. - sent := p.ConsumeLogicalOps( - writeValueOpWithKV(roachpb.Key("k"), hlc.Timestamp{WallTime: 15}, []byte("val")), - ) - require.False(t, sent) + // Wait for just the unblocked registration to catch up. + p.syncEventAndRegistrationSpan(spXY) + events := r2Stream.Events() + require.Equal(t, testProcessorEventCCap+3, len(events)) + require.Equal(t, 2, p.reg.Len()) - // Registration should be rejected with error. + // Unblock the send channel. The events should quickly be consumed. unblock() - require.Equal(t, newErrBufferCapacityExceeded().Message, (<-r1ErrC).Message) + unblock = nil + <-consumedC + p.syncEventAndRegistrations() + // One event was dropped due to overflow. + require.Equal(t, testProcessorEventCCap+1, len(r1Stream.Events())) + require.Equal(t, newErrBufferCapacityExceeded().GoError(), (<-r1ErrC).GoError()) + testutils.SucceedsSoon(t, func() error { + if p.Len() != 1 { + return fmt.Errorf("processor had %d regs, wanted %d", p.reg.Len(), 1) + } + return nil + }) } // TestProcessorInitializeResolvedTimestamp tests that when a Processor is given @@ -492,6 +514,7 @@ func TestProcessorInitializeResolvedTimestamp(t *testing.T) { r1Stream, make(chan *roachpb.Error, 1), ) + p.syncEventAndRegistrations() require.Equal(t, 1, p.Len()) // The registration should be provided a checkpoint immediately with an @@ -522,7 +545,7 @@ func TestProcessorInitializeResolvedTimestamp(t *testing.T) { // timestamp. Txn1 has intents at many times but the unresolvedIntentQueue // tracks its latest, which is 19, so the resolved timestamp is // 19.FloorPrev() = 18. - p.syncEventC() + p.syncEventAndRegistrations() require.True(t, p.rts.IsInit()) require.Equal(t, hlc.Timestamp{WallTime: 18}, p.rts.Get()) @@ -534,150 +557,6 @@ func TestProcessorInitializeResolvedTimestamp(t *testing.T) { require.Equal(t, chEvent, r1Stream.Events()) } -func TestProcessorCatchUpScan(t *testing.T) { - defer leaktest.AfterTest(t)() - p, stopper := newTestProcessor(nil /* rtsIter */) - defer stopper.Stop(context.Background()) - - // The resolved timestamp should be initialized. - p.syncEventC() - require.True(t, p.rts.IsInit()) - require.Equal(t, hlc.Timestamp{}, p.rts.Get()) - - txn1, txn2 := uuid.MakeV4(), uuid.MakeV4() - catchUpIter := newTestIterator([]engine.MVCCKeyValue{ - makeKV("a", "val1", 10), - makeInline("b", "val2"), - makeIntent("c", txn1, "txnKey1", 15), - makeKV("c", "val3", 11), - makeKV("c", "val4", 9), - makeIntent("d", txn2, "txnKey2", 21), - makeKV("d", "val5", 20), - makeKV("d", "val6", 19), - makeInline("g", "val7"), - makeKV("m", "val8", 1), - makeIntent("n", txn1, "txnKey1", 12), - makeIntent("r", txn1, "txnKey1", 19), - makeKV("r", "val9", 4), - makeIntent("w", txn1, "txnKey1", 3), - makeInline("x", "val10"), - makeIntent("z", txn2, "txnKey2", 21), - makeKV("z", "val11", 4), - }) - catchUpIter.block = make(chan struct{}) - - // Add a registration with the catch-up iterator. - r1Stream := newTestStream() - p.Register( - roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("w")}, - hlc.Timestamp{WallTime: 2}, // too large to see key @ m - catchUpIter, - r1Stream, - make(chan *roachpb.Error, 1), - ) - require.Equal(t, 1, p.Len()) - - // The registration should not have gotten an initial checkpoint. - require.Nil(t, r1Stream.Events()) - - // Forward the closed timestamp. The resolved timestamp should be - // initialized and should move forward, but the registration should - // still not get a checkpoint. - p.ForwardClosedTS(hlc.Timestamp{WallTime: 20}) - p.syncEventC() - require.True(t, p.rts.IsInit()) - require.Equal(t, hlc.Timestamp{WallTime: 20}, p.rts.Get()) - require.Nil(t, r1Stream.Events()) - - // Let the scan proceed. - close(catchUpIter.block) - <-catchUpIter.done - require.True(t, catchUpIter.closed) - - // Synchronize the event channel then verify that the registration's stream - // was sent all values in its range and the resolved timestamp once the - // catch-up scan was complete. - p.syncEventC() - expEvents := []*roachpb.RangeFeedEvent{ - rangeFeedValue( - roachpb.Key("a"), - roachpb.Value{RawBytes: []byte("val1"), Timestamp: hlc.Timestamp{WallTime: 10}}, - ), - rangeFeedValue( - roachpb.Key("b"), - roachpb.Value{RawBytes: []byte("val2"), Timestamp: hlc.Timestamp{WallTime: 0}}, - ), - rangeFeedValue( - roachpb.Key("c"), - roachpb.Value{RawBytes: []byte("val3"), Timestamp: hlc.Timestamp{WallTime: 11}}, - ), - rangeFeedValue( - roachpb.Key("c"), - roachpb.Value{RawBytes: []byte("val4"), Timestamp: hlc.Timestamp{WallTime: 9}}, - ), - rangeFeedValue( - roachpb.Key("d"), - roachpb.Value{RawBytes: []byte("val5"), Timestamp: hlc.Timestamp{WallTime: 20}}, - ), - rangeFeedValue( - roachpb.Key("d"), - roachpb.Value{RawBytes: []byte("val6"), Timestamp: hlc.Timestamp{WallTime: 19}}, - ), - rangeFeedValue( - roachpb.Key("g"), - roachpb.Value{RawBytes: []byte("val7"), Timestamp: hlc.Timestamp{WallTime: 0}}, - ), - rangeFeedValue( - roachpb.Key("r"), - roachpb.Value{RawBytes: []byte("val9"), Timestamp: hlc.Timestamp{WallTime: 4}}, - ), - rangeFeedCheckpoint( - roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")}, - hlc.Timestamp{WallTime: 20}, - ), - } - require.Equal(t, expEvents, r1Stream.Events()) - - // Forward the closed timestamp. The registration should get a checkpoint - // this time. - p.ForwardClosedTS(hlc.Timestamp{WallTime: 25}) - p.syncEventC() - require.True(t, p.rts.IsInit()) - require.Equal(t, hlc.Timestamp{WallTime: 25}, p.rts.Get()) - require.Equal(t, - []*roachpb.RangeFeedEvent{rangeFeedCheckpoint( - roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")}, - hlc.Timestamp{WallTime: 25}, - )}, - r1Stream.Events(), - ) - - // Add a second registration, this time with an iterator that will throw an - // error. - r2Stream := newTestStream() - r2ErrC := make(chan *roachpb.Error, 1) - errCatchUpIter := newErrorIterator(errors.New("iteration error")) - errCatchUpIter.block = make(chan struct{}) - p.Register( - roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, - hlc.Timestamp{WallTime: 1}, - errCatchUpIter, - r2Stream, - r2ErrC, - ) - require.Equal(t, 2, p.Len()) - - // Wait for the scan to hit the error and finish. - close(errCatchUpIter.block) - <-errCatchUpIter.done - require.True(t, errCatchUpIter.closed) - - // The registration should throw an error and be unregistered. - require.NotNil(t, <-r2ErrC) - p.syncEventC() - require.Equal(t, 1, p.Len()) -} - func TestProcessorTxnPushAttempt(t *testing.T) { defer leaktest.AfterTest(t)() @@ -938,6 +817,7 @@ func TestProcessorRegistrationObservesOnlyNewEvents(t *testing.T) { } }() wg.Wait() + p.syncEventAndRegistrations() // Verify that no registrations were given operations // from before they registered. @@ -951,3 +831,28 @@ func TestProcessorRegistrationObservesOnlyNewEvents(t *testing.T) { require.Equal(t, expFirstIdx, firstIdx) } } + +// syncEventAndRegistrations waits for all previously sent events to be +// processed *and* for all registration output loops to fully process their own +// internal buffers. +func (p *Processor) syncEventAndRegistrations() { + p.syncEventAndRegistrationSpan(all) +} + +// syncEventAndRegistrations waits for all previously sent events to be +// processed *and* for all registration output loops for registrations +// overlapping the given span to fully process their own internal buffers. +func (p *Processor) syncEventAndRegistrationSpan(span roachpb.Span) { + syncC := make(chan struct{}) + select { + case p.eventC <- event{syncC: syncC, testRegCatchupSpan: span}: + select { + case <-syncC: + // Synchronized. + case <-p.stoppedC: + // Already stopped. Do nothing. + } + case <-p.stoppedC: + // Already stopped. Do nothing. + } +} diff --git a/pkg/storage/rangefeed/registry.go b/pkg/storage/rangefeed/registry.go index e4817c7e5736..02c6bc497700 100644 --- a/pkg/storage/rangefeed/registry.go +++ b/pkg/storage/rangefeed/registry.go @@ -17,11 +17,19 @@ package rangefeed import ( "context" "fmt" + "sync" + "time" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/bufalloc" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/interval" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/pkg/errors" ) @@ -46,21 +54,217 @@ type Stream interface { // channel is sent an error to inform it that the registration // has finished. type registration struct { + // Input. + span roachpb.Span + catchupIter engine.SimpleIterator + catchupTimestamp hlc.Timestamp + + // Output. + stream Stream + errC chan<- *roachpb.Error + // Internal. id int64 keys interval.Range + buf chan *roachpb.RangeFeedEvent - // Footprint. - span roachpb.Span - startTS hlc.Timestamp // exclusive + mu struct { + sync.Locker + // True if this registration buffer has overflowed, dropping a live event. + // This will cause the registration to exit with an error once the buffer + // has been emptied. + overflowed bool + // Boolean indicating if all events have been output to stream. Used only + // for testing. + caughtUp bool + // Management of the output loop goroutine, used to ensure proper teardown. + outputLoopCancelFn func() + disconnected bool + } +} + +func newRegistration( + span roachpb.Span, + startTS hlc.Timestamp, + catchupIter engine.SimpleIterator, + bufferSz int, + stream Stream, + errC chan<- *roachpb.Error, +) registration { + r := registration{ + span: span, + catchupIter: catchupIter, + stream: stream, + errC: errC, + buf: make(chan *roachpb.RangeFeedEvent, bufferSz), + catchupTimestamp: startTS, + } + r.mu.Locker = &syncutil.Mutex{} + r.mu.caughtUp = true + return r +} - // Catch-up state. - catchUpIter engine.SimpleIterator - caughtUp bool +// publish attempts to send a single event to the output buffer for this +// registration. If the output buffer is full, the overflowed flag is set, +// indicating that live events were lost and a catchup scan should be initiated. +// If overflowed is already set, events are ignored and not written to the +// buffer. +func (r *registration) publish(event *roachpb.RangeFeedEvent) { + r.mu.Lock() + defer r.mu.Unlock() + if r.mu.overflowed { + return + } + select { + case r.buf <- event: + r.mu.caughtUp = false + default: + // Buffer exceeded and we are dropping this event. Registration will need + // a catch-up scan. + r.mu.overflowed = true + } +} - // Output. - stream Stream - errC chan<- *roachpb.Error +// disconnect cancels the output loop context for the registration and passes an +// error to the output error stream for the registration. This also sets the +// disconnected flag on the registration, preventing it from being disconnected +// again. +func (r *registration) disconnect(pErr *roachpb.Error) { + r.mu.Lock() + defer r.mu.Unlock() + if !r.mu.disconnected { + if r.mu.outputLoopCancelFn != nil { + r.mu.outputLoopCancelFn() + } + r.mu.disconnected = true + r.errC <- pErr + } +} + +// outputLoop is the operational loop for a single registration. The behavior +// is as thus: +// +// 1. If a catch-up scan is indicated, run one before beginning the proper +// output loop. +// 2. After catch-up is complete, begin reading from the registration buffer +// channel and writing to the output stream until the buffer is empty *and* +// the overflow flag has been set. +// +// The loop exits with any error encountered, if the provided context is +// canceled, or when the buffer has overflowed and all pre-overflow entries +// have been emitted. +func (r *registration) outputLoop(ctx context.Context) error { + // If the registration has a catch-up scan, + if r.catchupIter != nil { + if err := r.runCatchupScan(); err != nil { + err = errors.Wrap(err, "catch-up scan failed") + log.Error(ctx, err) + return err + } + } + + // Normal buffered output loop. + for { + overflowed := false + r.mu.Lock() + if len(r.buf) == 0 { + overflowed = r.mu.overflowed + r.mu.caughtUp = true + } + r.mu.Unlock() + if overflowed { + return newErrBufferCapacityExceeded().GoError() + } + + select { + case nextEvent := <-r.buf: + if err := r.stream.Send(nextEvent); err != nil { + return err + } + case <-ctx.Done(): + return ctx.Err() + case <-r.stream.Context().Done(): + return r.stream.Context().Err() + } + } +} + +func (r *registration) runOutputLoop(ctx context.Context) { + r.mu.Lock() + ctx, r.mu.outputLoopCancelFn = context.WithCancel(ctx) + r.mu.Unlock() + err := r.outputLoop(ctx) + r.disconnect(roachpb.NewError(err)) +} + +// runCatchupScan starts a catchup scan which will output entries for all +// recorded changes in the replica that are newer than the catchupTimeStamp. +// This uses the iterator provided when the registration was originally created; +// after the scan completes, the iterator will be closed. +func (r *registration) runCatchupScan() error { + if r.catchupIter == nil { + return nil + } + defer func() { + r.catchupIter.Close() + r.catchupIter = nil + }() + + var a bufalloc.ByteAllocator + startKey := engine.MakeMVCCMetadataKey(r.span.Key) + endKey := engine.MakeMVCCMetadataKey(r.span.EndKey) + + // Iterate though all keys using Next. We want to publish all committed + // versions of each key that are after the registration's startTS, so we + // can't use NextKey. + var meta enginepb.MVCCMetadata + for r.catchupIter.Seek(startKey); ; r.catchupIter.Next() { + if ok, err := r.catchupIter.Valid(); err != nil { + return err + } else if !ok || !r.catchupIter.UnsafeKey().Less(endKey) { + break + } + + unsafeKey := r.catchupIter.UnsafeKey() + unsafeVal := r.catchupIter.UnsafeValue() + if !unsafeKey.IsValue() { + // Found a metadata key. + if err := protoutil.Unmarshal(unsafeVal, &meta); err != nil { + return errors.Wrapf(err, "unmarshaling mvcc meta: %v", unsafeKey) + } + if !meta.IsInline() { + // Not an inline value. Ignore. + continue + } + + // If write is inline, it doesn't have a timestamp so we don't + // filter on the registration's starting timestamp. Instead, we + // return all inline writes. + unsafeVal = meta.RawBytes + } else if !r.catchupTimestamp.Less(unsafeKey.Timestamp) { + // At or before the registration's exclusive starting timestamp. + // Ignore. + continue + } + + var key, val []byte + a, key = a.Copy(unsafeKey.Key, 0) + a, val = a.Copy(unsafeVal, 0) + ts := unsafeKey.Timestamp + + var event roachpb.RangeFeedEvent + event.MustSetValue(&roachpb.RangeFeedValue{ + Key: key, + Value: roachpb.Value{ + RawBytes: val, + Timestamp: ts, + }, + }) + if err := r.stream.Send(&event); err != nil { + return err + } + } + return nil } // ID implements interval.Interface. @@ -73,12 +277,8 @@ func (r *registration) Range() interval.Range { return r.keys } -func (r *registration) SetCaughtUp() { - r.caughtUp = true -} - func (r registration) String() string { - return fmt.Sprintf("[%s @ %s+]", r.span, r.startTS) + return fmt.Sprintf("[%s @ %s+]", r.span, r.catchupTimestamp) } // registry holds a set of registrations and manages their lifecycle. @@ -112,22 +312,12 @@ func (reg *registry) nextID() int64 { return reg.idAlloc } -// PublishToReg publishes the provided event to the given registration. No -// validation of whether the registration state is compatible with the event -// is performed. -func (reg *registry) PublishToReg(r *registration, event *roachpb.RangeFeedEvent) { - if err := r.stream.Send(event); err != nil { - reg.DisconnectRegWithError(r, roachpb.NewError(err)) - } -} - // PublishToOverlapping publishes the provided event to all registrations whose // range overlaps the specified span. func (reg *registry) PublishToOverlapping(span roachpb.Span, event *roachpb.RangeFeedEvent) { // Determine the earliest starting timestamp that a registration // can have while still needing to hear about this event. var minTS hlc.Timestamp - var requireCaughtUp bool switch t := event.GetValue().(type) { case *roachpb.RangeFeedValue: // Only publish values to registrations with starting @@ -137,31 +327,25 @@ func (reg *registry) PublishToOverlapping(span roachpb.Span, event *roachpb.Rang // Always publish checkpoint notifications, regardless // of a registration's starting timestamp. minTS = hlc.MaxTimestamp - requireCaughtUp = true default: panic(fmt.Sprintf("unexpected RangeFeedEvent variant: %v", event)) } reg.forOverlappingRegs(span, func(r *registration) (bool, *roachpb.Error) { - if !r.startTS.Less(minTS) { - // Don't publish events if they are equal to or less - // than the registration's starting timestamp. - return false, nil - } - if requireCaughtUp && !r.caughtUp { - // Don't publish event if it requires the registration - // to be caught up and this one is not. - return false, nil - } + // Don't publish events if they are equal to or less + // than the registration's starting timestamp. - err := r.stream.Send(event) - return err != nil, roachpb.NewError(err) + if r.catchupTimestamp.Less(minTS) { + r.publish(event) + } + return false, nil }) } -// DisconnectRegWithError disconnects a specific registration with a provided error. -func (reg *registry) DisconnectRegWithError(r *registration, pErr *roachpb.Error) { - r.errC <- pErr +// Unregister removes a registration from the registry. It is assumed that the +// registration has already been disconnected, this is intended only to clean +// up the registry. +func (reg *registry) Unregister(r *registration) { if err := reg.tree.Delete(r, false /* fast */); err != nil { panic(err) } @@ -181,15 +365,6 @@ func (reg *registry) DisconnectWithErr(span roachpb.Span, pErr *roachpb.Error) { }) } -// CheckStreams checks the context of all streams in the registry and -// unregisters any that have already disconnected. -func (reg *registry) CheckStreams() { - reg.forOverlappingRegs(all, func(reg *registration) (bool, *roachpb.Error) { - err := errors.Wrap(reg.stream.Context().Err(), "check streams") - return err != nil, roachpb.NewError(err) - }) -} - // all is a span that overlaps with all registrations. var all = roachpb.Span{Key: roachpb.KeyMin, EndKey: roachpb.KeyMax} @@ -205,7 +380,7 @@ func (reg *registry) forOverlappingRegs( r := i.(*registration) dis, pErr := fn(r) if dis { - r.errC <- pErr + r.disconnect(pErr) toDelete = append(toDelete, i) } return false @@ -231,3 +406,35 @@ func (reg *registry) forOverlappingRegs( reg.tree.AdjustRanges() } } + +// Wait for this registration to completely process its internal buffer. +func (r *registration) waitForCaughtUp() error { + opts := retry.Options{ + InitialBackoff: 5 * time.Millisecond, + Multiplier: 2, + MaxBackoff: 10 * time.Second, + MaxRetries: 50, + } + for re := retry.Start(opts); re.Next(); { + r.mu.Lock() + caughtUp := len(r.buf) == 0 && r.mu.caughtUp + r.mu.Unlock() + if caughtUp { + return nil + } + } + return errors.Errorf("registration %v failed to empty in time", r.Range()) +} + +// waitForCaughtUp waits for all registrations overlapping the given span to +// completely process their internal buffers. +func (reg *registry) waitForCaughtUp(span roachpb.Span) error { + var outerErr error + reg.forOverlappingRegs(span, func(r *registration) (bool, *roachpb.Error) { + if outerErr == nil { + outerErr = r.waitForCaughtUp() + } + return false, nil + }) + return outerErr +} diff --git a/pkg/storage/rangefeed/registry_test.go b/pkg/storage/rangefeed/registry_test.go index 5d162d1a2cfa..6ba2c6809c3b 100644 --- a/pkg/storage/rangefeed/registry_test.go +++ b/pkg/storage/rangefeed/registry_test.go @@ -16,25 +16,29 @@ package rangefeed import ( "context" + "fmt" "testing" _ "github.com/cockroachdb/cockroach/pkg/keys" // hook up pretty printer "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/pkg/errors" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/stretchr/testify/require" ) var ( keyA, keyB = roachpb.Key("a"), roachpb.Key("b") keyC, keyD = roachpb.Key("c"), roachpb.Key("d") + keyX, keyY = roachpb.Key("x"), roachpb.Key("y") spAB = roachpb.Span{Key: keyA, EndKey: keyB} spBC = roachpb.Span{Key: keyB, EndKey: keyC} spCD = roachpb.Span{Key: keyC, EndKey: keyD} spAC = roachpb.Span{Key: keyA, EndKey: keyC} + spXY = roachpb.Span{Key: keyX, EndKey: keyY} ) type testStream struct { @@ -95,15 +99,20 @@ type testRegistration struct { errC <-chan *roachpb.Error } -func newTestRegistration(span roachpb.Span) *testRegistration { +func newTestRegistration( + span roachpb.Span, ts hlc.Timestamp, catchup engine.SimpleIterator, +) *testRegistration { s := newTestStream() errC := make(chan *roachpb.Error, 1) return &testRegistration{ - registration: registration{ - span: span, - stream: s, - errC: errC, - }, + registration: newRegistration( + span, + ts, + catchup, + 5, + s, + errC, + ), stream: s, errC: errC, } @@ -122,7 +131,135 @@ func (r *testRegistration) Err() *roachpb.Error { } } -func TestRegistry(t *testing.T) { +func TestRegistrationBasic(t *testing.T) { + defer leaktest.AfterTest(t)() + + val := roachpb.Value{Timestamp: hlc.Timestamp{WallTime: 1}} + ev1, ev2 := new(roachpb.RangeFeedEvent), new(roachpb.RangeFeedEvent) + ev1.MustSetValue(&roachpb.RangeFeedValue{Value: val}) + ev2.MustSetValue(&roachpb.RangeFeedValue{Value: val}) + + // Registration with no catchup scan specified. + noCatchupReg := newTestRegistration(spAB, hlc.Timestamp{}, nil) + noCatchupReg.publish(ev1) + noCatchupReg.publish(ev2) + require.Equal(t, len(noCatchupReg.buf), 2) + go noCatchupReg.runOutputLoop(context.Background()) + require.NoError(t, noCatchupReg.waitForCaughtUp()) + require.Equal(t, []*roachpb.RangeFeedEvent{ev1, ev2}, noCatchupReg.stream.Events()) + noCatchupReg.disconnect(nil) + <-noCatchupReg.errC + + // Registration with catchup scan. + catchupReg := newTestRegistration(spBC, hlc.Timestamp{WallTime: 1}, newTestIterator([]engine.MVCCKeyValue{ + makeKV("b", "val1", 10), + makeInline("ba", "val2"), + makeKV("bc", "val3", 11), + makeKV("bd", "val4", 9), + })) + catchupReg.publish(ev1) + catchupReg.publish(ev2) + require.Equal(t, len(catchupReg.buf), 2) + go catchupReg.runOutputLoop(context.Background()) + require.NoError(t, catchupReg.waitForCaughtUp()) + events := catchupReg.stream.Events() + require.Equal(t, 6, len(events)) + require.Equal(t, []*roachpb.RangeFeedEvent{ev1, ev2}, events[4:]) + catchupReg.disconnect(nil) + <-catchupReg.errC + + // EXIT CONDITIONS + // External Disconnect. + disconnectReg := newTestRegistration(spAB, hlc.Timestamp{}, nil) + disconnectReg.publish(ev1) + disconnectReg.publish(ev2) + go disconnectReg.runOutputLoop(context.Background()) + require.NoError(t, disconnectReg.waitForCaughtUp()) + discErr := roachpb.NewError(fmt.Errorf("disconnection error")) + disconnectReg.disconnect(discErr) + err := <-disconnectReg.errC + require.Equal(t, discErr, err) + + // Overflow. + overflowReg := newTestRegistration(spAB, hlc.Timestamp{}, nil) + for i := 0; i < cap(overflowReg.buf)+3; i++ { + overflowReg.publish(ev1) + } + go overflowReg.runOutputLoop(context.Background()) + err = <-overflowReg.errC + require.Equal(t, newErrBufferCapacityExceeded(), err) + require.Equal(t, cap(overflowReg.buf), len(overflowReg.Events())) + + // Stream Error. + streamErrReg := newTestRegistration(spAB, hlc.Timestamp{}, nil) + streamErr := fmt.Errorf("stream error") + streamErrReg.stream.SetSendErr(streamErr) + go streamErrReg.runOutputLoop(context.Background()) + streamErrReg.publish(ev1) + err = <-streamErrReg.errC + require.Equal(t, streamErr.Error(), err.GoError().Error()) + + // Stream Context Canceled. + streamCancelReg := newTestRegistration(spAB, hlc.Timestamp{}, nil) + streamCancelReg.stream.Cancel() + go streamCancelReg.runOutputLoop(context.Background()) + require.NoError(t, streamCancelReg.waitForCaughtUp()) + err = <-streamCancelReg.errC + require.Equal(t, streamCancelReg.stream.Context().Err().Error(), err.GoError().Error()) +} + +func TestRegistrationCatchUpScan(t *testing.T) { + defer leaktest.AfterTest(t)() + + // Run a catch-up scan for a registration over a test + // iterator with the following keys. + txn1, txn2 := uuid.MakeV4(), uuid.MakeV4() + iter := newTestIterator([]engine.MVCCKeyValue{ + makeKV("a", "val1", 10), + makeInline("b", "val2"), + makeIntent("c", txn1, "txnKey1", 15), + makeKV("c", "val3", 11), + makeKV("c", "val4", 9), + makeIntent("d", txn2, "txnKey2", 21), + makeKV("d", "val5", 20), + makeKV("d", "val6", 19), + makeInline("g", "val7"), + makeKV("m", "val8", 1), + makeIntent("n", txn1, "txnKey1", 12), + makeIntent("r", txn1, "txnKey1", 19), + makeKV("r", "val9", 4), + makeIntent("w", txn1, "txnKey1", 3), + makeInline("x", "val10"), + makeIntent("z", txn2, "txnKey2", 21), + makeKV("z", "val11", 4), + }) + r := newTestRegistration(roachpb.Span{ + Key: roachpb.Key("d"), + EndKey: roachpb.Key("w"), + }, hlc.Timestamp{WallTime: 4}, iter) + + require.NoError(t, r.runCatchupScan()) + require.True(t, iter.closed) + + // Compare the events sent on the registration's Stream to the expected events. + expEvents := []*roachpb.RangeFeedEvent{ + rangeFeedValue( + roachpb.Key("d"), + roachpb.Value{RawBytes: []byte("val5"), Timestamp: hlc.Timestamp{WallTime: 20}}, + ), + rangeFeedValue( + roachpb.Key("d"), + roachpb.Value{RawBytes: []byte("val6"), Timestamp: hlc.Timestamp{WallTime: 19}}, + ), + rangeFeedValue( + roachpb.Key("g"), + roachpb.Value{RawBytes: []byte("val7"), Timestamp: hlc.Timestamp{WallTime: 0}}, + ), + } + require.Equal(t, expEvents, r.Events()) +} + +func TestRegistryBasic(t *testing.T) { defer leaktest.AfterTest(t)() val := roachpb.Value{Timestamp: hlc.Timestamp{WallTime: 1}} @@ -139,12 +276,19 @@ func TestRegistry(t *testing.T) { require.NotPanics(t, func() { reg.PublishToOverlapping(spAB, ev1) }) require.NotPanics(t, func() { reg.Disconnect(spAB) }) require.NotPanics(t, func() { reg.DisconnectWithErr(spAB, err1) }) - require.NotPanics(t, func() { reg.CheckStreams() }) - rAB := newTestRegistration(spAB) - rBC := newTestRegistration(spBC) - rCD := newTestRegistration(spCD) - rAC := newTestRegistration(spAC) + rAB := newTestRegistration(spAB, hlc.Timestamp{}, nil) + rBC := newTestRegistration(spBC, hlc.Timestamp{}, nil) + rCD := newTestRegistration(spCD, hlc.Timestamp{}, nil) + rAC := newTestRegistration(spAC, hlc.Timestamp{}, nil) + go rAB.runOutputLoop(context.Background()) + go rBC.runOutputLoop(context.Background()) + go rCD.runOutputLoop(context.Background()) + go rAC.runOutputLoop(context.Background()) + defer rAB.disconnect(nil) + defer rBC.disconnect(nil) + defer rCD.disconnect(nil) + defer rAC.disconnect(nil) // Register 4 registrations. reg.Register(&rAB.registration) @@ -161,6 +305,7 @@ func TestRegistry(t *testing.T) { reg.PublishToOverlapping(spBC, ev2) reg.PublishToOverlapping(spCD, ev3) reg.PublishToOverlapping(spAC, ev4) + require.NoError(t, reg.waitForCaughtUp(all)) require.Equal(t, []*roachpb.RangeFeedEvent{ev1, ev4}, rAB.Events()) require.Equal(t, []*roachpb.RangeFeedEvent{ev2, ev4}, rBC.Events()) require.Equal(t, []*roachpb.RangeFeedEvent{ev3}, rCD.Events()) @@ -170,92 +315,36 @@ func TestRegistry(t *testing.T) { require.Nil(t, rCD.Err()) require.Nil(t, rAC.Err()) - // Check streams, all still alive. - reg.CheckStreams() - require.Equal(t, 4, reg.Len()) - require.Nil(t, rAB.Err()) - require.Nil(t, rBC.Err()) - require.Nil(t, rCD.Err()) - require.Nil(t, rAC.Err()) - - // Cancel rBC and check streams again. rBC should disconnect. - rBC.stream.Cancel() - require.Equal(t, 4, reg.Len()) - reg.CheckStreams() - require.Equal(t, 3, reg.Len()) - require.Nil(t, rAB.Err()) - require.NotNil(t, rBC.Err()) - require.Nil(t, rCD.Err()) - require.Nil(t, rAC.Err()) - - // Set a stream error on rAC and publish. Once a publication - // notices the error it disconnects. - rAC.stream.SetSendErr(errors.New("can't send")) - reg.PublishToOverlapping(spCD, ev1) - require.Equal(t, 3, reg.Len()) - require.Nil(t, rAB.Events()) - require.Equal(t, []*roachpb.RangeFeedEvent{ev1}, rCD.Events()) - require.Nil(t, rAC.Events()) - require.Nil(t, rAB.Err()) - require.Nil(t, rCD.Err()) - require.Nil(t, rAC.Err()) - reg.PublishToOverlapping(spAB, ev2) - require.Equal(t, 2, reg.Len()) - require.Equal(t, []*roachpb.RangeFeedEvent{ev2}, rAB.Events()) - require.Nil(t, rCD.Events()) - require.Nil(t, rAC.Events()) - require.Nil(t, rAB.Err()) - require.Nil(t, rCD.Err()) - require.NotNil(t, rAC.Err()) - // Disconnect span that overlaps with rCD. reg.DisconnectWithErr(spCD, err1) - require.Equal(t, 1, reg.Len()) - require.Nil(t, rAB.Err()) - require.Equal(t, err1, rCD.Err()) + require.Equal(t, 3, reg.Len()) + require.Equal(t, err1.GoError(), rCD.Err().GoError()) // Can still publish to rAB. reg.PublishToOverlapping(spAB, ev4) reg.PublishToOverlapping(spBC, ev3) reg.PublishToOverlapping(spCD, ev2) reg.PublishToOverlapping(spAC, ev1) + require.NoError(t, reg.waitForCaughtUp(all)) require.Equal(t, []*roachpb.RangeFeedEvent{ev4, ev1}, rAB.Events()) - require.Nil(t, rAB.Err()) // Disconnect from rAB without error. - reg.Disconnect(spBC) - require.Equal(t, 1, reg.Len()) - reg.Disconnect(spAC) - require.Equal(t, 0, reg.Len()) + reg.Disconnect(spAB) + require.Nil(t, rAC.Err()) require.Nil(t, rAB.Err()) - - // Register first 2 registrations again. - reg.Register(&rAB.registration) require.Equal(t, 1, reg.Len()) - reg.Register(&rBC.registration) - require.Equal(t, 2, reg.Len()) - // Publish event to only rAB. - reg.PublishToReg(&rAB.registration, ev1) - require.Equal(t, []*roachpb.RangeFeedEvent{ev1}, rAB.Events()) - require.Nil(t, rAB.Err()) - require.Nil(t, rBC.Events()) - require.Nil(t, rBC.Err()) - - // Disconnect only rBC. - reg.DisconnectRegWithError(&rBC.registration, err1) - require.Equal(t, 1, reg.Len()) - require.Nil(t, rAB.Err()) - require.Equal(t, err1, rBC.Err()) + // Register and unregister. + reg.Unregister(&rBC.registration) + require.Equal(t, 0, reg.Len()) } func TestRegistryPublishBeneathStartTimestamp(t *testing.T) { defer leaktest.AfterTest(t)() reg := makeRegistry() - r := newTestRegistration(spAB) - r.registration.caughtUp = true - r.registration.startTS = hlc.Timestamp{WallTime: 10} + r := newTestRegistration(spAB, hlc.Timestamp{WallTime: 10}, nil) + go r.runOutputLoop(context.Background()) reg.Register(&r.registration) // Publish a value with a timestamp beneath the registration's start @@ -265,6 +354,7 @@ func TestRegistryPublishBeneathStartTimestamp(t *testing.T) { Value: roachpb.Value{Timestamp: hlc.Timestamp{WallTime: 5}}, }) reg.PublishToOverlapping(spAB, ev) + require.NoError(t, reg.waitForCaughtUp(all)) require.Nil(t, r.Events()) // Publish a value with a timestamp equal to the registration's start @@ -273,6 +363,7 @@ func TestRegistryPublishBeneathStartTimestamp(t *testing.T) { Value: roachpb.Value{Timestamp: hlc.Timestamp{WallTime: 10}}, }) reg.PublishToOverlapping(spAB, ev) + require.NoError(t, reg.waitForCaughtUp(all)) require.Nil(t, r.Events()) // Publish a checkpoint with a timestamp beneath the registration's. Should @@ -281,29 +372,11 @@ func TestRegistryPublishBeneathStartTimestamp(t *testing.T) { ResolvedTS: hlc.Timestamp{WallTime: 5}, }) reg.PublishToOverlapping(spAB, ev) + require.NoError(t, reg.waitForCaughtUp(all)) require.Equal(t, []*roachpb.RangeFeedEvent{ev}, r.Events()) -} -func TestRegistryPublishCheckpointNotCaughtUp(t *testing.T) { - defer leaktest.AfterTest(t)() - reg := makeRegistry() - - r := newTestRegistration(spAB) - r.registration.caughtUp = false - reg.Register(&r.registration) - - // Publish a checkpoint before registration caught up. Should be ignored. - ev := new(roachpb.RangeFeedEvent) - ev.MustSetValue(&roachpb.RangeFeedCheckpoint{ - ResolvedTS: hlc.Timestamp{WallTime: 5}, - }) - reg.PublishToOverlapping(spAB, ev) - require.Nil(t, r.Events()) - - // Publish a checkpoint after registration caught up. Should be delivered. - r.SetCaughtUp() - reg.PublishToOverlapping(spAB, ev) - require.Equal(t, []*roachpb.RangeFeedEvent{ev}, r.Events()) + r.disconnect(nil) + <-r.errC } func TestRegistrationString(t *testing.T) { @@ -325,15 +398,15 @@ func TestRegistrationString(t *testing.T) { }, { r: registration{ - span: roachpb.Span{Key: roachpb.Key("d")}, - startTS: hlc.Timestamp{WallTime: 10, Logical: 1}, + span: roachpb.Span{Key: roachpb.Key("d")}, + catchupTimestamp: hlc.Timestamp{WallTime: 10, Logical: 1}, }, exp: `[d @ 0.000000010,1+]`, }, { r: registration{span: roachpb.Span{ Key: roachpb.Key("d"), EndKey: roachpb.Key("z")}, - startTS: hlc.Timestamp{WallTime: 40, Logical: 9}, + catchupTimestamp: hlc.Timestamp{WallTime: 40, Logical: 9}, }, exp: `[{d-z} @ 0.000000040,9+]`, }, diff --git a/pkg/storage/rangefeed/task.go b/pkg/storage/rangefeed/task.go index ad96f7278bb8..e6823e64aaa2 100644 --- a/pkg/storage/rangefeed/task.go +++ b/pkg/storage/rangefeed/task.go @@ -20,7 +20,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" - "github.com/cockroachdb/cockroach/pkg/util/bufalloc" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -115,104 +114,6 @@ func (s *initResolvedTSScan) Cancel() { s.it.Close() } -// catchUpScan scans over the provided iterator and publishes committed values -// to the registration's stream. This backfill allows a registration to request -// a starting timestamp in the past and observe events for writes that have -// already happened. -// -// Iterator Contract: -// Committed values beneath the registration's starting timestamp will be -// ignored, but all values above the registration's starting timestamp must be -// present. An important implication of this is that if the iterator is a -// TimeBoundIterator, its MinTimestamp cannot be above the registration's -// starting timestamp. -// -type catchUpScan struct { - p *Processor - r *registration - it engine.SimpleIterator - a bufalloc.ByteAllocator -} - -func newCatchUpScan(p *Processor, r *registration) runnable { - s := catchUpScan{p: p, r: r, it: r.catchUpIter} - r.catchUpIter = nil // detach - return &s -} - -func (s *catchUpScan) Run(ctx context.Context) { - defer s.Cancel() - if err := s.iterateAndSend(ctx); err != nil { - err = errors.Wrap(err, "catch-up scan failed") - log.Error(ctx, err) - s.p.deliverCatchUpScanRes(s.r, roachpb.NewError(err)) - } else { - s.p.deliverCatchUpScanRes(s.r, nil) - } -} - -func (s *catchUpScan) iterateAndSend(ctx context.Context) error { - startKey := engine.MakeMVCCMetadataKey(s.r.span.Key) - endKey := engine.MakeMVCCMetadataKey(s.r.span.EndKey) - - // Iterate though all keys using Next. We want to publish all committed - // versions of each key that are after the registration's startTS, so we - // can't use NextKey. - var meta enginepb.MVCCMetadata - for s.it.Seek(startKey); ; s.it.Next() { - if ok, err := s.it.Valid(); err != nil { - return err - } else if !ok || !s.it.UnsafeKey().Less(endKey) { - break - } - - unsafeKey := s.it.UnsafeKey() - unsafeVal := s.it.UnsafeValue() - if !unsafeKey.IsValue() { - // Found a metadata key. - if err := protoutil.Unmarshal(unsafeVal, &meta); err != nil { - return errors.Wrapf(err, "unmarshaling mvcc meta: %v", unsafeKey) - } - if !meta.IsInline() { - // Not an inline value. Ignore. - continue - } - - // If write is inline, it doesn't have a timestamp so we don't - // filter on the registration's starting timestamp. Instead, we - // return all inline writes. - unsafeVal = meta.RawBytes - } else if !s.r.startTS.Less(unsafeKey.Timestamp) { - // At or before the registration's exclusive starting timestamp. - // Ignore. - continue - } - - var key, val []byte - s.a, key = s.a.Copy(unsafeKey.Key, 0) - s.a, val = s.a.Copy(unsafeVal, 0) - ts := unsafeKey.Timestamp - - var event roachpb.RangeFeedEvent - event.MustSetValue(&roachpb.RangeFeedValue{ - Key: key, - Value: roachpb.Value{ - RawBytes: val, - Timestamp: ts, - }, - }) - if err := s.r.stream.Send(&event); err != nil { - return err - } - } - return nil -} - -func (s *catchUpScan) Cancel() { - s.it.Close() - s.a = nil -} - // TxnPusher is capable of pushing transactions to a new timestamp and // cleaning up the intents of transactions that are found to be committed. type TxnPusher interface { diff --git a/pkg/storage/rangefeed/task_test.go b/pkg/storage/rangefeed/task_test.go index a33f12b748a7..e39d8c4fe265 100644 --- a/pkg/storage/rangefeed/task_test.go +++ b/pkg/storage/rangefeed/task_test.go @@ -89,13 +89,6 @@ func newTestIterator(kvs []engine.MVCCKeyValue) *testIterator { } } -func newErrorIterator(err error) *testIterator { - return &testIterator{ - err: err, - done: make(chan struct{}), - } -} - func (s *testIterator) Close() { s.closed = true close(s.done) @@ -217,65 +210,6 @@ func TestInitResolvedTSScan(t *testing.T) { } } -func TestCatchUpScan(t *testing.T) { - defer leaktest.AfterTest(t)() - - // Mock processor. We just needs its catchUpC. - p := Processor{catchUpC: make(chan catchUpResult, 1)} - - // Run a catch-up scan for a registration over a test - // iterator with the following keys. - txn1, txn2 := uuid.MakeV4(), uuid.MakeV4() - iter := newTestIterator([]engine.MVCCKeyValue{ - makeKV("a", "val1", 10), - makeInline("b", "val2"), - makeIntent("c", txn1, "txnKey1", 15), - makeKV("c", "val3", 11), - makeKV("c", "val4", 9), - makeIntent("d", txn2, "txnKey2", 21), - makeKV("d", "val5", 20), - makeKV("d", "val6", 19), - makeInline("g", "val7"), - makeKV("m", "val8", 1), - makeIntent("n", txn1, "txnKey1", 12), - makeIntent("r", txn1, "txnKey1", 19), - makeKV("r", "val9", 4), - makeIntent("w", txn1, "txnKey1", 3), - makeInline("x", "val10"), - makeIntent("z", txn2, "txnKey2", 21), - makeKV("z", "val11", 4), - }) - r := newTestRegistration(roachpb.Span{ - Key: roachpb.Key("d"), - EndKey: roachpb.Key("w"), - }) - r.catchUpIter = iter - r.startTS = hlc.Timestamp{WallTime: 4} - - catchUpScan := newCatchUpScan(&p, &r.registration) - catchUpScan.Run(context.Background()) - require.True(t, iter.closed) - - // Compare the events sent on the registration's Stream to the expected events. - expEvents := []*roachpb.RangeFeedEvent{ - rangeFeedValue( - roachpb.Key("d"), - roachpb.Value{RawBytes: []byte("val5"), Timestamp: hlc.Timestamp{WallTime: 20}}, - ), - rangeFeedValue( - roachpb.Key("d"), - roachpb.Value{RawBytes: []byte("val6"), Timestamp: hlc.Timestamp{WallTime: 19}}, - ), - rangeFeedValue( - roachpb.Key("g"), - roachpb.Value{RawBytes: []byte("val7"), Timestamp: hlc.Timestamp{WallTime: 0}}, - ), - } - require.Equal(t, expEvents, r.Events()) - require.Equal(t, 1, len(p.catchUpC)) - require.Equal(t, catchUpResult{r: &r.registration}, <-p.catchUpC) -} - type testTxnPusher struct { pushTxnsFn func([]enginepb.TxnMeta, hlc.Timestamp) ([]roachpb.Transaction, error) cleanupTxnIntentsAsyncFn func([]roachpb.Transaction) error diff --git a/pkg/storage/replica_rangefeed.go b/pkg/storage/replica_rangefeed.go index 281dc2ed8005..f4da308b3b15 100644 --- a/pkg/storage/replica_rangefeed.go +++ b/pkg/storage/replica_rangefeed.go @@ -198,7 +198,7 @@ func (r *Replica) maybeInitRangefeedRaftMuLocked() *rangefeed.Processor { Clock: r.Clock(), Span: desc.RSpan(), TxnPusher: &tp, - EventChanCap: 4096, + EventChanCap: 256, EventChanTimeout: 50 * time.Millisecond, } r.raftMu.rangefeed = rangefeed.NewProcessor(cfg)