kvserver/rangefeed: add node level buffered sender
This patch adds a node-level BufferedSender which uses a queue
to buffer events before forwarding them to the underlying gRPC
stream.

Part of: #110432
Release note: None

Co-authored-by: Steven Danna <[email protected]>
wenyihu6 and stevendanna committed Dec 5, 2024
1 parent 844d763 commit 923e459
Showing 3 changed files with 278 additions and 7 deletions.
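
Before the diffs, here is a minimal, standalone sketch of the buffer-and-forward pattern this commit introduces: producers enqueue without blocking and nudge a single consumer goroutine through a notification channel with a buffer of one, and the consumer drains the queue into a downstream send function. All names here (bufferedSender, event, notifyC, run) are simplified stand-ins for illustration, not the actual CockroachDB types shown below.

package main

import (
	"fmt"
	"sync"
)

type event struct{ payload string }

type bufferedSender struct {
	mu      sync.Mutex
	stopped bool
	queue   []event
	// notifyC has a buffer of one so enqueuers never block: at most one
	// pending wake-up is needed to get the consumer draining again.
	notifyC chan struct{}
}

func newBufferedSender() *bufferedSender {
	return &bufferedSender{notifyC: make(chan struct{}, 1)}
}

// sendBuffered enqueues the event without blocking on the downstream send.
func (b *bufferedSender) sendBuffered(ev event) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.stopped {
		return fmt.Errorf("sender is stopped")
	}
	b.queue = append(b.queue, ev)
	select {
	case b.notifyC <- struct{}{}: // wake the consumer if it is idle
	default: // a wake-up is already pending; nothing to do
	}
	return nil
}

// run drains the queue and forwards events to send until stopC is closed.
func (b *bufferedSender) run(stopC <-chan struct{}, send func(event) error) {
	for {
		select {
		case <-stopC:
			return
		case <-b.notifyC:
			for {
				b.mu.Lock()
				if len(b.queue) == 0 {
					b.mu.Unlock()
					break
				}
				ev := b.queue[0]
				b.queue = b.queue[1:]
				b.mu.Unlock()
				if err := send(ev); err != nil {
					return
				}
			}
		}
	}
}

func main() {
	bs := newBufferedSender()
	stopC := make(chan struct{})
	done := make(chan struct{})
	go func() {
		defer close(done)
		bs.run(stopC, func(ev event) error {
			fmt.Println("sent:", ev.payload) // stands in for the gRPC stream's Send
			return nil
		})
	}()
	_ = bs.sendBuffered(event{"e1"})
	_ = bs.sendBuffered(event{"e2"})
	// Spin until the queue is drained, then stop the consumer. The real code
	// coordinates this through a stopper and, in tests, waitForEmptyBuffer.
	for {
		bs.mu.Lock()
		empty := len(bs.queue) == 0
		bs.mu.Unlock()
		if empty {
			break
		}
	}
	close(stopC)
	<-done
}
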
1 change: 1 addition & 0 deletions pkg/kv/kvserver/rangefeed/BUILD.bazel
@@ -63,6 +63,7 @@ go_test(
srcs = [
"bench_test.go",
"budget_test.go",
"buffered_sender_test.go",
"catchup_scan_bench_test.go",
"catchup_scan_test.go",
"event_queue_test.go",
120 changes: 113 additions & 7 deletions pkg/kv/kvserver/rangefeed/buffered_sender.go
@@ -7,9 +7,13 @@ package rangefeed

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
)

// ┌─────────────────┐
@@ -44,38 +48,140 @@ import (
// BufferedSender is embedded in every rangefeed.BufferedPerRangeEventSink,
// serving as a helper which buffers events before forwarding them to the
// underlying gRPC stream.
//
// Refer to the comments above UnbufferedSender for more details on the role of
// senders in the entire rangefeed architecture.
type BufferedSender struct {
// Note that lockedMuxStream wraps the underlying gRPC server stream, ensuring
// thread safety.
sender ServerStreamSender

// queueMu protects the buffer queue.
queueMu struct {
syncutil.Mutex
stopped bool
buffer *eventQueue
}

// notifyDataC is used to notify the BufferedSender.run goroutine that there
// are events to send. The channel is initialized with a buffer of 1, and all
// writes to it are non-blocking.
notifyDataC chan struct{}
}

func NewBufferedSender(sender ServerStreamSender) *BufferedSender {
bs := &BufferedSender{
sender: sender,
}
bs.queueMu.buffer = newEventQueue()
bs.notifyDataC = make(chan struct{}, 1)
return bs
}

// sendBuffered buffers the event before sending it to the underlying gRPC
// stream. It does not block. sendBuffered takes ownership of the alloc and
// releases it if the returned error is non-nil. It only errors if the stream
// sender has already been stopped.
func (bs *BufferedSender) sendBuffered(
ev *kvpb.MuxRangeFeedEvent, alloc *SharedBudgetAllocation,
) error {
panic("unimplemented: buffered sender for rangefeed #126560")
bs.queueMu.Lock()
defer bs.queueMu.Unlock()
if bs.queueMu.stopped {
return errors.New("stream sender is stopped")
}
// TODO(wenyihu6): pass an actual context here
alloc.Use(context.Background())
bs.queueMu.buffer.pushBack(sharedMuxEvent{ev, alloc})
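// Wake the run goroutine if it is idle. The channel has a buffer of one, so
// if a notification is already pending the default case is taken and we never
// block while holding queueMu.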
select {
case bs.notifyDataC <- struct{}{}:
default:
}
return nil
}

// sendUnbuffered sends the event directly to the underlying
// ServerStreamSender. It bypasses the buffer and thus may block.
func (bs *BufferedSender) sendUnbuffered(ev *kvpb.MuxRangeFeedEvent) error {
panic("unimplemented: buffered sender for rangefeed #126560")
return bs.sender.Send(ev)
}

// run loops until the sender or stopper signals teardown. In each iteration,
// it waits for events to enter the buffer and moves them to the sender.
func (bs *BufferedSender) run(
ctx context.Context, stopper *stop.Stopper, onError func(streamID int64),
) error {
panic("unimplemented: buffered sender for rangefeed #126560")
for {
select {
case <-ctx.Done():
// Top level goroutine will receive the context cancellation and handle
// ctx.Err().
return nil
case <-stopper.ShouldQuiesce():
// Top level goroutine will receive the stopper quiesce signal and handle
// error.
return nil
case <-bs.notifyDataC:
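// Drain the buffer completely before waiting for the next notification; a
// single wake-up may cover any number of buffered events.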
for {
e, success := bs.popFront()
if success {
err := bs.sender.Send(e.ev)
e.alloc.Release(ctx)
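// An error event is terminal for its stream: after it has been sent,
// surface the stream ID to the caller via onError.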
if e.ev.Error != nil {
onError(e.ev.StreamID)
}
if err != nil {
return err
}
} else {
break
}
}
}
}
}

// popFront pops the front event from the buffer queue. It returns the event and
// a boolean indicating if the event was successfully popped.
func (bs *BufferedSender) popFront() (e sharedMuxEvent, success bool) {
bs.queueMu.Lock()
defer bs.queueMu.Unlock()
event, ok := bs.queueMu.buffer.popFront()
return event, ok
}

// cleanup is called when the sender is stopped. It is expected to free up the
// buffer queue, and no new events should be buffered after this.
func (bs *BufferedSender) cleanup(ctx context.Context) {
panic("unimplemented: buffered sender for rangefeed #126560")
bs.queueMu.Lock()
defer bs.queueMu.Unlock()
bs.queueMu.stopped = true
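// Drain whatever is still buffered; once stopped is set above, no new events
// can be enqueued.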
bs.queueMu.buffer.drain(ctx)
}

// Used for testing only.
func (bs *BufferedSender) len() int {
bs.queueMu.Lock()
defer bs.queueMu.Unlock()
return int(bs.queueMu.buffer.len())
}

// Used for testing only.
func (bs *BufferedSender) waitForEmptyBuffer(ctx context.Context) error {
opts := retry.Options{
InitialBackoff: 5 * time.Millisecond,
Multiplier: 2,
MaxBackoff: 10 * time.Second,
MaxRetries: 50,
}
for re := retry.StartWithCtx(ctx, opts); re.Next(); {
bs.queueMu.Lock()
caughtUp := bs.queueMu.buffer.len() == 0 // nolint:deferunlockcheck
bs.queueMu.Unlock()
if caughtUp {
return nil
}
}
if err := ctx.Err(); err != nil {
return err
}
return errors.New("buffered sender failed to send in time")
}
164 changes: 164 additions & 0 deletions pkg/kv/kvserver/rangefeed/buffered_sender_test.go
@@ -0,0 +1,164 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package rangefeed

import (
"context"
"sync"
"sync/atomic"
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// TestBufferedSenderDisconnectStream tests that BufferedSender handles stream
// disconnects properly, including context cancellation, metrics updates, and
// rangefeed cleanup.
func TestBufferedSenderDisconnectStream(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

stopper := stop.NewStopper()
defer stopper.Stop(ctx)
testServerStream := newTestServerStream()
testRangefeedCounter := newTestRangefeedCounter()
bs := NewBufferedSender(testServerStream)
sm := NewStreamManager(bs, testRangefeedCounter)
require.NoError(t, sm.Start(ctx, stopper))
defer sm.Stop(ctx)

const streamID = 0
err := kvpb.NewError(kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER))
errEvent := makeMuxRangefeedErrorEvent(int64(streamID), 1, err)

t.Run("basic operation", func(t *testing.T) {
var num atomic.Int32
sm.AddStream(int64(streamID), &cancelCtxDisconnector{
cancel: func() {
num.Add(1)
require.NoError(t, sm.sender.sendBuffered(errEvent, nil))
},
})
require.Equal(t, 1, testRangefeedCounter.get())
require.Equal(t, 0, bs.len())
sm.DisconnectStream(int64(streamID), err)
testServerStream.waitForEvent(t, errEvent)
require.Equal(t, int32(1), num.Load())
require.Equal(t, 1, testServerStream.totalEventsSent())
testRangefeedCounter.waitForRangefeedCount(t, 0)
testServerStream.reset()
})
t.Run("disconnect stream on the same stream is idempotent", func(t *testing.T) {
sm.AddStream(int64(streamID), &cancelCtxDisconnector{
cancel: func() {
require.NoError(t, sm.sender.sendBuffered(errEvent, nil))
},
})
require.Equal(t, 1, testRangefeedCounter.get())
sm.DisconnectStream(int64(streamID), err)
require.NoError(t, bs.waitForEmptyBuffer(ctx))
sm.DisconnectStream(int64(streamID), err)
require.NoError(t, bs.waitForEmptyBuffer(ctx))
require.Equal(t, 1, testServerStream.totalEventsSent())
testRangefeedCounter.waitForRangefeedCount(t, 0)
})
}

func TestBufferedSenderChaosWithStop(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

stopper := stop.NewStopper()
defer stopper.Stop(ctx)
testServerStream := newTestServerStream()
testRangefeedCounter := newTestRangefeedCounter()
bs := NewBufferedSender(testServerStream)
sm := NewStreamManager(bs, testRangefeedCounter)
require.NoError(t, sm.Start(ctx, stopper))

rng, _ := randutil.NewTestRand()

// Streams in [activeStreamStart, activeStreamEnd) are active, and
// activeStreamStart <= activeStreamEnd always holds. If activeStreamStart ==
// activeStreamEnd, no streams are active yet. Streams in [0, activeStreamStart)
// have been disconnected.
var actualSum atomic.Int32
activeStreamStart := int64(0)
activeStreamEnd := int64(0)

t.Run("mixed operations of add and disconnect stream", func(t *testing.T) {
const ops = 1000
var wg sync.WaitGroup
for i := 0; i < ops; i++ {
addStream := rng.Intn(2) == 0
require.LessOrEqualf(t, activeStreamStart, activeStreamEnd, "test programming error")
if addStream || activeStreamStart == activeStreamEnd {
streamID := activeStreamEnd
sm.AddStream(streamID, &cancelCtxDisconnector{
cancel: func() {
actualSum.Add(1)
_ = sm.sender.sendBuffered(
makeMuxRangefeedErrorEvent(streamID, 1, newErrBufferCapacityExceeded()), nil)
},
})
activeStreamEnd++
} else {
wg.Add(1)
go func(id int64) {
defer wg.Done()
sm.DisconnectStream(id, newErrBufferCapacityExceeded())
}(activeStreamStart)
activeStreamStart++
}
}

wg.Wait()
require.Equal(t, int32(activeStreamStart), actualSum.Load())

require.NoError(t, bs.waitForEmptyBuffer(ctx))
// We stop the sender as a way to synchronize with the send
// loop. Even though we have waited for the buffer to be
// empty, we also need to know that the sender is done
// handling the last message it processed before we observe
// any of the counters on the test stream.
stopper.Stop(ctx)
require.Equal(t, activeStreamStart, int64(testServerStream.totalEventsSent()))
expectedActiveStreams := activeStreamEnd - activeStreamStart
require.Equal(t, int(expectedActiveStreams), sm.activeStreamCount())
testRangefeedCounter.waitForRangefeedCount(t, int(expectedActiveStreams))
})

t.Run("stream manager on stop", func(t *testing.T) {
sm.Stop(ctx)
require.Equal(t, 0, testRangefeedCounter.get())
require.Equal(t, 0, sm.activeStreamCount())
// Cleanup functions should be called for all active streams.
require.Equal(t, int32(activeStreamEnd), actualSum.Load())
// No error events should be sent during Stop().
require.Equal(t, activeStreamStart, int64(testServerStream.totalEventsSent()))

})

t.Run("stream manager after stop", func(t *testing.T) {
// No events should be buffered after stopped.
val1 := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}}
ev1 := new(kvpb.RangeFeedEvent)
ev1.MustSetValue(&kvpb.RangeFeedValue{Key: keyA, Value: val1})
muxEv := &kvpb.MuxRangeFeedEvent{RangeFeedEvent: *ev1, RangeID: 0, StreamID: 1}
require.Equal(t, bs.sendBuffered(muxEv, nil).Error(), errors.New("stream sender is stopped").Error())
require.Equal(t, 0, bs.len())
})
}
