Skip to content

Commit

Permalink
Merge pull request #116525 from herkolategan/backport23.2-115599
Browse files Browse the repository at this point in the history
release-23.2: roachtest: update multitenant/distsql to use new roachprod service APIs
  • Loading branch information
herkolategan authored Jan 9, 2024
2 parents b58e407 + 1145d70 commit a86d937
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 130 deletions.
114 changes: 87 additions & 27 deletions pkg/cmd/roachtest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,8 +639,11 @@ type clusterImpl struct {
expiration time.Time
encAtRest bool // use encryption at rest

// clusterSettings are additional cluster settings set on cluster startup.
// clusterSettings are additional cluster settings set on the storage cluster startup.
clusterSettings map[string]string
// virtualClusterSettings are additional cluster settings to set on the
// virtual cluster startup.
virtualClusterSettings map[string]string
// goCoverDir is the directory for Go coverage data (if coverage is enabled).
// BAZEL_COVER_DIR will be set to this value when starting a node.
goCoverDir string
Expand Down Expand Up @@ -1910,6 +1913,38 @@ func (c *clusterImpl) clearStatusForClusterOpt(worker bool) {
}
}

func (c *clusterImpl) configureClusterSettingOptions(
defaultClusterSettings install.ClusterSettingsOption, settings install.ClusterSettings,
) []install.ClusterSettingOption {
setUnlessExists := func(name string, value interface{}) {
if !envExists(settings.Env, name) {
settings.Env = append(settings.Env, fmt.Sprintf("%s=%s", name, fmt.Sprint(value)))
}
}
// Set the same seed on every node, to be used by builds with
// runtime assertions enabled.
setUnlessExists("COCKROACH_RANDOM_SEED", c.cockroachRandomSeed())

// Panic on span use-after-Finish, so we catch such bugs.
setUnlessExists("COCKROACH_CRASH_ON_SPAN_USE_AFTER_FINISH", true)

if c.goCoverDir != "" {
settings.Env = append(settings.Env, fmt.Sprintf("BAZEL_COVER_DIR=%s", c.goCoverDir))
}

return []install.ClusterSettingOption{
install.TagOption(settings.Tag),
install.PGUrlCertsDirOption(settings.PGUrlCertsDir),
install.SecureOption(settings.Secure),
install.UseTreeDistOption(settings.UseTreeDist),
install.EnvOption(settings.Env),
install.NumRacksOption(settings.NumRacks),
install.BinaryOption(settings.Binary),
defaultClusterSettings,
install.ClusterSettingsOption(settings.ClusterSettings),
}
}

