diff --git a/pkg/kv/kvserver/rangefeed/BUILD.bazel b/pkg/kv/kvserver/rangefeed/BUILD.bazel index c02cd9ec3161..16e0e1fcb991 100644 --- a/pkg/kv/kvserver/rangefeed/BUILD.bazel +++ b/pkg/kv/kvserver/rangefeed/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "scheduled_processor.go", "scheduler.go", "stream_muxer.go", + "stream_muxer_test_helper.go", "task.go", "testutil.go", ], @@ -65,6 +66,7 @@ go_test( "registry_test.go", "resolved_timestamp_test.go", "scheduler_test.go", + "stream_muxer_test.go", "task_test.go", ], embed = [":rangefeed"], diff --git a/pkg/kv/kvserver/rangefeed/stream_muxer.go b/pkg/kv/kvserver/rangefeed/stream_muxer.go index d9172a85b163..f3921a489fdb 100644 --- a/pkg/kv/kvserver/rangefeed/stream_muxer.go +++ b/pkg/kv/kvserver/rangefeed/stream_muxer.go @@ -12,8 +12,10 @@ package rangefeed import ( "context" + "sync" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -88,6 +90,9 @@ type StreamMuxer struct { // thread safety. sender ServerStreamSender + // streamID -> context.CancelFunc for active rangefeeds + activeStreams sync.Map + // notifyMuxError is a buffered channel of size 1 used to signal the presence // of muxErrors. Additional signals are dropped if the channel is already full // so that it's unblocking. @@ -110,20 +115,74 @@ func NewStreamMuxer(sender ServerStreamSender) *StreamMuxer { } } -// AppendMuxError appends a mux rangefeed completion error to be sent back to +// AddStream registers a server rangefeed stream with the StreamMuxer. It +// remains active until DisconnectRangefeedWithError is called with the same +// streamID. Caller must ensure no duplicate stream IDs are added without +// disconnecting the old one first. +func (sm *StreamMuxer) AddStream(streamID int64, cancel context.CancelFunc) { + if _, loaded := sm.activeStreams.LoadOrStore(streamID, cancel); loaded { + log.Fatalf(context.Background(), "stream %d already exists", streamID) + } + +} + +// transformRangefeedErrToClientError converts a rangefeed error to a client +// error to be sent back to client. This also handles nil values, preventing nil +// pointer dereference. +func transformRangefeedErrToClientError(err *kvpb.Error) *kvpb.Error { + if err == nil { + // When processor is stopped when it no longer has any registrations, it + // would attempt to close all feeds again with a nil error. Theoretically, + // this should never happen as processor would always stop with a reason if + // feeds are active. + return kvpb.NewError(kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED)) + } + return err +} + +// appendMuxError appends a mux rangefeed completion error to be sent back to // the client. Note that this method cannot block on IO. If the underlying // stream is broken, the error will be dropped. -func (sm *StreamMuxer) AppendMuxError(e *kvpb.MuxRangeFeedEvent) { +func (sm *StreamMuxer) appendMuxError(e *kvpb.MuxRangeFeedEvent) { sm.mu.Lock() defer sm.mu.Unlock() sm.mu.muxErrors = append(sm.mu.muxErrors, e) - // Note that notify is unblocking. + // Note that notifyMuxError is non-blocking. select { case sm.notifyMuxError <- struct{}{}: default: } } +// DisconnectRangefeedWithError disconnects a stream with an error. Safe to call +// repeatedly for the same stream, but subsequent errors are ignored. It ensures +// 1. the stream context is cancelled 2. exactly one error is sent back to the +// client on behalf of the stream. +// +// Note that this function can be called by the processor worker while holding +// raftMu, so it is important that this function doesn't block IO. It does so by +// delegating the responsibility of sending mux error to StreamMuxer.Run. +func (sm *StreamMuxer) DisconnectRangefeedWithError( + streamID int64, rangeID roachpb.RangeID, err *kvpb.Error, +) { + if cancelFunc, ok := sm.activeStreams.LoadAndDelete(streamID); ok { + f, ok := cancelFunc.(context.CancelFunc) + if !ok { + log.Fatalf(context.Background(), "unexpected stream type %T", cancelFunc) + } + f() + clientErrorEvent := transformRangefeedErrToClientError(err) + ev := &kvpb.MuxRangeFeedEvent{ + StreamID: streamID, + RangeID: rangeID, + } + ev.MustSetValue(&kvpb.RangeFeedError{ + Error: *clientErrorEvent, + }) + sm.appendMuxError(ev) + } +} + // detachMuxErrors returns muxErrors and clears the slice. Caller must ensure // the returned errors are sent back to the client. func (sm *StreamMuxer) detachMuxErrors() []*kvpb.MuxRangeFeedEvent { @@ -155,7 +214,8 @@ func (sm *StreamMuxer) Run(ctx context.Context, stopper *stop.Stopper) { for { select { case <-sm.notifyMuxError: - for _, clientErr := range sm.detachMuxErrors() { + toSend := sm.detachMuxErrors() + for _, clientErr := range toSend { if err := sm.sender.Send(clientErr); err != nil { log.Errorf(ctx, "failed to send rangefeed completion error back to client due to broken stream: %v", err) diff --git a/pkg/kv/kvserver/rangefeed/stream_muxer_test.go b/pkg/kv/kvserver/rangefeed/stream_muxer_test.go new file mode 100644 index 000000000000..0ea7f2c211d4 --- /dev/null +++ b/pkg/kv/kvserver/rangefeed/stream_muxer_test.go @@ -0,0 +1,171 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// TestStreamMuxer tests that correctly forwards rangefeed completion errors to +// the server stream. +func TestStreamMuxer(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx, cancel := context.WithCancel(context.Background()) + stopper := stop.NewStopper() + + testServerStream := newTestServerStream() + muxer, cleanUp := NewTestStreamMuxer(t, ctx, stopper, testServerStream) + defer func() { + cancel() + cleanUp() + stopper.Stop(ctx) + }() + + t.Run("nil handling", func(t *testing.T) { + const streamID = 0 + const rangeID = 1 + streamCtx, cancel := context.WithCancel(context.Background()) + muxer.AddStream(0, cancel) + // Note that kvpb.NewError(nil) == nil. + muxer.DisconnectRangefeedWithError(streamID, rangeID, kvpb.NewError(nil)) + require.Equal(t, context.Canceled, streamCtx.Err()) + expectedErrEvent := &kvpb.MuxRangeFeedEvent{ + StreamID: streamID, + RangeID: rangeID, + } + expectedErrEvent.MustSetValue(&kvpb.RangeFeedError{ + Error: *kvpb.NewError(kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED)), + }) + time.Sleep(10 * time.Millisecond) + require.Equal(t, 1, testServerStream.totalEventsSent()) + require.True(t, testServerStream.hasEvent(expectedErrEvent)) + + // Repeat closing the stream does nothing. + muxer.DisconnectRangefeedWithError(streamID, rangeID, + kvpb.NewError(kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED))) + time.Sleep(10 * time.Millisecond) + require.Equal(t, 1, testServerStream.totalEventsSent()) + }) + + t.Run("send rangefeed completion error", func(t *testing.T) { + testRangefeedCompletionErrors := []struct { + streamID int64 + rangeID roachpb.RangeID + Error error + }{ + {0, 1, kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED)}, + {1, 1, context.Canceled}, + {2, 2, &kvpb.NodeUnavailableError{}}, + } + + for _, muxError := range testRangefeedCompletionErrors { + muxer.AddStream(muxError.streamID, func() {}) + } + + var wg sync.WaitGroup + for _, muxError := range testRangefeedCompletionErrors { + wg.Add(1) + go func(streamID int64, rangeID roachpb.RangeID, err error) { + defer wg.Done() + muxer.DisconnectRangefeedWithError(streamID, rangeID, kvpb.NewError(err)) + }(muxError.streamID, muxError.rangeID, muxError.Error) + } + wg.Wait() + + for _, muxError := range testRangefeedCompletionErrors { + testutils.SucceedsSoon(t, func() error { + ev := &kvpb.MuxRangeFeedEvent{ + StreamID: muxError.streamID, + RangeID: muxError.rangeID, + } + ev.MustSetValue(&kvpb.RangeFeedError{ + Error: *kvpb.NewError(muxError.Error), + }) + if testServerStream.hasEvent(ev) { + return nil + } + return errors.Newf("expected error %v not found", muxError) + }) + } + }) +} + +// TestStreamMuxerOnBlockingIO tests that the +// StreamMuxer.DisconnectRangefeedWithError doesn't block on IO. +func TestStreamMuxerOnBlockingIO(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx, cancel := context.WithCancel(context.Background()) + stopper := stop.NewStopper() + + testServerStream := newTestServerStream() + muxer, cleanUp := NewTestStreamMuxer(t, ctx, stopper, testServerStream) + defer func() { + cancel() + cleanUp() + stopper.Stop(ctx) + }() + + const streamID = 0 + const rangeID = 1 + streamCtx, streamCancel := context.WithCancel(context.Background()) + muxer.AddStream(0, streamCancel) + + ev := &kvpb.MuxRangeFeedEvent{ + StreamID: streamID, + RangeID: rangeID, + } + ev.MustSetValue(&kvpb.RangeFeedCheckpoint{ + Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("m")}, + ResolvedTS: hlc.Timestamp{WallTime: 1}, + }) + err := muxer.sender.Send(ev) + require.NoError(t, err) + require.Truef(t, testServerStream.hasEvent(ev), + "expected event %v not found in %v", ev, testServerStream) + + // Block the stream. + unblock := testServerStream.BlockSend() + + // Although stream is blocked, we should be able to disconnect the stream + // without blocking. + muxer.DisconnectRangefeedWithError(streamID, rangeID, + kvpb.NewError(kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER))) + require.Equal(t, streamCtx.Err(), context.Canceled) + unblock() + time.Sleep(100 * time.Millisecond) + expectedErrEvent := &kvpb.MuxRangeFeedEvent{ + StreamID: streamID, + RangeID: rangeID, + } + expectedErrEvent.MustSetValue(&kvpb.RangeFeedError{ + Error: *kvpb.NewError(kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER)), + }) + // Receive the event after unblocking. + require.Truef(t, testServerStream.hasEvent(expectedErrEvent), + "expected event %v not found in %v", ev, testServerStream) +} diff --git a/pkg/kv/kvserver/rangefeed/stream_muxer_test_helper.go b/pkg/kv/kvserver/rangefeed/stream_muxer_test_helper.go new file mode 100644 index 000000000000..d303e3f5a5c0 --- /dev/null +++ b/pkg/kv/kvserver/rangefeed/stream_muxer_test_helper.go @@ -0,0 +1,113 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed + +import ( + "context" + "fmt" + "reflect" + "strings" + "sync" + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) +// testServerStream mocks grpc server stream for testing. +type testServerStream struct { + syncutil.Mutex + // eventsSent is the total number of events sent. + eventsSent int + // streamEvents is a map of streamID to a list of events sent to that stream. + streamEvents map[int64][]*kvpb.MuxRangeFeedEvent +} + +func newTestServerStream() *testServerStream { + return &testServerStream{ + streamEvents: make(map[int64][]*kvpb.MuxRangeFeedEvent), + } +} + +func (s *testServerStream) totalEventsSent() int { + s.Lock() + defer s.Unlock() + return s.eventsSent +} + +// hasEvent returns true if the event is found in the streamEvents map. Note +// that it does a deep equal comparison. +func (s *testServerStream) hasEvent(e *kvpb.MuxRangeFeedEvent) bool { + if e == nil { + return false + } + s.Lock() + defer s.Unlock() + for _, streamEvent := range s.streamEvents[e.StreamID] { + if reflect.DeepEqual(e, streamEvent) { + return true + } + } + return false +} + +// String returns a string representation of the events sent in the stream. +func (s *testServerStream) String() string { + str := strings.Builder{} + for streamID, eventList := range s.streamEvents { + str.WriteString( + fmt.Sprintf("StreamID:%d, Len:%d\n", streamID, len(eventList))) + } + return str.String() +} + +func (s *testServerStream) SendIsThreadSafe() {} + +func (s *testServerStream) Send(e *kvpb.MuxRangeFeedEvent) error { + s.Lock() + defer s.Unlock() + s.eventsSent++ + s.streamEvents[e.StreamID] = append(s.streamEvents[e.StreamID], e) + return nil +} + +func (s *testServerStream) BlockSend() func() { + s.Lock() + var once sync.Once + return func() { + // safe to call multiple times, e.g. defer and explicit + once.Do(s.Unlock) //nolint:deferunlockcheck + } +} + +// NewTestStreamMuxer is a helper function to create a StreamMuxer for testing. +// It uses the actual StreamMuxer. Example usage: +// +// serverStream := newTestServerStream() +// stopper := stop.NewStopper() +// streamMuxer, cleanUp := NewTestStreamMuxer(t, ctx, stopper, serverStream) +// defer cleanUp() +// defer stopper.Stop(ctx) // or defer cancel() - important to stop the StreamMuxer before cleanUp() +func NewTestStreamMuxer( + t *testing.T, ctx context.Context, stopper *stop.Stopper, sender ServerStreamSender, +) (muxer *StreamMuxer, cleanUp func()) { + muxer = NewStreamMuxer(sender) + var wg sync.WaitGroup + wg.Add(1) + if err := stopper.RunAsyncTask(ctx, "test-stream-muxer", func(ctx context.Context) { + defer wg.Done() + muxer.Run(ctx, stopper) + }); err != nil { + wg.Done() + t.Fatal(err) + } + return muxer, wg.Wait +} diff --git a/pkg/server/node.go b/pkg/server/node.go index 02831d0e5430..629c6456b06e 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -1889,8 +1889,6 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error { n.metrics.ActiveMuxRangeFeed.Inc(1) defer n.metrics.ActiveMuxRangeFeed.Inc(-1) - var activeStreams sync.Map - for { req, err := stream.Recv() if err != nil { @@ -1898,17 +1896,10 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error { } if req.CloseStream { - // Client issued a request to close previously established stream. - if v, loaded := activeStreams.LoadAndDelete(req.StreamID); loaded { - s := v.(*setRangeIDEventSink) - s.cancel() - } else { - // This is a bit strange, but it could happen if this stream completes - // just before we receive close request. So, just print out a warning. - if log.V(1) { - log.Infof(ctx, "closing unknown rangefeed stream ID %d", req.StreamID) - } - } + // Note that we will call disconnect again when future.Error returns, but + // DisconnectRangefeedWithError will ignore subsequent errors. + streamMuxer.DisconnectRangefeedWithError(req.StreamID, req.RangeID, + kvpb.NewError(kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED))) continue } @@ -1924,48 +1915,14 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error { streamID: req.StreamID, wrapped: muxStream, } - activeStreams.Store(req.StreamID, streamSink) + streamMuxer.AddStream(req.StreamID, cancel) n.metrics.NumMuxRangeFeed.Inc(1) n.metrics.ActiveMuxRangeFeed.Inc(1) f := n.stores.RangeFeed(req, streamSink) f.WhenReady(func(err error) { n.metrics.ActiveMuxRangeFeed.Inc(-1) - - _, loaded := activeStreams.LoadAndDelete(req.StreamID) - streamClosedByClient := !loaded - streamSink.cancel() - - if streamClosedByClient && streamSink.ctx.Err() != nil { - // If the stream was explicitly closed by the client, we expect to see - // context.Canceled error. In this case, return - // kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED to the client. - err = kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED) - } - - if err == nil { - cause := kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED - err = kvpb.NewRangeFeedRetryError(cause) - } - - e := &kvpb.MuxRangeFeedEvent{ - RangeID: req.RangeID, - StreamID: req.StreamID, - } - - e.SetValue(&kvpb.RangeFeedError{ - Error: *kvpb.NewError(err), - }) - - // When rangefeed completes, we must notify the client about that. - // - // NB: even though calling sink.Send() to send notification might seem - // correct, it is also unsafe. This future may be completed at any point, - // including during critical section when some important lock (such as - // raftMu in processor) may be held. Issuing potentially blocking IO - // during that time is not a good idea. Thus, we shunt the notification to - // a dedicated goroutine. - streamMuxer.AppendMuxError(e) + streamMuxer.DisconnectRangefeedWithError(req.StreamID, req.RangeID, kvpb.NewError(err)) }) } }