From 2df6a9c2d863424884eaaf873ab902cf7bcb8c4c Mon Sep 17 00:00:00 2001 From: Wenyi Hu Date: Fri, 28 Jun 2024 20:30:48 -0400 Subject: [PATCH] kvserver/rangefeed: remove lockedRangefeedStream Previously, we created separate locked rangefeed streams for each individual rangefeed stream to ensure Send can be called concurrently as the underlying grpc stream is not thread safe. However, since the introduction of the mux rangefeed support, we already have a dedicated lock for the underlying mux stream, making the Send method on each rangefeed stream thread safe already. This patch removes the redundant locks from each individual rangefeed stream. Epic: none Release note: none --- .../rangefeed/rangefeed_external_test.go | 2 ++ pkg/kv/kvpb/api.go | 3 +++ .../client_replica_circuit_breaker_test.go | 2 ++ pkg/kv/kvserver/rangefeed/bench_test.go | 4 ++++ pkg/kv/kvserver/rangefeed/processor_test.go | 2 ++ pkg/kv/kvserver/rangefeed/registry_test.go | 2 ++ pkg/kv/kvserver/replica_rangefeed.go | 23 +------------------ pkg/kv/kvserver/replica_rangefeed_test.go | 2 ++ pkg/rpc/context_test.go | 5 ++++ pkg/server/node.go | 16 ++++++++----- pkg/sql/flowinfra/flow_registry_test.go | 2 ++ 11 files changed, 35 insertions(+), 28 deletions(-) diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go index e5ba37ce8890..5faedb453121 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go @@ -1638,6 +1638,8 @@ func (c *channelSink) Context() context.Context { return c.ctx } +func (c *channelSink) SendIsThreadSafe() {} + func (c *channelSink) Send(e *kvpb.RangeFeedEvent) error { select { case c.ch <- e: diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go index 6dc08c93fcd2..0507872f9575 100644 --- a/pkg/kv/kvpb/api.go +++ b/pkg/kv/kvpb/api.go @@ -2524,6 +2524,9 @@ type RangeFeedEventSink interface { // stream breaks. Send must be safe to call on the same stream in different // goroutines. Send(*RangeFeedEvent) error + // SendIsThreadSafe is a no-op declaration method. It is a contract that the + // interface has a thread-safe Send method. + SendIsThreadSafe() } // RangeFeedEventProducer is an adapter for receiving rangefeed events with either diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go index 44abe30267e1..a4cb5c810eba 100644 --- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go @@ -456,6 +456,8 @@ func (s *dummyStream) Context() context.Context { return s.ctx } +func (s *dummyStream) SendIsThreadSafe() {} + func (s *dummyStream) Send(ev *kvpb.RangeFeedEvent) error { if ev.Val == nil && ev.Error == nil { return nil diff --git a/pkg/kv/kvserver/rangefeed/bench_test.go b/pkg/kv/kvserver/rangefeed/bench_test.go index 50a2c39cdfec..af59ca71bf5e 100644 --- a/pkg/kv/kvserver/rangefeed/bench_test.go +++ b/pkg/kv/kvserver/rangefeed/bench_test.go @@ -207,3 +207,7 @@ func (s *noopStream) Send(*kvpb.RangeFeedEvent) error { s.events++ return nil } + +// Note that Send itself is not thread-safe, but it is written to be used only +// in a single threaded environment in this test, ensuring thread-safety. +func (s *noopStream) SendIsThreadSafe() {} diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index 954b179ebcb2..b0e30c67c112 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -1811,6 +1811,8 @@ func newConsumer(blockAfter int) *consumer { } } +func (c *consumer) SendIsThreadSafe() {} + func (c *consumer) Send(e *kvpb.RangeFeedEvent) error { if e.Val != nil { v := int(atomic.AddInt32(&c.sentValues, 1)) diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go index f3e43fc36a89..4b3e76dede3a 100644 --- a/pkg/kv/kvserver/rangefeed/registry_test.go +++ b/pkg/kv/kvserver/rangefeed/registry_test.go @@ -65,6 +65,8 @@ func (s *testStream) Cancel() { s.ctxDone() } +func (s *testStream) SendIsThreadSafe() {} + func (s *testStream) Send(e *kvpb.RangeFeedEvent) error { s.mu.Lock() defer s.mu.Unlock() diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index e11ca36b9a9f..6e6f5475a505 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -37,7 +37,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metamorphic" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" @@ -122,24 +121,6 @@ const defaultEventChanCap = 4096 var defaultEventChanTimeout = envutil.EnvOrDefaultDuration( "COCKROACH_RANGEFEED_SEND_TIMEOUT", 50*time.Millisecond) -// lockedRangefeedStream is an implementation of rangefeed.Stream which provides -// support for concurrent calls to Send. Note that the default implementation of -// grpc.Stream is not safe for concurrent calls to Send. -type lockedRangefeedStream struct { - wrapped kvpb.RangeFeedEventSink - sendMu syncutil.Mutex -} - -func (s *lockedRangefeedStream) Context() context.Context { - return s.wrapped.Context() -} - -func (s *lockedRangefeedStream) Send(e *kvpb.RangeFeedEvent) error { - s.sendMu.Lock() - defer s.sendMu.Unlock() - return s.wrapped.Send(e) -} - // rangefeedTxnPusher is a shim around intentResolver that implements the // rangefeed.TxnPusher interface. type rangefeedTxnPusher struct { @@ -294,8 +275,6 @@ func (r *Replica) RangeFeed( checkTS = r.Clock().Now() } - lockedStream := &lockedRangefeedStream{wrapped: stream} - // If we will be using a catch-up iterator, wait for the limiter here before // locking raftMu. usingCatchUpIter := false @@ -352,7 +331,7 @@ func (r *Replica) RangeFeed( var done future.ErrorFuture p := r.registerWithRangefeedRaftMuLocked( - ctx, rSpan, args.Timestamp, catchUpIter, args.WithDiff, args.WithFiltering, omitRemote, lockedStream, &done, + ctx, rSpan, args.Timestamp, catchUpIter, args.WithDiff, args.WithFiltering, omitRemote, stream, &done, ) r.raftMu.Unlock() diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go index 08d188b65849..2ba4716d6d71 100644 --- a/pkg/kv/kvserver/replica_rangefeed_test.go +++ b/pkg/kv/kvserver/replica_rangefeed_test.go @@ -79,6 +79,8 @@ func (s *testStream) Cancel() { s.cancel() } +func (s *testStream) SendIsThreadSafe() {} + func (s *testStream) Send(e *kvpb.RangeFeedEvent) error { s.mu.Lock() defer s.mu.Unlock() diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index f16f56c683e5..958cb29dc738 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -288,6 +288,11 @@ func (s *rangefeedEventSink) Context() context.Context { return s.ctx } +// Note that Send itself is not thread-safe (grpc stream is not thread-safe), +// but tests were written in a way that sends sequentially, ensuring +// thread-safety for Send. +func (s *rangefeedEventSink) SendIsThreadSafe() {} + func (s *rangefeedEventSink) Send(event *kvpb.RangeFeedEvent) error { return s.stream.Send(&kvpb.MuxRangeFeedEvent{RangeFeedEvent: *event}) } diff --git a/pkg/server/node.go b/pkg/server/node.go index 7021625a552b..c6e587f05857 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -1832,10 +1832,10 @@ func (n *Node) RangeLookup( return resp, nil } -// setRangeIDEventSink annotates each response with range and stream IDs. -// This is used by MuxRangeFeed. -// TODO: This code can be removed in 22.2 once MuxRangeFeed is the default, and -// the old style RangeFeed deprecated. +// setRangeIDEventSink is an implementation of rangefeed.Stream which annotates +// each response with rangeID and streamID. It is used by MuxRangeFeed. Note +// that the wrapped stream is a locked mux stream, ensuring safe concurrent Send +// calls. type setRangeIDEventSink struct { ctx context.Context cancel context.CancelFunc @@ -1857,10 +1857,14 @@ func (s *setRangeIDEventSink) Send(event *kvpb.RangeFeedEvent) error { return s.wrapped.Send(response) } +// Wrapped stream is a locked mux stream, ensuring safe concurrent Send. +func (s *setRangeIDEventSink) SendIsThreadSafe() {} + var _ kvpb.RangeFeedEventSink = (*setRangeIDEventSink)(nil) -// lockedMuxStream provides support for concurrent calls to Send. -// The underlying MuxRangeFeedServer is not safe for concurrent calls to Send. +// lockedMuxStream provides support for concurrent calls to Send. The underlying +// MuxRangeFeedServer (default grpc.Stream) is not safe for concurrent calls to +// Send. type lockedMuxStream struct { wrapped kvpb.Internal_MuxRangeFeedServer sendMu syncutil.Mutex diff --git a/pkg/sql/flowinfra/flow_registry_test.go b/pkg/sql/flowinfra/flow_registry_test.go index 877158806323..e48a1197814b 100644 --- a/pkg/sql/flowinfra/flow_registry_test.go +++ b/pkg/sql/flowinfra/flow_registry_test.go @@ -717,6 +717,8 @@ type delayedErrorServerStream struct { err error } +func (s *delayedErrorServerStream) SendIsThreadSafe() {} + func (s *delayedErrorServerStream) Send(*execinfrapb.ConsumerSignal) error { s.rpcCalledCh <- struct{}{} <-s.delayCh