Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
107525: rangefeed: add `BenchmarkRangefeed` r=erikgrinaker a=erikgrinaker

Resolves #107440.
Epic: none
Release note: None

107533: kvserver: improve TestClosedTimestampFrozenAfterSubsumption r=erikgrinaker a=tbg

Adopt setupClusterForClosedTSTesting which wraps setupClusterWithDummyRange
with goodies such as #107531.

Fixes #107179[^1]

[^1]: #107179 (comment)

Epic: None
Release note: None


Co-authored-by: Erik Grinaker <[email protected]>
Co-authored-by: Tobias Grieger <[email protected]>
  • Loading branch information
3 people committed Jul 26, 2023
3 parents 8c52fea + 11f8799 + 208a7eb commit 9db3a0e
Show file tree
Hide file tree
Showing 6 changed files with 232 additions and 73 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func TestRangefeedIsRoutedToNonVoter(t *testing.T) {
clusterArgs.ReplicationMode = base.ReplicationManual
// NB: setupClusterForClosedTSTesting sets a low closed timestamp target
// duration.
tc, _, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, clusterArgs, "cttest", "kv")
tc, _, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, clusterArgs, "cttest", "kv")
defer tc.Stopper().Stop(ctx)
tc.AddNonVotersOrFatal(t, desc.StartKey.AsRawKey(), tc.Target(1))

Expand Down
28 changes: 15 additions & 13 deletions pkg/kv/kvserver/closed_timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestClosedTimestampCanServe(t *testing.T) {
// Disable the replicateQueue so that it doesn't interfere with replica
// membership ranges.
clusterArgs.ReplicationMode = base.ReplicationManual
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, clusterArgs, dbName, tableName)
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, clusterArgs, dbName, tableName)
defer tc.Stopper().Stop(ctx)

if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
Expand Down Expand Up @@ -156,7 +156,7 @@ func TestClosedTimestampCanServeOnVoterIncoming(t *testing.T) {
clusterArgs.ReplicationMode = base.ReplicationManual
knobs, ltk := makeReplicationTestKnobs()
clusterArgs.ServerArgs.Knobs = knobs
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, clusterArgs, dbName, tableName)
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, clusterArgs, dbName, tableName)
defer tc.Stopper().Stop(ctx)

