Skip to content

Commit

Permalink
to be squashed separated out for easier code review
Browse files Browse the repository at this point in the history
# Conflicts:
#	pkg/kv/kvserver/rangefeed/registry_test_helper.go

# Conflicts:
#	pkg/kv/kvserver/rangefeed/registry_helper_test.go
#	pkg/kv/kvserver/rangefeed/registry_test.go
  • Loading branch information
wenyihu6 committed Oct 23, 2024
1 parent e739e1f commit 0c6c10e
Show file tree
Hide file tree
Showing 8 changed files with 1,222 additions and 504 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ go_test(
"scheduler_test.go",
"sender_helper_test.go",
"task_test.go",
"unbuffered_registration_test.go",
"unbuffered_sender_test.go",
],
embed = [":rangefeed"],
Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/rangefeed/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

type benchmarkRangefeedOpts struct {
procType procType
feedType rangefeedTestType
opType opType
numRegistrations int
budget int64
Expand All @@ -41,13 +41,13 @@ const (
// BenchmarkRangefeed benchmarks the processor and registrations, by submitting
// a set of events and waiting until they are all emitted.
func BenchmarkRangefeed(b *testing.B) {
for _, procType := range testTypes {
for _, feedType := range testTypes {
for _, opType := range []opType{writeOpType, commitOpType, closedTSOpType} {
for _, numRegistrations := range []int{1, 10, 100} {
name := fmt.Sprintf("procType=%s/opType=%s/numRegs=%d", procType, opType, numRegistrations)
name := fmt.Sprintf("rangefeedTestType=%s/opType=%s/numRegs=%d", feedType, opType, numRegistrations)
b.Run(name, func(b *testing.B) {
runBenchmarkRangefeed(b, benchmarkRangefeedOpts{
procType: procType,
feedType: feedType,
opType: opType,
numRegistrations: numRegistrations,
budget: math.MaxInt64,
Expand Down Expand Up @@ -90,7 +90,7 @@ func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) {
span := roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")}

p, h, stopper := newTestProcessor(b, withSpan(span), withBudget(budget), withChanCap(b.N),
withEventTimeout(time.Hour), withProcType(opts.procType))
withEventTimeout(time.Hour), withRangefeedTestType(opts.feedType))
defer stopper.Stop(ctx)

// Add registrations.
Expand Down
106 changes: 89 additions & 17 deletions pkg/kv/kvserver/rangefeed/processor_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,31 @@ import (
"github.com/stretchr/testify/require"
)

type testBufferedStream struct {
Stream
cleanup func()
}

var _ BufferedStream = (*testBufferedStream)(nil)

func (tb *testBufferedStream) SendBuffered(
e *kvpb.RangeFeedEvent, _ *SharedBudgetAllocation,
) error {
// In production code, buffered stream would be responsible for properly using
// and releasing the alloc. We just ignore memory accounting here.
return tb.SendUnbuffered(e)
}

func (tb *testBufferedStream) Disconnect(err *kvpb.Error) {
tb.Stream.SendError(err)
if tb.cleanup != nil {
// In practise, StreamMuxer is responsible for invoking cleanup in the
// background. For simplicity, we call it directly here.
go tb.cleanup()
time.Sleep(10 * time.Millisecond)
}
}

func makeLogicalOp(val interface{}) enginepb.MVCCLogicalOp {
var op enginepb.MVCCLogicalOp
op.MustSetValue(val)
Expand Down Expand Up @@ -189,11 +214,12 @@ const testProcessorEventCCap = 16
const testProcessorEventCTimeout = 10 * time.Millisecond

type processorTestHelper struct {
span roachpb.RSpan
rts *resolvedTimestamp
syncEventC func()
sendSpanSync func(*roachpb.Span)
scheduler *ClientScheduler
span roachpb.RSpan
rts *resolvedTimestamp
syncEventC func()
sendSpanSync func(*roachpb.Span)
scheduler *ClientScheduler
toBufferedStreamIfNeeded func(s Stream) Stream
}

// syncEventAndRegistrations waits for all previously sent events to be
Expand Down Expand Up @@ -236,23 +262,54 @@ func (h *processorTestHelper) triggerTxnPushUntilPushed(t *testing.T, pushedC <-
type procType bool

const (
legacyProcessor procType = false
schedulerProcessor = true
legacy procType = false
scheduler procType = true
)

var testTypes = []procType{legacyProcessor, schedulerProcessor}
type rangefeedTestType struct {
processorType procType
regType registrationType
}

func (t procType) String() string {
if t {
return "scheduler"
var (
legacyProcessor = rangefeedTestType{
processorType: legacy,
regType: buffered,
}
return "legacy"

scheduledProcessorWithUnbufferedReg = rangefeedTestType{
processorType: scheduler,
regType: unbuffered,
}

scheduledProcessorWithBufferedReg = rangefeedTestType{
processorType: scheduler,
regType: buffered,
}

testTypes = []rangefeedTestType{
legacyProcessor,
scheduledProcessorWithUnbufferedReg,
scheduledProcessorWithBufferedReg,
}
)

func (t rangefeedTestType) String() string {
switch t {
case legacyProcessor:
return "legacy"
case scheduledProcessorWithBufferedReg:
return "scheduled processor with buffered registration"
case scheduledProcessorWithUnbufferedReg:
return "scheduled processor with unbuffered registration"
}
panic("unknown processor type")
}

type testConfig struct {
Config
useScheduler bool
isc IntentScannerConstructor
isc IntentScannerConstructor
feedType rangefeedTestType
}

type option func(*testConfig)
Expand All @@ -265,9 +322,9 @@ func withPusher(txnPusher TxnPusher) option {
}
}

func withProcType(t procType) option {
func withRangefeedTestType(t rangefeedTestType) option {
return func(config *testConfig) {
config.useScheduler = bool(t)
config.feedType = t
}
}

Expand Down Expand Up @@ -401,7 +458,7 @@ func newTestProcessor(
for _, o := range opts {
o(&cfg)
}
if cfg.useScheduler {
if cfg.feedType != legacyProcessor {
sch := NewScheduler(SchedulerConfig{
Workers: 1,
PriorityWorkers: 1,
Expand Down Expand Up @@ -429,6 +486,10 @@ func newTestProcessor(
h.sendSpanSync = func(span *roachpb.Span) {
p.syncEventCWithEvent(&syncEvent{c: make(chan struct{}), testRegCatchupSpan: span})
}
h.toBufferedStreamIfNeeded = func(s Stream) Stream {
// Legacy processor does not support buffered streams.
return s
}
case *ScheduledProcessor:
h.rts = &p.rts
h.span = p.Span
Expand All @@ -437,6 +498,17 @@ func newTestProcessor(
p.syncSendAndWait(&syncEvent{c: make(chan struct{}), testRegCatchupSpan: span})
}
h.scheduler = &p.scheduler
if cfg.feedType == scheduledProcessorWithUnbufferedReg {
h.toBufferedStreamIfNeeded = func(s Stream) Stream {
// Unbuffered registration processor should use buffered stream.
return &testBufferedStream{Stream: s}
}
} else {
// Buffered registration processor should use unbuffered stream.
h.toBufferedStreamIfNeeded = func(s Stream) Stream {
return s
}
}
default:
panic("unknown processor type")
}
Expand Down
Loading

0 comments on commit 0c6c10e

Please sign in to comment.