Skip to content

Commit

Permalink
kvserver/rangefeed: move active streams to stream muxer
Browse files Browse the repository at this point in the history
This patch moves the existing active stream management to StreamMuxer
without changing any existing behaviour. The main purpose is to make future
commits cleaner.

Part of: cockroachdb#126561
Epic: none
Release note: none
  • Loading branch information
wenyihu6 committed Jul 9, 2024
1 parent efc6a74 commit 8fb9efb
Show file tree
Hide file tree
Showing 5 changed files with 356 additions and 53 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"scheduled_processor.go",
"scheduler.go",
"stream_muxer.go",
"stream_muxer_test_helper.go",
"task.go",
"testutil.go",
],
Expand Down Expand Up @@ -65,6 +66,7 @@ go_test(
"registry_test.go",
"resolved_timestamp_test.go",
"scheduler_test.go",
"stream_muxer_test.go",
"task_test.go",
],
embed = [":rangefeed"],
Expand Down
68 changes: 64 additions & 4 deletions pkg/kv/kvserver/rangefeed/stream_muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ package rangefeed

import (
"context"
"sync"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand Down Expand Up @@ -88,6 +90,9 @@ type StreamMuxer struct {
// thread safety.
sender ServerStreamSender

// streamID -> context.CancelFunc for active rangefeeds
activeStreams sync.Map

// notifyMuxError is a buffered channel of size 1 used to signal the presence
// of muxErrors. Additional signals are dropped if the channel is already full
// so that it's unblocking.
Expand All @@ -110,20 +115,74 @@ func NewStreamMuxer(sender ServerStreamSender) *StreamMuxer {
}
}

// AppendMuxError appends a mux rangefeed completion error to be sent back to
// AddStream registers a server rangefeed stream with the StreamMuxer. It
// remains active until DisconnectRangefeedWithError is called with the same
// streamID. Caller must ensure no duplicate stream IDs are added without
// disconnecting the old one first.
func (sm *StreamMuxer) AddStream(streamID int64, cancel context.CancelFunc) {
if _, loaded := sm.activeStreams.LoadOrStore(streamID, cancel); loaded {
log.Fatalf(context.Background(), "stream %d already exists", streamID)
}

}

// transformRangefeedErrToClientError converts a rangefeed error to a client
// error to be sent back to client. This also handles nil values, preventing nil
// pointer dereference.
func transformRangefeedErrToClientError(err *kvpb.Error) *kvpb.Error {
if err == nil {
// When processor is stopped when it no longer has any registrations, it
// would attempt to close all feeds again with a nil error. Theoretically,
// this should never happen as processor would always stop with a reason if
// feeds are active.
return kvpb.NewError(kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED))
}
return err
}