if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
Expand Down Expand Up @@ -192,7 +192,7 @@ func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) {
skip.UnderRace(t)

ctx := context.Background()
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
defer tc.Stopper().Stop(ctx)
repls := replsForRange(ctx, t, tc, desc)

Expand Down Expand Up @@ -270,7 +270,7 @@ func TestClosedTimestampCantServeWithConflictingIntent(t *testing.T) {
defer txnwait.TestingOverrideTxnLivenessThreshold(time.Hour)()

ctx := context.Background()
tc, _, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
tc, _, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
defer tc.Stopper().Stop(ctx)
repls := replsForRange(ctx, t, tc, desc)
ds := tc.Server(0).DistSenderI().(*kvcoord.DistSender)
Expand Down Expand Up @@ -377,7 +377,7 @@ func TestClosedTimestampCanServeAfterSplitAndMerges(t *testing.T) {
skip.UnderRace(t)

ctx := context.Background()
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
repls := replsForRange(ctx, t, tc, desc)
// Disable the automatic merging.
if _, err := db0.Exec("SET CLUSTER SETTING kv.range_merge.queue_enabled = false"); err != nil {
Expand Down Expand Up @@ -457,7 +457,7 @@ func TestClosedTimestampCantServeBasedOnUncertaintyLimit(t *testing.T) {
ctx := context.Background()
// Set up the target duration to be very long and rely on lease transfers to
// drive MaxClosed.
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
defer tc.Stopper().Stop(ctx)
repls := replsForRange(ctx, t, tc, desc)

Expand Down Expand Up @@ -490,7 +490,7 @@ func TestClosedTimestampCanServeForWritingTransaction(t *testing.T) {
skip.UnderRace(t)

ctx := context.Background()
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
defer tc.Stopper().Stop(ctx)
repls := replsForRange(ctx, t, tc, desc)

Expand Down Expand Up @@ -537,7 +537,7 @@ func TestClosedTimestampCantServeForNonTransactionalReadRequest(t *testing.T) {
skip.UnderRace(t)

ctx := context.Background()
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
defer tc.Stopper().Stop(ctx)
repls := replsForRange(ctx, t, tc, desc)

Expand Down Expand Up @@ -579,7 +579,7 @@ func TestClosedTimestampCantServeForNonTransactionalBatch(t *testing.T) {

testutils.RunTrueAndFalse(t, "tsFromServer", func(t *testing.T, tsFromServer bool) {
ctx := context.Background()
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
defer tc.Stopper().Stop(ctx)
repls := replsForRange(ctx, t, tc, desc)

Expand Down Expand Up @@ -722,8 +722,7 @@ func TestClosedTimestampFrozenAfterSubsumption(t *testing.T) {
// Set up the closed timestamp timing such that, when we block a merge and
// transfer the RHS lease, the closed timestamp advances over the LHS
// lease but not over the RHS lease.
const numNodes = 3
tc, _ := setupTestClusterWithDummyRange(t, clusterArgs, "cttest" /* dbName */, "kv" /* tableName */, numNodes)
tc, _, _ := setupClusterForClosedTSTesting(ctx, t, 5*time.Second, 100*time.Millisecond, clusterArgs, "cttest", "kv")
defer tc.Stopper().Stop(ctx)
sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))
sqlDB.ExecMultiple(t, strings.Split(fmt.Sprintf(`
Expand Down Expand Up @@ -1195,19 +1194,22 @@ func aggressiveResolvedTimestampPushKnobs() *kvserver.StoreTestingKnobs {
func setupClusterForClosedTSTesting(
ctx context.Context,
t *testing.T,
targetDuration time.Duration,
targetDuration, sideTransportInterval time.Duration,
clusterArgs base.TestClusterArgs,
dbName, tableName string,
) (tc serverutils.TestClusterInterface, db0 *gosql.DB, kvTableDesc roachpb.RangeDescriptor) {
const numNodes = 3
if sideTransportInterval == 0 {
sideTransportInterval = targetDuration / 4
}
tc, desc := setupTestClusterWithDummyRange(t, clusterArgs, dbName, tableName, numNodes)
sqlRunner := sqlutils.MakeSQLRunner(tc.ServerConn(0))
sqlRunner.ExecMultiple(t, strings.Split(fmt.Sprintf(`
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '%s';
SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '%s';
SET CLUSTER SETTING kv.closed_timestamp.follower_reads_enabled = true;
SET CLUSTER SETTING kv.allocator.load_based_rebalancing = 'off';
`, targetDuration, targetDuration/4),
`, targetDuration, sideTransportInterval),
";")...)

// Disable replicate queues to avoid errant lease transfers.
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ go_test(
name = "rangefeed_test",
size = "large",
srcs = [
"bench_test.go",
"budget_test.go",
"catchup_scan_bench_test.go",
"catchup_scan_test.go",
Expand Down
214 changes: 214 additions & 0 deletions pkg/kv/kvserver/rangefeed/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rangefeed

import (
"context"
"encoding/binary"
"fmt"
"math"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/future"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/stretchr/testify/require"
)

type benchmarkRangefeedOpts struct {
opType opType
numRegistrations int
budget int64
}

type opType string

const (
writeOpType opType = "write" // individual 1PC writes
commitOpType opType = "commit" // intent + commit writes, 1 per txn
closedTSOpType opType = "closedts"
)

// BenchmarkRangefeed benchmarks the processor and registrations, by submitting
// a set of events and waiting until they are all emitted.
func BenchmarkRangefeed(b *testing.B) {
for _, opType := range []opType{writeOpType, commitOpType, closedTSOpType} {
for _, numRegistrations := range []int{1, 10, 100} {
name := fmt.Sprintf("opType=%s/numRegs=%d", opType, numRegistrations)
b.Run(name, func(b *testing.B) {
runBenchmarkRangefeed(b, benchmarkRangefeedOpts{
opType: opType,
numRegistrations: numRegistrations,
budget: math.MaxInt64,
})
})
}
}
}

// BenchmarkRangefeedBudget benchmarks the effect of enabling/disabling the
// processor budget. It sets up a single processor and registration, and
// processes a set of events.
func BenchmarkRangefeedBudget(b *testing.B) {
for _, budget := range []bool{false, true} {
b.Run(fmt.Sprintf("budget=%t", budget), func(b *testing.B) {
var budgetSize int64
if budget {
budgetSize = math.MaxInt64
}
runBenchmarkRangefeed(b, benchmarkRangefeedOpts{
opType: writeOpType,
numRegistrations: 1,
budget: budgetSize,
})
})
}
}

// runBenchmarkRangefeed runs a rangefeed benchmark, emitting b.N events across
// a rangefeed.
func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) {
defer log.Scope(b).Close(b)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

var budget *FeedBudget
if opts.budget > 0 {
budget = newTestBudget(opts.budget)
}
span := roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")}

// Set up processor.
p := NewProcessor(Config{
AmbientContext: log.MakeTestingAmbientContext(nil),
Clock: hlc.NewClockForTesting(nil),
Metrics: NewMetrics(),
Span: span,
MemBudget: budget,
EventChanCap: b.N,
EventChanTimeout: time.Hour,
})
require.NoError(b, p.Start(stopper, nil))

// Add registrations.
streams := make([]*noopStream, opts.numRegistrations)
futures := make([]*future.ErrorFuture, opts.numRegistrations)
for i := 0; i < opts.numRegistrations; i++ {
// withDiff does not matter for these benchmarks, since the previous value
// is fetched and populated during Raft application.
const withDiff = false
streams[i] = &noopStream{ctx: ctx}
futures[i] = &future.ErrorFuture{}
ok, _ := p.Register(span, hlc.MinTimestamp, nil, withDiff, streams[i], nil, futures[i])
require.True(b, ok)
}

// Construct b.N events beforehand -- we don't want to measure this cost.
var (
logicalOps []enginepb.MVCCLogicalOp
closedTimestamps []hlc.Timestamp
prefix = roachpb.Key("key-")
value = roachpb.MakeValueFromString("a few bytes of data").RawBytes
)
switch opts.opType {
case writeOpType:
logicalOps = make([]enginepb.MVCCLogicalOp, 0, b.N)
for i := 0; i < b.N; i++ {
key := append(prefix, make([]byte, 4)...)
binary.BigEndian.PutUint32(key[len(prefix):], uint32(i))
ts := hlc.Timestamp{WallTime: int64(i + 1)}
logicalOps = append(logicalOps, makeLogicalOp(&enginepb.MVCCWriteValueOp{
Key: key,
Timestamp: ts,
Value: value,
}))
}

case commitOpType:
logicalOps = make([]enginepb.MVCCLogicalOp, 2*b.N)
// Write all intents first, then all commits. Txns are tracked in an
// internal heap, and we want to cover some of this cost, even though we
// write and commit them incrementally.
for i := 0; i < b.N; i++ {
var txnID uuid.UUID
txnID.DeterministicV4(uint64(i), uint64(b.N))
key := append(prefix, make([]byte, 4)...)
binary.BigEndian.PutUint32(key[len(prefix):], uint32(i))
ts := hlc.Timestamp{WallTime: int64(i + 1)}
logicalOps[i] = writeIntentOpWithKey(txnID, key, isolation.Serializable, ts)
logicalOps[b.N+i] = commitIntentOpWithKV(txnID, key, ts, value)
}

case closedTSOpType:
closedTimestamps = make([]hlc.Timestamp, 0, b.N)
for i := 0; i < b.N; i++ {
closedTimestamps = append(closedTimestamps, hlc.Timestamp{WallTime: int64(i + 1)})
}

default:
b.Fatalf("unknown operation type %q", opts.opType)
}

// Wait for catchup scans and flush checkpoint events.
p.syncEventAndRegistrations()

// Run the benchmark. We accounted for b.N when constructing events.
b.ResetTimer()

for _, logicalOp := range logicalOps {
if !p.ConsumeLogicalOps(ctx, logicalOp) {
b.Fatal("failed to submit logical operation")
}
}
for _, closedTS := range closedTimestamps {
if !p.ForwardClosedTS(ctx, closedTS) {
b.Fatal("failed to forward closed timestamp")
}
}
p.syncEventAndRegistrations()

// Check that all registrations ended successfully, and emitted the expected
// number of events.
b.StopTimer()
p.Stop()

for i, f := range futures {
regErr, err := future.Wait(ctx, f)
require.NoError(b, err)
require.NoError(b, regErr)
require.Equal(b, b.N, streams[i].events-1) // ignore checkpoint after catchup
}
}

// noopStream is a stream that does nothing, except count events.
type noopStream struct {
ctx context.Context
events int
}

func (s *noopStream) Context() context.Context {
return s.ctx
}

func (s *noopStream) Send(*kvpb.RangeFeedEvent) error {
s.events++
return nil
}
Loading

0 comments on commit 9db3a0e

Please sign in to comment.