roachtest: introduce admission-control/elastic-cdc
Part of #89208. This test sets up a 3-node CRDB cluster on 8vCPU
machines running 1000-warehouse TPC-C, and kicks off a few changefeed
backfills concurrently. We've observed latency spikes during backfills
because of their CPU/scan-heavy nature -- they can elevate CPU
scheduling latencies, which in turn translates into an increase in
foreground latency.

Also in this commit: route the std{out,err} of the prometheus/grafana
setup that roachtests perform to the logger in scope.

Release note: None
irfansharif committed Oct 17, 2022
1 parent fa47f7b commit 42d08d8
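
Concretely, each backfill in the new test is kicked off by a statement of
roughly this shape, quoted from the test body below (the cursor offset is
computed per round and reaches back to approximately when the workload
started, which is what forces the catch-up scan):

    CREATE CHANGEFEED FOR tpcc.order_line, tpcc.stock, tpcc.customer
      INTO 'null://' WITH cursor = '-<seconds since workload start>s'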
Showing 5 changed files with 172 additions and 20 deletions.
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
@@ -10,6 +10,7 @@ go_library(
        "activerecord_blocklist.go",
        "admission_control.go",
        "admission_control_elastic_backup.go",
        "admission_control_elastic_cdc.go",
        "admission_control_multi_store_overload.go",
        "admission_control_snapshot_overload.go",
        "admission_control_tpcc_overload.go",
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/admission_control.go
@@ -29,6 +29,7 @@ func registerAdmission(r registry.Registry) {
    // over some latency threshold. Will be useful to track over time.

    registerElasticControlForBackups(r)
    registerElasticControlForCDC(r)
    registerMultiStoreOverload(r)
    registerSnapshotOverload(r)
    registerTPCCOverload(r)
147 changes: 147 additions & 0 deletions pkg/cmd/roachtest/tests/admission_control_elastic_cdc.go
@@ -0,0 +1,147 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tests

import (
    "context"
    "fmt"
    "time"

    "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
    "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
    "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
    "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
    "github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
)

// This test sets up a 3-node CRDB cluster on 8vCPU machines running
// 1000-warehouse TPC-C, and kicks off a few changefeed backfills concurrently.
// We've observed latency spikes during backfills because of their
// CPU/scan-heavy nature -- they can elevate CPU scheduling latencies, which in
// turn translates into an increase in foreground latency.
func registerElasticControlForCDC(r registry.Registry) {
    r.Add(registry.TestSpec{
        Name:  "admission-control/elastic-cdc",
        Owner: registry.OwnerAdmissionControl,
        // TODO(irfansharif): After two weeks of nightly baking time, reduce
        // this to a weekly cadence. This is a long-running test and serves only
        // as a coarse-grained benchmark.
        // Tags: []string{`weekly`},
        Cluster:         r.MakeClusterSpec(4, spec.CPU(8)),
        RequiresLicense: true,
        Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
            if c.Spec().NodeCount < 4 {
                t.Fatalf("expected at least 4 nodes, found %d", c.Spec().NodeCount)
            }

            // The last node is reserved for the TPC-C workload (and, per the
            // config below, the prometheus/grafana instance).
            crdbNodes := c.Spec().NodeCount - 1
            workloadNode := crdbNodes + 1
            numWarehouses, workloadDuration, estimatedSetupTime := 1000, 60*time.Minute, 10*time.Minute
            if c.IsLocal() {
                numWarehouses, workloadDuration, estimatedSetupTime = 1, time.Minute, 2*time.Minute
            }

            // Scrape the CRDB nodes (and node_exporter) plus the workload, and
            // install the changefeed admission control grafana dashboard.
            promCfg := &prometheus.Config{}
            promCfg.WithPrometheusNode(c.Node(workloadNode).InstallNodes()[0]).
                WithNodeExporter(c.Range(1, c.Spec().NodeCount-1).InstallNodes()).
                WithCluster(c.Range(1, c.Spec().NodeCount-1).InstallNodes()).
                WithGrafanaDashboard("http://go.crdb.dev/p/changefeed-admission-control-grafana").
                WithScrapeConfigs(
                    prometheus.MakeWorkloadScrapeConfig("workload", "/",
                        makeWorkloadScrapeNodes(
                            c.Node(workloadNode).InstallNodes()[0],
                            []workloadInstance{{nodes: c.Node(workloadNode)}},
                        ),
                    ),
                )

            if t.SkipInit() {
                t.Status(fmt.Sprintf("running tpcc for %s (<%s)", workloadDuration, time.Minute))
            } else {
                t.Status(fmt.Sprintf("initializing + running tpcc for %s (<%s)", workloadDuration, 10*time.Minute))
            }

            padDuration, err := time.ParseDuration(ifLocal(c, "5s", "5m"))
            if err != nil {
                t.Fatal(err)
            }
            stopFeedsDuration, err := time.ParseDuration(ifLocal(c, "5s", "1m"))
            if err != nil {
                t.Fatal(err)
            }

            runTPCC(ctx, t, c, tpccOptions{
                Warehouses:         numWarehouses,
                Duration:           workloadDuration,
                SetupType:          usingImport,
                EstimatedSetupTime: estimatedSetupTime,
                SkipPostRunCheck:   true,
                ExtraSetupArgs:     "--checks=false",
                PrometheusConfig:   promCfg,
                During: func(ctx context.Context) error {
                    db := c.Conn(ctx, t.L(), crdbNodes)
                    defer db.Close()

                    t.Status(fmt.Sprintf("configuring cluster (<%s)", 30*time.Second))
                    {
                        setAdmissionControl(ctx, t, c, true)

                        // Changefeeds depend on rangefeeds being enabled.
                        if _, err := db.Exec("SET CLUSTER SETTING kv.rangefeed.enabled = true"); err != nil {
                            return err
                        }
                    }

                    stopFeeds(db) // stop stray feeds (from repeated runs against the same cluster for ex.)
                    defer stopFeeds(db)

                    // Run the changefeed rounds concurrently with the
                    // foreground TPC-C load: each round waits out padDuration
                    // to (re-)establish a baseline, cancels any extant feeds,
                    // and then kicks off another batch of backfilling
                    // changefeeds.
                    m := c.NewMonitor(ctx, c.Range(1, crdbNodes))
                    m.Go(func(ctx context.Context) error {
                        const iters, changefeeds = 5, 10
                        for i := 0; i < iters; i++ {
                            if i == 0 {
                                t.Status(fmt.Sprintf("setting performance baseline (<%s)", padDuration))
                            }
                            time.Sleep(padDuration) // each iteration lasts long enough to observe effects in metrics

                            t.Status(fmt.Sprintf("during: round %d: stopping extant changefeeds (<%s)", i, stopFeedsDuration))
                            stopFeeds(db)
                            time.Sleep(stopFeedsDuration) // buffer for cancellations to take effect/show up in metrics

                            t.Status(fmt.Sprintf("during: round %d: creating %d changefeeds (<%s)", i, changefeeds, time.Minute))
                            for j := 0; j < changefeeds; j++ {
                                stmtWithCursor := fmt.Sprintf(`
                                    CREATE CHANGEFEED FOR tpcc.order_line, tpcc.stock, tpcc.customer
                                    INTO 'null://' WITH cursor = '-%ds'
                                `, int64(float64(i+1)*padDuration.Seconds())) // scanning as far back as possible (~ when the workload started)
                                if _, err := db.ExecContext(ctx, stmtWithCursor); err != nil {
                                    return err
                                }
                            }

                            // TODO(irfansharif): Add a version of this test
                            // with initial_scan = 'only' to demonstrate the
                            // need+efficacy of using elastic CPU control in
                            // changefeed workers. That too has a severe effect
                            // on scheduling latencies.
                        }
                        return nil
                    })

                    t.Status(fmt.Sprintf("waiting for workload to finish (<%s)", workloadDuration))
                    m.Wait()

                    return nil
                },
            })
        },
    })
}
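
The cursor arithmetic in the loop above is terse; the following standalone
sketch shows what each round ends up issuing, assuming the non-local
padDuration of 5m (the backfillStmt helper is illustrative and not part of the
test):

package main

import (
    "fmt"
    "time"
)

// backfillStmt mirrors the statement construction in the test loop above:
// round i uses a cursor that reaches back (i+1)*padDuration, i.e. roughly to
// when the workload started, so each round's changefeeds scan approximately
// the whole history since the workload began.
func backfillStmt(i int, padDuration time.Duration) string {
    return fmt.Sprintf(`
CREATE CHANGEFEED FOR tpcc.order_line, tpcc.stock, tpcc.customer
  INTO 'null://' WITH cursor = '-%ds'
`, int64(float64(i+1)*padDuration.Seconds()))
}

func main() {
    padDuration := 5 * time.Minute // the non-local default in the test
    for i := 0; i < 5; i++ {
        fmt.Printf("round %d reaches back %s:%s", i, time.Duration(i+1)*padDuration, backfillStmt(i, padDuration))
    }
}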
3 changes: 3 additions & 0 deletions pkg/cmd/roachtest/tests/tpcc.go
@@ -211,6 +211,9 @@ func runTPCC(ctx context.Context, t test.Test, c cluster.Cluster, opts tpccOptio
    var ep *tpccChaosEventProcessor
    var promCfg *prometheus.Config
    if !opts.DisablePrometheus {
        // TODO(irfansharif): Move this after the import step. The statistics
        // during the import itself are uninteresting and pollute actual
        // workload data.
        var cleanupFunc func()
        promCfg, cleanupFunc = setupPrometheusForRoachtest(ctx, t, c, opts.PrometheusConfig, workloadInstances)
        defer cleanupFunc()
40 changes: 20 additions & 20 deletions pkg/roachprod/prometheus/prometheus.go
@@ -202,7 +202,7 @@ func Init(
    // NB: when upgrading here, make sure to target a version that picks up this PR:
    // https://github.com/prometheus/node_exporter/pull/2311
    // At time of writing, there hasn't been a release in over half a year.
-   if err := c.RepeatRun(ctx, l, os.Stdout, os.Stderr, cfg.NodeExporter,
+   if err := c.RepeatRun(ctx, l, l.Stdout, l.Stderr, cfg.NodeExporter,
        "download node exporter",
        `
(sudo systemctl stop node_exporter || true) &&
@@ -214,7 +214,7 @@ rm -rf node_exporter && mkdir -p node_exporter && curl -fsSL \
    }

    // Start node_exporter.
-   if err := c.Run(ctx, l, os.Stdout, os.Stderr, cfg.NodeExporter, "init node exporter",
+   if err := c.Run(ctx, l, l.Stdout, l.Stderr, cfg.NodeExporter, "init node exporter",
        `cd node_exporter &&
sudo systemd-run --unit node_exporter --same-dir ./node_exporter`,
    ); err != nil {
@@ -226,8 +226,8 @@ sudo systemd-run --unit node_exporter --same-dir ./node_exporter`,
    if err := c.RepeatRun(
        ctx,
        l,
-       os.Stdout,
-       os.Stderr,
+       l.Stdout,
+       l.Stderr,
        cfg.PrometheusNode,
        "reset prometheus",
        "sudo systemctl stop prometheus || echo 'no prometheus is running'",
@@ -238,8 +238,8 @@ sudo systemd-run --unit node_exporter --same-dir ./node_exporter`,
    if err := c.RepeatRun(
        ctx,
        l,
-       os.Stdout,
-       os.Stderr,
+       l.Stdout,
+       l.Stderr,
        cfg.PrometheusNode,
        "download prometheus",
        `sudo rm -rf /tmp/prometheus && mkdir /tmp/prometheus && cd /tmp/prometheus &&
@@ -272,8 +272,8 @@ sudo systemd-run --unit node_exporter --same-dir ./node_exporter`,
    if err := c.Run(
        ctx,
        l,
-       os.Stdout,
-       os.Stderr,
+       l.Stdout,
+       l.Stderr,
        cfg.PrometheusNode,
        "start-prometheus",
        `cd /tmp/prometheus &&
@@ -286,8 +286,8 @@ sudo systemd-run --unit prometheus --same-dir \
    if cfg.Grafana.Enabled {
        // Install Grafana.
        if err := c.RepeatRun(ctx, l,
-           os.Stdout,
-           os.Stderr, cfg.PrometheusNode, "install grafana",
+           l.Stdout,
+           l.Stderr, cfg.PrometheusNode, "install grafana",
            `sudo apt-get install -qqy apt-transport-https &&
sudo apt-get install -qqy software-properties-common wget &&
wget -q -O - https://packages.grafana.com/gpg.key | sudo apt-key add - &&
@@ -299,8 +299,8 @@ sudo apt-get update -qqy && sudo apt-get install -qqy grafana-enterprise && sudo

        // Provision local prometheus instance as data source.
        if err := c.RepeatRun(ctx, l,
-           os.Stdout,
-           os.Stderr, cfg.PrometheusNode, "permissions",
+           l.Stdout,
+           l.Stderr, cfg.PrometheusNode, "permissions",
            `sudo chmod 777 /etc/grafana/provisioning/datasources /etc/grafana/provisioning/dashboards /var/lib/grafana/dashboards /etc/grafana/grafana.ini`,
        ); err != nil {
            return nil, err
@@ -342,14 +342,14 @@ org_role = Admin

        for idx, u := range cfg.Grafana.DashboardURLs {
            cmd := fmt.Sprintf("curl -fsSL %s -o /var/lib/grafana/dashboards/%d.json", u, idx)
-           if err := c.Run(ctx, l, os.Stdout, os.Stderr, cfg.PrometheusNode, "download dashboard",
+           if err := c.Run(ctx, l, l.Stdout, l.Stderr, cfg.PrometheusNode, "download dashboard",
                cmd); err != nil {
                l.PrintfCtx(ctx, "failed to download dashboard from %s: %s", u, err)
            }
        }

        // Start Grafana. Default port is 3000.
-       if err := c.Run(ctx, l, os.Stdout, os.Stderr, cfg.PrometheusNode, "start grafana",
+       if err := c.Run(ctx, l, l.Stdout, l.Stderr, cfg.PrometheusNode, "start grafana",
            `sudo systemctl restart grafana-server`); err != nil {
            return nil, err
        }
@@ -371,8 +371,8 @@ func Snapshot(
    if err := c.Run(
        ctx,
        l,
-       os.Stdout,
-       os.Stderr,
+       l.Stdout,
+       l.Stderr,
        promNode,
        "prometheus snapshot",
        `sudo rm -rf /tmp/prometheus/data/snapshots/* && curl -XPOST http://localhost:9090/api/v1/admin/tsdb/snapshot &&
@@ -442,13 +442,13 @@ func Shutdown(
            shutdownErr = errors.CombineErrors(shutdownErr, err)
        }
    }
-   if err := c.Run(ctx, l, os.Stdout, os.Stderr, nodes, "stop node exporter",
+   if err := c.Run(ctx, l, l.Stdout, l.Stderr, nodes, "stop node exporter",
        `sudo systemctl stop node_exporter || echo 'Stopped node exporter'`); err != nil {
        l.Printf("Failed to stop node exporter: %v", err)
        shutdownErr = errors.CombineErrors(shutdownErr, err)
    }

-   if err := c.Run(ctx, l, os.Stdout, os.Stderr, promNode, "stop grafana",
+   if err := c.Run(ctx, l, l.Stdout, l.Stderr, promNode, "stop grafana",
        `sudo systemctl stop grafana-server || echo 'Stopped grafana'`); err != nil {
        l.Printf("Failed to stop grafana server: %v", err)
        shutdownErr = errors.CombineErrors(shutdownErr, err)
@@ -457,8 +457,8 @@
    if err := c.RepeatRun(
        ctx,
        l,
-       os.Stdout,
-       os.Stderr,
+       l.Stdout,
+       l.Stderr,
        promNode,
        "stop prometheus",
        "sudo systemctl stop prometheus || echo 'Stopped prometheus'",
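
The os.Stdout/os.Stderr to l.Stdout/l.Stderr swap above is the whole of the
logging change: output from the prometheus/grafana setup commands now lands in
the per-test logger rather than on the roachtest process's own streams. A
minimal, self-contained sketch of the pattern -- the testLogger type and run
helper here are illustrative stand-ins, not the actual roachprod APIs:

package main

import (
    "io"
    "log"
    "os"
    "os/exec"
)

// testLogger stands in for a per-test logger that owns its own writers, e.g.
// files under the test's artifacts directory.
type testLogger struct {
    Stdout io.Writer
    Stderr io.Writer
}

// run mirrors the shape of c.Run(ctx, l, stdout, stderr, ...): the caller
// decides where the command's output goes.
func run(stdout, stderr io.Writer, name string, args ...string) error {
    cmd := exec.Command(name, args...)
    cmd.Stdout = stdout
    cmd.Stderr = stderr
    return cmd.Run()
}

func main() {
    f, err := os.Create("setup.log") // stand-in for a per-test artifact file
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()
    l := &testLogger{Stdout: f, Stderr: f}

    // Before: setup output went to the roachtest process's own streams.
    _ = run(os.Stdout, os.Stderr, "echo", "noisy prometheus/grafana setup")
    // After: output is routed to the logger in scope, alongside the rest of
    // the test's logs.
    _ = run(l.Stdout, l.Stderr, "echo", "noisy prometheus/grafana setup")
}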
