diff --git a/pkg/cmd/roachtest/decommission.go b/pkg/cmd/roachtest/decommission.go index 25d570659de7..691b8e529100 100644 --- a/pkg/cmd/roachtest/decommission.go +++ b/pkg/cmd/roachtest/decommission.go @@ -36,15 +36,16 @@ func registerDecommission(r *testRegistry) { duration := time.Hour r.Add(testSpec{ - Name: fmt.Sprintf("decommission/nodes=%d/duration=%s", numNodes, duration), - Owner: OwnerKV, - Cluster: makeClusterSpec(4), + Name: fmt.Sprintf("decommission/nodes=%d/duration=%s", numNodes, duration), + Owner: OwnerKV, + MinVersion: "v20.2.0", + Cluster: makeClusterSpec(4), Run: func(ctx context.Context, t *test, c *cluster) { if local { - duration = 3 * time.Minute + duration = 5 * time.Minute t.l.Printf("running with duration=%s in local mode\n", duration) } - runDecommission(t, c, numNodes, duration) + runDecommission(ctx, t, c, numNodes, duration) }, }) } @@ -63,13 +64,16 @@ func registerDecommission(r *testRegistry) { } } +// runDecommission decommissions and wipes nodes in a cluster repeatedly, +// alternating between the node being shut down gracefully before and after the +// decommissioning operation, while some light load is running against the +// cluster (to manually verify that the qps don't dip too much). +// // TODO(tschottdorf): verify that the logs don't contain the messages // that would spam the log before #23605. I wonder if we should really // start grepping the logs. An alternative is to introduce a metric // that would have signaled this and check that instead. -func runDecommission(t *test, c *cluster, nodes int, duration time.Duration) { - ctx := context.Background() - +func runDecommission(ctx context.Context, t *test, c *cluster, nodes int, duration time.Duration) { const defaultReplicationFactor = 3 // The number of nodes we're going to cycle through. Since we're sometimes // killing the nodes and then removing them, this means having to be careful @@ -79,18 +83,19 @@ func runDecommission(t *test, c *cluster, nodes int, duration time.Duration) { // at some point. numDecom := (defaultReplicationFactor - 1) / 2 - c.Put(ctx, workload, "./workload", c.Node(nodes)) + // node1 is kept pinned (i.e. not decommissioned/restarted), and is the node + // through which we run the workload and other queries. + pinnedNode := 1 c.Put(ctx, cockroach, "./cockroach", c.All()) + c.Put(ctx, workload, "./workload", c.Node(pinnedNode)) - for i := 1; i <= numDecom; i++ { + for i := 1; i <= nodes; i++ { c.Start(ctx, t, c.Node(i), startArgs(fmt.Sprintf("-a=--attrs=node%d", i))) } + c.Run(ctx, c.Node(pinnedNode), `./workload init kv --drop`) - c.Start(ctx, t, c.Range(numDecom+1, nodes)) - c.Run(ctx, c.Node(nodes), `./workload init kv --drop`) - - waitReplicatedAwayFrom := func(downNodeID string) error { - db := c.Conn(ctx, nodes) + waitReplicatedAwayFrom := func(downNodeID int) error { + db := c.Conn(ctx, pinnedNode) defer func() { _ = db.Close() }() @@ -121,52 +126,47 @@ func runDecommission(t *test, c *cluster, nodes int, duration time.Duration) { return nil } - waitUpReplicated := func(targetNodeID string) error { - db := c.Conn(ctx, nodes) + waitUpReplicated := func(targetNode, targetNodeID int) error { + db := c.Conn(ctx, pinnedNode) defer func() { _ = db.Close() }() - for ok := false; !ok; { + var count int + for { + // Check to see that there are no ranges where the target node is + // not part of the replica set. stmtReplicaCount := fmt.Sprintf( - `SELECT count(*) = 0 FROM crdb_internal.ranges WHERE array_position(replicas, %s) IS NULL and database_name = 'kv';`, targetNodeID) - t.Status(stmtReplicaCount) - if err := db.QueryRow(stmtReplicaCount).Scan(&ok); err != nil { + `SELECT count(*) FROM crdb_internal.ranges WHERE array_position(replicas, %d) IS NULL and database_name = 'kv';`, targetNodeID) + if err := db.QueryRow(stmtReplicaCount).Scan(&count); err != nil { return err } + t.Status(fmt.Sprintf("node%d missing %d replica(s)", targetNode, count)) + if count == 0 { + break + } time.Sleep(time.Second) } return nil } - if err := waitReplicatedAwayFrom("0" /* no down node */); err != nil { + if err := waitReplicatedAwayFrom(0 /* no down node */); err != nil { t.Fatal(err) } - loadDuration := " --duration=" + duration.String() - workloads := []string{ // TODO(tschottdorf): in remote mode, the ui shows that we consistently write // at 330 qps (despite asking for 500 below). Locally we get 500qps (and a lot // more without rate limiting). Check what's up with that. - "./workload run kv --max-rate 500 --tolerate-errors" + loadDuration + " {pgurl:1-%d}", + fmt.Sprintf("./workload run kv --max-rate 500 --tolerate-errors --duration=%s {pgurl:1-%d}", duration.String(), nodes), } - run := func(stmtStr string) { - db := c.Conn(ctx, nodes) + run := func(stmt string) { + db := c.Conn(ctx, pinnedNode) defer db.Close() - stmt := fmt.Sprintf(stmtStr, "", "=") - // We are removing the EXPERIMENTAL keyword in 2.1. For compatibility - // with 2.0 clusters we still need to try with it if the - // syntax without EXPERIMENTAL fails. - // TODO(knz): Remove this in 2.2. + t.Status(stmt) _, err := db.ExecContext(ctx, stmt) - if err != nil && strings.Contains(err.Error(), "syntax error") { - stmt = fmt.Sprintf(stmtStr, "EXPERIMENTAL", "") - t.Status(stmt) - _, err = db.ExecContext(ctx, stmt) - } if err != nil { t.Fatal(err) } @@ -177,20 +177,19 @@ func runDecommission(t *test, c *cluster, nodes int, duration time.Duration) { m, ctx = errgroup.WithContext(ctx) for _, cmd := range workloads { cmd := cmd // copy is important for goroutine - - cmd = fmt.Sprintf(cmd, nodes) m.Go(func() error { - return c.RunE(ctx, c.Node(nodes), cmd) + return c.RunE(ctx, c.Node(pinnedNode), cmd) }) } m.Go(func() error { - nodeID := func(node int) (string, error) { + getNodeID := func(node int) (int, error) { dbNode := c.Conn(ctx, node) defer dbNode.Close() - var nodeID string + + var nodeID int if err := dbNode.QueryRow(`SELECT node_id FROM crdb_internal.node_runtime_info LIMIT 1`).Scan(&nodeID); err != nil { - return "", err + return 0, err } return nodeID, nil } @@ -201,21 +200,32 @@ func runDecommission(t *test, c *cluster, nodes int, duration time.Duration) { return c.RunE(ctx, c.Node(node), "./cockroach quit --insecure --host=:"+port) } - decom := func(id string) error { - port := fmt.Sprintf("{pgport:%d}", nodes) // always use last node - t.Status("decommissioning node", id) - return c.RunE(ctx, c.Node(nodes), "./cockroach node decommission --insecure --wait=all --host=:"+port+" "+id) + decom := func(id int) error { + port := fmt.Sprintf("{pgport:%d}", pinnedNode) // always use the pinned node + t.Status(fmt.Sprintf("decommissioning node %d", id)) + return c.RunE(ctx, c.Node(pinnedNode), fmt.Sprintf("./cockroach node decommission --insecure --wait=all --host=:%s %d", port, id)) } - for tBegin, whileDown, node := timeutil.Now(), true, 1; timeutil.Since(tBegin) <= duration; whileDown, node = !whileDown, (node%numDecom)+1 { + tBegin, whileDown := timeutil.Now(), true + node := nodes + for timeutil.Since(tBegin) <= duration { + // Alternate between the node being shut down gracefully before and + // after the decommissioning operation. + whileDown = !whileDown + // Cycle through the last numDecom nodes. + node = nodes - (node % numDecom) + if node == pinnedNode { + t.Fatalf("programming error: not expecting to decommission/wipe node%d", pinnedNode) + } + t.Status(fmt.Sprintf("decommissioning %d (down=%t)", node, whileDown)) - id, err := nodeID(node) + nodeID, err := getNodeID(node) if err != nil { return err } - run(fmt.Sprintf(`ALTER RANGE default %%[1]s CONFIGURE ZONE %%[2]s 'constraints: {"+node%d"}'`, node)) - if err := waitUpReplicated(id); err != nil { + run(fmt.Sprintf(`ALTER RANGE default CONFIGURE ZONE = 'constraints: {"+node%d"}'`, node)) + if err := waitUpReplicated(node, nodeID); err != nil { return err } @@ -225,13 +235,13 @@ func runDecommission(t *test, c *cluster, nodes int, duration time.Duration) { } } - run(fmt.Sprintf(`ALTER RANGE default %%[1]s CONFIGURE ZONE %%[2]s 'constraints: {"-node%d"}'`, node)) + run(fmt.Sprintf(`ALTER RANGE default CONFIGURE ZONE = 'constraints: {"-node%d"}'`, node)) - if err := decom(id); err != nil { + if err := decom(nodeID); err != nil { return err } - if err := waitReplicatedAwayFrom(id); err != nil { + if err := waitReplicatedAwayFrom(nodeID); err != nil { return err } @@ -241,14 +251,15 @@ func runDecommission(t *test, c *cluster, nodes int, duration time.Duration) { } } + // Wipe the node and re-add to cluster with a new node ID. if err := c.RunE(ctx, c.Node(node), "rm -rf {store-dir}"); err != nil { return err } - db := c.Conn(ctx, 1) + db := c.Conn(ctx, pinnedNode) defer db.Close() - sArgs := startArgs(fmt.Sprintf("-a=--join %s --attrs=node%d", c.InternalAddr(ctx, c.Node(nodes))[0], node)) + sArgs := startArgs(fmt.Sprintf("-a=--join %s --attrs=node%d", c.InternalAddr(ctx, c.Node(pinnedNode))[0], node)) if err := c.StartE(ctx, c.Node(node), sArgs); err != nil { return err }