Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-23.2: roachtest: update multitenant/distsql to use new roachprod service APIs #116525

Merged
Merged
114 changes: 87 additions & 27 deletions pkg/cmd/roachtest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,8 +639,11 @@ type clusterImpl struct {
expiration time.Time
encAtRest bool // use encryption at rest

// clusterSettings are additional cluster settings set on cluster startup.
// clusterSettings are additional cluster settings set on the storage cluster startup.
clusterSettings map[string]string
// virtualClusterSettings are additional cluster settings to set on the
// virtual cluster startup.
virtualClusterSettings map[string]string
// goCoverDir is the directory for Go coverage data (if coverage is enabled).
// BAZEL_COVER_DIR will be set to this value when starting a node.
goCoverDir string
Expand Down Expand Up @@ -1910,6 +1913,38 @@ func (c *clusterImpl) clearStatusForClusterOpt(worker bool) {
}
}

// configureClusterSettingOptions injects roachtest's default environment
// variables into settings.Env and returns the combined list of roachprod
// cluster-setting options to pass to a start call. defaultClusterSettings
// carries the per-cluster settings (storage vs. virtual cluster); it is
// listed before settings.ClusterSettings in the returned slice —
// presumably so the caller-supplied settings can override it; verify
// against the install package's option-application order.
func (c *clusterImpl) configureClusterSettingOptions(
defaultClusterSettings install.ClusterSettingsOption, settings install.ClusterSettings,
) []install.ClusterSettingOption {
// setUnlessExists appends name=value to settings.Env only when the
// variable is not already present, so explicit test settings win over
// these defaults.
setUnlessExists := func(name string, value interface{}) {
if !envExists(settings.Env, name) {
settings.Env = append(settings.Env, fmt.Sprintf("%s=%s", name, fmt.Sprint(value)))
}
}
// Set the same seed on every node, to be used by builds with
// runtime assertions enabled.
setUnlessExists("COCKROACH_RANDOM_SEED", c.cockroachRandomSeed())

// Panic on span use-after-Finish, so we catch such bugs.
setUnlessExists("COCKROACH_CRASH_ON_SPAN_USE_AFTER_FINISH", true)

// Point Go coverage output at the configured directory when coverage
// collection is enabled for this run.
if c.goCoverDir != "" {
settings.Env = append(settings.Env, fmt.Sprintf("BAZEL_COVER_DIR=%s", c.goCoverDir))
}

// Translate the (possibly mutated) settings struct into the option
// values roachprod's start path consumes.
return []install.ClusterSettingOption{
install.TagOption(settings.Tag),
install.PGUrlCertsDirOption(settings.PGUrlCertsDir),
install.SecureOption(settings.Secure),
install.UseTreeDistOption(settings.UseTreeDist),
install.EnvOption(settings.Env),
install.NumRacksOption(settings.NumRacks),
install.BinaryOption(settings.Binary),
defaultClusterSettings,
install.ClusterSettingsOption(settings.ClusterSettings),
}
}