// appendMuxError appends a mux rangefeed completion error to be sent back to
// the client. Note that this method cannot block on IO. If the underlying
// stream is broken, the error will be dropped.
func (sm *StreamMuxer) AppendMuxError(e *kvpb.MuxRangeFeedEvent) {
func (sm *StreamMuxer) appendMuxError(e *kvpb.MuxRangeFeedEvent) {
sm.mu.Lock()
defer sm.mu.Unlock()
sm.mu.muxErrors = append(sm.mu.muxErrors, e)
// Note that notify is unblocking.
// Note that notifyMuxError is non-blocking.
select {
case sm.notifyMuxError <- struct{}{}:
default:
}
}

// DisconnectRangefeedWithError disconnects a stream with an error. Safe to call
// repeatedly for the same stream, but subsequent errors are ignored. It ensures
// 1. the stream context is cancelled 2. exactly one error is sent back to the
// client on behalf of the stream.
//
// Note that this function can be called by the processor worker while holding
// raftMu, so it is important that this function doesn't block IO. It does so by
// delegating the responsibility of sending mux error to StreamMuxer.Run.
func (sm *StreamMuxer) DisconnectRangefeedWithError(
streamID int64, rangeID roachpb.RangeID, err *kvpb.Error,
) {
if cancelFunc, ok := sm.activeStreams.LoadAndDelete(streamID); ok {
f, ok := cancelFunc.(context.CancelFunc)
if !ok {
log.Fatalf(context.Background(), "unexpected stream type %T", cancelFunc)
}
f()
clientErrorEvent := transformRangefeedErrToClientError(err)
ev := &kvpb.MuxRangeFeedEvent{
StreamID: streamID,
RangeID: rangeID,
}
ev.MustSetValue(&kvpb.RangeFeedError{
Error: *clientErrorEvent,
})
sm.appendMuxError(ev)
}
}

// detachMuxErrors returns muxErrors and clears the slice. Caller must ensure
// the returned errors are sent back to the client.
func (sm *StreamMuxer) detachMuxErrors() []*kvpb.MuxRangeFeedEvent {
Expand Down Expand Up @@ -155,7 +214,8 @@ func (sm *StreamMuxer) Run(ctx context.Context, stopper *stop.Stopper) {
for {
select {
case <-sm.notifyMuxError:
for _, clientErr := range sm.detachMuxErrors() {
toSend := sm.detachMuxErrors()
for _, clientErr := range toSend {
if err := sm.sender.Send(clientErr); err != nil {
log.Errorf(ctx,
"failed to send rangefeed completion error back to client due to broken stream: %v", err)
Expand Down
171 changes: 171 additions & 0 deletions pkg/kv/kvserver/rangefeed/stream_muxer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rangefeed

import (
"context"
"sync"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// TestStreamMuxer tests that correctly forwards rangefeed completion errors to
// the server stream.
func TestStreamMuxer(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx, cancel := context.WithCancel(context.Background())
stopper := stop.NewStopper()

testServerStream := newTestServerStream()
muxer, cleanUp := NewTestStreamMuxer(t, ctx, stopper, testServerStream)
defer func() {
cancel()
cleanUp()
stopper.Stop(ctx)
}()

t.Run("nil handling", func(t *testing.T) {
const streamID = 0
const rangeID = 1
streamCtx, cancel := context.WithCancel(context.Background())
muxer.AddStream(0, cancel)
// Note that kvpb.NewError(nil) == nil.
muxer.DisconnectRangefeedWithError(streamID, rangeID, kvpb.NewError(nil))
require.Equal(t, context.Canceled, streamCtx.Err())
expectedErrEvent := &kvpb.MuxRangeFeedEvent{
StreamID: streamID,
RangeID: rangeID,
}
expectedErrEvent.MustSetValue(&kvpb.RangeFeedError{
Error: *kvpb.NewError(kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED)),
})
time.Sleep(10 * time.Millisecond)
require.Equal(t, 1, testServerStream.totalEventsSent())
require.True(t, testServerStream.hasEvent(expectedErrEvent))

// Repeat closing the stream does nothing.
muxer.DisconnectRangefeedWithError(streamID, rangeID,
kvpb.NewError(kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED)))
time.Sleep(10 * time.Millisecond)
require.Equal(t, 1, testServerStream.totalEventsSent())
})

t.Run("send rangefeed completion error", func(t *testing.T) {
testRangefeedCompletionErrors := []struct {
streamID int64
rangeID roachpb.RangeID
Error error
}{
{0, 1, kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED)},
{1, 1, context.Canceled},
{2, 2, &kvpb.NodeUnavailableError{}},
}

for _, muxError := range testRangefeedCompletionErrors {
muxer.AddStream(muxError.streamID, func() {})
}

var wg sync.WaitGroup
for _, muxError := range testRangefeedCompletionErrors {
wg.Add(1)
go func(streamID int64, rangeID roachpb.RangeID, err error) {
defer wg.Done()
muxer.DisconnectRangefeedWithError(streamID, rangeID, kvpb.NewError(err))
}(muxError.streamID, muxError.rangeID, muxError.Error)
}
wg.Wait()

for _, muxError := range testRangefeedCompletionErrors {
testutils.SucceedsSoon(t, func() error {
ev := &kvpb.MuxRangeFeedEvent{
StreamID: muxError.streamID,
RangeID: muxError.rangeID,
}
ev.MustSetValue(&kvpb.RangeFeedError{
Error: *kvpb.NewError(muxError.Error),
})
if testServerStream.hasEvent(ev) {
return nil
}
return errors.Newf("expected error %v not found", muxError)
})
}
})
}

// TestStreamMuxerOnBlockingIO tests that the
// StreamMuxer.DisconnectRangefeedWithError doesn't block on IO.
func TestStreamMuxerOnBlockingIO(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx, cancel := context.WithCancel(context.Background())
stopper := stop.NewStopper()

testServerStream := newTestServerStream()
muxer, cleanUp := NewTestStreamMuxer(t, ctx, stopper, testServerStream)
defer func() {
cancel()
cleanUp()
stopper.Stop(ctx)
}()

const streamID = 0
const rangeID = 1
streamCtx, streamCancel := context.WithCancel(context.Background())
muxer.AddStream(0, streamCancel)

ev := &kvpb.MuxRangeFeedEvent{
StreamID: streamID,
RangeID: rangeID,
}
ev.MustSetValue(&kvpb.RangeFeedCheckpoint{
Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("m")},
ResolvedTS: hlc.Timestamp{WallTime: 1},
})
err := muxer.sender.Send(ev)
require.NoError(t, err)
require.Truef(t, testServerStream.hasEvent(ev),
"expected event %v not found in %v", ev, testServerStream)

// Block the stream.
unblock := testServerStream.BlockSend()

// Although stream is blocked, we should be able to disconnect the stream
// without blocking.
muxer.DisconnectRangefeedWithError(streamID, rangeID,
kvpb.NewError(kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER)))
require.Equal(t, streamCtx.Err(), context.Canceled)
unblock()
time.Sleep(100 * time.Millisecond)
expectedErrEvent := &kvpb.MuxRangeFeedEvent{
StreamID: streamID,
RangeID: rangeID,
}
expectedErrEvent.MustSetValue(&kvpb.RangeFeedError{
Error: *kvpb.NewError(kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER)),
})
// Receive the event after unblocking.
require.Truef(t, testServerStream.hasEvent(expectedErrEvent),
"expected event %v not found in %v", ev, testServerStream)
}
Loading

0 comments on commit 8fb9efb

Please sign in to comment.