diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
index 6d29fe4d63f9..edc892c11cc2 100644
--- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go
+++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -244,9 +244,30 @@ type replicateKV struct {
 	// the number of rows inserted into the cluster before c2c begins
 	initRows int
 
-	// max size of raw data written during each insertion
+	// initWithSplitAndScatter splits and scatters the kv table after
+	// initialization. This should be set for tests that init little or no data.
+	//
+	// The kv workload can run split and scatter after the insertion step. After
+	// inserting a lot of kv table data (think multiple GB), however, kv has had
+	// enough time to split and scatter that table across the cluster, so there
+	// isn't a need to run an expensive scatter query.
+	//
+	// Initializing a couple of MB of kv table data, as other c2c tests do, does
+	// not lead kv to naturally split and scatter the table by the time the c2c
+	// job begins, which will lead the job to create a non-distributed initial
+	// dist sql plan because the source side dsp.PartitionSpans may only assign
+	// one node pair to stream all of the data.
+	initWithSplitAndScatter bool
+
+	// maxBlockBytes indicates the maximum size of the kv payload value written
+	// during each insertion. The kv workload chooses each value's size
+	// uniformly at random from the interval [1, maxBlockBytes], i.e. via
+	// x ~ Uniform[1, maxBlockBytes].
 	maxBlockBytes int
 
+	// maxQPS caps the queries per second sent to the source cluster after initialization.
+	maxQPS int
+
 	// partitionKVDatabaseInRegion constrains the kv database in the specified
 	// region and asserts, before cutover, that the replicated span configuration
 	// correctly enforces the regional constraint in the destination tenant.
@@ -260,9 +281,11 @@ type replicateKV struct {
 func (kv replicateKV) sourceInitCmd(tenantName string, nodes option.NodeListOption) string {
 	cmd := roachtestutil.NewCommand(`./workload init kv`).
 		MaybeFlag(kv.initRows > 0, "insert-count", kv.initRows).
+		// Only set the max-block-bytes flag for the init command if we
+		// actually need to insert rows.
 		MaybeFlag(kv.initRows > 0, "max-block-bytes", kv.maxBlockBytes).
-		Flag("splits", 100).
-		Option("scatter").
+		MaybeFlag(kv.initWithSplitAndScatter, "splits", 100).
+		MaybeOption(kv.initWithSplitAndScatter, "scatter").
 		Arg("{pgurl%s:%s}", nodes, tenantName)
 	return cmd.String()
 }
@@ -273,6 +296,7 @@ func (kv replicateKV) sourceRunCmd(tenantName string, nodes option.NodeListOption) string {
 		Flag("max-block-bytes", kv.maxBlockBytes).
 		Flag("read-percent", kv.readPercent).
 		MaybeFlag(kv.debugRunDuration > 0, "duration", kv.debugRunDuration).
+		MaybeFlag(kv.maxQPS > 0, "max-rate", kv.maxQPS).
 		Arg("{pgurl%s:%s}", nodes, tenantName)
 	return cmd.String()
 }
@@ -394,6 +418,11 @@ type replicationSpec struct {
 	// maxLatency override the maxAcceptedLatencyDefault.
 	maxAcceptedLatency time.Duration
 
+	// TODO(msbutler): this knob only exists because the revision history
+	// fingerprint can encounter a gc ttl error for large fingerprints. Delete
+	// this knob once we lay a pts during fingerprinting.
+	nonRevisionHistoryFingerprint bool
+
 	// skipNodeDistributionCheck removes the requirement that multiple source and
 	// destination nodes must participate in the replication stream. This should
 	// be set if the roachtest runs on single node clusters or if the
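For intuition, the sampling described in the new maxBlockBytes comment above amounts to the following minimal Go sketch (illustrative only; the helper and the values below are hypothetical, not the kv workload's actual code):

```go
package main

import (
	"fmt"
	"math/rand"
)

// blockSize mimics the documented sampling: each inserted payload's size is
// drawn uniformly at random from [1, maxBlockBytes].
func blockSize(rng *rand.Rand, maxBlockBytes int) int {
	// Intn returns a value in [0, maxBlockBytes), so shift by one to land in
	// [1, maxBlockBytes].
	return rng.Intn(maxBlockBytes) + 1
}

func main() {
	rng := rand.New(rand.NewSource(42))
	for i := 0; i < 3; i++ {
		fmt.Println(blockSize(rng, 4096)) // sizes in [1, 4096]
	}
}
```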
@@ -707,6 +736,13 @@ SELECT *
 FROM crdb_internal.fingerprint(crdb_internal.tenant_span($1::INT), '%s'::DECIMAL, true)
 AS OF SYSTEM TIME '%s'`,
 		startTime.AsOfSystemTime(), endTime.AsOfSystemTime())
+	if rd.rs.nonRevisionHistoryFingerprint {
+		fingerprintQuery = fmt.Sprintf(`
+SELECT *
+FROM crdb_internal.fingerprint(crdb_internal.tenant_span($1::INT), 0::DECIMAL, false)
+AS OF SYSTEM TIME '%s'`, endTime.AsOfSystemTime())
+	}
+
 	var srcFingerprint int64
 	fingerPrintMonitor := rd.newMonitor(ctx)
 	fingerPrintMonitor.Go(func(ctx context.Context) error {
@@ -982,7 +1018,7 @@ func runAcceptanceClusterReplication(ctx context.Context, t test.Test, c cluster.Cluster) {
 		dstNodes: 1,
 		// The timeout field ensures the c2c roachtest driver behaves properly.
 		timeout: 10 * time.Minute,
-		workload: replicateKV{readPercent: 0, debugRunDuration: 1 * time.Minute, maxBlockBytes: 1},
+		workload: replicateKV{readPercent: 0, debugRunDuration: 1 * time.Minute, maxBlockBytes: 1, initWithSplitAndScatter: true},
 		additionalDuration: 0 * time.Minute,
 		cutover: 30 * time.Second,
 		skipNodeDistributionCheck: true,
@@ -1035,13 +1071,17 @@ func registerClusterToCluster(r registry.Registry) {
 			cutover: 30 * time.Minute,
 		},
 		{
-			name:      "c2c/kv0",
-			benchmark: true,
-			srcNodes:  3,
-			dstNodes:  3,
-			cpus:      8,
-			pdSize:    100,
-			workload:  replicateKV{readPercent: 0, maxBlockBytes: 1024},
+			name:      "c2c/kv0",
+			benchmark: true,
+			srcNodes:  3,
+			dstNodes:  3,
+			cpus:      8,
+			pdSize:    100,
+			workload: replicateKV{
+				readPercent:             0,
+				maxBlockBytes:           1024,
+				initWithSplitAndScatter: true,
+			},
 			timeout: 1 * time.Hour,
 			additionalDuration: 10 * time.Minute,
 			cutover: 5 * time.Minute,
@@ -1064,6 +1104,28 @@ func registerClusterToCluster(r registry.Registry) {
 			additionalDuration: 1 * time.Minute,
 			cutover: 0,
 		},
+		{
+			// Large workload to test our 23.2 perf goals.
+			name:      "c2c/weekly/kv50",
+			benchmark: true,
+			srcNodes:  8,
+			dstNodes:  8,
+			cpus:      8,
+			pdSize:    1000,
+			nonRevisionHistoryFingerprint: true,
+
+			workload: replicateKV{
+				// Write a ~2TB initial scan.
+				initRows:      350000000,
+				readPercent:   50,
+				maxBlockBytes: 4096,
+				maxQPS:        2000,
+			},
+			timeout:            12 * time.Hour,
+			additionalDuration: 2 * time.Hour,
+			cutover:            0,
+			tags:               registry.Tags("weekly", "aws-weekly"),
+		},
 		{
 			name: "c2c/MultiRegion/SameRegions/kv0",
 			benchmark: true,
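A rough sanity check on the "~2TB initial scan" comment above, assuming the payload dominates row size and the default 3x replication factor: the expected payload per row is the mean of Uniform[1, 4096], about 2,048.5 bytes, so 350,000,000 rows comes to roughly 0.7 TB of logical data, or about 2.1 TB of physical data once replicated three ways.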
@@ -1074,6 +1136,7 @@ func registerClusterToCluster(r registry.Registry) {
 			workload: replicateKV{
 				readPercent:   0,
 				maxBlockBytes: 1024,
+				initWithSplitAndScatter:     true,
 				partitionKVDatabaseInRegion: "us-west1",
 				antiRegion:                  "us-central1",
 			},
@@ -1093,8 +1156,11 @@ func registerClusterToCluster(r registry.Registry) {
 			dstNodes: 1,
 			cpus:     4,
 			pdSize:   10,
-			workload: replicateKV{readPercent: 0, debugRunDuration: 1 * time.Minute,
-				maxBlockBytes: 1024},
+			workload: replicateKV{
+				readPercent:             0,
+				debugRunDuration:        1 * time.Minute,
+				initWithSplitAndScatter: true,
+				maxBlockBytes:           1024},
 			timeout: 5 * time.Minute,
 			additionalDuration: 0 * time.Minute,
 			cutover: 30 * time.Second,
@@ -1394,7 +1460,7 @@ func registerClusterReplicationResilience(r registry.Registry) {
 			srcNodes: 4,
 			dstNodes: 4,
 			cpus:     8,
-			workload: replicateKV{readPercent: 0, initRows: 5000000, maxBlockBytes: 1024},
+			workload: replicateKV{readPercent: 0, initRows: 5000000, maxBlockBytes: 1024, initWithSplitAndScatter: true},
 			timeout: 20 * time.Minute,
 			additionalDuration: 6 * time.Minute,
 			cutover: 3 * time.Minute,
@@ -1507,7 +1573,7 @@ func registerClusterReplicationDisconnect(r registry.Registry) {
 			srcNodes: 3,
 			dstNodes: 3,
 			cpus:     4,
-			workload: replicateKV{readPercent: 0, initRows: 1000000, maxBlockBytes: 1024},
+			workload: replicateKV{readPercent: 0, initRows: 1000000, maxBlockBytes: 1024, initWithSplitAndScatter: true},
 			timeout: 20 * time.Minute,
 			additionalDuration: 10 * time.Minute,
 			cutover: 2 * time.Minute,
diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go
index 42abbdae4463..5c440d427359 100644
--- a/pkg/kv/kvserver/rangefeed/processor.go
+++ b/pkg/kv/kvserver/rangefeed/processor.go
@@ -511,13 +511,13 @@ func (p *LegacyProcessor) run(
 
 		// Close registrations and exit when signaled.
 		case pErr := <-p.stopC:
-			p.reg.DisconnectWithErr(all, pErr)
+			p.reg.DisconnectAllOnShutdown(pErr)
 			return
 
 		// Exit on stopper.
 		case <-stopper.ShouldQuiesce():
 			pErr := kvpb.NewError(&kvpb.NodeUnavailableError{})
-			p.reg.DisconnectWithErr(all, pErr)
+			p.reg.DisconnectAllOnShutdown(pErr)
 			return
 		}
 	}
diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go
index 6dc9487b4692..55dac7775dd3 100644
--- a/pkg/kv/kvserver/rangefeed/registry.go
+++ b/pkg/kv/kvserver/rangefeed/registry.go
@@ -512,6 +512,18 @@ func (reg *registry) Unregister(ctx context.Context, r *registration) {
 	r.drainAllocations(ctx)
 }
 
+// DisconnectAllOnShutdown disconnects all registrations on processor shutdown.
+// This is different from a normal disconnect, as registrations won't be able
+// to perform Unregister once the processor's work loop has terminated.
+// Besides posting errors to registrations, this method also cleans up the
+// metrics controlled by the registry itself.
+// TODO: this should be revisited as part of
+// https://github.com/cockroachdb/cockroach/issues/110634
+func (reg *registry) DisconnectAllOnShutdown(pErr *kvpb.Error) {
+	reg.metrics.RangeFeedRegistrations.Dec(int64(reg.tree.Len()))
+	reg.DisconnectWithErr(all, pErr)
+}
+
 // Disconnect disconnects all registrations that overlap the specified span with
 // a nil error.
 func (reg *registry) Disconnect(span roachpb.Span) {
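To make the accounting contract above concrete, here is a minimal, runnable Go toy model of the problem DisconnectAllOnShutdown solves; every name in it is an illustrative stand-in, not one of the rangefeed package's real types. The normal unregister path cannot run once the work loop is gone, so the shutdown path settles the gauge itself in one step:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// gauge stands in for the RangeFeedRegistrations metric.
type gauge struct{ n int64 }

func (g *gauge) Inc(d int64) { atomic.AddInt64(&g.n, d) }
func (g *gauge) Dec(d int64) { atomic.AddInt64(&g.n, -d) }

// registry is a toy model: it only tracks a count and a gauge.
type registry struct {
	registrations int
	metric        *gauge
}

func (r *registry) register() { r.registrations++; r.metric.Inc(1) }

// unregister is the normal path, driven by the processor's work loop. After
// shutdown that loop is gone, so it can no longer be called.
func (r *registry) unregister() { r.registrations--; r.metric.Dec(1) }

// disconnectAllOnShutdown mirrors the new method: the registry settles the
// gauge for every remaining registration itself.
func (r *registry) disconnectAllOnShutdown() {
	r.metric.Dec(int64(r.registrations))
	r.registrations = 0
}

func main() {
	r := &registry{metric: &gauge{}}
	r.register()
	r.register()
	// Shutdown with two live registrations: without the explicit Dec, the
	// gauge would stay at 2 forever.
	r.disconnectAllOnShutdown()
	fmt.Println(atomic.LoadInt64(&r.metric.n)) // prints 0
}
```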
diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go
index cf618d16ef5e..118a617091c1 100644
--- a/pkg/kv/kvserver/rangefeed/registry_test.go
+++ b/pkg/kv/kvserver/rangefeed/registry_test.go
@@ -591,3 +591,23 @@ func TestRegistrationString(t *testing.T) {
 		require.Equal(t, tc.exp, tc.r.String())
 	}
 }
+
+// TestRegistryShutdownMetrics verifies that shutting down a registry with an
+// existing registration doesn't cause the registration to update any metrics
+// implicitly.
+func TestRegistryShutdownMetrics(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	reg := makeRegistry(NewMetrics())
+
+	regDoneC := make(chan interface{})
+	r := newTestRegistration(spAB, hlc.Timestamp{WallTime: 10}, nil, false)
+	go func() {
+		r.runOutputLoop(context.Background(), 0)
+		close(regDoneC)
+	}()
+	reg.Register(&r.registration)
+
+	reg.DisconnectAllOnShutdown(nil)
+	<-regDoneC
+	require.Zero(t, reg.metrics.RangeFeedRegistrations.Value(), "metric is not zero on stop")
+}
diff --git a/pkg/kv/kvserver/rangefeed/scheduled_processor.go b/pkg/kv/kvserver/rangefeed/scheduled_processor.go
index 96469c74ca7d..8674cee3b5ef 100644
--- a/pkg/kv/kvserver/rangefeed/scheduled_processor.go
+++ b/pkg/kv/kvserver/rangefeed/scheduled_processor.go
@@ -218,16 +218,16 @@ func (p *ScheduledProcessor) processStop() {
 }
 
 func (p *ScheduledProcessor) cleanup() {
-	// Cleanup is called when all registrations were already disconnected prior to
-	// triggering processor stop (or before we accepted first registration if we
-	// failed to start).
-	// However, we want some defence in depth if lifecycle bug will allow shutdown
-	// before registrations are disconnected and drained. To handle that we will
-	// perform disconnect so that registrations have a chance to stop their work
-	// loop and terminate. This would at least trigger a warning that we are using
-	// memory budget already released by processor.
+	// Cleanup is normally called when all registrations are disconnected and
+	// unregistered, or were never created (processor start failure).
+	// However, there's a case where the processor is stopped by a replica
+	// action while registrations are still active. In that case registrations
+	// won't have a chance to unregister themselves after their work loop
+	// terminates, because the processor is already disconnected from the
+	// scheduler. To avoid leaking registry resources and metrics, the
+	// processor performs explicit registry termination in that case.
 	pErr := kvpb.NewError(&kvpb.NodeUnavailableError{})
-	p.reg.DisconnectWithErr(all, pErr)
+	p.reg.DisconnectAllOnShutdown(pErr)
 	// Unregister callback from scheduler
 	p.scheduler.Unregister()
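A note on the new test's synchronization: TestRegistryShutdownMetrics closes regDoneC only after runOutputLoop returns, so the final require.Zero assertion runs strictly after the registration's work loop has exited. Any implicit metrics update performed by that loop on its way out would therefore be visible to, and caught by, the assertion.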