diff --git a/pkg/cmd/roachtest/tests/decommission.go b/pkg/cmd/roachtest/tests/decommission.go
index 3d12f16375ee..86488fe534df 100644
--- a/pkg/cmd/roachtest/tests/decommission.go
+++ b/pkg/cmd/roachtest/tests/decommission.go
@@ -304,7 +304,9 @@ func runDecommission(
 	}
 
 	if whileDown {
-		h.stop(ctx, node)
+		if err := c.StopCockroachGracefullyOnNode(ctx, t.L(), node); err != nil {
+			return err
+		}
 	}
 
 	run(fmt.Sprintf(`ALTER RANGE default CONFIGURE ZONE = 'constraints: {"-node%d"}'`, node))
@@ -319,7 +321,9 @@ func runDecommission(
 	}
 
 	if !whileDown {
-		h.stop(ctx, node)
+		if err := c.StopCockroachGracefullyOnNode(ctx, t.L(), node); err != nil {
+			return err
+		}
 	}
 
 	// Wipe the node and re-add to cluster with a new node ID.
@@ -1150,14 +1154,6 @@ func newDecommTestHelper(t test.Test, c cluster.Cluster) *decommTestHelper {
 	}
 }
 
-// stop gracefully stops a node with a signal.
-func (h *decommTestHelper) stop(ctx context.Context, node int) {
-	opts := option.DefaultStopOpts()
-	opts.RoachprodOpts.Sig = 15 // SIGTERM
-	opts.RoachprodOpts.Wait = true
-	h.c.Stop(ctx, h.t.L(), opts, h.c.Node(node))
-}
-
 // getLogicalNodeID connects to the nodeIdx-th node in the roachprod cluster to
 // obtain the logical CockroachDB nodeID of this node. This is because nodes can
 // change ID as they are continuously decommissioned, wiped, and started anew.
diff --git a/pkg/cmd/roachtest/tests/decommissionbench.go b/pkg/cmd/roachtest/tests/decommissionbench.go
index 3dfd367345f0..579d8a25a774 100644
--- a/pkg/cmd/roachtest/tests/decommissionbench.go
+++ b/pkg/cmd/roachtest/tests/decommissionbench.go
@@ -17,6 +17,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"math"
+	"os"
 	"path/filepath"
 	"strings"
 	"sync/atomic"
@@ -29,6 +30,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/install"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
+	"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
 	"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/workload/histogram"
