c2c: refactor tenant ru limit removal in roachtests
This patch streamlines how we remove RU limiting for roachtests that use
tenants. For the c2c tests specifically, we now remove the limits on the dst
cluster tenant as soon as the replication stream begins.

Release note: None
msbutler committed Feb 23, 2023
1 parent 62c7f8f commit 48c0e08
Showing 2 changed files with 42 additions and 41 deletions.
59 changes: 25 additions & 34 deletions pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -56,8 +56,8 @@ type clusterInfo struct {
// db provides a connection to the system tenant
db *gosql.DB

// sql provides a sql connection to the host cluster
sql *sqlutils.SQLRunner
// sysSQL provides a sql connection to the system tenant
sysSQL *sqlutils.SQLRunner

// nodes indicates the roachprod nodes running the cluster's nodes
nodes option.NodeListOption
@@ -300,18 +300,18 @@ func setupC2C(
require.NoError(t, err)

srcTenantInfo := clusterInfo{
name: srcTenantName,
ID: srcTenantID,
pgURL: pgURL,
sql: srcSQL,
db: srcDB,
nodes: srcCluster}
name: srcTenantName,
ID: srcTenantID,
pgURL: pgURL,
sysSQL: srcSQL,
db: srcDB,
nodes: srcCluster}
destTenantInfo := clusterInfo{
name: destTenantName,
ID: destTenantID,
sql: destSQL,
db: destDB,
nodes: dstCluster}
name: destTenantName,
ID: destTenantID,
sysSQL: destSQL,
db: destDB,
nodes: dstCluster}

setup := &c2cSetup{
src: srcTenantInfo,
@@ -463,10 +463,13 @@ func registerClusterToCluster(r registry.Registry) {

t.Status("starting replication stream")
setup.metrics.initalScanStart = newMetricSnapshot(metricSnapper, timeutil.Now())

// There's no need to remove the tenant limiters for this new app tenant, as
// all replication traffic flows through the system tenant.
streamReplStmt := fmt.Sprintf("CREATE TENANT %q FROM REPLICATION OF %q ON '%s'",
setup.dst.name, setup.src.name, setup.src.pgURL)
setup.dst.sql.Exec(t, streamReplStmt)
ingestionJobID := getIngestionJobID(t, setup.dst.sql, setup.dst.name)
setup.dst.sysSQL.Exec(t, streamReplStmt)
ingestionJobID := getIngestionJobID(t, setup.dst.sysSQL, setup.dst.name)

// TODO(ssd): The job doesn't record the initial
// statement time, so we can't correctly measure the
@@ -485,7 +488,7 @@ func registerClusterToCluster(r registry.Registry) {
sp.additionalDuration))

var currentTime time.Time
setup.dst.sql.QueryRow(t, "SELECT clock_timestamp()").Scan(&currentTime)
setup.dst.sysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&currentTime)
cutoverTime := currentTime.Add(sp.additionalDuration - sp.cutover)
t.Status("cutover time chosen: ", cutoverTime.String())

@@ -498,10 +501,10 @@ func registerClusterToCluster(r registry.Registry) {
return
}
t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s", cutoverTime.String()))
retainedTime := getReplicationRetainedTime(t, setup.dst.sql, roachpb.TenantName(setup.dst.name))
retainedTime := getReplicationRetainedTime(t, setup.dst.sysSQL, roachpb.TenantName(setup.dst.name))
setup.metrics.cutoverTo = newMetricSnapshot(metricSnapper, cutoverTime)
setup.metrics.cutoverStart = newMetricSnapshot(metricSnapper, timeutil.Now())
stopReplicationStream(t, setup.dst.sql, ingestionJobID, cutoverTime)
stopReplicationStream(t, setup.dst.sysSQL, ingestionJobID, cutoverTime)
setup.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now())

setup.metrics.export(t, len(setup.src.nodes))
@@ -548,14 +551,14 @@ AS OF SYSTEM TIME '%s'`, startTimeStr, aost)

var srcFingerprint int64
m.Go(func(ctx context.Context) error {
setup.src.sql.QueryRow(t, fingerprintQuery, setup.src.ID).Scan(&srcFingerprint)
setup.src.sysSQL.QueryRow(t, fingerprintQuery, setup.src.ID).Scan(&srcFingerprint)
return nil
})
var destFingerprint int64
m.Go(func(ctx context.Context) error {
// TODO(adityamaru): Measure and record fingerprinting throughput.
setup.metrics.fingerprintingStart = timeutil.Now()
setup.dst.sql.QueryRow(t, fingerprintQuery, setup.dst.ID).Scan(&destFingerprint)
setup.dst.sysSQL.QueryRow(t, fingerprintQuery, setup.dst.ID).Scan(&destFingerprint)
setup.metrics.fingerprintingEnd = timeutil.Now()
fingerprintingDuration := setup.metrics.fingerprintingEnd.Sub(setup.metrics.fingerprintingStart).String()
t.L().Printf("fingerprinting the destination tenant took %s", fingerprintingDuration)
@@ -660,24 +663,12 @@ func stopReplicationStream(
}

func srcClusterSettings(t test.Test, db *sqlutils.SQLRunner) {
db.ExecMultiple(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true;`,
`SET CLUSTER SETTING kv.tenant_rate_limiter.burst_limit_seconds = 10000;`,
`SET CLUSTER SETTING kv.tenant_rate_limiter.rate_limit = -1000; `,
`SET CLUSTER SETTING kv.tenant_rate_limiter.read_batch_cost = 0;`,
`SET CLUSTER SETTING kv.tenant_rate_limiter.read_cost_per_mebibyte = 0;`,
`SET CLUSTER SETTING kv.tenant_rate_limiter.write_cost_per_megabyte = 0;`,
`SET CLUSTER SETTING kv.tenant_rate_limiter.write_request_cost = 0;`)
db.ExecMultiple(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true;`)
}

func destClusterSettings(t test.Test, db *sqlutils.SQLRunner) {
db.ExecMultiple(t, `SET CLUSTER SETTING cross_cluster_replication.enabled = true;`,
`SET CLUSTER SETTING kv.rangefeed.enabled = true;`,
`SET CLUSTER SETTING kv.tenant_rate_limiter.burst_limit_seconds = 10000;`,
`SET CLUSTER SETTING kv.tenant_rate_limiter.rate_limit = -1000; `,
`SET CLUSTER SETTING kv.tenant_rate_limiter.read_batch_cost = 0;`,
`SET CLUSTER SETTING kv.tenant_rate_limiter.read_cost_per_mebibyte = 0;`,
`SET CLUSTER SETTING kv.tenant_rate_limiter.write_cost_per_megabyte = 0;`,
`SET CLUSTER SETTING kv.tenant_rate_limiter.write_request_cost = 0;`)
`SET CLUSTER SETTING kv.rangefeed.enabled = true;`)
}

func copyPGCertsAndMakeURL(
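
For reference, a quick sketch of what the streamReplStmt built in
registerClusterToCluster above renders to; the tenant names and connection URL
here are placeholders, not values taken from the test:

// Illustration only (hypothetical tenant names and URL): %q double-quotes the
// tenant names, while the source cluster's PG URL is embedded single-quoted.
stmt := fmt.Sprintf("CREATE TENANT %q FROM REPLICATION OF %q ON '%s'",
	"destination", "source", "postgres://user@src-host:26257")
// stmt == `CREATE TENANT "destination" FROM REPLICATION OF "source" ON 'postgres://user@src-host:26257'`

The statement is then executed through the destination cluster's system tenant
connection (setup.dst.sysSQL), since all replication traffic flows through the
system tenant.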
24 changes: 17 additions & 7 deletions pkg/cmd/roachtest/tests/multitenant_utils.go
@@ -347,6 +347,8 @@ func createInMemoryTenant(
sysSQL.Exec(t, "CREATE TENANT $1", tenantName)
sysSQL.Exec(t, "ALTER TENANT $1 START SERVICE SHARED", tenantName)

removeTenantRateLimiters(t, sysSQL, tenantName)

// Opening a SQL session to a newly created in-process tenant may require a
// few retries. Unfortunately, the c.ConnE and MakeSQLRunner APIs do not make
// it clear if they eagerly open a session with the tenant or wait until the
@@ -365,14 +367,22 @@
return nil
})

// Currently, a tenant has by default a 10m RU burst limit, which can be
// reached during these tests. To prevent RU limit throttling, add 10B RUs to
// the tenant.
var tenantID int
sysSQL.QueryRow(t, `SELECT id FROM [SHOW TENANT $1]`, tenantName).Scan(&tenantID)
sysSQL.Exec(t, `SELECT crdb_internal.update_tenant_resource_limits($1, 10000000000, 0,
10000000000, now(), 0);`, tenantID)
if secure {
createTenantAdminRole(t, tenantName, tenantSQL)
}
}

// removeTenantRateLimiters ensures the tenant is not throttled by limiters.
func removeTenantRateLimiters(t test.Test, systemSQL *sqlutils.SQLRunner, tenantName string) {
var tenantID int
systemSQL.QueryRow(t, `SELECT id FROM [SHOW TENANT $1]`, tenantName).Scan(&tenantID)
systemSQL.Exec(t, `SELECT crdb_internal.update_tenant_resource_limits($1, 10000000000, 0,
10000000000, now(), 0);`, tenantID)
systemSQL.ExecMultiple(t,
`SET CLUSTER SETTING kv.tenant_rate_limiter.burst_limit_seconds = 10000;`,
`SET CLUSTER SETTING kv.tenant_rate_limiter.rate_limit = -1000; `,
`SET CLUSTER SETTING kv.tenant_rate_limiter.read_batch_cost = 0;`,
`SET CLUSTER SETTING kv.tenant_rate_limiter.read_cost_per_mebibyte = 0;`,
`SET CLUSTER SETTING kv.tenant_rate_limiter.write_cost_per_megabyte = 0;`,
`SET CLUSTER SETTING kv.tenant_rate_limiter.write_request_cost = 0;`)
}
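
As a usage sketch (mirroring the createInMemoryTenant change above; sysSQL and
tenantName stand in for whatever the calling test already has), the helper is
invoked against the system tenant once the target tenant exists:

// Minimal sketch: lift RU throttling for a tenant created by a roachtest.
// sysSQL is a *sqlutils.SQLRunner connected to the system tenant, and
// tenantName is the name that was passed to CREATE TENANT.
sysSQL.Exec(t, "CREATE TENANT $1", tenantName)
sysSQL.Exec(t, "ALTER TENANT $1 START SERVICE SHARED", tenantName)
removeTenantRateLimiters(t, sysSQL, tenantName)

The large RU grant via crdb_internal.update_tenant_resource_limits, combined
with zeroing out the kv.tenant_rate_limiter cost settings, is what keeps the
tenant from being throttled for the duration of the test.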
