From 56658228c7b04f1bf64f14dd78ea3133cf4ba7da Mon Sep 17 00:00:00 2001 From: Aayush Shah Date: Tue, 10 Aug 2021 18:47:11 -0400 Subject: [PATCH] roachtest: add a drain+decommission roachtest This commit adds a roachtest that's meant to be a regression test against the hazard addressed by the first commit in this PR. This roachtest is meant to ensure that nodes that are marked "draining" are considered "live" by the allocator for when it makes the determination of whether a range can achieve quorum. Release note: None --- pkg/cmd/roachtest/tests/decommission.go | 120 +++++++++++++++++++++++- 1 file changed, 119 insertions(+), 1 deletion(-) diff --git a/pkg/cmd/roachtest/tests/decommission.go b/pkg/cmd/roachtest/tests/decommission.go index 0bcd0e151c04..53f0b6afea78 100644 --- a/pkg/cmd/roachtest/tests/decommission.go +++ b/pkg/cmd/roachtest/tests/decommission.go @@ -39,7 +39,7 @@ func registerDecommission(r registry.Registry) { r.Add(registry.TestSpec{ Name: fmt.Sprintf("decommission/nodes=%d/duration=%s", numNodes, duration), Owner: registry.OwnerKV, - Cluster: r.MakeClusterSpec(4), + Cluster: r.MakeClusterSpec(numNodes), Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { if c.IsLocal() { duration = 5 * time.Minute @@ -49,6 +49,19 @@ func registerDecommission(r registry.Registry) { }, }) } + { + numNodes := 6 + duration := 30 * time.Minute + + r.Add(registry.TestSpec{ + Name: fmt.Sprintf("drain-and-decommission/nodes=%d", numNodes), + Owner: registry.OwnerKV, + Cluster: r.MakeClusterSpec(numNodes), + Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { + runDrainAndDecommission(ctx, t, c, numNodes, duration) + }, + }) + } { numNodes := 6 r.Add(registry.TestSpec{ @@ -74,6 +87,111 @@ func registerDecommission(r registry.Registry) { } } +// runDrainAndDecommission marks 3 nodes in the test cluster as "draining" and +// then attempts to decommission a fourth node from the cluster. This test is +// meant to ensure that, in the allocator, we consider "draining" nodes as +// "live" for the purposes of determining whether a range can achieve quorum. +// Note that, if "draining" nodes were not considered live for this purpose, +// decommissioning would stall forever since the allocator would incorrectly +// think that at least a handful of ranges (that need to be moved off the +// decommissioning node) are unavailable. +func runDrainAndDecommission( + ctx context.Context, t test.Test, c cluster.Cluster, nodes int, duration time.Duration, +) { + const defaultReplicationFactor = 5 + if defaultReplicationFactor > nodes { + t.Fatal("improper configuration: replication factor greater than number of nodes in the test") + } + pinnedNode := 1 + c.Put(ctx, t.Cockroach(), "./cockroach", c.All()) + for i := 1; i <= nodes; i++ { + c.Start(ctx, c.Node(i)) + } + c.Run(ctx, c.Node(pinnedNode), `./cockroach workload init kv --drop --splits 1000`) + + run := func(stmt string) { + db := c.Conn(ctx, pinnedNode) + defer db.Close() + + t.Status(stmt) + _, err := db.ExecContext(ctx, stmt) + if err != nil { + t.Fatal(err) + } + t.L().Printf("run: %s\n", stmt) + } + + run(fmt.Sprintf(`ALTER RANGE default CONFIGURE ZONE USING num_replicas=%d`, defaultReplicationFactor)) + run(fmt.Sprintf(`ALTER DATABASE system CONFIGURE ZONE USING num_replicas=%d`, defaultReplicationFactor)) + + // Speed up the decommissioning. + run(fmt.Sprintf(`SET CLUSTER SETTING kv.snapshot_rebalance.max_rate='2GiB'`)) + run(fmt.Sprintf(`SET CLUSTER SETTING kv.snapshot_recovery.max_rate='2GiB'`)) + + t.Status(fmt.Sprintf("waiting for initial up-replication")) + db := c.Conn(ctx, pinnedNode) + defer func() { + _ = db.Close() + }() + for { + fullReplicated := false + if err := db.QueryRow( + // Check if all ranges are fully replicated. + "SELECT min(array_length(replicas, 1)) >= $1 FROM crdb_internal.ranges", + defaultReplicationFactor, + ).Scan(&fullReplicated); err != nil { + t.Fatal(err) + } + if fullReplicated { + break + } + time.Sleep(time.Second) + } + + var m *errgroup.Group // see comment in version.go + m, ctx = errgroup.WithContext(ctx) + m.Go( + func() error { + return c.RunE(ctx, c.Node(pinnedNode), + fmt.Sprintf("./cockroach workload run kv --max-rate 500 --tolerate-errors --duration=%s {pgurl:1-%d}", + duration.String(), 2, + ), + ) + }, + ) + + // Let the workload run for a small amount of time. + time.Sleep(1 * time.Minute) + + // Drain the last 3 nodes from the cluster. + for nodeID := nodes - 2; nodeID <= nodes; nodeID++ { + id := nodeID + m.Go(func() error { + drain := func(id int) error { + t.Status(fmt.Sprintf("draining node %d", id)) + return c.RunL(ctx, t.L(), c.Node(id), fmt.Sprintf("./cockroach node drain --insecure")) + } + return drain(id) + }) + } + // Sleep for long enough that all the other nodes in the cluster learn about + // the draining status of the two nodes we just drained. + time.Sleep(30 * time.Second) + + // Decommission the fourth-to-last node from the cluster. + id := nodes - 3 + m.Go(func() error { + decom := func(id int) error { + t.Status(fmt.Sprintf("decommissioning node %d", id)) + return c.RunL(ctx, t.L(), c.Node(id), fmt.Sprintf("./cockroach node decommission --self --insecure")) + } + return decom(id) + }) + if err := m.Wait(); err != nil { + t.Fatal(err) + } +} + // runDecommission decommissions and wipes nodes in a cluster repeatedly, // alternating between the node being shut down gracefully before and after the // decommissioning operation, while some light load is running against the