diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
index 122a543ade02..4c0e2105bd59 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
@@ -17,10 +17,12 @@ import (
   "sync/atomic"
   "unsafe"
 
+  "github.com/cockroachdb/cockroach/pkg/clusterversion"
   "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
   "github.com/cockroachdb/cockroach/pkg/kv/kvpb"
   "github.com/cockroachdb/cockroach/pkg/roachpb"
   "github.com/cockroachdb/cockroach/pkg/rpc"
+  "github.com/cockroachdb/cockroach/pkg/settings/cluster"
   "github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
   "github.com/cockroachdb/cockroach/pkg/util/future"
   "github.com/cockroachdb/cockroach/pkg/util/grpcutil"
@@ -308,6 +310,17 @@ func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error
       }
       continue
     }
+
+    if m.cfg.knobs.captureMuxRangeFeedRequestSender != nil {
+      m.cfg.knobs.captureMuxRangeFeedRequestSender(
+        args.Replica.NodeID,
+        func(req *kvpb.RangeFeedRequest) error {
+          conn.mu.Lock()
+          defer conn.mu.Unlock()
+          return conn.mu.sender.Send(req)
+        })
+    }
+
     return nil
   }
@@ -441,7 +454,7 @@ func (m *rangefeedMuxer) receiveEventsFromNode(
       }
 
       if m.cfg.knobs.onRangefeedEvent != nil {
-        skip, err := m.cfg.knobs.onRangefeedEvent(ctx, active.Span, &event.RangeFeedEvent)
+        skip, err := m.cfg.knobs.onRangefeedEvent(ctx, active.Span, event.StreamID, &event.RangeFeedEvent)
         if err != nil {
           return err
         }
@@ -604,3 +617,16 @@ func (c *muxStream) close() []*activeMuxRangeFeed {
 
   return toRestart
 }
+
+// NewCloseStreamRequest returns a mux rangefeed request to close the specified stream.
+func NewCloseStreamRequest(
+  ctx context.Context, st *cluster.Settings, streamID int64,
+) (*kvpb.RangeFeedRequest, error) {
+  if !st.Version.IsActive(ctx, clusterversion.V23_2) {
+    return nil, errors.Newf("CloseStream request requires cluster version 23.2 or above, found %s", st.Version)
+  }
+  return &kvpb.RangeFeedRequest{
+    StreamID:    streamID,
+    CloseStream: true,
+  }, nil
+}
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
index 2d4d825fac3a..75cf47e2e4c1 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -97,9 +97,13 @@ type rangeFeedConfig struct {
     // onRangefeedEvent invoked on each rangefeed event.
     // Returns boolean indicating if event should be skipped or an error
     // indicating if rangefeed should terminate.
-    onRangefeedEvent func(ctx context.Context, s roachpb.Span, event *kvpb.RangeFeedEvent) (skip bool, _ error)
+    // muxStreamID is set only for mux rangefeed.
+    onRangefeedEvent func(ctx context.Context, s roachpb.Span, muxStreamID int64, event *kvpb.RangeFeedEvent) (skip bool, _ error)
     // metrics overrides rangefeed metrics to use.
     metrics *DistSenderRangeFeedMetrics
+    // captureMuxRangeFeedRequestSender is a callback invoked when the mux
+    // rangefeed establishes a connection to a node.
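+    // Tests can use the captured sender to inject additional requests
+    // (such as CloseStream) into the established mux stream.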
+    captureMuxRangeFeedRequestSender func(nodeID roachpb.NodeID, sender func(req *kvpb.RangeFeedRequest) error)
   }
 }
@@ -795,7 +799,7 @@ func (ds *DistSender) singleRangeFeed(
     }
 
     if cfg.knobs.onRangefeedEvent != nil {
-      skip, err := cfg.knobs.onRangefeedEvent(ctx, span, event)
+      skip, err := cfg.knobs.onRangefeedEvent(ctx, span, 0 /* streamID */, event)
       if err != nil {
         return args.Timestamp, err
       }
@@ -871,7 +875,7 @@ var errRestartStuckRange = errors.New("rangefeed restarting due to inactivity")
 
 // TestingWithOnRangefeedEvent returns a test only option to modify rangefeed event.
 func TestingWithOnRangefeedEvent(
-  fn func(ctx context.Context, s roachpb.Span, event *kvpb.RangeFeedEvent) (skip bool, _ error),
+  fn func(ctx context.Context, s roachpb.Span, streamID int64, event *kvpb.RangeFeedEvent) (skip bool, _ error),
 ) RangeFeedOption {
   return optionFunc(func(c *rangeFeedConfig) {
     c.knobs.onRangefeedEvent = fn
@@ -886,5 +890,15 @@ func TestingWithRangeFeedMetrics(m *DistSenderRangeFeedMetrics) RangeFeedOption
   })
 }
 
+// TestingWithMuxRangeFeedRequestSenderCapture returns a test only option to specify a callback
+// that will be invoked when the mux rangefeed establishes a connection to a node.
+func TestingWithMuxRangeFeedRequestSenderCapture(
+  fn func(nodeID roachpb.NodeID, capture func(request *kvpb.RangeFeedRequest) error),
+) RangeFeedOption {
+  return optionFunc(func(c *rangeFeedConfig) {
+    c.knobs.captureMuxRangeFeedRequestSender = fn
+  })
+}
+
 // TestingMakeRangeFeedMetrics exposes makeDistSenderRangeFeedMetrics for test use.
 var TestingMakeRangeFeedMetrics = makeDistSenderRangeFeedMetrics
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
index 91da6a87a593..2452db21890b 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
@@ -12,6 +12,8 @@ package kvcoord_test
 
 import (
   "context"
+  "math/rand"
+  "sync"
   "sync/atomic"
   "testing"
   "time"
@@ -629,7 +631,7 @@ func TestMuxRangeCatchupScanQuotaReleased(t *testing.T) {
   enoughErrors := make(chan struct{})
   closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, noValuesExpected, true,
     kvcoord.TestingWithOnRangefeedEvent(
-      func(_ context.Context, _ roachpb.Span, event *kvpb.RangeFeedEvent) (skip bool, _ error) {
+      func(_ context.Context, _ roachpb.Span, _ int64, event *kvpb.RangeFeedEvent) (skip bool, _ error) {
         *event = transientErrEvent
         if numErrors.Add(1) == numErrsToReturn {
           close(enoughErrors)
@@ -727,7 +729,7 @@ func TestRangeFeedMetricsManagement(t *testing.T) {
   closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, ignoreValues, useMux,
     kvcoord.TestingWithRangeFeedMetrics(&metrics),
     kvcoord.TestingWithOnRangefeedEvent(
-      func(ctx context.Context, s roachpb.Span, event *kvpb.RangeFeedEvent) (skip bool, _ error) {
+      func(ctx context.Context, s roachpb.Span, _ int64, event *kvpb.RangeFeedEvent) (skip bool, _ error) {
         switch t := event.GetValue().(type) {
         case *kvpb.RangeFeedValue:
           // If we previously arranged for the range to be skipped (stuck catchup scan),
@@ -802,3 +804,135 @@ func TestRangeFeedMetricsManagement(t *testing.T) {
     require.EqualValues(t, numCatchupToBlock, metrics.RangefeedCatchupRanges.Value())
   })
 }
+
+// TestMuxRangeFeedCanCloseStream verifies stream termination functionality in mux rangefeed.
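+// It starts a single-node cluster, captures the mux rangefeed request sender, issues
+// CloseStream requests for a few active streams, and expects those streams to terminate
+// with REASON_RANGEFEED_CLOSED and be transparently restarted by the mux rangefeed client.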
+func TestMuxRangeFeedCanCloseStream(t *testing.T) {
+  defer leaktest.AfterTest(t)()
+  defer log.Scope(t).Close(t)
+
+  ctx := context.Background()
+  tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
+    ReplicationMode: base.ReplicationManual,
+  })
+  defer tc.Stopper().Stop(ctx)
+
+  ts := tc.Server(0)
+  sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+
+  // Insert 1000 rows, and split them into 10 ranges.
+  sqlDB.ExecMultiple(t,
+    `SET CLUSTER SETTING kv.rangefeed.enabled = true`,
+    `SET CLUSTER SETTING kv.closed_timestamp.target_duration='100ms'`,
+    `ALTER DATABASE defaultdb CONFIGURE ZONE USING num_replicas = 1`,
+    `CREATE TABLE foo (key INT PRIMARY KEY)`,
+    `INSERT INTO foo (key) SELECT * FROM generate_series(1, 1000)`,
+    `ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(100, 900, 100))`,
+  )
+
+  fooDesc := desctestutils.TestingGetPublicTableDescriptor(
+    ts.DB(), keys.SystemSQLCodec, "defaultdb", "foo")
+  fooSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec)
+
+  frontier, err := span.MakeFrontier(fooSpan)
+  require.NoError(t, err)
+
+  expectFrontierAdvance := func() {
+    t.Helper()
+    // The closed timestamp for each range advances every 100ms. We'll require the
+    // frontier to advance a bit more than that.
+    threshold := frontier.Frontier().AddDuration(250 * time.Millisecond)
+    testutils.SucceedsWithin(t, func() error {
+      if frontier.Frontier().Less(threshold) {
+        return errors.Newf("waiting for frontier advance to at least %s", threshold)
+      }
+      return nil
+    }, 10*time.Second)
+  }
+
+  var observedStreams sync.Map
+  var capturedSender atomic.Value
+
+  ignoreValues := func(event kvcoord.RangeFeedMessage) {}
+  var numRestartStreams atomic.Int32
+
+  closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, ts.Clock().Now(), ignoreValues, true,
+    kvcoord.WithMuxRangeFeed(),
+    kvcoord.TestingWithMuxRangeFeedRequestSenderCapture(
+      // We expect a single mux sender since we have 1 node in this test.
+      func(nodeID roachpb.NodeID, capture func(request *kvpb.RangeFeedRequest) error) {
+        capturedSender.Store(capture)
+      },
+    ),
+    kvcoord.TestingWithOnRangefeedEvent(
+      func(ctx context.Context, s roachpb.Span, streamID int64, event *kvpb.RangeFeedEvent) (skip bool, _ error) {
+        switch t := event.GetValue().(type) {
+        case *kvpb.RangeFeedCheckpoint:
+          observedStreams.Store(streamID, nil)
+          _, err := frontier.Forward(t.Span, t.ResolvedTS)
+          if err != nil {
+            return true, err
+          }
+        case *kvpb.RangeFeedError:
+          // Keep track of mux errors due to RangeFeedRetryError_REASON_RANGEFEED_CLOSED.
+          // Those result from the CloseStream requests we issue.
+          err := t.Error.GoError()
+          log.Infof(ctx, "Got err: %v", err)
+          var retryErr *kvpb.RangeFeedRetryError
+          if ok := errors.As(err, &retryErr); ok && retryErr.Reason == kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED {
+            numRestartStreams.Add(1)
+          }
+        }
+
+        return false, nil
+      }),
+  )
+  defer closeFeed()
+
+  // Wait until we capture the mux rangefeed request sender. There should only be 1.
+  var muxRangeFeedRequestSender func(req *kvpb.RangeFeedRequest) error
+  testutils.SucceedsWithin(t, func() error {
+    v, ok := capturedSender.Load().(func(request *kvpb.RangeFeedRequest) error)
+    if ok {
+      muxRangeFeedRequestSender = v
+      return nil
+    }
+    return errors.New("waiting to capture mux rangefeed request sender")
+  }, 10*time.Second)
+
+  cancelledStreams := make(map[int64]struct{})
+  for i := 0; i < 5; i++ {
+    // Wait for the test frontier to advance. Once it advances,
+    // we know the rangefeed has started and all ranges are running.
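+    // On later iterations this also verifies that the feed recovered from the
+    // previous round of CloseStream requests.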
+    expectFrontierAdvance()
+
+    // Pick some number of streams to close. Since sync.Map iteration order is
+    // non-deterministic, we'll pick a few random streams.
+    initialClosed := numRestartStreams.Load()
+    numToCancel := 1 + rand.Int31n(3)
+    var numCancelled int32 = 0
+    observedStreams.Range(func(key any, _ any) bool {
+      streamID := key.(int64)
+      if _, wasCancelled := cancelledStreams[streamID]; wasCancelled {
+        return true // try another stream.
+      }
+      numCancelled++
+      cancelledStreams[streamID] = struct{}{}
+      req, err := kvcoord.NewCloseStreamRequest(ctx, ts.ClusterSettings(), streamID)
+      require.NoError(t, err)
+      require.NoError(t, muxRangeFeedRequestSender(req))
+      return numCancelled < numToCancel
+    })
+
+    // Observe numCancelled streams being closed.
+    testutils.SucceedsWithin(t, func() error {
+      numRestarted := numRestartStreams.Load()
+      if numRestarted == initialClosed+numCancelled {
+        return nil
+      }
+      return errors.Newf("waiting for %d streams to be closed (%d so far)", numCancelled, numRestarted-initialClosed)
+    }, 10*time.Second)
+
+    // When we close the stream(s), the rangefeed server responds with a retryable error.
+    // Mux rangefeed should retry, and thus we expect the frontier to keep advancing.
+  }
+}
diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto
index 46993c4a2f28..4488c757647a 100644
--- a/pkg/kv/kvpb/api.proto
+++ b/pkg/kv/kvpb/api.proto
@@ -2986,6 +2986,13 @@ message RangeFeedRequest {
 
   // StreamID is set by the client issuing MuxRangeFeed requests.
   int64 stream_id = 5 [(gogoproto.customname) = "StreamID"];
+  // CloseStream is set by the mux RangeFeed client to indicate that
+  // the server should close the stream with the specified stream_id.
+  // When this bit is set, the server should make a best-effort attempt
+  // to quickly terminate the rangefeed for this stream.
+  // When CloseStream is set, only StreamID needs to be set; other
+  // fields (such as Span) are ignored.
+  bool close_stream = 6;
 }
 
 // RangeFeedValue is a variant of RangeFeedEvent that represents an update to
diff --git a/pkg/server/node.go b/pkg/server/node.go
index 1284d7cfcb86..9a9dd08e55e0 100644
--- a/pkg/server/node.go
+++ b/pkg/server/node.go
@@ -1656,6 +1656,10 @@ func (n *Node) RangeLookup(
 
 // RangeFeed implements the roachpb.InternalServer interface.
 func (n *Node) RangeFeed(args *kvpb.RangeFeedRequest, stream kvpb.Internal_RangeFeedServer) error {
+  if args.StreamID > 0 || args.CloseStream {
+    return errors.AssertionFailedf("unexpected mux rangefeed arguments set when calling RangeFeed")
+  }
+
   ctx := n.AnnotateCtx(stream.Context())
   ctx = logtags.AddTag(ctx, "r", args.RangeID)
   ctx = logtags.AddTag(ctx, "s", args.Replica.StoreID)
@@ -1685,6 +1689,7 @@ func (n *Node) RangeFeed(args *kvpb.RangeFeedRequest, stream kvpb.Internal_Range
 // the old style RangeFeed deprecated.
 type setRangeIDEventSink struct {
   ctx      context.Context
+  cancel   context.CancelFunc
   rangeID  roachpb.RangeID
   streamID int64
   wrapped  *lockedMuxStream
@@ -1712,10 +1717,6 @@ type lockedMuxStream struct {
   sendMu syncutil.Mutex
 }
 
-func (s *lockedMuxStream) Context() context.Context {
-  return s.wrapped.Context()
-}
-
 func (s *lockedMuxStream) Send(e *kvpb.MuxRangeFeedEvent) error {
   s.sendMu.Lock()
   defer s.sendMu.Unlock()
@@ -1728,7 +1729,7 @@ func (s *lockedMuxStream) Send(e *kvpb.MuxRangeFeedEvent) error {
 // to avoid blocking on IO (sender.Send) during potentially critical areas.
 // Thus, the forwarding should happen on a dedicated goroutine.
 func newMuxRangeFeedCompletionWatcher(
-  ctx context.Context, stopper *stop.Stopper, sender *lockedMuxStream,
+  ctx context.Context, stopper *stop.Stopper, send func(e *kvpb.MuxRangeFeedEvent) error,
 ) (doneFn func(event *kvpb.MuxRangeFeedEvent), cleanup func(), _ error) {
   // structure to help coordination of event forwarding and shutdown.
   var fin = struct {
@@ -1751,14 +1752,12 @@ func newMuxRangeFeedCompletionWatcher(
         toSend, fin.completed = fin.completed, nil
         fin.Unlock()
         for _, e := range toSend {
-          if err := sender.Send(e); err != nil {
+          if err := send(e); err != nil {
            // If we failed to send, there is nothing else we can do.
            // The stream is broken anyway.
            return
          }
        }
-      case <-sender.wrapped.Context().Done():
-        return
       case <-ctx.Done():
         return
       case <-stopper.ShouldQuiesce():
@@ -1795,7 +1794,12 @@
 func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error {
   muxStream := &lockedMuxStream{wrapped: stream}
-  rangefeedCompleted, cleanup, err := newMuxRangeFeedCompletionWatcher(stream.Context(), n.stopper, muxStream)
+  // All contexts created below should derive from this context, which is
+  // cancelled once MuxRangeFeed exits.
+  ctx, cancel := context.WithCancel(n.AnnotateCtx(stream.Context()))
+  defer cancel()
+
+  rangefeedCompleted, cleanup, err := newMuxRangeFeedCompletionWatcher(ctx, n.stopper, muxStream.Send)
   if err != nil {
     return err
   }
@@ -1805,31 +1809,63 @@
   n.metrics.ActiveMuxRangeFeed.Inc(1)
   defer n.metrics.ActiveMuxRangeFeed.Inc(-1)
 
+  var activeStreams sync.Map
+
   for {
     req, err := stream.Recv()
     if err != nil {
       return err
     }
 
-    streamCtx := n.AnnotateCtx(stream.Context())
+    if req.CloseStream {
+      // The client issued a request to close a previously established stream.
+      if v, loaded := activeStreams.LoadAndDelete(req.StreamID); loaded {
+        s := v.(*setRangeIDEventSink)
+        s.cancel()
+      } else {
+        // This is a bit strange, but it could happen if this stream completes
+        // just before we receive the close request. So, just log a warning.
+        if log.V(1) {
+          log.Infof(ctx, "closing unknown rangefeed stream ID %d", req.StreamID)
+        }
+      }
+      continue
+    }
+
+    streamCtx, cancel := context.WithCancel(ctx)
     streamCtx = logtags.AddTag(streamCtx, "r", req.RangeID)
     streamCtx = logtags.AddTag(streamCtx, "s", req.Replica.StoreID)
+    streamCtx = logtags.AddTag(streamCtx, "sid", req.StreamID)
 
-    sink := setRangeIDEventSink{
+    streamSink := &setRangeIDEventSink{
       ctx:      streamCtx,
+      cancel:   cancel,
       rangeID:  req.RangeID,
       streamID: req.StreamID,
       wrapped:  muxStream,
     }
+    activeStreams.Store(req.StreamID, streamSink)
 
     n.metrics.NumMuxRangeFeed.Inc(1)
     n.metrics.ActiveMuxRangeFeed.Inc(1)
-    f := n.stores.RangeFeed(req, &sink)
+    f := n.stores.RangeFeed(req, streamSink)
     f.WhenReady(func(err error) {
       n.metrics.ActiveMuxRangeFeed.Inc(-1)
+
+      _, loaded := activeStreams.LoadAndDelete(req.StreamID)
+      streamClosedByClient := !loaded
+      streamSink.cancel()
+
+      if streamClosedByClient && streamSink.ctx.Err() != nil {
+        // If the stream was explicitly closed by the client, we expect to see
+        // a context.Canceled error. In this case, return
+        // kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED to the client.
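+        // The client treats this as a retryable error and restarts the
+        // rangefeed for the affected range.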
+        err = kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED)
+      }
+
       if err == nil {
         cause := kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED
-        if !n.storeCfg.Settings.Version.IsActive(stream.Context(), clusterversion.V23_2) {
+        if !n.storeCfg.Settings.Version.IsActive(ctx, clusterversion.V23_2) {
           cause = kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED
         }
         err = kvpb.NewRangeFeedRetryError(cause)