From cdafd14d854251afc852c6c87ab136ebed30032f Mon Sep 17 00:00:00 2001
From: Oleg Afanasyev
Date: Wed, 20 Sep 2023 13:25:59 +0100
Subject: [PATCH 1/2] rangefeed: fix kv.rangefeed.registrations metric

Previously, when a rangefeed processor was stopped by its replica, the
registrations metric was not decreased correctly, leading to metric creep on
changefeed restarts or range splits. This commit fixes the metric decrease for
such cases.

Epic: none
Fixes: #106126

Release note: None
---
 pkg/kv/kvserver/rangefeed/processor.go        |  4 ++--
 pkg/kv/kvserver/rangefeed/registry.go         | 12 +++++++++++
 pkg/kv/kvserver/rangefeed/registry_test.go    | 20 +++++++++++++++++++
 .../kvserver/rangefeed/scheduled_processor.go | 18 ++++++++---------
 4 files changed, 43 insertions(+), 11 deletions(-)

diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go
index 00587f161081..aaef5aa22f18 100644
--- a/pkg/kv/kvserver/rangefeed/processor.go
+++ b/pkg/kv/kvserver/rangefeed/processor.go
@@ -509,13 +509,13 @@ func (p *LegacyProcessor) run(
 
 		// Close registrations and exit when signaled.
 		case pErr := <-p.stopC:
-			p.reg.DisconnectWithErr(all, pErr)
+			p.reg.DisconnectAllOnShutdown(pErr)
 			return
 
 		// Exit on stopper.
 		case <-stopper.ShouldQuiesce():
 			pErr := kvpb.NewError(&kvpb.NodeUnavailableError{})
-			p.reg.DisconnectWithErr(all, pErr)
+			p.reg.DisconnectAllOnShutdown(pErr)
 			return
 		}
 	}
diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go
index 6dc9487b4692..55dac7775dd3 100644
--- a/pkg/kv/kvserver/rangefeed/registry.go
+++ b/pkg/kv/kvserver/rangefeed/registry.go
@@ -512,6 +512,18 @@ func (reg *registry) Unregister(ctx context.Context, r *registration) {
 	r.drainAllocations(ctx)
 }
 
+// DisconnectAllOnShutdown disconnects all registrations on processor shutdown.
+// This is different from a normal disconnect because registrations cannot
+// perform Unregister once the processor's work loop has terminated.
+// Besides posting errors to registrations, this method also cleans up the
+// metrics controlled by the registry itself.
+// TODO: this should be revisited as part of
+// https://github.com/cockroachdb/cockroach/issues/110634
+func (reg *registry) DisconnectAllOnShutdown(pErr *kvpb.Error) {
+	reg.metrics.RangeFeedRegistrations.Dec(int64(reg.tree.Len()))
+	reg.DisconnectWithErr(all, pErr)
+}
+
 // Disconnect disconnects all registrations that overlap the specified span with
 // a nil error.
 func (reg *registry) Disconnect(span roachpb.Span) {
diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go
index cf618d16ef5e..118a617091c1 100644
--- a/pkg/kv/kvserver/rangefeed/registry_test.go
+++ b/pkg/kv/kvserver/rangefeed/registry_test.go
@@ -591,3 +591,23 @@ func TestRegistrationString(t *testing.T) {
 		require.Equal(t, tc.exp, tc.r.String())
 	}
 }
