Skip to content

Commit

Permalink
roachtest: add c2c/multiregion/SameRegions/kv0 roachtest
Browse files Browse the repository at this point in the history
This patch adds a new c2c roachtest that spins up multiregion source and
destination clusters, constrains the kv database to the us-east1-b region, and
asserts that the replicated span configuration enforces the regional constraint
on the destination cluster during replication.

Informs #109059

Release note: None
  • Loading branch information
msbutler committed Sep 14, 2023
1 parent 0cde11b commit 5d91b69
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 0 deletions.
27 changes: 27 additions & 0 deletions pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1197,4 +1197,31 @@ func TestStreamingRegionalConstraint(t *testing.T) {

testutils.SucceedsSoon(t,
checkLocalities(tableDesc.PrimaryIndexSpan(destCodec), rangedesc.NewScanner(c.DestSysServer.DB())))

tableName := "test"
tabledIDQuery := fmt.Sprintf(`SELECT id FROM system.namespace WHERE name ='%s'`, tableName)

var tableID uint32
c.SrcTenantSQL.QueryRow(t, tabledIDQuery).Scan(&tableID)
fmt.Printf("%d", tableID)

checkLocalityRanges(t, c.SrcSysSQL, srcCodec, uint32(tableDesc.GetID()), "mars")

}

func checkLocalityRanges(
t *testing.T, sysSQL *sqlutils.SQLRunner, codec keys.SQLCodec, tableID uint32, region string,
) {
targetPrefix := codec.TablePrefix(tableID)
distinctQuery := fmt.Sprintf(`
SELECT
DISTINCT replica_localities
FROM
[SHOW CLUSTER RANGES]
WHERE
start_key ~ '%s'
`, targetPrefix)
var locality string
sysSQL.QueryRow(t, distinctQuery).Scan(&locality)
require.Contains(t, locality, region)
}
101 changes: 101 additions & 0 deletions pkg/cmd/roachtest/tests/cluster_to_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
Expand Down Expand Up @@ -245,6 +246,15 @@ type replicateKV struct {

// max size of raw data written during each insertion
maxBlockBytes int

// partitionKVDatabaseInRegion constrains the kv database in the specified
// region and asserts, before cutover, that the replicated span configuration
// correctly enforces the regional constraint in the destination tenant.
partitionKVDatabaseInRegion string

// antiRegion is the region we do not expect any kv data to reside in if
// partitionKVDatabaseInRegion is set.
antiRegion string
}

