110602: roachtest: add c2c/weekly/kv50 roachtest r=adityamaru a=msbutler

This patch adds a new weekly c2c roachtest that tests our 23.2
performance requirements under a given cluster configuration.

The workload:
- kv workload with a 2 TB initial scan, 50% writes, a 2000 max QPS cap, and insert row sizes drawn uniformly between 1 and 4096 bytes.

The cluster specs:
- 8 nodes in each of the source and destination clusters, 8 vCPUs per node, 1 TB persistent disk (pdSize); a sketch of the resulting workload commands follows below.
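
As an illustration only, here is a minimal standalone Go sketch of roughly what the workload invocations for this spec expand to. The real commands are assembled by sourceInitCmd/sourceRunCmd in the diff below; the exact flag spelling, node range, and pgurl/tenant placeholder used here are assumptions.

package main

import "fmt"

func main() {
	// Initial data load: insert-count and max-block-bytes are only passed
	// because initRows > 0; splits/scatter are skipped for this test since
	// initWithSplitAndScatter is not set.
	initCmd := fmt.Sprintf(
		"./workload init kv --insert-count %d --max-block-bytes %d {pgurl:1-8:<tenant>}",
		350000000, 4096)

	// Steady-state load: 50% reads, payloads up to 4096 bytes, and a
	// 2000 QPS cap via --max-rate.
	runCmd := fmt.Sprintf(
		"./workload run kv --max-block-bytes %d --read-percent %d --max-rate %d {pgurl:1-8:<tenant>}",
		4096, 50, 2000)

	fmt.Println(initCmd)
	fmt.Println(runCmd)
}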

Release note: none

Epic: none

110959: rangefeed: fix kv.rangefeed.registrations metric r=erikgrinaker a=aliher1911

Previously, when the rangefeed processor was stopped by its replica, the registrations metric was not correctly decreased, leading to metric creep on changefeed restarts or range splits. This commit fixes the metric decrease for such cases; a simplified sketch of the idea follows below.

Epic: none
Fixes: #106126

Release note: None
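
As a rough, self-contained Go sketch of the idea only: the actual change is DisconnectAllOnShutdown in pkg/kv/kvserver/rangefeed/registry.go, shown in the diff below, and the gauge and registry types here are simplified stand-ins rather than the real CockroachDB types.

package main

import "fmt"

// gauge is a simplified stand-in for the RangeFeedRegistrations metric.
type gauge struct{ v int64 }

func (g *gauge) Dec(n int64)  { g.v -= n }
func (g *gauge) Value() int64 { return g.v }

// registry is a simplified stand-in for the rangefeed registry.
type registry struct {
	registrations int64 // stand-in for the interval tree of registrations
	metric        *gauge
}

// disconnectAllOnShutdown mirrors the fix: when the processor is stopped by
// the replica, registrations can no longer unregister themselves, so the
// registry decrements the gauge for all of them before disconnecting.
func (r *registry) disconnectAllOnShutdown(err error) {
	r.metric.Dec(r.registrations)
	// ... DisconnectWithErr(all, err) would post err to each registration here.
	r.registrations = 0
}

func main() {
	r := &registry{registrations: 3, metric: &gauge{v: 3}}
	r.disconnectAllOnShutdown(fmt.Errorf("node unavailable"))
	fmt.Println(r.metric.Value()) // prints 0: no metric creep after shutdown
}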

Co-authored-by: Michael Butler <[email protected]>
Co-authored-by: Oleg Afanasyev <[email protected]>
3 people committed Sep 22, 2023
3 parents 6a2097e + f330f6f + cdafd14 commit ab5fb87
Showing 5 changed files with 124 additions and 26 deletions.
96 changes: 81 additions & 15 deletions pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -244,9 +244,30 @@ type replicateKV struct {
// the number of rows inserted into the cluster before c2c begins
initRows int

// max size of raw data written during each insertion
// initWithSplitAndScatter splits and scatters the kv table after
// initialization. This should be set for tests that init little or no data.
//
// The kv workload can run split and scatter after the insertion step. After
// inserting a lot of kv table data (think multiple GB), however, kv has had
// enough time to split and scatter that table across the cluster, so there
// isn't a need to run an expensive scatter query.
//
// Initializing only a couple of MB of kv table data, as the other c2c tests
// do, does not lead kv to naturally split and scatter the table by the time
// the c2c job begins, which causes the job to create a non-distributed
// initial DistSQL plan because the source-side dsp.PartitionSpans may only
// assign one node pair to stream all of the data.
initWithSplitAndScatter bool

// maxBlockBytes indicates the maximum size of the kv payload value written
// during each insertion. The kv workload will randomly choose a value in the
// interval [1, maxBlockBytes] with equal probability, i.e. via
// x ~ Uniform[1, maxBlockBytes].
maxBlockBytes int

// maxQPS caps the queries per second sent to the source cluster after initialization.
maxQPS int

// partitionKVDatabaseInRegion constrains the kv database in the specified
// region and asserts, before cutover, that the replicated span configuration
// correctly enforces the regional constraint in the destination tenant.
@@ -260,9 +281,11 @@ type replicateKV struct {
func (kv replicateKV) sourceInitCmd(tenantName string, nodes option.NodeListOption) string {
cmd := roachtestutil.NewCommand(`./workload init kv`).
MaybeFlag(kv.initRows > 0, "insert-count", kv.initRows).
// Only set the max block byte values for the init command if we
// actually need to insert rows.
MaybeFlag(kv.initRows > 0, "max-block-bytes", kv.maxBlockBytes).
Flag("splits", 100).
Option("scatter").
MaybeFlag(kv.initWithSplitAndScatter, "splits", 100).
MaybeOption(kv.initWithSplitAndScatter, "scatter").
Arg("{pgurl%s:%s}", nodes, tenantName)
return cmd.String()
}
@@ -273,6 +296,7 @@ func (kv replicateKV) sourceRunCmd(tenantName string, nodes option.NodeListOptio
Flag("max-block-bytes", kv.maxBlockBytes).
Flag("read-percent", kv.readPercent).
MaybeFlag(kv.debugRunDuration > 0, "duration", kv.debugRunDuration).
MaybeFlag(kv.maxQPS > 0, "max-rate", kv.maxQPS).
Arg("{pgurl%s:%s}", nodes, tenantName)
return cmd.String()
}
@@ -394,6 +418,11 @@ type replicationSpec struct {
// maxLatency override the maxAcceptedLatencyDefault.
maxAcceptedLatency time.Duration

// TODO(msbutler): this knob only exists because the revision history
// fingerprint can encounter a GC TTL error for large fingerprints. Delete
// this knob once we lay a protected timestamp (pts) during fingerprinting.
nonRevisionHistoryFingerprint bool

// skipNodeDistributionCheck removes the requirement that multiple source and
// destination nodes must participate in the replication stream. This should
// be set if the roachtest runs on single node clusters or if the
@@ -707,6 +736,13 @@ SELECT *
FROM crdb_internal.fingerprint(crdb_internal.tenant_span($1::INT), '%s'::DECIMAL, true)
AS OF SYSTEM TIME '%s'`, startTime.AsOfSystemTime(), endTime.AsOfSystemTime())

if rd.rs.nonRevisionHistoryFingerprint {
fingerprintQuery = fmt.Sprintf(`
SELECT *
FROM crdb_internal.fingerprint(crdb_internal.tenant_span($1::INT), 0::DECIMAL, false)
AS OF SYSTEM TIME '%s'`, endTime.AsOfSystemTime())
}

var srcFingerprint int64
fingerPrintMonitor := rd.newMonitor(ctx)
fingerPrintMonitor.Go(func(ctx context.Context) error {
@@ -982,7 +1018,7 @@ func runAcceptanceClusterReplication(ctx context.Context, t test.Test, c cluster
dstNodes: 1,
// The timeout field ensures the c2c roachtest driver behaves properly.
timeout: 10 * time.Minute,
workload: replicateKV{readPercent: 0, debugRunDuration: 1 * time.Minute, maxBlockBytes: 1},
workload: replicateKV{readPercent: 0, debugRunDuration: 1 * time.Minute, maxBlockBytes: 1, initWithSplitAndScatter: true},
additionalDuration: 0 * time.Minute,
cutover: 30 * time.Second,
skipNodeDistributionCheck: true,
@@ -1035,13 +1071,17 @@ func registerClusterToCluster(r registry.Registry) {
cutover: 30 * time.Minute,
},
{
name: "c2c/kv0",
benchmark: true,
srcNodes: 3,
dstNodes: 3,
cpus: 8,
pdSize: 100,
workload: replicateKV{readPercent: 0, maxBlockBytes: 1024},
name: "c2c/kv0",
benchmark: true,
srcNodes: 3,
dstNodes: 3,
cpus: 8,
pdSize: 100,
workload: replicateKV{
readPercent: 0,
maxBlockBytes: 1024,
initWithSplitAndScatter: true,
},
timeout: 1 * time.Hour,
additionalDuration: 10 * time.Minute,
cutover: 5 * time.Minute,
@@ -1064,6 +1104,28 @@ func registerClusterToCluster(r registry.Registry) {
additionalDuration: 1 * time.Minute,
cutover: 0,
},
{
// Large workload to test our 23.2 perf goals.
name: "c2c/weekly/kv50",
benchmark: true,
srcNodes: 8,
dstNodes: 8,
cpus: 8,
pdSize: 1000,
nonRevisionHistoryFingerprint: true,

workload: replicateKV{
// Write a ~2TB initial scan.
initRows: 350000000,
readPercent: 50,
maxBlockBytes: 4096,
maxQPS: 2000,
},
timeout: 12 * time.Hour,
additionalDuration: 2 * time.Hour,
cutover: 0,
tags: registry.Tags("weekly", "aws-weekly"),
},
{
name: "c2c/MultiRegion/SameRegions/kv0",
benchmark: true,
@@ -1074,6 +1136,7 @@ func registerClusterToCluster(r registry.Registry) {
workload: replicateKV{
readPercent: 0,
maxBlockBytes: 1024,
initWithSplitAndScatter: true,
partitionKVDatabaseInRegion: "us-west1",
antiRegion: "us-central1",
},
@@ -1093,8 +1156,11 @@ func registerClusterToCluster(r registry.Registry) {
dstNodes: 1,
cpus: 4,
pdSize: 10,
workload: replicateKV{readPercent: 0, debugRunDuration: 1 * time.Minute,
maxBlockBytes: 1024},
workload: replicateKV{
readPercent: 0,
debugRunDuration: 1 * time.Minute,
initWithSplitAndScatter: true,
maxBlockBytes: 1024},
timeout: 5 * time.Minute,
additionalDuration: 0 * time.Minute,
cutover: 30 * time.Second,
@@ -1394,7 +1460,7 @@ func registerClusterReplicationResilience(r registry.Registry) {
srcNodes: 4,
dstNodes: 4,
cpus: 8,
workload: replicateKV{readPercent: 0, initRows: 5000000, maxBlockBytes: 1024},
workload: replicateKV{readPercent: 0, initRows: 5000000, maxBlockBytes: 1024, initWithSplitAndScatter: true},
timeout: 20 * time.Minute,
additionalDuration: 6 * time.Minute,
cutover: 3 * time.Minute,
@@ -1507,7 +1573,7 @@ func registerClusterReplicationDisconnect(r registry.Registry) {
srcNodes: 3,
dstNodes: 3,
cpus: 4,
workload: replicateKV{readPercent: 0, initRows: 1000000, maxBlockBytes: 1024},
workload: replicateKV{readPercent: 0, initRows: 1000000, maxBlockBytes: 1024, initWithSplitAndScatter: true},
timeout: 20 * time.Minute,
additionalDuration: 10 * time.Minute,
cutover: 2 * time.Minute,
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/rangefeed/processor.go
@@ -511,13 +511,13 @@ func (p *LegacyProcessor) run(

// Close registrations and exit when signaled.
case pErr := <-p.stopC:
p.reg.DisconnectWithErr(all, pErr)
p.reg.DisconnectAllOnShutdown(pErr)
return

// Exit on stopper.
case <-stopper.ShouldQuiesce():
pErr := kvpb.NewError(&kvpb.NodeUnavailableError{})
p.reg.DisconnectWithErr(all, pErr)
p.reg.DisconnectAllOnShutdown(pErr)
return
}
}
12 changes: 12 additions & 0 deletions pkg/kv/kvserver/rangefeed/registry.go
@@ -512,6 +512,18 @@ func (reg *registry) Unregister(ctx context.Context, r *registration) {
r.drainAllocations(ctx)
}

// DisconnectAllOnShutdown disconnects all registrations on processor shutdown.
// This is different from a normal disconnect, as registrations won't be able
// to perform Unregister once the processor's work loop has terminated.
// Besides posting errors to registrations, this method also cleans up the
// metrics controlled by the registry itself.
// TODO: this should be revisited as part of
// https://github.com/cockroachdb/cockroach/issues/110634
func (reg *registry) DisconnectAllOnShutdown(pErr *kvpb.Error) {
reg.metrics.RangeFeedRegistrations.Dec(int64(reg.tree.Len()))
reg.DisconnectWithErr(all, pErr)
}

// Disconnect disconnects all registrations that overlap the specified span with
// a nil error.
func (reg *registry) Disconnect(span roachpb.Span) {
20 changes: 20 additions & 0 deletions pkg/kv/kvserver/rangefeed/registry_test.go
@@ -591,3 +591,23 @@ func TestRegistrationString(t *testing.T) {
require.Equal(t, tc.exp, tc.r.String())
}
}

// TestRegistryShutdownMetrics verifies that when we shut down a registry with
// existing registrations, the registrations won't try to update any metrics
// implicitly.
func TestRegistryShutdownMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
reg := makeRegistry(NewMetrics())

regDoneC := make(chan interface{})
r := newTestRegistration(spAB, hlc.Timestamp{WallTime: 10}, nil, false)
go func() {
r.runOutputLoop(context.Background(), 0)
close(regDoneC)
}()
reg.Register(&r.registration)

reg.DisconnectAllOnShutdown(nil)
<-regDoneC
require.Zero(t, reg.metrics.RangeFeedRegistrations.Value(), "metric is not zero on stop")
}
18 changes: 9 additions & 9 deletions pkg/kv/kvserver/rangefeed/scheduled_processor.go
@@ -218,16 +218,16 @@ func (p *ScheduledProcessor) processStop() {
}

func (p *ScheduledProcessor) cleanup() {
// Cleanup is called when all registrations were already disconnected prior to
// triggering processor stop (or before we accepted first registration if we
// failed to start).
// However, we want some defence in depth if lifecycle bug will allow shutdown
// before registrations are disconnected and drained. To handle that we will
// perform disconnect so that registrations have a chance to stop their work
// loop and terminate. This would at least trigger a warning that we are using
// memory budget already released by processor.
// Cleanup is normally called when all registrations are disconnected and
// unregistered, or when none were created yet (processor start failure).
// However, there's a case where the processor is stopped by replica action
// while registrations are still active. In that case the registrations won't
// have a chance to unregister themselves after their work loop terminates,
// because the processor is already disconnected from the scheduler.
// To avoid leaking any registry resources and metrics, the processor performs
// explicit registry termination in that case.
pErr := kvpb.NewError(&kvpb.NodeUnavailableError{})
p.reg.DisconnectWithErr(all, pErr)
p.reg.DisconnectAllOnShutdown(pErr)

// Unregister callback from scheduler
p.scheduler.Unregister()