+
+// TestRegistryShutdownMetrics verifies that shutting down the registry with an
+// existing registration does not cause the registration to update any metrics
+// implicitly.
+func TestRegistryShutdownMetrics(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	reg := makeRegistry(NewMetrics())
+
+	regDoneC := make(chan interface{})
+	r := newTestRegistration(spAB, hlc.Timestamp{WallTime: 10}, nil, false)
+	go func() {
+		r.runOutputLoop(context.Background(), 0)
+		close(regDoneC)
+	}()
+	reg.Register(&r.registration)
+
+	reg.DisconnectAllOnShutdown(nil)
+	<-regDoneC
+	require.Zero(t, reg.metrics.RangeFeedRegistrations.Value(), "metric is not zero on stop")
+}
diff --git a/pkg/kv/kvserver/rangefeed/scheduled_processor.go b/pkg/kv/kvserver/rangefeed/scheduled_processor.go
index 112817a50d9b..45314c9825e9 100644
--- a/pkg/kv/kvserver/rangefeed/scheduled_processor.go
+++ b/pkg/kv/kvserver/rangefeed/scheduled_processor.go
@@ -215,16 +215,16 @@ func (p *ScheduledProcessor) processStop() {
 }
 
 func (p *ScheduledProcessor) cleanup() {
-	// Cleanup is called when all registrations were already disconnected prior to
-	// triggering processor stop (or before we accepted first registration if we
-	// failed to start).
-	// However, we want some defence in depth if lifecycle bug will allow shutdown
-	// before registrations are disconnected and drained. To handle that we will
-	// perform disconnect so that registrations have a chance to stop their work
-	// loop and terminate. This would at least trigger a warning that we are using
-	// memory budget already released by processor.
+	// Cleanup is normally called when all registrations are disconnected and
+	// unregistered, or were not created yet (processor start failure).
+	// However, there is a case where the processor is stopped by a replica
+	// action while registrations are still active. In that case registrations
+	// won't have a chance to unregister themselves after their work loop
+	// terminates because the processor is already disconnected from the
+	// scheduler. To avoid leaking registry resources and metrics, the processor
+	// performs explicit registry termination in that case.
 	pErr := kvpb.NewError(&kvpb.NodeUnavailableError{})
-	p.reg.DisconnectWithErr(all, pErr)
+	p.reg.DisconnectAllOnShutdown(pErr)
 
 	// Unregister callback from scheduler
 	p.scheduler.Unregister()

From f330f6f2b4be18d54f631f78f8ec78ffa2aa7c7c Mon Sep 17 00:00:00 2001
From: Michael Butler
Date: Wed, 13 Sep 2023 16:15:30 -0400
Subject: [PATCH 2/2] roachtest: add c2c/weekly/kv50 roachtest

This patch adds a new weekly c2c roachtest that tests our 23.2 performance
requirements under a given cluster configuration.

The workload:
- kv50: 2TB initial scan, 50% writes, 2000 max qps, insert row size uniformly
  distributed between 1 and 4096 bytes.

The cluster specs:
- 8 nodes in each of the src and dest clusters, 8 vCPUs, pdSize 1 TB

Release note: none

Epic: none
---
 pkg/cmd/roachtest/tests/cluster_to_cluster.go | 96 ++++++++++++++++---
 1 file changed, 81 insertions(+), 15 deletions(-)

diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
index 6d29fe4d63f9..edc892c11cc2 100644
--- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go
+++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -244,9 +244,30 @@ type replicateKV struct {
 	// the number of rows inserted into the cluster before c2c begins
 	initRows int
 
-	// max size of raw data written during each insertion
+	// initWithSplitAndScatter splits and scatters the kv table after
+	// initialization. This should be set for tests that init little or no data.
+	//
+	// The kv workload can run split and scatter after the insertion step. After
+	// inserting a lot of kv table data (think multiple GB), however, kv has had
+	// enough time to split and scatter that table across the cluster, so there
+	// is no need to run an expensive scatter query.
+	//
+	// Initing a couple of MB of kv table data, as other c2c tests do, however,
+	// does not lead kv to naturally split and scatter the table by the time the
+	// c2c job begins, which leads the job to create a non-distributed initial
+	// dist sql plan because the source side dsp.PartitionSpans may assign only
+	// one node pair to stream all of the data.
+	initWithSplitAndScatter bool
+
+	// maxBlockBytes indicates the maximum size of the kv payload value written
+	// during each insertion. The kv workload will randomly choose a value in
+	// the interval [1,maxBlockBytes] with equal probability, i.e. via
+	// x~Uniform[1,maxBlockBytes].
 	maxBlockBytes int
 
+	// maxQPS caps the queries per second sent to the source cluster after initialization.
+	maxQPS int
+
 	// partitionKVDatabaseInRegion constrains the kv database in the specified
 	// region and asserts, before cutover, that the replicated span configuration
 	// correctly enforces the regional constraint in the destination tenant.
@@ -260,9 +281,11 @@ type replicateKV struct {
 func (kv replicateKV) sourceInitCmd(tenantName string, nodes option.NodeListOption) string {
 	cmd := roachtestutil.NewCommand(`./workload init kv`).
 		MaybeFlag(kv.initRows > 0, "insert-count", kv.initRows).
+		// Only set the max block byte values for the init command if we
+		// actually need to insert rows.
 		MaybeFlag(kv.initRows > 0, "max-block-bytes", kv.maxBlockBytes).
-		Flag("splits", 100).
-		Option("scatter").
+		MaybeFlag(kv.initWithSplitAndScatter, "splits", 100).
+		MaybeOption(kv.initWithSplitAndScatter, "scatter").
 		Arg("{pgurl%s:%s}", nodes, tenantName)
 	return cmd.String()
 }
@@ -273,6 +296,7 @@ func (kv replicateKV) sourceRunCmd(tenantName string, nodes option.NodeListOptio
 		Flag("max-block-bytes", kv.maxBlockBytes).
 		Flag("read-percent", kv.readPercent).
 		MaybeFlag(kv.debugRunDuration > 0, "duration", kv.debugRunDuration).
+		MaybeFlag(kv.maxQPS > 0, "max-rate", kv.maxQPS).
 		Arg("{pgurl%s:%s}", nodes, tenantName)
 	return cmd.String()
 }
@@ -394,6 +418,11 @@ type replicationSpec struct {
 	// maxLatency override the maxAcceptedLatencyDefault.
 	maxAcceptedLatency time.Duration
 
+	// TODO(msbutler): this knob only exists because the revision history
+	// fingerprint can encounter a gc ttl error for large fingerprints. Delete
+	// this knob once we lay a pts during fingerprinting.
+	nonRevisionHistoryFingerprint bool
+
 	// skipNodeDistributionCheck removes the requirement that multiple source and
 	// destination nodes must participate in the replication stream. This should
 	// be set if the roachtest runs on single node clusters or if the
@@ -707,6 +736,13 @@ SELECT *
 FROM crdb_internal.fingerprint(crdb_internal.tenant_span($1::INT), '%s'::DECIMAL, true)
 AS OF SYSTEM TIME '%s'`, startTime.AsOfSystemTime(), endTime.AsOfSystemTime())
 
+	if rd.rs.nonRevisionHistoryFingerprint {
+		fingerprintQuery = fmt.Sprintf(`
+SELECT *
+FROM crdb_internal.fingerprint(crdb_internal.tenant_span($1::INT), 0::DECIMAL, false)
+AS OF SYSTEM TIME '%s'`, endTime.AsOfSystemTime())
+	}
+
 	var srcFingerprint int64
 	fingerPrintMonitor := rd.newMonitor(ctx)
 	fingerPrintMonitor.Go(func(ctx context.Context) error {
@@ -982,7 +1018,7 @@ func runAcceptanceClusterReplication(ctx context.Context, t test.Test, c cluster
 		dstNodes: 1,
 		// The timeout field ensures the c2c roachtest driver behaves properly.
 		timeout:                   10 * time.Minute,
-		workload:                  replicateKV{readPercent: 0, debugRunDuration: 1 * time.Minute, maxBlockBytes: 1},
+		workload:                  replicateKV{readPercent: 0, debugRunDuration: 1 * time.Minute, maxBlockBytes: 1, initWithSplitAndScatter: true},
 		additionalDuration:        0 * time.Minute,
 		cutover:                   30 * time.Second,
 		skipNodeDistributionCheck: true,
@@ -1035,13 +1071,17 @@ func registerClusterToCluster(r registry.Registry) {
 			cutover:            30 * time.Minute,
 		},
 		{
-			name:      "c2c/kv0",
-			benchmark: true,
-			srcNodes:  3,
-			dstNodes:  3,
-			cpus:      8,
-			pdSize:    100,
-			workload:  replicateKV{readPercent: 0, maxBlockBytes: 1024},
+			name:      "c2c/kv0",
+			benchmark: true,
+			srcNodes:  3,
+			dstNodes:  3,
+			cpus:      8,
+			pdSize:    100,
+			workload: replicateKV{
+				readPercent:             0,
+				maxBlockBytes:           1024,
+				initWithSplitAndScatter: true,
+			},
 			timeout:            1 * time.Hour,
 			additionalDuration: 10 * time.Minute,
 			cutover:            5 * time.Minute,
@@ -1064,6 +1104,28 @@ func registerClusterToCluster(r registry.Registry) {
 			additionalDuration: 1 * time.Minute,
 			cutover:            0,
 		},
+		{
+			// Large workload to test our 23.2 perf goals.
+			name:                          "c2c/weekly/kv50",
+			benchmark:                     true,
+			srcNodes:                      8,
+			dstNodes:                      8,
+			cpus:                          8,
+			pdSize:                        1000,
+			nonRevisionHistoryFingerprint: true,
+
+			workload: replicateKV{
+				// Write a ~2TB initial scan.
+				initRows:      350000000,
+				readPercent:   50,
+				maxBlockBytes: 4096,
+				maxQPS:        2000,
+			},
+			timeout:            12 * time.Hour,
+			additionalDuration: 2 * time.Hour,
+			cutover:            0,
+			tags:               registry.Tags("weekly", "aws-weekly"),
+		},
 		{
 			name:      "c2c/MultiRegion/SameRegions/kv0",
 			benchmark: true,
@@ -1074,6 +1136,7 @@ func registerClusterToCluster(r registry.Registry) {
 			workload: replicateKV{
 				readPercent:                 0,
 				maxBlockBytes:               1024,
+				initWithSplitAndScatter:     true,
 				partitionKVDatabaseInRegion: "us-west1",
 				antiRegion:                  "us-central1",
 			},
@@ -1093,8 +1156,11 @@ func registerClusterToCluster(r registry.Registry) {
 			dstNodes: 1,
 			cpus:     4,
 			pdSize:   10,
-			workload: replicateKV{readPercent: 0, debugRunDuration: 1 * time.Minute,
-				maxBlockBytes: 1024},
+			workload: replicateKV{
+				readPercent:             0,
+				debugRunDuration:        1 * time.Minute,
+				initWithSplitAndScatter: true,
+				maxBlockBytes:           1024},
 			timeout:            5 * time.Minute,
 			additionalDuration: 0 * time.Minute,
 			cutover:            30 * time.Second,
@@ -1394,7 +1460,7 @@ func registerClusterReplicationResilience(r registry.Registry) {
 		srcNodes:           4,
 		dstNodes:           4,
 		cpus:               8,
-		workload:           replicateKV{readPercent: 0, initRows: 5000000, maxBlockBytes: 1024},
+		workload:           replicateKV{readPercent: 0, initRows: 5000000, maxBlockBytes: 1024, initWithSplitAndScatter: true},
 		timeout:            20 * time.Minute,
 		additionalDuration: 6 * time.Minute,
 		cutover:            3 * time.Minute,
@@ -1507,7 +1573,7 @@ func registerClusterReplicationDisconnect(r registry.Registry) {
 		srcNodes:           3,
 		dstNodes:           3,
 		cpus:               4,
-		workload:           replicateKV{readPercent: 0, initRows: 1000000, maxBlockBytes: 1024},
+		workload:           replicateKV{readPercent: 0, initRows: 1000000, maxBlockBytes: 1024, initWithSplitAndScatter: true},
 		timeout:            20 * time.Minute,
 		additionalDuration: 10 * time.Minute,
 		cutover:            2 * time.Minute,