From 89dd056d5f42e4b794e99d21d2239f8e475331b9 Mon Sep 17 00:00:00 2001 From: Alex Sarkesian Date: Wed, 25 May 2022 19:36:26 -0400 Subject: [PATCH] roachtest: benchmark repeated decommissions over time This adds new long-running configurations to the `decommissionBench` roachtest, so that repeated decommissions over time in a cluster can be benchmarked individually. With these long-running tests, we've seen the following averages: ``` decommissionBench/nodes=4/cpu=16/warehouses=1000/duration=1h0m0s: avg decommission: 16m41s, avg upreplication: 33m17s decommissionBench/nodes=4/cpu=4/warehouses=100: avg decommission: 2m44s, avg upreplication: 4m3s ``` Release note: None --- pkg/cmd/roachtest/tests/allocator.go | 8 +- pkg/cmd/roachtest/tests/decommission.go | 61 ++- pkg/cmd/roachtest/tests/decommissionbench.go | 384 ++++++++++++++++--- 3 files changed, 369 insertions(+), 84 deletions(-) diff --git a/pkg/cmd/roachtest/tests/allocator.go b/pkg/cmd/roachtest/tests/allocator.go index bc5573ae2401..5dc0fca8954d 100644 --- a/pkg/cmd/roachtest/tests/allocator.go +++ b/pkg/cmd/roachtest/tests/allocator.go @@ -117,7 +117,7 @@ func registerAllocator(r registry.Registry) { m = c.NewMonitor(ctx, clusNodes) m.Go(func(ctx context.Context) error { t.Status("waiting for reblance") - err := waitForRebalance(ctx, t.L(), db, maxStdDev) + err := waitForRebalance(ctx, t.L(), db, maxStdDev, allocatorStableSeconds) if err != nil { return err } @@ -281,14 +281,14 @@ func allocatorStats(db *gosql.DB) (s replicationStats, err error) { } // waitForRebalance waits until there's been no recent range adds, removes, and -// splits. We wait until the cluster is balanced or until `StableInterval` +// splits. We wait until the cluster is balanced or until `stableSeconds` // elapses, whichever comes first. Then, it prints stats about the rebalancing // process. If the replica count appears unbalanced, an error is returned. // // This method is crude but necessary. If we were to wait until range counts // were just about even, we'd miss potential post-rebalance thrashing. 
func waitForRebalance( - ctx context.Context, l *logger.Logger, db *gosql.DB, maxStdDev float64, + ctx context.Context, l *logger.Logger, db *gosql.DB, maxStdDev float64, stableSeconds int64, ) error { const statsInterval = 2 * time.Second @@ -307,7 +307,7 @@ func waitForRebalance( } l.Printf("%v\n", stats) - if allocatorStableSeconds <= stats.SecondsSinceLastEvent { + if stableSeconds <= stats.SecondsSinceLastEvent { l.Printf("replica count stddev = %f, max allowed stddev = %f\n", stats.ReplicaCountStdDev, maxStdDev) if stats.ReplicaCountStdDev > maxStdDev { _ = printRebalanceStats(l, db) diff --git a/pkg/cmd/roachtest/tests/decommission.go b/pkg/cmd/roachtest/tests/decommission.go index f7a2de59e896..3d12f16375ee 100644 --- a/pkg/cmd/roachtest/tests/decommission.go +++ b/pkg/cmd/roachtest/tests/decommission.go @@ -280,17 +280,6 @@ func runDecommission( } m.Go(func() error { - getNodeID := func(node int) (int, error) { - dbNode := c.Conn(ctx, t.L(), node) - defer dbNode.Close() - - var nodeID int - if err := dbNode.QueryRow(`SELECT node_id FROM crdb_internal.node_runtime_info LIMIT 1`).Scan(&nodeID); err != nil { - return 0, err - } - return nodeID, nil - } - tBegin, whileDown := timeutil.Now(), true node := nodes for timeutil.Since(tBegin) <= duration { @@ -304,7 +293,7 @@ func runDecommission( } t.Status(fmt.Sprintf("decommissioning %d (down=%t)", node, whileDown)) - nodeID, err := getNodeID(node) + nodeID, err := h.getLogicalNodeID(ctx, node) if err != nil { return err } @@ -1169,6 +1158,20 @@ func (h *decommTestHelper) stop(ctx context.Context, node int) { h.c.Stop(ctx, h.t.L(), opts, h.c.Node(node)) } +// getLogicalNodeID connects to the nodeIdx-th node in the roachprod cluster to +// obtain the logical CockroachDB nodeID of this node. This is because nodes can +// change ID as they are continuously decommissioned, wiped, and started anew. +func (h *decommTestHelper) getLogicalNodeID(ctx context.Context, nodeIdx int) (int, error) { + dbNode := h.c.Conn(ctx, h.t.L(), nodeIdx) + defer dbNode.Close() + + var nodeID int + if err := dbNode.QueryRow(`SELECT node_id FROM crdb_internal.node_runtime_info LIMIT 1`).Scan(&nodeID); err != nil { + return 0, err + } + return nodeID, nil +} + // decommission decommissions the given targetNodes, running the process // through the specified runNode. func (h *decommTestHelper) decommission( @@ -1207,17 +1210,21 @@ func (h *decommTestHelper) recommission( // checkDecommissioned validates that a node has successfully decommissioned // and has updated its state in gossip. 
-func (h *decommTestHelper) checkDecommissioned(ctx context.Context, downNodeID, runNode int) error { +func (h *decommTestHelper) checkDecommissioned( + ctx context.Context, targetLogicalNodeID, runNode int, +) error { db := h.c.Conn(ctx, h.t.L(), runNode) defer db.Close() var membership string - if err := db.QueryRow("SELECT membership FROM crdb_internal.kv_node_liveness WHERE node_id = $1", - downNodeID).Scan(&membership); err != nil { + if err := db.QueryRow( + "SELECT membership FROM crdb_internal.kv_node_liveness WHERE node_id = $1", + targetLogicalNodeID, + ).Scan(&membership); err != nil { return err } if membership != "decommissioned" { - return errors.Newf("node %d not decommissioned", downNodeID) + return errors.Newf("node %d not decommissioned", targetLogicalNodeID) } return nil @@ -1226,7 +1233,7 @@ func (h *decommTestHelper) checkDecommissioned(ctx context.Context, downNodeID, // waitReplicatedAwayFrom checks each second until there are no ranges present // on a node and all ranges are fully replicated. func (h *decommTestHelper) waitReplicatedAwayFrom( - ctx context.Context, downNodeID, runNode int, + ctx context.Context, targetLogicalNodeID, runNode int, ) error { db := h.c.Conn(ctx, h.t.L(), runNode) defer func() { @@ -1234,11 +1241,17 @@ func (h *decommTestHelper) waitReplicatedAwayFrom( }() for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + var count int if err := db.QueryRow( // Check if the down node has any replicas. "SELECT count(*) FROM crdb_internal.ranges WHERE array_position(replicas, $1) IS NOT NULL", - downNodeID, + targetLogicalNodeID, ).Scan(&count); err != nil { return err } @@ -1264,7 +1277,7 @@ func (h *decommTestHelper) waitReplicatedAwayFrom( // the roachprod node and targetNodeID representing the logical nodeID within // the cluster. func (h *decommTestHelper) waitUpReplicated( - ctx context.Context, targetNodeID, targetNode int, database string, + ctx context.Context, targetLogicalNodeID, targetNode int, database string, ) error { db := h.c.Conn(ctx, h.t.L(), targetNode) defer func() { @@ -1273,17 +1286,23 @@ func (h *decommTestHelper) waitUpReplicated( var count int for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + // Check to see that there are no ranges where the target node is // not part of the replica set. 
stmtReplicaCount := fmt.Sprintf( "SELECT count(*) FROM crdb_internal.ranges "+ "WHERE array_position(replicas, %d) IS NULL and database_name = '%s';", - targetNodeID, database, + targetLogicalNodeID, database, ) if err := db.QueryRow(stmtReplicaCount).Scan(&count); err != nil { return err } - h.t.L().Printf("node%d (n%d) awaiting %d replica(s)", targetNode, targetNodeID, count) + h.t.L().Printf("node%d (n%d) awaiting %d replica(s)", targetNode, targetLogicalNodeID, count) if count == 0 { break } diff --git a/pkg/cmd/roachtest/tests/decommissionbench.go b/pkg/cmd/roachtest/tests/decommissionbench.go index 37447771c488..09dd0428b88e 100644 --- a/pkg/cmd/roachtest/tests/decommissionbench.go +++ b/pkg/cmd/roachtest/tests/decommissionbench.go @@ -11,7 +11,9 @@ package tests import ( + "bytes" "context" + "encoding/json" "fmt" "path/filepath" "strings" @@ -25,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachprod/install" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/workload/histogram" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -40,6 +43,7 @@ type decommissionBenchSpec struct { load bool admissionControl bool snapshotRate int + duration time.Duration // When true, the test will attempt to stop the node prior to decommission. whileDown bool @@ -61,12 +65,28 @@ func registerDecommissionBench(r registry.Registry) { load: true, admissionControl: true, }, + { + nodes: 4, + cpus: 4, + warehouses: 100, + load: true, + admissionControl: true, + duration: 30 * time.Minute, + }, + { + nodes: 4, + cpus: 16, + warehouses: 1000, + load: true, + admissionControl: true, + }, { nodes: 4, cpus: 16, warehouses: 1000, load: true, admissionControl: true, + duration: 1 * time.Hour, }, { nodes: 4, @@ -98,7 +118,8 @@ func registerDecommissionBench(r registry.Registry) { } } -// registerDecommissionBenchSpec adds a test using the specified configuration to the registry. +// registerDecommissionBenchSpec adds a test using the specified configuration +// to the registry. 
func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBenchSpec) { timeout := defaultTimeout if benchSpec.timeout != time.Duration(0) { @@ -107,7 +128,8 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe extraNameParts := []string{""} if benchSpec.snapshotRate != 0 { - extraNameParts = append(extraNameParts, fmt.Sprintf("snapshotRate=%dmb", benchSpec.snapshotRate)) + extraNameParts = append(extraNameParts, + fmt.Sprintf("snapshotRate=%dmb", benchSpec.snapshotRate)) } if benchSpec.whileDown { @@ -122,6 +144,11 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe extraNameParts = append(extraNameParts, "no-admission") } + if benchSpec.duration > 0 { + timeout = benchSpec.duration * 3 + extraNameParts = append(extraNameParts, fmt.Sprintf("duration=%s", benchSpec.duration)) + } + extraName := strings.Join(extraNameParts, "/") // TODO(sarkesian): add a configuration that tests decommission of a node @@ -138,7 +165,11 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe NonReleaseBlocker: true, Skip: benchSpec.skip, Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { - runDecommissionBench(ctx, t, c, benchSpec, timeout) + if benchSpec.duration > 0 { + runDecommissionBenchLong(ctx, t, c, benchSpec, timeout) + } else { + runDecommissionBench(ctx, t, c, benchSpec, timeout) + } }, }) } @@ -150,21 +181,16 @@ type decommBenchTicker struct { post func() } -// runDecommissionBench initializes a cluster with TPCC and attempts to -// benchmark the decommissioning of a single node picked at random. The cluster -// may or may not be running under load. -func runDecommissionBench( +// setupDecommissionBench performs the initial cluster setup needed prior to +// running a workload and benchmarking decommissioning. +func setupDecommissionBench( ctx context.Context, t test.Test, c cluster.Cluster, benchSpec decommissionBenchSpec, - testTimeout time.Duration, + workloadNode, pinnedNode int, + importCmd string, ) { - // node1 is kept pinned (i.e. not decommissioned/restarted), and is the node - // through which we run decommissions. The last node is used for the workload. - pinnedNode := 1 - workloadNode := benchSpec.nodes + 1 - crdbNodes := c.Range(pinnedNode, benchSpec.nodes) c.Put(ctx, t.Cockroach(), "./cockroach", c.All()) c.Put(ctx, t.DeprecatedWorkload(), "./workload", c.Node(workloadNode)) for i := 1; i <= benchSpec.nodes; i++ { @@ -174,14 +200,6 @@ func runDecommissionBench( c.Start(ctx, t.L(), startOpts, install.MakeClusterSettings(), c.Node(i)) } - maxRate := tpccMaxRate(benchSpec.warehouses) - rampDuration := 3 * time.Minute - rampStarted := make(chan struct{}, 1) - importCmd := fmt.Sprintf(`./cockroach workload fixtures import tpcc --warehouses=%d`, - benchSpec.warehouses) - workloadCmd := fmt.Sprintf("./workload run tpcc --warehouses=%d --max-rate=%d --duration=%s "+ - "--histograms=%s/stats.json --ramp=%s --tolerate-errors {pgurl:1-%d}", maxRate, benchSpec.warehouses, - testTimeout, t.PerfArtifactsDir(), rampDuration, benchSpec.nodes) t.Status(fmt.Sprintf("initializing cluster with %d warehouses", benchSpec.warehouses)) c.Run(ctx, c.Node(pinnedNode), importCmd) @@ -210,6 +228,90 @@ func runDecommissionBench( err := WaitFor3XReplication(ctx, t, db) require.NoError(t, err) } +} + +// uploadPerfArtifacts puts the contents of perfBuf onto the pinned node so +// that the results will be picked up by Roachperf. 
If there is a workload +// running, it will also get the perf artifacts from the workload node and +// concatenate them with the decommission perf artifacts so that the effects +// of the decommission on foreground traffic can also be visualized. +func uploadPerfArtifacts( + ctx context.Context, + t test.Test, + c cluster.Cluster, + benchSpec decommissionBenchSpec, + pinnedNode, workloadNode int, + perfBuf *bytes.Buffer, +) { + // Store the perf artifacts on the pinned node so that the test + // runner copies it into an appropriate directory path. + dest := filepath.Join(t.PerfArtifactsDir(), "stats.json") + if err := c.RunE(ctx, c.Node(pinnedNode), "mkdir -p "+filepath.Dir(dest)); err != nil { + t.L().Errorf("failed to create perf dir: %+v", err) + } + if err := c.PutString(ctx, perfBuf.String(), dest, 0755, c.Node(pinnedNode)); err != nil { + t.L().Errorf("failed to upload perf artifacts to node: %s", err.Error()) + } + + // Get the workload perf artifacts and move them to the pinned node, so that + // they can be used to display the workload operation rates during decommission. + if benchSpec.load { + workloadStatsSrc := filepath.Join(t.PerfArtifactsDir(), "stats.json") + localWorkloadStatsPath := filepath.Join(t.ArtifactsDir(), "workload_stats.json") + workloadStatsDest := filepath.Join(t.PerfArtifactsDir(), "workload_stats.json") + if err := c.Get( + ctx, t.L(), workloadStatsSrc, localWorkloadStatsPath, c.Node(workloadNode), + ); err != nil { + t.L().Errorf( + "failed to download workload perf artifacts from workload node: %s", err.Error(), + ) + } + + if err := c.PutE(ctx, t.L(), localWorkloadStatsPath, workloadStatsDest, + c.Node(pinnedNode)); err != nil { + t.L().Errorf("failed to upload workload perf artifacts to node: %s", err.Error()) + } + + if err := c.RunE(ctx, c.Node(pinnedNode), + fmt.Sprintf("cat %s >> %s", workloadStatsDest, dest)); err != nil { + t.L().Errorf("failed to concatenate workload perf artifacts with "+ + "decommission perf artifacts: %s", err.Error()) + } + + if err := c.RunE( + ctx, c.Node(pinnedNode), fmt.Sprintf("rm %s", workloadStatsDest), + ); err != nil { + t.L().Errorf("failed to cleanup workload perf artifacts: %s", err.Error()) + } + } +} + +// runDecommissionBench initializes a cluster with TPCC and attempts to +// benchmark the decommissioning of a single node picked at random. The cluster +// may or may not be running under load. +func runDecommissionBench( + ctx context.Context, + t test.Test, + c cluster.Cluster, + benchSpec decommissionBenchSpec, + testTimeout time.Duration, +) { + // node1 is kept pinned (i.e. not decommissioned/restarted), and is the node + // through which we run decommissions. The last node is used for the workload. 
+ pinnedNode := 1 + workloadNode := benchSpec.nodes + 1 + crdbNodes := c.Range(pinnedNode, benchSpec.nodes) + + maxRate := tpccMaxRate(benchSpec.warehouses) + rampDuration := 3 * time.Minute + rampStarted := make(chan struct{}, 1) + importCmd := fmt.Sprintf( + `./cockroach workload fixtures import tpcc --warehouses=%d`, benchSpec.warehouses, + ) + workloadCmd := fmt.Sprintf("./workload run tpcc --warehouses=%d --max-rate=%d --duration=%s "+ + "--histograms=%s/stats.json --ramp=%s --tolerate-errors {pgurl:1-%d}", maxRate, benchSpec.warehouses, + testTimeout, t.PerfArtifactsDir(), rampDuration, benchSpec.nodes) + setupDecommissionBench(ctx, t, c, benchSpec, workloadNode, pinnedNode, importCmd) workloadCtx, workloadCancel := context.WithCancel(ctx) m := c.NewMonitor(workloadCtx, crdbNodes) @@ -263,50 +365,146 @@ func runDecommissionBench( m.ExpectDeath() defer m.ResetDeaths() - return runSingleDecommission(ctx, h, pinnedNode, benchSpec.whileDown, false, /* reuse */ - recorder, nil /* upreplicateTicker */) + return runSingleDecommission(ctx, h, pinnedNode, benchSpec.whileDown, + false /* reuse */, recorder, nil /* upreplicateTicker */) }) if err := m.WaitE(); err != nil { t.Fatal(err) } - // Store the perf artifacts on the pinned node so that the test - // runner copies it into an appropriate directory path. - dest := filepath.Join(t.PerfArtifactsDir(), "stats.json") - if err := c.RunE(ctx, c.Node(pinnedNode), "mkdir -p "+filepath.Dir(dest)); err != nil { - t.L().Errorf("failed to create perf dir: %+v", err) - } - if err := c.PutString(ctx, perfBuf.String(), dest, 0755, c.Node(pinnedNode)); err != nil { - t.L().Errorf("failed to upload perf artifacts to node: %s", err.Error()) - } + uploadPerfArtifacts(ctx, t, c, benchSpec, pinnedNode, workloadNode, perfBuf) +} + +// runDecommissionBenchLong initializes a cluster with TPCC and attempts to +// benchmark the decommissioning of nodes picked at random before subsequently +// wiping them and re-adding them to the cluster to continually execute the +// decommissioning process over the runtime of the test. The cluster may or may +// not be running under load. +func runDecommissionBenchLong( + ctx context.Context, + t test.Test, + c cluster.Cluster, + benchSpec decommissionBenchSpec, + testTimeout time.Duration, +) { + // node1 is kept pinned (i.e. not decommissioned/restarted), and is the node + // through which we run decommissions. The last node is used for the workload. + pinnedNode := 1 + workloadNode := benchSpec.nodes + 1 + crdbNodes := c.Range(pinnedNode, benchSpec.nodes) + + maxRate := tpccMaxRate(benchSpec.warehouses) + rampDuration := 3 * time.Minute + rampStarted := make(chan struct{}, 1) + importCmd := fmt.Sprintf( + `./cockroach workload fixtures import tpcc --warehouses=%d`, benchSpec.warehouses, + ) + workloadCmd := fmt.Sprintf("./workload run tpcc --warehouses=%d --max-rate=%d --duration=%s "+ + "--histograms=%s/stats.json --ramp=%s --tolerate-errors {pgurl:1-%d}", maxRate, benchSpec.warehouses, + testTimeout, t.PerfArtifactsDir(), rampDuration, benchSpec.nodes) + + setupDecommissionBench(ctx, t, c, benchSpec, workloadNode, pinnedNode, importCmd) + + workloadCtx, workloadCancel := context.WithCancel(ctx) + m := c.NewMonitor(workloadCtx, crdbNodes) - // Get the workload perf artifacts and move them to the pinned node, so that - // they can be used to display the workload operation rates during decommission. 
if benchSpec.load { - workloadStatsSrc := filepath.Join(t.PerfArtifactsDir(), "stats.json") - localWorkloadStatsPath := filepath.Join(t.ArtifactsDir(), "workload_stats.json") - workloadStatsDest := filepath.Join(t.PerfArtifactsDir(), "workload_stats.json") - if err := c.Get(ctx, t.L(), workloadStatsSrc, localWorkloadStatsPath, c.Node(workloadNode)); err != nil { - t.L().Errorf("failed to download workload perf artifacts from workload node: %s", err.Error()) - } + m.Go( + func(ctx context.Context) error { + rampStarted <- struct{}{} - if err := c.PutE(ctx, t.L(), localWorkloadStatsPath, workloadStatsDest, - c.Node(pinnedNode)); err != nil { - t.L().Errorf("failed to upload workload perf artifacts to node: %s", err.Error()) + // Run workload indefinitely, to be later killed by context + // cancellation once decommission has completed. + err := c.RunE(ctx, c.Node(workloadNode), workloadCmd) + if errors.Is(ctx.Err(), context.Canceled) { + // Workload intentionally cancelled via context, so don't return error. + return nil + } + if err != nil { + t.L().Printf("workload error: %s", err) + } + return err + }, + ) + } + + // Create a histogram registry for recording multiple decommission metrics, + // following the "bulk job" form of measuring performance. + // See runDecommissionBench for more explanation. + reg := histogram.NewRegistry( + defaultTimeout, + histogram.MockWorkloadName, + ) + + perfBuf := bytes.NewBuffer([]byte{}) + jsonEnc := json.NewEncoder(perfBuf) + + // Initialize operation-specific metric ticks. + opNames := []string{"decommission", "decommission:upreplicate"} + opTickByName := make(map[string]func()) + + makeOpTick := func(reg *histogram.Registry, jsonEnc *json.Encoder, opName string) func() { + return func() { + reg.Tick(func(tick histogram.Tick) { + if tick.Name == opName { + _ = jsonEnc.Encode(tick.Snapshot()) + } + }) } + } - if err := c.RunE(ctx, c.Node(pinnedNode), - fmt.Sprintf("cat %s >> %s", workloadStatsDest, dest)); err != nil { - t.L().Errorf("failed to concatenate workload perf artifacts with "+ - "decommission perf artifacts: %s", err.Error()) + for _, opName := range opNames { + reg.GetHandle().Get(opName) + opTickByName[opName] = makeOpTick(reg, jsonEnc, opName) + } + + decommRecorder := &decommBenchTicker{ + pre: opTickByName["decommission"], + post: opTickByName["decommission"], + } + upreplicateRecorder := &decommBenchTicker{ + pre: opTickByName["decommission:upreplicate"], + post: opTickByName["decommission:upreplicate"], + } + + m.Go(func(ctx context.Context) error { + defer workloadCancel() + + h := newDecommTestHelper(t, c) + h.blockFromRandNode(workloadNode) + + // If we are running a workload, wait until it has started and completed its + // ramp time before initiating a decommission. + if benchSpec.load { + <-rampStarted + t.Status("Waiting for workload to ramp up...") + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(rampDuration): + // Workload ramp-up complete. 
+ } } - if err := c.RunE(ctx, c.Node(pinnedNode), - fmt.Sprintf("rm %s", workloadStatsDest)); err != nil { - t.L().Errorf("failed to cleanup workload perf artifacts: %s", err.Error()) + for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= benchSpec.duration; { + m.ExpectDeath() + err := runSingleDecommission(ctx, h, pinnedNode, benchSpec.whileDown, + true /* reuse */, decommRecorder, upreplicateRecorder) + m.ResetDeaths() + if err != nil { + return err + } } + + return nil + }) + + if err := m.WaitE(); err != nil { + t.Fatal(err) } + + uploadPerfArtifacts(ctx, t, c, benchSpec, pinnedNode, workloadNode, perfBuf) } // runSingleDecommission picks a random node and attempts to decommission that @@ -321,7 +519,11 @@ func runSingleDecommission( decommTicker, upreplicateTicker *decommBenchTicker, ) error { target := h.getRandNodeOtherThan(pinnedNode) - h.t.Status(fmt.Sprintf("targeting node%d (n%d) for decommission", target, target)) + targetLogicalNodeID, err := h.getLogicalNodeID(ctx, target) + if err != nil { + return err + } + h.t.Status(fmt.Sprintf("targeting node%d (n%d) for decommission", target, targetLogicalNodeID)) // TODO(sarkesian): Consider adding a future test for decommissions that get // stuck with replicas in purgatory, by pinning them to a node. @@ -339,13 +541,16 @@ func runSingleDecommission( } h.t.Status("waiting for cluster balance") - if err := waitForRebalance(ctx, h.t.L(), dbNode, float64(totalRanges)/3.0); err != nil { + if err := waitForRebalance( + ctx, h.t.L(), dbNode, float64(totalRanges)/3.0, 60, /* stableSeconds */ + ); err != nil { return err } if err := dbNode.QueryRow( `SELECT sum(range_count), sum(used) FROM crdb_internal.kv_store_status where node_id = $1`, - target).Scan(&rangeCount, &bytesUsed); err != nil { + targetLogicalNodeID, + ).Scan(&rangeCount, &bytesUsed); err != nil { return err } } @@ -355,23 +560,26 @@ func runSingleDecommission( h.stop(ctx, target) } - h.t.Status(fmt.Sprintf("decommissioning node%d (n%d)", target, target)) + h.t.Status(fmt.Sprintf("decommissioning node%d (n%d)", target, targetLogicalNodeID)) + targetLogicalNodeIDList := option.NodeListOption{targetLogicalNodeID} tBegin := timeutil.Now() decommTicker.pre() - if _, err := h.decommission(ctx, h.c.Node(target), pinnedNode, "--wait=all"); err != nil { + if _, err := h.decommission( + ctx, targetLogicalNodeIDList, pinnedNode, "--wait=all", + ); err != nil { return err } - if err := h.waitReplicatedAwayFrom(ctx, target, pinnedNode); err != nil { + if err := h.waitReplicatedAwayFrom(ctx, targetLogicalNodeID, pinnedNode); err != nil { return err } - if err := h.checkDecommissioned(ctx, target, pinnedNode); err != nil { + if err := h.checkDecommissioned(ctx, targetLogicalNodeID, pinnedNode); err != nil { return err } decommTicker.post() elapsed := timeutil.Since(tBegin) h.t.Status(fmt.Sprintf("decommissioned node%d (n%d) in %s (ranges: %d, size: %s)", - target, target, elapsed, rangeCount, humanizeutil.IBytes(bytesUsed))) + target, targetLogicalNodeID, elapsed, rangeCount, humanizeutil.IBytes(bytesUsed))) if reuse { if !stopFirst { @@ -379,7 +587,65 @@ func runSingleDecommission( h.stop(ctx, target) } - // TODO(sarkesian): Wipe the node and re-add to cluster. + // Wipe the node and re-add to cluster with a new node ID. 
+ if err := h.c.RunE(ctx, h.c.Node(target), "rm -rf {store-dir}"); err != nil { + return err + } + + startOpts := option.DefaultStartOpts() + startOpts.RoachprodOpts.ExtraArgs = append( + startOpts.RoachprodOpts.ExtraArgs, fmt.Sprintf("--attrs=node%d", target), + ) + if err := h.c.StartE( + ctx, h.t.L(), startOpts, install.MakeClusterSettings(), h.c.Node(target), + ); err != nil { + return err + } + + newLogicalNodeID, err := h.getLogicalNodeID(ctx, target) + if err != nil { + return err + } + + upreplicateTicker.pre() + h.t.Status("waiting for replica counts to balance across nodes") + { + dbNode := h.c.Conn(ctx, h.t.L(), pinnedNode) + defer dbNode.Close() + + for { + var membership string + select { + case <-ctx.Done(): + return ctx.Err() + default: + if err := dbNode.QueryRow( + "SELECT membership FROM crdb_internal.gossip_liveness WHERE node_id = $1", + newLogicalNodeID, + ).Scan(&membership); err != nil { + return err + } + } + + if membership == "active" { + break + } + } + + var totalRanges int64 + if err := dbNode.QueryRow( + `SELECT count(*) FROM crdb_internal.ranges_no_leases`, + ).Scan(&totalRanges); err != nil { + return err + } + + if err := waitForRebalance( + ctx, h.t.L(), dbNode, float64(totalRanges)/3.0, 60, /* stableSeconds */ + ); err != nil { + return err + } + } + upreplicateTicker.post() } return nil
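// For illustration, a minimal sketch (not part of the patch) of why the test
// resolves logical node IDs at runtime instead of assuming nodeID == roachprod
// index: once a node is decommissioned, wiped, and restarted (the reuse path
// above), it rejoins the cluster with a fresh nodeID. The helper name below is
// hypothetical; it assumes the same package as decommissionbench.go and simply
// reuses the getLogicalNodeID helper added in this patch.
func logicalNodeIDMap(
	ctx context.Context, h *decommTestHelper, numCRDBNodes int,
) (map[int]int, error) {
	// Maps roachprod node index (1-based) -> current logical CockroachDB nodeID.
	ids := make(map[int]int, numCRDBNodes)
	for idx := 1; idx <= numCRDBNodes; idx++ {
		id, err := h.getLogicalNodeID(ctx, idx)
		if err != nil {
			return nil, err
		}
		ids[idx] = id
	}
	return ids, nil
}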
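// A minimal sketch of the metric flow used by runDecommissionBenchLong: each
// operation name gets a histogram in the registry, and the pre/post ticker
// funcs snapshot that histogram into the JSON buffer that uploadPerfArtifacts
// later writes out as stats.json. The function and opName below are
// illustrative and assume the same package/imports as decommissionbench.go;
// the registry/tick/snapshot calls mirror the ones in the patch.
func exampleDecommissionTicker() *bytes.Buffer {
	perfBuf := bytes.NewBuffer([]byte{})
	jsonEnc := json.NewEncoder(perfBuf)

	reg := histogram.NewRegistry(defaultTimeout, histogram.MockWorkloadName)
	const opName = "decommission"
	reg.GetHandle().Get(opName)

	tick := func() {
		reg.Tick(func(t histogram.Tick) {
			if t.Name == opName {
				_ = jsonEnc.Encode(t.Snapshot())
			}
		})
	}

	// Bracket the measured operation, as decommBenchTicker.pre/post do.
	tick() // pre: marks the start of the decommission
	// ... decommission runs here ...
	tick() // post: the elapsed interval lands in this op's histogram

	return perfBuf // contents become stats.json via uploadPerfArtifacts
}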