From af396e7fefa573cbd73fa3b6a1eaeec42516334c Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Mon, 17 Oct 2022 14:49:54 -0400 Subject: [PATCH] roachtest: cleanup multitenant-fairness tests Also integrate prometheus, --skip-init. Release note: None --- pkg/cmd/roachtest/tests/BUILD.bazel | 3 +- pkg/cmd/roachtest/tests/admission_control.go | 5 +- .../admission_control_multitenant_fairness.go | 410 ++++++++++++++++++ .../roachtest/tests/multitenant_fairness.go | 403 ----------------- pkg/cmd/roachtest/tests/multitenant_utils.go | 4 +- pkg/cmd/roachtest/tests/registry.go | 1 - pkg/roachprod/prometheus/prometheus.go | 41 +- .../prometheus/testdata/multiple_scrape_nodes | 4 + .../prometheus/testdata/using_make_commands | 8 + pkg/roachprod/roachprod.go | 3 +- 10 files changed, 467 insertions(+), 415 deletions(-) create mode 100644 pkg/cmd/roachtest/tests/admission_control_multitenant_fairness.go delete mode 100644 pkg/cmd/roachtest/tests/multitenant_fairness.go diff --git a/pkg/cmd/roachtest/tests/BUILD.bazel b/pkg/cmd/roachtest/tests/BUILD.bazel index 73f5dd821520..6b7d5f9dfdfd 100644 --- a/pkg/cmd/roachtest/tests/BUILD.bazel +++ b/pkg/cmd/roachtest/tests/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "admission_control_elastic_cdc.go", "admission_control_index_overload.go", "admission_control_multi_store_overload.go", + "admission_control_multitenant_fairness.go", "admission_control_snapshot_overload.go", "admission_control_tpcc_overload.go", "allocator.go", @@ -92,7 +93,6 @@ go_library( "mixed_version_schemachange.go", "multitenant.go", "multitenant_distsql.go", - "multitenant_fairness.go", "multitenant_tpch.go", "multitenant_upgrade.go", "multitenant_utils.go", @@ -210,7 +210,6 @@ go_library( "//pkg/ts/tspb", "//pkg/util", "//pkg/util/binfetcher", - "//pkg/util/buildutil", "//pkg/util/cancelchecker", "//pkg/util/contextutil", "//pkg/util/ctxgroup", diff --git a/pkg/cmd/roachtest/tests/admission_control.go b/pkg/cmd/roachtest/tests/admission_control.go index 95b736681c12..637258f73be1 100644 --- a/pkg/cmd/roachtest/tests/admission_control.go +++ b/pkg/cmd/roachtest/tests/admission_control.go @@ -31,12 +31,9 @@ func registerAdmission(r registry.Registry) { registerElasticControlForBackups(r) registerElasticControlForCDC(r) registerMultiStoreOverload(r) + registerMultiTenantFairness(r) registerSnapshotOverload(r) registerTPCCOverload(r) registerTPCCSevereOverload(r) registerIndexOverload(r) - - // TODO(irfansharif): Once registerMultiTenantFairness is unskipped and - // observed to be non-flaky for 3-ish months, transfer ownership to the AC - // group + re-home it here. } diff --git a/pkg/cmd/roachtest/tests/admission_control_multitenant_fairness.go b/pkg/cmd/roachtest/tests/admission_control_multitenant_fairness.go new file mode 100644 index 000000000000..5973b6b6c90e --- /dev/null +++ b/pkg/cmd/roachtest/tests/admission_control_multitenant_fairness.go @@ -0,0 +1,410 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+
+package tests
+
+import (
+	"context"
+	gosql "database/sql"
+	"fmt"
+	"math"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
+	"github.com/cockroachdb/cockroach/pkg/roachprod/install"
+	"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
+	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+	"github.com/stretchr/testify/require"
+)
+
+// This test sets up a single-node CRDB cluster on a 4vCPU machine, and 4
+// separate tenant pods also running on their own 4vCPU machines. Each tenant
+// also runs a workload on the same node its SQL pod is running on[1]. It
+// tests that KV fairly proportions CPU and IO use across the four tenants.
+// Each tenant drives the kv workload generator -- kv95 and kv5 for the
+// {read,CPU}-heavy and {write,IO}-heavy variants respectively. Another thing
+// that's varied is the relative concurrency of each tenant workload --
+// they're either even across all tenants or skewed multiples of one another.
+//
+// [1]: Co-locating the SQL pod and the workload generator is a bit funky, but
+//
+//	it works fine enough as written and saves us from using another 4 nodes
+//	per test.
+func registerMultiTenantFairness(r registry.Registry) {
+	specs := []multiTenantFairnessSpec{
+		{
+			name:        "read-heavy/even",
+			concurrency: func(int) int { return 250 },
+			blockSize:   5,
+			readPercent: 95,
+			duration:    20 * time.Minute,
+			batch:       100,
+			maxOps:      100_000,
+			query:       "SELECT k, v FROM kv",
+		},
+		{
+			name:        "read-heavy/skewed",
+			concurrency: func(i int) int { return i * 250 },
+			blockSize:   5,
+			readPercent: 95,
+			duration:    20 * time.Minute,
+			batch:       100,
+			maxOps:      100_000,
+			query:       "SELECT k, v FROM kv",
+		},
+		{
+			name:        "write-heavy/even",
+			concurrency: func(i int) int { return 50 },
+			blockSize:   50_000,
+			readPercent: 5,
+			duration:    20 * time.Minute,
+			batch:       1,
+			maxOps:      1000,
+			query:       "UPSERT INTO kv(k, v)",
+		},
+		{
+			name:        "write-heavy/skewed",
+			concurrency: func(i int) int { return i * 50 },
+			blockSize:   50_000,
+			readPercent: 5,
+			duration:    20 * time.Minute,
+			batch:       1,
+			maxOps:      1000,
+			query:       "UPSERT INTO kv(k, v)",
+		},
+	}
+
+	for _, s := range specs {
+		s := s
+		r.Add(registry.TestSpec{
+			Name:              fmt.Sprintf("admission-control/multitenant-fairness/%s", s.name),
+			Cluster:           r.MakeClusterSpec(5),
+			Owner:             registry.OwnerAdmissionControl,
+			NonReleaseBlocker: false,
+			Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+				runMultiTenantFairness(ctx, t, c, s)
+			},
+		})
+	}
+}
+
+type multiTenantFairnessSpec struct {
+	name  string
+	query string // query whose statistics we'll check
+
+	readPercent int           // --read-percent
+	blockSize   int           // --min-block-bytes, --max-block-bytes
+	duration    time.Duration // --duration
+	concurrency func(int) int // --concurrency
+	batch       int           // --batch
+	maxOps      int           // --max-ops
+}
+
+func runMultiTenantFairness(
+	ctx context.Context, t test.Test, c cluster.Cluster, s multiTenantFairnessSpec,
+) {
+	if c.Spec().NodeCount < 5 {
+		t.Fatalf("expected at least 5 nodes, found %d", c.Spec().NodeCount)
+	}
+
+	numTenants := 4
+	crdbNodeID := 1
+	crdbNode := c.Node(crdbNodeID)
+	if c.IsLocal() {
+		s.duration = 30 * time.Second
+		s.concurrency = func(i int) int { return 4 }
+		if s.maxOps > 10000 {
+			s.maxOps = 10000
+		}
+		if s.batch > 10 {
+			s.batch = 10
+		}
+		if s.blockSize > 2 {
+			s.blockSize = 2
+		}
+	}
+
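+	// The test then proceeds in phases: start a secure single-node CRDB
+	// cluster, point prometheus/grafana at it and at each tenant's workload
+	// generator, create and start the four tenant SQL pods, pre-load kv data
+	// for each tenant, run the foreground kv workloads concurrently, and
+	// finally compare per-tenant throughput and mean latency pulled from
+	// crdb_internal.statement_statistics.
+	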
t.L().Printf("starting cockroach securely (<%s)", time.Minute) + c.Put(ctx, t.Cockroach(), "./cockroach") + c.Start(ctx, t.L(), + option.DefaultStartOpts(), + install.MakeClusterSettings(install.SecureOption(true)), + crdbNode, + ) + + promNode := c.Node(c.Spec().NodeCount) + promCfg := &prometheus.Config{} + promCfg.WithPrometheusNode(promNode.InstallNodes()[0]) + promCfg.WithNodeExporter(crdbNode.InstallNodes()) + promCfg.WithCluster(crdbNode.InstallNodes()) + promCfg.WithGrafanaDashboard("https://go.crdb.dev/p/multi-tenant-fairness-grafana") + + setRateLimit := func(ctx context.Context, val int) { + db := c.Conn(ctx, t.L(), crdbNodeID) + defer db.Close() + + if _, err := db.ExecContext( + ctx, fmt.Sprintf("SET CLUSTER SETTING kv.tenant_rate_limiter.rate_limit = '%d'", val)); err != nil { + t.Fatalf("failed to set tenant rate limiter limit: %v", err) + } + } + + setRateLimit(ctx, 1_000_000) + + const ( + tenantBaseID = 11 + tenantBaseHTTPPort = 8081 + tenantBaseSQLPort = 26259 + ) + tenantHTTPPort := func(offset int) int { + if c.IsLocal() { + return tenantBaseHTTPPort + offset + } + return tenantBaseHTTPPort + } + tenantSQLPort := func(offset int) int { + if c.IsLocal() { + return tenantBaseSQLPort + offset + } + return tenantBaseSQLPort + } + tenantID := func(offset int) int { + return tenantBaseID + offset + } + setTenantResourceLimits := func(tenantID int) { + db := c.Conn(ctx, t.L(), crdbNodeID) + defer db.Close() + if _, err := db.ExecContext( + ctx, fmt.Sprintf( + "SELECT crdb_internal.update_tenant_resource_limits(%[1]d, 1000000000, 10000, 1000000, now(), 0)", tenantID)); err != nil { + t.Fatalf("failed to update tenant resource limits: %v", err) + } + } + tenantNodeID := func(idx int) int { + return idx + 2 + } + + t.L().Printf("enabling child metrics (<%s)", 30*time.Second) + _, err := c.Conn(ctx, t.L(), crdbNodeID).Exec(`SET CLUSTER SETTING server.child_metrics.enabled = true`) + require.NoError(t, err) + + // Create the tenants. + t.L().Printf("initializing %d tenants (<%s)", numTenants, 5*time.Minute) + tenantIDs := make([]int, 0, numTenants) + for i := 0; i < numTenants; i++ { + tenantIDs = append(tenantIDs, tenantID(i)) + } + + tenants := make([]*tenantNode, numTenants) + for i := 0; i < numTenants; i++ { + if !t.SkipInit() { + _, err := c.Conn(ctx, t.L(), 1).Exec(`SELECT crdb_internal.create_tenant($1::INT)`, tenantID(i)) + require.NoError(t, err) + } + + tenant := createTenantNode(ctx, t, c, + crdbNode, tenantID(i), tenantNodeID(i), tenantHTTPPort(i), tenantSQLPort(i), + createTenantOtherTenantIDs(tenantIDs)) + defer tenant.stop(ctx, t, c) + + tenants[i] = tenant + tenant.start(ctx, t, c, "./cockroach") + setTenantResourceLimits(tenantID(i)) + + tenantNode := c.Node(tenantNodeID(i)) + + // Init kv on each tenant. 
+ cmd := fmt.Sprintf("./cockroach workload init kv '%s'", tenant.secureURL()) + require.NoError(t, c.RunE(ctx, tenantNode, cmd)) + + promCfg.WithTenantPod(tenantNode.InstallNodes()[0], tenantID(i)) + promCfg.WithScrapeConfigs( + prometheus.MakeWorkloadScrapeConfig(fmt.Sprintf("workload-tenant-%d", i), + "/", makeWorkloadScrapeNodes( + tenantNode.InstallNodes()[0], + []workloadInstance{ + { + nodes: c.Node(tenantNodeID(i)), + prometheusPort: 2112, + }, + })), + ) + } + + t.Status(fmt.Sprintf("setting up prometheus/grafana (<%s)", 2*time.Minute)) + _, cleanupFunc := setupPrometheusForRoachtest(ctx, t, c, promCfg, nil) + defer cleanupFunc() + + t.L().Printf("loading per-tenant data (<%s)", 10*time.Minute) + m1 := c.NewMonitor(ctx, crdbNode) + for i := 0; i < numTenants; i++ { + if t.SkipInit() { + continue + } + + i := i + pgurl := tenants[i].secureURL() + m1.Go(func(ctx context.Context) error { + // TODO(irfansharif): Occasionally we see SQL Liveness errors of the + // form: + // + // ERROR: liveness session expired 571.043163ms before transaction + // + // Why do these errors occur? Do we want to give sql liveness + // session goroutines higher priority? Is this test using too high a + // concurrency? Why do we even need this data load step -- why not + // just run the workload generator right away? + cmd := fmt.Sprintf( + "./cockroach workload run kv '%s' --secure --min-block-bytes %d --max-block-bytes %d "+ + "--batch %d --max-ops %d --concurrency=25", + pgurl, s.blockSize, s.blockSize, s.batch, s.maxOps) + err := c.RunE(ctx, c.Node(tenantNodeID(i)), cmd) + t.L().Printf("loaded data for tenant %d", tenantID(i)) + return err + }) + } + m1.Wait() + + if !t.SkipInit() { + t.L().Printf("loaded data for all tenants, sleeping (<%s)", 2*time.Minute) + time.Sleep(2 * time.Minute) + } + + t.L().Printf("running per-tenant workloads (<%s)", s.duration+time.Minute) + m2 := c.NewMonitor(ctx, crdbNode) + for i := 0; i < numTenants; i++ { + i := i + pgurl := tenants[i].secureURL() + m2.Go(func(ctx context.Context) error { + cmd := fmt.Sprintf( + "./cockroach workload run kv '%s' --write-seq=%s --secure --min-block-bytes %d "+ + "--max-block-bytes %d --batch %d --duration=%s --read-percent=%d --concurrency=%d", + pgurl, fmt.Sprintf("R%d", s.maxOps*s.batch), s.blockSize, s.blockSize, s.batch, + s.duration, s.readPercent, s.concurrency(tenantNodeID(i)-1)) + + err := c.RunE(ctx, c.Node(tenantNodeID(i)), cmd) + t.L().Printf("ran workload for tenant %d", tenantID(i)) + return err + }) + } + m2.Wait() + + // Pull workload performance from crdb_internal.statement_statistics. We + // could alternatively get these from the workload itself but this was + // easier. + // + // TODO(irfansharif): Worth using clusterstats for this directly against a + // prometheus instance pointed to each tenant's workload generator. + // TODO(irfansharif): Make sure that count of failed queries is small/zero. + // TODO(irfansharif): Aren't these stats getting polluted by the data-load + // step? 
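+	//
+	// Fairness is judged below by pulling, for each tenant, the total
+	// execution count and the average mean run latency of s.query from
+	// crdb_internal.statement_statistics, and then checking (via
+	// floatsWithinPercentage) that per-tenant query counts (and hence
+	// throughput) and mean latencies stay within failThreshold of the
+	// cross-tenant average. Violations are only logged, not fatal.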
+	t.L().Printf("computing workload statistics (<%s)", 30*time.Second)
+	counts := make([]float64, numTenants)
+	meanLatencies := make([]float64, numTenants)
+	for i := 0; i < numTenants; i++ {
+		i := i
+		db, err := gosql.Open("postgres", tenants[i].pgURL)
+		if err != nil {
+			t.Fatal(err)
+		}
+		defer func() { _ = db.Close() }()
+
+		tdb := sqlutils.MakeSQLRunner(db)
+		tdb.Exec(t, "USE kv")
+
+		rows := tdb.Query(t, `
+			SELECT
+			  sum((statistics -> 'statistics' -> 'cnt')::INT),
+			  avg((statistics -> 'statistics' -> 'runLat' -> 'mean')::FLOAT)
+			FROM crdb_internal.statement_statistics
+			WHERE metadata @> '{"db":"kv","failed":false}' AND metadata @> $1`,
+			fmt.Sprintf(`{"querySummary": "%s"}`, s.query))
+
+		if rows.Next() {
+			var cnt, lat float64
+			err := rows.Scan(&cnt, &lat)
+			require.NoError(t, err)
+			counts[i] = cnt
+			meanLatencies[i] = lat
+		} else {
+			t.Fatal("no query results")
+		}
+	}
+
+	failThreshold := .3
+	throughput := make([]float64, numTenants)
+	ok, maxThroughputDelta := floatsWithinPercentage(counts, failThreshold)
+	for i, count := range counts {
+		throughput[i] = count / s.duration.Seconds()
+	}
+	t.L().Printf("max-throughput-delta=%d%% average-throughput=%f total-ops-per-tenant=%v\n", int(maxThroughputDelta*100), averageFloat(throughput), counts)
+	if !ok {
+		// TODO(irfansharif): This is a weak assertion. Variation occurs when
+		// there are workload differences during periods where AC is not
+		// inducing any queuing. Remove?
+		t.L().Printf("throughput not within expectations: %f > %f %v", maxThroughputDelta, failThreshold, throughput)
+	}
+
+	ok, maxLatencyDelta := floatsWithinPercentage(meanLatencies, failThreshold)
+	t.L().Printf("max-latency-delta=%d%% mean-latency-per-tenant=%v\n", int(maxLatencyDelta*100), meanLatencies)
+	if !ok {
+		// TODO(irfansharif): Same as above -- this is a weak assertion.
+		t.L().Printf("latency not within expectations: %f > %f %v", maxLatencyDelta, failThreshold, meanLatencies)
+	}
+
+	c.Run(ctx, crdbNode, "mkdir", "-p", t.PerfArtifactsDir())
+	results := fmt.Sprintf(`{ "max_tput_delta": %f, "max_tput": %f, "min_tput": %f, "max_latency": %f, "min_latency": %f}`,
+		maxThroughputDelta, maxFloat(throughput), minFloat(throughput), maxFloat(meanLatencies), minFloat(meanLatencies))
+	c.Run(ctx, crdbNode, fmt.Sprintf(`echo '%s' > %s/stats.json`, results, t.PerfArtifactsDir()))
+}
+
+func averageFloat(values []float64) float64 {
+	sum := 0.0
+	for _, v := range values {
+		sum += v
+	}
+	return sum / float64(len(values))
+}
+
+func minFloat(values []float64) float64 {
+	min := values[0]
+	for _, v := range values {
+		min = math.Min(v, min)
+	}
+	return min
+}
+
+func maxFloat(values []float64) float64 {
+	max := values[0]
+	for _, v := range values {
+		max = math.Max(v, max)
+	}
+	return max
+}
+
+func floatsWithinPercentage(values []float64, percent float64) (bool, float64) {
+	avg := averageFloat(values)
+	limit := avg * percent
+	maxDelta := 0.0
+	for _, v := range values {
+		delta := math.Abs(avg - v)
+		if delta > limit {
+			return false, 1.0 - (avg-delta)/avg
+		}
+		if delta > maxDelta {
+			maxDelta = delta
+		}
+	}
+	maxDelta = 1.0 - (avg-maxDelta)/avg // make a percentage
+	return true, maxDelta
+}
diff --git a/pkg/cmd/roachtest/tests/multitenant_fairness.go b/pkg/cmd/roachtest/tests/multitenant_fairness.go
deleted file mode 100644
index 8cd90a62fe21..000000000000
--- a/pkg/cmd/roachtest/tests/multitenant_fairness.go
+++ /dev/null
@@ -1,403 +0,0 @@
-// Copyright 2022 The Cockroach Authors.
-// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package tests - -import ( - "context" - gosql "database/sql" - "fmt" - "math" - "time" - - "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" - "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" - "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" - "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" - "github.com/cockroachdb/cockroach/pkg/roachprod/install" - "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" - "github.com/cockroachdb/cockroach/pkg/util/buildutil" - "github.com/stretchr/testify/require" -) - -type mtFairnessSpec struct { - name string - acEnabled bool - readPercent int - blockSize int - duration time.Duration - concurrency func(int) int - batchSize int - maxLoadOps int -} - -func registerMultiTenantFairness(r registry.Registry) { - // With AC off tests are too flakey. - acEnabled := true - acStr := map[bool]string{ - true: "admission", - false: "no-admission", - } - kvSpecs := []mtFairnessSpec{ - { - name: "same", - concurrency: func(int) int { return 250 }, - }, - { - name: "concurrency-skew", - concurrency: func(i int) int { return i * 250 }, - }, - } - for i := range kvSpecs { - s := kvSpecs[i] - s.blockSize = 5 - s.readPercent = 95 - s.acEnabled = acEnabled - s.duration = 5 * time.Minute - s.batchSize = 100 - s.maxLoadOps = 100_000 - - r.Add(registry.TestSpec{ - Name: fmt.Sprintf("multitenant/fairness/kv/%s/%s", s.name, acStr[s.acEnabled]), - Cluster: r.MakeClusterSpec(5), - Owner: registry.OwnerSQLQueries, - NonReleaseBlocker: false, - Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { - runMultiTenantFairness(ctx, t, c, s, "SELECT k, v FROM kv") - }, - }) - } - storeSpecs := []mtFairnessSpec{ - { - name: "same", - concurrency: func(i int) int { return 50 }, - }, - { - name: "concurrency-skew", - concurrency: func(i int) int { return i * 50 }, - }, - } - for i := range storeSpecs { - s := storeSpecs[i] - s.blockSize = 50_000 - s.readPercent = 5 - s.acEnabled = acEnabled - s.duration = 10 * time.Minute - s.batchSize = 1 - s.maxLoadOps = 1000 - - r.Add(registry.TestSpec{ - Name: fmt.Sprintf("multitenant/fairness/store/%s/%s", s.name, acStr[s.acEnabled]), - Cluster: r.MakeClusterSpec(5), - Owner: registry.OwnerSQLQueries, - NonReleaseBlocker: false, - Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { - runMultiTenantFairness(ctx, t, c, s, "UPSERT INTO kv(k, v)") - }, - }) - } - if buildutil.CrdbTestBuild { - quick := mtFairnessSpec{ - duration: 1, - acEnabled: true, - readPercent: 95, - name: "quick", - concurrency: func(i int) int { return 1 }, - blockSize: 2, - batchSize: 10, - maxLoadOps: 10000, - } - r.Add(registry.TestSpec{ - Name: "multitenant/fairness/quick", - Cluster: r.MakeClusterSpec(2), - Owner: registry.OwnerSQLQueries, - NonReleaseBlocker: false, - Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { - runMultiTenantFairness(ctx, t, c, quick, "SELECT k, v FROM kv") - }, - }) - } -} - -// Test that the kvserver fairly distributes CPU token on a highly concurrent 4 sql pod workload. 
-func runMultiTenantFairness( - ctx context.Context, t test.Test, c cluster.Cluster, s mtFairnessSpec, query string, -) { - numTenants := 4 - duration := s.duration - - // For quick local testing. - quick := c.IsLocal() || s.name == "quick" - var kvstores option.NodeListOption - if quick { - numTenants = 1 - duration = 30 * time.Second - s.concurrency = func(i int) int { return 4 } - kvstores = c.Node(1) - } else { - kvstores = c.Nodes(1) - } - - c.Put(ctx, t.Cockroach(), "./cockroach") - c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(install.SecureOption(true)), c.Node(1)) - // I think a more conformant test should have 3 kvserver nodes but this - // leads to crashes and in-stability so stay with 1 for now. - // if !quick { - // c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(install.SecureOption(true)), c.Node(2)) - // c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(install.SecureOption(true)), c.Node(3)) - // } - setAdmissionControl(ctx, t, c, s.acEnabled) - - // This isn't working... - // promCfg := &prometheus.Config{} - // promCfg.WithPrometheusNode(c.Node(1).InstallNodes()[0]) - // promCfg.WithNodeExporter(c.Range(1, c.Spec().NodeCount-1).InstallNodes()) - // promCfg.WithCluster(c.Range(1, c.Spec().NodeCount-1).InstallNodes()) - - setRateLimit := func(ctx context.Context, val int, node int) { - db := c.Conn(ctx, t.L(), node) - defer db.Close() - if _, err := db.ExecContext( - ctx, fmt.Sprintf("SET CLUSTER SETTING kv.tenant_rate_limiter.rate_limit = '%d'", val)); err != nil { - t.Fatalf("failed to set kv.tenant_rate_limiter.rate_limit: %v", err) - } - } - - setRateLimit(ctx, 1_000_000, 1) - - const ( - tenantBaseID = 11 - tenantBaseHTTPPort = 8081 - tenantBaseSQLPort = 26259 - ) - - tenantHTTPPort := func(offset int) int { - if c.IsLocal() { - return tenantBaseHTTPPort + offset - } - return tenantBaseHTTPPort - } - tenantSQLPort := func(offset int) int { - if c.IsLocal() { - return tenantBaseSQLPort + offset - } - return tenantBaseSQLPort - } - - setTenantResourceLimits := func(tid int) { - db := c.Conn(ctx, t.L(), 1) - defer db.Close() - if _, err := db.ExecContext( - ctx, fmt.Sprintf( - "SELECT crdb_internal.update_tenant_resource_limits(%[1]d, 1000000000, 10000, 1000000, now(), 0)", tid)); err != nil { - t.Fatalf("failed to update_tenant_resource_limits: %v", err) - } - } - - tenantNodeID := func(idx int) int { - if quick { - return idx + 1 - } - return idx + 2 - } - - // Create the tenants. - t.L().Printf("initializing %d tenants", numTenants) - tenantIDs := make([]int, 0, numTenants) - for i := 0; i < numTenants; i++ { - tenantIDs = append(tenantIDs, tenantBaseID+i) - } - - tenants := make([]*tenantNode, numTenants) - for i := 0; i < numTenants; i++ { - node := tenantNodeID(i) - _, err := c.Conn(ctx, t.L(), 1).Exec(`SELECT crdb_internal.create_tenant($1::INT)`, tenantBaseID+i) - require.NoError(t, err) - tenant := createTenantNode(ctx, t, c, c.Node(1), tenantBaseID+i, node, tenantHTTPPort(i), tenantSQLPort(i), createTenantOtherTenantIDs(tenantIDs)) - defer tenant.stop(ctx, t, c) - tenant.start(ctx, t, c, "./cockroach") - tenants[i] = tenant - setTenantResourceLimits(tenantBaseID + i) - - // Init kv on each tenant. 
- cmd := fmt.Sprintf("./cockroach workload init kv '%s' --secure", tenant.secureURL()) - err = c.RunE(ctx, c.Node(node), cmd) - require.NoError(t, err) - - // promCfg.ScrapeConfigs = append(promCfg.ScrapeConfigs, prometheus.MakeWorkloadScrapeConfig(fmt.Sprintf("workload-%d", i), - // "/", makeWorkloadScrapeNodes(c.Node(node).InstallNodes()[0], []workloadInstance{ - // {nodes: c.Node(node)}, - // }))) - } - - // promCfg.WithGrafanaDashboard("http://go.crdb.dev/p/snapshot-admission-control-grafana") - // _, cleanupFunc := setupPrometheusForRoachtest(ctx, t, c, promCfg, nil) - // defer cleanupFunc() - - m := c.NewMonitor(ctx, kvstores) - - // NB: we're using --tolerate-errors because of sql liveness errors like this: - // ERROR: liveness session expired 571.043163ms before transaction - // dialed back batch to 20 so we don't need. - - t.L().Printf("running dataload ") - for i := 0; i < numTenants; i++ { - tid := tenantBaseID + i - node := tenantNodeID(i) - pgurl := tenants[i].secureURL() - m.Go(func(ctx context.Context) error { - cmd := fmt.Sprintf( - "./cockroach workload run kv '%s' --secure --min-block-bytes %d --max-block-bytes %d "+ - "--batch %d --max-ops %d --concurrency=100", - pgurl, s.blockSize, s.blockSize, s.batchSize, s.maxLoadOps) - err := c.RunE(ctx, c.Node(node), cmd) - t.L().Printf("dataload for tenant %d done", tid) - return err - }) - } - - m.Wait() - t.L().Printf("running main workloads") - m = c.NewMonitor(ctx, c.Node(1)) - - for i := 0; i < numTenants; i++ { - tid := tenantBaseID + i - node := tenantNodeID(i) - pgurl := tenants[i].secureURL() - m.Go(func(ctx context.Context) error { - cmd := fmt.Sprintf( - "./cockroach workload run kv '%s' --write-seq=%s --secure --min-block-bytes %d "+ - "--max-block-bytes %d --batch %d --duration=%s --read-percent=%d --concurrency=%d", - pgurl, fmt.Sprintf("R%d", s.maxLoadOps*s.batchSize), s.blockSize, s.blockSize, s.batchSize, - duration, s.readPercent, s.concurrency(node-1)) - err := c.RunE(ctx, c.Node(node), cmd) - t.L().Printf("workload for tenant %d done", tid) - return err - }) - } - - m.Wait() - t.L().Printf("workloads done") - - // Pull workload performance from crdb_internal.statement_statistics. Alternatively we could pull these from - // workload but this seemed most straightforward. - counts := make([]float64, numTenants) - meanLatencies := make([]float64, numTenants) - for i := 0; i < numTenants; i++ { - db, err := gosql.Open("postgres", tenants[i].pgURL) - if err != nil { - t.Fatal(err) - } - defer func() { _ = db.Close() }() - tdb := sqlutils.MakeSQLRunner(db) - tdb.Exec(t, "USE kv") - querySelector := fmt.Sprintf(`{"querySummary": "%s"}`, query) - // TODO: should we check that count of failed queries is smallish? 
- rows := tdb.Query(t, ` - SELECT - sum((statistics -> 'statistics' -> 'cnt')::INT), - avg((statistics -> 'statistics' -> 'runLat' -> 'mean')::FLOAT) - FROM crdb_internal.statement_statistics - WHERE metadata @> '{"db":"kv","failed":false}' AND metadata @> $1`, querySelector) - - if rows.Next() { - var cnt, lat float64 - err := rows.Scan(&cnt, &lat) - require.NoError(t, err) - counts[i] = cnt - meanLatencies[i] = lat - } else { - t.Fatal("no query results") - } - } - - failThreshold := .3 - - throughput := make([]float64, numTenants) - ok, maxThroughputDelta := floatsWithinPercentage(counts, failThreshold) - for i, count := range counts { - throughput[i] = count / duration.Seconds() - } - - if s.acEnabled && !ok { - t.L().Printf("Throughput not within expectations: %f > %f %v", maxThroughputDelta, failThreshold, throughput) - } - - t.L().Printf("Max throughput delta: %d%% %d %v\n", int(maxThroughputDelta*100), average(throughput), counts) - - ok, maxLatencyDelta := floatsWithinPercentage(meanLatencies, failThreshold) - t.L().Printf("Max latency delta: %d%% %v\n", int(maxLatencyDelta*100), meanLatencies) - - if s.acEnabled && !ok { - t.L().Printf("Latency not within expectations: %f > %f %v", maxLatencyDelta, failThreshold, meanLatencies) - } - - c.Run(ctx, c.Node(1), "mkdir", "-p", t.PerfArtifactsDir()) - results := fmt.Sprintf(`{ "max_tput_delta": %f, "max_tput": %f, "min_tput": %f, "max_latency": %f, "min_latency": %f}`, - maxThroughputDelta, maxFloat(throughput), minFloat(throughput), maxFloat(meanLatencies), minFloat(meanLatencies)) - t.L().Printf("reporting perf results: %s", results) - cmd := fmt.Sprintf(`echo '%s' > %s/stats.json`, results, t.PerfArtifactsDir()) - c.Run(ctx, c.Node(1), cmd) - - // get cluster timeseries data into artifacts - err := c.FetchTimeseriesData(ctx, t.L()) - require.NoError(t, err) -} - -func average(values []float64) int { - average := 0 - for _, v := range values { - average += int(v) - } - average /= len(values) - return average -} - -func minFloat(values []float64) float64 { - min := values[0] - for _, v := range values { - if v < min { - min = v - } - } - return min -} - -func maxFloat(values []float64) float64 { - max := values[0] - for _, v := range values { - if v > max { - max = v - } - } - return max -} - -func floatsWithinPercentage(values []float64, percent float64) (bool, float64) { - average := 0.0 - for _, v := range values { - average += v - } - average = average / float64(len(values)) - limit := average * percent - maxDelta := 0.0 - for _, v := range values { - delta := math.Abs(average - v) - if delta > limit { - return false, 1.0 - (average-delta)/average - } - if delta > maxDelta { - maxDelta = delta - } - } - // make a percentage - maxDelta = 1.0 - (average-maxDelta)/average - return true, maxDelta -} diff --git a/pkg/cmd/roachtest/tests/multitenant_utils.go b/pkg/cmd/roachtest/tests/multitenant_utils.go index 61e6d9b1562d..dbb39f4fe0b3 100644 --- a/pkg/cmd/roachtest/tests/multitenant_utils.go +++ b/pkg/cmd/roachtest/tests/multitenant_utils.go @@ -139,7 +139,7 @@ func (tn *tenantNode) createTenantCert( names = append(names, "localhost", "127.0.0.1") cmd := fmt.Sprintf( - "./cockroach cert create-tenant-client --certs-dir=certs --ca-key=certs/ca.key %d %s", + "./cockroach cert create-tenant-client --certs-dir=certs --ca-key=certs/ca.key %d %s --overwrite", tn.tenantID, strings.Join(names, " ")) c.Run(ctx, c.Node(tn.node), cmd) } @@ -248,7 +248,7 @@ func (tn *tenantNode) start(ctx context.Context, t test.Test, c cluster.Cluster, return 
err }) - t.L().Printf("sql server for tenant %d (%d) running at %s", tn.tenantID, tn.instanceID, tn.pgURL) + t.L().Printf("sql server for tenant %d (instance %d) now running", tn.tenantID, tn.instanceID) } func startTenantServer( diff --git a/pkg/cmd/roachtest/tests/registry.go b/pkg/cmd/roachtest/tests/registry.go index a9b3ec06604f..a192e10a6a78 100644 --- a/pkg/cmd/roachtest/tests/registry.go +++ b/pkg/cmd/roachtest/tests/registry.go @@ -82,7 +82,6 @@ func RegisterTests(r registry.Registry) { registerLoadSplits(r) registerMVCCGC(r) registerMultiTenantDistSQL(r) - registerMultiTenantFairness(r) registerMultiTenantTPCH(r) registerMultiTenantUpgrade(r) registerNetwork(r) diff --git a/pkg/roachprod/prometheus/prometheus.go b/pkg/roachprod/prometheus/prometheus.go index 07418d15cbc9..87e023db10c3 100644 --- a/pkg/roachprod/prometheus/prometheus.go +++ b/pkg/roachprod/prometheus/prometheus.go @@ -130,6 +130,13 @@ func (cfg *Config) WithCluster(nodes install.Nodes) *Config { return cfg } +// WithTenantPod adds scraping for a tenant SQL pod running on the given nodes. +// Chains for convenience. +func (cfg *Config) WithTenantPod(node install.Node, tenantID int) *Config { + cfg.ScrapeConfigs = append(cfg.ScrapeConfigs, MakeInsecureTenantPodScrapeConfig(node, tenantID)...) + return cfg +} + // WithGrafanaDashboard adds links to dashboards to provision into Grafana. See // cfg.Grafana.DashboardURLs for helpful tips. // Enables Grafana if not already enabled. @@ -177,7 +184,10 @@ func MakeInsecureCockroachScrapeConfig(nodes install.Nodes) []ScrapeConfig { sl = append(sl, ScrapeConfig{ JobName: "cockroach-n" + s, MetricsPath: "/_status/vars", - Labels: map[string]string{"node": s}, + Labels: map[string]string{ + "node": s, + "tenant": "system", // all CRDB nodes emit SQL metrics for the system tenant since it's embedded + }, ScrapeNodes: []ScrapeNode{ { Node: node, @@ -189,6 +199,27 @@ func MakeInsecureCockroachScrapeConfig(nodes install.Nodes) []ScrapeConfig { return sl } +// MakeInsecureTenantPodScrapeConfig creates a scrape config for a given tenant +// SQL pod. All nodes are assumed to be insecure and running on port 8081. +func MakeInsecureTenantPodScrapeConfig(node install.Node, tenantID int) []ScrapeConfig { + var sl []ScrapeConfig + sl = append(sl, ScrapeConfig{ + JobName: fmt.Sprintf("cockroach-tenant-t%d-n%d", tenantID, int(node)), + MetricsPath: "/_status/vars", + Labels: map[string]string{ + "node": strconv.Itoa(int(node)), + "tenant": strconv.Itoa(tenantID), + }, + ScrapeNodes: []ScrapeNode{ + { + Node: node, + Port: 8081, + }, + }, + }) + return sl +} + // Prometheus contains metadata of a running instance of prometheus. type Prometheus struct { Config @@ -494,6 +525,10 @@ func makeNodeIPMap(c *install.SyncedCluster) (map[install.Node]string, error) { // makeYAMLConfig creates a prometheus YAML config for the server to use. 
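+// Every scrape target is written with insecure_skip_verify set, presumably so
+// that scraping keeps working when nodes serve their metrics endpoints over
+// TLS with certificates prometheus cannot verify (as with the secure clusters
+// these tests start).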
func makeYAMLConfig(scrapeConfigs []ScrapeConfig, nodeIPs map[install.Node]string) (string, error) { + type tlsConfig struct { + InsecureSkipVerify bool `yaml:"insecure_skip_verify"` + } + type yamlStaticConfig struct { Labels map[string]string `yaml:",omitempty"` Targets []string @@ -503,6 +538,7 @@ func makeYAMLConfig(scrapeConfigs []ScrapeConfig, nodeIPs map[install.Node]strin JobName string `yaml:"job_name"` StaticConfigs []yamlStaticConfig `yaml:"static_configs"` MetricsPath string `yaml:"metrics_path"` + TLSConfig tlsConfig `yaml:"tls_config"` } type yamlConfig struct { @@ -534,6 +570,9 @@ func makeYAMLConfig(scrapeConfigs []ScrapeConfig, nodeIPs map[install.Node]strin Targets: targets, }, }, + TLSConfig: tlsConfig{ + InsecureSkipVerify: true, + }, }, ) } diff --git a/pkg/roachprod/prometheus/testdata/multiple_scrape_nodes b/pkg/roachprod/prometheus/testdata/multiple_scrape_nodes index 85fa3430a5cf..90f6fc379ebe 100644 --- a/pkg/roachprod/prometheus/testdata/multiple_scrape_nodes +++ b/pkg/roachprod/prometheus/testdata/multiple_scrape_nodes @@ -12,8 +12,12 @@ scrape_configs: - 127.0.0.4:2003 - 127.0.0.5:2003 metrics_path: /b + tls_config: + insecure_skip_verify: true - job_name: workload1 static_configs: - targets: - 127.0.0.6:2009 metrics_path: /c + tls_config: + insecure_skip_verify: true diff --git a/pkg/roachprod/prometheus/testdata/using_make_commands b/pkg/roachprod/prometheus/testdata/using_make_commands index 9e34cea68699..899de02c492c 100644 --- a/pkg/roachprod/prometheus/testdata/using_make_commands +++ b/pkg/roachprod/prometheus/testdata/using_make_commands @@ -12,17 +12,25 @@ scrape_configs: - 127.0.0.5:2005 - 127.0.0.6:2009 metrics_path: / + tls_config: + insecure_skip_verify: true - job_name: cockroach-n8 static_configs: - labels: node: "8" + tenant: system targets: - 127.0.0.8:26258 metrics_path: /_status/vars + tls_config: + insecure_skip_verify: true - job_name: cockroach-n9 static_configs: - labels: node: "9" + tenant: system targets: - 127.0.0.9:26258 metrics_path: /_status/vars + tls_config: + insecure_skip_verify: true diff --git a/pkg/roachprod/roachprod.go b/pkg/roachprod/roachprod.go index 49f2d76207a2..7c19c619913f 100644 --- a/pkg/roachprod/roachprod.go +++ b/pkg/roachprod/roachprod.go @@ -925,12 +925,11 @@ func urlGenerator( config.path = "/" + config.path } url := fmt.Sprintf("%s://%s:%d%s", scheme, host, config.port, config.path) + urls = append(urls, url) if config.openInBrowser { if err := exec.Command("python", "-m", "webbrowser", url).Run(); err != nil { return nil, err } - } else { - urls = append(urls, url) } } return urls, nil