diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go index e5ba37ce8890..5faedb453121 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go @@ -1638,6 +1638,8 @@ func (c *channelSink) Context() context.Context { return c.ctx } +func (c *channelSink) SendIsThreadSafe() {} + func (c *channelSink) Send(e *kvpb.RangeFeedEvent) error { select { case c.ch <- e: diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go index 6dc08c93fcd2..0507872f9575 100644 --- a/pkg/kv/kvpb/api.go +++ b/pkg/kv/kvpb/api.go @@ -2524,6 +2524,9 @@ type RangeFeedEventSink interface { // stream breaks. Send must be safe to call on the same stream in different // goroutines. Send(*RangeFeedEvent) error + // SendIsThreadSafe is a no-op declaration method. It is a contract that the + // interface has a thread-safe Send method. + SendIsThreadSafe() } // RangeFeedEventProducer is an adapter for receiving rangefeed events with either diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go index 44abe30267e1..a4cb5c810eba 100644 --- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go @@ -456,6 +456,8 @@ func (s *dummyStream) Context() context.Context { return s.ctx } +func (s *dummyStream) SendIsThreadSafe() {} + func (s *dummyStream) Send(ev *kvpb.RangeFeedEvent) error { if ev.Val == nil && ev.Error == nil { return nil diff --git a/pkg/kv/kvserver/rangefeed/bench_test.go b/pkg/kv/kvserver/rangefeed/bench_test.go index 50a2c39cdfec..af59ca71bf5e 100644 --- a/pkg/kv/kvserver/rangefeed/bench_test.go +++ b/pkg/kv/kvserver/rangefeed/bench_test.go @@ -207,3 +207,7 @@ func (s *noopStream) Send(*kvpb.RangeFeedEvent) error { s.events++ return nil } + +// Note that Send itself is not thread-safe, but it is written to be used only +// in a single threaded environment in this test, ensuring thread-safety. +func (s *noopStream) SendIsThreadSafe() {} diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index 954b179ebcb2..b0e30c67c112 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -1811,6 +1811,8 @@ func newConsumer(blockAfter int) *consumer { } } +func (c *consumer) SendIsThreadSafe() {} + func (c *consumer) Send(e *kvpb.RangeFeedEvent) error { if e.Val != nil { v := int(atomic.AddInt32(&c.sentValues, 1)) diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go index f3e43fc36a89..4b3e76dede3a 100644 --- a/pkg/kv/kvserver/rangefeed/registry_test.go +++ b/pkg/kv/kvserver/rangefeed/registry_test.go @@ -65,6 +65,8 @@ func (s *testStream) Cancel() { s.ctxDone() } +func (s *testStream) SendIsThreadSafe() {} + func (s *testStream) Send(e *kvpb.RangeFeedEvent) error { s.mu.Lock() defer s.mu.Unlock() diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index e11ca36b9a9f..6e6f5475a505 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -37,7 +37,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metamorphic" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" @@ -122,24 +121,6 @@ const defaultEventChanCap = 4096 var defaultEventChanTimeout = envutil.EnvOrDefaultDuration( "COCKROACH_RANGEFEED_SEND_TIMEOUT", 50*time.Millisecond) -// lockedRangefeedStream is an implementation of rangefeed.Stream which provides -// support for concurrent calls to Send. Note that the default implementation of -// grpc.Stream is not safe for concurrent calls to Send. -type lockedRangefeedStream struct { - wrapped kvpb.RangeFeedEventSink - sendMu syncutil.Mutex -} - -func (s *lockedRangefeedStream) Context() context.Context { - return s.wrapped.Context() -} - -func (s *lockedRangefeedStream) Send(e *kvpb.RangeFeedEvent) error { - s.sendMu.Lock() - defer s.sendMu.Unlock() - return s.wrapped.Send(e) -} - // rangefeedTxnPusher is a shim around intentResolver that implements the // rangefeed.TxnPusher interface. type rangefeedTxnPusher struct { @@ -294,8 +275,6 @@ func (r *Replica) RangeFeed( checkTS = r.Clock().Now() } - lockedStream := &lockedRangefeedStream{wrapped: stream} - // If we will be using a catch-up iterator, wait for the limiter here before // locking raftMu. usingCatchUpIter := false @@ -352,7 +331,7 @@ func (r *Replica) RangeFeed( var done future.ErrorFuture p := r.registerWithRangefeedRaftMuLocked( - ctx, rSpan, args.Timestamp, catchUpIter, args.WithDiff, args.WithFiltering, omitRemote, lockedStream, &done, + ctx, rSpan, args.Timestamp, catchUpIter, args.WithDiff, args.WithFiltering, omitRemote, stream, &done, ) r.raftMu.Unlock() diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go index 08d188b65849..2ba4716d6d71 100644 --- a/pkg/kv/kvserver/replica_rangefeed_test.go +++ b/pkg/kv/kvserver/replica_rangefeed_test.go @@ -79,6 +79,8 @@ func (s *testStream) Cancel() { s.cancel() } +func (s *testStream) SendIsThreadSafe() {} + func (s *testStream) Send(e *kvpb.RangeFeedEvent) error { s.mu.Lock() defer s.mu.Unlock() diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index f16f56c683e5..958cb29dc738 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -288,6 +288,11 @@ func (s *rangefeedEventSink) Context() context.Context { return s.ctx } +// Note that Send itself is not thread-safe (grpc stream is not thread-safe), +// but tests were written in a way that sends sequentially, ensuring +// thread-safety for Send. +func (s *rangefeedEventSink) SendIsThreadSafe() {} + func (s *rangefeedEventSink) Send(event *kvpb.RangeFeedEvent) error { return s.stream.Send(&kvpb.MuxRangeFeedEvent{RangeFeedEvent: *event}) } diff --git a/pkg/server/node.go b/pkg/server/node.go index 7021625a552b..c6e587f05857 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -1832,10 +1832,10 @@ func (n *Node) RangeLookup( return resp, nil } -// setRangeIDEventSink annotates each response with range and stream IDs. -// This is used by MuxRangeFeed. -// TODO: This code can be removed in 22.2 once MuxRangeFeed is the default, and -// the old style RangeFeed deprecated. +// setRangeIDEventSink is an implementation of rangefeed.Stream which annotates +// each response with rangeID and streamID. It is used by MuxRangeFeed. Note +// that the wrapped stream is a locked mux stream, ensuring safe concurrent Send +// calls. type setRangeIDEventSink struct { ctx context.Context cancel context.CancelFunc @@ -1857,10 +1857,14 @@ func (s *setRangeIDEventSink) Send(event *kvpb.RangeFeedEvent) error { return s.wrapped.Send(response) } +// Wrapped stream is a locked mux stream, ensuring safe concurrent Send. +func (s *setRangeIDEventSink) SendIsThreadSafe() {} + var _ kvpb.RangeFeedEventSink = (*setRangeIDEventSink)(nil) -// lockedMuxStream provides support for concurrent calls to Send. -// The underlying MuxRangeFeedServer is not safe for concurrent calls to Send. +// lockedMuxStream provides support for concurrent calls to Send. The underlying +// MuxRangeFeedServer (default grpc.Stream) is not safe for concurrent calls to +// Send. type lockedMuxStream struct { wrapped kvpb.Internal_MuxRangeFeedServer sendMu syncutil.Mutex diff --git a/pkg/sql/flowinfra/flow_registry_test.go b/pkg/sql/flowinfra/flow_registry_test.go index 877158806323..e48a1197814b 100644 --- a/pkg/sql/flowinfra/flow_registry_test.go +++ b/pkg/sql/flowinfra/flow_registry_test.go @@ -717,6 +717,8 @@ type delayedErrorServerStream struct { err error } +func (s *delayedErrorServerStream) SendIsThreadSafe() {} + func (s *delayedErrorServerStream) Send(*execinfrapb.ConsumerSignal) error { s.rpcCalledCh <- struct{}{} <-s.delayCh