From 160c733f614c547d590471cbfe73e8ff0439db0e Mon Sep 17 00:00:00 2001 From: Wenyi Hu Date: Sat, 29 Jun 2024 19:46:38 -0400 Subject: [PATCH] kvserver/rangefeed: remove future package Previously, to reduce O(ranges) goroutines and avoid blocking on rangefeed completion, we introduced the future package to manage rangefeed completion by invoking a callback when a future error becomes ready. This patch replicas it with a new approach - introduce a `DisconnectRangefeedWithError` method on stream muxer. Each rangefeed stream can now call this function to signal rangefeed completion, allowing the muxer to handle shutdown logic. Part of: Epic: none Release note: none --- pkg/kv/kvclient/rangefeed/BUILD.bazel | 1 - .../rangefeed/rangefeed_external_test.go | 26 ++++-- pkg/kv/kvserver/BUILD.bazel | 2 - .../client_replica_circuit_breaker_test.go | 50 +++++++---- pkg/kv/kvserver/rangefeed/BUILD.bazel | 2 - pkg/kv/kvserver/rangefeed/bench_test.go | 29 +++++-- pkg/kv/kvserver/rangefeed/processor.go | 5 +- pkg/kv/kvserver/rangefeed/processor_test.go | 84 ++++++------------- pkg/kv/kvserver/rangefeed/registry.go | 7 +- pkg/kv/kvserver/rangefeed/registry_test.go | 65 +++++++++----- .../kvserver/rangefeed/scheduled_processor.go | 4 +- pkg/kv/kvserver/replica_rangefeed.go | 42 ++++------ pkg/kv/kvserver/replica_rangefeed_test.go | 55 +++++++----- pkg/kv/kvserver/store.go | 13 ++- pkg/kv/kvserver/stores.go | 8 +- pkg/server/node.go | 11 ++- 16 files changed, 214 insertions(+), 190 deletions(-) diff --git a/pkg/kv/kvclient/rangefeed/BUILD.bazel b/pkg/kv/kvclient/rangefeed/BUILD.bazel index 54c320b59746..bea38c400af3 100644 --- a/pkg/kv/kvclient/rangefeed/BUILD.bazel +++ b/pkg/kv/kvclient/rangefeed/BUILD.bazel @@ -87,7 +87,6 @@ go_test( "//pkg/testutils/testcluster", "//pkg/util/ctxgroup", "//pkg/util/encoding", - "//pkg/util/future", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go index e5ba37ce8890..b2e172a9c8ad 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go @@ -40,7 +40,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/storageutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/encoding" - "github.com/cockroachdb/cockroach/pkg/util/future" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -1463,8 +1462,7 @@ func TestRangeFeedIntentResolutionRace(t *testing.T) { } eventC := make(chan *kvpb.RangeFeedEvent) sink := newChannelSink(ctx, eventC) - fErr := future.MakeAwaitableFuture(s3.RangeFeed(&req, sink)) - require.NoError(t, fErr.Get()) // check if we've errored yet + require.NoError(t, s3.RangeFeed(&req, sink)) // check if we've errored yet t.Logf("started rangefeed on %s", repl3) // Spawn a rangefeed monitor, which posts checkpoint updates to checkpointC. @@ -1621,17 +1619,18 @@ func TestRangeFeedIntentResolutionRace(t *testing.T) { } // The rangefeed should still be running. - require.NoError(t, fErr.Get()) + require.NoError(t, sink.GetError()) } // channelSink is a rangefeed sink which posts events to a channel. type channelSink struct { - ctx context.Context - ch chan<- *kvpb.RangeFeedEvent + ctx context.Context + ch chan<- *kvpb.RangeFeedEvent + done chan *kvpb.Error } func newChannelSink(ctx context.Context, ch chan<- *kvpb.RangeFeedEvent) *channelSink { - return &channelSink{ctx: ctx, ch: ch} + return &channelSink{ctx: ctx, ch: ch, done: make(chan *kvpb.Error, 1)} } func (c *channelSink) Context() context.Context { @@ -1647,6 +1646,19 @@ func (c *channelSink) Send(e *kvpb.RangeFeedEvent) error { } } +func (c *channelSink) GetError() error { + select { + case err := <-c.done: + return err.GoError() + default: + return nil + } +} + +func (c *channelSink) Disconnect(err *kvpb.Error) { + c.done <- err +} + // TestRangeFeedMetadataManualSplit tests that a spawned rangefeed emits a // metadata event which indicates if it spawned to due a manual split. The // test specifically conducts the following: diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 30788d39ca78..1ea4a21504ab 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -199,7 +199,6 @@ go_library( "//pkg/util/encoding", "//pkg/util/envutil", "//pkg/util/errorutil", - "//pkg/util/future", "//pkg/util/growstack", "//pkg/util/grpcutil", "//pkg/util/grunning", @@ -495,7 +494,6 @@ go_test( "//pkg/util/circuit", "//pkg/util/ctxgroup", "//pkg/util/encoding", - "//pkg/util/future", "//pkg/util/grunning", "//pkg/util/hlc", "//pkg/util/humanizeutil", diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go index 44abe30267e1..88341825b771 100644 --- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go @@ -34,7 +34,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/circuit" - "github.com/cockroachdb/cockroach/pkg/util/future" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -450,6 +449,16 @@ type dummyStream struct { name string ctx context.Context recv chan *kvpb.RangeFeedEvent + done chan *kvpb.Error +} + +func newDummyStream(ctx context.Context, name string) *dummyStream { + return &dummyStream{ + ctx: ctx, + name: name, + recv: make(chan *kvpb.RangeFeedEvent), + done: make(chan *kvpb.Error, 1), + } } func (s *dummyStream) Context() context.Context { @@ -469,21 +478,32 @@ func (s *dummyStream) Send(ev *kvpb.RangeFeedEvent) error { } } +func (s *dummyStream) Disconnect(err *kvpb.Error) { + s.done <- err +} + func waitReplicaRangeFeed( - ctx context.Context, - r *kvserver.Replica, - req *kvpb.RangeFeedRequest, - stream kvpb.RangeFeedEventSink, + ctx context.Context, r *kvserver.Replica, req *kvpb.RangeFeedRequest, stream *dummyStream, ) error { - rfErr, ctxErr := future.Wait(ctx, r.RangeFeed(req, stream, nil /* pacer */)) - if ctxErr != nil { - return ctxErr + sendErrToStream := func(err *kvpb.Error) error { + var event kvpb.RangeFeedEvent + event.SetValue(&kvpb.RangeFeedError{ + Error: *err, + }) + return stream.Send(&event) + } + + err := r.RangeFeed(req, stream, nil /* pacer */) + if err != nil { + return sendErrToStream(kvpb.NewError(err)) + } + + select { + case err := <-stream.done: + return sendErrToStream(err) + case <-ctx.Done(): + return ctx.Err() } - var event kvpb.RangeFeedEvent - event.SetValue(&kvpb.RangeFeedError{ - Error: *kvpb.NewError(rfErr), - }) - return stream.Send(&event) } // This test verifies that RangeFeed bypasses the circuit breaker. When the @@ -505,7 +525,7 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() - stream1 := &dummyStream{ctx: ctx, name: "rangefeed1", recv: make(chan *kvpb.RangeFeedEvent)} + stream1 := newDummyStream(ctx, "rangefeed1") require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "stream1", func(ctx context.Context) { err := waitReplicaRangeFeed(ctx, tc.repls[0].Replica, args, stream1) if ctx.Err() != nil { @@ -559,7 +579,7 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) { // Start another stream during the "outage" to make sure it isn't rejected by // the breaker. - stream2 := &dummyStream{ctx: ctx, name: "rangefeed2", recv: make(chan *kvpb.RangeFeedEvent)} + stream2 := newDummyStream(ctx, "rangefeed2") require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "stream2", func(ctx context.Context) { err := waitReplicaRangeFeed(ctx, tc.repls[0].Replica, args, stream2) if ctx.Err() != nil { diff --git a/pkg/kv/kvserver/rangefeed/BUILD.bazel b/pkg/kv/kvserver/rangefeed/BUILD.bazel index 16e0e1fcb991..047efd2fed2b 100644 --- a/pkg/kv/kvserver/rangefeed/BUILD.bazel +++ b/pkg/kv/kvserver/rangefeed/BUILD.bazel @@ -37,7 +37,6 @@ go_library( "//pkg/util/buildutil", "//pkg/util/container/heap", "//pkg/util/envutil", - "//pkg/util/future", "//pkg/util/hlc", "//pkg/util/interval", "//pkg/util/log", @@ -89,7 +88,6 @@ go_test( "//pkg/testutils/skip", "//pkg/testutils/storageutils", "//pkg/util/encoding", - "//pkg/util/future", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", diff --git a/pkg/kv/kvserver/rangefeed/bench_test.go b/pkg/kv/kvserver/rangefeed/bench_test.go index 50a2c39cdfec..358831032bfc 100644 --- a/pkg/kv/kvserver/rangefeed/bench_test.go +++ b/pkg/kv/kvserver/rangefeed/bench_test.go @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" - "github.com/cockroachdb/cockroach/pkg/util/future" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/stretchr/testify/require" @@ -100,7 +99,6 @@ func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) { // Add registrations. streams := make([]*noopStream, opts.numRegistrations) - futures := make([]*future.ErrorFuture, opts.numRegistrations) for i := 0; i < opts.numRegistrations; i++ { // withDiff does not matter for these benchmarks, since the previous value // is fetched and populated during Raft application. @@ -108,11 +106,10 @@ func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) { // withFiltering does not matter for these benchmarks because doesn't fetch // extra data. const withFiltering = false - streams[i] = &noopStream{ctx: ctx} - futures[i] = &future.ErrorFuture{} + streams[i] = &noopStream{ctx: ctx, done: make(chan *kvpb.Error, 1)} ok, _ := p.Register(span, hlc.MinTimestamp, nil, withDiff, withFiltering, false, /* withOmitRemote */ - streams[i], nil, futures[i]) + streams[i], nil) require.True(b, ok) } @@ -185,10 +182,9 @@ func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) { b.StopTimer() p.Stop() - for i, f := range futures { - regErr, err := future.Wait(ctx, f) - require.NoError(b, err) - require.NoError(b, regErr) + for i, s := range streams { + // p.Stop() sends a nil error to all streams to signal completion. + require.NoError(b, s.WaitForErr(b)) require.Equal(b, b.N, streams[i].events-1) // ignore checkpoint after catchup } } @@ -197,6 +193,7 @@ func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) { type noopStream struct { ctx context.Context events int + done chan *kvpb.Error } func (s *noopStream) Context() context.Context { @@ -207,3 +204,17 @@ func (s *noopStream) Send(*kvpb.RangeFeedEvent) error { s.events++ return nil } + +func (s *noopStream) Disconnect(error *kvpb.Error) { + s.done <- error +} + +func (s *noopStream) WaitForErr(b *testing.B) error { + select { + case err := <-s.done: + return err.GoError() + case <-time.After(30 * time.Second): + b.Fatalf("time out waiting for rangefeed completion") + return nil + } +} diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 88ea9870e700..b439cbfa9e0e 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/envutil" - "github.com/cockroachdb/cockroach/pkg/util/future" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -210,7 +209,6 @@ type Processor interface { withOmitRemote bool, stream Stream, disconnectFn func(), - done *future.ErrorFuture, ) (bool, *Filter) // DisconnectSpanWithErr disconnects all rangefeed registrations that overlap // the given span with the given error. @@ -595,7 +593,6 @@ func (p *LegacyProcessor) Register( withOmitRemote bool, stream Stream, disconnectFn func(), - done *future.ErrorFuture, ) (bool, *Filter) { // Synchronize the event channel so that this registration doesn't see any // events that were consumed before this registration was called. Instead, @@ -605,7 +602,7 @@ func (p *LegacyProcessor) Register( blockWhenFull := p.Config.EventChanTimeout == 0 // for testing r := newRegistration( span.AsRawSpanWithNoLocals(), startTS, catchUpIter, withDiff, withFiltering, withOmitRemote, - p.Config.EventChanCap, blockWhenFull, p.Metrics, stream, disconnectFn, done, + p.Config.EventChanCap, blockWhenFull, p.Metrics, stream, disconnectFn, ) select { case p.regC <- r: diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index 954b179ebcb2..24b7121968c5 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -30,7 +30,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" - "github.com/cockroachdb/cockroach/pkg/util/future" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -463,11 +462,6 @@ func newTestProcessor( return s, &h, stopper } -func waitErrorFuture(f *future.ErrorFuture) error { - resultErr, _ := future.Wait(context.Background(), f) - return resultErr -} - func TestProcessorBasic(t *testing.T) { defer leaktest.AfterTest(t)() testutils.RunValues(t, "proc type", testTypes, func(t *testing.T, pt procType) { @@ -496,7 +490,6 @@ func TestProcessorBasic(t *testing.T) { // Add a registration. r1Stream := newTestStream() - var r1Done future.ErrorFuture r1OK, r1Filter := p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, @@ -506,7 +499,6 @@ func TestProcessorBasic(t *testing.T) { false, /* withOmitRemote */ r1Stream, func() {}, - &r1Done, ) require.True(t, r1OK) h.syncEventAndRegistrations() @@ -632,7 +624,6 @@ func TestProcessorBasic(t *testing.T) { // Add another registration with withDiff = true and withFiltering = true. r2Stream := newTestStream() - var r2Done future.ErrorFuture r2OK, r1And2Filter := p.Register( roachpb.RSpan{Key: roachpb.RKey("c"), EndKey: roachpb.RKey("z")}, hlc.Timestamp{WallTime: 1}, @@ -642,7 +633,6 @@ func TestProcessorBasic(t *testing.T) { false, /* withOmitRemote */ r2Stream, func() {}, - &r2Done, ) require.True(t, r2OK) h.syncEventAndRegistrations() @@ -739,16 +729,15 @@ func TestProcessorBasic(t *testing.T) { // Cancel the first registration. r1Stream.Cancel() - require.NotNil(t, waitErrorFuture(&r1Done)) + require.NotNil(t, r1Stream.WaitForErr(t)) // Stop the processor with an error. pErr := kvpb.NewErrorf("stop err") p.StopWithErr(pErr) - require.NotNil(t, waitErrorFuture(&r2Done)) + require.NotNil(t, r2Stream.WaitForErr(t)) // Adding another registration should fail. r3Stream := newTestStream() - var r3Done future.ErrorFuture r3OK, _ := p.Register( roachpb.RSpan{Key: roachpb.RKey("c"), EndKey: roachpb.RKey("z")}, hlc.Timestamp{WallTime: 1}, @@ -758,7 +747,6 @@ func TestProcessorBasic(t *testing.T) { false, /* withOmitRemote */ r3Stream, func() {}, - &r3Done, ) require.False(t, r3OK) }) @@ -775,7 +763,6 @@ func TestProcessorOmitRemote(t *testing.T) { // Add a registration. r1Stream := newTestStream() - var r1Done future.ErrorFuture r1OK, _ := p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, @@ -785,7 +772,6 @@ func TestProcessorOmitRemote(t *testing.T) { false, /* withOmitRemote */ r1Stream, func() {}, - &r1Done, ) require.True(t, r1OK) h.syncEventAndRegistrations() @@ -802,7 +788,6 @@ func TestProcessorOmitRemote(t *testing.T) { // Add another registration with withOmitRemote = true. r2Stream := newTestStream() - var r2Done future.ErrorFuture r2OK, _ := p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, @@ -812,7 +797,6 @@ func TestProcessorOmitRemote(t *testing.T) { true, /* withOmitRemote */ r2Stream, func() {}, - &r2Done, ) require.True(t, r2OK) h.syncEventAndRegistrations() @@ -857,7 +841,6 @@ func TestProcessorSlowConsumer(t *testing.T) { // Add a registration. r1Stream := newTestStream() - var r1Done future.ErrorFuture _, _ = p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, @@ -867,10 +850,8 @@ func TestProcessorSlowConsumer(t *testing.T) { false, /* withOmitRemote */ r1Stream, func() {}, - &r1Done, ) r2Stream := newTestStream() - var r2Done future.ErrorFuture p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")}, hlc.Timestamp{WallTime: 1}, @@ -880,7 +861,6 @@ func TestProcessorSlowConsumer(t *testing.T) { false, /* withOmitRemote */ r2Stream, func() {}, - &r2Done, ) h.syncEventAndRegistrations() require.Equal(t, 2, p.Len()) @@ -942,7 +922,7 @@ func TestProcessorSlowConsumer(t *testing.T) { // were dropped due to rapid event consumption before the r1's outputLoop // began consuming from its event buffer. require.LessOrEqual(t, len(r1Stream.Events()), toFill) - require.Equal(t, newErrBufferCapacityExceeded().GoError(), waitErrorFuture(&r1Done)) + require.Equal(t, newErrBufferCapacityExceeded().GoError(), r1Stream.WaitForErr(t)) testutils.SucceedsSoon(t, func() error { if act, exp := p.Len(), 1; exp != act { return fmt.Errorf("processor had %d regs, wanted %d", act, exp) @@ -968,7 +948,6 @@ func TestProcessorMemoryBudgetExceeded(t *testing.T) { // Add a registration. r1Stream := newTestStream() - var r1Done future.ErrorFuture _, _ = p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, @@ -978,7 +957,6 @@ func TestProcessorMemoryBudgetExceeded(t *testing.T) { false, /* withOmitRemote */ r1Stream, func() {}, - &r1Done, ) h.syncEventAndRegistrations() @@ -1007,7 +985,7 @@ func TestProcessorMemoryBudgetExceeded(t *testing.T) { unblock = nil h.syncEventAndRegistrations() - require.Equal(t, newErrBufferCapacityExceeded().GoError(), waitErrorFuture(&r1Done)) + require.Equal(t, newErrBufferCapacityExceeded().GoError(), r1Stream.WaitForErr(t)) require.Equal(t, 0, p.Len(), "registration was not removed") require.Equal(t, int64(1), m.RangeFeedBudgetExhausted.Count()) }) @@ -1025,7 +1003,6 @@ func TestProcessorMemoryBudgetReleased(t *testing.T) { // Add a registration. r1Stream := newTestStream() - var r1Done future.ErrorFuture p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, @@ -1035,7 +1012,6 @@ func TestProcessorMemoryBudgetReleased(t *testing.T) { false, /* withOmitRemote */ r1Stream, func() {}, - &r1Done, ) h.syncEventAndRegistrations() @@ -1108,7 +1084,6 @@ func TestProcessorInitializeResolvedTimestamp(t *testing.T) { // Add a registration. r1Stream := newTestStream() - var r1Done future.ErrorFuture p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, @@ -1118,7 +1093,6 @@ func TestProcessorInitializeResolvedTimestamp(t *testing.T) { false, /* withOmitRemote */ r1Stream, func() {}, - &r1Done, ) h.syncEventAndRegistrations() require.Equal(t, 1, p.Len()) @@ -1424,9 +1398,8 @@ func TestProcessorConcurrentStop(t *testing.T) { defer wg.Done() runtime.Gosched() s := newTestStream() - var done future.ErrorFuture p.Register(h.span, hlc.Timestamp{}, nil, /* catchUpIter */ - false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, s, func() {}, &done) + false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, s, func() {}) }() go func() { defer wg.Done() @@ -1497,9 +1470,8 @@ func TestProcessorRegistrationObservesOnlyNewEvents(t *testing.T) { // operation is should see is firstIdx. s := newTestStream() regs[s] = firstIdx - var done future.ErrorFuture p.Register(h.span, hlc.Timestamp{}, nil, /* catchUpIter */ - false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, s, func() {}, &done) + false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, s, func() {}) regDone <- struct{}{} } }() @@ -1520,14 +1492,6 @@ func TestProcessorRegistrationObservesOnlyNewEvents(t *testing.T) { }) } -func notifyWhenDone(f *future.ErrorFuture) chan error { - ch := make(chan error, 1) - f.WhenReady(func(err error) { - ch <- err - }) - return ch -} - func TestBudgetReleaseOnProcessorStop(t *testing.T) { defer leaktest.AfterTest(t)() const totalEvents = 100 @@ -1559,7 +1523,6 @@ func TestBudgetReleaseOnProcessorStop(t *testing.T) { // Add a registration. rStream := newConsumer(50) defer func() { rStream.Resume() }() - var done future.ErrorFuture _, _ = p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, @@ -1569,9 +1532,7 @@ func TestBudgetReleaseOnProcessorStop(t *testing.T) { false, /* withOmitRemote */ rStream, func() {}, - &done, ) - rErrC := notifyWhenDone(&done) h.syncEventAndRegistrations() for i := 0; i < totalEvents; i++ { @@ -1584,7 +1545,7 @@ func TestBudgetReleaseOnProcessorStop(t *testing.T) { // Wait for half of the event to be processed by stream then stop processor. select { case <-rStream.blocked: - case err := <-rErrC: + case err := <-rStream.done: t.Fatal("stream failed with error before all data was consumed", err) } @@ -1642,7 +1603,6 @@ func TestBudgetReleaseOnLastStreamError(t *testing.T) { // Add a registration. rStream := newConsumer(90) defer func() { rStream.Resume() }() - var done future.ErrorFuture _, _ = p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, @@ -1652,9 +1612,7 @@ func TestBudgetReleaseOnLastStreamError(t *testing.T) { false, /* withOmitRemote */ rStream, func() {}, - &done, ) - rErrC := notifyWhenDone(&done) h.syncEventAndRegistrations() for i := 0; i < totalEvents; i++ { @@ -1667,7 +1625,7 @@ func TestBudgetReleaseOnLastStreamError(t *testing.T) { // Wait for half of the event to be processed then raise error. select { case <-rStream.blocked: - case err := <-rErrC: + case err := <-rStream.done: t.Fatal("stream failed with error before stream blocked: ", err) } @@ -1715,7 +1673,6 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) { // Add a registration. r1Stream := newConsumer(50) defer func() { r1Stream.Resume() }() - var r1Done future.ErrorFuture _, _ = p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, @@ -1725,13 +1682,10 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) { false, /* withOmitRemote */ r1Stream, func() {}, - &r1Done, ) - r1ErrC := notifyWhenDone(&r1Done) // Non-blocking registration that would consume all events. r2Stream := newConsumer(0) - var r2Done future.ErrorFuture p.Register( roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, hlc.Timestamp{WallTime: 1}, @@ -1741,7 +1695,6 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) { false, /* withOmitRemote */ r2Stream, func() {}, - &r2Done, ) h.syncEventAndRegistrations() @@ -1755,7 +1708,7 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) { // Wait for half of the event to be processed then stop processor. select { case <-r1Stream.blocked: - case err := <-r1ErrC: + case err := <-r1Stream.done: t.Fatal("stream failed with error before all data was consumed", err) } @@ -1794,6 +1747,7 @@ type consumer struct { ctx context.Context ctxDone func() sentValues int32 + done chan *kvpb.Error blockAfter int blocked chan interface{} @@ -1808,6 +1762,7 @@ func newConsumer(blockAfter int) *consumer { blockAfter: blockAfter, blocked: make(chan interface{}), resume: make(chan error), + done: make(chan *kvpb.Error, 1), } } @@ -1839,6 +1794,20 @@ func (c *consumer) WaitBlock() { <-c.blocked } +func (s *consumer) Disconnect(error *kvpb.Error) { + s.done <- error +} + +func (s *consumer) WaitForErr(t *testing.T) error { + select { + case err := <-s.done: + return err.GoError() + case <-time.After(30 * time.Second): + t.Fatalf("time out waiting for rangefeed completion") + return nil + } +} + // Resume resumes stream by closing its wait channel. // If there was a pending err for resuming then it would be discarded and // channel closed. @@ -1888,9 +1857,8 @@ func TestProcessorBackpressure(t *testing.T) { // Add a registration. stream := newTestStream() - done := &future.ErrorFuture{} ok, _ := p.Register(span, hlc.MinTimestamp, nil, /* catchUpIter */ - false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, stream, nil, done) + false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, stream, nil) require.True(t, ok) // Wait for the initial checkpoint. diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go index a65cbf36a3fe..1d331a87b997 100644 --- a/pkg/kv/kvserver/rangefeed/registry.go +++ b/pkg/kv/kvserver/rangefeed/registry.go @@ -18,7 +18,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/util/future" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/interval" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -35,6 +34,7 @@ type Stream interface { // Send blocks until it sends m, the stream is done, or the stream breaks. // Send must be safe to call on the same stream in different goroutines. Send(*kvpb.RangeFeedEvent) error + Disconnect(err *kvpb.Error) } // Shared event is an entry stored in registration channel. Each entry is @@ -85,7 +85,6 @@ type registration struct { // Output. stream Stream - done *future.ErrorFuture unreg func() // Internal. id int64 @@ -126,7 +125,6 @@ func newRegistration( metrics *Metrics, stream Stream, unregisterFn func(), - done *future.ErrorFuture, ) registration { r := registration{ span: span, @@ -136,7 +134,6 @@ func newRegistration( withOmitRemote: withOmitRemote, metrics: metrics, stream: stream, - done: done, unreg: unregisterFn, buf: make(chan *sharedEvent, bufferSz), blockWhenFull: blockWhenFull, @@ -303,7 +300,7 @@ func (r *registration) disconnect(pErr *kvpb.Error) { r.mu.outputLoopCancelFn() } r.mu.disconnected = true - r.done.Set(pErr.GoError()) + r.stream.Disconnect(pErr) } } diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go index f3e43fc36a89..8c001eaaa722 100644 --- a/pkg/kv/kvserver/rangefeed/registry_test.go +++ b/pkg/kv/kvserver/rangefeed/registry_test.go @@ -15,6 +15,7 @@ import ( "fmt" "sync" "testing" + "time" _ "github.com/cockroachdb/cockroach/pkg/keys" // hook up pretty printer "github.com/cockroachdb/cockroach/pkg/kv/kvpb" @@ -22,7 +23,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" - "github.com/cockroachdb/cockroach/pkg/util/future" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -45,6 +45,7 @@ var ( type testStream struct { ctx context.Context ctxDone func() + done chan *kvpb.Error mu struct { syncutil.Mutex sendErr error @@ -54,7 +55,7 @@ type testStream struct { func newTestStream() *testStream { ctx, done := context.WithCancel(context.Background()) - return &testStream{ctx: ctx, ctxDone: done} + return &testStream{ctx: ctx, ctxDone: done, done: make(chan *kvpb.Error, 1)} } func (s *testStream) Context() context.Context { @@ -97,6 +98,20 @@ func (s *testStream) BlockSend() func() { } } +func (s *testStream) Disconnect(err *kvpb.Error) { + s.done <- err +} + +func (s *testStream) WaitForErr(t *testing.T) error { + select { + case err := <-s.done: + return err.GoError() + case <-time.After(30 * time.Second): + t.Fatalf("time out waiting for rangefeed completion") + return nil + } +} + type testRegistration struct { registration stream *testStream @@ -136,7 +151,6 @@ func newTestRegistration( NewMetrics(), s, func() {}, - &future.ErrorFuture{}, ) return &testRegistration{ registration: r, @@ -148,13 +162,18 @@ func (r *testRegistration) Events() []*kvpb.RangeFeedEvent { return r.stream.Events() } -func (r *testRegistration) Err() error { - err, _ := future.Wait(context.Background(), r.done) - return err +func (r *testRegistration) WaitForErr() error { + err := <-r.stream.done + return err.GoError() } -func (r *testRegistration) TryErr() error { - return future.MakeAwaitableFuture(r.done).Get() +func (r *testRegistration) GetErr() error { + select { + case err := <-r.stream.done: + return err.GoError() + default: + return nil + } } func TestRegistrationBasic(t *testing.T) { @@ -205,7 +224,7 @@ func TestRegistrationBasic(t *testing.T) { require.NoError(t, disconnectReg.waitForCaughtUp(ctx)) discErr := kvpb.NewError(fmt.Errorf("disconnection error")) disconnectReg.disconnect(discErr) - require.Equal(t, discErr.GoError(), disconnectReg.Err()) + require.Equal(t, discErr.GoError(), disconnectReg.WaitForErr()) require.Equal(t, 2, len(disconnectReg.stream.Events())) // External Disconnect before output loop. @@ -215,7 +234,7 @@ func TestRegistrationBasic(t *testing.T) { disconnectEarlyReg.publish(ctx, ev2, nil /* alloc */) disconnectEarlyReg.disconnect(discErr) go disconnectEarlyReg.runOutputLoop(ctx, 0) - require.Equal(t, discErr.GoError(), disconnectEarlyReg.Err()) + require.Equal(t, discErr.GoError(), disconnectEarlyReg.WaitForErr()) require.Equal(t, 0, len(disconnectEarlyReg.stream.Events())) // Overflow. @@ -225,7 +244,7 @@ func TestRegistrationBasic(t *testing.T) { overflowReg.publish(ctx, ev1, nil /* alloc */) } go overflowReg.runOutputLoop(ctx, 0) - require.Equal(t, newErrBufferCapacityExceeded().GoError(), overflowReg.Err()) + require.Equal(t, newErrBufferCapacityExceeded().GoError(), overflowReg.WaitForErr()) require.Equal(t, cap(overflowReg.buf), len(overflowReg.Events())) // Stream Error. @@ -235,7 +254,7 @@ func TestRegistrationBasic(t *testing.T) { streamErrReg.stream.SetSendErr(streamErr) go streamErrReg.runOutputLoop(ctx, 0) streamErrReg.publish(ctx, ev1, nil /* alloc */) - require.Equal(t, streamErr.Error(), streamErrReg.Err().Error()) + require.Equal(t, streamErr.Error(), streamErrReg.WaitForErr().Error()) // Stream Context Canceled. streamCancelReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, /* catchup */ @@ -244,7 +263,7 @@ func TestRegistrationBasic(t *testing.T) { streamCancelReg.stream.Cancel() go streamCancelReg.runOutputLoop(ctx, 0) require.NoError(t, streamCancelReg.waitForCaughtUp(ctx)) - require.Equal(t, streamCancelReg.stream.Context().Err(), streamCancelReg.Err()) + require.Equal(t, streamCancelReg.stream.Context().Err(), streamCancelReg.WaitForErr()) } func TestRegistrationCatchUpScan(t *testing.T) { @@ -406,8 +425,8 @@ func TestRegistryWithOmitOrigin(t *testing.T) { require.Equal(t, []*kvpb.RangeFeedEvent{noPrev(ev1), noPrev(ev2)}, rAC.Events()) require.Equal(t, []*kvpb.RangeFeedEvent{noPrev(ev1)}, originFiltering.Events()) - require.Nil(t, rAC.TryErr()) - require.Nil(t, originFiltering.TryErr()) + require.Nil(t, rAC.GetErr()) + require.Nil(t, originFiltering.GetErr()) } func TestRegistryBasic(t *testing.T) { @@ -478,11 +497,11 @@ func TestRegistryBasic(t *testing.T) { // Registration rACFiltering doesn't receive ev5 because both withFiltering // (for the registration) and OmitInRangefeeds (for the event) are true. require.Equal(t, []*kvpb.RangeFeedEvent{noPrev(ev1), noPrev(ev2), noPrev(ev4)}, rACFiltering.Events()) - require.Nil(t, rAB.TryErr()) - require.Nil(t, rBC.TryErr()) - require.Nil(t, rCD.TryErr()) - require.Nil(t, rAC.TryErr()) - require.Nil(t, rACFiltering.TryErr()) + require.Nil(t, rAB.GetErr()) + require.Nil(t, rBC.GetErr()) + require.Nil(t, rCD.GetErr()) + require.Nil(t, rAC.GetErr()) + require.Nil(t, rACFiltering.GetErr()) // Check the registry's operation filter. f := reg.NewFilter() @@ -510,7 +529,7 @@ func TestRegistryBasic(t *testing.T) { // Disconnect span that overlaps with rCD. reg.DisconnectWithErr(ctx, spCD, err1) require.Equal(t, 4, reg.Len()) - require.Equal(t, err1.GoError(), rCD.Err()) + require.Equal(t, err1.GoError(), rCD.WaitForErr()) // Can still publish to rAB. reg.PublishToOverlapping(ctx, spAB, ev4, logicalOpMetadata{}, nil /* alloc */) @@ -522,8 +541,8 @@ func TestRegistryBasic(t *testing.T) { // Disconnect from rAB without error. reg.Disconnect(ctx, spAB) - require.Nil(t, rAC.Err()) - require.Nil(t, rAB.Err()) + require.Nil(t, rAC.WaitForErr()) + require.Nil(t, rAB.WaitForErr()) require.Equal(t, 1, reg.Len()) // Check the registry's operation filter again. diff --git a/pkg/kv/kvserver/rangefeed/scheduled_processor.go b/pkg/kv/kvserver/rangefeed/scheduled_processor.go index 2bf3b17fdd2b..577f8ff0fa11 100644 --- a/pkg/kv/kvserver/rangefeed/scheduled_processor.go +++ b/pkg/kv/kvserver/rangefeed/scheduled_processor.go @@ -18,7 +18,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/buildutil" - "github.com/cockroachdb/cockroach/pkg/util/future" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -310,7 +309,6 @@ func (p *ScheduledProcessor) Register( withOmitRemote bool, stream Stream, disconnectFn func(), - done *future.ErrorFuture, ) (bool, *Filter) { // Synchronize the event channel so that this registration doesn't see any // events that were consumed before this registration was called. Instead, @@ -320,7 +318,7 @@ func (p *ScheduledProcessor) Register( blockWhenFull := p.Config.EventChanTimeout == 0 // for testing r := newRegistration( span.AsRawSpanWithNoLocals(), startTS, catchUpIter, withDiff, withFiltering, withOmitRemote, - p.Config.EventChanCap, blockWhenFull, p.Metrics, stream, disconnectFn, done, + p.Config.EventChanCap, blockWhenFull, p.Metrics, stream, disconnectFn, ) filter := runRequest(p, func(ctx context.Context, p *ScheduledProcessor) *Filter { diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 67509e16dcb9..a398bc705577 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -33,7 +33,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/envutil" - "github.com/cockroachdb/cockroach/pkg/util/future" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metamorphic" @@ -240,17 +239,17 @@ func (tp *rangefeedTxnPusher) Barrier(ctx context.Context) error { // complete. The surrounding store's ConcurrentRequestLimiter is used to limit // the number of rangefeeds using catch-up iterators at the same time. func (r *Replica) RangeFeed( - args *kvpb.RangeFeedRequest, stream kvpb.RangeFeedEventSink, pacer *admission.Pacer, -) *future.ErrorFuture { + args *kvpb.RangeFeedRequest, stream rangefeed.Stream, pacer *admission.Pacer, +) error { ctx := r.AnnotateCtx(stream.Context()) rSpan, err := keys.SpanAddr(args.Span) if err != nil { - return future.MakeCompletedErrorFuture(err) + return err } if err := r.ensureClosedTimestampStarted(ctx); err != nil { - return future.MakeCompletedErrorFuture(err.GoError()) + return err.GoError() } // If the RangeFeed is performing a catch-up scan then it will observe all @@ -276,7 +275,7 @@ func (r *Replica) RangeFeed( usingCatchUpIter = true alloc, err := r.store.limiters.ConcurrentRangefeedIters.Begin(ctx) if err != nil { - return future.MakeCompletedErrorFuture(err) + return err } // Finish the iterator limit if we exit before the iterator finishes. @@ -301,7 +300,7 @@ func (r *Replica) RangeFeed( if err := r.checkExecutionCanProceedForRangeFeed(ctx, rSpan, checkTS); err != nil { r.raftMu.Unlock() iterSemRelease() - return future.MakeCompletedErrorFuture(err) + return err } // Register the stream with a catch-up iterator. @@ -315,15 +314,14 @@ func (r *Replica) RangeFeed( if err != nil { r.raftMu.Unlock() iterSemRelease() - return future.MakeCompletedErrorFuture(err) + return err } if f := r.store.TestingKnobs().RangefeedValueHeaderFilter; f != nil { catchUpIter.OnEmit = f } } - var done future.ErrorFuture - p := r.registerWithRangefeedRaftMuLocked( - ctx, rSpan, args.Timestamp, catchUpIter, args.WithDiff, args.WithFiltering, stream, &done, + p, err := r.registerWithRangefeedRaftMuLocked( + ctx, rSpan, args.Timestamp, catchUpIter, args.WithDiff, args.WithFiltering, stream, ) r.raftMu.Unlock() @@ -331,8 +329,7 @@ func (r *Replica) RangeFeed( // encountered an error after we created processor, disconnect if processor // is empty. defer r.maybeDisconnectEmptyRangefeed(p) - - return &done + return err } func (r *Replica) getRangefeedProcessorAndFilter() (rangefeed.Processor, *rangefeed.Filter) { @@ -421,8 +418,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( withDiff bool, withFiltering bool, stream rangefeed.Stream, - done *future.ErrorFuture, -) rangefeed.Processor { +) (rangefeed.Processor, error) { defer logSlowRangefeedRegistration(ctx)() // Always defer closing iterator to cover old and new failure cases. @@ -442,7 +438,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( if p != nil { reg, filter := p.Register(span, startTS, catchUpIter, withDiff, withFiltering, false, /* withOmitRemote */ - stream, func() { r.maybeDisconnectEmptyRangefeed(p) }, done) + stream, func() { r.maybeDisconnectEmptyRangefeed(p) }) if reg { // Registered successfully with an existing processor. // Update the rangefeed filter to avoid filtering ops @@ -450,7 +446,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( r.setRangefeedFilterLocked(filter) r.rangefeedMu.Unlock() catchUpIter = nil - return p + return p, nil } // If the registration failed, the processor was already being shut // down. Help unset it and then continue on with initializing a new @@ -508,7 +504,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( scanner, err := rangefeed.NewSeparatedIntentScanner(ctx, r.store.TODOEngine(), desc.RSpan()) if err != nil { - done.Set(err) + stream.Disconnect(kvpb.NewError(err)) return nil } return scanner @@ -520,8 +516,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // due to stopping, but before it enters the quiescing state, then the select // below will fall through to the panic. if err := p.Start(r.store.Stopper(), rtsIter); err != nil { - done.Set(err) - return nil + return nil, err } // Register with the processor *before* we attach its reference to the @@ -530,12 +525,11 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // this ensures that the only time the registration fails is during // server shutdown. reg, filter := p.Register(span, startTS, catchUpIter, withDiff, - withFiltering, false /* withOmitRemote */, stream, func() { r.maybeDisconnectEmptyRangefeed(p) }, done) + withFiltering, false /* withOmitRemote */, stream, func() { r.maybeDisconnectEmptyRangefeed(p) }) if !reg { select { case <-r.store.Stopper().ShouldQuiesce(): - done.Set(&kvpb.NodeUnavailableError{}) - return nil + return nil, &kvpb.NodeUnavailableError{} default: panic("unexpected Stopped processor") } @@ -551,7 +545,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // Check for an initial closed timestamp update immediately to help // initialize the rangefeed's resolved timestamp as soon as possible. r.handleClosedTimestampUpdateRaftMuLocked(ctx, r.GetCurrentClosedTimestamp(ctx)) - return p + return p, nil } // maybeDisconnectEmptyRangefeed tears down the provided Processor if it is diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go index 0fae7586be8c..2d9008e207b5 100644 --- a/pkg/kv/kvserver/replica_rangefeed_test.go +++ b/pkg/kv/kvserver/replica_rangefeed_test.go @@ -39,7 +39,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" - "github.com/cockroachdb/cockroach/pkg/util/future" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -54,6 +53,7 @@ import ( type testStream struct { ctx context.Context cancel func() + done chan *kvpb.Error mu struct { syncutil.Mutex events []*kvpb.RangeFeedEvent @@ -62,7 +62,7 @@ type testStream struct { func newTestStream() *testStream { ctx, cancel := context.WithCancel(context.Background()) - return &testStream{ctx: ctx, cancel: cancel} + return &testStream{ctx: ctx, cancel: cancel, done: make(chan *kvpb.Error, 1)} } func (s *testStream) SendMsg(m interface{}) error { panic("unimplemented") } @@ -92,9 +92,27 @@ func (s *testStream) Events() []*kvpb.RangeFeedEvent { return s.mu.events } -func waitRangeFeed(store *kvserver.Store, req *kvpb.RangeFeedRequest, stream *testStream) error { - retErr, _ := future.Wait(context.Background(), store.RangeFeed(req, stream)) - return retErr +func (s *testStream) Disconnect(error *kvpb.Error) { + s.done <- error +} + +func (s *testStream) WaitForErr(t *testing.T) error { + select { + case err := <-s.done: + return err.GoError() + case <-time.After(30 * time.Second): + t.Fatalf("time out waiting for rangefeed completion") + return nil + } +} + +func waitRangeFeed( + t *testing.T, store *kvserver.Store, req *kvpb.RangeFeedRequest, stream *testStream, +) error { + if err := store.RangeFeed(req, stream); err != nil { + return err + } + return stream.WaitForErr(t) } func TestReplicaRangefeed(t *testing.T) { @@ -168,7 +186,7 @@ func TestReplicaRangefeed(t *testing.T) { } timer := time.AfterFunc(10*time.Second, stream.Cancel) defer timer.Stop() - streamErrC <- waitRangeFeed(store, &req, stream) + streamErrC <- waitRangeFeed(t, store, &req, stream) }(i) } @@ -564,7 +582,7 @@ func TestReplicaRangefeed(t *testing.T) { defer timer.Stop() defer stream.Cancel() - if pErr := waitRangeFeed(store, &req, stream); !testutils.IsError( + if pErr := waitRangeFeed(t, store, &req, stream); !testutils.IsError( pErr, `must be after replica GC threshold`, ) { return pErr @@ -574,11 +592,6 @@ func TestReplicaRangefeed(t *testing.T) { }) } -func waitErrorFuture(f *future.ErrorFuture) error { - resultErr, _ := future.Wait(context.Background(), f) - return resultErr -} - func TestScheduledProcessorKillSwitch(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -768,7 +781,7 @@ func TestReplicaRangefeedErrors(t *testing.T) { } timer := time.AfterFunc(10*time.Second, stream.Cancel) defer timer.Stop() - streamErrC <- waitRangeFeed(store, &req, stream) + streamErrC <- waitRangeFeed(t, store, &req, stream) }() // Wait for the first checkpoint event. @@ -803,7 +816,7 @@ func TestReplicaRangefeedErrors(t *testing.T) { } timer := time.AfterFunc(10*time.Second, stream.Cancel) defer timer.Stop() - streamErrC <- waitRangeFeed(store, &req, stream) + streamErrC <- waitRangeFeed(t, store, &req, stream) }() // Wait for the first checkpoint event. @@ -848,7 +861,7 @@ func TestReplicaRangefeedErrors(t *testing.T) { } timer := time.AfterFunc(10*time.Second, streamLeft.Cancel) defer timer.Stop() - streamLeftErrC <- waitRangeFeed(store, &req, streamLeft) + streamLeftErrC <- waitRangeFeed(t, store, &req, streamLeft) }() // Establish a rangefeed on the right replica. @@ -864,7 +877,7 @@ func TestReplicaRangefeedErrors(t *testing.T) { } timer := time.AfterFunc(10*time.Second, streamRight.Cancel) defer timer.Stop() - streamRightErrC <- waitRangeFeed(store, &req, streamRight) + streamRightErrC <- waitRangeFeed(t, store, &req, streamRight) }() // Wait for the first checkpoint event on each stream. @@ -922,7 +935,7 @@ func TestReplicaRangefeedErrors(t *testing.T) { } timer := time.AfterFunc(10*time.Second, stream.Cancel) defer timer.Stop() - streamErrC <- waitRangeFeed(partitionStore, &req, stream) + streamErrC <- waitRangeFeed(t, partitionStore, &req, stream) }() // Wait for the first checkpoint event. @@ -1050,7 +1063,7 @@ func TestReplicaRangefeedErrors(t *testing.T) { kvserver.RangefeedEnabled.Override(ctx, &store.ClusterSettings().SV, true) timer := time.AfterFunc(10*time.Second, stream.Cancel) defer timer.Stop() - streamErrC <- waitRangeFeed(store, &req, stream) + streamErrC <- waitRangeFeed(t, store, &req, stream) }() // Wait for the first checkpoint event. @@ -1125,7 +1138,7 @@ func TestReplicaRangefeedErrors(t *testing.T) { } timer := time.AfterFunc(10*time.Second, stream.Cancel) defer timer.Stop() - streamErrC <- waitErrorFuture(store.RangeFeed(&req, stream)) + streamErrC <- waitRangeFeed(t, store, &req, stream) }() // Check the error. @@ -1145,7 +1158,7 @@ func TestReplicaRangefeedErrors(t *testing.T) { } timer := time.AfterFunc(10*time.Second, stream.Cancel) defer timer.Stop() - streamErrC <- waitErrorFuture(store.RangeFeed(&req, stream)) + streamErrC <- waitRangeFeed(t, store, &req, stream) }() // Wait for the first checkpoint event. @@ -1190,7 +1203,7 @@ func TestReplicaRangefeedMVCCHistoryMutationError(t *testing.T) { } timer := time.AfterFunc(10*time.Second, stream.Cancel) defer timer.Stop() - streamErrC <- waitRangeFeed(store, &req, stream) + streamErrC <- waitRangeFeed(t, store, &req, stream) }() // Wait for a checkpoint. diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 687d2b3e0772..76e628e191d0 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -74,7 +74,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/envutil" - "github.com/cockroachdb/cockroach/pkg/util/future" "github.com/cockroachdb/cockroach/pkg/util/grunning" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/iterutil" @@ -3132,23 +3131,21 @@ func (s *Store) Descriptor(ctx context.Context, useCached bool) (*roachpb.StoreD // RangeFeed registers a rangefeed over the specified span. It sends updates to // the provided stream and returns a future with an optional error when the rangefeed is // complete. -func (s *Store) RangeFeed( - args *kvpb.RangeFeedRequest, stream kvpb.RangeFeedEventSink, -) *future.ErrorFuture { +func (s *Store) RangeFeed(args *kvpb.RangeFeedRequest, stream rangefeed.Stream) error { if filter := s.TestingKnobs().TestingRangefeedFilter; filter != nil { if pErr := filter(args, stream); pErr != nil { - return future.MakeCompletedErrorFuture(pErr.GoError()) + return pErr.GoError() } } if err := verifyKeys(args.Span.Key, args.Span.EndKey, true); err != nil { - return future.MakeCompletedErrorFuture(err) + return err } // Get range and add command to the range for execution. repl, err := s.GetReplica(args.RangeID) if err != nil { - return future.MakeCompletedErrorFuture(err) + return err } if !repl.IsInitialized() { // (*Store).Send has an optimization for uninitialized replicas to send back @@ -3156,7 +3153,7 @@ func (s *Store) RangeFeed( // be found. RangeFeeds can always be served from followers and so don't // otherwise return NotLeaseHolderError. For simplicity we also don't return // one here. - return future.MakeCompletedErrorFuture(kvpb.NewRangeNotFoundError(args.RangeID, s.StoreID())) + return kvpb.NewRangeNotFoundError(args.RangeID, s.StoreID()) } tenID, _ := repl.TenantID() diff --git a/pkg/kv/kvserver/stores.go b/pkg/kv/kvserver/stores.go index 13ebfc91a079..0c50f2dfda88 100644 --- a/pkg/kv/kvserver/stores.go +++ b/pkg/kv/kvserver/stores.go @@ -20,10 +20,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/disk" - "github.com/cockroachdb/cockroach/pkg/util/future" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -210,9 +210,7 @@ func (ls *Stores) SendWithWriteBytes( // RangeFeed registers a rangefeed over the specified span. It sends // updates to the provided stream and returns a future with an optional error // when the rangefeed is complete. -func (ls *Stores) RangeFeed( - args *kvpb.RangeFeedRequest, stream kvpb.RangeFeedEventSink, -) *future.ErrorFuture { +func (ls *Stores) RangeFeed(args *kvpb.RangeFeedRequest, stream rangefeed.Stream) error { ctx := stream.Context() if args.RangeID == 0 { log.Fatal(ctx, "rangefeed request missing range ID") @@ -222,7 +220,7 @@ func (ls *Stores) RangeFeed( store, err := ls.GetStore(args.Replica.StoreID) if err != nil { - return future.MakeCompletedErrorFuture(err) + return err } return store.RangeFeed(args, stream) diff --git a/pkg/server/node.go b/pkg/server/node.go index e915f37df85e..90c27c312462 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -1828,6 +1828,7 @@ func (n *Node) RangeLookup( type muxer interface { Send(event *kvpb.MuxRangeFeedEvent) error + DisconnectRangefeedWithError(streamID int64, rangeID roachpb.RangeID, err *kvpb.Error) } var _ muxer = &rangefeed.StreamMuxer{} @@ -1856,7 +1857,12 @@ func (s *setRangeIDEventSink) Send(event *kvpb.RangeFeedEvent) error { return s.wrapped.Send(response) } +func (s *setRangeIDEventSink) Disconnect(err *kvpb.Error) { + s.wrapped.DisconnectRangefeedWithError(s.streamID, s.rangeID, err) +} + var _ kvpb.RangeFeedEventSink = (*setRangeIDEventSink)(nil) +var _ rangefeed.Stream = (*setRangeIDEventSink)(nil) // lockedMuxStream provides support for concurrent calls to Send. The underlying // MuxRangeFeedServer (default grpc.Stream) is not safe for concurrent calls to @@ -1921,10 +1927,9 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error { } streamMuxer.AddStream(req.StreamID, cancel) - f := n.stores.RangeFeed(req, streamSink) - f.WhenReady(func(err error) { + if err := n.stores.RangeFeed(req, streamSink); err != nil { streamMuxer.DisconnectRangefeedWithError(req.StreamID, req.RangeID, kvpb.NewError(err)) - }) + } } }