roachtest: port admission-control/multitenant-fairness to new API
This ports the `admission-control/multitenant-fairness/*` set of tests
to the 'official' roachprod virtual clusters API.

Fixes: cockroachdb#117670.

Release note: None
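
For orientation, the gist of the new API as exercised by this port — a minimal
sketch, not the full test: it assumes a secure cluster whose system tenant runs
on n1, elides error handling and test plumbing, and the virtual cluster name
and node numbers mirror the diff below.

// Start a SQL server for a named virtual cluster on its own node.
c.StartServiceForVirtualCluster(
    ctx, t.L(), c.Node(2),
    option.DefaultStartVirtualClusterOpts("app-fairness-n2", 0 /* sqlInstance */),
    install.MakeClusterSettings(),
)

// Commands no longer build connection URLs by hand; roachprod expands
// {pgurl:<node>:<virtual-cluster>:<sql-instance>} templates in place.
c.Run(ctx, option.WithNodes(c.Node(2)), fmt.Sprintf(
    "%s workload init kv {pgurl:2:app-fairness-n2:0}", test.DefaultCockroachPath))

// SQL connections address a virtual cluster by name and instance.
db := c.Conn(ctx, t.L(), 2, option.TenantName("app-fairness-n2"), option.SQLInstance(0))
defer db.Close()

This replaces the hand-rolled `tenantNode` machinery (`createTenantNode`,
`secureURL`, and per-tenant port bookkeeping) that the old version of the test
carried.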
renatolabs committed Mar 6, 2024
1 parent db26405 commit cab3489
Showing 2 changed files with 119 additions and 139 deletions.
251 changes: 119 additions & 132 deletions pkg/cmd/roachtest/tests/admission_control_multitenant_fairness.go
@@ -12,20 +12,21 @@ package tests

import (
"context"
gosql "database/sql"
"fmt"
"math"
"sort"
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/grafana"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/stretchr/testify/require"
"golang.org/x/exp/maps"
)

// This test sets up a single-node CRDB cluster on a 4vCPU machine, and 4
@@ -91,14 +92,13 @@ func registerMultiTenantFairness(r registry.Registry) {
for _, s := range specs {
s := s
r.Add(registry.TestSpec{
Name: fmt.Sprintf("admission-control/multitenant-fairness/%s", s.name),
Cluster: r.MakeClusterSpec(5),
Owner: registry.OwnerAdmissionControl,
Benchmark: true,
Leases: registry.MetamorphicLeases,
CompatibleClouds: registry.AllExceptAWS,
Suites: registry.Suites(registry.Weekly),
NonReleaseBlocker: false,
Name: fmt.Sprintf("admission-control/multitenant-fairness/%s", s.name),
Cluster: r.MakeClusterSpec(5),
Owner: registry.OwnerAdmissionControl,
Benchmark: true,
Leases: registry.MetamorphicLeases,
CompatibleClouds: registry.AllExceptAWS,
Suites: registry.Suites(registry.Weekly),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runMultiTenantFairness(ctx, t, c, s)
},
@@ -121,13 +121,7 @@ type multiTenantFairnessSpec struct {
func runMultiTenantFairness(
ctx context.Context, t test.Test, c cluster.Cluster, s multiTenantFairnessSpec,
) {
if c.Spec().NodeCount < 5 {
t.Fatalf("expected at least 5 nodes, found %d", c.Spec().NodeCount)
}

numTenants := 4
crdbNodeID := 1
crdbNode := c.Node(crdbNodeID)
crdbNode := c.Node(1)
if c.IsLocal() {
s.duration = 30 * time.Second
s.concurrency = func(i int) int { return 4 }
@@ -142,7 +136,7 @@ func runMultiTenantFairness(
}
}

t.L().Printf("starting cockroach securely (<%s)", time.Minute)
t.L().Printf("starting cockroach (<%s)", time.Minute)
c.Start(ctx, t.L(),
option.DefaultStartOptsNoBackups(),
install.MakeClusterSettings(),
@@ -156,105 +150,78 @@
promCfg.WithCluster(crdbNode.InstallNodes())
promCfg.WithGrafanaDashboardJSON(grafana.MultiTenantFairnessGrafanaJSON)

setRateLimit := func(ctx context.Context, val int) {
db := c.Conn(ctx, t.L(), crdbNodeID)
defer db.Close()

if _, err := db.ExecContext(
ctx, fmt.Sprintf("SET CLUSTER SETTING kv.tenant_rate_limiter.rate_limit = '%d'", val)); err != nil {
t.Fatalf("failed to set tenant rate limiter limit: %v", err)
}
}
systemConn := c.Conn(ctx, t.L(), crdbNode[0])
defer systemConn.Close()

setRateLimit(ctx, 1_000_000)
const rateLimit = 1_000_000

const (
tenantBaseID = 11
tenantBaseHTTPPort = 8081
tenantBaseSQLPort = 26259
)
tenantHTTPPort := func(offset int) int {
if c.IsLocal() {
return tenantBaseHTTPPort + offset
}
return tenantBaseHTTPPort
}
tenantSQLPort := func(offset int) int {
if c.IsLocal() {
return tenantBaseSQLPort + offset
}
return tenantBaseSQLPort
}
tenantID := func(offset int) int {
return tenantBaseID + offset
}
setTenantResourceLimits := func(tenantID int) {
db := c.Conn(ctx, t.L(), crdbNodeID)
defer db.Close()
if _, err := db.ExecContext(
ctx, fmt.Sprintf(
"SELECT crdb_internal.update_tenant_resource_limits(%[1]d, 1000000000, 10000, 1000000, now(), 0)", tenantID)); err != nil {
t.Fatalf("failed to update tenant resource limits: %v", err)
}
}
tenantNodeID := func(idx int) int {
return idx + 2
if _, err := systemConn.ExecContext(
ctx, fmt.Sprintf("SET CLUSTER SETTING kv.tenant_rate_limiter.rate_limit = '%d'", rateLimit),
); err != nil {
t.Fatalf("failed to set tenant rate limiter limit: %v", err)
}

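// Child metrics break aggregate timeseries out per child label (e.g. per
// tenant), so prometheus can scrape per-tenant stats from the single KV node.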
t.L().Printf("enabling child metrics (<%s)", 30*time.Second)
_, err := c.Conn(ctx, t.L(), crdbNodeID).Exec(`SET CLUSTER SETTING server.child_metrics.enabled = true`)
_, err := systemConn.ExecContext(ctx, `SET CLUSTER SETTING server.child_metrics.enabled = true`)
require.NoError(t, err)

// Create the tenants.
t.L().Printf("initializing %d tenants (<%s)", numTenants, 5*time.Minute)
tenants := make([]*tenantNode, numTenants)
for i := 0; i < numTenants; i++ {
if !t.SkipInit() {
_, err := c.Conn(ctx, t.L(), 1).Exec(`SELECT crdb_internal.create_tenant($1::INT)`, tenantID(i))
require.NoError(t, err)
}

tenant := createTenantNode(ctx, t, c,
crdbNode, tenantID(i), tenantNodeID(i), tenantHTTPPort(i), tenantSQLPort(i))
defer tenant.stop(ctx, t, c)
const sqlInstance = 0
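// One virtual cluster per node; the system tenant runs on n1.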
virtualClusters := map[string]option.NodeListOption{
"app-fairness-n2": c.Node(2),
"app-fairness-n3": c.Node(3),
"app-fairness-n4": c.Node(4),
"app-fairness-n5": c.Node(5),
}

tenants[i] = tenant
tenant.start(ctx, t, c, "./cockroach")
setTenantResourceLimits(tenantID(i))
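// Iterate over names in sorted order so that the prometheus tenant IDs
// assigned below (j+1) are stable across runs.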
virtualClusterNames := maps.Keys(virtualClusters)
sort.Strings(virtualClusterNames)

tenantNode := c.Node(tenantNodeID(i))
t.L().Printf("initializing %d virtual clusters (<%s)", len(virtualClusters), 5*time.Minute)
for j, name := range virtualClusterNames {
node := virtualClusters[name]
c.StartServiceForVirtualCluster(
ctx, t.L(), node,
option.DefaultStartVirtualClusterOpts(name, sqlInstance),
install.MakeClusterSettings(),
)

// Init kv on each tenant.
cmd := fmt.Sprintf("./cockroach workload init kv '%s'", tenant.secureURL())
require.NoError(t, c.RunE(ctx, option.WithNodes(tenantNode), cmd))
t.L().Printf("virtual cluster %q started on n%d", name, node[0])
_, err := systemConn.ExecContext(
ctx, fmt.Sprintf("SELECT crdb_internal.update_tenant_resource_limits('%s', 1000000000, 10000, 1000000, now(), 0)", name),
)
require.NoError(t, err)

promCfg.WithTenantPod(tenantNode.InstallNodes()[0], tenantID(i))
promCfg.WithTenantPod(node.InstallNodes()[0], j+1)
promCfg.WithScrapeConfigs(
prometheus.MakeWorkloadScrapeConfig(fmt.Sprintf("workload-tenant-%d", i),
prometheus.MakeWorkloadScrapeConfig(fmt.Sprintf("workload-tenant-%d", j+1),
"/", makeWorkloadScrapeNodes(
tenantNode.InstallNodes()[0],
node.InstallNodes()[0],
[]workloadInstance{
{
nodes: c.Node(tenantNodeID(i)),
nodes: node,
prometheusPort: 2112,
},
})),
)

initKV := fmt.Sprintf(
"%s workload init kv {pgurl:%d:%s:%d}",
test.DefaultCockroachPath, node[0], name, sqlInstance,
)

c.Run(ctx, option.WithNodes(node), initKV)
}

t.Status(fmt.Sprintf("setting up prometheus/grafana (<%s)", 2*time.Minute))
t.L().Printf("setting up prometheus/grafana (<%s)", 2*time.Minute)
_, cleanupFunc := setupPrometheusForRoachtest(ctx, t, c, promCfg, nil)
defer cleanupFunc()

t.L().Printf("loading per-tenant data (<%s)", 10*time.Minute)
m1 := c.NewMonitor(ctx, crdbNode)
for i := 0; i < numTenants; i++ {
if t.SkipInit() {
continue
}

i := i
pgurl := tenants[i].secureURL()
m1 := c.NewMonitor(ctx, c.All())
for name, node := range virtualClusters {
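// roachprod expands {pgurl:<node>:<virtual-cluster>:<sql-instance>} into a
// connection URL for the named virtual cluster on that node.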
pgurl := fmt.Sprintf("{pgurl:%d:%s:%d}", node[0], name, sqlInstance)
name := name
node := node
m1.Go(func(ctx context.Context) error {
// TODO(irfansharif): Occasionally we see SQL liveness errors of the
// following form. See #78691, #97448.
@@ -268,37 +235,56 @@
// session gets renewed shortly (within some jitter). We don't want
// to --tolerate-errors here and below because we'd see total
// throughput collapse.
cmd := fmt.Sprintf(
"./cockroach workload run kv '%s' --secure --min-block-bytes %d --max-block-bytes %d "+
"--batch %d --max-ops %d --concurrency=25",
pgurl, s.blockSize, s.blockSize, s.batch, s.maxOps)
err := c.RunE(ctx, option.WithNodes(c.Node(tenantNodeID(i))), cmd)
t.L().Printf("loaded data for tenant %d", tenantID(i))
return err
cmd := roachtestutil.NewCommand("%s workload run kv", test.DefaultCockroachPath).
Option("secure").
Flag("min-block-bytes", s.blockSize).
Flag("max-block-bytes", s.blockSize).
Flag("batch", s.batch).
Flag("max-ops", s.maxOps).
Flag("concurrency", 25).
Arg(pgurl)

if err := c.RunE(ctx, option.WithNodes(node), cmd.String()); err != nil {
return err
}

t.L().Printf("loaded data for virtual cluster %q", name)
return nil
})
}
m1.Wait()

if !t.SkipInit() {
t.L().Printf("loaded data for all tenants, sleeping (<%s)", 2*time.Minute)
time.Sleep(2 * time.Minute)
}
waitDur := 2 * time.Minute
t.L().Printf("loaded data for all tenants, sleeping (<%s)", waitDur)
time.Sleep(waitDur)

t.L().Printf("running per-tenant workloads (<%s)", s.duration+time.Minute)
t.L().Printf("running virtual cluster workloads (<%s)", s.duration+time.Minute)
m2 := c.NewMonitor(ctx, crdbNode)
for i := 0; i < numTenants; i++ {
i := i
pgurl := tenants[i].secureURL()
var n int
for name, node := range virtualClusters {
pgurl := fmt.Sprintf("{pgurl:%d:%s:%d}", node[0], name, sqlInstance)
n++

name := name
node := node
m2.Go(func(ctx context.Context) error {
cmd := fmt.Sprintf(
"./cockroach workload run kv '%s' --write-seq=%s --secure --min-block-bytes %d "+
"--max-block-bytes %d --batch %d --duration=%s --read-percent=%d --concurrency=%d",
pgurl, fmt.Sprintf("R%d", s.maxOps*s.batch), s.blockSize, s.blockSize, s.batch,
s.duration, s.readPercent, s.concurrency(tenantNodeID(i)-1))

err := c.RunE(ctx, option.WithNodes(c.Node(tenantNodeID(i))), cmd)
t.L().Printf("ran workload for tenant %d", tenantID(i))
return err
cmd := roachtestutil.NewCommand("%s workload run kv", test.DefaultCockroachPath).
Option("secure").
Flag("write-seq", fmt.Sprintf("R%d", s.maxOps*s.batch)).
Flag("min-block-bytes", s.blockSize).
Flag("max-block-bytes", s.blockSize).
Flag("batch", s.batch).
Flag("duration", s.duration).
Flag("read-percent", s.readPercent).
Flag("concurrency", s.concurrency(n)).
Arg(pgurl)

if err := c.RunE(ctx, option.WithNodes(node), cmd.String()); err != nil {
return err
}

t.L().Printf("ran workload for virtual cluster %q", name)
return nil
})
}
m2.Wait()
@@ -313,40 +299,41 @@
// TODO(irfansharif): Aren't these stats getting polluted by the data-load
// step?
t.L().Printf("computing workload statistics (%s)", 30*time.Second)
counts := make([]float64, numTenants)
meanLatencies := make([]float64, numTenants)
for i := 0; i < numTenants; i++ {
i := i
db, err := gosql.Open("postgres", tenants[i].pgURL)
if err != nil {
t.Fatal(err)
}
defer func() { _ = db.Close() }()
counts := make([]float64, len(virtualClusters))
meanLatencies := make([]float64, len(virtualClusters))
for j, name := range virtualClusterNames {
node := virtualClusters[name]

vcdb := c.Conn(ctx, t.L(), node[0], option.TenantName(name), option.SQLInstance(sqlInstance))
defer vcdb.Close()

tdb := sqlutils.MakeSQLRunner(db)
tdb.Exec(t, "USE kv")
_, err := vcdb.ExecContext(ctx, "USE kv")
require.NoError(t, err)

rows := tdb.Query(t, `
rows, err := vcdb.QueryContext(ctx, `
SELECT
sum((statistics -> 'statistics' -> 'cnt')::INT),
avg((statistics -> 'statistics' -> 'runLat' -> 'mean')::FLOAT)
FROM crdb_internal.statement_statistics
WHERE metadata @> '{"db":"kv","failed":false}' AND metadata @> $1`,
fmt.Sprintf(`{"querySummary": "%s"}`, s.query))
require.NoError(t, err)

if rows.Next() {
var cnt, lat float64
err := rows.Scan(&cnt, &lat)
require.NoError(t, err)
counts[i] = cnt
meanLatencies[i] = lat
counts[j] = cnt
meanLatencies[j] = lat
} else {
t.Fatal("no query results")
}

require.NoError(t, rows.Err())
}

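// Fairness assertions: per-tenant throughput (and, below, mean latency) must
// land within failThreshold of each other.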
failThreshold := .3
throughput := make([]float64, numTenants)
throughput := make([]float64, len(virtualClusters))
ok, maxThroughputDelta := floatsWithinPercentage(counts, failThreshold)
for i, count := range counts {
throughput[i] = count / s.duration.Seconds()
Expand All @@ -360,7 +347,7 @@ func runMultiTenantFairness(
}

ok, maxLatencyDelta := floatsWithinPercentage(meanLatencies, failThreshold)
t.L().Printf("max-latency-delta=d%% mean-latency-per-tenant=%v\n", int(maxLatencyDelta*100), meanLatencies)
t.L().Printf("max-latency-delta=%d%% mean-latency-per-tenant=%v\n", int(maxLatencyDelta*100), meanLatencies)
if !ok {
// TODO(irfansharif): Same as above -- this is a weak assertion.
t.L().Printf("latency not within expectations: %f > %f %v", maxLatencyDelta, failThreshold, meanLatencies)
7 changes: 0 additions & 7 deletions pkg/cmd/roachtest/tests/multitenant_utils.go
@@ -174,13 +174,6 @@ func (tn *tenantNode) storeDir() string {
return fmt.Sprintf("cockroach-data-mt-%d-%d", tn.tenantID, tn.instanceID)
}

// In secure mode the url we get from roachprod contains ssl parameters with
// local file paths. secureURL returns a url with those changed to
// roachprod/workload friendly local paths, ie "certs".
func (tn *tenantNode) secureURL() string {
return tn.relativeSecureURL
}

func (tn *tenantNode) start(ctx context.Context, t test.Test, c cluster.Cluster, binary string) {
require.True(t, c.IsSecure())
