diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go index 52392d16ab9f..2e2beb87fa8e 100644 --- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go +++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go @@ -56,8 +56,8 @@ type clusterInfo struct { // db provides a connection to the system tenant db *gosql.DB - // sql provides a sql connection to the host cluster - sql *sqlutils.SQLRunner + // sql provides a sql connection to the system tenant + sysSQL *sqlutils.SQLRunner // nodes indicates the roachprod nodes running the cluster's nodes nodes option.NodeListOption @@ -300,18 +300,18 @@ func setupC2C( require.NoError(t, err) srcTenantInfo := clusterInfo{ - name: srcTenantName, - ID: srcTenantID, - pgURL: pgURL, - sql: srcSQL, - db: srcDB, - nodes: srcCluster} + name: srcTenantName, + ID: srcTenantID, + pgURL: pgURL, + sysSQL: srcSQL, + db: srcDB, + nodes: srcCluster} destTenantInfo := clusterInfo{ - name: destTenantName, - ID: destTenantID, - sql: destSQL, - db: destDB, - nodes: dstCluster} + name: destTenantName, + ID: destTenantID, + sysSQL: destSQL, + db: destDB, + nodes: dstCluster} setup := &c2cSetup{ src: srcTenantInfo, @@ -463,10 +463,13 @@ func registerClusterToCluster(r registry.Registry) { t.Status("starting replication stream") setup.metrics.initalScanStart = newMetricSnapshot(metricSnapper, timeutil.Now()) + + // There's no need to remove the tenant limiters for this new app tenant, as + // all replication traffic flows through the system tenant. streamReplStmt := fmt.Sprintf("CREATE TENANT %q FROM REPLICATION OF %q ON '%s'", setup.dst.name, setup.src.name, setup.src.pgURL) - setup.dst.sql.Exec(t, streamReplStmt) - ingestionJobID := getIngestionJobID(t, setup.dst.sql, setup.dst.name) + setup.dst.sysSQL.Exec(t, streamReplStmt) + ingestionJobID := getIngestionJobID(t, setup.dst.sysSQL, setup.dst.name) // TODO(ssd): The job doesn't record the initial // statement time, so we can't correctly measure the @@ -485,7 +488,7 @@ func registerClusterToCluster(r registry.Registry) { sp.additionalDuration)) var currentTime time.Time - setup.dst.sql.QueryRow(t, "SELECT clock_timestamp()").Scan(¤tTime) + setup.dst.sysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(¤tTime) cutoverTime := currentTime.Add(sp.additionalDuration - sp.cutover) t.Status("cutover time chosen: ", cutoverTime.String()) @@ -498,10 +501,10 @@ func registerClusterToCluster(r registry.Registry) { return } t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s", cutoverTime.String())) - retainedTime := getReplicationRetainedTime(t, setup.dst.sql, roachpb.TenantName(setup.dst.name)) + retainedTime := getReplicationRetainedTime(t, setup.dst.sysSQL, roachpb.TenantName(setup.dst.name)) setup.metrics.cutoverTo = newMetricSnapshot(metricSnapper, cutoverTime) setup.metrics.cutoverStart = newMetricSnapshot(metricSnapper, timeutil.Now()) - stopReplicationStream(t, setup.dst.sql, ingestionJobID, cutoverTime) + stopReplicationStream(t, setup.dst.sysSQL, ingestionJobID, cutoverTime) setup.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now()) setup.metrics.export(t, len(setup.src.nodes)) @@ -548,14 +551,14 @@ AS OF SYSTEM TIME '%s'`, startTimeStr, aost) var srcFingerprint int64 m.Go(func(ctx context.Context) error { - setup.src.sql.QueryRow(t, fingerprintQuery, setup.src.ID).Scan(&srcFingerprint) + setup.src.sysSQL.QueryRow(t, fingerprintQuery, setup.src.ID).Scan(&srcFingerprint) return nil }) var destFingerprint int64 m.Go(func(ctx context.Context) error { // TODO(adityamaru): Measure and record fingerprinting throughput. setup.metrics.fingerprintingStart = timeutil.Now() - setup.dst.sql.QueryRow(t, fingerprintQuery, setup.dst.ID).Scan(&destFingerprint) + setup.dst.sysSQL.QueryRow(t, fingerprintQuery, setup.dst.ID).Scan(&destFingerprint) setup.metrics.fingerprintingEnd = timeutil.Now() fingerprintingDuration := setup.metrics.fingerprintingEnd.Sub(setup.metrics.fingerprintingStart).String() t.L().Printf("fingerprinting the destination tenant took %s", fingerprintingDuration) @@ -660,24 +663,12 @@ func stopReplicationStream( } func srcClusterSettings(t test.Test, db *sqlutils.SQLRunner) { - db.ExecMultiple(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true;`, - `SET CLUSTER SETTING kv.tenant_rate_limiter.burst_limit_seconds = 10000;`, - `SET CLUSTER SETTING kv.tenant_rate_limiter.rate_limit = -1000; `, - `SET CLUSTER SETTING kv.tenant_rate_limiter.read_batch_cost = 0;`, - `SET CLUSTER SETTING kv.tenant_rate_limiter.read_cost_per_mebibyte = 0;`, - `SET CLUSTER SETTING kv.tenant_rate_limiter.write_cost_per_megabyte = 0;`, - `SET CLUSTER SETTING kv.tenant_rate_limiter.write_request_cost = 0;`) + db.ExecMultiple(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true;`) } func destClusterSettings(t test.Test, db *sqlutils.SQLRunner) { db.ExecMultiple(t, `SET CLUSTER SETTING cross_cluster_replication.enabled = true;`, - `SET CLUSTER SETTING kv.rangefeed.enabled = true;`, - `SET CLUSTER SETTING kv.tenant_rate_limiter.burst_limit_seconds = 10000;`, - `SET CLUSTER SETTING kv.tenant_rate_limiter.rate_limit = -1000; `, - `SET CLUSTER SETTING kv.tenant_rate_limiter.read_batch_cost = 0;`, - `SET CLUSTER SETTING kv.tenant_rate_limiter.read_cost_per_mebibyte = 0;`, - `SET CLUSTER SETTING kv.tenant_rate_limiter.write_cost_per_megabyte = 0;`, - `SET CLUSTER SETTING kv.tenant_rate_limiter.write_request_cost = 0;`) + `SET CLUSTER SETTING kv.rangefeed.enabled = true;`) } func copyPGCertsAndMakeURL( diff --git a/pkg/cmd/roachtest/tests/multitenant_utils.go b/pkg/cmd/roachtest/tests/multitenant_utils.go index 686feb9d78a7..990add690cb9 100644 --- a/pkg/cmd/roachtest/tests/multitenant_utils.go +++ b/pkg/cmd/roachtest/tests/multitenant_utils.go @@ -347,6 +347,8 @@ func createInMemoryTenant( sysSQL.Exec(t, "CREATE TENANT $1", tenantName) sysSQL.Exec(t, "ALTER TENANT $1 START SERVICE SHARED", tenantName) + removeTenantRateLimiters(t, sysSQL, tenantName) + // Opening a SQL session to a newly created in-process tenant may require a // few retries. Unfortunately, the c.ConnE and MakeSQLRunner APIs do not make // it clear if they eagerly open a session with the tenant or wait until the @@ -365,14 +367,22 @@ func createInMemoryTenant( return nil }) - // Currently, a tenant has by default a 10m RU burst limit, which can be - // reached during these tests. To prevent RU limit throttling, add 10B RUs to - // the tenant. - var tenantID int - sysSQL.QueryRow(t, `SELECT id FROM [SHOW TENANT $1]`, tenantName).Scan(&tenantID) - sysSQL.Exec(t, `SELECT crdb_internal.update_tenant_resource_limits($1, 10000000000, 0, -10000000000, now(), 0);`, tenantID) if secure { createTenantAdminRole(t, tenantName, tenantSQL) } } + +// removeTenantRateLimiters ensures the tenant is not throttled by limiters. +func removeTenantRateLimiters(t test.Test, systemSQL *sqlutils.SQLRunner, tenantName string) { + var tenantID int + systemSQL.QueryRow(t, `SELECT id FROM [SHOW TENANT $1]`, tenantName).Scan(&tenantID) + systemSQL.Exec(t, `SELECT crdb_internal.update_tenant_resource_limits($1, 10000000000, 0, +10000000000, now(), 0);`, tenantID) + systemSQL.ExecMultiple(t, + `SET CLUSTER SETTING kv.tenant_rate_limiter.burst_limit_seconds = 10000;`, + `SET CLUSTER SETTING kv.tenant_rate_limiter.rate_limit = -1000; `, + `SET CLUSTER SETTING kv.tenant_rate_limiter.read_batch_cost = 0;`, + `SET CLUSTER SETTING kv.tenant_rate_limiter.read_cost_per_mebibyte = 0;`, + `SET CLUSTER SETTING kv.tenant_rate_limiter.write_cost_per_megabyte = 0;`, + `SET CLUSTER SETTING kv.tenant_rate_limiter.write_request_cost = 0;`) +}