Skip to content

Commit

Permalink
kvserver/rangefeed: remove lockedRangefeedStream
Browse files Browse the repository at this point in the history
Previously, we created separate locked rangefeed streams for each individual
rangefeed stream to ensure Send can be called concurrently as the underlying
grpc stream is not thread safe. However, since the introduction of the mux
rangefeed support, we already have a dedicated lock for the underlying mux
stream, making the Send method on each rangefeed stream thread safe already.
This patch removes the redundant locks from each individual rangefeed stream.

Epic: none
Release note: none
  • Loading branch information
wenyihu6 committed Jul 2, 2024
1 parent 96aea87 commit 2df6a9c
Show file tree
Hide file tree
Showing 11 changed files with 35 additions and 28 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/rangefeed/rangefeed_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1638,6 +1638,8 @@ func (c *channelSink) Context() context.Context {
return c.ctx
}

func (c *channelSink) SendIsThreadSafe() {}

func (c *channelSink) Send(e *kvpb.RangeFeedEvent) error {
select {
case c.ch <- e:
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2524,6 +2524,9 @@ type RangeFeedEventSink interface {
// stream breaks. Send must be safe to call on the same stream in different
// goroutines.
Send(*RangeFeedEvent) error
// SendIsThreadSafe is a no-op declaration method. It is a contract that the
// interface has a thread-safe Send method.
SendIsThreadSafe()
}

// RangeFeedEventProducer is an adapter for receiving rangefeed events with either
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/client_replica_circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,8 @@ func (s *dummyStream) Context() context.Context {
return s.ctx
}

func (s *dummyStream) SendIsThreadSafe() {}

func (s *dummyStream) Send(ev *kvpb.RangeFeedEvent) error {
if ev.Val == nil && ev.Error == nil {
return nil
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/rangefeed/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,3 +207,7 @@ func (s *noopStream) Send(*kvpb.RangeFeedEvent) error {
s.events++
return nil
}

// Note that Send itself is not thread-safe, but it is written to be used only
// in a single threaded environment in this test, ensuring thread-safety.
func (s *noopStream) SendIsThreadSafe() {}
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/rangefeed/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1811,6 +1811,8 @@ func newConsumer(blockAfter int) *consumer {
}
}

func (c *consumer) SendIsThreadSafe() {}

func (c *consumer) Send(e *kvpb.RangeFeedEvent) error {
if e.Val != nil {
v := int(atomic.AddInt32(&c.sentValues, 1))
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/rangefeed/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ func (s *testStream) Cancel() {
s.ctxDone()
}

func (s *testStream) SendIsThreadSafe() {}

func (s *testStream) Send(e *kvpb.RangeFeedEvent) error {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down
23 changes: 1 addition & 22 deletions pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
Expand Down Expand Up @@ -122,24 +121,6 @@ const defaultEventChanCap = 4096
var defaultEventChanTimeout = envutil.EnvOrDefaultDuration(
"COCKROACH_RANGEFEED_SEND_TIMEOUT", 50*time.Millisecond)

// lockedRangefeedStream is an implementation of rangefeed.Stream which provides
// support for concurrent calls to Send. Note that the default implementation of
// grpc.Stream is not safe for concurrent calls to Send.
type lockedRangefeedStream struct {
wrapped kvpb.RangeFeedEventSink
sendMu syncutil.Mutex
}

func (s *lockedRangefeedStream) Context() context.Context {
return s.wrapped.Context()
}

func (s *lockedRangefeedStream) Send(e *kvpb.RangeFeedEvent) error {
s.sendMu.Lock()
defer s.sendMu.Unlock()
return s.wrapped.Send(e)
}

// rangefeedTxnPusher is a shim around intentResolver that implements the
// rangefeed.TxnPusher interface.
type rangefeedTxnPusher struct {
Expand Down Expand Up @@ -294,8 +275,6 @@ func (r *Replica) RangeFeed(
checkTS = r.Clock().Now()
}

lockedStream := &lockedRangefeedStream{wrapped: stream}

// If we will be using a catch-up iterator, wait for the limiter here before
// locking raftMu.
usingCatchUpIter := false
Expand Down Expand Up @@ -352,7 +331,7 @@ func (r *Replica) RangeFeed(

var done future.ErrorFuture
p := r.registerWithRangefeedRaftMuLocked(
ctx, rSpan, args.Timestamp, catchUpIter, args.WithDiff, args.WithFiltering, omitRemote, lockedStream, &done,
ctx, rSpan, args.Timestamp, catchUpIter, args.WithDiff, args.WithFiltering, omitRemote, stream, &done,
)
r.raftMu.Unlock()

Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/replica_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ func (s *testStream) Cancel() {
s.cancel()
}

func (s *testStream) SendIsThreadSafe() {}

func (s *testStream) Send(e *kvpb.RangeFeedEvent) error {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down
5 changes: 5 additions & 0 deletions pkg/rpc/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,11 @@ func (s *rangefeedEventSink) Context() context.Context {
return s.ctx
}

// Note that Send itself is not thread-safe (grpc stream is not thread-safe),
// but tests were written in a way that sends sequentially, ensuring
// thread-safety for Send.
func (s *rangefeedEventSink) SendIsThreadSafe() {}

func (s *rangefeedEventSink) Send(event *kvpb.RangeFeedEvent) error {
return s.stream.Send(&kvpb.MuxRangeFeedEvent{RangeFeedEvent: *event})
}
Expand Down
16 changes: 10 additions & 6 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1832,10 +1832,10 @@ func (n *Node) RangeLookup(
return resp, nil
}

// setRangeIDEventSink annotates each response with range and stream IDs.
// This is used by MuxRangeFeed.
// TODO: This code can be removed in 22.2 once MuxRangeFeed is the default, and
// the old style RangeFeed deprecated.
// setRangeIDEventSink is an implementation of rangefeed.Stream which annotates
// each response with rangeID and streamID. It is used by MuxRangeFeed. Note
// that the wrapped stream is a locked mux stream, ensuring safe concurrent Send
// calls.
type setRangeIDEventSink struct {
ctx context.Context
cancel context.CancelFunc
Expand All @@ -1857,10 +1857,14 @@ func (s *setRangeIDEventSink) Send(event *kvpb.RangeFeedEvent) error {
return s.wrapped.Send(response)
}

// Wrapped stream is a locked mux stream, ensuring safe concurrent Send.
func (s *setRangeIDEventSink) SendIsThreadSafe() {}

var _ kvpb.RangeFeedEventSink = (*setRangeIDEventSink)(nil)

// lockedMuxStream provides support for concurrent calls to Send.
// The underlying MuxRangeFeedServer is not safe for concurrent calls to Send.
// lockedMuxStream provides support for concurrent calls to Send. The underlying
// MuxRangeFeedServer (default grpc.Stream) is not safe for concurrent calls to
// Send.
type lockedMuxStream struct {
wrapped kvpb.Internal_MuxRangeFeedServer
sendMu syncutil.Mutex
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/flowinfra/flow_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,8 @@ type delayedErrorServerStream struct {
err error
}

func (s *delayedErrorServerStream) SendIsThreadSafe() {}

func (s *delayedErrorServerStream) Send(*execinfrapb.ConsumerSignal) error {
s.rpcCalledCh <- struct{}{}
<-s.delayCh
Expand Down

0 comments on commit 2df6a9c

Please sign in to comment.