Skip to content

Commit

Permalink
Merge #115599
Browse files Browse the repository at this point in the history
115599: roachtest: update multitenant/distsql to use new roachprod service APIs r=renatolabs,srosenberg a=herkolategan

Previously the multitenant distsql roachtest relied on an internal util in `roachtest` to start virtual clusters. This PR updates the test to use the new official `roachtest` and `roachprod` APIs for starting virtual clusters.


Some additional changes were required to support upgrading the test. The cluster interface only exposed a method to start storage nodes, but that is insufficient to start virtual clusters that have a separate method on the `roachprod` API (for starting).

This change adds a new method `StartServiceForVirtualCluster` to the cluster interface to enable roachtests to start virtual clusters. Some refactoring was required to enable different sets of cluster settings, depending on what service
type is going to be started.

There are now two sets of cluster settings that can be utilised in `test_runner`. For virtual clusters `virtualClusterSettings` will be used, and for storage clusters `clusterSettings` will be utilised.

Fixes: #116019

Release Note: None
Epic: CRDB-31933


Co-authored-by: Herko Lategan <[email protected]>
  • Loading branch information
craig[bot] and herkolategan committed Dec 14, 2023
2 parents b1fdba0 + 555437e commit cbcdca3
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 130 deletions.
114 changes: 87 additions & 27 deletions pkg/cmd/roachtest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,8 +639,11 @@ type clusterImpl struct {
expiration time.Time
encAtRest bool // use encryption at rest

// clusterSettings are additional cluster settings set on cluster startup.
// clusterSettings are additional cluster settings set on the storage cluster startup.
clusterSettings map[string]string
// virtualClusterSettings are additional cluster settings to set on the
// virtual cluster startup.
virtualClusterSettings map[string]string
// goCoverDir is the directory for Go coverage data (if coverage is enabled).
// BAZEL_COVER_DIR will be set to this value when starting a node.
goCoverDir string
Expand Down Expand Up @@ -1910,6 +1913,38 @@ func (c *clusterImpl) clearStatusForClusterOpt(worker bool) {
}
}

func (c *clusterImpl) configureClusterSettingOptions(
defaultClusterSettings install.ClusterSettingsOption, settings install.ClusterSettings,
) []install.ClusterSettingOption {
setUnlessExists := func(name string, value interface{}) {
if !envExists(settings.Env, name) {
settings.Env = append(settings.Env, fmt.Sprintf("%s=%s", name, fmt.Sprint(value)))
}
}
// Set the same seed on every node, to be used by builds with
// runtime assertions enabled.
setUnlessExists("COCKROACH_RANDOM_SEED", c.cockroachRandomSeed())

// Panic on span use-after-Finish, so we catch such bugs.
setUnlessExists("COCKROACH_CRASH_ON_SPAN_USE_AFTER_FINISH", true)

if c.goCoverDir != "" {
settings.Env = append(settings.Env, fmt.Sprintf("BAZEL_COVER_DIR=%s", c.goCoverDir))
}

return []install.ClusterSettingOption{
install.TagOption(settings.Tag),
install.PGUrlCertsDirOption(settings.PGUrlCertsDir),
install.SecureOption(settings.Secure),
install.UseTreeDistOption(settings.UseTreeDist),
install.EnvOption(settings.Env),
install.NumRacksOption(settings.NumRacks),
install.BinaryOption(settings.Binary),
defaultClusterSettings,
install.ClusterSettingsOption(settings.ClusterSettings),
}
}

