diff --git a/pkg/cmd/roachtest/tests/BUILD.bazel b/pkg/cmd/roachtest/tests/BUILD.bazel index 28e66b5fcf1e..1f1128d83909 100644 --- a/pkg/cmd/roachtest/tests/BUILD.bazel +++ b/pkg/cmd/roachtest/tests/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "activerecord_blocklist.go", "admission_control.go", "admission_control_elastic_backup.go", + "admission_control_elastic_cdc.go", "admission_control_multi_store_overload.go", "admission_control_snapshot_overload.go", "admission_control_tpcc_overload.go", diff --git a/pkg/cmd/roachtest/tests/admission_control.go b/pkg/cmd/roachtest/tests/admission_control.go index 4d3835f14493..f039f236c697 100644 --- a/pkg/cmd/roachtest/tests/admission_control.go +++ b/pkg/cmd/roachtest/tests/admission_control.go @@ -29,6 +29,7 @@ func registerAdmission(r registry.Registry) { // over some latency threshold. Will be Useful to track over time. registerElasticControlForBackups(r) + registerElasticControlForCDC(r) registerMultiStoreOverload(r) registerSnapshotOverload(r) registerTPCCOverload(r) diff --git a/pkg/cmd/roachtest/tests/admission_control_elastic_cdc.go b/pkg/cmd/roachtest/tests/admission_control_elastic_cdc.go new file mode 100644 index 000000000000..361a90498753 --- /dev/null +++ b/pkg/cmd/roachtest/tests/admission_control_elastic_cdc.go @@ -0,0 +1,147 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tests + +import ( + "context" + "fmt" + "time" + + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" + "github.com/cockroachdb/cockroach/pkg/roachprod/prometheus" +) + +// This test sets up a 3-node CRDB cluster on 8vCPU machines running +// 1000-warehouse TPC-C, and kicks off a few changefeed backfills concurrently. +// We've observed latency spikes during backfills because of its CPU/scan-heavy +// nature -- it can elevate CPU scheduling latencies which in turn translates to +// an increase in foreground latency. +func registerElasticControlForCDC(r registry.Registry) { + r.Add(registry.TestSpec{ + Name: "admission-control/elastic-cdc", + Owner: registry.OwnerAdmissionControl, + // TODO(irfansharif): After two weeks of nightly baking time, reduce + // this to a weekly cadence. This is a long-running test and serves only + // as a coarse-grained benchmark. + // Tags: []string{`weekly`}, + Cluster: r.MakeClusterSpec(4, spec.CPU(8)), + RequiresLicense: true, + Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { + if c.Spec().NodeCount < 4 { + t.Fatalf("expected at least 4 nodes, found %d", c.Spec().NodeCount) + } + + crdbNodes := c.Spec().NodeCount - 1 + workloadNode := crdbNodes + 1 + numWarehouses, workloadDuration, estimatedSetupTime := 1000, 60*time.Minute, 10*time.Minute + if c.IsLocal() { + numWarehouses, workloadDuration, estimatedSetupTime = 1, time.Minute, 2*time.Minute + } + + promCfg := &prometheus.Config{} + promCfg.WithPrometheusNode(c.Node(workloadNode).InstallNodes()[0]). + WithNodeExporter(c.Range(1, c.Spec().NodeCount-1).InstallNodes()). + WithCluster(c.Range(1, c.Spec().NodeCount-1).InstallNodes()). + WithGrafanaDashboard("http://go.crdb.dev/p/changefeed-admission-control-grafana"). + WithScrapeConfigs( + prometheus.MakeWorkloadScrapeConfig("workload", "/", + makeWorkloadScrapeNodes( + c.Node(workloadNode).InstallNodes()[0], + []workloadInstance{{nodes: c.Node(workloadNode)}}, + ), + ), + ) + + if t.SkipInit() { + t.Status(fmt.Sprintf("running tpcc for %s (<%s)", workloadDuration, time.Minute)) + } else { + t.Status(fmt.Sprintf("initializing + running tpcc for %s (<%s)", workloadDuration, 10*time.Minute)) + } + + padDuration, err := time.ParseDuration(ifLocal(c, "5s", "5m")) + if err != nil { + t.Fatal(err) + } + stopFeedsDuration, err := time.ParseDuration(ifLocal(c, "5s", "1m")) + if err != nil { + t.Fatal(err) + } + + runTPCC(ctx, t, c, tpccOptions{ + Warehouses: numWarehouses, + Duration: workloadDuration, + SetupType: usingImport, + EstimatedSetupTime: estimatedSetupTime, + SkipPostRunCheck: true, + ExtraSetupArgs: "--checks=false", + PrometheusConfig: promCfg, + During: func(ctx context.Context) error { + db := c.Conn(ctx, t.L(), crdbNodes) + defer db.Close() + + t.Status(fmt.Sprintf("configuring cluster (<%s)", 30*time.Second)) + { + setAdmissionControl(ctx, t, c, true) + + // Changefeeds depend on rangefeeds being enabled. + if _, err := db.Exec("SET CLUSTER SETTING kv.rangefeed.enabled = true"); err != nil { + return err + } + } + + stopFeeds(db) // stop stray feeds (from repeated runs against the same cluster for ex.) + defer stopFeeds(db) + + m := c.NewMonitor(ctx, c.Range(1, crdbNodes)) + m.Go(func(ctx context.Context) error { + const iters, changefeeds = 5, 10 + for i := 0; i < iters; i++ { + if i == 0 { + t.Status(fmt.Sprintf("setting performance baseline (<%s)", padDuration)) + } + time.Sleep(padDuration) // each iteration lasts long enough to observe effects in metrics + + t.Status(fmt.Sprintf("during: round %d: stopping extant changefeeds (<%s)", i, stopFeedsDuration)) + stopFeeds(db) + time.Sleep(stopFeedsDuration) // buffer for cancellations to take effect/show up in metrics + + t.Status(fmt.Sprintf("during: round %d: creating %d changefeeds (<%s)", i, changefeeds, time.Minute)) + for j := 0; j < changefeeds; j++ { + stmtWithCursor := fmt.Sprintf(` + CREATE CHANGEFEED FOR tpcc.order_line, tpcc.stock, tpcc.customer + INTO 'null://' WITH cursor = '-%ds' + `, int64(float64(i+1)*padDuration.Seconds())) // scanning as far back as possible (~ when the workload started) + if _, err := db.ExecContext(ctx, stmtWithCursor); err != nil { + return err + } + } + + // TODO(irfansharif): Add a version of this test + // with initial_scan = 'only' to demonstrate the + // need+efficacy of using elastic CPU control in + // changefeed workers. That too has a severe effect + // on scheduling latencies. + } + return nil + }) + + t.Status(fmt.Sprintf("waiting for workload to finish (<%s)", workloadDuration)) + m.Wait() + + return nil + }, + }) + }, + }) +} diff --git a/pkg/cmd/roachtest/tests/tpcc.go b/pkg/cmd/roachtest/tests/tpcc.go index fead65e29d9b..6400a610fcea 100644 --- a/pkg/cmd/roachtest/tests/tpcc.go +++ b/pkg/cmd/roachtest/tests/tpcc.go @@ -211,6 +211,9 @@ func runTPCC(ctx context.Context, t test.Test, c cluster.Cluster, opts tpccOptio var ep *tpccChaosEventProcessor var promCfg *prometheus.Config if !opts.DisablePrometheus { + // TODO(irfansharif): Move this after the import step. The statistics + // during import itself is uninteresting and pollutes actual workload + // data. var cleanupFunc func() promCfg, cleanupFunc = setupPrometheusForRoachtest(ctx, t, c, opts.PrometheusConfig, workloadInstances) defer cleanupFunc() diff --git a/pkg/roachprod/prometheus/prometheus.go b/pkg/roachprod/prometheus/prometheus.go index f35597157788..ac11cb0c0d8f 100644 --- a/pkg/roachprod/prometheus/prometheus.go +++ b/pkg/roachprod/prometheus/prometheus.go @@ -202,7 +202,7 @@ func Init( // NB: when upgrading here, make sure to target a version that picks up this PR: // https://github.com/prometheus/node_exporter/pull/2311 // At time of writing, there hasn't been a release in over half a year. - if err := c.RepeatRun(ctx, l, os.Stdout, os.Stderr, cfg.NodeExporter, + if err := c.RepeatRun(ctx, l, l.Stdout, l.Stderr, cfg.NodeExporter, "download node exporter", ` (sudo systemctl stop node_exporter || true) && @@ -214,7 +214,7 @@ rm -rf node_exporter && mkdir -p node_exporter && curl -fsSL \ } // Start node_exporter. - if err := c.Run(ctx, l, os.Stdout, os.Stderr, cfg.NodeExporter, "init node exporter", + if err := c.Run(ctx, l, l.Stdout, l.Stderr, cfg.NodeExporter, "init node exporter", `cd node_exporter && sudo systemd-run --unit node_exporter --same-dir ./node_exporter`, ); err != nil { @@ -226,8 +226,8 @@ sudo systemd-run --unit node_exporter --same-dir ./node_exporter`, if err := c.RepeatRun( ctx, l, - os.Stdout, - os.Stderr, + l.Stdout, + l.Stderr, cfg.PrometheusNode, "reset prometheus", "sudo systemctl stop prometheus || echo 'no prometheus is running'", @@ -238,8 +238,8 @@ sudo systemd-run --unit node_exporter --same-dir ./node_exporter`, if err := c.RepeatRun( ctx, l, - os.Stdout, - os.Stderr, + l.Stdout, + l.Stderr, cfg.PrometheusNode, "download prometheus", `sudo rm -rf /tmp/prometheus && mkdir /tmp/prometheus && cd /tmp/prometheus && @@ -272,8 +272,8 @@ sudo systemd-run --unit node_exporter --same-dir ./node_exporter`, if err := c.Run( ctx, l, - os.Stdout, - os.Stderr, + l.Stdout, + l.Stderr, cfg.PrometheusNode, "start-prometheus", `cd /tmp/prometheus && @@ -286,8 +286,8 @@ sudo systemd-run --unit prometheus --same-dir \ if cfg.Grafana.Enabled { // Install Grafana. if err := c.RepeatRun(ctx, l, - os.Stdout, - os.Stderr, cfg.PrometheusNode, "install grafana", + l.Stdout, + l.Stderr, cfg.PrometheusNode, "install grafana", `sudo apt-get install -qqy apt-transport-https && sudo apt-get install -qqy software-properties-common wget && wget -q -O - https://packages.grafana.com/gpg.key | sudo apt-key add - && @@ -299,8 +299,8 @@ sudo apt-get update -qqy && sudo apt-get install -qqy grafana-enterprise && sudo // Provision local prometheus instance as data source. if err := c.RepeatRun(ctx, l, - os.Stdout, - os.Stderr, cfg.PrometheusNode, "permissions", + l.Stdout, + l.Stderr, cfg.PrometheusNode, "permissions", `sudo chmod 777 /etc/grafana/provisioning/datasources /etc/grafana/provisioning/dashboards /var/lib/grafana/dashboards /etc/grafana/grafana.ini`, ); err != nil { return nil, err @@ -342,14 +342,14 @@ org_role = Admin for idx, u := range cfg.Grafana.DashboardURLs { cmd := fmt.Sprintf("curl -fsSL %s -o /var/lib/grafana/dashboards/%d.json", u, idx) - if err := c.Run(ctx, l, os.Stdout, os.Stderr, cfg.PrometheusNode, "download dashboard", + if err := c.Run(ctx, l, l.Stdout, l.Stderr, cfg.PrometheusNode, "download dashboard", cmd); err != nil { l.PrintfCtx(ctx, "failed to download dashboard from %s: %s", u, err) } } // Start Grafana. Default port is 3000. - if err := c.Run(ctx, l, os.Stdout, os.Stderr, cfg.PrometheusNode, "start grafana", + if err := c.Run(ctx, l, l.Stdout, l.Stderr, cfg.PrometheusNode, "start grafana", `sudo systemctl restart grafana-server`); err != nil { return nil, err } @@ -371,8 +371,8 @@ func Snapshot( if err := c.Run( ctx, l, - os.Stdout, - os.Stderr, + l.Stdout, + l.Stderr, promNode, "prometheus snapshot", `sudo rm -rf /tmp/prometheus/data/snapshots/* && curl -XPOST http://localhost:9090/api/v1/admin/tsdb/snapshot && @@ -442,13 +442,13 @@ func Shutdown( shutdownErr = errors.CombineErrors(shutdownErr, err) } } - if err := c.Run(ctx, l, os.Stdout, os.Stderr, nodes, "stop node exporter", + if err := c.Run(ctx, l, l.Stdout, l.Stderr, nodes, "stop node exporter", `sudo systemctl stop node_exporter || echo 'Stopped node exporter'`); err != nil { l.Printf("Failed to stop node exporter: %v", err) shutdownErr = errors.CombineErrors(shutdownErr, err) } - if err := c.Run(ctx, l, os.Stdout, os.Stderr, promNode, "stop grafana", + if err := c.Run(ctx, l, l.Stdout, l.Stderr, promNode, "stop grafana", `sudo systemctl stop grafana-server || echo 'Stopped grafana'`); err != nil { l.Printf("Failed to stop grafana server: %v", err) shutdownErr = errors.CombineErrors(shutdownErr, err) @@ -457,8 +457,8 @@ func Shutdown( if err := c.RepeatRun( ctx, l, - os.Stdout, - os.Stderr, + l.Stdout, + l.Stderr, promNode, "stop prometheus", "sudo systemctl stop prometheus || echo 'Stopped prometheus'",