roachtest: update multitenant-multiregion to use roachprod API
This updates the `multitenant-multiregion` test to use the "official"
multitenant roachprod API instead of the deprecated functions in
`multitenant_utils.go`.

Fixes: #124029

Release note: None
renatolabs committed Sep 9, 2024
1 parent 32a1f71 commit 0bfa284
Showing 3 changed files with 152 additions and 120 deletions.
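For orientation before the diff: the migration swaps the bespoke `tenantNode` helpers for roachprod's virtual-cluster API. A minimal sketch of the new lifecycle, using only calls that appear in the diff below (simplified; error handling and the multi-region setup are omitted):

// Start a virtual cluster ("tenant") on every node of an existing cluster c.
virtualCluster := "multiregion-tenant"
startOpts := option.StartVirtualClusterOpts(virtualCluster, c.All(), option.NoBackupSchedule)
c.StartServiceForVirtualCluster(ctx, t.L(), startOpts, install.MakeClusterSettings())

// Connect to the virtual cluster through node 1.
conn := c.Conn(ctx, t.L(), 1, option.VirtualClusterName(virtualCluster))
defer conn.Close()

// Stop it gracefully, with a 60-second grace period.
stopOpts := option.StopVirtualClusterOpts(virtualCluster, c.Node(1), option.Graceful(60))
c.StopServiceForVirtualCluster(ctx, t.L(), stopOpts)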
246 changes: 136 additions & 110 deletions pkg/cmd/roachtest/tests/multitenant.go
@@ -12,9 +12,9 @@ package tests

import (
"context"
gosql "database/sql"
"fmt"
"path/filepath"
"math/rand"
"slices"
"strings"
"time"

@@ -83,22 +83,40 @@ func runAcceptanceMultitenant(ctx context.Context, t test.Test, c cluster.Cluster)
// Runs a test on a multi-region multi-tenant cluster, which will be
// spread across at least two regions.
func runMultiTenantMultiRegion(ctx context.Context, t test.Test, c cluster.Cluster) {
startOptions := option.NewStartOpts(option.NoBackupSchedule)
c.Start(ctx, t.L(), startOptions, install.MakeClusterSettings(install.SecureOption(true)), c.All())
regions := strings.Split(c.Spec().GCE.Zones, ",")
regionOnly := func(regionAndZone string) string {
r := strings.Split(regionAndZone, "-")
return r[0] + "-" + r[1]
systemStartOpts := option.NewStartOpts(option.NoBackupSchedule)
c.Start(ctx, t.L(), systemStartOpts, install.MakeClusterSettings())

zones := strings.Split(c.Spec().GCE.Zones, ",")
regions := make([]string, 0, len(zones))
var uniqueRegions []string
for _, z := range zones {
parts := strings.Split(z, "-")
region := parts[0] + "-" + parts[1]
regions = append(regions, region)

if slices.Index(uniqueRegions, region) == -1 {
uniqueRegions = append(uniqueRegions, region)
}
}

const (
n1 = 1
otherRegionNode = 7
)

// Verify that the constants above are valid.
if regions[n1-1] == regions[otherRegionNode-1] {
t.Fatal(fmt.Errorf(
"expected n%d and n%d to be in different regions, but they are both in %s",
n1, otherRegionNode, regions[n1-1],
))
}

const tenantID = 123
{
// Intentionally alter settings so that the system database span config
// changes propagate faster when we convert the system database to MR.
conn := c.Conn(ctx, t.L(), 1)
defer conn.Close()
_, err := conn.Exec(`SELECT crdb_internal.create_tenant($1::INT)`, tenantID)
require.NoError(t, err)
configStmts := []string{
`SET CLUSTER SETTING sql.virtual_cluster.feature_access.multiregion.enabled='true'`,
`SET CLUSTER SETTING kv.closed_timestamp.target_duration = '200ms'`,
@@ -111,68 +129,75 @@ func runMultiTenantMultiRegion(ctx context.Context, t test.Test, c cluster.Cluster)
`SET CLUSTER setting kv.replication_reports.interval = '5s';`,
}
for _, stmt := range configStmts {
t.L().Printf("running statement: %s", stmt)
_, err := conn.Exec(stmt)
require.NoError(t, err)
}
}

const (
tenantHTTPPort = 8081
tenantSQLPort = 30258
otherRegionNode = 7
// Start an equal number of tenants on the cluster, located in the
// same regions.
virtualCluster := "multiregion-tenant"
tenantStartOpts := option.StartVirtualClusterOpts(virtualCluster, c.All(), option.NoBackupSchedule)
c.StartServiceForVirtualCluster(ctx, t.L(), tenantStartOpts, install.MakeClusterSettings())

tenantConn := c.Conn(ctx, t.L(), 1, option.VirtualClusterName(virtualCluster))
defer tenantConn.Close()
rootTenantConn := c.Conn(ctx, t.L(), 1,
option.VirtualClusterName(virtualCluster),
option.AuthMode(install.AuthRootCert),
)
// Start an equal number of tenants on the cluster, located in the same regions.
tenants := make([]*tenantNode, 0, len(c.All()))
for i, node := range c.All() {
region := regions[i]
regionInfo := fmt.Sprintf("cloud=%s,region=%s,zone=%s", c.Cloud(), regionOnly(region), region)
tenant := deprecatedCreateTenantNode(ctx, t, c, c.All(), tenantID, node, tenantHTTPPort, tenantSQLPort, createTenantRegion(regionInfo))
tenant.start(ctx, t, c, "./cockroach")
tenants = append(tenants, tenant)

// Set up the system database for multi-region, and add all the regions
// in our cluster.
if i == 0 {
includedRegions := make(map[string]struct{})
verifySQL(t, tenants[0].pgURL,
mkStmt("SET CLUSTER SETTING sql.region_liveness.enabled='yes'"),
)
verifySQL(t, tenants[0].pgURL,
mkStmt(fmt.Sprintf(`ALTER DATABASE system SET PRIMARY REGION '%s'`, regionOnly(regions[0]))),
mkStmt(fmt.Sprintf(`ALTER DATABASE defaultdb SET PRIMARY REGION '%s'`, regionOnly(regions[0]))))
includedRegions[regions[0]] = struct{}{}
for _, region := range regions {
if _, ok := includedRegions[region]; ok {
continue
}
includedRegions[region] = struct{}{}
verifySQL(t, tenants[0].pgURL,
mkStmt(fmt.Sprintf(`ALTER DATABASE system ADD REGION '%s'`, regionOnly(region))),
mkStmt(fmt.Sprintf(`ALTER DATABASE defaultdb ADD REGION '%s'`, regionOnly(region))))
defer rootTenantConn.Close()

t.L().Printf("enabling region liveness on the tenant")
_, err := tenantConn.Exec("SET CLUSTER SETTING sql.region_liveness.enabled='yes'")
require.NoError(t, err)

workloadDB := "ycsb"
workloadType := "A"
if rand.Float64() < 0.5 {
workloadType = "B"
}
cmd := fmt.Sprintf(
"./cockroach workload init ycsb --db %s --workload=%s {pgurl%s:%s}",
workloadDB, workloadType, c.All(), virtualCluster,
)
c.Run(ctx, option.WithNodes(c.Node(1)), cmd)

for _, r := range uniqueRegions {
op := "ADD"
if r == regions[0] {
op = "SET PRIMARY"
}

for _, dbName := range []string{"system", "defaultdb", workloadDB} {
stmt := fmt.Sprintf("ALTER DATABASE %s %s REGION '%s'", dbName, op, r)
t.L().Printf("running statement: %s", stmt)

conn := tenantConn
if dbName == "system" {
conn = rootTenantConn // only the root can modify the system database
}
_, err := conn.Exec(stmt)
require.NoError(t, err)
}
}

// Sanity: Make sure the first tenant can be connected to.
t.Status("checking that a client can connect to the tenant server")
verifySQL(t, tenants[0].pgURL,
mkStmt(`CREATE TABLE foo (id INT PRIMARY KEY, v STRING)`),
mkStmt(`INSERT INTO foo VALUES($1, $2)`, 1, "bar"),
mkStmt(`SELECT * FROM foo LIMIT 1`).
withResults([][]string{{"1", "bar"}}))
t.L().Printf("running YCSB on the tenant for a few minutes")
cmd = fmt.Sprintf(
"./cockroach workload run ycsb --db %s --workload=%s --duration 10m {pgurl%s:%s}",
workloadDB, workloadType, c.All(), virtualCluster,
)
c.Run(ctx, option.WithNodes(c.Node(1)), cmd)

// Wait for the span configs to propagate. After we know they have
// propagated, we'll shut down the tenant and wait for them to get
// applied.
tdb, tdbCloser := openDBAndMakeSQLRunner(t, tenants[0].pgURL)
defer tdbCloser()
t.Status("Waiting for span config reconciliation...")
t.Status("waiting for span config reconciliation...")
tdb := sqlutils.MakeSQLRunner(tenantConn)
sqlutils.WaitForSpanConfigReconciliation(t, tdb)
t.Status("Span config reconciliation complete")
t.Status("Waiting for replication changes...")
conn := c.Conn(ctx, t.L(), 1)
defer conn.Close()
//systemConn := sqlutils.MakeSQLRunner(conn)
t.Status("span config reconciliation complete, waiting for replication changes")

checkStartTime := timeutil.Now()
count := 0
tableStartKeys := []string{
@@ -254,31 +279,31 @@ func runMultiTenantMultiRegion(ctx context.Context, t test.Test, c cluster.Cluster)
t.Status("Replication changes complete")

// Stop all the tenants gracefully first.
for _, tenant := range tenants {
tenant.stop(ctx, t, c)
for _, node := range c.All() {
t.L().Printf("stopping tenant on n%d", node)
stopOpts := option.StopVirtualClusterOpts(virtualCluster, c.Node(node), option.Graceful(60))
c.StopServiceForVirtualCluster(ctx, t.L(), stopOpts)
}

// Start them all up again.
for _, tenant := range tenants {
tenant.start(ctx, t, c, "./cockroach")
}
t.L().Printf("restarting virtual cluster")
c.StartServiceForVirtualCluster(ctx, t.L(), tenantStartOpts, install.MakeClusterSettings())

grp := ctxgroup.WithContext(ctx)
startSchemaChange := make(chan struct{})
waitForSchemaChange := make(chan struct{})
killNodes := make(chan struct{})
nodesKilled := make(chan struct{})
otherRegionConn := c.Conn(ctx, t.L(), otherRegionNode, option.VirtualClusterName(virtualCluster))
defer otherRegionConn.Close()

// Start a connection that will hold a lease on a table that we are going
// to run a schema change on. The region we are connecting to will be
// intentionally killed off.
grp.GoCtx(func(ctx context.Context) (err error) {
db, err := gosql.Open("postgres", tenants[otherRegionNode].pgURL)
if err != nil {
return err
}
defer db.Close()
txn, err := db.BeginTx(ctx, nil)
txn, err := otherRegionConn.BeginTx(ctx, nil)
if err != nil {
return err
return errors.Wrap(err, "starting transaction")
}

defer func() {
@@ -288,9 +313,11 @@ func runMultiTenantMultiRegion(ctx context.Context, t test.Test, c cluster.Cluster)
commitErr = nil
}
err = errors.CombineErrors(err, commitErr)
t.Status("Committed lease holding txn with error: ", err)
if err != nil {
t.L().Printf("Committed lease holding txn with error: %#v", err)
}
}()
_, err = txn.Exec("SELECT * FROM foo")
_, err = txn.Exec(fmt.Sprintf("SELECT * FROM %s.usertable", workloadDB))
startSchemaChange <- struct{}{}
<-waitForSchemaChange
return err
@@ -303,23 +330,18 @@ func runMultiTenantMultiRegion(ctx context.Context, t test.Test, c cluster.Cluster)
defer func() {
waitForSchemaChange <- struct{}{}
}()
db, err := gosql.Open("postgres", tenants[0].pgURL)
if err != nil {
return err
}
defer db.Close()
killNodes <- struct{}{}
<-nodesKilled
for {
t.Status("running schema change with lease held...")
_, err = db.Exec("ALTER TABLE foo ADD COLUMN newcol int")
_, err = tenantConn.Exec(fmt.Sprintf("ALTER TABLE %s.usertable ADD COLUMN newcol int", workloadDB))
// Confirm that we hit the expected error or no error.
if err != nil &&
!strings.Contains(err.Error(), "count-lease timed out reading from a region") {
// Unrelated error, so let's fail the test.
return errors.NewAssertionErrorWithWrappedErrf(err, "no time out detected because of dead region")
} else if err != nil {
t.Status("waiting for schema change completion, found expected error: ", err)
t.L().Printf("waiting for schema change completion, found expected error: %v", err)
continue
} else {
// Schema change completed successfully.
@@ -332,49 +354,53 @@ func runMultiTenantMultiRegion(ctx context.Context, t test.Test, c cluster.Cluster)
<-killNodes
// Kill both tenants and storage servers in the region we want dead. The schema
// change should just naturally unblock and succeed.
c.Run(ctx, install.WithNodes(c.Range(otherRegionNode, len(c.All())).InstallNodes()), "killall -9 cockroach")
killedRegion := c.Range(otherRegionNode, len(c.All()))
t.Status("stopping the server ahead of checking for the tenant server")
c.Stop(ctx, t.L(), option.DefaultStopOpts(), killedRegion)
nodesKilled <- struct{}{}

require.NoErrorf(t, grp.Wait(), "waited for goroutines, expected no error")
t.Status("stopping the server ahead of checking for the tenant server")

// Restart the KV storage servers first.
c.Start(ctx, t.L(), startOptions, install.MakeClusterSettings(install.SecureOption(true)), c.Range(otherRegionNode, len(c.All())))
// Re-add any dead tenants back again.
for _, tenant := range tenants[otherRegionNode-1:] {
tenant.start(ctx, t, c, "./cockroach")
}
c.Start(ctx, t.L(), systemStartOpts, install.MakeClusterSettings(), killedRegion)
// Re-add dead tenants back again.
killedRegionStartOpts := option.StartVirtualClusterOpts(
virtualCluster, killedRegion, option.NoBackupSchedule,
)
c.StartServiceForVirtualCluster(ctx, t.L(), killedRegionStartOpts, install.MakeClusterSettings())

// Validate that no region is labeled as unavailable afterwards.
for _, tenant := range tenants[otherRegionNode-1:] {
verifySQL(t, tenant.pgURL,
mkStmt("SELECT * FROM system.region_liveness").withResults([][]string{}))
}
// Stop the server, which also ensures that log files get flushed.
for _, tenant := range tenants {
tenant.stop(ctx, t, c)
}
// Check that the server identifiers are present in the tenant log file.
logFile := filepath.Join(tenants[0].logDir(), "*.log")
if err := c.RunE(ctx, install.WithNodes(c.Node(1).InstallNodes()),
"grep", "-q", "'start\\.go.*clusterID:'", logFile); err != nil {
t.Fatal(errors.Wrap(err, "cluster ID not found in log file"))
}
if err := c.RunE(ctx, install.WithNodes(c.Node(1).InstallNodes()),
"grep", "-q", "'start\\.go.*tenantID:'", logFile); err != nil {
t.Fatal(errors.Wrap(err, "tenant ID not found in log file"))
}
if err := c.RunE(ctx, install.WithNodes(c.Node(1).InstallNodes()),
"grep", "-q", "'start\\.go.*instanceID:'", logFile); err != nil {
t.Fatal(errors.Wrap(err, "SQL instance ID not found in log file"))
for _, node := range killedRegion {
tenantDB := c.Conn(ctx, t.L(), node, option.VirtualClusterName(virtualCluster))
defer tenantDB.Close()

rows, err := tenantDB.Query("SELECT crdb_region, unavailable_at FROM system.region_liveness")
require.NoError(t, err, "error querying region liveness on n%d", node)

var unavailableRegions []string
for rows.Next() {
var region []byte
var unavailableAt time.Time

require.NoError(t, rows.Scan(&region, &unavailableAt), "reading region liveness on n%d", node)
unavailableRegions = append(
unavailableRegions,
fmt.Sprintf("region: %x, unavailable_at: %s", region, unavailableAt),
)
}

require.NoError(t, rows.Err(), "rows.Err() on n%d", node)
if len(unavailableRegions) > 0 {
t.Fatalf("unavailable regions on n%d:\n%s", node, strings.Join(unavailableRegions, "\n"))
}
}

t.Status("checking log file contents")
t.L().Printf("validated region liveness")
}

func registerMultiTenantMultiregion(r registry.Registry) {
r.Add(registry.TestSpec{
Name: "multitenant-multiregion",
Timeout: 20 * time.Minute,
Timeout: 30 * time.Minute,
Owner: registry.OwnerSQLFoundations,
Cluster: r.MakeClusterSpec(
9,
6 changes: 0 additions & 6 deletions pkg/cmd/roachtest/tests/multitenant_utils.go
@@ -66,12 +66,6 @@ type createTenantOptions struct {
}
type createTenantOpt func(*createTenantOptions)

func createTenantRegion(region string) createTenantOpt {
return func(c *createTenantOptions) {
c.region = region
}
}

func createTenantNodeInternal(
ctx context.Context,
t test.Test,
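Note that removing `createTenantRegion` means region placement is no longer fixed at tenant-creation time; instead, the test derives a region from each GCE zone and applies it via SQL on the tenant connection. A small sketch of that replacement pattern, assuming the usual `<region>-<letter>` zone format (the zone string here is a hypothetical example):

// Hypothetical example zone; the test reads real zones from c.Spec().GCE.Zones.
zone := "us-east1-b"
parts := strings.Split(zone, "-")
region := parts[0] + "-" + parts[1] // "us-east1"
// Region changes are now plain SQL against the virtual cluster.
_, err := tenantConn.Exec(fmt.Sprintf("ALTER DATABASE defaultdb ADD REGION '%s'", region))
require.NoError(t, err)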