Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
126491: kvserver/rangefeed: add rangeID as part of activeStreams map r=nvanbenschoten a=wenyihu6

This patch adds rangeID into the activeStreams map without changing any existing
behavior. The main purpose is to make future commits cleaner. RangeID will be
needed when we enable streamMuxer to shut down all streams.


Part of: cockroachdb#126560
Release note: none

Co-authored-by: Wenyi Hu <[email protected]>
  • Loading branch information
craig[bot] and wenyihu6 committed Jul 29, 2024
2 parents 1cab70f + 89d6cd4 commit cc46011
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 10 deletions.
25 changes: 19 additions & 6 deletions pkg/kv/kvserver/rangefeed/stream_muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ type StreamMuxer struct {
// metrics is used to record rangefeed metrics for the node.
metrics RangefeedMetricsRecorder

// streamID -> context.CancelFunc for active rangefeeds
activeStreams syncutil.Map[int64, context.CancelFunc]
// streamID -> streamInfo for active rangefeeds
activeStreams syncutil.Map[int64, streamInfo]

// notifyMuxError is a buffered channel of size 1 used to signal the presence
// of muxErrors. Additional signals are dropped if the channel is already full
Expand All @@ -142,12 +142,24 @@ func NewStreamMuxer(sender ServerStreamSender, metrics RangefeedMetricsRecorder)
}
}

// streamInfo contains the rangeID and cancel function for an active rangefeed.
// It should be treated as immutable.
type streamInfo struct {
rangeID roachpb.RangeID
cancel context.CancelFunc
}

// AddStream registers a server rangefeed stream with the StreamMuxer. It
// remains active until DisconnectStreamWithError is called with the same
// streamID. Caller must ensure no duplicate stream IDs are added without
// disconnecting the old one first.
func (sm *StreamMuxer) AddStream(streamID int64, cancel context.CancelFunc) {
if _, loaded := sm.activeStreams.LoadOrStore(streamID, &cancel); loaded {
func (sm *StreamMuxer) AddStream(
streamID int64, rangeID roachpb.RangeID, cancel context.CancelFunc,
) {
if _, loaded := sm.activeStreams.LoadOrStore(streamID, &streamInfo{
rangeID: rangeID,
cancel: cancel,
}); loaded {
log.Fatalf(context.Background(), "stream %d already exists", streamID)
}
sm.metrics.UpdateMetricsOnRangefeedConnect()
Expand Down Expand Up @@ -201,8 +213,9 @@ func (sm *StreamMuxer) appendMuxError(e *kvpb.MuxRangeFeedEvent) {
func (sm *StreamMuxer) DisconnectStreamWithError(
streamID int64, rangeID roachpb.RangeID, err *kvpb.Error,
) {
if cancelFunc, ok := sm.activeStreams.LoadAndDelete(streamID); ok {
(*cancelFunc)()
if stream, ok := sm.activeStreams.LoadAndDelete(streamID); ok {
// Fine to skip nil checking here since that would be a programming error.
stream.cancel()
clientErrorEvent := transformRangefeedErrToClientError(err)
ev := &kvpb.MuxRangeFeedEvent{
StreamID: streamID,
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/rangefeed/stream_muxer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestStreamMuxer(t *testing.T) {
const streamID = 0
const rangeID = 1
streamCtx, cancel := context.WithCancel(context.Background())
muxer.AddStream(0, cancel)
muxer.AddStream(streamID, rangeID, cancel)
// Note that kvpb.NewError(nil) == nil.
require.Equal(t, testRangefeedCounter.get(), int32(1))
muxer.DisconnectStreamWithError(streamID, rangeID, kvpb.NewError(nil))
Expand Down Expand Up @@ -86,7 +86,7 @@ func TestStreamMuxer(t *testing.T) {
require.Equal(t, testRangefeedCounter.get(), int32(0))

for _, muxError := range testRangefeedCompletionErrors {
muxer.AddStream(muxError.streamID, func() {})
muxer.AddStream(muxError.streamID, muxError.rangeID, func() {})
}

require.Equal(t, testRangefeedCounter.get(), int32(3))
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestStreamMuxerOnBlockingIO(t *testing.T) {
const streamID = 0
const rangeID = 1
streamCtx, streamCancel := context.WithCancel(context.Background())
muxer.AddStream(0, streamCancel)
muxer.AddStream(0, rangeID, streamCancel)

ev := &kvpb.MuxRangeFeedEvent{
StreamID: streamID,
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2027,7 +2027,7 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error {
streamID: req.StreamID,
wrapped: streamMuxer,
}
streamMuxer.AddStream(req.StreamID, cancel)
streamMuxer.AddStream(req.StreamID, req.RangeID, cancel)

// Rangefeed attempts to register rangefeed a request over the specified
// span. If registration fails, it returns an error. Otherwise, it returns
Expand Down

0 comments on commit cc46011

Please sign in to comment.