From c00eac5728854145a24b1dcc1c2c865829c7b495 Mon Sep 17 00:00:00 2001
From: Michael Butler
Date: Mon, 11 Sep 2023 14:04:09 -0400
Subject: [PATCH] roachtest: add c2c/MultiRegion/SameRegions/kv0 roachtest

This patch adds a new c2c roachtest that spins up multiregion source and
destination clusters, constrains the kv database to the us-west1 region, and
asserts, before cutover, that the replicated span configuration enforces the
regional constraint on the destination cluster during replication.

Informs #109059

Release note: None
---
 .../replication_stream_e2e_test.go            |  19 +++
 pkg/cmd/roachtest/tests/cluster_to_cluster.go | 101 ++++++++++++++++++
 pkg/cmd/roachtest/tests/multitenant_utils.go  |   1 +
 3 files changed, 121 insertions(+)
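For review convenience, the mechanism the new test exercises is sketched
below in plain SQL. The sketch is illustrative only and not part of the
diff; '<kv-table-prefix>' is a placeholder for the destination tenant's kv
table prefix that checkRegionalConstraints derives from the tenant codec (a
pretty-printed key such as /Tenant/2/Table/106 is an assumed example):

    -- On the source tenant: pin every range of the kv database to one region.
    ALTER DATABASE kv CONFIGURE ZONE USING constraints = '[+region=us-west1]';

    -- On the destination system tenant: once the span configs have
    -- replicated, every range under the kv table prefix should report only
    -- us-west1 replica localities, and never the anti-region (us-central1).
    SELECT DISTINCT replica_localities
    FROM [SHOW CLUSTER RANGES]
    WHERE start_key ~ '<kv-table-prefix>';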
diff --git a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
index c5673e8c5a09..923fd741ca9d 100644
--- a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
+++ b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
@@ -1197,4 +1197,23 @@ func TestStreamingRegionalConstraint(t *testing.T) {
 
 	testutils.SucceedsSoon(t, checkLocalities(tableDesc.PrimaryIndexSpan(destCodec),
 		rangedesc.NewScanner(c.DestSysServer.DB())))
+
+	checkLocalityRanges(t, c.SrcSysSQL, srcCodec, uint32(tableDesc.GetID()), "mars")
+}
+
+func checkLocalityRanges(
+	t *testing.T, sysSQL *sqlutils.SQLRunner, codec keys.SQLCodec, tableID uint32, region string,
+) {
+	targetPrefix := codec.TablePrefix(tableID)
+	distinctQuery := fmt.Sprintf(`
+SELECT
+  DISTINCT replica_localities
+FROM
+  [SHOW CLUSTER RANGES]
+WHERE
+  start_key ~ '%s'
+`, targetPrefix)
+	var locality string
+	sysSQL.QueryRow(t, distinctQuery).Scan(&locality)
+	require.Contains(t, locality, region)
 }
diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
index 197beda00885..e14e3be65d4f 100644
--- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go
+++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -34,6 +34,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/install"
@@ -245,6 +246,15 @@ type replicateKV struct {
 
 	// max size of raw data written during each insertion
 	maxBlockBytes int
+
+	// partitionKVDatabaseInRegion constrains the kv database to the specified
+	// region and asserts, before cutover, that the replicated span configuration
+	// correctly enforces the regional constraint in the destination tenant.
+	partitionKVDatabaseInRegion string
+
+	// antiRegion is the region we do not expect any kv data to reside in if
+	// partitionKVDatabaseInRegion is set.
+	antiRegion string
 }
 
 func (kv replicateKV) sourceInitCmd(tenantName string, nodes option.NodeListOption) string {
@@ -270,9 +280,47 @@ func (kv replicateKV) sourceRunCmd(tenantName string, nodes option.NodeListOptio
 func (kv replicateKV) runDriver(
 	workloadCtx context.Context, c cluster.Cluster, t test.Test, setup *c2cSetup,
 ) error {
+	if kv.partitionKVDatabaseInRegion != "" {
+		require.NotEqual(t, "", kv.antiRegion, "if partitionKVDatabaseInRegion is set, then antiRegion must be set")
+		t.L().Printf("constrain the kv database to region %s", kv.partitionKVDatabaseInRegion)
+		alterStmt := fmt.Sprintf("ALTER DATABASE kv CONFIGURE ZONE USING constraints = '[+region=%s]'", kv.partitionKVDatabaseInRegion)
+		srcTenantConn := c.Conn(workloadCtx, t.L(), setup.src.nodes.RandNode()[0], option.TenantName(setup.src.name))
+		srcTenantSQL := sqlutils.MakeSQLRunner(srcTenantConn)
+		srcTenantSQL.Exec(t, alterStmt)
+		defer kv.checkRegionalConstraints(t, setup, srcTenantSQL)
+	}
 	return defaultWorkloadDriver(workloadCtx, setup, c, kv)
 }
 
+// checkRegionalConstraints checks that the kv table is constrained to the
+// expected locality.
+func (kv replicateKV) checkRegionalConstraints(
+	t test.Test, setup *c2cSetup, srcTenantSQL *sqlutils.SQLRunner,
+) {
+	var kvTableID uint32
+	srcTenantSQL.QueryRow(t,
+		`SELECT id FROM system.namespace WHERE name = 'kv' AND "parentID" != 0`).Scan(&kvTableID)
+
+	dstTenantCodec := keys.MakeSQLCodec(roachpb.MustMakeTenantID(uint64(setup.dst.ID)))
+	tablePrefix := dstTenantCodec.TablePrefix(kvTableID)
+	t.L().Printf("checking replica localities in the destination's kv table, id %d and table prefix %s", kvTableID, tablePrefix)
+
+	distinctQuery := fmt.Sprintf(`
+SELECT
+  DISTINCT replica_localities
+FROM
+  [SHOW CLUSTER RANGES]
+WHERE
+  start_key ~ '%s'
+`, tablePrefix)
+
+	res := setup.dst.sysSQL.QueryStr(t, distinctQuery)
+	require.Equal(t, 1, len(res), "expected only one distinct locality")
+	locality := res[0][0]
+	require.Contains(t, locality, kv.partitionKVDatabaseInRegion)
+	require.False(t, strings.Contains(locality, kv.antiRegion), "region %s is in locality %s", kv.antiRegion, locality)
+}
+
 type replicateBulkOps struct {
 	// short uses less data during the import and rollback steps. Also only runs one rollback.
 	short bool
@@ -325,6 +373,9 @@ type replicationSpec struct {
 	// workload specifies the streaming workload.
 	workload streamingWorkload
 
+	// multiregion specifies the multiregion cluster specs.
+	multiregion multiRegionSpecs
+
 	// additionalDuration specifies how long the workload will run after the initial scan
 	//completes. If the time out is set to 0, it will run until completion.
 	additionalDuration time.Duration
@@ -360,6 +411,17 @@ type replicationSpec struct {
 	tags map[string]struct{}
 }
 
+type multiRegionSpecs struct {
+	// srcLocalities specifies the zones each src node should live in. The length of this array must match the number of src nodes.
+	srcLocalities []string
+
+	// destLocalities specifies the zones each dest node should live in. The length of this array must match the number of dest nodes.
+	destLocalities []string
+
+	// workloadNodeZone specifies the zone that the workload node should live in.
+	workloadNodeZone string
+}
+
 // replicationDriver manages c2c roachtest execution.
 type replicationDriver struct {
 	rs replicationSpec
@@ -392,6 +454,13 @@ func makeReplicationDriver(t test.Test, c cluster.Cluster, rs replicationSpec) *
 }
 
 func (rd *replicationDriver) setupC2C(ctx context.Context, t test.Test, c cluster.Cluster) {
+	if len(rd.rs.multiregion.srcLocalities) != 0 {
+		nodeCount := rd.rs.srcNodes + rd.rs.dstNodes
+		localityCount := len(rd.rs.multiregion.srcLocalities) + len(rd.rs.multiregion.destLocalities)
+		require.Equal(t, nodeCount, localityCount)
+		require.NotEqual(t, "", rd.rs.multiregion.workloadNodeZone)
+	}
+
 	c.Put(ctx, t.Cockroach(), "./cockroach")
 	srcCluster := c.Range(1, rd.rs.srcNodes)
 	dstCluster := c.Range(rd.rs.srcNodes+1, rd.rs.srcNodes+rd.rs.dstNodes)
@@ -881,6 +950,15 @@ func c2cRegisterWrapper(
 		clusterOps = append(clusterOps, spec.VolumeSize(sp.pdSize))
 	}
 
+	if len(sp.multiregion.srcLocalities) > 0 {
+		allZones := make([]string, 0, sp.srcNodes+sp.dstNodes+1)
+		allZones = append(allZones, sp.multiregion.srcLocalities...)
+		allZones = append(allZones, sp.multiregion.destLocalities...)
+		allZones = append(allZones, sp.multiregion.workloadNodeZone)
+		clusterOps = append(clusterOps, spec.Zones(strings.Join(allZones, ",")))
+		clusterOps = append(clusterOps, spec.Geo())
+	}
+
 	r.Add(registry.TestSpec{
 		Name:  sp.name,
 		Owner: registry.OwnerDisasterRecovery,
@@ -986,6 +1064,29 @@ func registerClusterToCluster(r registry.Registry) {
 			additionalDuration: 1 * time.Minute,
 			cutover:            0,
 		},
+		{
+			name:      "c2c/MultiRegion/SameRegions/kv0",
+			benchmark: true,
+			srcNodes:  4,
+			dstNodes:  4,
+			cpus:      8,
+			pdSize:    100,
+			workload: replicateKV{
+				readPercent:                 0,
+				maxBlockBytes:               1024,
+				partitionKVDatabaseInRegion: "us-west1",
+				antiRegion:                  "us-central1",
+			},
+			timeout:            1 * time.Hour,
+			additionalDuration: 10 * time.Minute,
+			cutover:            1 * time.Minute,
+			multiregion: multiRegionSpecs{
+				// GCP-specific zones.
+				srcLocalities:    []string{"us-west1-b", "us-west1-b", "us-west1-b", "us-central1-b"},
+				destLocalities:   []string{"us-central1-b", "us-west1-b", "us-west1-b", "us-west1-b"},
+				workloadNodeZone: "us-west1-b",
+			},
+		},
 		{
 			name:     "c2c/UnitTest",
 			srcNodes: 1,
diff --git a/pkg/cmd/roachtest/tests/multitenant_utils.go b/pkg/cmd/roachtest/tests/multitenant_utils.go
index 9115b28cdec8..0d7dab42ab95 100644
--- a/pkg/cmd/roachtest/tests/multitenant_utils.go
+++ b/pkg/cmd/roachtest/tests/multitenant_utils.go
@@ -366,6 +366,7 @@ func startInMemoryTenant(
 	sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING sql.split_at.allow_for_secondary_tenant.enabled=true`, tenantName)
 	sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING sql.scatter.allow_for_secondary_tenant.enabled=true`, tenantName)
 	sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING sql.zone_configs.allow_for_secondary_tenant.enabled=true`, tenantName)
+	sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.multiregion.enabled=true`, tenantName)
 	sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING enterprise.license = $2`, tenantName, config.CockroachDevLicense)
 	sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING cluster.organization = 'Cockroach Labs - Production Testing'`, tenantName)
 	removeTenantRateLimiters(t, sysSQL, tenantName)
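
A note for reviewers on the new multiRegionSpecs knobs: c2cRegisterWrapper
flattens the source zones, the destination zones, and the workload node zone
into a single comma-separated string for spec.Zones, paired with spec.Geo.
The following standalone sketch is not part of the patch; the type and field
names mirror the diff, and it prints the zone list that the 4+4 node spec
registered above would produce:

    // Minimal sketch of the zone-assembly logic in c2cRegisterWrapper.
    package main

    import (
    	"fmt"
    	"strings"
    )

    type multiRegionSpecs struct {
    	srcLocalities    []string
    	destLocalities   []string
    	workloadNodeZone string
    }

    // allZones concatenates source zones, destination zones, and the
    // workload node zone into the comma-separated string handed to
    // spec.Zones(...).
    func allZones(m multiRegionSpecs, srcNodes, dstNodes int) string {
    	zones := make([]string, 0, srcNodes+dstNodes+1)
    	zones = append(zones, m.srcLocalities...)
    	zones = append(zones, m.destLocalities...)
    	zones = append(zones, m.workloadNodeZone)
    	return strings.Join(zones, ",")
    }

    func main() {
    	m := multiRegionSpecs{
    		srcLocalities:    []string{"us-west1-b", "us-west1-b", "us-west1-b", "us-central1-b"},
    		destLocalities:   []string{"us-central1-b", "us-west1-b", "us-west1-b", "us-west1-b"},
    		workloadNodeZone: "us-west1-b",
    	}
    	// Prints the 9-entry zone string for a 4+4 node cluster plus the
    	// workload node.
    	fmt.Println(allZones(m, 4, 4))
    }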