Skip to content

Commit

Permalink
Merge pull request #107688 from erikgrinaker/backport23.1-107525
Browse files Browse the repository at this point in the history
release-23.1: rangefeed: add `BenchmarkRangefeed`
  • Loading branch information
erikgrinaker authored Aug 8, 2023
2 parents 66f4979 + 9557bb3 commit 5a0d329
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 59 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ go_test(
name = "rangefeed_test",
size = "large",
srcs = [
"bench_test.go",
"budget_test.go",
"catchup_scan_bench_test.go",
"catchup_scan_test.go",
Expand Down
211 changes: 211 additions & 0 deletions pkg/kv/kvserver/rangefeed/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rangefeed

import (
"context"
"encoding/binary"
"fmt"
"math"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/future"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/stretchr/testify/require"
)

type benchmarkRangefeedOpts struct {
opType opType
numRegistrations int
budget int64
}

type opType string

const (
writeOpType opType = "write" // individual 1PC writes
commitOpType opType = "commit" // intent + commit writes, 1 per txn
closedTSOpType opType = "closedts"
)

// BenchmarkRangefeed benchmarks the processor and registrations, by submitting
// a set of events and waiting until they are all emitted.
func BenchmarkRangefeed(b *testing.B) {
for _, opType := range []opType{writeOpType, commitOpType, closedTSOpType} {
for _, numRegistrations := range []int{1, 10, 100} {
name := fmt.Sprintf("opType=%s/numRegs=%d", opType, numRegistrations)
b.Run(name, func(b *testing.B) {
runBenchmarkRangefeed(b, benchmarkRangefeedOpts{
opType: opType,
numRegistrations: numRegistrations,
budget: math.MaxInt64,
})
})
}
}
}

// BenchmarkRangefeedBudget benchmarks the effect of enabling/disabling the
// processor budget. It sets up a single processor and registration, and
// processes a set of events.
func BenchmarkRangefeedBudget(b *testing.B) {
for _, budget := range []bool{false, true} {
b.Run(fmt.Sprintf("budget=%t", budget), func(b *testing.B) {
var budgetSize int64
if budget {
budgetSize = math.MaxInt64
}
runBenchmarkRangefeed(b, benchmarkRangefeedOpts{
opType: writeOpType,
numRegistrations: 1,
budget: budgetSize,
})
})
}
}

// runBenchmarkRangefeed runs a rangefeed benchmark, emitting b.N events across
// a rangefeed.
func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

var budget *FeedBudget
if opts.budget > 0 {
budget = newTestBudget(opts.budget)
}
span := roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")}

// Set up processor.
p := NewProcessor(Config{
AmbientContext: log.MakeTestingAmbientContext(nil),
Clock: hlc.NewClockForTesting(nil),
Metrics: NewMetrics(),
Span: span,
MemBudget: budget,
EventChanCap: b.N,
EventChanTimeout: time.Hour,
})
require.NoError(b, p.Start(stopper, nil))

// Add registrations.
streams := make([]*noopStream, opts.numRegistrations)
futures := make([]*future.ErrorFuture, opts.numRegistrations)
for i := 0; i < opts.numRegistrations; i++ {
// withDiff does not matter for these benchmarks, since the previous value
// is fetched and populated during Raft application.
const withDiff = false
streams[i] = &noopStream{ctx: ctx}
futures[i] = &future.ErrorFuture{}
ok, _ := p.Register(span, hlc.MinTimestamp, nil, withDiff, streams[i], nil, futures[i])
require.True(b, ok)
}

// Construct b.N events beforehand -- we don't want to measure this cost.
var (
logicalOps []enginepb.MVCCLogicalOp
closedTimestamps []hlc.Timestamp
prefix = roachpb.Key("key-")
value = roachpb.MakeValueFromString("a few bytes of data").RawBytes
)
switch opts.opType {
case writeOpType:
logicalOps = make([]enginepb.MVCCLogicalOp, 0, b.N)
for i := 0; i < b.N; i++ {
key := append(prefix, make([]byte, 4)...)
binary.BigEndian.PutUint32(key[len(prefix):], uint32(i))
ts := hlc.Timestamp{WallTime: int64(i + 1)}
logicalOps = append(logicalOps, makeLogicalOp(&enginepb.MVCCWriteValueOp{
Key: key,
Timestamp: ts,
Value: value,
}))
}

case commitOpType:
logicalOps = make([]enginepb.MVCCLogicalOp, 2*b.N)
// Write all intents first, then all commits. Txns are tracked in an
// internal heap, and we want to cover some of this cost, even though we
// write and commit them incrementally.
for i := 0; i < b.N; i++ {
var txnID uuid.UUID
txnID.DeterministicV4(uint64(i), uint64(b.N))
key := append(prefix, make([]byte, 4)...)
binary.BigEndian.PutUint32(key[len(prefix):], uint32(i))
ts := hlc.Timestamp{WallTime: int64(i + 1)}
logicalOps[i] = writeIntentOpWithKey(txnID, key, ts)
logicalOps[b.N+i] = commitIntentOpWithKV(txnID, key, ts, value)
}

case closedTSOpType:
closedTimestamps = make([]hlc.Timestamp, 0, b.N)
for i := 0; i < b.N; i++ {
closedTimestamps = append(closedTimestamps, hlc.Timestamp{WallTime: int64(i + 1)})
}

default:
b.Fatalf("unknown operation type %q", opts.opType)
}

// Wait for catchup scans and flush checkpoint events.
p.syncEventAndRegistrations()

// Run the benchmark. We accounted for b.N when constructing events.
b.ResetTimer()

for _, logicalOp := range logicalOps {
if !p.ConsumeLogicalOps(ctx, logicalOp) {
b.Fatal("failed to submit logical operation")
}
}
for _, closedTS := range closedTimestamps {
if !p.ForwardClosedTS(ctx, closedTS) {
b.Fatal("failed to forward closed timestamp")
}
}
p.syncEventAndRegistrations()

// Check that all registrations ended successfully, and emitted the expected
// number of events.
b.StopTimer()
p.Stop()

for i, f := range futures {
regErr, err := future.Wait(ctx, f)
require.NoError(b, err)
require.NoError(b, regErr)
require.Equal(b, b.N, streams[i].events-1) // ignore checkpoint after catchup
}
}

// noopStream is a stream that does nothing, except count events.
type noopStream struct {
ctx context.Context
events int
}

func (s *noopStream) Context() context.Context {
return s.ctx
}

func (s *noopStream) Send(*kvpb.RangeFeedEvent) error {
s.events++
return nil
}
59 changes: 0 additions & 59 deletions pkg/kv/kvserver/rangefeed/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1453,65 +1453,6 @@ func (c *consumer) Consumed() int {
return int(atomic.LoadInt32(&c.sentValues))
}

func BenchmarkProcessorWithBudget(b *testing.B) {
benchmarkEvents := 1

var budget *FeedBudget
if false {
budget = newTestBudget(math.MaxInt64)
}

stopper := stop.NewStopper()
var pushTxnInterval, pushTxnAge time.Duration = 0, 0 // disable
p := NewProcessor(Config{
AmbientContext: log.MakeTestingAmbientCtxWithNewTracer(),
Clock: hlc.NewClockForTesting(nil),
Span: roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")},
PushTxnsInterval: pushTxnInterval,
PushTxnsAge: pushTxnAge,
EventChanCap: benchmarkEvents * b.N,
CheckStreamsInterval: 10 * time.Millisecond,
Metrics: NewMetrics(),
MemBudget: budget,
EventChanTimeout: time.Minute,
})
require.NoError(b, p.Start(stopper, nil))
ctx := context.Background()
defer stopper.Stop(ctx)

// Add a registration.
r1Stream := newTestStream()

var r1Done future.ErrorFuture
_, _ = p.Register(
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")},
hlc.Timestamp{WallTime: 1},
nil, /* catchUpIter */
false, /* withDiff */
r1Stream,
func() {},
&r1Done,
)
p.syncEventAndRegistrations()

b.ResetTimer()
for bi := 0; bi < b.N; bi++ {
for i := 0; i < benchmarkEvents; i++ {
p.ConsumeLogicalOps(ctx, writeValueOpWithKV(
roachpb.Key("k"),
hlc.Timestamp{WallTime: int64(bi*benchmarkEvents + i + 2)},
[]byte("this is value")))
}
}

p.syncEventAndRegistrations()

// Sanity check that subscription was not dropped.
if p.reg.Len() == 0 {
require.NoError(b, waitErrorFuture(&r1Done))
}
}

// TestSizeOfEvent tests the size of the event struct. It is fine if this struct
// changes in size, as long as this is done consciously.
func TestSizeOfEvent(t *testing.T) {
Expand Down

0 comments on commit 5a0d329

Please sign in to comment.