// StartE starts cockroach nodes on a subset of the cluster. The nodes parameter
// can either be a specific node, empty (to indicate all nodes), or a pair of
// nodes indicating a range.
Expand All @@ -1928,17 +1963,6 @@ func (c *clusterImpl) StartE(

startOpts.RoachprodOpts.EncryptedStores = c.encAtRest

setUnlessExists := func(name string, value interface{}) {
if !envExists(settings.Env, name) {
settings.Env = append(settings.Env, fmt.Sprintf("%s=%s", name, fmt.Sprint(value)))
}
}
// Panic on span use-after-Finish, so we catch such bugs.
setUnlessExists("COCKROACH_CRASH_ON_SPAN_USE_AFTER_FINISH", true)
// Set the same seed on every node, to be used by builds with
// runtime assertions enabled.
setUnlessExists("COCKROACH_RANDOM_SEED", c.cockroachRandomSeed())

// Needed for backward-compat on crdb_internal.ranges{_no_leases}.
// Remove in v23.2.
if !envExists(settings.Env, "COCKROACH_FORCE_DEPRECATED_SHOW_RANGE_BEHAVIOR") {
Expand All @@ -1947,21 +1971,7 @@ func (c *clusterImpl) StartE(
settings.Env = append(settings.Env, "COCKROACH_FORCE_DEPRECATED_SHOW_RANGE_BEHAVIOR=false")
}

if c.goCoverDir != "" {
settings.Env = append(settings.Env, fmt.Sprintf("BAZEL_COVER_DIR=%s", c.goCoverDir))
}

clusterSettingsOpts := []install.ClusterSettingOption{
install.TagOption(settings.Tag),
install.PGUrlCertsDirOption(settings.PGUrlCertsDir),
install.SecureOption(settings.Secure),
install.UseTreeDistOption(settings.UseTreeDist),
install.EnvOption(settings.Env),
install.NumRacksOption(settings.NumRacks),
install.BinaryOption(settings.Binary),
install.ClusterSettingsOption(c.clusterSettings),
install.ClusterSettingsOption(settings.ClusterSettings),
}
clusterSettingsOpts := c.configureClusterSettingOptions(c.clusterSettings, settings)

if err := roachprod.Start(ctx, l, c.MakeNodes(opts...), startOpts.RoachprodOpts, clusterSettingsOpts...); err != nil {
return err
Expand All @@ -1977,6 +1987,56 @@ func (c *clusterImpl) StartE(
return nil
}

// StartServiceForVirtualClusterE can start either external or shared process
// virtual clusters. This can be specified in startOpts.RoachprodOpts. Set the
// `Target` to the required virtual cluster type. Refer to the virtual cluster
// section in the struct for more information on what fields are available for
// virtual clusters.
//
// With external process virtual clusters an external process will be started on
// each node specified in the externalNodes parameter.
//
// With shared process virtual clusters the required queries will be run on a
// storage node of the cluster specified in the opts parameter.
func (c *clusterImpl) StartServiceForVirtualClusterE(
ctx context.Context,
l *logger.Logger,
externalNodes option.NodeListOption,
startOpts option.StartOpts,
settings install.ClusterSettings,
opts ...option.Option,
) error {

c.setStatusForClusterOpt("starting virtual cluster", startOpts.RoachtestOpts.Worker, opts...)
defer c.clearStatusForClusterOpt(startOpts.RoachtestOpts.Worker)

clusterSettingsOpts := c.configureClusterSettingOptions(c.virtualClusterSettings, settings)

if err := roachprod.StartServiceForVirtualCluster(ctx, l, c.MakeNodes(externalNodes), c.MakeNodes(opts...), startOpts.RoachprodOpts, clusterSettingsOpts...); err != nil {
return err
}

if settings.Secure {
if err := c.RefetchCertsFromNode(ctx, 1); err != nil {
return err
}
}
return nil
}

func (c *clusterImpl) StartServiceForVirtualCluster(
ctx context.Context,
l *logger.Logger,
externalNodes option.NodeListOption,
startOpts option.StartOpts,
settings install.ClusterSettings,
opts ...option.Option,
) {
if err := c.StartServiceForVirtualClusterE(ctx, l, externalNodes, startOpts, settings, opts...); err != nil {
c.t.Fatal(err)
}
}

func (c *clusterImpl) RefetchCertsFromNode(ctx context.Context, node int) error {
var err error
c.localCertsDir, err = os.MkdirTemp("", "roachtest-certs")
Expand Down
5 changes: 5 additions & 0 deletions pkg/cmd/roachtest/cluster/cluster_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ type Cluster interface {
StopCockroachGracefullyOnNode(ctx context.Context, l *logger.Logger, node int) error
NewMonitor(context.Context, ...option.Option) Monitor

// Starting virtual clusters.

StartServiceForVirtualClusterE(ctx context.Context, l *logger.Logger, externalNodes option.NodeListOption, startOpts option.StartOpts, settings install.ClusterSettings, opts ...option.Option) error
StartServiceForVirtualCluster(ctx context.Context, l *logger.Logger, externalNodes option.NodeListOption, startOpts option.StartOpts, settings install.ClusterSettings, opts ...option.Option)

// Hostnames and IP addresses of the nodes.

InternalAddr(ctx context.Context, l *logger.Logger, node option.NodeListOption) ([]string, error)
Expand Down
6 changes: 6 additions & 0 deletions pkg/cmd/roachtest/option/connection_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ func TenantName(tenantName string) func(*ConnOption) {
}
}

func SQLInstance(sqlInstance int) func(*ConnOption) {
return func(option *ConnOption) {
option.SQLInstance = sqlInstance
}
}

func ConnectionOption(key, value string) func(*ConnOption) {
return func(option *ConnOption) {
if len(option.Options) == 0 {
Expand Down
10 changes: 10 additions & 0 deletions pkg/cmd/roachtest/option/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ func DefaultStartSingleNodeOpts() StartOpts {
return startOpts
}

// DefaultStartVirtualClusterOpts returns StartOpts for starting an external
// process virtual cluster with the given tenant name and SQL instance.
func DefaultStartVirtualClusterOpts(tenantName string, sqlInstance int) StartOpts {
startOpts := StartOpts{RoachprodOpts: roachprod.DefaultStartOpts()}
startOpts.RoachprodOpts.Target = install.StartServiceForVirtualCluster
startOpts.RoachprodOpts.VirtualClusterName = tenantName
startOpts.RoachprodOpts.SQLInstance = sqlInstance
return startOpts
}

// StopOpts is a type that combines the stop options needed by roachprod and roachtest.
type StopOpts struct {
// TODO(radu): we should use a higher-level abstraction instead of
Expand Down
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/test_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,7 @@ func (r *testRunner) runWorker(

// Set initial cluster settings for this test.
c.clusterSettings = map[string]string{}
c.virtualClusterSettings = map[string]string{}

switch testSpec.Leases {
case registry.DefaultLeases:
Expand Down
77 changes: 22 additions & 55 deletions pkg/cmd/roachtest/tests/multitenant_distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package tests
import (
"archive/zip"
"context"
gosql "database/sql"
"fmt"
"io"
"strconv"
Expand Down Expand Up @@ -64,82 +63,49 @@ func runMultiTenantDistSQL(
// 1 byte to bypass the guardrails.
settings := install.MakeClusterSettings(install.SecureOption(true))
settings.Env = append(settings.Env, "COCKROACH_MIN_RANGE_MAX_BYTES=1")
tenantEnvOpt := createTenantEnvVar(settings.Env[len(settings.Env)-1])
c.Start(ctx, t.L(), option.DefaultStartOpts(), settings, c.Node(1))
c.Start(ctx, t.L(), option.DefaultStartOpts(), settings, c.Node(2))
c.Start(ctx, t.L(), option.DefaultStartOpts(), settings, c.Node(3))
storageNodes := c.Range(1, 3)

const (
tenantID = 11
tenantBaseHTTPPort = 8081
tenantBaseSQLPort = 26259
// localPortOffset is used to avoid port conflicts with nodes on a local
// cluster.
localPortOffset = 1000
)

tenantHTTPPort := func(offset int) int {
if c.IsLocal() || numInstances > c.Spec().NodeCount {
return tenantBaseHTTPPort + localPortOffset + offset
}
return tenantBaseHTTPPort
}
tenantSQLPort := func(offset int) int {
if c.IsLocal() || numInstances > c.Spec().NodeCount {
return tenantBaseSQLPort + localPortOffset + offset
}
return tenantBaseSQLPort
tenantName := "test-tenant"
var nodes intsets.Fast
for i := 0; i < numInstances; i++ {
node := (i % c.Spec().NodeCount) + 1
sqlInstance := i / c.Spec().NodeCount
instStartOps := option.DefaultStartVirtualClusterOpts(tenantName, sqlInstance)
t.L().Printf("Starting instance %d on node %d", i, node)
c.StartServiceForVirtualCluster(ctx, t.L(), c.Node(node), instStartOps, settings, storageNodes)
nodes.Add(i + 1)
}

storConn := c.Conn(ctx, t.L(), 1)
_, err := storConn.Exec(`SELECT crdb_internal.create_tenant($1::INT)`, tenantID)
require.NoError(t, err)

instances := make([]*tenantNode, 0, numInstances)
instance1 := createTenantNode(ctx, t, c, c.Node(1), tenantID, 2 /* node */, tenantHTTPPort(0), tenantSQLPort(0),
createTenantCertNodes(c.All()), tenantEnvOpt)
instances = append(instances, instance1)
defer instance1.stop(ctx, t, c)
instance1.start(ctx, t, c, "./cockroach")

// Open things up so we can configure range sizes below.
_, err = storConn.Exec(`ALTER TENANT [$1] SET CLUSTER SETTING sql.zone_configs.allow_for_secondary_tenant.enabled = true`, tenantID)
// Open things up, so we can configure range sizes below.
_, err := storConn.Exec(`ALTER TENANT $1 SET CLUSTER SETTING sql.zone_configs.allow_for_secondary_tenant.enabled = true`, tenantName)
require.NoError(t, err)

// Create numInstances sql pods and spread them evenly across the machines.
var nodes intsets.Fast
nodes.Add(1)
for i := 1; i < numInstances; i++ {
node := ((i + 1) % c.Spec().NodeCount) + 1
inst, err := newTenantInstance(ctx, instance1, t, c, node, tenantHTTPPort(i), tenantSQLPort(i))
instances = append(instances, inst)
require.NoError(t, err)
defer inst.stop(ctx, t, c)
inst.start(ctx, t, c, "./cockroach")
nodes.Add(i + 1)
}

m := c.NewMonitor(ctx, c.Nodes(1, 2, 3))

inst1Conn, err := gosql.Open("postgres", instance1.pgURL)
inst1Conn, err := c.ConnE(ctx, t.L(), 1, option.TenantName(tenantName))
require.NoError(t, err)
_, err = inst1Conn.Exec("CREATE TABLE t(n INT, i INT,s STRING, PRIMARY KEY(n,i))")
require.NoError(t, err)

// DistSQL needs at least a range per node to distribute query everywhere
// and test takes too long and too much resources with default range sizes
// and test takes too long and too many resources with default range sizes
// so make them much smaller.
_, err = inst1Conn.Exec(`ALTER TABLE t CONFIGURE ZONE USING range_min_bytes = 1000,range_max_bytes = 100000`)
require.NoError(t, err)

insertCtx, cancel := context.WithCancel(ctx)
defer cancel()

for i, inst := range instances {
url := inst.pgURL
for i := 0; i < numInstances; i++ {
li := i
m.Go(func(ctx context.Context) error {
dbi, err := gosql.Open("postgres", url)
node := (li % c.Spec().NodeCount) + 1
sqlInstance := li / c.Spec().NodeCount
dbi, err := c.ConnE(ctx, t.L(), node, option.TenantName(tenantName), option.SQLInstance(sqlInstance))
require.NoError(t, err)
iter := 0
for {
Expand All @@ -149,7 +115,7 @@ func runMultiTenantDistSQL(
t.L().Printf("worker %d done:%v", li, insertCtx.Err())
return nil
default:
// procede to report error
// proceed to report error
}
require.NoError(t, err, "instance idx = %d, iter = %d", li, iter)
iter++
Expand Down Expand Up @@ -189,7 +155,6 @@ func runMultiTenantDistSQL(
} else {
t.L().Printf("Only %d nodes present: %v", nodesInPlan.Len(), nodesInPlan)
}

}
m.Wait()

Expand Down Expand Up @@ -233,7 +198,9 @@ func runMultiTenantDistSQL(
if bundle {
// Open bundle and verify its contents
sqlConnCtx := clisqlclient.Context{}
conn := sqlConnCtx.MakeSQLConn(io.Discard, io.Discard, instance1.pgURL)
pgURL, err := c.ExternalPGUrl(ctx, t.L(), c.Node(1), tenantName, 0)
require.NoError(t, err)
conn := sqlConnCtx.MakeSQLConn(io.Discard, io.Discard, pgURL[0])
bundles, err := clisqlclient.StmtDiagListBundles(ctx, conn)
require.NoError(t, err)

Expand Down
Loading

0 comments on commit a86d937

Please sign in to comment.