From 8fb9efbbdd759af579e2eaabd33d8089bba8100b Mon Sep 17 00:00:00 2001 From: Wenyi Hu Date: Fri, 28 Jun 2024 21:20:05 -0400 Subject: [PATCH] kvserver/rangefeed: move active streams to stream muxer This patch moves the existing active stream management to StreamMuxer without changing any existing behaviour. The main purpose is to make future commits cleaner. Part of: #126561 Epic: none Release note: none --- pkg/kv/kvserver/rangefeed/BUILD.bazel | 2 + pkg/kv/kvserver/rangefeed/stream_muxer.go | 68 ++++++- .../kvserver/rangefeed/stream_muxer_test.go | 171 ++++++++++++++++++ .../rangefeed/stream_muxer_test_helper.go | 113 ++++++++++++ pkg/server/node.go | 55 +----- 5 files changed, 356 insertions(+), 53 deletions(-) create mode 100644 pkg/kv/kvserver/rangefeed/stream_muxer_test.go create mode 100644 pkg/kv/kvserver/rangefeed/stream_muxer_test_helper.go diff --git a/pkg/kv/kvserver/rangefeed/BUILD.bazel b/pkg/kv/kvserver/rangefeed/BUILD.bazel index c02cd9ec3161..16e0e1fcb991 100644 --- a/pkg/kv/kvserver/rangefeed/BUILD.bazel +++ b/pkg/kv/kvserver/rangefeed/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "scheduled_processor.go", "scheduler.go", "stream_muxer.go", + "stream_muxer_test_helper.go", "task.go", "testutil.go", ], @@ -65,6 +66,7 @@ go_test( "registry_test.go", "resolved_timestamp_test.go", "scheduler_test.go", + "stream_muxer_test.go", "task_test.go", ], embed = [":rangefeed"], diff --git a/pkg/kv/kvserver/rangefeed/stream_muxer.go b/pkg/kv/kvserver/rangefeed/stream_muxer.go index d9172a85b163..f3921a489fdb 100644 --- a/pkg/kv/kvserver/rangefeed/stream_muxer.go +++ b/pkg/kv/kvserver/rangefeed/stream_muxer.go @@ -12,8 +12,10 @@ package rangefeed import ( "context" + "sync" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -88,6 +90,9 @@ type StreamMuxer struct { // thread safety. sender ServerStreamSender + // streamID -> context.CancelFunc for active rangefeeds + activeStreams sync.Map + // notifyMuxError is a buffered channel of size 1 used to signal the presence // of muxErrors. Additional signals are dropped if the channel is already full // so that it's unblocking. @@ -110,20 +115,74 @@ func NewStreamMuxer(sender ServerStreamSender) *StreamMuxer { } } -// AppendMuxError appends a mux rangefeed completion error to be sent back to +// AddStream registers a server rangefeed stream with the StreamMuxer. It +// remains active until DisconnectRangefeedWithError is called with the same +// streamID. Caller must ensure no duplicate stream IDs are added without +// disconnecting the old one first. +func (sm *StreamMuxer) AddStream(streamID int64, cancel context.CancelFunc) { + if _, loaded := sm.activeStreams.LoadOrStore(streamID, cancel); loaded { + log.Fatalf(context.Background(), "stream %d already exists", streamID) + } + +} + +// transformRangefeedErrToClientError converts a rangefeed error to a client +// error to be sent back to client. This also handles nil values, preventing nil +// pointer dereference. +func transformRangefeedErrToClientError(err *kvpb.Error) *kvpb.Error { + if err == nil { + // When processor is stopped when it no longer has any registrations, it + // would attempt to close all feeds again with a nil error. Theoretically, + // this should never happen as processor would always stop with a reason if + // feeds are active. + return kvpb.NewError(kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED)) + } + return err +} + +// appendMuxError appends a mux rangefeed completion error to be sent back to // the client. Note that this method cannot block on IO. If the underlying // stream is broken, the error will be dropped. -func (sm *StreamMuxer) AppendMuxError(e *kvpb.MuxRangeFeedEvent) { +func (sm *StreamMuxer) appendMuxError(e *kvpb.MuxRangeFeedEvent) { sm.mu.Lock() defer sm.mu.Unlock() sm.mu.muxErrors = append(sm.mu.muxErrors, e) - // Note that notify is unblocking. + // Note that notifyMuxError is non-blocking. select { case sm.notifyMuxError <- struct{}{}: default: } } +// DisconnectRangefeedWithError disconnects a stream with an error. Safe to call +// repeatedly for the same stream, but subsequent errors are ignored. It ensures +// 1. the stream context is cancelled 2. exactly one error is sent back to the +// client on behalf of the stream. +// +// Note that this function can be called by the processor worker while holding +// raftMu, so it is important that this function doesn't block IO. It does so by +// delegating the responsibility of sending mux error to StreamMuxer.Run. +func (sm *StreamMuxer) DisconnectRangefeedWithError( + streamID int64, rangeID roachpb.RangeID, err *kvpb.Error, +) { + if cancelFunc, ok := sm.activeStreams.LoadAndDelete(streamID); ok { + f, ok := cancelFunc.(context.CancelFunc) + if !ok { + log.Fatalf(context.Background(), "unexpected stream type %T", cancelFunc) + } + f() + clientErrorEvent := transformRangefeedErrToClientError(err) + ev := &kvpb.MuxRangeFeedEvent{ + StreamID: streamID, + RangeID: rangeID, + } + ev.MustSetValue(&kvpb.RangeFeedError{ + Error: *clientErrorEvent, + }) + sm.appendMuxError(ev) + } +} + // detachMuxErrors returns muxErrors and clears the slice. Caller must ensure // the returned errors are sent back to the client. func (sm *StreamMuxer) detachMuxErrors() []*kvpb.MuxRangeFeedEvent { @@ -155,7 +214,8 @@ func (sm *StreamMuxer) Run(ctx context.Context, stopper *stop.Stopper) { for { select { case <-sm.notifyMuxError: - for _, clientErr := range sm.detachMuxErrors() { + toSend := sm.detachMuxErrors() + for _, clientErr := range toSend { if err := sm.sender.Send(clientErr); err != nil { log.Errorf(ctx, "failed to send rangefeed completion error back to client due to broken stream: %v", err) diff --git a/pkg/kv/kvserver/rangefeed/stream_muxer_test.go b/pkg/kv/kvserver/rangefeed/stream_muxer_test.go new file mode 100644 index 000000000000..0ea7f2c211d4 --- /dev/null +++ b/pkg/kv/kvserver/rangefeed/stream_muxer_test.go @@ -0,0 +1,171 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// TestStreamMuxer tests that correctly forwards rangefeed completion errors to +// the server stream. +func TestStreamMuxer(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx, cancel := context.WithCancel(context.Background()) + stopper := stop.NewStopper() + + testServerStream := newTestServerStream() + muxer, cleanUp := NewTestStreamMuxer(t, ctx, stopper, testServerStream) + defer func() { + cancel() + cleanUp() + stopper.Stop(ctx) + }() + + t.Run("nil handling", func(t *testing.T) { + const streamID = 0 + const rangeID = 1 + streamCtx, cancel := context.WithCancel(context.Background()) + muxer.AddStream(0, cancel) + // Note that kvpb.NewError(nil) == nil. + muxer.DisconnectRangefeedWithError(streamID, rangeID, kvpb.NewError(nil)) + require.Equal(t, context.Canceled, streamCtx.Err()) + expectedErrEvent := &kvpb.MuxRangeFeedEvent{ + StreamID: streamID, + RangeID: rangeID, + } + expectedErrEvent.MustSetValue(&kvpb.RangeFeedError{ + Error: *kvpb.NewError(kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED)), + }) + time.Sleep(10 * time.Millisecond) + require.Equal(t, 1, testServerStream.totalEventsSent()) + require.True(t, testServerStream.hasEvent(expectedErrEvent)) + + // Repeat closing the stream does nothing. + muxer.DisconnectRangefeedWithError(streamID, rangeID, + kvpb.NewError(kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED))) + time.Sleep(10 * time.Millisecond) + require.Equal(t, 1, testServerStream.totalEventsSent()) + }) + + t.Run("send rangefeed completion error", func(t *testing.T) { + testRangefeedCompletionErrors := []struct { + streamID int64 + rangeID roachpb.RangeID + Error error + }{ + {0, 1, kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED)}, + {1, 1, context.Canceled}, + {2, 2, &kvpb.NodeUnavailableError{}}, + } + + for _, muxError := range testRangefeedCompletionErrors { + muxer.AddStream(muxError.streamID, func() {}) + } + + var wg sync.WaitGroup + for _, muxError := range testRangefeedCompletionErrors { + wg.Add(1) + go func(streamID int64, rangeID roachpb.RangeID, err error) { + defer wg.Done() + muxer.DisconnectRangefeedWithError(streamID, rangeID, kvpb.NewError(err)) + }(muxError.streamID, muxError.rangeID, muxError.Error) + } + wg.Wait() + + for _, muxError := range testRangefeedCompletionErrors { + testutils.SucceedsSoon(t, func() error { + ev := &kvpb.MuxRangeFeedEvent{ + StreamID: muxError.streamID, + RangeID: muxError.rangeID, + } + ev.MustSetValue(&kvpb.RangeFeedError{ + Error: *kvpb.NewError(muxError.Error), + }) + if testServerStream.hasEvent(ev) { + return nil + } + return errors.Newf("expected error %v not found", muxError) + }) + } + }) +} + +// TestStreamMuxerOnBlockingIO tests that the +// StreamMuxer.DisconnectRangefeedWithError doesn't block on IO. +func TestStreamMuxerOnBlockingIO(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx, cancel := context.WithCancel(context.Background()) + stopper := stop.NewStopper() + + testServerStream := newTestServerStream() + muxer, cleanUp := NewTestStreamMuxer(t, ctx, stopper, testServerStream) + defer func() { + cancel() + cleanUp() + stopper.Stop(ctx) + }() + + const streamID = 0 + const rangeID = 1 + streamCtx, streamCancel := context.WithCancel(context.Background()) + muxer.AddStream(0, streamCancel) + + ev := &kvpb.MuxRangeFeedEvent{ + StreamID: streamID, + RangeID: rangeID, + } + ev.MustSetValue(&kvpb.RangeFeedCheckpoint{ + Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("m")}, + ResolvedTS: hlc.Timestamp{WallTime: 1}, + }) + err := muxer.sender.Send(ev) + require.NoError(t, err) + require.Truef(t, testServerStream.hasEvent(ev), + "expected event %v not found in %v", ev, testServerStream) + + // Block the stream. + unblock := testServerStream.BlockSend() + + // Although stream is blocked, we should be able to disconnect the stream + // without blocking. + muxer.DisconnectRangefeedWithError(streamID, rangeID, + kvpb.NewError(kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER))) + require.Equal(t, streamCtx.Err(), context.Canceled) + unblock() + time.Sleep(100 * time.Millisecond) + expectedErrEvent := &kvpb.MuxRangeFeedEvent{ + StreamID: streamID, + RangeID: rangeID, + } + expectedErrEvent.MustSetValue(&kvpb.RangeFeedError{ + Error: *kvpb.NewError(kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER)), + }) + // Receive the event after unblocking. + require.Truef(t, testServerStream.hasEvent(expectedErrEvent), + "expected event %v not found in %v", ev, testServerStream) +} diff --git a/pkg/kv/kvserver/rangefeed/stream_muxer_test_helper.go b/pkg/kv/kvserver/rangefeed/stream_muxer_test_helper.go new file mode 100644 index 000000000000..d303e3f5a5c0 --- /dev/null +++ b/pkg/kv/kvserver/rangefeed/stream_muxer_test_helper.go @@ -0,0 +1,113 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed + +import ( + "context" + "fmt" + "reflect" + "strings" + "sync" + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) +// testServerStream mocks grpc server stream for testing. +type testServerStream struct { + syncutil.Mutex + // eventsSent is the total number of events sent. + eventsSent int + // streamEvents is a map of streamID to a list of events sent to that stream. + streamEvents map[int64][]*kvpb.MuxRangeFeedEvent +} + +func newTestServerStream() *testServerStream { + return &testServerStream{ + streamEvents: make(map[int64][]*kvpb.MuxRangeFeedEvent), + } +} + +func (s *testServerStream) totalEventsSent() int { + s.Lock() + defer s.Unlock() + return s.eventsSent +} + +// hasEvent returns true if the event is found in the streamEvents map. Note +// that it does a deep equal comparison. +func (s *testServerStream) hasEvent(e *kvpb.MuxRangeFeedEvent) bool { + if e == nil { + return false + } + s.Lock() + defer s.Unlock() + for _, streamEvent := range s.streamEvents[e.StreamID] { + if reflect.DeepEqual(e, streamEvent) { + return true + } + } + return false +} + +// String returns a string representation of the events sent in the stream. +func (s *testServerStream) String() string { + str := strings.Builder{} + for streamID, eventList := range s.streamEvents { + str.WriteString( + fmt.Sprintf("StreamID:%d, Len:%d\n", streamID, len(eventList))) + } + return str.String() +} + +func (s *testServerStream) SendIsThreadSafe() {} + +func (s *testServerStream) Send(e *kvpb.MuxRangeFeedEvent) error { + s.Lock() + defer s.Unlock() + s.eventsSent++ + s.streamEvents[e.StreamID] = append(s.streamEvents[e.StreamID], e) + return nil +} + +func (s *testServerStream) BlockSend() func() { + s.Lock() + var once sync.Once + return func() { + // safe to call multiple times, e.g. defer and explicit + once.Do(s.Unlock) //nolint:deferunlockcheck + } +} + +// NewTestStreamMuxer is a helper function to create a StreamMuxer for testing. +// It uses the actual StreamMuxer. Example usage: +// +// serverStream := newTestServerStream() +// stopper := stop.NewStopper() +// streamMuxer, cleanUp := NewTestStreamMuxer(t, ctx, stopper, serverStream) +// defer cleanUp() +// defer stopper.Stop(ctx) // or defer cancel() - important to stop the StreamMuxer before cleanUp() +func NewTestStreamMuxer( + t *testing.T, ctx context.Context, stopper *stop.Stopper, sender ServerStreamSender, +) (muxer *StreamMuxer, cleanUp func()) { + muxer = NewStreamMuxer(sender) + var wg sync.WaitGroup + wg.Add(1) + if err := stopper.RunAsyncTask(ctx, "test-stream-muxer", func(ctx context.Context) { + defer wg.Done() + muxer.Run(ctx, stopper) + }); err != nil { + wg.Done() + t.Fatal(err) + } + return muxer, wg.Wait +} diff --git a/pkg/server/node.go b/pkg/server/node.go index 02831d0e5430..629c6456b06e 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -1889,8 +1889,6 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error { n.metrics.ActiveMuxRangeFeed.Inc(1) defer n.metrics.ActiveMuxRangeFeed.Inc(-1) - var activeStreams sync.Map - for { req, err := stream.Recv() if err != nil { @@ -1898,17 +1896,10 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error { } if req.CloseStream { - // Client issued a request to close previously established stream. - if v, loaded := activeStreams.LoadAndDelete(req.StreamID); loaded { - s := v.(*setRangeIDEventSink) - s.cancel() - } else { - // This is a bit strange, but it could happen if this stream completes - // just before we receive close request. So, just print out a warning. - if log.V(1) { - log.Infof(ctx, "closing unknown rangefeed stream ID %d", req.StreamID) - } - } + // Note that we will call disconnect again when future.Error returns, but + // DisconnectRangefeedWithError will ignore subsequent errors. + streamMuxer.DisconnectRangefeedWithError(req.StreamID, req.RangeID, + kvpb.NewError(kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED))) continue } @@ -1924,48 +1915,14 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error { streamID: req.StreamID, wrapped: muxStream, } - activeStreams.Store(req.StreamID, streamSink) + streamMuxer.AddStream(req.StreamID, cancel) n.metrics.NumMuxRangeFeed.Inc(1) n.metrics.ActiveMuxRangeFeed.Inc(1) f := n.stores.RangeFeed(req, streamSink) f.WhenReady(func(err error) { n.metrics.ActiveMuxRangeFeed.Inc(-1) - - _, loaded := activeStreams.LoadAndDelete(req.StreamID) - streamClosedByClient := !loaded - streamSink.cancel() - - if streamClosedByClient && streamSink.ctx.Err() != nil { - // If the stream was explicitly closed by the client, we expect to see - // context.Canceled error. In this case, return - // kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED to the client. - err = kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED) - } - - if err == nil { - cause := kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED - err = kvpb.NewRangeFeedRetryError(cause) - } - - e := &kvpb.MuxRangeFeedEvent{ - RangeID: req.RangeID, - StreamID: req.StreamID, - } - - e.SetValue(&kvpb.RangeFeedError{ - Error: *kvpb.NewError(err), - }) - - // When rangefeed completes, we must notify the client about that. - // - // NB: even though calling sink.Send() to send notification might seem - // correct, it is also unsafe. This future may be completed at any point, - // including during critical section when some important lock (such as - // raftMu in processor) may be held. Issuing potentially blocking IO - // during that time is not a good idea. Thus, we shunt the notification to - // a dedicated goroutine. - streamMuxer.AppendMuxError(e) + streamMuxer.DisconnectRangefeedWithError(req.StreamID, req.RangeID, kvpb.NewError(err)) }) } }