@@ -37,7 +39,10 @@ import (
 )
 
 const (
-	defaultTimeout = 1 * time.Hour
+	defaultTimeout            = 2 * time.Hour
+	envDecommissionNoSkipFlag = "ROACHTEST_DECOMMISSION_NOSKIP"
+	envDecommissionGrafana    = "ROACHTEST_DECOMMISSION_GRAFANA"
+	envDecommissionGrafanaURL = "ROACHTEST_DECOMMISSION_GRAFANA_URL"
 
 	// Metrics for recording and sending to roachperf.
 	decommissionMetric = "decommission"
@@ -48,6 +53,10 @@ const (
 	// Used to calculate estimated decommission time. Should remain in sync with
 	// setting `kv.snapshot_recovery.max_rate` in store_snapshot.go.
 	defaultSnapshotRateMb = 32
+
+	// Skip message for tests not meant to be run nightly.
+	manualBenchmarkingOnly = "This config can be used to perform some manual " +
+		"benchmarking and is not meant to be run on a nightly basis"
 )
 
 type decommissionBenchSpec struct {
@@ -56,12 +65,24 @@ type decommissionBenchSpec struct {
 	warehouses       int
 	load             bool
 	admissionControl bool
+	multistore       bool
 	snapshotRate     int
 	duration         time.Duration
 
 	// When true, the test will attempt to stop the node prior to decommission.
 	whileDown bool
 
+	// When true, will drain the SQL connections and leases from a node first.
+	drainFirst bool
+
+	// When true, the test will add a node to the cluster prior to decommission,
+	// so that the upreplication will overlap with the decommission.
+	whileUpreplicating bool
+
+	// When true, attempts to simulate decommissioning a node with high read
+	// amplification by slowing writes on the target in a write-heavy workload.
+	slowWrites bool
+
 	// An override for the default timeout, if needed.
 	timeout time.Duration
 
@@ -72,20 +93,21 @@
 // and adds them to the roachtest registry.
 func registerDecommissionBench(r registry.Registry) {
 	for _, benchSpec := range []decommissionBenchSpec{
+		// Basic benchmark configurations, to be run nightly.
 		{
 			nodes:            4,
-			cpus:             4,
-			warehouses:       100,
+			cpus:             16,
+			warehouses:       1000,
 			load:             true,
 			admissionControl: true,
 		},
 		{
 			nodes:            4,
-			cpus:             4,
-			warehouses:       100,
+			cpus:             16,
+			warehouses:       1000,
 			load:             true,
 			admissionControl: true,
-			duration:         30 * time.Minute,
+			duration:         1 * time.Hour,
 		},
 		{
 			nodes:            4,
@@ -93,14 +115,30 @@ func registerDecommissionBench(r registry.Registry) {
 			warehouses:       1000,
 			load:             true,
 			admissionControl: true,
+			whileDown:        true,
 		},
 		{
-			nodes:            4,
+			nodes:            8,
 			cpus:             16,
-			warehouses:       1000,
+			warehouses:       3000,
 			load:             true,
 			admissionControl: true,
-			duration:         1 * time.Hour,
+			// This test can take nearly an hour to import and achieve balance, so
+			// we extend the timeout to let it complete.
+			timeout: 4 * time.Hour,
+		},
+		// Manually run benchmark configurations.
+		{
+			nodes:              8,
+			cpus:               16,
+			warehouses:         3000,
+			load:               true,
+			admissionControl:   true,
+			whileUpreplicating: true,
+			// This test can take nearly an hour to import and achieve balance, so
+			// we extend the timeout to let it complete.
+			timeout: 4 * time.Hour,
+			skip:    manualBenchmarkingOnly,
 		},
 		{
 			nodes:            4,
@@ -108,17 +146,43 @@ func registerDecommissionBench(r registry.Registry) {
 			warehouses:       1000,
 			load:             true,
 			admissionControl: true,
-			whileDown:        true,
+			drainFirst: true,
+			skip:       manualBenchmarkingOnly,
 		},
 		{
-			nodes:            4,
+			nodes:      4,
+			cpus:       16,
+			warehouses: 1000,
+			load:       true,
+			slowWrites: true,
+			skip:       manualBenchmarkingOnly,
+		},
+		{
+			nodes:      8,
+			cpus:       16,
+			warehouses: 3000,
+			load:       true,
+			slowWrites: true,
+			// This test can take nearly an hour to import and achieve balance, so
+			// we extend the timeout to let it complete.
+			timeout: 4 * time.Hour,
+			skip:    manualBenchmarkingOnly,
+		},
+		{
+			nodes: 12,
 			cpus:             16,
-			warehouses:       1000,
+			warehouses:       3000,
 			load:             true,
-			admissionControl: false,
+			admissionControl: true,
+			multistore:       true,
+			// This test can take nearly an hour to import and achieve balance, so
+			// we extend the timeout to let it complete.
+			timeout: 3 * time.Hour,
+			skip:    manualBenchmarkingOnly,
 		},
 		{
-			nodes:            8,
+			// Test to compare 12 4-store nodes vs 48 single-store nodes
+			nodes:            48,
 			cpus:             16,
 			warehouses:       3000,
 			load:             true,
@@ -126,6 +190,7 @@ func registerDecommissionBench(r registry.Registry) {
 			// This test can take nearly an hour to import and achieve balance, so
 			// we extend the timeout to let it complete.
 			timeout: 3 * time.Hour,
+			skip:    manualBenchmarkingOnly,
 		},
 	} {
 		registerDecommissionBenchSpec(r, benchSpec)
@@ -140,6 +205,8 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe
 		timeout = benchSpec.timeout
 	}
 	extraNameParts := []string{""}
+	addlNodeCount := 0
+	specOptions := []spec.Option{spec.CPU(benchSpec.cpus)}
 
 	if benchSpec.snapshotRate != 0 {
 		extraNameParts = append(extraNameParts,
@@ -150,11 +217,30 @@
 		extraNameParts = append(extraNameParts, "while-down")
 	}
 
+	if benchSpec.drainFirst {
+		extraNameParts = append(extraNameParts, "drain-first")
+	}
+
+	if benchSpec.multistore {
+		extraNameParts = append(extraNameParts, "multi-store")
+		specOptions = append(specOptions, spec.SSD(4))
+	}
+
+	if benchSpec.whileUpreplicating {
+		// Only add additional nodes if we aren't performing repeated decommissions.
+		if benchSpec.duration == 0 {
+			addlNodeCount = 1
+		}
+		extraNameParts = append(extraNameParts, "while-upreplicating")
+	}
+
 	if !benchSpec.load {
 		extraNameParts = append(extraNameParts, "no-load")
 	}
 
-	if !benchSpec.admissionControl {
+	if benchSpec.slowWrites {
+		extraNameParts = append(extraNameParts, "hi-read-amp")
+	} else if !benchSpec.admissionControl {
 		extraNameParts = append(extraNameParts, "no-admission")
 	}
 
@@ -163,18 +249,22 @@
 		extraNameParts = append(extraNameParts, fmt.Sprintf("duration=%s", benchSpec.duration))
 	}
 
-	extraName := strings.Join(extraNameParts, "/")
+	// If run with ROACHTEST_DECOMMISSION_NOSKIP=1, roachtest will enable all specs.
+	noSkipFlag := os.Getenv(envDecommissionNoSkipFlag)
+	if noSkipFlag != "" {
+		benchSpec.skip = ""
+	}
 
-	// TODO(sarkesian): add a configuration that tests decommission of a node
-	// while upreplication to a recently added node is still ongoing. This
-	// will require the test to allocate additional Roachprod nodes up front,
-	// and start them just prior to initiating the decommission.
+	extraName := strings.Join(extraNameParts, "/")
 
 	r.Add(registry.TestSpec{
 		Name: fmt.Sprintf("decommissionBench/nodes=%d/cpu=%d/warehouses=%d%s",
 			benchSpec.nodes, benchSpec.cpus, benchSpec.warehouses, extraName),
-		Owner:             registry.OwnerKV,
-		Cluster:           r.MakeClusterSpec(benchSpec.nodes+1, spec.CPU(benchSpec.cpus)),
+		Owner: registry.OwnerKV,
+		Cluster: r.MakeClusterSpec(
+			benchSpec.nodes+addlNodeCount+1,
+			specOptions...,
+		),
 		Timeout:           timeout,
 		NonReleaseBlocker: true,
 		Skip:              benchSpec.skip,
@@ -280,7 +370,21 @@ func setupDecommissionBench(
 			if err != nil {
 				t.Fatal(err)
 			}
-			t.L().Printf("run: %s\n", stmt)
+		}
+	}
+
+	// Disable load-based splitting/rebalancing when attempting to simulate high
+	// read amplification, so replicas on a store with slow writes will not move.
+	if benchSpec.slowWrites {
+		for _, stmt := range []string{
+			`SET CLUSTER SETTING kv.range_split.by_load_enabled=false`,
+			`SET CLUSTER SETTING kv.allocator.load_based_rebalancing=0`,
+		} {
+			t.Status(stmt)
+			_, err := db.ExecContext(ctx, stmt)
+			if err != nil {
+				t.Fatal(err)
+			}
 		}
 	}
 
@@ -383,6 +487,47 @@ func uploadPerfArtifacts(
 	}
 }
 
+// setupGrafana initializes Prometheus & Grafana only if environment variables
+// ROACHTEST_DECOMMISSION_GRAFANA or ROACHTEST_DECOMMISSION_GRAFANA_URL are set.
+// A URL set in ROACHTEST_DECOMMISSION_GRAFANA_URL will be used to initialize
+// the Grafana dashboard.
+func setupGrafana(
+	ctx context.Context,
+	t test.Test,
+	c cluster.Cluster,
+	crdbNodes option.NodeListOption,
+	prometheusNode int,
+) (cleanupFunc func()) {
+	grafana := os.Getenv(envDecommissionGrafana)
+	grafanaURL := os.Getenv(envDecommissionGrafanaURL)
+	if grafana == "" && grafanaURL == "" {
+		return func() {
+			// noop
+		}
+	}
+
+	cfg := &prometheus.Config{}
+	cfg.Grafana.Enabled = true
+	cfg = cfg.
+		WithCluster(crdbNodes.InstallNodes()).
+		WithNodeExporter(crdbNodes.InstallNodes()).
+		WithPrometheusNode(c.Node(prometheusNode).InstallNodes()[0])
+
+	if grafanaURL != "" {
+		cfg = cfg.WithGrafanaDashboard(grafanaURL)
+	}
+
+	err := c.StartGrafana(ctx, t.L(), cfg)
+	require.NoError(t, err)
+
+	cleanupFunc = func() {
+		if err := c.StopGrafana(ctx, t.L(), t.ArtifactsDir()); err != nil {
+			t.L().ErrorfCtx(ctx, "Error(s) shutting down prom/grafana %s", err)
+		}
+	}
+	return cleanupFunc
+}
+
 // runDecommissionBench initializes a cluster with TPCC and attempts to
 // benchmark the decommissioning of a single node picked at random. The cluster
 // may or may not be running under load.
@@ -396,8 +541,19 @@ func runDecommissionBench(
 	// node1 is kept pinned (i.e. not decommissioned/restarted), and is the node
 	// through which we run decommissions. The last node is used for the workload.
 	pinnedNode := 1
-	workloadNode := benchSpec.nodes + 1
+	workloadNode := c.Spec().NodeCount
 	crdbNodes := c.Range(pinnedNode, benchSpec.nodes)
+	t.L().Printf("nodes %d - %d are crdb nodes", crdbNodes[0], crdbNodes[len(crdbNodes)-1])
+
+	// If we have additional nodes to start (so we are upreplicating during
+	// decommission), they will be part of addlNodes.
+	var addlNodes option.NodeListOption
+	if c.Spec().NodeCount > benchSpec.nodes+1 {
+		addlNodes = c.Range(benchSpec.nodes+1, c.Spec().NodeCount-1)
+		t.L().Printf("nodes %d - %d are addl nodes", addlNodes[0], addlNodes[len(addlNodes)-1])
+	}
+
+	t.L().Printf("node %d is the workload node", workloadNode)
 
 	maxRate := tpccMaxRate(benchSpec.warehouses)
 	rampDuration := 3 * time.Minute
@@ -408,6 +564,17 @@ func runDecommissionBench(
 	workloadCmd := fmt.Sprintf("./workload run tpcc --warehouses=%d --max-rate=%d --duration=%s "+
 		"--histograms=%s/stats.json --ramp=%s --tolerate-errors {pgurl:1-%d}", benchSpec.warehouses,
 		maxRate, testTimeout, t.PerfArtifactsDir(), rampDuration, benchSpec.nodes)
+
+	// In the case that we want to simulate high read amplification, we use the
+	// kv workload to generate a write-heavy load known to be difficult for
+	// compactions to keep pace with.
+	if benchSpec.slowWrites {
+		workloadCmd = fmt.Sprintf("./workload run kv --init --concurrency=%d --splits=1000 "+
+			"--read-percent=50 --min-block-bytes=8192 --max-block-bytes=8192 --duration=%s "+
+			"--histograms=%s/stats.json --ramp=%s --tolerate-errors {pgurl:1-%d}", benchSpec.nodes*64,
+			testTimeout, t.PerfArtifactsDir(), rampDuration, benchSpec.nodes)
+	}
+
 	setupDecommissionBench(ctx, t, c, benchSpec, workloadNode, pinnedNode, importCmd)
 
 	workloadCtx, workloadCancel := context.WithCancel(ctx)
@@ -433,6 +600,11 @@ func runDecommissionBench(
 		)
 	}
 
+	// Set up Prometheus/Grafana using the workload node.
+	allCrdbNodes := crdbNodes.Merge(addlNodes)
+	cleanupFunc := setupGrafana(ctx, t, c, allCrdbNodes, workloadNode)
+	defer cleanupFunc()
+
 	// Create a histogram registry for recording multiple decommission metrics.
// Note that "decommission.*" metrics are special in that they are // long-running metrics measured by the elapsed time between each tick, @@ -466,10 +638,23 @@ func runDecommissionBench( } } + if len(addlNodes) > 0 { + h.t.Status(fmt.Sprintf("starting %d additional node(s)", len(addlNodes))) + h.c.Start(ctx, h.t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), addlNodes) + for _, addlNode := range addlNodes { + h.blockFromRandNode(addlNode) + } + + // Include an additional minute of buffer time before starting decommission. + time.Sleep(1 * time.Minute) + } + m.ExpectDeath() defer m.ResetDeaths() err := runSingleDecommission(ctx, h, pinnedNode, &targetNodeAtomic, benchSpec.snapshotRate, - benchSpec.whileDown, false /* reuse */, true /* estimateDuration */, tickByName) + benchSpec.whileDown, benchSpec.drainFirst, false /* reuse */, benchSpec.whileUpreplicating, + true /* estimateDuration */, benchSpec.slowWrites, tickByName, + ) // Include an additional minute of buffer time post-decommission to gather // workload stats. @@ -511,6 +696,8 @@ func runDecommissionBenchLong( pinnedNode := 1 workloadNode := benchSpec.nodes + 1 crdbNodes := c.Range(pinnedNode, benchSpec.nodes) + t.L().Printf("nodes %d - %d are crdb nodes", crdbNodes[0], crdbNodes[len(crdbNodes)-1]) + t.L().Printf("node %d is the workload node", workloadNode) maxRate := tpccMaxRate(benchSpec.warehouses) rampDuration := 3 * time.Minute @@ -547,6 +734,10 @@ func runDecommissionBenchLong( ) } + // Setup Prometheus/Grafana using workload node. + cleanupFunc := setupGrafana(ctx, t, c, crdbNodes, workloadNode) + defer cleanupFunc() + // Create a histogram registry for recording multiple decommission metrics. // Note that "decommission.*" metrics are special in that they are // long-running metrics measured by the elapsed time between each tick, @@ -581,7 +772,9 @@ func runDecommissionBenchLong( for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= benchSpec.duration; { m.ExpectDeath() err := runSingleDecommission(ctx, h, pinnedNode, &targetNodeAtomic, benchSpec.snapshotRate, - benchSpec.whileDown, true /* reuse */, false /* estimateDuration */, tickByName) + benchSpec.whileDown, benchSpec.drainFirst, true /* reuse */, benchSpec.whileUpreplicating, + true /* estimateDuration */, benchSpec.slowWrites, tickByName, + ) m.ResetDeaths() if err != nil { return err @@ -621,7 +814,7 @@ func runSingleDecommission( pinnedNode int, targetLogicalNodeAtomic *uint32, snapshotRateMb int, - stopFirst, reuse, estimateDuration bool, + stopFirst, drainFirst, reuse, noBalanceWait, estimateDuration, slowWrites bool, tickByName func(name string), ) error { target := h.getRandNodeOtherThan(pinnedNode) @@ -647,16 +840,18 @@ func runSingleDecommission( return err } - h.t.Status("waiting for cluster balance") - if err := waitForRebalance( - ctx, h.t.L(), dbNode, float64(totalRanges)/3.0, 60, /* stableSeconds */ - ); err != nil { - return err + if !noBalanceWait { + h.t.Status("waiting for cluster balance") + if err := waitForRebalance( + ctx, h.t.L(), dbNode, float64(totalRanges)/3.0, 60, /* stableSeconds */ + ); err != nil { + return err + } } if err := dbNode.QueryRow( - "SELECT range_count, used "+ - "FROM crdb_internal.kv_store_status where node_id = $1 LIMIT 1", + "SELECT sum(range_count), sum(used) "+ + "FROM crdb_internal.kv_store_status WHERE node_id = $1 GROUP BY node_id LIMIT 1", targetLogicalNodeID, ).Scan(&rangeCount, &bytesUsed); err != nil { return err @@ -678,12 +873,40 @@ func runSingleDecommission( } } + if drainFirst { + 
h.t.Status(fmt.Sprintf("draining node%d", target)) + if err := h.c.RunE(ctx, h.c.Node(target), "./cockroach node drain --self --insecure"); err != nil { + return err + } + } + if stopFirst { h.t.Status(fmt.Sprintf("gracefully stopping node%d", target)) - h.stop(ctx, target) + if err := h.c.StopCockroachGracefullyOnNode(ctx, h.t.L(), target); err != nil { + return err + } + // Wait after stopping the node to distinguish the impact of the node being + // down vs decommissioning. + time.Sleep(5 * time.Minute) + } + + if slowWrites { + h.t.Status(fmt.Sprintf("limiting write bandwith to 100MiBps and awaiting "+ + "increased read amplification on node%d", target)) + if err := h.c.RunE( + ctx, h.c.Node(target), + fmt.Sprintf("sudo bash -c 'echo \"259:0 %d\" > "+ + "/sys/fs/cgroup/blkio/system.slice/cockroach.service/blkio.throttle.write_bps_device'", + 100*(1<<20))); err != nil { + return err + } + // Wait for some time after limiting write bandwidth in order to affect read amplification. + time.Sleep(5 * time.Minute) + + // Print LSM health to logs. Intentionally swallows error to avoid flakes. + _ = logLSMHealth(ctx, h.t.L(), h.c, target) } - atomic.StoreUint32(targetLogicalNodeAtomic, uint32(targetLogicalNodeID)) if estimateDuration { estimateDecommissionDuration( ctx, h.t.L(), tickByName, snapshotRateMb, bytesUsed, candidateStores, @@ -692,7 +915,7 @@ func runSingleDecommission( } h.t.Status(fmt.Sprintf("decommissioning node%d (n%d)", target, targetLogicalNodeID)) - + atomic.StoreUint32(targetLogicalNodeAtomic, uint32(targetLogicalNodeID)) tBegin := timeutil.Now() tickByName(decommissionMetric) targetLogicalNodeIDList := option.NodeListOption{targetLogicalNodeID} @@ -717,7 +940,9 @@ func runSingleDecommission( if reuse { if !stopFirst { h.t.Status(fmt.Sprintf("gracefully stopping node%d", target)) - h.stop(ctx, target) + if err := h.c.StopCockroachGracefullyOnNode(ctx, h.t.L(), target); err != nil { + return err + } } // Wipe the node and re-add to cluster with a new node ID. @@ -735,6 +960,13 @@ func runSingleDecommission( return err } + // If we don't need to wait for the cluster to rebalance, wait a short + // buffer period before the next decommission and bail out early. + if noBalanceWait { + time.Sleep(1 * time.Minute) + return nil + } + newLogicalNodeID, err := h.getLogicalNodeID(ctx, target) if err != nil { return err @@ -786,6 +1018,24 @@ func runSingleDecommission( return nil } +// logLSMHealth is a convenience method that logs the output of /debug/lsm. +func logLSMHealth(ctx context.Context, l *logger.Logger, c cluster.Cluster, target int) error { + l.Printf("LSM Health of node%d", target) + adminAddrs, err := c.InternalAdminUIAddr(ctx, l, c.Node(target)) + if err != nil { + return err + } + result, err := c.RunWithDetailsSingleNode(ctx, l, c.Node(target), + "curl", "-s", fmt.Sprintf("http://%s/debug/lsm", + adminAddrs[0])) + if err == nil { + l.Printf(result.Stdout) + } else { + l.Printf(result.Stderr) + } + return err +} + // estimateDecommissionDuration attempts to come up with a rough estimate for // the theoretical minimum decommission duration as well as the estimated // actual duration of the decommission and writes them to the log and the