roachtest: update multitenant/distsql to use new roachprod service APIs
Previously, the multitenant distsql roachtest relied on an internal utility in
`roachtest` to start virtual clusters. This change updates the test to use the
new official `roachtest` and `roachprod` APIs for starting virtual clusters.

Epic: None
Release Note: None
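
For reference, here is a minimal sketch of the pattern this change moves to, assembled from the calls that appear in the diff below: start the storage nodes, start a SQL instance of a virtual cluster via `StartServiceForVirtualClusterE`, then connect through the option-based `ConnE` API. It assumes the roachtest harness supplies `ctx`, `t`, and `c`, relies on the same packages the test file already imports (`option`, `install`, `require`), and the tenant name, node choice, and instance index are illustrative rather than part of the commit.

```go
// Sketch only: mirrors the calls used in this diff; not a standalone test.
func startTenantInstanceSketch(ctx context.Context, t test.Test, c cluster.Cluster) {
	settings := install.MakeClusterSettings(install.SecureOption(true))
	storNodes := c.Range(1, 3)
	c.Start(ctx, t.L(), option.DefaultStartOpts(), settings, storNodes)

	// Start one SQL instance of the virtual cluster on node 1. The ports are
	// left at zero, as in the test below, so roachprod assigns them.
	tenantName := "test-tenant" // illustrative name
	startOpts := option.DefaultStartOpts()
	startOpts.RoachprodOpts.Target = install.StartServiceForVirtualCluster
	startOpts.RoachprodOpts.VirtualClusterName = tenantName
	startOpts.RoachprodOpts.SQLInstance = 0
	startOpts.RoachprodOpts.SQLPort = 0
	startOpts.RoachprodOpts.AdminUIPort = 0
	if err := c.StartServiceForVirtualClusterE(ctx, t.L(), c.Node(1), startOpts, settings, storNodes); err != nil {
		t.L().Printf("error starting virtual cluster instance: %+v", err)
		t.FailNow()
	}

	// Connect to that instance through the option-based connection API.
	db, err := c.ConnE(ctx, t.L(), 1, option.TenantName(tenantName), option.SQLInstance(0))
	require.NoError(t, err)
	defer db.Close()
	_, err = db.ExecContext(ctx, "SELECT 1")
	require.NoError(t, err)
}
```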
herkolategan committed Dec 5, 2023
1 parent c689b1d commit f9ae9ea
Showing 1 changed file with 42 additions and 55 deletions.
97 changes: 42 additions & 55 deletions pkg/cmd/roachtest/tests/multitenant_distsql.go
@@ -13,11 +13,11 @@ package tests
import (
"archive/zip"
"context"
gosql "database/sql"
"fmt"
"io"
"strconv"
"strings"
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/cli/clisqlclient"
@@ -62,84 +62,63 @@ func runMultiTenantDistSQL(
// This test sets a smaller default range size than the default due to
// performance and resource limitations. We set the minimum range max bytes to
// 1 byte to bypass the guardrails.
install.MakeClusterSettings(install.SecureOption(true))
settings := install.MakeClusterSettings(install.SecureOption(true))
settings.Env = append(settings.Env, "COCKROACH_MIN_RANGE_MAX_BYTES=1")
tenantEnvOpt := createTenantEnvVar(settings.Env[len(settings.Env)-1])
c.Start(ctx, t.L(), option.DefaultStartOpts(), settings, c.Node(1))
c.Start(ctx, t.L(), option.DefaultStartOpts(), settings, c.Node(2))
c.Start(ctx, t.L(), option.DefaultStartOpts(), settings, c.Node(3))
storNodes := c.Range(1, 3)

const (
tenantID = 11
tenantBaseHTTPPort = 8081
tenantBaseSQLPort = 26259
// localPortOffset is used to avoid port conflicts with nodes on a local
// cluster.
localPortOffset = 1000
)

tenantHTTPPort := func(offset int) int {
if c.IsLocal() || numInstances > c.Spec().NodeCount {
return tenantBaseHTTPPort + localPortOffset + offset
}
return tenantBaseHTTPPort
}
tenantSQLPort := func(offset int) int {
if c.IsLocal() || numInstances > c.Spec().NodeCount {
return tenantBaseSQLPort + localPortOffset + offset
tenantName := "test-tenant"
var nodes intsets.Fast
for i := 0; i < numInstances; i++ {
node := (i % c.Spec().NodeCount) + 1
sqlInstance := i / c.Spec().NodeCount
instStartOps := option.DefaultStartOpts()
instStartOps.RoachprodOpts.Target = install.StartServiceForVirtualCluster
instStartOps.RoachprodOpts.VirtualClusterName = tenantName
instStartOps.RoachprodOpts.SQLInstance = sqlInstance

t.L().Printf("Starting instance %d on node %d", i, node)
instStartOps.RoachprodOpts.SQLPort = 0
instStartOps.RoachprodOpts.AdminUIPort = 0
err := c.StartServiceForVirtualClusterE(ctx, t.L(), c.Node(node), instStartOps, settings, storNodes)
if err != nil {
t.L().Printf("Error starting instance %d on node %d: %+v", i, node, err)
t.FailNow()
}
return tenantBaseSQLPort
nodes.Add(i + 1)
}

storConn := c.Conn(ctx, t.L(), 1)
_, err := storConn.Exec(`SELECT crdb_internal.create_tenant($1::INT)`, tenantID)
require.NoError(t, err)

instances := make([]*tenantNode, 0, numInstances)
instance1 := createTenantNode(ctx, t, c, c.Node(1), tenantID, 2 /* node */, tenantHTTPPort(0), tenantSQLPort(0),
createTenantCertNodes(c.All()), tenantEnvOpt)
instances = append(instances, instance1)
defer instance1.stop(ctx, t, c)
instance1.start(ctx, t, c, "./cockroach")

// Open things up so we can configure range sizes below.
_, err = storConn.Exec(`ALTER TENANT [$1] SET CLUSTER SETTING sql.zone_configs.allow_for_secondary_tenant.enabled = true`, tenantID)
// Open things up, so we can configure range sizes below.
_, err := storConn.Exec(`ALTER TENANT $1 SET CLUSTER SETTING sql.zone_configs.allow_for_secondary_tenant.enabled = true`, tenantName)
require.NoError(t, err)

// Create numInstances sql pods and spread them evenly across the machines.
var nodes intsets.Fast
nodes.Add(1)
for i := 1; i < numInstances; i++ {
node := ((i + 1) % c.Spec().NodeCount) + 1
inst, err := newTenantInstance(ctx, instance1, t, c, node, tenantHTTPPort(i), tenantSQLPort(i))
instances = append(instances, inst)
require.NoError(t, err)
defer inst.stop(ctx, t, c)
inst.start(ctx, t, c, "./cockroach")
nodes.Add(i + 1)
}

m := c.NewMonitor(ctx, c.Nodes(1, 2, 3))

inst1Conn, err := gosql.Open("postgres", instance1.pgURL)
//inst1Conn, err := gosql.Open("postgres", c.PGUrl(ctx, 1)
inst1Conn, err := c.ConnE(ctx, t.L(), 1, option.TenantName(tenantName))
require.NoError(t, err)
_, err = inst1Conn.Exec("CREATE TABLE t(n INT, i INT,s STRING, PRIMARY KEY(n,i))")
require.NoError(t, err)

// DistSQL needs at least a range per node to distribute query everywhere
// and test takes too long and too much resources with default range sizes
// and test takes too long and too many resources with default range sizes
// so make them much smaller.
_, err = inst1Conn.Exec(`ALTER TABLE t CONFIGURE ZONE USING range_min_bytes = 1000,range_max_bytes = 100000`)
require.NoError(t, err)

insertCtx, cancel := context.WithCancel(ctx)
defer cancel()

for i, inst := range instances {
url := inst.pgURL
for i := 0; i < numInstances; i++ {
li := i
m.Go(func(ctx context.Context) error {
dbi, err := gosql.Open("postgres", url)
node := (li % c.Spec().NodeCount) + 1
sqlInstance := li / c.Spec().NodeCount
dbi, err := c.ConnE(ctx, t.L(), node, option.TenantName(tenantName), option.SQLInstance(sqlInstance))
require.NoError(t, err)
iter := 0
for {
@@ -149,14 +128,23 @@ func runMultiTenantDistSQL(
t.L().Printf("worker %d done:%v", li, insertCtx.Err())
return nil
default:
// procede to report error
// proceed to report error
}
require.NoError(t, err, "instance idx = %d, iter = %d", li, iter)
iter++
}
})
}

// Wait for all instances to be done inserting. This is done in a separate go
// routine because the main routine is used to run the query.
var wg sync.WaitGroup
wg.Add(1)
go func() {
m.Wait()
wg.Done()
}()

// Loop until all instances show up in the query.
attempts := 180
for {
@@ -189,9 +177,7 @@ func runMultiTenantDistSQL(
} else {
t.L().Printf("Only %d nodes present: %v", nodesInPlan.Len(), nodesInPlan)
}

}
m.Wait()

// Don't move on until statistics are collected. Originally just
// debugging feature but leaving it in because its nice to know
Expand Down Expand Up @@ -233,7 +219,8 @@ func runMultiTenantDistSQL(
if bundle {
// Open bundle and verify its contents
sqlConnCtx := clisqlclient.Context{}
conn := sqlConnCtx.MakeSQLConn(io.Discard, io.Discard, instance1.pgURL)
pgURL, err := c.ExternalPGUrl(ctx, t.L(), c.Node(1), tenantName, 0)
conn := sqlConnCtx.MakeSQLConn(io.Discard, io.Discard, pgURL[0])
bundles, err := clisqlclient.StmtDiagListBundles(ctx, conn)
require.NoError(t, err)
