diff --git a/pkg/kv/kvserver/rangefeed/BUILD.bazel b/pkg/kv/kvserver/rangefeed/BUILD.bazel
index 49b36d2d72e7..757d873a5b68 100644
--- a/pkg/kv/kvserver/rangefeed/BUILD.bazel
+++ b/pkg/kv/kvserver/rangefeed/BUILD.bazel
@@ -63,6 +63,7 @@ go_test(
     srcs = [
         "bench_test.go",
         "budget_test.go",
+        "buffered_sender_test.go",
         "catchup_scan_bench_test.go",
         "catchup_scan_test.go",
         "event_queue_test.go",
diff --git a/pkg/kv/kvserver/rangefeed/buffered_sender.go b/pkg/kv/kvserver/rangefeed/buffered_sender.go
index e57f4d8651aa..0fa31a4d2cd6 100644
--- a/pkg/kv/kvserver/rangefeed/buffered_sender.go
+++ b/pkg/kv/kvserver/rangefeed/buffered_sender.go
@@ -7,9 +7,13 @@ package rangefeed
 
 import (
 	"context"
+	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
+	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"github.com/cockroachdb/errors"
 )
 
 // ┌─────────────────┐
@@ -44,38 +48,140 @@ import (
 // BufferedSender is embedded in every rangefeed.BufferedPerRangeEventSink,
 // serving as a helper which buffers events before forwarding events to the
 // underlying gRPC stream.
-//
-// Refer to the comments above UnbufferedSender for more details on the role of
-// senders in the entire rangefeed architecture.
 type BufferedSender struct {
 	// Note that lockedMuxStream wraps the underlying grpc server stream, ensuring
 	// thread safety.
 	sender ServerStreamSender
+
+	// queueMu protects the buffer queue.
+	queueMu struct {
+		syncutil.Mutex
+		stopped bool
+		buffer  *eventQueue
+	}
+
+	// notifyDataC is used to notify the BufferedSender.run goroutine that there
+	// are events to send. The channel is initialized with a buffer of 1, and
+	// all writes to it are non-blocking.
+	notifyDataC chan struct{}
 }
 
 func NewBufferedSender(sender ServerStreamSender) *BufferedSender {
 	bs := &BufferedSender{
 		sender: sender,
 	}
+	bs.queueMu.buffer = newEventQueue()
+	bs.notifyDataC = make(chan struct{}, 1)
 	return bs
 }
 
+// sendBuffered buffers the event before sending it to the underlying gRPC
+// stream. It does not block. sendBuffered takes ownership of the alloc and
+// releases it if the returned error is non-nil. It only returns an error if
+// the stream sender has already been stopped.
 func (bs *BufferedSender) sendBuffered(
 	ev *kvpb.MuxRangeFeedEvent, alloc *SharedBudgetAllocation,
 ) error {
-	panic("unimplemented: buffered sender for rangefeed #126560")
+	bs.queueMu.Lock()
+	defer bs.queueMu.Unlock()
+	if bs.queueMu.stopped {
+		return errors.New("stream sender is stopped")
+	}
+	// TODO(wenyihu6): pass an actual context here
+	alloc.Use(context.Background())
+	bs.queueMu.buffer.pushBack(sharedMuxEvent{ev, alloc})
+	select {
+	case bs.notifyDataC <- struct{}{}:
+	default:
+	}
+	return nil
 }
 
+// sendUnbuffered sends the event directly to the underlying
+// ServerStreamSender. It bypasses the buffer and thus may block.
 func (bs *BufferedSender) sendUnbuffered(ev *kvpb.MuxRangeFeedEvent) error {
-	panic("unimplemented: buffered sender for rangefeed #126560")
+	return bs.sender.Send(ev)
 }
 
+// run loops until the sender or stopper signals teardown. In each
+// iteration, it waits for events to enter the buffer and then forwards
+// them to the underlying sender.
 func (bs *BufferedSender) run(
 	ctx context.Context, stopper *stop.Stopper, onError func(streamID int64),
 ) error {
-	panic("unimplemented: buffered sender for rangefeed #126560")
+	for {
+		select {
+		case <-ctx.Done():
+			// Top level goroutine will receive the context cancellation and handle
+			// ctx.Err().
+			return nil
+		case <-stopper.ShouldQuiesce():
+			// Top level goroutine will receive the stopper quiesce signal and handle
+			// error.
+			return nil
+		case <-bs.notifyDataC:
+			for {
+				e, success := bs.popFront()
+				if success {
+					err := bs.sender.Send(e.ev)
+					e.alloc.Release(ctx)
+					if e.ev.Error != nil {
+						onError(e.ev.StreamID)
+					}
+					if err != nil {
+						return err
+					}
+				} else {
+					break
+				}
+			}
+		}
+	}
+}
+
+// popFront pops the front event from the buffer queue. It returns the event
+// and a boolean indicating whether the pop succeeded.
+func (bs *BufferedSender) popFront() (e sharedMuxEvent, success bool) {
+	bs.queueMu.Lock()
+	defer bs.queueMu.Unlock()
+	event, ok := bs.queueMu.buffer.popFront()
+	return event, ok
 }
 
+// cleanup is called when the sender is stopped. It is expected to free up the
+// buffer queue, and no new events should be buffered after this.
 func (bs *BufferedSender) cleanup(ctx context.Context) {
-	panic("unimplemented: buffered sender for rangefeed #126560")
+	bs.queueMu.Lock()
+	defer bs.queueMu.Unlock()
+	bs.queueMu.stopped = true
+	bs.queueMu.buffer.drain(ctx)
+}
+
+// len is used for testing only.
+func (bs *BufferedSender) len() int {
+	bs.queueMu.Lock()
+	defer bs.queueMu.Unlock()
+	return int(bs.queueMu.buffer.len())
+}
+
+// waitForEmptyBuffer is used for testing only.
+func (bs *BufferedSender) waitForEmptyBuffer(ctx context.Context) error {
+	opts := retry.Options{
+		InitialBackoff: 5 * time.Millisecond,
+		Multiplier:     2,
+		MaxBackoff:     10 * time.Second,
+		MaxRetries:     50,
+	}
+	for re := retry.StartWithCtx(ctx, opts); re.Next(); {
+		bs.queueMu.Lock()
+		caughtUp := bs.queueMu.buffer.len() == 0 // nolint:deferunlockcheck
+		bs.queueMu.Unlock()
+		if caughtUp {
+			return nil
+		}
+	}
+	if err := ctx.Err(); err != nil {
+		return err
+	}
+	return errors.New("buffered sender failed to send in time")
 }
diff --git a/pkg/kv/kvserver/rangefeed/buffered_sender_test.go b/pkg/kv/kvserver/rangefeed/buffered_sender_test.go
new file mode 100644
index 000000000000..5b8517c71994
--- /dev/null
+++ b/pkg/kv/kvserver/rangefeed/buffered_sender_test.go
@@ -0,0 +1,164 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the CockroachDB Software License
+// included in the /LICENSE file.
+
+package rangefeed
+
+import (
+	"context"
+	"sync"
+	"sync/atomic"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/randutil"
+	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"github.com/cockroachdb/errors"
+	"github.com/stretchr/testify/require"
+)
+
+// TestBufferedSenderDisconnectStream tests that BufferedSender handles stream
+// disconnects properly, including context cancellation, metrics updates, and
+// rangefeed cleanup.
+func TestBufferedSenderDisconnectStream(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	ctx := context.Background()
+
+	stopper := stop.NewStopper()
+	defer stopper.Stop(ctx)
+	testServerStream := newTestServerStream()
+	testRangefeedCounter := newTestRangefeedCounter()
+	bs := NewBufferedSender(testServerStream)
+	sm := NewStreamManager(bs, testRangefeedCounter)
+	require.NoError(t, sm.Start(ctx, stopper))
+	defer sm.Stop(ctx)
+
+	const streamID = 0
+	err := kvpb.NewError(kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER))
+	errEvent := makeMuxRangefeedErrorEvent(int64(streamID), 1, err)
+
+	t.Run("basic operation", func(t *testing.T) {
+		var num atomic.Int32
+		sm.AddStream(int64(streamID), &cancelCtxDisconnector{
+			cancel: func() {
+				num.Add(1)
+				require.NoError(t, sm.sender.sendBuffered(errEvent, nil))
+			},
+		})
+		require.Equal(t, 1, testRangefeedCounter.get())
+		require.Equal(t, 0, bs.len())
+		sm.DisconnectStream(int64(streamID), err)
+		testServerStream.waitForEvent(t, errEvent)
+		require.Equal(t, int32(1), num.Load())
+		require.Equal(t, 1, testServerStream.totalEventsSent())
+		testRangefeedCounter.waitForRangefeedCount(t, 0)
+		testServerStream.reset()
+	})
+	t.Run("disconnecting the same stream twice is idempotent", func(t *testing.T) {
+		sm.AddStream(int64(streamID), &cancelCtxDisconnector{
+			cancel: func() {
+				require.NoError(t, sm.sender.sendBuffered(errEvent, nil))
+			},
+		})
+		require.Equal(t, 1, testRangefeedCounter.get())
+		sm.DisconnectStream(int64(streamID), err)
+		require.NoError(t, bs.waitForEmptyBuffer(ctx))
+		sm.DisconnectStream(int64(streamID), err)
+		require.NoError(t, bs.waitForEmptyBuffer(ctx))
+		require.Equal(t, 1, testServerStream.totalEventsSent())
+		testRangefeedCounter.waitForRangefeedCount(t, 0)
+	})
+}
+
+func TestBufferedSenderChaosWithStop(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	ctx := context.Background()
+
+	stopper := stop.NewStopper()
+	defer stopper.Stop(ctx)
+	testServerStream := newTestServerStream()
+	testRangefeedCounter := newTestRangefeedCounter()
+	bs := NewBufferedSender(testServerStream)
+	sm := NewStreamManager(bs, testRangefeedCounter)
+	require.NoError(t, sm.Start(ctx, stopper))
+
+	rng, _ := randutil.NewTestRand()
+
+	// Streams with IDs in [activeStreamStart, activeStreamEnd) are active, and
+	// activeStreamStart <= activeStreamEnd always holds. If activeStreamStart ==
+	// activeStreamEnd, no streams are active yet. Streams with IDs in
+	// [0, activeStreamStart) have been disconnected.
+	var actualSum atomic.Int32
+	activeStreamStart := int64(0)
+	activeStreamEnd := int64(0)
+
+	t.Run("mixed operations of add and disconnect stream", func(t *testing.T) {
+		const ops = 1000
+		var wg sync.WaitGroup
+		for i := 0; i < ops; i++ {
+			addStream := rng.Intn(2) == 0
+			require.LessOrEqualf(t, activeStreamStart, activeStreamEnd, "test programming error")
+			if addStream || activeStreamStart == activeStreamEnd {
+				streamID := activeStreamEnd
+				sm.AddStream(streamID, &cancelCtxDisconnector{
+					cancel: func() {
+						actualSum.Add(1)
+						_ = sm.sender.sendBuffered(
+							makeMuxRangefeedErrorEvent(streamID, 1, newErrBufferCapacityExceeded()), nil)
+					},
+				})
+				activeStreamEnd++
+			} else {
+				wg.Add(1)
+				go func(id int64) {
+					defer wg.Done()
+					sm.DisconnectStream(id, newErrBufferCapacityExceeded())
+				}(activeStreamStart)
+				activeStreamStart++
+			}
+		}
+
+		wg.Wait()
+		require.Equal(t, int32(activeStreamStart), actualSum.Load())
+
+		require.NoError(t, bs.waitForEmptyBuffer(ctx))
+		// We stop the sender as a way to synchronize with the send
+		// loop. Although we have waited for the buffer to be
+		// empty, we also need to know that the sender is done
+		// handling the last message it processed before we observe
+		// any of the counters on the test stream.
+		stopper.Stop(ctx)
+		require.Equal(t, activeStreamStart, int64(testServerStream.totalEventsSent()))
+		expectedActiveStreams := activeStreamEnd - activeStreamStart
+		require.Equal(t, int(expectedActiveStreams), sm.activeStreamCount())
+		testRangefeedCounter.waitForRangefeedCount(t, int(expectedActiveStreams))
+	})
+
+	t.Run("stream manager on stop", func(t *testing.T) {
+		sm.Stop(ctx)
+		require.Equal(t, 0, testRangefeedCounter.get())
+		require.Equal(t, 0, sm.activeStreamCount())
+		// Cleanup functions should be called for all active streams.
+		require.Equal(t, int32(activeStreamEnd), actualSum.Load())
+		// No error events should be sent during Stop().
+		require.Equal(t, activeStreamStart, int64(testServerStream.totalEventsSent()))
+	})
+
+	t.Run("stream manager after stop", func(t *testing.T) {
+		// No events should be buffered after the sender is stopped.
+		val1 := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}}
+		ev1 := new(kvpb.RangeFeedEvent)
+		ev1.MustSetValue(&kvpb.RangeFeedValue{Key: keyA, Value: val1})
+		muxEv := &kvpb.MuxRangeFeedEvent{RangeFeedEvent: *ev1, RangeID: 0, StreamID: 1}
+		require.Equal(t, bs.sendBuffered(muxEv, nil).Error(), errors.New("stream sender is stopped").Error())
+		require.Equal(t, 0, bs.len())
+	})
+}
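
As a reading aid only (not part of the patch), here is a minimal, self-contained Go sketch of the queue-plus-notification pattern that sendBuffered and run implement above: producers append to a mutex-protected queue and do a non-blocking send on a one-slot channel, while a single consumer goroutine drains the queue until it is empty on every wakeup. All names here (event, sink, bufferedSender, printSink) are invented for illustration and are not CockroachDB APIs; budget accounting, error propagation to streams, and stopper integration are omitted.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type event struct{ payload string }

type sink interface {
	Send(event) error
}

type bufferedSender struct {
	mu struct {
		sync.Mutex
		stopped bool
		queue   []event // stand-in for the real eventQueue
	}
	// notifyC has capacity 1; writes to it are non-blocking, so producers
	// never wait on the consumer.
	notifyC chan struct{}
	out     sink
}

func newBufferedSender(out sink) *bufferedSender {
	return &bufferedSender{notifyC: make(chan struct{}, 1), out: out}
}

// sendBuffered enqueues the event and nudges the run loop. It never blocks.
func (b *bufferedSender) sendBuffered(ev event) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.mu.stopped {
		return fmt.Errorf("stream sender is stopped")
	}
	b.mu.queue = append(b.mu.queue, ev)
	select {
	case b.notifyC <- struct{}{}:
	default: // a wakeup is already pending; the consumer drains the whole queue anyway
	}
	return nil
}

// run is the single consumer: on every wakeup it drains the queue completely.
func (b *bufferedSender) run(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-b.notifyC:
			for {
				ev, ok := b.popFront()
				if !ok {
					break
				}
				if err := b.out.Send(ev); err != nil {
					return err
				}
			}
		}
	}
}

func (b *bufferedSender) popFront() (event, bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.mu.queue) == 0 {
		return event{}, false
	}
	ev := b.mu.queue[0]
	b.mu.queue = b.mu.queue[1:]
	return ev, true
}

type printSink struct{}

func (printSink) Send(ev event) error { fmt.Println("sent:", ev.payload); return nil }

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	bs := newBufferedSender(printSink{})
	done := make(chan struct{})
	go func() { defer close(done); _ = bs.run(ctx) }()
	_ = bs.sendBuffered(event{payload: "hello"})
	_ = bs.sendBuffered(event{payload: "world"})
	time.Sleep(50 * time.Millisecond) // let the run loop drain before shutting down
	cancel()
	<-done
}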