From 095f2b4c8a04a239d5b9afdbbd49bd383376f3a4 Mon Sep 17 00:00:00 2001 From: Wenyi Hu Date: Sat, 29 Jun 2024 19:46:38 -0400 Subject: [PATCH] kvserver/rangefeed: remove future package Previously, to reduce O(ranges) goroutines and avoid blocking on rangefeed completion, we introduced the future package to manage rangefeed completion by invoking a future callback when an error occurred. The future package makes it difficult to justify which goroutine the callback runs on and which locks are being held. This patch introduces a new approach, replacing the future usage. Each registration now uses the embedded StreamMuxer.DisconnectStreamWithError method to signal rangefeed completion. StreamMuxer will manage the shutdown logic and additional stream cleanup. Part of: #126561 Release note: none --- pkg/kv/kvclient/rangefeed/BUILD.bazel | 1 - .../rangefeed/rangefeed_external_test.go | 31 ++++-- pkg/kv/kvserver/BUILD.bazel | 2 - .../client_replica_circuit_breaker_test.go | 52 +++++++--- pkg/kv/kvserver/rangefeed/BUILD.bazel | 2 - pkg/kv/kvserver/rangefeed/bench_test.go | 35 +++++-- pkg/kv/kvserver/rangefeed/processor.go | 5 +- pkg/kv/kvserver/rangefeed/processor_test.go | 89 ++++++----------- pkg/kv/kvserver/rangefeed/registry.go | 11 ++- pkg/kv/kvserver/rangefeed/registry_test.go | 95 +++++++++++-------- .../kvserver/rangefeed/scheduled_processor.go | 4 +- pkg/kv/kvserver/replica_rangefeed.go | 44 ++++----- pkg/kv/kvserver/replica_rangefeed_test.go | 62 +++++++----- pkg/kv/kvserver/store.go | 13 +-- pkg/kv/kvserver/stores.go | 8 +- pkg/server/node.go | 24 ++++- 16 files changed, 268 insertions(+), 210 deletions(-) diff --git a/pkg/kv/kvclient/rangefeed/BUILD.bazel b/pkg/kv/kvclient/rangefeed/BUILD.bazel index 54c320b59746..bea38c400af3 100644 --- a/pkg/kv/kvclient/rangefeed/BUILD.bazel +++ b/pkg/kv/kvclient/rangefeed/BUILD.bazel @@ -87,7 +87,6 @@ go_test( "//pkg/testutils/testcluster", "//pkg/util/ctxgroup", "//pkg/util/encoding", - "//pkg/util/future", "//pkg/util/hlc", 
"//pkg/util/leaktest", "//pkg/util/log", diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go index ec5f71263bbc..993093f63375 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go @@ -40,7 +40,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/storageutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/encoding" - "github.com/cockroachdb/cockroach/pkg/util/future" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -1462,8 +1461,8 @@ func TestRangeFeedIntentResolutionRace(t *testing.T) { } eventC := make(chan *kvpb.RangeFeedEvent) sink := newChannelSink(ctx, eventC) - fErr := future.MakeAwaitableFuture(s3.RangeFeed(&req, sink)) - require.NoError(t, fErr.Get()) // check if we've errored yet + require.NoError(t, s3.RangeFeed(&req, sink)) // check if we've errored yet + require.NoError(t, sink.Error()) t.Logf("started rangefeed on %s", repl3) // Spawn a rangefeed monitor, which posts checkpoint updates to checkpointC. @@ -1620,17 +1619,18 @@ func TestRangeFeedIntentResolutionRace(t *testing.T) { } // The rangefeed should still be running. - require.NoError(t, fErr.Get()) + require.NoError(t, sink.Error()) } // channelSink is a rangefeed sink which posts events to a channel. 
type channelSink struct { - ctx context.Context - ch chan<- *kvpb.RangeFeedEvent + ctx context.Context + ch chan<- *kvpb.RangeFeedEvent + done chan *kvpb.Error } func newChannelSink(ctx context.Context, ch chan<- *kvpb.RangeFeedEvent) *channelSink { - return &channelSink{ctx: ctx, ch: ch} + return &channelSink{ctx: ctx, ch: ch, done: make(chan *kvpb.Error, 1)} } func (c *channelSink) Context() context.Context { @@ -1648,6 +1648,23 @@ func (c *channelSink) Send(e *kvpb.RangeFeedEvent) error { } } +// Error returns the error sent to the done channel if the sink has been +// disconnected. It returns nil otherwise. +func (c *channelSink) Error() error { + select { + case err := <-c.done: + return err.GoError() + default: + return nil + } +} + +// Disconnect implements the Stream interface. It mocks the disconnect behavior +// by sending the error to the done channel. +func (c *channelSink) Disconnect(err *kvpb.Error) { + c.done <- err +} + // TestRangeFeedMetadataManualSplit tests that a spawned rangefeed emits a // metadata event which indicates if it spawned to due a manual split. 
The // test specifically conducts the following: diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 45a33518dc78..70b166d27a7b 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -201,7 +201,6 @@ go_library( "//pkg/util/encoding", "//pkg/util/envutil", "//pkg/util/errorutil", - "//pkg/util/future", "//pkg/util/growstack", "//pkg/util/grpcutil", "//pkg/util/grunning", @@ -499,7 +498,6 @@ go_test( "//pkg/util/circuit", "//pkg/util/ctxgroup", "//pkg/util/encoding", - "//pkg/util/future", "//pkg/util/grunning", "//pkg/util/hlc", "//pkg/util/humanizeutil", diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go index 356bf691ce94..2aa8376826f1 100644 --- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go @@ -35,7 +35,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/circuit" - "github.com/cockroachdb/cockroach/pkg/util/future" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -451,6 +450,16 @@ type dummyStream struct { name string ctx context.Context recv chan *kvpb.RangeFeedEvent + done chan *kvpb.Error +} + +func newDummyStream(ctx context.Context, name string) *dummyStream { + return &dummyStream{ + ctx: ctx, + name: name, + recv: make(chan *kvpb.RangeFeedEvent), + done: make(chan *kvpb.Error, 1), + } } func (s *dummyStream) Context() context.Context { @@ -472,21 +481,34 @@ func (s *dummyStream) Send(ev *kvpb.RangeFeedEvent) error { } } +// Disconnect implements the Stream interface. It mocks the disconnect behavior +// by sending the error to the done channel. 
+func (s *dummyStream) Disconnect(err *kvpb.Error) { + s.done <- err +} + func waitReplicaRangeFeed( - ctx context.Context, - r *kvserver.Replica, - req *kvpb.RangeFeedRequest, - stream kvpb.RangeFeedEventSink, + ctx context.Context, r *kvserver.Replica, req *kvpb.RangeFeedRequest, stream *dummyStream, ) error { - rfErr, ctxErr := future.Wait(ctx, r.RangeFeed(req, stream, nil /* pacer */)) - if ctxErr != nil { - return ctxErr + sendErrToStream := func(err *kvpb.Error) error { + var event kvpb.RangeFeedEvent + event.SetValue(&kvpb.RangeFeedError{ + Error: *err, + }) + return stream.Send(&event) + } + + err := r.RangeFeed(req, stream, nil /* pacer */) + if err != nil { + return sendErrToStream(kvpb.NewError(err)) + } + + select { + case err := <-stream.done: + return sendErrToStream(err) + case <-ctx.Done(): + return ctx.Err() } - var event kvpb.RangeFeedEvent - event.SetValue(&kvpb.RangeFeedError{ - Error: *kvpb.NewError(rfErr), - }) - return stream.Send(&event) } // This test verifies that RangeFeed bypasses the circuit breaker. When the @@ -508,7 +530,7 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() - stream1 := &dummyStream{ctx: ctx, name: "rangefeed1", recv: make(chan *kvpb.RangeFeedEvent)} + stream1 := newDummyStream(ctx, "rangefeed1") require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "stream1", func(ctx context.Context) { err := waitReplicaRangeFeed(ctx, tc.repls[0].Replica, args, stream1) if ctx.Err() != nil { @@ -562,7 +584,7 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) { // Start another stream during the "outage" to make sure it isn't rejected by // the breaker. 
- stream2 := &dummyStream{ctx: ctx, name: "rangefeed2", recv: make(chan *kvpb.RangeFeedEvent)} + stream2 := newDummyStream(ctx, "rangefeed2") require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "stream2", func(ctx context.Context) { err := waitReplicaRangeFeed(ctx, tc.repls[0].Replica, args, stream2) if ctx.Err() != nil { diff --git a/pkg/kv/kvserver/rangefeed/BUILD.bazel b/pkg/kv/kvserver/rangefeed/BUILD.bazel index 16e0e1fcb991..047efd2fed2b 100644 --- a/pkg/kv/kvserver/rangefeed/BUILD.bazel +++ b/pkg/kv/kvserver/rangefeed/BUILD.bazel @@ -37,7 +37,6 @@ go_library( "//pkg/util/buildutil", "//pkg/util/container/heap", "//pkg/util/envutil", - "//pkg/util/future", "//pkg/util/hlc", "//pkg/util/interval", "//pkg/util/log", @@ -89,7 +88,6 @@ go_test( "//pkg/testutils/skip", "//pkg/testutils/storageutils", "//pkg/util/encoding", - "//pkg/util/future", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", diff --git a/pkg/kv/kvserver/rangefeed/bench_test.go b/pkg/kv/kvserver/rangefeed/bench_test.go index af59ca71bf5e..1c40250bd734 100644 --- a/pkg/kv/kvserver/rangefeed/bench_test.go +++ b/pkg/kv/kvserver/rangefeed/bench_test.go @@ -22,7 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" - "github.com/cockroachdb/cockroach/pkg/util/future" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/stretchr/testify/require" @@ -100,7 +100,6 @@ func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) { // Add registrations. streams := make([]*noopStream, opts.numRegistrations) - futures := make([]*future.ErrorFuture, opts.numRegistrations) for i := 0; i < opts.numRegistrations; i++ { // withDiff does not matter for these benchmarks, since the previous value // is fetched and populated during Raft application. 
@@ -108,11 +107,10 @@ func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) { // withFiltering does not matter for these benchmarks because doesn't fetch // extra data. const withFiltering = false - streams[i] = &noopStream{ctx: ctx} - futures[i] = &future.ErrorFuture{} + streams[i] = &noopStream{ctx: ctx, done: make(chan *kvpb.Error, 1)} ok, _ := p.Register(span, hlc.MinTimestamp, nil, withDiff, withFiltering, false, /* withOmitRemote */ - streams[i], nil, futures[i]) + streams[i], nil) require.True(b, ok) } @@ -185,10 +183,9 @@ func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) { b.StopTimer() p.Stop() - for i, f := range futures { - regErr, err := future.Wait(ctx, f) - require.NoError(b, err) - require.NoError(b, regErr) + for i, s := range streams { + // p.Stop() sends a nil error to all streams to signal completion. + require.NoError(b, s.WaitForError(b)) require.Equal(b, b.N, streams[i].events-1) // ignore checkpoint after catchup } } @@ -197,6 +194,7 @@ func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) { type noopStream struct { ctx context.Context events int + done chan *kvpb.Error } func (s *noopStream) Context() context.Context { @@ -211,3 +209,22 @@ func (s *noopStream) Send(*kvpb.RangeFeedEvent) error { // Note that Send itself is not thread-safe, but it is written to be used only // in a single threaded environment in this test, ensuring thread-safety. func (s *noopStream) SendIsThreadSafe() {} + +// Disconnect implements the Stream interface. It mocks the disconnect behavior +// by sending the error to the done channel. +func (s *noopStream) Disconnect(error *kvpb.Error) { + s.done <- error +} + +// WaitForError waits for the rangefeed to complete and returns the error sent +// to the done channel. It fails the test if rangefeed cannot complete within 30 +// seconds. 
+func (s *noopStream) WaitForError(b *testing.B) error { + select { + case err := <-s.done: + return err.GoError() + case <-time.After(testutils.DefaultSucceedsSoonDuration): + b.Fatalf("time out waiting for rangefeed completion") + return nil + } +} diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 88ea9870e700..b439cbfa9e0e 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/envutil" - "github.com/cockroachdb/cockroach/pkg/util/future" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -210,7 +209,6 @@ type Processor interface { withOmitRemote bool, stream Stream, disconnectFn func(), - done *future.ErrorFuture, ) (bool, *Filter) // DisconnectSpanWithErr disconnects all rangefeed registrations that overlap // the given span with the given error. @@ -595,7 +593,6 @@ func (p *LegacyProcessor) Register( withOmitRemote bool, stream Stream, disconnectFn func(), - done *future.ErrorFuture, ) (bool, *Filter) { // Synchronize the event channel so that this registration doesn't see any // events that were consumed before this registration was called. 
Instead, @@ -605,7 +602,7 @@ func (p *LegacyProcessor) Register( blockWhenFull := p.Config.EventChanTimeout == 0 // for testing r := newRegistration( span.AsRawSpanWithNoLocals(), startTS, catchUpIter, withDiff, withFiltering, withOmitRemote, - p.Config.EventChanCap, blockWhenFull, p.Metrics, stream, disconnectFn, done, + p.Config.EventChanCap, blockWhenFull, p.Metrics, stream, disconnectFn, ) select { case p.regC <- r: diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index b0e30c67c112..7217af9ba228 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -30,7 +30,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" - "github.com/cockroachdb/cockroach/pkg/util/future" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -463,11 +462,6 @@ func newTestProcessor( return s, &h, stopper } -func waitErrorFuture(f *future.ErrorFuture) error { - resultErr, _ := future.Wait(context.Background(), f) - return resultErr -} - func TestProcessorBasic(t *testing.T) { defer leaktest.AfterTest(t)() testutils.RunValues(t, "proc type", testTypes, func(t *testing.T, pt procType) { @@ -496,7 +490,6 @@ func TestProcessorBasic(t *testing.T) { // Add a registration. r1Stream := newTestStream() - var r1Done future.ErrorFuture r1OK, r1Filter := p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, @@ -506,7 +499,6 @@ func TestProcessorBasic(t *testing.T) { false, /* withOmitRemote */ r1Stream, func() {}, - &r1Done, ) require.True(t, r1OK) h.syncEventAndRegistrations() @@ -632,7 +624,6 @@ func TestProcessorBasic(t *testing.T) { // Add another registration with withDiff = true and withFiltering = true. 
r2Stream := newTestStream() - var r2Done future.ErrorFuture r2OK, r1And2Filter := p.Register( roachpb.RSpan{Key: roachpb.RKey("c"), EndKey: roachpb.RKey("z")}, hlc.Timestamp{WallTime: 1}, @@ -642,7 +633,6 @@ func TestProcessorBasic(t *testing.T) { false, /* withOmitRemote */ r2Stream, func() {}, - &r2Done, ) require.True(t, r2OK) h.syncEventAndRegistrations() @@ -739,16 +729,15 @@ func TestProcessorBasic(t *testing.T) { // Cancel the first registration. r1Stream.Cancel() - require.NotNil(t, waitErrorFuture(&r1Done)) + require.NotNil(t, r1Stream.WaitForError(t)) // Stop the processor with an error. pErr := kvpb.NewErrorf("stop err") p.StopWithErr(pErr) - require.NotNil(t, waitErrorFuture(&r2Done)) + require.NotNil(t, r2Stream.WaitForError(t)) // Adding another registration should fail. r3Stream := newTestStream() - var r3Done future.ErrorFuture r3OK, _ := p.Register( roachpb.RSpan{Key: roachpb.RKey("c"), EndKey: roachpb.RKey("z")}, hlc.Timestamp{WallTime: 1}, @@ -758,7 +747,6 @@ func TestProcessorBasic(t *testing.T) { false, /* withOmitRemote */ r3Stream, func() {}, - &r3Done, ) require.False(t, r3OK) }) @@ -775,7 +763,6 @@ func TestProcessorOmitRemote(t *testing.T) { // Add a registration. r1Stream := newTestStream() - var r1Done future.ErrorFuture r1OK, _ := p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, @@ -785,7 +772,6 @@ func TestProcessorOmitRemote(t *testing.T) { false, /* withOmitRemote */ r1Stream, func() {}, - &r1Done, ) require.True(t, r1OK) h.syncEventAndRegistrations() @@ -802,7 +788,6 @@ func TestProcessorOmitRemote(t *testing.T) { // Add another registration with withOmitRemote = true. 
r2Stream := newTestStream() - var r2Done future.ErrorFuture r2OK, _ := p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, @@ -812,7 +797,6 @@ func TestProcessorOmitRemote(t *testing.T) { true, /* withOmitRemote */ r2Stream, func() {}, - &r2Done, ) require.True(t, r2OK) h.syncEventAndRegistrations() @@ -857,7 +841,6 @@ func TestProcessorSlowConsumer(t *testing.T) { // Add a registration. r1Stream := newTestStream() - var r1Done future.ErrorFuture _, _ = p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, @@ -867,10 +850,8 @@ func TestProcessorSlowConsumer(t *testing.T) { false, /* withOmitRemote */ r1Stream, func() {}, - &r1Done, ) r2Stream := newTestStream() - var r2Done future.ErrorFuture p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")}, hlc.Timestamp{WallTime: 1}, @@ -880,7 +861,6 @@ func TestProcessorSlowConsumer(t *testing.T) { false, /* withOmitRemote */ r2Stream, func() {}, - &r2Done, ) h.syncEventAndRegistrations() require.Equal(t, 2, p.Len()) @@ -942,7 +922,7 @@ func TestProcessorSlowConsumer(t *testing.T) { // were dropped due to rapid event consumption before the r1's outputLoop // began consuming from its event buffer. require.LessOrEqual(t, len(r1Stream.Events()), toFill) - require.Equal(t, newErrBufferCapacityExceeded().GoError(), waitErrorFuture(&r1Done)) + require.Equal(t, newErrBufferCapacityExceeded().GoError(), r1Stream.WaitForError(t)) testutils.SucceedsSoon(t, func() error { if act, exp := p.Len(), 1; exp != act { return fmt.Errorf("processor had %d regs, wanted %d", act, exp) @@ -968,7 +948,6 @@ func TestProcessorMemoryBudgetExceeded(t *testing.T) { // Add a registration. 
r1Stream := newTestStream() - var r1Done future.ErrorFuture _, _ = p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, @@ -978,7 +957,6 @@ func TestProcessorMemoryBudgetExceeded(t *testing.T) { false, /* withOmitRemote */ r1Stream, func() {}, - &r1Done, ) h.syncEventAndRegistrations() @@ -1007,7 +985,7 @@ func TestProcessorMemoryBudgetExceeded(t *testing.T) { unblock = nil h.syncEventAndRegistrations() - require.Equal(t, newErrBufferCapacityExceeded().GoError(), waitErrorFuture(&r1Done)) + require.Equal(t, newErrBufferCapacityExceeded().GoError(), r1Stream.WaitForError(t)) require.Equal(t, 0, p.Len(), "registration was not removed") require.Equal(t, int64(1), m.RangeFeedBudgetExhausted.Count()) }) @@ -1025,7 +1003,6 @@ func TestProcessorMemoryBudgetReleased(t *testing.T) { // Add a registration. r1Stream := newTestStream() - var r1Done future.ErrorFuture p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, @@ -1035,7 +1012,6 @@ func TestProcessorMemoryBudgetReleased(t *testing.T) { false, /* withOmitRemote */ r1Stream, func() {}, - &r1Done, ) h.syncEventAndRegistrations() @@ -1108,7 +1084,6 @@ func TestProcessorInitializeResolvedTimestamp(t *testing.T) { // Add a registration. 
r1Stream := newTestStream() - var r1Done future.ErrorFuture p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, @@ -1118,7 +1093,6 @@ func TestProcessorInitializeResolvedTimestamp(t *testing.T) { false, /* withOmitRemote */ r1Stream, func() {}, - &r1Done, ) h.syncEventAndRegistrations() require.Equal(t, 1, p.Len()) @@ -1424,9 +1398,8 @@ func TestProcessorConcurrentStop(t *testing.T) { defer wg.Done() runtime.Gosched() s := newTestStream() - var done future.ErrorFuture p.Register(h.span, hlc.Timestamp{}, nil, /* catchUpIter */ - false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, s, func() {}, &done) + false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, s, func() {}) }() go func() { defer wg.Done() @@ -1497,9 +1470,8 @@ func TestProcessorRegistrationObservesOnlyNewEvents(t *testing.T) { // operation is should see is firstIdx. s := newTestStream() regs[s] = firstIdx - var done future.ErrorFuture p.Register(h.span, hlc.Timestamp{}, nil, /* catchUpIter */ - false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, s, func() {}, &done) + false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, s, func() {}) regDone <- struct{}{} } }() @@ -1520,14 +1492,6 @@ func TestProcessorRegistrationObservesOnlyNewEvents(t *testing.T) { }) } -func notifyWhenDone(f *future.ErrorFuture) chan error { - ch := make(chan error, 1) - f.WhenReady(func(err error) { - ch <- err - }) - return ch -} - func TestBudgetReleaseOnProcessorStop(t *testing.T) { defer leaktest.AfterTest(t)() const totalEvents = 100 @@ -1559,7 +1523,6 @@ func TestBudgetReleaseOnProcessorStop(t *testing.T) { // Add a registration. 
rStream := newConsumer(50) defer func() { rStream.Resume() }() - var done future.ErrorFuture _, _ = p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, @@ -1569,9 +1532,7 @@ func TestBudgetReleaseOnProcessorStop(t *testing.T) { false, /* withOmitRemote */ rStream, func() {}, - &done, ) - rErrC := notifyWhenDone(&done) h.syncEventAndRegistrations() for i := 0; i < totalEvents; i++ { @@ -1584,7 +1545,7 @@ func TestBudgetReleaseOnProcessorStop(t *testing.T) { // Wait for half of the event to be processed by stream then stop processor. select { case <-rStream.blocked: - case err := <-rErrC: + case err := <-rStream.done: t.Fatal("stream failed with error before all data was consumed", err) } @@ -1642,7 +1603,6 @@ func TestBudgetReleaseOnLastStreamError(t *testing.T) { // Add a registration. rStream := newConsumer(90) defer func() { rStream.Resume() }() - var done future.ErrorFuture _, _ = p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, @@ -1652,9 +1612,7 @@ func TestBudgetReleaseOnLastStreamError(t *testing.T) { false, /* withOmitRemote */ rStream, func() {}, - &done, ) - rErrC := notifyWhenDone(&done) h.syncEventAndRegistrations() for i := 0; i < totalEvents; i++ { @@ -1667,7 +1625,7 @@ func TestBudgetReleaseOnLastStreamError(t *testing.T) { // Wait for half of the event to be processed then raise error. select { case <-rStream.blocked: - case err := <-rErrC: + case err := <-rStream.done: t.Fatal("stream failed with error before stream blocked: ", err) } @@ -1715,7 +1673,6 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) { // Add a registration. 
r1Stream := newConsumer(50) defer func() { r1Stream.Resume() }() - var r1Done future.ErrorFuture _, _ = p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, @@ -1725,13 +1682,10 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) { false, /* withOmitRemote */ r1Stream, func() {}, - &r1Done, ) - r1ErrC := notifyWhenDone(&r1Done) // Non-blocking registration that would consume all events. r2Stream := newConsumer(0) - var r2Done future.ErrorFuture p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, @@ -1741,7 +1695,6 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) { false, /* withOmitRemote */ r2Stream, func() {}, - &r2Done, ) h.syncEventAndRegistrations() @@ -1755,7 +1708,7 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) { // Wait for half of the event to be processed then stop processor. select { case <-r1Stream.blocked: - case err := <-r1ErrC: + case err := <-r1Stream.done: t.Fatal("stream failed with error before all data was consumed", err) } @@ -1794,6 +1747,7 @@ type consumer struct { ctx context.Context ctxDone func() sentValues int32 + done chan *kvpb.Error blockAfter int blocked chan interface{} @@ -1808,6 +1762,7 @@ func newConsumer(blockAfter int) *consumer { blockAfter: blockAfter, blocked: make(chan interface{}), resume: make(chan error), + done: make(chan *kvpb.Error, 1), } } @@ -1841,6 +1796,25 @@ func (c *consumer) WaitBlock() { <-c.blocked } +// Disconnect implements the Stream interface. It mocks the disconnect behavior +// by sending the error to the done channel. +func (c *consumer) Disconnect(error *kvpb.Error) { + c.done <- error +} + +// WaitForError waits for the rangefeed to complete and returns the error sent +// to the done channel. It fails the test if rangefeed cannot complete within 30 +// seconds. 
+func (c *consumer) WaitForError(t *testing.T) error { + select { + case err := <-c.done: + return err.GoError() + case <-time.After(testutils.DefaultSucceedsSoonDuration): + t.Fatalf("time out waiting for rangefeed completion") + return nil + } +} + // Resume resumes stream by closing its wait channel. // If there was a pending err for resuming then it would be discarded and // channel closed. @@ -1890,9 +1864,8 @@ func TestProcessorBackpressure(t *testing.T) { // Add a registration. stream := newTestStream() - done := &future.ErrorFuture{} ok, _ := p.Register(span, hlc.MinTimestamp, nil, /* catchUpIter */ - false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, stream, nil, done) + false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, stream, nil) require.True(t, ok) // Wait for the initial checkpoint. diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go index 532485616962..f79e06bbe092 100644 --- a/pkg/kv/kvserver/rangefeed/registry.go +++ b/pkg/kv/kvserver/rangefeed/registry.go @@ -18,7 +18,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/util/future" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/interval" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -31,6 +30,11 @@ import ( // Stream is a object capable of transmitting RangeFeedEvents. type Stream interface { kvpb.RangeFeedEventSink + // Disconnect disconnects the stream with the provided error. Note that this + // function can be called by the processor worker while holding raftMu, so it + // is important that this function doesn't block IO or try acquiring locks + // that could lead to deadlocks. + Disconnect(err *kvpb.Error) } // Shared event is an entry stored in registration channel. Each entry is @@ -81,7 +85,6 @@ type registration struct { // Output. 
stream Stream - done *future.ErrorFuture unreg func() // Internal. id int64 @@ -122,7 +125,6 @@ func newRegistration( metrics *Metrics, stream Stream, unregisterFn func(), - done *future.ErrorFuture, ) registration { r := registration{ span: span, @@ -132,7 +134,6 @@ func newRegistration( withOmitRemote: withOmitRemote, metrics: metrics, stream: stream, - done: done, unreg: unregisterFn, buf: make(chan *sharedEvent, bufferSz), blockWhenFull: blockWhenFull, @@ -299,7 +300,7 @@ func (r *registration) disconnect(pErr *kvpb.Error) { r.mu.outputLoopCancelFn() } r.mu.disconnected = true - r.done.Set(pErr.GoError()) + r.stream.Disconnect(pErr) } } diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go index 4b3e76dede3a..22e71dc2c20b 100644 --- a/pkg/kv/kvserver/rangefeed/registry_test.go +++ b/pkg/kv/kvserver/rangefeed/registry_test.go @@ -15,6 +15,7 @@ import ( "fmt" "sync" "testing" + "time" _ "github.com/cockroachdb/cockroach/pkg/keys" // hook up pretty printer "github.com/cockroachdb/cockroach/pkg/kv/kvpb" @@ -22,7 +23,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" - "github.com/cockroachdb/cockroach/pkg/util/future" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -45,6 +45,7 @@ var ( type testStream struct { ctx context.Context ctxDone func() + done chan *kvpb.Error mu struct { syncutil.Mutex sendErr error @@ -54,7 +55,7 @@ type testStream struct { func newTestStream() *testStream { ctx, done := context.WithCancel(context.Background()) - return &testStream{ctx: ctx, ctxDone: done} + return &testStream{ctx: ctx, ctxDone: done, done: make(chan *kvpb.Error, 1)} } func (s *testStream) Context() context.Context { @@ -99,9 +100,39 @@ func (s *testStream) BlockSend() func() { } } +// Disconnect 
implements the Stream interface. It mocks the disconnect behavior +// by sending the error to the done channel. +func (s *testStream) Disconnect(err *kvpb.Error) { + s.done <- err +} + +// Error returns the error that was sent to the done channel. It returns nil if +// no error was sent yet. +func (s *testStream) Error() error { + select { + case err := <-s.done: + return err.GoError() + default: + return nil + } +} + +// WaitForError waits for the rangefeed to complete and returns the error sent +// to the done channel. It fails the test if rangefeed cannot complete within 30 +// seconds. +func (s *testStream) WaitForError(t *testing.T) error { + select { + case err := <-s.done: + return err.GoError() + case <-time.After(testutils.DefaultSucceedsSoonDuration): + t.Fatalf("time out waiting for rangefeed completion") + return nil + } +} + type testRegistration struct { registration - stream *testStream + *testStream } func makeCatchUpIterator( @@ -138,27 +169,13 @@ func newTestRegistration( NewMetrics(), s, func() {}, - &future.ErrorFuture{}, ) return &testRegistration{ registration: r, - stream: s, + testStream: s, } } -func (r *testRegistration) Events() []*kvpb.RangeFeedEvent { - return r.stream.Events() -} - -func (r *testRegistration) Err() error { - err, _ := future.Wait(context.Background(), r.done) - return err -} - -func (r *testRegistration) TryErr() error { - return future.MakeAwaitableFuture(r.done).Get() -} - func TestRegistrationBasic(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() @@ -176,7 +193,7 @@ func TestRegistrationBasic(t *testing.T) { require.Equal(t, len(noCatchupReg.buf), 2) go noCatchupReg.runOutputLoop(ctx, 0) require.NoError(t, noCatchupReg.waitForCaughtUp(ctx)) - require.Equal(t, []*kvpb.RangeFeedEvent{ev1, ev2}, noCatchupReg.stream.Events()) + require.Equal(t, []*kvpb.RangeFeedEvent{ev1, ev2}, noCatchupReg.Events()) noCatchupReg.disconnect(nil) // Registration with catchup scan. 
@@ -192,7 +209,7 @@ func TestRegistrationBasic(t *testing.T) { require.Equal(t, len(catchupReg.buf), 2) go catchupReg.runOutputLoop(ctx, 0) require.NoError(t, catchupReg.waitForCaughtUp(ctx)) - events := catchupReg.stream.Events() + events := catchupReg.Events() require.Equal(t, 5, len(events)) require.Equal(t, []*kvpb.RangeFeedEvent{ev1, ev2}, events[3:]) catchupReg.disconnect(nil) @@ -207,8 +224,8 @@ func TestRegistrationBasic(t *testing.T) { require.NoError(t, disconnectReg.waitForCaughtUp(ctx)) discErr := kvpb.NewError(fmt.Errorf("disconnection error")) disconnectReg.disconnect(discErr) - require.Equal(t, discErr.GoError(), disconnectReg.Err()) - require.Equal(t, 2, len(disconnectReg.stream.Events())) + require.Equal(t, discErr.GoError(), disconnectReg.WaitForError(t)) + require.Equal(t, 2, len(disconnectReg.Events())) // External Disconnect before output loop. disconnectEarlyReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, /* catchup */ @@ -217,8 +234,8 @@ func TestRegistrationBasic(t *testing.T) { disconnectEarlyReg.publish(ctx, ev2, nil /* alloc */) disconnectEarlyReg.disconnect(discErr) go disconnectEarlyReg.runOutputLoop(ctx, 0) - require.Equal(t, discErr.GoError(), disconnectEarlyReg.Err()) - require.Equal(t, 0, len(disconnectEarlyReg.stream.Events())) + require.Equal(t, discErr.GoError(), disconnectEarlyReg.WaitForError(t)) + require.Equal(t, 0, len(disconnectEarlyReg.Events())) // Overflow. overflowReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, /* catchup */ @@ -227,26 +244,26 @@ func TestRegistrationBasic(t *testing.T) { overflowReg.publish(ctx, ev1, nil /* alloc */) } go overflowReg.runOutputLoop(ctx, 0) - require.Equal(t, newErrBufferCapacityExceeded().GoError(), overflowReg.Err()) + require.Equal(t, newErrBufferCapacityExceeded().GoError(), overflowReg.WaitForError(t)) require.Equal(t, cap(overflowReg.buf), len(overflowReg.Events())) // Stream Error. 
streamErrReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, /* catchup */ false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */) streamErr := fmt.Errorf("stream error") - streamErrReg.stream.SetSendErr(streamErr) + streamErrReg.SetSendErr(streamErr) go streamErrReg.runOutputLoop(ctx, 0) streamErrReg.publish(ctx, ev1, nil /* alloc */) - require.Equal(t, streamErr.Error(), streamErrReg.Err().Error()) + require.Equal(t, streamErr, streamErrReg.WaitForError(t)) // Stream Context Canceled. streamCancelReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, /* catchup */ false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */) - streamCancelReg.stream.Cancel() + streamCancelReg.Cancel() go streamCancelReg.runOutputLoop(ctx, 0) require.NoError(t, streamCancelReg.waitForCaughtUp(ctx)) - require.Equal(t, streamCancelReg.stream.Context().Err(), streamCancelReg.Err()) + require.Equal(t, streamCancelReg.stream.Context().Err(), streamCancelReg.WaitForError(t)) } func TestRegistrationCatchUpScan(t *testing.T) { @@ -408,8 +425,8 @@ func TestRegistryWithOmitOrigin(t *testing.T) { require.Equal(t, []*kvpb.RangeFeedEvent{noPrev(ev1), noPrev(ev2)}, rAC.Events()) require.Equal(t, []*kvpb.RangeFeedEvent{noPrev(ev1)}, originFiltering.Events()) - require.Nil(t, rAC.TryErr()) - require.Nil(t, originFiltering.TryErr()) + require.Nil(t, rAC.Error()) + require.Nil(t, originFiltering.Error()) } func TestRegistryBasic(t *testing.T) { @@ -480,11 +497,11 @@ func TestRegistryBasic(t *testing.T) { // Registration rACFiltering doesn't receive ev5 because both withFiltering // (for the registration) and OmitInRangefeeds (for the event) are true. 
require.Equal(t, []*kvpb.RangeFeedEvent{noPrev(ev1), noPrev(ev2), noPrev(ev4)}, rACFiltering.Events()) - require.Nil(t, rAB.TryErr()) - require.Nil(t, rBC.TryErr()) - require.Nil(t, rCD.TryErr()) - require.Nil(t, rAC.TryErr()) - require.Nil(t, rACFiltering.TryErr()) + require.Nil(t, rAB.Error()) + require.Nil(t, rBC.Error()) + require.Nil(t, rCD.Error()) + require.Nil(t, rAC.Error()) + require.Nil(t, rACFiltering.Error()) // Check the registry's operation filter. f := reg.NewFilter() @@ -512,7 +529,7 @@ func TestRegistryBasic(t *testing.T) { // Disconnect span that overlaps with rCD. reg.DisconnectWithErr(ctx, spCD, err1) require.Equal(t, 4, reg.Len()) - require.Equal(t, err1.GoError(), rCD.Err()) + require.Equal(t, err1.GoError(), rCD.WaitForError(t)) // Can still publish to rAB. reg.PublishToOverlapping(ctx, spAB, ev4, logicalOpMetadata{}, nil /* alloc */) @@ -524,8 +541,8 @@ func TestRegistryBasic(t *testing.T) { // Disconnect from rAB without error. reg.Disconnect(ctx, spAB) - require.Nil(t, rAC.Err()) - require.Nil(t, rAB.Err()) + require.Nil(t, rAC.WaitForError(t)) + require.Nil(t, rAB.WaitForError(t)) require.Equal(t, 1, reg.Len()) // Check the registry's operation filter again. 
diff --git a/pkg/kv/kvserver/rangefeed/scheduled_processor.go b/pkg/kv/kvserver/rangefeed/scheduled_processor.go index 2bf3b17fdd2b..577f8ff0fa11 100644 --- a/pkg/kv/kvserver/rangefeed/scheduled_processor.go +++ b/pkg/kv/kvserver/rangefeed/scheduled_processor.go @@ -18,7 +18,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/buildutil" - "github.com/cockroachdb/cockroach/pkg/util/future" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -310,7 +309,6 @@ func (p *ScheduledProcessor) Register( withOmitRemote bool, stream Stream, disconnectFn func(), - done *future.ErrorFuture, ) (bool, *Filter) { // Synchronize the event channel so that this registration doesn't see any // events that were consumed before this registration was called. Instead, @@ -320,7 +318,7 @@ func (p *ScheduledProcessor) Register( blockWhenFull := p.Config.EventChanTimeout == 0 // for testing r := newRegistration( span.AsRawSpanWithNoLocals(), startTS, catchUpIter, withDiff, withFiltering, withOmitRemote, - p.Config.EventChanCap, blockWhenFull, p.Metrics, stream, disconnectFn, done, + p.Config.EventChanCap, blockWhenFull, p.Metrics, stream, disconnectFn, ) filter := runRequest(p, func(ctx context.Context, p *ScheduledProcessor) *Filter { diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 6e6f5475a505..bdb5305e996c 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -33,7 +33,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/envutil" - "github.com/cockroachdb/cockroach/pkg/util/future" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" 
"github.com/cockroachdb/cockroach/pkg/util/metamorphic" @@ -240,24 +239,24 @@ func (tp *rangefeedTxnPusher) Barrier(ctx context.Context) error { // complete. The surrounding store's ConcurrentRequestLimiter is used to limit // the number of rangefeeds using catch-up iterators at the same time. func (r *Replica) RangeFeed( - args *kvpb.RangeFeedRequest, stream kvpb.RangeFeedEventSink, pacer *admission.Pacer, -) *future.ErrorFuture { + args *kvpb.RangeFeedRequest, stream rangefeed.Stream, pacer *admission.Pacer, +) error { ctx := r.AnnotateCtx(stream.Context()) rSpan, err := keys.SpanAddr(args.Span) if err != nil { - return future.MakeCompletedErrorFuture(err) + return err } if err := r.ensureClosedTimestampStarted(ctx); err != nil { - return future.MakeCompletedErrorFuture(err.GoError()) + return err.GoError() } var omitRemote bool if len(args.WithMatchingOriginIDs) == 1 && args.WithMatchingOriginIDs[0] == 0 { omitRemote = true } else if len(args.WithMatchingOriginIDs) > 0 { - return future.MakeCompletedErrorFuture(errors.Errorf("multiple origin IDs and OriginID != 0 not supported yet")) + return errors.Errorf("multiple origin IDs and OriginID != 0 not supported yet") } // If the RangeFeed is performing a catch-up scan then it will observe all @@ -283,7 +282,7 @@ func (r *Replica) RangeFeed( usingCatchUpIter = true alloc, err := r.store.limiters.ConcurrentRangefeedIters.Begin(ctx) if err != nil { - return future.MakeCompletedErrorFuture(err) + return err } // Finish the iterator limit if we exit before the iterator finishes. @@ -308,7 +307,7 @@ func (r *Replica) RangeFeed( if err := r.checkExecutionCanProceedForRangeFeed(ctx, rSpan, checkTS); err != nil { r.raftMu.Unlock() iterSemRelease() - return future.MakeCompletedErrorFuture(err) + return err } // Register the stream with a catch-up iterator. 
@@ -322,16 +321,15 @@ func (r *Replica) RangeFeed( if err != nil { r.raftMu.Unlock() iterSemRelease() - return future.MakeCompletedErrorFuture(err) + return err } if f := r.store.TestingKnobs().RangefeedValueHeaderFilter; f != nil { catchUpIter.OnEmit = f } } - var done future.ErrorFuture - p := r.registerWithRangefeedRaftMuLocked( - ctx, rSpan, args.Timestamp, catchUpIter, args.WithDiff, args.WithFiltering, omitRemote, stream, &done, + p, err := r.registerWithRangefeedRaftMuLocked( + ctx, rSpan, args.Timestamp, catchUpIter, args.WithDiff, args.WithFiltering, omitRemote, stream, ) r.raftMu.Unlock() @@ -339,8 +337,7 @@ func (r *Replica) RangeFeed( // encountered an error after we created processor, disconnect if processor // is empty. defer r.maybeDisconnectEmptyRangefeed(p) - - return &done + return err } func (r *Replica) getRangefeedProcessorAndFilter() (rangefeed.Processor, *rangefeed.Filter) { @@ -430,8 +427,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( withFiltering bool, withOmitRemote bool, stream rangefeed.Stream, - done *future.ErrorFuture, -) rangefeed.Processor { +) (rangefeed.Processor, error) { defer logSlowRangefeedRegistration(ctx)() // Always defer closing iterator to cover old and new failure cases. @@ -451,7 +447,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( if p != nil { reg, filter := p.Register(span, startTS, catchUpIter, withDiff, withFiltering, withOmitRemote, - stream, func() { r.maybeDisconnectEmptyRangefeed(p) }, done) + stream, func() { r.maybeDisconnectEmptyRangefeed(p) }) if reg { // Registered successfully with an existing processor. // Update the rangefeed filter to avoid filtering ops @@ -459,7 +455,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( r.setRangefeedFilterLocked(filter) r.rangefeedMu.Unlock() catchUpIter = nil - return p + return p, nil } // If the registration failed, the processor was already being shut // down. 
Help unset it and then continue on with initializing a new @@ -517,7 +513,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( scanner, err := rangefeed.NewSeparatedIntentScanner(ctx, r.store.TODOEngine(), desc.RSpan()) if err != nil { - done.Set(err) + stream.Disconnect(kvpb.NewError(err)) return nil } return scanner @@ -529,8 +525,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // due to stopping, but before it enters the quiescing state, then the select // below will fall through to the panic. if err := p.Start(r.store.Stopper(), rtsIter); err != nil { - done.Set(err) - return nil + return nil, err } // Register with the processor *before* we attach its reference to the @@ -539,12 +534,11 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // this ensures that the only time the registration fails is during // server shutdown. reg, filter := p.Register(span, startTS, catchUpIter, withDiff, - withFiltering, withOmitRemote, stream, func() { r.maybeDisconnectEmptyRangefeed(p) }, done) + withFiltering, withOmitRemote, stream, func() { r.maybeDisconnectEmptyRangefeed(p) }) if !reg { select { case <-r.store.Stopper().ShouldQuiesce(): - done.Set(&kvpb.NodeUnavailableError{}) - return nil + return nil, &kvpb.NodeUnavailableError{} default: panic("unexpected Stopped processor") } @@ -560,7 +554,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // Check for an initial closed timestamp update immediately to help // initialize the rangefeed's resolved timestamp as soon as possible. 
r.handleClosedTimestampUpdateRaftMuLocked(ctx, r.GetCurrentClosedTimestamp(ctx)) - return p + return p, nil } // maybeDisconnectEmptyRangefeed tears down the provided Processor if it is diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go index 2ba4716d6d71..ccb667b88299 100644 --- a/pkg/kv/kvserver/replica_rangefeed_test.go +++ b/pkg/kv/kvserver/replica_rangefeed_test.go @@ -39,7 +39,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" - "github.com/cockroachdb/cockroach/pkg/util/future" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -54,6 +53,7 @@ import ( type testStream struct { ctx context.Context cancel func() + done chan *kvpb.Error mu struct { syncutil.Mutex events []*kvpb.RangeFeedEvent @@ -62,7 +62,7 @@ type testStream struct { func newTestStream() *testStream { ctx, cancel := context.WithCancel(context.Background()) - return &testStream{ctx: ctx, cancel: cancel} + return &testStream{ctx: ctx, cancel: cancel, done: make(chan *kvpb.Error, 1)} } func (s *testStream) SendMsg(m interface{}) error { panic("unimplemented") } @@ -94,9 +94,32 @@ func (s *testStream) Events() []*kvpb.RangeFeedEvent { return s.mu.events } -func waitRangeFeed(store *kvserver.Store, req *kvpb.RangeFeedRequest, stream *testStream) error { - retErr, _ := future.Wait(context.Background(), store.RangeFeed(req, stream)) - return retErr +// Disconnect implements the Stream interface. It mocks the disconnect behavior +// by sending the error to the done channel. +func (s *testStream) Disconnect(error *kvpb.Error) { + s.done <- error +} + +// WaitForError waits for the rangefeed to complete and returns the error sent +// to the done channel. It fails the test if rangefeed cannot complete within 30 +// seconds. 
+func (s *testStream) WaitForError(t *testing.T) error { + select { + case err := <-s.done: + return err.GoError() + case <-time.After(testutils.DefaultSucceedsSoonDuration): + t.Fatalf("time out waiting for rangefeed completion") + return nil + } +} + +func waitRangeFeed( + t *testing.T, store *kvserver.Store, req *kvpb.RangeFeedRequest, stream *testStream, +) error { + if err := store.RangeFeed(req, stream); err != nil { + return err + } + return stream.WaitForError(t) } func TestReplicaRangefeed(t *testing.T) { @@ -170,7 +193,7 @@ func TestReplicaRangefeed(t *testing.T) { } timer := time.AfterFunc(10*time.Second, stream.Cancel) defer timer.Stop() - streamErrC <- waitRangeFeed(store, &req, stream) + streamErrC <- waitRangeFeed(t, store, &req, stream) }(i) } @@ -566,7 +589,7 @@ func TestReplicaRangefeed(t *testing.T) { defer timer.Stop() defer stream.Cancel() - if pErr := waitRangeFeed(store, &req, stream); !testutils.IsError( + if pErr := waitRangeFeed(t, store, &req, stream); !testutils.IsError( pErr, `must be after replica GC threshold`, ) { return pErr @@ -608,7 +631,7 @@ func setupSimpleRangefeed( } timer := time.AfterFunc(10*time.Second, stream.Cancel) defer timer.Stop() - streamErrC <- waitRangeFeed(store, &req, stream) + streamErrC <- waitRangeFeed(t, store, &req, stream) }() // Wait for a checkpoint. @@ -716,11 +739,6 @@ func TestReplicaRangefeedOriginIDFiltering(t *testing.T) { }) } -func waitErrorFuture(f *future.ErrorFuture) error { - resultErr, _ := future.Wait(context.Background(), f) - return resultErr -} - func TestScheduledProcessorKillSwitch(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -910,7 +928,7 @@ func TestReplicaRangefeedErrors(t *testing.T) { } timer := time.AfterFunc(10*time.Second, stream.Cancel) defer timer.Stop() - streamErrC <- waitRangeFeed(store, &req, stream) + streamErrC <- waitRangeFeed(t, store, &req, stream) }() // Wait for the first checkpoint event. 
@@ -945,7 +963,7 @@ func TestReplicaRangefeedErrors(t *testing.T) { } timer := time.AfterFunc(10*time.Second, stream.Cancel) defer timer.Stop() - streamErrC <- waitRangeFeed(store, &req, stream) + streamErrC <- waitRangeFeed(t, store, &req, stream) }() // Wait for the first checkpoint event. @@ -990,7 +1008,7 @@ func TestReplicaRangefeedErrors(t *testing.T) { } timer := time.AfterFunc(10*time.Second, streamLeft.Cancel) defer timer.Stop() - streamLeftErrC <- waitRangeFeed(store, &req, streamLeft) + streamLeftErrC <- waitRangeFeed(t, store, &req, streamLeft) }() // Establish a rangefeed on the right replica. @@ -1006,7 +1024,7 @@ func TestReplicaRangefeedErrors(t *testing.T) { } timer := time.AfterFunc(10*time.Second, streamRight.Cancel) defer timer.Stop() - streamRightErrC <- waitRangeFeed(store, &req, streamRight) + streamRightErrC <- waitRangeFeed(t, store, &req, streamRight) }() // Wait for the first checkpoint event on each stream. @@ -1064,7 +1082,7 @@ func TestReplicaRangefeedErrors(t *testing.T) { } timer := time.AfterFunc(10*time.Second, stream.Cancel) defer timer.Stop() - streamErrC <- waitRangeFeed(partitionStore, &req, stream) + streamErrC <- waitRangeFeed(t, partitionStore, &req, stream) }() // Wait for the first checkpoint event. @@ -1192,7 +1210,7 @@ func TestReplicaRangefeedErrors(t *testing.T) { kvserver.RangefeedEnabled.Override(ctx, &store.ClusterSettings().SV, true) timer := time.AfterFunc(10*time.Second, stream.Cancel) defer timer.Stop() - streamErrC <- waitRangeFeed(store, &req, stream) + streamErrC <- waitRangeFeed(t, store, &req, stream) }() // Wait for the first checkpoint event. @@ -1267,7 +1285,7 @@ func TestReplicaRangefeedErrors(t *testing.T) { } timer := time.AfterFunc(10*time.Second, stream.Cancel) defer timer.Stop() - streamErrC <- waitErrorFuture(store.RangeFeed(&req, stream)) + streamErrC <- waitRangeFeed(t, store, &req, stream) }() // Check the error. 
@@ -1287,7 +1305,7 @@ func TestReplicaRangefeedErrors(t *testing.T) { } timer := time.AfterFunc(10*time.Second, stream.Cancel) defer timer.Stop() - streamErrC <- waitErrorFuture(store.RangeFeed(&req, stream)) + streamErrC <- waitRangeFeed(t, store, &req, stream) }() // Wait for the first checkpoint event. @@ -1315,7 +1333,7 @@ func TestReplicaRangefeedErrors(t *testing.T) { } timer := time.AfterFunc(10*time.Second, stream.Cancel) defer timer.Stop() - streamErrC <- waitRangeFeed(store, &req, stream) + streamErrC <- waitRangeFeed(t, store, &req, stream) }() // Check the error. diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 8c35fd7bfa68..5ba3d858de97 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -77,7 +77,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/envutil" - "github.com/cockroachdb/cockroach/pkg/util/future" "github.com/cockroachdb/cockroach/pkg/util/grunning" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/iterutil" @@ -3145,23 +3144,21 @@ func (s *Store) Descriptor(ctx context.Context, useCached bool) (*roachpb.StoreD // RangeFeed registers a rangefeed over the specified span. It sends updates to // the provided stream and returns a future with an optional error when the rangefeed is // complete. 
-func (s *Store) RangeFeed( - args *kvpb.RangeFeedRequest, stream kvpb.RangeFeedEventSink, -) *future.ErrorFuture { +func (s *Store) RangeFeed(args *kvpb.RangeFeedRequest, stream rangefeed.Stream) error { if filter := s.TestingKnobs().TestingRangefeedFilter; filter != nil { if pErr := filter(args, stream); pErr != nil { - return future.MakeCompletedErrorFuture(pErr.GoError()) + return pErr.GoError() } } if err := verifyKeys(args.Span.Key, args.Span.EndKey, true); err != nil { - return future.MakeCompletedErrorFuture(err) + return err } // Get range and add command to the range for execution. repl, err := s.GetReplica(args.RangeID) if err != nil { - return future.MakeCompletedErrorFuture(err) + return err } if !repl.IsInitialized() { // (*Store).Send has an optimization for uninitialized replicas to send back @@ -3169,7 +3166,7 @@ func (s *Store) RangeFeed( // be found. RangeFeeds can always be served from followers and so don't // otherwise return NotLeaseHolderError. For simplicity we also don't return // one here. 
- return future.MakeCompletedErrorFuture(kvpb.NewRangeNotFoundError(args.RangeID, s.StoreID())) + return kvpb.NewRangeNotFoundError(args.RangeID, s.StoreID()) } tenID, _ := repl.TenantID() diff --git a/pkg/kv/kvserver/stores.go b/pkg/kv/kvserver/stores.go index d9bf5a1a5a5c..8320265f6aed 100644 --- a/pkg/kv/kvserver/stores.go +++ b/pkg/kv/kvserver/stores.go @@ -19,10 +19,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/disk" - "github.com/cockroachdb/cockroach/pkg/util/future" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -209,9 +209,7 @@ func (ls *Stores) SendWithWriteBytes( // RangeFeed registers a rangefeed over the specified span. It sends // updates to the provided stream and returns a future with an optional error // when the rangefeed is complete. 
-func (ls *Stores) RangeFeed( - args *kvpb.RangeFeedRequest, stream kvpb.RangeFeedEventSink, -) *future.ErrorFuture { +func (ls *Stores) RangeFeed(args *kvpb.RangeFeedRequest, stream rangefeed.Stream) error { ctx := stream.Context() if args.RangeID == 0 { log.Fatal(ctx, "rangefeed request missing range ID") @@ -221,7 +219,7 @@ func (ls *Stores) RangeFeed( store, err := ls.GetStore(args.Replica.StoreID) if err != nil { - return future.MakeCompletedErrorFuture(err) + return err } return store.RangeFeed(args, stream) diff --git a/pkg/server/node.go b/pkg/server/node.go index 72754ca881f7..92f425e00387 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -1935,6 +1935,7 @@ type perRangeEventSink struct { } var _ kvpb.RangeFeedEventSink = (*perRangeEventSink)(nil) +var _ rangefeed.Stream = (*perRangeEventSink)(nil) func (s *perRangeEventSink) Context() context.Context { return s.ctx @@ -1954,6 +1955,14 @@ func (s *perRangeEventSink) Send(event *kvpb.RangeFeedEvent) error { return s.wrapped.Send(response) } +// Disconnect implements the rangefeed.Stream interface. It requests the +// StreamMuxer to detach the stream. The StreamMuxer is then responsible for +// handling the actual disconnection and additional cleanup. Note that callers +// should not rely on immediate disconnection, as cleanup is asynchronous. +func (s *perRangeEventSink) Disconnect(err *kvpb.Error) { + s.wrapped.DisconnectStreamWithError(s.streamID, s.rangeID, err) +} + // lockedMuxStream provides support for concurrent calls to Send. The underlying // MuxRangeFeedServer (default grpc.Stream) is not safe for concurrent calls to // Send. @@ -1999,8 +2008,9 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error { } if req.CloseStream { - // Note that we will call disconnect again when future.Error returns, - // but DisconnectStreamWithError will ignore subsequent errors. 
+ // Note that we will call DisconnectStreamWithError again when + // registration.disconnect happens, but DisconnectStreamWithError will + // ignore subsequent errors. streamMuxer.DisconnectStreamWithError(req.StreamID, req.RangeID, kvpb.NewError(kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED))) continue @@ -2019,10 +2029,14 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error { } streamMuxer.AddStream(req.StreamID, cancel) - f := n.stores.RangeFeed(req, streamSink) - f.WhenReady(func(err error) { + // RangeFeed attempts to register a rangefeed request over the specified + // span. If registration fails, it returns an error. Otherwise, it returns + // nil without blocking on rangefeed completion. Events are then sent to + // the provided streamSink. If the rangefeed disconnects after being + // successfully registered, it calls streamSink.Disconnect with the error. + if err := n.stores.RangeFeed(req, streamSink); err != nil { streamMuxer.DisconnectStreamWithError(req.StreamID, req.RangeID, kvpb.NewError(err)) - }) + } } } }