// StartE starts cockroach nodes on a subset of the cluster. The nodes parameter
// can either be a specific node, empty (to indicate all nodes), or a pair of
// nodes indicating a range.
Expand All @@ -1928,17 +1963,6 @@ func (c *clusterImpl) StartE(

startOpts.RoachprodOpts.EncryptedStores = c.encAtRest

setUnlessExists := func(name string, value interface{}) {
if !envExists(settings.Env, name) {
settings.Env = append(settings.Env, fmt.Sprintf("%s=%s", name, fmt.Sprint(value)))
}
}
// Panic on span use-after-Finish, so we catch such bugs.
setUnlessExists("COCKROACH_CRASH_ON_SPAN_USE_AFTER_FINISH", true)
// Set the same seed on every node, to be used by builds with
// runtime assertions enabled.
setUnlessExists("COCKROACH_RANDOM_SEED", c.cockroachRandomSeed())

// Needed for backward-compat on crdb_internal.ranges{_no_leases}.
// Remove in v23.2.
if !envExists(settings.Env, "COCKROACH_FORCE_DEPRECATED_SHOW_RANGE_BEHAVIOR") {
Expand All @@ -1947,21 +1971,7 @@ func (c *clusterImpl) StartE(
settings.Env = append(settings.Env, "COCKROACH_FORCE_DEPRECATED_SHOW_RANGE_BEHAVIOR=false")
}

if c.goCoverDir != "" {
settings.Env = append(settings.Env, fmt.Sprintf("BAZEL_COVER_DIR=%s", c.goCoverDir))
}

clusterSettingsOpts := []install.ClusterSettingOption{
install.TagOption(settings.Tag),
install.PGUrlCertsDirOption(settings.PGUrlCertsDir),
install.SecureOption(settings.Secure),
install.UseTreeDistOption(settings.UseTreeDist),
install.EnvOption(settings.Env),
install.NumRacksOption(settings.NumRacks),
install.BinaryOption(settings.Binary),
install.ClusterSettingsOption(c.clusterSettings),
install.ClusterSettingsOption(settings.ClusterSettings),
}
clusterSettingsOpts := c.configureClusterSettingOptions(c.clusterSettings, settings)

if err := roachprod.Start(ctx, l, c.MakeNodes(opts...), startOpts.RoachprodOpts, clusterSettingsOpts...); err != nil {
return err
Expand All @@ -1977,6 +1987,56 @@ func (c *clusterImpl) StartE(
return nil
}

// StartServiceForVirtualClusterE can start either external or shared process
// virtual clusters. This can be specified in startOpts.RoachprodOpts. Set the
// `Target` to the required virtual cluster type. Refer to the virtual cluster
// section in the struct for more information on what fields are available for
// virtual clusters.
//
// With external process virtual clusters an external process will be started on
// each node specified in the externalNodes parameter.
//
// With shared process virtual clusters the required queries will be run on a
// storage node of the cluster specified in the opts parameter.
//
// Returns an error rather than failing the test; see
// StartServiceForVirtualCluster for the fatal variant.
func (c *clusterImpl) StartServiceForVirtualClusterE(
ctx context.Context,
l *logger.Logger,
externalNodes option.NodeListOption,
startOpts option.StartOpts,
settings install.ClusterSettings,
opts ...option.Option,
) error {

// Surface progress in the cluster/worker status while starting, and
// clear it again when done.
c.setStatusForClusterOpt("starting virtual cluster", startOpts.RoachtestOpts.Worker, opts...)
defer c.clearStatusForClusterOpt(startOpts.RoachtestOpts.Worker)

// Use the virtual-cluster settings map (not the storage cluster's) as
// the defaults merged into the start options.
clusterSettingsOpts := c.configureClusterSettingOptions(c.virtualClusterSettings, settings)

if err := roachprod.StartServiceForVirtualCluster(ctx, l, c.MakeNodes(externalNodes), c.MakeNodes(opts...), startOpts.RoachprodOpts, clusterSettingsOpts...); err != nil {
return err
}

// For secure clusters, re-fetch certificates from node 1 so local
// clients can connect to the newly started service.
if settings.Secure {
if err := c.RefetchCertsFromNode(ctx, 1); err != nil {
return err
}
}
return nil
}

// StartServiceForVirtualCluster is like StartServiceForVirtualClusterE,
// but fails the test fatally instead of returning an error.
func (c *clusterImpl) StartServiceForVirtualCluster(
ctx context.Context,
l *logger.Logger,
externalNodes option.NodeListOption,
startOpts option.StartOpts,
settings install.ClusterSettings,
opts ...option.Option,
) {
if err := c.StartServiceForVirtualClusterE(ctx, l, externalNodes, startOpts, settings, opts...); err != nil {
c.t.Fatal(err)
}
}

func (c *clusterImpl) RefetchCertsFromNode(ctx context.Context, node int) error {
var err error
c.localCertsDir, err = os.MkdirTemp("", "roachtest-certs")
Expand Down
5 changes: 5 additions & 0 deletions pkg/cmd/roachtest/cluster/cluster_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ type Cluster interface {
StopCockroachGracefullyOnNode(ctx context.Context, l *logger.Logger, node int) error
NewMonitor(context.Context, ...option.Option) Monitor

// Starting virtual clusters.

StartServiceForVirtualClusterE(ctx context.Context, l *logger.Logger, externalNodes option.NodeListOption, startOpts option.StartOpts, settings install.ClusterSettings, opts ...option.Option) error
StartServiceForVirtualCluster(ctx context.Context, l *logger.Logger, externalNodes option.NodeListOption, startOpts option.StartOpts, settings install.ClusterSettings, opts ...option.Option)

// Hostnames and IP addresses of the nodes.

InternalAddr(ctx context.Context, l *logger.Logger, node option.NodeListOption) ([]string, error)
Expand Down
6 changes: 6 additions & 0 deletions pkg/cmd/roachtest/option/connection_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ func TenantName(tenantName string) func(*ConnOption) {
}
}

// SQLInstance returns a connection option that sets the SQL instance
// number to connect to, for virtual clusters running multiple SQL
// instances.
func SQLInstance(sqlInstance int) func(*ConnOption) {
return func(option *ConnOption) {
option.SQLInstance = sqlInstance
}
}

func ConnectionOption(key, value string) func(*ConnOption) {
return func(option *ConnOption) {
if len(option.Options) == 0 {
Expand Down
10 changes: 10 additions & 0 deletions pkg/cmd/roachtest/option/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ func DefaultStartSingleNodeOpts() StartOpts {
return startOpts
}

// DefaultStartVirtualClusterOpts returns StartOpts for starting an external
// process virtual cluster with the given tenant name and SQL instance.
func DefaultStartVirtualClusterOpts(tenantName string, sqlInstance int) StartOpts {
	opts := StartOpts{RoachprodOpts: roachprod.DefaultStartOpts()}
	// Identify the tenant and instance to start, and target the
	// external-process virtual cluster service.
	opts.RoachprodOpts.VirtualClusterName = tenantName
	opts.RoachprodOpts.SQLInstance = sqlInstance
	opts.RoachprodOpts.Target = install.StartServiceForVirtualCluster
	return opts
}

// StopOpts is a type that combines the stop options needed by roachprod and roachtest.
type StopOpts struct {
// TODO(radu): we should use a higher-level abstraction instead of
Expand Down
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/test_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,7 @@ func (r *testRunner) runWorker(

// Set initial cluster settings for this test.
c.clusterSettings = map[string]string{}
c.virtualClusterSettings = map[string]string{}

switch testSpec.Leases {
case registry.DefaultLeases:
Expand Down
77 changes: 22 additions & 55 deletions pkg/cmd/roachtest/tests/multitenant_distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package tests
import (
"archive/zip"
"context"
gosql "database/sql"
"fmt"
"io"
"strconv"
Expand Down Expand Up @@ -64,82 +63,49 @@ func runMultiTenantDistSQL(
// 1 byte to bypass the guardrails.
settings := install.MakeClusterSettings(install.SecureOption(true))
settings.Env = append(settings.Env, "COCKROACH_MIN_RANGE_MAX_BYTES=1")
tenantEnvOpt := createTenantEnvVar(settings.Env[len(settings.Env)-1])
c.Start(ctx, t.L(), option.DefaultStartOpts(), settings, c.Node(1))
c.Start(ctx, t.L(), option.DefaultStartOpts(), settings, c.Node(2))
c.Start(ctx, t.L(), option.DefaultStartOpts(), settings, c.Node(3))
storageNodes := c.Range(1, 3)

const (
tenantID = 11
tenantBaseHTTPPort = 8081
tenantBaseSQLPort = 26259
// localPortOffset is used to avoid port conflicts with nodes on a local
// cluster.
localPortOffset = 1000
)

tenantHTTPPort := func(offset int) int {
if c.IsLocal() || numInstances > c.Spec().NodeCount {
return tenantBaseHTTPPort + localPortOffset + offset
}
return tenantBaseHTTPPort
}
tenantSQLPort := func(offset int) int {
if c.IsLocal() || numInstances > c.Spec().NodeCount {
return tenantBaseSQLPort + localPortOffset + offset
}
return tenantBaseSQLPort
tenantName := "test-tenant"
var nodes intsets.Fast
for i := 0; i < numInstances; i++ {
node := (i % c.Spec().NodeCount) + 1
sqlInstance := i / c.Spec().NodeCount
instStartOps := option.DefaultStartVirtualClusterOpts(tenantName, sqlInstance)
t.L().Printf("Starting instance %d on node %d", i, node)
c.StartServiceForVirtualCluster(ctx, t.L(), c.Node(node), instStartOps, settings, storageNodes)
nodes.Add(i + 1)
}

storConn := c.Conn(ctx, t.L(), 1)
_, err := storConn.Exec(`SELECT crdb_internal.create_tenant($1::INT)`, tenantID)
require.NoError(t, err)

instances := make([]*tenantNode, 0, numInstances)
instance1 := createTenantNode(ctx, t, c, c.Node(1), tenantID, 2 /* node */, tenantHTTPPort(0), tenantSQLPort(0),
createTenantCertNodes(c.All()), tenantEnvOpt)
instances = append(instances, instance1)
defer instance1.stop(ctx, t, c)
instance1.start(ctx, t, c, "./cockroach")

// Open things up so we can configure range sizes below.
_, err = storConn.Exec(`ALTER TENANT [$1] SET CLUSTER SETTING sql.zone_configs.allow_for_secondary_tenant.enabled = true`, tenantID)
// Open things up, so we can configure range sizes below.
_, err := storConn.Exec(`ALTER TENANT $1 SET CLUSTER SETTING sql.zone_configs.allow_for_secondary_tenant.enabled = true`, tenantName)
require.NoError(t, err)

// Create numInstances sql pods and spread them evenly across the machines.
var nodes intsets.Fast
nodes.Add(1)
for i := 1; i < numInstances; i++ {
node := ((i + 1) % c.Spec().NodeCount) + 1
inst, err := newTenantInstance(ctx, instance1, t, c, node, tenantHTTPPort(i), tenantSQLPort(i))
instances = append(instances, inst)
require.NoError(t, err)
defer inst.stop(ctx, t, c)
inst.start(ctx, t, c, "./cockroach")
nodes.Add(i + 1)
}

m := c.NewMonitor(ctx, c.Nodes(1, 2, 3))

inst1Conn, err := gosql.Open("postgres", instance1.pgURL)
inst1Conn, err := c.ConnE(ctx, t.L(), 1, option.TenantName(tenantName))
require.NoError(t, err)
_, err = inst1Conn.Exec("CREATE TABLE t(n INT, i INT,s STRING, PRIMARY KEY(n,i))")
require.NoError(t, err)

// DistSQL needs at least a range per node to distribute query everywhere
// and test takes too long and too much resources with default range sizes
// and test takes too long and too many resources with default range sizes
// so make them much smaller.
_, err = inst1Conn.Exec(`ALTER TABLE t CONFIGURE ZONE USING range_min_bytes = 1000,range_max_bytes = 100000`)
require.NoError(t, err)

insertCtx, cancel := context.WithCancel(ctx)
defer cancel()

for i, inst := range instances {
url := inst.pgURL
for i := 0; i < numInstances; i++ {
li := i
m.Go(func(ctx context.Context) error {
dbi, err := gosql.Open("postgres", url)
node := (li % c.Spec().NodeCount) + 1
sqlInstance := li / c.Spec().NodeCount
dbi, err := c.ConnE(ctx, t.L(), node, option.TenantName(tenantName), option.SQLInstance(sqlInstance))
require.NoError(t, err)
iter := 0
for {
Expand All @@ -149,7 +115,7 @@ func runMultiTenantDistSQL(
t.L().Printf("worker %d done:%v", li, insertCtx.Err())
return nil
default:
// procede to report error
// proceed to report error
}
require.NoError(t, err, "instance idx = %d, iter = %d", li, iter)
iter++
Expand Down Expand Up @@ -189,7 +155,6 @@ func runMultiTenantDistSQL(
} else {
t.L().Printf("Only %d nodes present: %v", nodesInPlan.Len(), nodesInPlan)
}

}
m.Wait()

Expand Down Expand Up @@ -233,7 +198,9 @@ func runMultiTenantDistSQL(
if bundle {
// Open bundle and verify its contents
sqlConnCtx := clisqlclient.Context{}
conn := sqlConnCtx.MakeSQLConn(io.Discard, io.Discard, instance1.pgURL)
pgURL, err := c.ExternalPGUrl(ctx, t.L(), c.Node(1), tenantName, 0)
require.NoError(t, err)
conn := sqlConnCtx.MakeSQLConn(io.Discard, io.Discard, pgURL[0])
bundles, err := clisqlclient.StmtDiagListBundles(ctx, conn)
require.NoError(t, err)

Expand Down
Loading