Skip to content

Commit

Permalink
rangefeed: remove LegacyProcessor
Browse files Browse the repository at this point in the history
The ScheduledProcessor has been in default use since 24.1,
substantially out performs the LegacyProcessor, and is the likely
place that new improvements will be implemented.

Here, we remove the LegacyProcessor, retiring the associated cluster
setting.

This is similar to cockroachdb#114410, but reconstructed from scratch.

Epic: none

Release note (ops change): The setting kv.rangefeed.scheduler.enabled
has been retired. The rangefeed scheduler is now unconditionally
enabled.

Co-authored-by: Erik Grinaker <[email protected]>
  • Loading branch information
stevendanna and erikgrinaker committed Oct 23, 2024
1 parent 62803f3 commit 062801d
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 804 deletions.
10 changes: 5 additions & 5 deletions pkg/cmd/roachtest/tests/cdc_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,17 @@ const (
// practice it can.
cdcBenchColdCatchupScan cdcBenchScanType = "catchup-cold"

cdcBenchNoServer cdcBenchServer = ""
cdcBenchProcessorServer cdcBenchServer = "processor" // legacy processor
cdcBenchNoServer cdcBenchServer = ""
// The legacy processor was removed in 25.1+. In such
// timeseries, "processor" refers to the now defunct legacy
// processor.
cdcBenchSchedulerServer cdcBenchServer = "scheduler" // new scheduler
)

var (
cdcBenchScanTypes = []cdcBenchScanType{
cdcBenchInitialScan, cdcBenchCatchupScan, cdcBenchColdCatchupScan}
cdcBenchServers = []cdcBenchServer{cdcBenchProcessorServer, cdcBenchSchedulerServer}
cdcBenchServers = []cdcBenchServer{cdcBenchSchedulerServer}
)

func registerCDCBench(r registry.Registry) {
Expand Down Expand Up @@ -416,8 +418,6 @@ func runCDCBenchWorkload(
settings.ClusterSettings["server.child_metrics.enabled"] = "true"

switch server {
case cdcBenchProcessorServer:
settings.ClusterSettings["kv.rangefeed.scheduler.enabled"] = "false"
case cdcBenchSchedulerServer:
settings.ClusterSettings["kv.rangefeed.scheduler.enabled"] = "true"
case cdcBenchNoServer:
Expand Down
3 changes: 0 additions & 3 deletions pkg/kv/kvclient/rangefeed/rangefeed_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,6 @@ func (t feedProcessorType) String() string {
}

var procTypes = []feedProcessorType{
{
useScheduler: false,
},
{
useScheduler: true,
},
Expand Down
1 change: 0 additions & 1 deletion pkg/kv/kvserver/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ go_library(
"stream.go",
"task.go",
"test_helpers.go",
"testutil.go",
"unbuffered_sender.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed",
Expand Down
18 changes: 9 additions & 9 deletions pkg/kv/kvserver/rangefeed/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import (
)

type benchmarkRangefeedOpts struct {
procType procType
opType opType
numRegistrations int
budget int64
rangefeedTestType rangefeedTestType
opType opType
numRegistrations int
budget int64
}

type opType string
Expand All @@ -47,10 +47,10 @@ func BenchmarkRangefeed(b *testing.B) {
name := fmt.Sprintf("procType=%s/opType=%s/numRegs=%d", procType, opType, numRegistrations)
b.Run(name, func(b *testing.B) {
runBenchmarkRangefeed(b, benchmarkRangefeedOpts{
procType: procType,
opType: opType,
numRegistrations: numRegistrations,
budget: math.MaxInt64,
rangefeedTestType: procType,
opType: opType,
numRegistrations: numRegistrations,
budget: math.MaxInt64,
})
})
}
Expand Down Expand Up @@ -90,7 +90,7 @@ func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) {
span := roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")}

p, h, stopper := newTestProcessor(b, withSpan(span), withBudget(budget), withChanCap(b.N),
withEventTimeout(time.Hour), withProcType(opts.procType))
withEventTimeout(time.Hour), withRangefeedTestType(opts.rangefeedTestType))
defer stopper.Stop(ctx)

// Add registrations.
Expand Down
Loading

0 comments on commit 062801d

Please sign in to comment.