-
Notifications
You must be signed in to change notification settings - Fork 3.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
136309: kvserver/rangefeed: add node level buffered sender r=stevendanna a=wenyihu6 This patch adds a node level BufferedSender which uses a queue to buffer events before forwarding events to underlying grpc stream. Part of: #110432 Release note: None Co-authored-by: Steven Danna <[email protected]> Co-authored-by: Wenyi Hu <[email protected]>
- Loading branch information
Showing
3 changed files
with
278 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
// Copyright 2024 The Cockroach Authors. | ||
// | ||
// Use of this software is governed by the CockroachDB Software License | ||
// included in the /LICENSE file. | ||
|
||
package rangefeed | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
"sync/atomic" | ||
"testing" | ||
|
||
"github.com/cockroachdb/cockroach/pkg/kv/kvpb" | ||
"github.com/cockroachdb/cockroach/pkg/roachpb" | ||
"github.com/cockroachdb/cockroach/pkg/util/hlc" | ||
"github.com/cockroachdb/cockroach/pkg/util/leaktest" | ||
"github.com/cockroachdb/cockroach/pkg/util/log" | ||
"github.com/cockroachdb/cockroach/pkg/util/randutil" | ||
"github.com/cockroachdb/cockroach/pkg/util/stop" | ||
"github.com/cockroachdb/errors" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
// TestBufferedSenderWithSendBufferedError tests that BufferedSender can handle stream | ||
// disconnects properly including context canceled, metrics updates, rangefeed | ||
// cleanup. | ||
func TestBufferedSenderDisconnectStream(t *testing.T) { | ||
defer leaktest.AfterTest(t)() | ||
defer log.Scope(t).Close(t) | ||
ctx := context.Background() | ||
|
||
stopper := stop.NewStopper() | ||
defer stopper.Stop(ctx) | ||
testServerStream := newTestServerStream() | ||
testRangefeedCounter := newTestRangefeedCounter() | ||
bs := NewBufferedSender(testServerStream) | ||
sm := NewStreamManager(bs, testRangefeedCounter) | ||
require.NoError(t, sm.Start(ctx, stopper)) | ||
defer sm.Stop(ctx) | ||
|
||
const streamID = 0 | ||
err := kvpb.NewError(kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER)) | ||
errEvent := makeMuxRangefeedErrorEvent(int64(streamID), 1, err) | ||
|
||
t.Run("basic operation", func(t *testing.T) { | ||
var num atomic.Int32 | ||
sm.AddStream(int64(streamID), &cancelCtxDisconnector{ | ||
cancel: func() { | ||
num.Add(1) | ||
require.NoError(t, sm.sender.sendBuffered(errEvent, nil)) | ||
}, | ||
}) | ||
require.Equal(t, 1, testRangefeedCounter.get()) | ||
require.Equal(t, 0, bs.len()) | ||
sm.DisconnectStream(int64(streamID), err) | ||
testServerStream.waitForEvent(t, errEvent) | ||
require.Equal(t, int32(1), num.Load()) | ||
require.Equal(t, 1, testServerStream.totalEventsSent()) | ||
testRangefeedCounter.waitForRangefeedCount(t, 0) | ||
testServerStream.reset() | ||
}) | ||
t.Run("disconnect stream on the same stream is idempotent", func(t *testing.T) { | ||
sm.AddStream(int64(streamID), &cancelCtxDisconnector{ | ||
cancel: func() { | ||
require.NoError(t, sm.sender.sendBuffered(errEvent, nil)) | ||
}, | ||
}) | ||
require.Equal(t, 1, testRangefeedCounter.get()) | ||
sm.DisconnectStream(int64(streamID), err) | ||
require.NoError(t, bs.waitForEmptyBuffer(ctx)) | ||
sm.DisconnectStream(int64(streamID), err) | ||
require.NoError(t, bs.waitForEmptyBuffer(ctx)) | ||
require.Equal(t, 1, testServerStream.totalEventsSent()) | ||
testRangefeedCounter.waitForRangefeedCount(t, 0) | ||
}) | ||
} | ||
|
||
func TestBufferedSenderChaosWithStop(t *testing.T) { | ||
defer leaktest.AfterTest(t)() | ||
defer log.Scope(t).Close(t) | ||
ctx := context.Background() | ||
|
||
stopper := stop.NewStopper() | ||
defer stopper.Stop(ctx) | ||
testServerStream := newTestServerStream() | ||
testRangefeedCounter := newTestRangefeedCounter() | ||
bs := NewBufferedSender(testServerStream) | ||
sm := NewStreamManager(bs, testRangefeedCounter) | ||
require.NoError(t, sm.Start(ctx, stopper)) | ||
|
||
rng, _ := randutil.NewTestRand() | ||
|
||
// [activeStreamStart,activeStreamEnd) are in the active streams. | ||
// activeStreamStart <= activeStreamEnd. If activeStreamStart == | ||
// activeStreamEnd, no streams are active yet. [0, activeStreamStart) are | ||
// disconnected. | ||
var actualSum atomic.Int32 | ||
activeStreamStart := int64(0) | ||
activeStreamEnd := int64(0) | ||
|
||
t.Run("mixed operations of add and disconnect stream", func(t *testing.T) { | ||
const ops = 1000 | ||
var wg sync.WaitGroup | ||
for i := 0; i < ops; i++ { | ||
addStream := rng.Intn(2) == 0 | ||
require.LessOrEqualf(t, activeStreamStart, activeStreamEnd, "test programming error") | ||
if addStream || activeStreamStart == activeStreamEnd { | ||
streamID := activeStreamEnd | ||
sm.AddStream(streamID, &cancelCtxDisconnector{ | ||
cancel: func() { | ||
actualSum.Add(1) | ||
_ = sm.sender.sendBuffered( | ||
makeMuxRangefeedErrorEvent(streamID, 1, newErrBufferCapacityExceeded()), nil) | ||
}, | ||
}) | ||
activeStreamEnd++ | ||
} else { | ||
wg.Add(1) | ||
go func(id int64) { | ||
defer wg.Done() | ||
sm.DisconnectStream(id, newErrBufferCapacityExceeded()) | ||
}(activeStreamStart) | ||
activeStreamStart++ | ||
} | ||
} | ||
|
||
wg.Wait() | ||
require.Equal(t, int32(activeStreamStart), actualSum.Load()) | ||
|
||
require.NoError(t, bs.waitForEmptyBuffer(ctx)) | ||
// We stop the sender as a way to syncronize the send | ||
// loop. While we've waiting for the buffer to be | ||
// empty, we also need to know that the sender is done | ||
// handling the last message that it processed before | ||
// we observe any of the counters on the test stream. | ||
stopper.Stop(ctx) | ||
require.Equal(t, activeStreamStart, int64(testServerStream.totalEventsSent())) | ||
expectedActiveStreams := activeStreamEnd - activeStreamStart | ||
require.Equal(t, int(expectedActiveStreams), sm.activeStreamCount()) | ||
testRangefeedCounter.waitForRangefeedCount(t, int(expectedActiveStreams)) | ||
}) | ||
|
||
t.Run("stream manager on stop", func(t *testing.T) { | ||
sm.Stop(ctx) | ||
require.Equal(t, 0, testRangefeedCounter.get()) | ||
require.Equal(t, 0, sm.activeStreamCount()) | ||
// Cleanup functions should be called for all active streams. | ||
require.Equal(t, int32(activeStreamEnd), actualSum.Load()) | ||
// No error events should be sent during Stop(). | ||
require.Equal(t, activeStreamStart, int64(testServerStream.totalEventsSent())) | ||
|
||
}) | ||
|
||
t.Run("stream manager after stop", func(t *testing.T) { | ||
// No events should be buffered after stopped. | ||
val1 := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}} | ||
ev1 := new(kvpb.RangeFeedEvent) | ||
ev1.MustSetValue(&kvpb.RangeFeedValue{Key: keyA, Value: val1}) | ||
muxEv := &kvpb.MuxRangeFeedEvent{RangeFeedEvent: *ev1, RangeID: 0, StreamID: 1} | ||
require.Equal(t, bs.sendBuffered(muxEv, nil).Error(), errors.New("stream sender is stopped").Error()) | ||
require.Equal(t, 0, bs.len()) | ||
}) | ||
} |