Skip to content

Commit

Permalink
rangefeed: remove LegacyProcessor
Browse files Browse the repository at this point in the history
The ScheduledProcessor has been in default use since 24.1,
substantially out performs the LegacyProcessor, and is the likely
place that new improvements will be implemented.

Here, we remove the LegacyProcessor, retiring the associated cluster
setting.

This is similar to cockroachdb#114410, but reconstructed from scratch.

Epic: none

Release note (ops change): The setting kv.rangefeed.scheduler.enabled
has been retired. The rangefeed scheduler is now unconditionally
enabled.

Co-authored-by: Erik Grinaker <[email protected]>
  • Loading branch information
stevendanna and erikgrinaker committed Oct 17, 2024
1 parent f4cb007 commit 89b2de8
Show file tree
Hide file tree
Showing 11 changed files with 2,165 additions and 3,075 deletions.
2,380 changes: 1,142 additions & 1,238 deletions pkg/kv/kvclient/rangefeed/rangefeed_external_test.go

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,6 @@ go_test(
"//pkg/kv",
"//pkg/kv/kvclient",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvclient/rangefeed",
"//pkg/kv/kvclient/rangefeed/rangefeedcache",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/abortspan",
Expand Down
1 change: 0 additions & 1 deletion pkg/kv/kvserver/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ go_library(
"stream.go",
"task.go",
"test_helpers.go",
"testutil.go",
"unbuffered_sender.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed",
Expand Down
24 changes: 10 additions & 14 deletions pkg/kv/kvserver/rangefeed/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
)

type benchmarkRangefeedOpts struct {
procType procType
opType opType
numRegistrations int
budget int64
Expand All @@ -41,19 +40,16 @@ const (
// BenchmarkRangefeed benchmarks the processor and registrations, by submitting
// a set of events and waiting until they are all emitted.
func BenchmarkRangefeed(b *testing.B) {
for _, procType := range testTypes {
for _, opType := range []opType{writeOpType, commitOpType, closedTSOpType} {
for _, numRegistrations := range []int{1, 10, 100} {
name := fmt.Sprintf("procType=%s/opType=%s/numRegs=%d", procType, opType, numRegistrations)
b.Run(name, func(b *testing.B) {
runBenchmarkRangefeed(b, benchmarkRangefeedOpts{
procType: procType,
opType: opType,
numRegistrations: numRegistrations,
budget: math.MaxInt64,
})
for _, opType := range []opType{writeOpType, commitOpType, closedTSOpType} {
for _, numRegistrations := range []int{1, 10, 100} {
name := fmt.Sprintf("opType=%s/numRegs=%d", opType, numRegistrations)
b.Run(name, func(b *testing.B) {
runBenchmarkRangefeed(b, benchmarkRangefeedOpts{
opType: opType,
numRegistrations: numRegistrations,
budget: math.MaxInt64,
})
}
})
}
}
}
Expand Down Expand Up @@ -90,7 +86,7 @@ func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) {
span := roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")}

p, h, stopper := newTestProcessor(b, withSpan(span), withBudget(budget), withChanCap(b.N),
withEventTimeout(time.Hour), withProcType(opts.procType))
withEventTimeout(time.Hour))
defer stopper.Stop(ctx)

// Add registrations.
Expand Down
Loading

0 comments on commit 89b2de8

Please sign in to comment.