roachtest: add c2c/multiregion/SameRegions/kv0 roachtest #110638

Merged 1 commit on Sep 14, 2023
27 changes: 27 additions & 0 deletions pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
@@ -1197,4 +1197,31 @@ func TestStreamingRegionalConstraint(t *testing.T) {

testutils.SucceedsSoon(t,
checkLocalities(tableDesc.PrimaryIndexSpan(destCodec), rangedesc.NewScanner(c.DestSysServer.DB())))

tableName := "test"
tableIDQuery := fmt.Sprintf(`SELECT id FROM system.namespace WHERE name ='%s'`, tableName)

var tableID uint32
c.SrcTenantSQL.QueryRow(t, tableIDQuery).Scan(&tableID)
fmt.Printf("%d", tableID)

checkLocalityRanges(t, c.SrcSysSQL, srcCodec, uint32(tableDesc.GetID()), "mars")

}

func checkLocalityRanges(
t *testing.T, sysSQL *sqlutils.SQLRunner, codec keys.SQLCodec, tableID uint32, region string,
) {
targetPrefix := codec.TablePrefix(tableID)
distinctQuery := fmt.Sprintf(`
SELECT
DISTINCT replica_localities
FROM
[SHOW CLUSTER RANGES]
WHERE
start_key ~ '%s'
`, targetPrefix)
var locality string
sysSQL.QueryRow(t, distinctQuery).Scan(&locality)
require.Contains(t, locality, region)
}
101 changes: 101 additions & 0 deletions pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
@@ -245,6 +246,15 @@ type replicateKV struct {

// max size of raw data written during each insertion
maxBlockBytes int

// partitionKVDatabaseInRegion constrains the kv database in the specified
// region and asserts, before cutover, that the replicated span configuration
// correctly enforces the regional constraint in the destination tenant.
partitionKVDatabaseInRegion string

// antiRegion is the region we do not expect any kv data to reside in if
// partitionKVDatabaseInRegion is set.
antiRegion string
}

func (kv replicateKV) sourceInitCmd(tenantName string, nodes option.NodeListOption) string {
@@ -270,9 +280,47 @@ func (kv replicateKV) sourceRunCmd(tenantName string, nodes option.NodeListOption
func (kv replicateKV) runDriver(
workloadCtx context.Context, c cluster.Cluster, t test.Test, setup *c2cSetup,
) error {
if kv.partitionKVDatabaseInRegion != "" {
require.NotEqual(t, "", kv.antiRegion, "if partitionKVDatabaseInRegion is set, then antiRegion must be set")
t.L().Printf("constrain the kv database to region %s", kv.partitionKVDatabaseInRegion)
alterStmt := fmt.Sprintf("ALTER DATABASE kv CONFIGURE ZONE USING constraints = '[+region=%s]'", kv.partitionKVDatabaseInRegion)
srcTenantConn := c.Conn(workloadCtx, t.L(), setup.src.nodes.RandNode()[0], option.TenantName(setup.src.name))
srcTenantSQL := sqlutils.MakeSQLRunner(srcTenantConn)
srcTenantSQL.Exec(t, alterStmt)
defer kv.checkRegionalConstraints(t, setup, srcTenantSQL)
}
return defaultWorkloadDriver(workloadCtx, setup, c, kv)
}

// checkRegionalConstraints checks that the kv table is constrained to the
// expected locality.
func (kv replicateKV) checkRegionalConstraints(
Collaborator: I wonder if we'll need to retry this in the future in case it takes some time for KV to enforce the constraints. I assume that right now we are assuming the test runs long enough that it really ought to be done by the time the test completes.

Collaborator (Author): hmmmm good question. For now, I'm inclined to not add retry behavior. If this specific test fails because it does not replicate the regional constraints properly after 10 minutes, I'd like to know that. I fear that adding retry behavior could hide a bug. If this test starts to flake, I'll reconsider the logic here.
t test.Test, setup *c2cSetup, srcTenantSQL *sqlutils.SQLRunner,
) {

var kvTableID uint32
srcTenantSQL.QueryRow(t, `SELECT id FROM system.namespace WHERE name ='kv' AND "parentID" != 0`).Scan(&kvTableID)

dstTenantCodec := keys.MakeSQLCodec(roachpb.MustMakeTenantID(uint64(setup.dst.ID)))
tablePrefix := dstTenantCodec.TablePrefix(kvTableID)
t.L().Printf("Checking replica localities in destination side kv table, id %d and table prefix %s", kvTableID, tablePrefix)

distinctQuery := fmt.Sprintf(`
SELECT
DISTINCT replica_localities
FROM
[SHOW CLUSTER RANGES]
WHERE
start_key ~ '%s'
`, tablePrefix)

res := setup.dst.sysSQL.QueryStr(t, distinctQuery)
require.Equal(t, 1, len(res), "expected only one distinct locality")
locality := res[0][0]
require.Contains(t, locality, kv.partitionKVDatabaseInRegion)
require.False(t, strings.Contains(locality, kv.antiRegion), "region %s is in locality %s", kv.antiRegion, locality)
}
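A minimal sketch of the retry approach raised in the review thread above, in case span-config enforcement on the destination lags behind the check. This is not part of the PR: the checkRegionalConstraintsWithRetry name and the 5-minute window are hypothetical, and the sketch assumes the github.com/cockroachdb/cockroach/pkg/util/retry and github.com/cockroachdb/errors packages in addition to the imports already used above.

// Sketch only (not in this PR): a retrying variant of checkRegionalConstraints.
// Assumes pkg/util/retry and cockroachdb/errors; the 5-minute window is arbitrary.
func (kv replicateKV) checkRegionalConstraintsWithRetry(
	t test.Test, setup *c2cSetup, srcTenantSQL *sqlutils.SQLRunner,
) {
	var kvTableID uint32
	srcTenantSQL.QueryRow(t, `SELECT id FROM system.namespace WHERE name ='kv' AND "parentID" != 0`).Scan(&kvTableID)

	dstTenantCodec := keys.MakeSQLCodec(roachpb.MustMakeTenantID(uint64(setup.dst.ID)))
	tablePrefix := dstTenantCodec.TablePrefix(kvTableID)

	distinctQuery := fmt.Sprintf(
		`SELECT DISTINCT replica_localities FROM [SHOW CLUSTER RANGES] WHERE start_key ~ '%s'`, tablePrefix)

	// Poll until the destination ranges report the expected localities, or give up.
	if err := retry.ForDuration(5*time.Minute, func() error {
		res := setup.dst.sysSQL.QueryStr(t, distinctQuery)
		if len(res) != 1 {
			return errors.Newf("expected one distinct locality, found %d", len(res))
		}
		locality := res[0][0]
		if !strings.Contains(locality, kv.partitionKVDatabaseInRegion) {
			return errors.Newf("locality %q does not contain region %s", locality, kv.partitionKVDatabaseInRegion)
		}
		if strings.Contains(locality, kv.antiRegion) {
			return errors.Newf("anti-region %s present in locality %q", kv.antiRegion, locality)
		}
		return nil
	}); err != nil {
		t.Fatal(err)
	}
}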

type replicateBulkOps struct {
// short uses less data during the import and rollback steps. Also only runs one rollback.
short bool
@@ -325,6 +373,9 @@ type replicationSpec struct {
// workload specifies the streaming workload.
workload streamingWorkload

// multiregion specifies multiregion cluster specs
multiregion multiRegionSpecs

// additionalDuration specifies how long the workload will run after the initial scan
// completes. If the timeout is set to 0, it will run until completion.
additionalDuration time.Duration
@@ -360,6 +411,17 @@ type replicationSpec struct {
tags map[string]struct{}
}

type multiRegionSpecs struct {
// srcLocalities specifies the zones each src node should live in. The length of this array must match the number of src nodes.
srcLocalities []string

// destLocalities specifies the zones each dest node should live in. The length of this array must match the number of dest nodes.
destLocalities []string

// workloadNodeZone specifies the zone that the workload node should live in.
workloadNodeZone string
}

// replicationDriver manages c2c roachtest execution.
type replicationDriver struct {
rs replicationSpec
@@ -392,6 +454,13 @@ func makeReplicationDriver(t test.Test, c cluster.Cluster, rs replicationSpec) *replicationDriver {
}

func (rd *replicationDriver) setupC2C(ctx context.Context, t test.Test, c cluster.Cluster) {
if len(rd.rs.multiregion.srcLocalities) != 0 {
nodeCount := rd.rs.srcNodes + rd.rs.dstNodes
localityCount := len(rd.rs.multiregion.srcLocalities) + len(rd.rs.multiregion.destLocalities)
require.Equal(t, nodeCount, localityCount)
require.NotEqual(t, "", rd.rs.multiregion.workloadNodeZone)
}

c.Put(ctx, t.Cockroach(), "./cockroach")
srcCluster := c.Range(1, rd.rs.srcNodes)
dstCluster := c.Range(rd.rs.srcNodes+1, rd.rs.srcNodes+rd.rs.dstNodes)
@@ -881,6 +950,15 @@ func c2cRegisterWrapper(
clusterOps = append(clusterOps, spec.VolumeSize(sp.pdSize))
}

if len(sp.multiregion.srcLocalities) > 0 {
allZones := make([]string, 0, sp.srcNodes+sp.dstNodes+1)
allZones = append(allZones, sp.multiregion.srcLocalities...)
allZones = append(allZones, sp.multiregion.destLocalities...)
allZones = append(allZones, sp.multiregion.workloadNodeZone)
clusterOps = append(clusterOps, spec.Zones(strings.Join(allZones, ",")))
clusterOps = append(clusterOps, spec.Geo())
}

r.Add(registry.TestSpec{
Name: sp.name,
Owner: registry.OwnerDisasterRecovery,
@@ -986,6 +1064,29 @@ func registerClusterToCluster(r registry.Registry) {
additionalDuration: 1 * time.Minute,
cutover: 0,
},
{
name: "c2c/MultiRegion/SameRegions/kv0",
benchmark: true,
srcNodes: 4,
dstNodes: 4,
cpus: 8,
pdSize: 100,
workload: replicateKV{
readPercent: 0,
maxBlockBytes: 1024,
partitionKVDatabaseInRegion: "us-west1",
antiRegion: "us-central1",
},
timeout: 1 * time.Hour,
additionalDuration: 10 * time.Minute,
cutover: 1 * time.Minute,
multiregion: multiRegionSpecs{
// gcp specific
srcLocalities: []string{"us-west1-b", "us-west1-b", "us-west1-b", "us-central1-b"},
destLocalities: []string{"us-central1-b", "us-west1-b", "us-west1-b", "us-west1-b"},
workloadNodeZone: "us-west1-b",
},
},
{
name: "c2c/UnitTest",
srcNodes: 1,
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/multitenant_utils.go
@@ -366,6 +366,7 @@ func startInMemoryTenant(
sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING sql.split_at.allow_for_secondary_tenant.enabled=true`, tenantName)
sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING sql.scatter.allow_for_secondary_tenant.enabled=true`, tenantName)
sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING sql.zone_configs.allow_for_secondary_tenant.enabled=true`, tenantName)
sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.multiregion.enabled=true`, tenantName)
sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING enterprise.license = $2`, tenantName, config.CockroachDevLicense)
sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING cluster.organization = 'Cockroach Labs - Production Testing'`, tenantName)
removeTenantRateLimiters(t, sysSQL, tenantName)