diff --git a/pkg/kv/kvserver/client_rangefeed_test.go b/pkg/kv/kvserver/client_rangefeed_test.go
index b4e068695a0c..d8214a035999 100644
--- a/pkg/kv/kvserver/client_rangefeed_test.go
+++ b/pkg/kv/kvserver/client_rangefeed_test.go
@@ -227,7 +227,7 @@ func TestRangefeedIsRoutedToNonVoter(t *testing.T) {
 	clusterArgs.ReplicationMode = base.ReplicationManual
 	// NB: setupClusterForClosedTSTesting sets a low closed timestamp target
 	// duration.
-	tc, _, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, clusterArgs, "cttest", "kv")
+	tc, _, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, clusterArgs, "cttest", "kv")
 	defer tc.Stopper().Stop(ctx)
 
 	tc.AddNonVotersOrFatal(t, desc.StartKey.AsRawKey(), tc.Target(1))
diff --git a/pkg/kv/kvserver/closed_timestamp_test.go b/pkg/kv/kvserver/closed_timestamp_test.go
index 8bebfbbb082b..ecbec6715e13 100644
--- a/pkg/kv/kvserver/closed_timestamp_test.go
+++ b/pkg/kv/kvserver/closed_timestamp_test.go
@@ -85,7 +85,7 @@ func TestClosedTimestampCanServe(t *testing.T) {
 	// Disable the replicateQueue so that it doesn't interfere with replica
 	// membership ranges.
 	clusterArgs.ReplicationMode = base.ReplicationManual
-	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, clusterArgs, dbName, tableName)
+	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, clusterArgs, dbName, tableName)
 	defer tc.Stopper().Stop(ctx)
 
 	if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
@@ -156,7 +156,7 @@ func TestClosedTimestampCanServeOnVoterIncoming(t *testing.T) {
 	clusterArgs.ReplicationMode = base.ReplicationManual
 	knobs, ltk := makeReplicationTestKnobs()
 	clusterArgs.ServerArgs.Knobs = knobs
-	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, clusterArgs, dbName, tableName)
+	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, clusterArgs, dbName, tableName)
 	defer tc.Stopper().Stop(ctx)
 
 	if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
@@ -192,7 +192,7 @@ func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) {
 	skip.UnderRace(t)
 
 	ctx := context.Background()
-	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
+	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
 	defer tc.Stopper().Stop(ctx)
 	repls := replsForRange(ctx, t, tc, desc)
 
@@ -270,7 +270,7 @@ func TestClosedTimestampCantServeWithConflictingIntent(t *testing.T) {
 	defer txnwait.TestingOverrideTxnLivenessThreshold(time.Hour)()
 
 	ctx := context.Background()
-	tc, _, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
+	tc, _, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
 	defer tc.Stopper().Stop(ctx)
 	repls := replsForRange(ctx, t, tc, desc)
 	ds := tc.Server(0).DistSenderI().(*kvcoord.DistSender)
@@ -377,7 +377,7 @@ func TestClosedTimestampCanServeAfterSplitAndMerges(t *testing.T) {
 	skip.UnderRace(t)
 
 	ctx := context.Background()
-	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
+	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
 	repls := replsForRange(ctx, t, tc, desc)
 	// Disable the automatic merging.
 	if _, err := db0.Exec("SET CLUSTER SETTING kv.range_merge.queue_enabled = false"); err != nil {
@@ -457,7 +457,7 @@ func TestClosedTimestampCantServeBasedOnUncertaintyLimit(t *testing.T) {
 	ctx := context.Background()
 	// Set up the target duration to be very long and rely on lease transfers to
 	// drive MaxClosed.
-	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
+	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
 	defer tc.Stopper().Stop(ctx)
 	repls := replsForRange(ctx, t, tc, desc)
 
@@ -490,7 +490,7 @@ func TestClosedTimestampCanServeForWritingTransaction(t *testing.T) {
 	skip.UnderRace(t)
 
 	ctx := context.Background()
-	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
+	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
 	defer tc.Stopper().Stop(ctx)
 	repls := replsForRange(ctx, t, tc, desc)
 
@@ -537,7 +537,7 @@ func TestClosedTimestampCantServeForNonTransactionalReadRequest(t *testing.T) {
 	skip.UnderRace(t)
 
 	ctx := context.Background()
-	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
+	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
 	defer tc.Stopper().Stop(ctx)
 	repls := replsForRange(ctx, t, tc, desc)
 
@@ -579,7 +579,7 @@ func TestClosedTimestampCantServeForNonTransactionalBatch(t *testing.T) {
 
 	testutils.RunTrueAndFalse(t, "tsFromServer", func(t *testing.T, tsFromServer bool) {
 		ctx := context.Background()
-		tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
+		tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
 		defer tc.Stopper().Stop(ctx)
 		repls := replsForRange(ctx, t, tc, desc)
 
@@ -722,8 +722,7 @@ func TestClosedTimestampFrozenAfterSubsumption(t *testing.T) {
 	// Set up the closed timestamp timing such that, when we block a merge and
 	// transfer the RHS lease, the closed timestamp advances over the LHS
 	// lease but not over the RHS lease.
-	const numNodes = 3
-	tc, _ := setupTestClusterWithDummyRange(t, clusterArgs, "cttest" /* dbName */, "kv" /* tableName */, numNodes)
+	tc, _, _ := setupClusterForClosedTSTesting(ctx, t, 5*time.Second, 100*time.Millisecond, clusterArgs, "cttest", "kv")
 	defer tc.Stopper().Stop(ctx)
 	sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))
 	sqlDB.ExecMultiple(t, strings.Split(fmt.Sprintf(`
@@ -1195,11 +1194,14 @@ func aggressiveResolvedTimestampPushKnobs() *kvserver.StoreTestingKnobs {
 func setupClusterForClosedTSTesting(
 	ctx context.Context,
 	t *testing.T,
-	targetDuration time.Duration,
+	targetDuration, sideTransportInterval time.Duration,
 	clusterArgs base.TestClusterArgs,
 	dbName, tableName string,
 ) (tc serverutils.TestClusterInterface, db0 *gosql.DB, kvTableDesc roachpb.RangeDescriptor) {
 	const numNodes = 3
+	if sideTransportInterval == 0 {
+		sideTransportInterval = targetDuration / 4
+	}
 	tc, desc := setupTestClusterWithDummyRange(t, clusterArgs, dbName, tableName, numNodes)
 	sqlRunner := sqlutils.MakeSQLRunner(tc.ServerConn(0))
 	sqlRunner.ExecMultiple(t, strings.Split(fmt.Sprintf(`
@@ -1207,7 +1209,7 @@ SET CLUSTER SETTING kv.closed_timestamp.target_duration = '%s';
 SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '%s';
 SET CLUSTER SETTING kv.closed_timestamp.follower_reads_enabled = true;
 SET CLUSTER SETTING kv.allocator.load_based_rebalancing = 'off';
-`, targetDuration, targetDuration/4),
+`, targetDuration, sideTransportInterval),
 		";")...)
 
 	// Disable replicate queues to avoid errant lease transfers.
diff --git a/pkg/kv/kvserver/rangefeed/BUILD.bazel b/pkg/kv/kvserver/rangefeed/BUILD.bazel
index c5a683ec6f49..1d421e146057 100644
--- a/pkg/kv/kvserver/rangefeed/BUILD.bazel
+++ b/pkg/kv/kvserver/rangefeed/BUILD.bazel
@@ -45,6 +45,7 @@ go_test(
     name = "rangefeed_test",
     size = "large",
     srcs = [
+        "bench_test.go",
         "budget_test.go",
         "catchup_scan_bench_test.go",
         "catchup_scan_test.go",
diff --git a/pkg/kv/kvserver/rangefeed/bench_test.go b/pkg/kv/kvserver/rangefeed/bench_test.go
new file mode 100644
index 000000000000..637381f271a2
--- /dev/null
+++ b/pkg/kv/kvserver/rangefeed/bench_test.go
@@ -0,0 +1,214 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package rangefeed
+
+import (
+	"context"
+	"encoding/binary"
+	"fmt"
+	"math"
+	"testing"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
+	"github.com/cockroachdb/cockroach/pkg/util/future"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"github.com/cockroachdb/cockroach/pkg/util/uuid"
+	"github.com/stretchr/testify/require"
+)
+
+type benchmarkRangefeedOpts struct {
+	opType           opType
+	numRegistrations int
+	budget           int64
+}
+
+type opType string
+
+const (
+	writeOpType    opType = "write"  // individual 1PC writes
+	commitOpType   opType = "commit" // intent + commit writes, 1 per txn
+	closedTSOpType opType = "closedts"
+)
+
+// BenchmarkRangefeed benchmarks the processor and registrations, by submitting
+// a set of events and waiting until they are all emitted.
+func BenchmarkRangefeed(b *testing.B) {
+	for _, opType := range []opType{writeOpType, commitOpType, closedTSOpType} {
+		for _, numRegistrations := range []int{1, 10, 100} {
+			name := fmt.Sprintf("opType=%s/numRegs=%d", opType, numRegistrations)
+			b.Run(name, func(b *testing.B) {
+				runBenchmarkRangefeed(b, benchmarkRangefeedOpts{
+					opType:           opType,
+					numRegistrations: numRegistrations,
+					budget:           math.MaxInt64,
+				})
+			})
+		}
+	}
+}
+
+// BenchmarkRangefeedBudget benchmarks the effect of enabling/disabling the
+// processor budget. It sets up a single processor and registration, and
+// processes a set of events.
+func BenchmarkRangefeedBudget(b *testing.B) {
+	for _, budget := range []bool{false, true} {
+		b.Run(fmt.Sprintf("budget=%t", budget), func(b *testing.B) {
+			var budgetSize int64
+			if budget {
+				budgetSize = math.MaxInt64
+			}
+			runBenchmarkRangefeed(b, benchmarkRangefeedOpts{
+				opType:           writeOpType,
+				numRegistrations: 1,
+				budget:           budgetSize,
+			})
+		})
+	}
+}
+
+// runBenchmarkRangefeed runs a rangefeed benchmark, emitting b.N events across
+// a rangefeed.
+func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) {
+	defer log.Scope(b).Close(b)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	stopper := stop.NewStopper()
+	defer stopper.Stop(ctx)
+
+	var budget *FeedBudget
+	if opts.budget > 0 {
+		budget = newTestBudget(opts.budget)
+	}
+	span := roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")}
+
+	// Set up processor.
+	p := NewProcessor(Config{
+		AmbientContext:   log.MakeTestingAmbientContext(nil),
+		Clock:            hlc.NewClockForTesting(nil),
+		Metrics:          NewMetrics(),
+		Span:             span,
+		MemBudget:        budget,
+		EventChanCap:     b.N,
+		EventChanTimeout: time.Hour,
+	})
+	require.NoError(b, p.Start(stopper, nil))
+
+	// Add registrations.
+	streams := make([]*noopStream, opts.numRegistrations)
+	futures := make([]*future.ErrorFuture, opts.numRegistrations)
+	for i := 0; i < opts.numRegistrations; i++ {
+		// withDiff does not matter for these benchmarks, since the previous value
+		// is fetched and populated during Raft application.
+		const withDiff = false
+		streams[i] = &noopStream{ctx: ctx}
+		futures[i] = &future.ErrorFuture{}
+		ok, _ := p.Register(span, hlc.MinTimestamp, nil, withDiff, streams[i], nil, futures[i])
+		require.True(b, ok)
+	}
+
+	// Construct b.N events beforehand -- we don't want to measure this cost.
+	var (
+		logicalOps       []enginepb.MVCCLogicalOp
+		closedTimestamps []hlc.Timestamp
+		prefix           = roachpb.Key("key-")
+		value            = roachpb.MakeValueFromString("a few bytes of data").RawBytes
+	)
+	switch opts.opType {
+	case writeOpType:
+		logicalOps = make([]enginepb.MVCCLogicalOp, 0, b.N)
+		for i := 0; i < b.N; i++ {
+			key := append(prefix, make([]byte, 4)...)
+			binary.BigEndian.PutUint32(key[len(prefix):], uint32(i))
+			ts := hlc.Timestamp{WallTime: int64(i + 1)}
+			logicalOps = append(logicalOps, makeLogicalOp(&enginepb.MVCCWriteValueOp{
+				Key:       key,
+				Timestamp: ts,
+				Value:     value,
+			}))
+		}
+
+	case commitOpType:
+		logicalOps = make([]enginepb.MVCCLogicalOp, 2*b.N)
+		// Write all intents first, then all commits. Txns are tracked in an
+		// internal heap, and we want to cover some of this cost, even though we
+		// write and commit them incrementally.
+		for i := 0; i < b.N; i++ {
+			var txnID uuid.UUID
+			txnID.DeterministicV4(uint64(i), uint64(b.N))
+			key := append(prefix, make([]byte, 4)...)
+			binary.BigEndian.PutUint32(key[len(prefix):], uint32(i))
+			ts := hlc.Timestamp{WallTime: int64(i + 1)}
+			logicalOps[i] = writeIntentOpWithKey(txnID, key, isolation.Serializable, ts)
+			logicalOps[b.N+i] = commitIntentOpWithKV(txnID, key, ts, value)
+		}
+
+	case closedTSOpType:
+		closedTimestamps = make([]hlc.Timestamp, 0, b.N)
+		for i := 0; i < b.N; i++ {
+			closedTimestamps = append(closedTimestamps, hlc.Timestamp{WallTime: int64(i + 1)})
+		}
+
+	default:
+		b.Fatalf("unknown operation type %q", opts.opType)
+	}
+
+	// Wait for catchup scans and flush checkpoint events.
+	p.syncEventAndRegistrations()
+
+	// Run the benchmark. We accounted for b.N when constructing events.
+	b.ResetTimer()
+
+	for _, logicalOp := range logicalOps {
+		if !p.ConsumeLogicalOps(ctx, logicalOp) {
+			b.Fatal("failed to submit logical operation")
+		}
+	}
+	for _, closedTS := range closedTimestamps {
+		if !p.ForwardClosedTS(ctx, closedTS) {
+			b.Fatal("failed to forward closed timestamp")
+		}
+	}
+	p.syncEventAndRegistrations()
+
+	// Check that all registrations ended successfully, and emitted the expected
+	// number of events.
+	b.StopTimer()
+	p.Stop()
+
+	for i, f := range futures {
+		regErr, err := future.Wait(ctx, f)
+		require.NoError(b, err)
+		require.NoError(b, regErr)
+		require.Equal(b, b.N, streams[i].events-1) // ignore checkpoint after catchup
+	}
+}
+
+// noopStream is a stream that does nothing, except count events.
+type noopStream struct {
+	ctx    context.Context
+	events int
+}
+
+func (s *noopStream) Context() context.Context {
+	return s.ctx
+}
+
+func (s *noopStream) Send(*kvpb.RangeFeedEvent) error {
+	s.events++
+	return nil
+}
diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go
index 19e1c0030e21..da4e279d3998 100644
--- a/pkg/kv/kvserver/rangefeed/processor_test.go
+++ b/pkg/kv/kvserver/rangefeed/processor_test.go
@@ -1451,64 +1451,6 @@ func (c *consumer) Consumed() int {
 	return int(atomic.LoadInt32(&c.sentValues))
 }
 
-func BenchmarkProcessorWithBudget(b *testing.B) {
-	benchmarkEvents := 1
-
-	var budget *FeedBudget
-	if false {
-		budget = newTestBudget(math.MaxInt64)
-	}
-
-	stopper := stop.NewStopper()
-	var pushTxnInterval, pushTxnAge time.Duration = 0, 0 // disable
-	p := NewProcessor(Config{
-		AmbientContext:   log.MakeTestingAmbientCtxWithNewTracer(),
-		Clock:            hlc.NewClockForTesting(nil),
-		Span:             roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")},
-		PushTxnsInterval: pushTxnInterval,
-		PushTxnsAge:      pushTxnAge,
-		EventChanCap:     benchmarkEvents * b.N,
-		Metrics:          NewMetrics(),
-		MemBudget:        budget,
-		EventChanTimeout: time.Minute,
-	})
-	require.NoError(b, p.Start(stopper, nil))
-	ctx := context.Background()
-	defer stopper.Stop(ctx)
-
-	// Add a registration.
-	r1Stream := newTestStream()
-
-	var r1Done future.ErrorFuture
-	_, _ = p.Register(
-		roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
-		hlc.Timestamp{WallTime: 1},
-		nil,   /* catchUpIter */
-		false, /* withDiff */
-		r1Stream,
-		func() {},
-		&r1Done,
-	)
-	p.syncEventAndRegistrations()
-
-	b.ResetTimer()
-	for bi := 0; bi < b.N; bi++ {
-		for i := 0; i < benchmarkEvents; i++ {
-			p.ConsumeLogicalOps(ctx, writeValueOpWithKV(
-				roachpb.Key("k"),
-				hlc.Timestamp{WallTime: int64(bi*benchmarkEvents + i + 2)},
-				[]byte("this is value")))
-		}
-	}
-
-	p.syncEventAndRegistrations()
-
-	// Sanity check that subscription was not dropped.
-	if p.reg.Len() == 0 {
-		require.NoError(b, waitErrorFuture(&r1Done))
-	}
-}
-
 // TestSizeOfEvent tests the size of the event struct. It is fine if this struct
 // changes in size, as long as this is done consciously.
 func TestSizeOfEvent(t *testing.T) {
diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go
index 0c85ec4016e2..41724a7d5e7e 100644
--- a/pkg/kv/kvserver/replica_rangefeed_test.go
+++ b/pkg/kv/kvserver/replica_rangefeed_test.go
@@ -1116,7 +1116,7 @@ func TestReplicaRangefeedPushesTransactions(t *testing.T) {
 	defer log.Scope(t).Close(t)
 
 	ctx := context.Background()
-	tc, db, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
+	tc, db, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
 	defer tc.Stopper().Stop(ctx)
 	repls := replsForRange(ctx, t, tc, desc)
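
The new benchmarks are ordinary Go benchmarks, so a stock invocation such as go test ./pkg/kv/kvserver/rangefeed -run '^$' -bench BenchmarkRangefeed -benchmem runs everything added above. As a purely hypothetical sketch (not part of this patch), a focused variant could reuse the helpers from bench_test.go, for example to measure only the closed-timestamp path with the budget disabled:

func BenchmarkRangefeedClosedTSNoBudget(b *testing.B) {
	// Hypothetical example reusing the helpers introduced above; it assumes it
	// lives alongside bench_test.go in package rangefeed. A zero budget leaves
	// the FeedBudget nil in runBenchmarkRangefeed, i.e. memory accounting is
	// disabled, mirroring the budget=false case of BenchmarkRangefeedBudget.
	runBenchmarkRangefeed(b, benchmarkRangefeedOpts{
		opType:           closedTSOpType,
		numRegistrations: 10,
		budget:           0,
	})
}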