// StartE starts cockroach nodes on a subset of the cluster. The nodes parameter
// can either be a specific node, empty (to indicate all nodes), or a pair of
// nodes indicating a range.
Expand All @@ -1928,17 +1963,6 @@ func (c *clusterImpl) StartE(

startOpts.RoachprodOpts.EncryptedStores = c.encAtRest

setUnlessExists := func(name string, value interface{}) {
if !envExists(settings.Env, name) {
settings.Env = append(settings.Env, fmt.Sprintf("%s=%s", name, fmt.Sprint(value)))
}
}
// Panic on span use-after-Finish, so we catch such bugs.
setUnlessExists("COCKROACH_CRASH_ON_SPAN_USE_AFTER_FINISH", true)
// Set the same seed on every node, to be used by builds with
// runtime assertions enabled.
setUnlessExists("COCKROACH_RANDOM_SEED", c.cockroachRandomSeed())

// Needed for backward-compat on crdb_internal.ranges{_no_leases}.
// Remove in v23.2.
if !envExists(settings.Env, "COCKROACH_FORCE_DEPRECATED_SHOW_RANGE_BEHAVIOR") {
Expand All @@ -1947,21 +1971,7 @@ func (c *clusterImpl) StartE(
settings.Env = append(settings.Env, "COCKROACH_FORCE_DEPRECATED_SHOW_RANGE_BEHAVIOR=false")
}

if c.goCoverDir != "" {
settings.Env = append(settings.Env, fmt.Sprintf("BAZEL_COVER_DIR=%s", c.goCoverDir))
}

clusterSettingsOpts := []install.ClusterSettingOption{
install.TagOption(settings.Tag),
install.PGUrlCertsDirOption(settings.PGUrlCertsDir),
install.SecureOption(settings.Secure),
install.UseTreeDistOption(settings.UseTreeDist),
install.EnvOption(settings.Env),
install.NumRacksOption(settings.NumRacks),
install.BinaryOption(settings.Binary),
install.ClusterSettingsOption(c.clusterSettings),
install.ClusterSettingsOption(settings.ClusterSettings),
}
clusterSettingsOpts := c.configureClusterSettingOptions(c.clusterSettings, settings)

if err := roachprod.Start(ctx, l, c.MakeNodes(opts...), startOpts.RoachprodOpts, clusterSettingsOpts...); err != nil {
return err
Expand All @@ -1977,6 +1987,56 @@ func (c *clusterImpl) StartE(
return nil
}

// StartServiceForVirtualClusterE can start either external or shared process
// virtual clusters. This can be specified in startOpts.RoachprodOpts. Set the
// `Target` to the required virtual cluster type. Refer to the virtual cluster
// section in the struct for more information on what fields are available for
// virtual clusters.
//
// With external process virtual clusters an external process will be started on
// each node specified in the externalNodes parameter.
//
// With shared process virtual clusters the required queries will be run on a
// storage node of the cluster specified in the opts parameter.
func (c *clusterImpl) StartServiceForVirtualClusterE(
ctx context.Context,
l *logger.Logger,
externalNodes option.NodeListOption,
startOpts option.StartOpts,
settings install.ClusterSettings,
opts ...option.Option,
) error {

c.setStatusForClusterOpt("starting virtual cluster", startOpts.RoachtestOpts.Worker, opts...)
defer c.clearStatusForClusterOpt(startOpts.RoachtestOpts.Worker)

clusterSettingsOpts := c.configureClusterSettingOptions(c.virtualClusterSettings, settings)

if err := roachprod.StartServiceForVirtualCluster(ctx, l, c.MakeNodes(externalNodes), c.MakeNodes(opts...), startOpts.RoachprodOpts, clusterSettingsOpts...); err != nil {
return err
}

if settings.Secure {
if err := c.RefetchCertsFromNode(ctx, 1); err != nil {
return err
}
}
return nil
}

func (c *clusterImpl) StartServiceForVirtualCluster(
ctx context.Context,
l *logger.Logger,
externalNodes option.NodeListOption,
startOpts option.StartOpts,
settings install.ClusterSettings,
opts ...option.Option,
) {
if err := c.StartServiceForVirtualClusterE(ctx, l, externalNodes, startOpts, settings, opts...); err != nil {
c.t.Fatal(err)
}
}

func (c *clusterImpl) RefetchCertsFromNode(ctx context.Context, node int) error {
var err error
c.localCertsDir, err = os.MkdirTemp("", "roachtest-certs")
Expand Down
5 changes: 5 additions & 0 deletions pkg/cmd/roachtest/cluster/cluster_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ type Cluster interface {
StopCockroachGracefullyOnNode(ctx context.Context, l *logger.Logger, node int) error
NewMonitor(context.Context, ...option.Option) Monitor

// Starting virtual clusters.

StartServiceForVirtualClusterE(ctx context.Context, l *logger.Logger, externalNodes option.NodeListOption, startOpts option.StartOpts, settings install.ClusterSettings, opts ...option.Option) error
StartServiceForVirtualCluster(ctx context.Context, l *logger.Logger, externalNodes option.NodeListOption, startOpts option.StartOpts, settings install.ClusterSettings, opts ...option.Option)

// Hostnames and IP addresses of the nodes.

InternalAddr(ctx context.Context, l *logger.Logger, node option.NodeListOption) ([]string, error)
Expand Down
6 changes: 6 additions & 0 deletions pkg/cmd/roachtest/option/connection_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ func TenantName(tenantName string) func(*ConnOption) {
}
}

func SQLInstance(sqlInstance int) func(*ConnOption) {
return func(option *ConnOption) {
option.SQLInstance = sqlInstance
}
}

func ConnectionOption(key, value string) func(*ConnOption) {
return func(option *ConnOption) {
if len(option.Options) == 0 {
Expand Down
10 changes: 10 additions & 0 deletions pkg/cmd/roachtest/option/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ func DefaultStartSingleNodeOpts() StartOpts {
return startOpts
}

// DefaultStartVirtualClusterOpts returns StartOpts for starting an external
// process virtual cluster with the given tenant name and SQL instance.
func DefaultStartVirtualClusterOpts(tenantName string, sqlInstance int) StartOpts {
startOpts := StartOpts{RoachprodOpts: roachprod.DefaultStartOpts()}
startOpts.RoachprodOpts.Target = install.StartServiceForVirtualCluster
startOpts.RoachprodOpts.VirtualClusterName = tenantName
startOpts.RoachprodOpts.SQLInstance = sqlInstance
return startOpts
}

// StopOpts is a type that combines the stop options needed by roachprod and roachtest.
type StopOpts struct {
// TODO(radu): we should use a higher-level abstraction instead of
Expand Down
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/test_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,7 @@ func (r *testRunner) runWorker(

// Set initial cluster settings for this test.
c.clusterSettings = map[string]string{}
c.virtualClusterSettings = map[string]string{}

switch testSpec.Leases {
case registry.DefaultLeases:
Expand Down
77 changes: 22 additions & 55 deletions pkg/cmd/roachtest/tests/multitenant_distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package tests
import (
"archive/zip"
"context"
gosql "database/sql"
"fmt"
"io"
"strconv"
Expand Down Expand Up @@ -64,82 +63,49 @@ func runMultiTenantDistSQL(
// 1 byte to bypass the guardrails.
settings := install.MakeClusterSettings(install.SecureOption(true))
settings.Env = append(settings.Env, "COCKROACH_MIN_RANGE_MAX_BYTES=1")
tenantEnvOpt := createTenantEnvVar(settings.Env[len(settings.Env)-1])
c.Start(ctx, t.L(), option.DefaultStartOpts(), settings, c.Node(1))
c.Start(ctx, t.L(), option.DefaultStartOpts(), settings, c.Node(2))
c.Start(ctx, t.L(), option.DefaultStartOpts(), settings, c.Node(3))
storageNodes := c.Range(1, 3)

const (
tenantID = 11
tenantBaseHTTPPort = 8081
tenantBaseSQLPort = 26259
// localPortOffset is used to avoid port conflicts with nodes on a local
// cluster.
localPortOffset = 1000
)

tenantHTTPPort := func(offset int) int {
if c.IsLocal() || numInstances > c.Spec().NodeCount {
return tenantBaseHTTPPort + localPortOffset + offset
}
return tenantBaseHTTPPort
}
tenantSQLPort := func(offset int) int {
if c.IsLocal() || numInstances > c.Spec().NodeCount {
return tenantBaseSQLPort + localPortOffset + offset
}
return tenantBaseSQLPort
tenantName := "test-tenant"
var nodes intsets.Fast
for i := 0; i < numInstances; i++ {
node := (i % c.Spec().NodeCount) + 1
sqlInstance := i / c.Spec().NodeCount
instStartOps := option.DefaultStartVirtualClusterOpts(tenantName, sqlInstance)
t.L().Printf("Starting instance %d on node %d", i, node)
c.StartServiceForVirtualCluster(ctx, t.L(), c.Node(node), instStartOps, settings, storageNodes)
nodes.Add(i + 1)
}

storConn := c.Conn(ctx, t.L(), 1)
_, err := storConn.Exec(`SELECT crdb_internal.create_tenant($1::INT)`, tenantID)
require.NoError(t, err)

instances := make([]*tenantNode, 0, numInstances)
instance1 := createTenantNode(ctx, t, c, c.Node(1), tenantID, 2 /* node */, tenantHTTPPort(0), tenantSQLPort(0),
createTenantCertNodes(c.All()), tenantEnvOpt)
instances = append(instances, instance1)
defer instance1.stop(ctx, t, c)
instance1.start(ctx, t, c, "./cockroach")

// Open things up so we can configure range sizes below.
_, err = storConn.Exec(`ALTER TENANT [$1] SET CLUSTER SETTING sql.zone_configs.allow_for_secondary_tenant.enabled = true`, tenantID)
// Open things up, so we can configure range sizes below.
_, err := storConn.Exec(`ALTER TENANT $1 SET CLUSTER SETTING sql.zone_configs.allow_for_secondary_tenant.enabled = true`, tenantName)
require.NoError(t, err)

// Create numInstances sql pods and spread them evenly across the machines.
var nodes intsets.Fast
nodes.Add(1)
for i := 1; i < numInstances; i++ {
node := ((i + 1) % c.Spec().NodeCount) + 1
inst, err := newTenantInstance(ctx, instance1, t, c, node, tenantHTTPPort(i), tenantSQLPort(i))
instances = append(instances, inst)
require.NoError(t, err)
defer inst.stop(ctx, t, c)
inst.start(ctx, t, c, "./cockroach")
nodes.Add(i + 1)
}

m := c.NewMonitor(ctx, c.Nodes(1, 2, 3))

inst1Conn, err := gosql.Open("postgres", instance1.pgURL)
inst1Conn, err := c.ConnE(ctx, t.L(), 1, option.TenantName(tenantName))
require.NoError(t, err)
_, err = inst1Conn.Exec("CREATE TABLE t(n INT, i INT,s STRING, PRIMARY KEY(n,i))")
require.NoError(t, err)

// DistSQL needs at least a range per node to distribute query everywhere
// and test takes too long and too much resources with default range sizes
// and test takes too long and too many resources with default range sizes
// so make them much smaller.
_, err = inst1Conn.Exec(`ALTER TABLE t CONFIGURE ZONE USING range_min_bytes = 1000,range_max_bytes = 100000`)
require.NoError(t, err)

insertCtx, cancel := context.WithCancel(ctx)
defer cancel()

for i, inst := range instances {
url := inst.pgURL
for i := 0; i < numInstances; i++ {
li := i
m.Go(func(ctx context.Context) error {
dbi, err := gosql.Open("postgres", url)
node := (li % c.Spec().NodeCount) + 1
sqlInstance := li / c.Spec().NodeCount
dbi, err := c.ConnE(ctx, t.L(), node, option.TenantName(tenantName), option.SQLInstance(sqlInstance))
require.NoError(t, err)
iter := 0
for {
Expand All @@ -149,7 +115,7 @@ func runMultiTenantDistSQL(
t.L().Printf("worker %d done:%v", li, insertCtx.Err())
return nil
default:
// procede to report error
// proceed to report error
}
require.NoError(t, err, "instance idx = %d, iter = %d", li, iter)
iter++
Expand Down Expand Up @@ -189,7 +155,6 @@ func runMultiTenantDistSQL(
} else {
t.L().Printf("Only %d nodes present: %v", nodesInPlan.Len(), nodesInPlan)
}

}
m.Wait()

Expand Down Expand Up @@ -233,7 +198,9 @@ func runMultiTenantDistSQL(
if bundle {
// Open bundle and verify its contents
sqlConnCtx := clisqlclient.Context{}
conn := sqlConnCtx.MakeSQLConn(io.Discard, io.Discard, instance1.pgURL)
pgURL, err := c.ExternalPGUrl(ctx, t.L(), c.Node(1), tenantName, 0)
require.NoError(t, err)
conn := sqlConnCtx.MakeSQLConn(io.Discard, io.Discard, pgURL[0])
bundles, err := clisqlclient.StmtDiagListBundles(ctx, conn)
require.NoError(t, err)

Expand Down
Loading

0 comments on commit cbcdca3

Please sign in to comment.