func (kv replicateKV) sourceInitCmd(tenantName string, nodes option.NodeListOption) string {
Expand All @@ -270,9 +280,47 @@ func (kv replicateKV) sourceRunCmd(tenantName string, nodes option.NodeListOptio
func (kv replicateKV) runDriver(
workloadCtx context.Context, c cluster.Cluster, t test.Test, setup *c2cSetup,
) error {
if kv.partitionKVDatabaseInRegion != "" {
require.NotEqual(t, "", kv.antiRegion, "if partitionKVDatabaseInRegion is set, then antiRegion must be set")
t.L().Printf("constrain the kv database to region %s", kv.partitionKVDatabaseInRegion)
alterStmt := fmt.Sprintf("ALTER DATABASE kv CONFIGURE ZONE USING constraints = '[+region=%s]'", kv.partitionKVDatabaseInRegion)
srcTenantConn := c.Conn(workloadCtx, t.L(), setup.src.nodes.RandNode()[0], option.TenantName(setup.src.name))
srcTenantSQL := sqlutils.MakeSQLRunner(srcTenantConn)
srcTenantSQL.Exec(t, alterStmt)
defer kv.checkRegionalConstraints(t, setup, srcTenantSQL)
}
return defaultWorkloadDriver(workloadCtx, setup, c, kv)
}

// checkRegionalConstraints checks that the kv table is constrained to the
// expected locality.
func (kv replicateKV) checkRegionalConstraints(
t test.Test, setup *c2cSetup, srcTenantSQL *sqlutils.SQLRunner,
) {

var kvTableID uint32
srcTenantSQL.QueryRow(t, `SELECT id FROM system.namespace WHERE name ='kv' AND "parentID" != 0`).Scan(&kvTableID)

dstTenantCodec := keys.MakeSQLCodec(roachpb.MustMakeTenantID(uint64(setup.dst.ID)))
tablePrefix := dstTenantCodec.TablePrefix(kvTableID)
t.L().Printf("Checking replica localities in destination side kv table, id %d and table prefix %s", kvTableID, tablePrefix)

distinctQuery := fmt.Sprintf(`
SELECT
DISTINCT replica_localities
FROM
[SHOW CLUSTER RANGES]
WHERE
start_key ~ '%s'
`, tablePrefix)

res := setup.dst.sysSQL.QueryStr(t, distinctQuery)
require.Equal(t, len(res), 1, "expected only one distinct locality")
locality := res[0][0]
require.Contains(t, locality, kv.partitionKVDatabaseInRegion)
require.False(t, strings.Contains(locality, kv.antiRegion), "region %s is in locality %s", kv.antiRegion, locality)
}

type replicateBulkOps struct {
// short uses less data during the import and rollback steps. Also only runs one rollback.
short bool
Expand Down Expand Up @@ -325,6 +373,9 @@ type replicationSpec struct {
// workload specifies the streaming workload.
workload streamingWorkload

// multiregion specifies multiregion cluster specs
multiregion multiRegionSpecs

// additionalDuration specifies how long the workload will run after the initial scan
//completes. If the time out is set to 0, it will run until completion.
additionalDuration time.Duration
Expand Down Expand Up @@ -360,6 +411,17 @@ type replicationSpec struct {
tags map[string]struct{}
}

type multiRegionSpecs struct {
// srcLocalities specifies the zones each src node should live. The length of this array must match the number of src nodes.
srcLocalities []string

// destLocalities specifies the zones each src node should live. The length of this array must match the number of dest nodes.
destLocalities []string

// workloadNodeZone specifies the zone that the workload node should live
workloadNodeZone string
}

// replicationDriver manages c2c roachtest execution.
type replicationDriver struct {
rs replicationSpec
Expand Down Expand Up @@ -392,6 +454,13 @@ func makeReplicationDriver(t test.Test, c cluster.Cluster, rs replicationSpec) *
}

func (rd *replicationDriver) setupC2C(ctx context.Context, t test.Test, c cluster.Cluster) {
if len(rd.rs.multiregion.srcLocalities) != 0 {
nodeCount := rd.rs.srcNodes + rd.rs.dstNodes
localityCount := len(rd.rs.multiregion.srcLocalities) + len(rd.rs.multiregion.destLocalities)
require.Equal(t, nodeCount, localityCount)
require.NotEqual(t, "", rd.rs.multiregion.workloadNodeZone)
}

c.Put(ctx, t.Cockroach(), "./cockroach")
srcCluster := c.Range(1, rd.rs.srcNodes)
dstCluster := c.Range(rd.rs.srcNodes+1, rd.rs.srcNodes+rd.rs.dstNodes)
Expand Down Expand Up @@ -881,6 +950,15 @@ func c2cRegisterWrapper(
clusterOps = append(clusterOps, spec.VolumeSize(sp.pdSize))
}

if len(sp.multiregion.srcLocalities) > 0 {
allZones := make([]string, 0, sp.srcNodes+sp.dstNodes+1)
allZones = append(allZones, sp.multiregion.srcLocalities...)
allZones = append(allZones, sp.multiregion.destLocalities...)
allZones = append(allZones, sp.multiregion.workloadNodeZone)
clusterOps = append(clusterOps, spec.Zones(strings.Join(allZones, ",")))
clusterOps = append(clusterOps, spec.Geo())
}

r.Add(registry.TestSpec{
Name: sp.name,
Owner: registry.OwnerDisasterRecovery,
Expand Down Expand Up @@ -986,6 +1064,29 @@ func registerClusterToCluster(r registry.Registry) {
additionalDuration: 1 * time.Minute,
cutover: 0,
},
{
name: "c2c/MultiRegion/SameRegions/kv0",
benchmark: true,
srcNodes: 4,
dstNodes: 4,
cpus: 8,
pdSize: 100,
workload: replicateKV{
readPercent: 0,
maxBlockBytes: 1024,
partitionKVDatabaseInRegion: "us-west1",
antiRegion: "us-central1",
},
timeout: 1 * time.Hour,
additionalDuration: 10 * time.Minute,
cutover: 1 * time.Minute,
multiregion: multiRegionSpecs{
// gcp specific
srcLocalities: []string{"us-west1-b", "us-west1-b", "us-west1-b", "us-central1-b"},
destLocalities: []string{"us-central1-b", "us-west1-b", "us-west1-b", "us-west1-b"},
workloadNodeZone: "us-west1-b",
},
},
{
name: "c2c/UnitTest",
srcNodes: 1,
Expand Down
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/multitenant_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ func startInMemoryTenant(
sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING sql.split_at.allow_for_secondary_tenant.enabled=true`, tenantName)
sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING sql.scatter.allow_for_secondary_tenant.enabled=true`, tenantName)
sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING sql.zone_configs.allow_for_secondary_tenant.enabled=true`, tenantName)
sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.multiregion.enabled=true`, tenantName)
sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING enterprise.license = $2`, tenantName, config.CockroachDevLicense)
sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING cluster.organization = 'Cockroach Labs - Production Testing'`, tenantName)
removeTenantRateLimiters(t, sysSQL, tenantName)
Expand Down

0 comments on commit 5d91b69

Please sign in to comment.