roachtest: add a drain+decommission roachtest
This commit adds a roachtest that's meant to be a regression test against the
hazard addressed by the first commit in this PR.

This roachtest is meant to ensure that nodes that are marked "draining" are
considered "live" by the allocator when it determines whether a range can
achieve quorum.

Release note: None
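For context, the allocator behavior under test looks roughly like the sketch below. The type and function names are hypothetical stand-ins rather than the actual CockroachDB allocator API; the point is only that a draining node still holds functioning replicas, so it must be counted as live when deciding whether a range can achieve quorum.

package main

import "fmt"

// NodeStatus is a simplified stand-in for a node's liveness state.
type NodeStatus int

const (
	StatusLive NodeStatus = iota
	StatusDraining
	StatusDead
)

// liveForQuorum reports whether a node should count as "live" when the
// allocator decides if a range can achieve quorum. Draining nodes still
// serve their replicas, so they are included; only dead nodes are excluded.
func liveForQuorum(s NodeStatus) bool {
	return s == StatusLive || s == StatusDraining
}

// canAchieveQuorum reports whether a majority of a range's replicas sit on
// nodes that count as live for quorum purposes.
func canAchieveQuorum(replicas []NodeStatus) bool {
	live := 0
	for _, s := range replicas {
		if liveForQuorum(s) {
			live++
		}
	}
	return live > len(replicas)/2
}

func main() {
	// Five replicas, three of them on draining nodes. If draining counted
	// as dead, this range would incorrectly appear unable to achieve
	// quorum, and any rebalance away from a decommissioning node would
	// stall indefinitely.
	replicas := []NodeStatus{
		StatusLive, StatusLive, StatusDraining, StatusDraining, StatusDraining,
	}
	fmt.Println(canAchieveQuorum(replicas)) // true
}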
aayushshah15 committed Sep 2, 2021
1 parent 2690ec2 commit ff2ae38
Showing 1 changed file with 118 additions and 1 deletion.
pkg/cmd/roachtest/tests/decommission.go
@@ -39,7 +39,7 @@ func registerDecommission(r registry.Registry) {
r.Add(registry.TestSpec{
Name: fmt.Sprintf("decommission/nodes=%d/duration=%s", numNodes, duration),
Owner: registry.OwnerKV,
- Cluster: r.MakeClusterSpec(4),
+ Cluster: r.MakeClusterSpec(numNodes),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
if c.IsLocal() {
duration = 5 * time.Minute
@@ -49,6 +49,18 @@ func registerDecommission(r registry.Registry) {
},
})
}
{
numNodes := 9
duration := 30 * time.Minute
r.Add(registry.TestSpec{
Name: fmt.Sprintf("drain-and-decommission/nodes=%d", numNodes),
Owner: registry.OwnerKV,
Cluster: r.MakeClusterSpec(numNodes),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runDrainAndDecommission(ctx, t, c, numNodes, duration)
},
})
}
{
numNodes := 6
r.Add(registry.TestSpec{
@@ -74,6 +86,111 @@ func registerDecommission(r registry.Registry) {
}
}

// runDrainAndDecommission marks 3 nodes in the test cluster as "draining" and
// then attempts to decommission a fourth node from the cluster. This test is
// meant to ensure that, in the allocator, we consider "draining" nodes as
// "live" for the purposes of determining whether a range can achieve quorum.
// Note that, if "draining" nodes were not considered live for this purpose,
// decommissioning would stall forever since the allocator would incorrectly
// think that at least a handful of ranges (that need to be moved off the
// decommissioning node) are unavailable.
func runDrainAndDecommission(
ctx context.Context, t test.Test, c cluster.Cluster, nodes int, duration time.Duration,
) {
const defaultReplicationFactor = 5
if defaultReplicationFactor > nodes {
t.Fatal("improper configuration: replication factor greater than number of nodes in the test")
}
pinnedNode := 1
c.Put(ctx, t.Cockroach(), "./cockroach", c.All())
for i := 1; i <= nodes; i++ {
c.Start(ctx, c.Node(i))
}
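// Create 1,000 splits up front so there are plenty of ranges that will need to move off the decommissioning node.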
c.Run(ctx, c.Node(pinnedNode), `./cockroach workload init kv --drop --splits 1000`)

run := func(stmt string) {
db := c.Conn(ctx, pinnedNode)
defer db.Close()

t.Status(stmt)
_, err := db.ExecContext(ctx, stmt)
if err != nil {
t.Fatal(err)
}
t.L().Printf("run: %s\n", stmt)
}

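// Bump the replication factor from the default of 3 to 5 for both user and system ranges, so that ranges are likely to have replicas on the nodes drained below.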
run(fmt.Sprintf(`ALTER RANGE default CONFIGURE ZONE USING num_replicas=%d`, defaultReplicationFactor))
run(fmt.Sprintf(`ALTER DATABASE system CONFIGURE ZONE USING num_replicas=%d`, defaultReplicationFactor))

// Speed up the decommissioning.
run(`SET CLUSTER SETTING kv.snapshot_rebalance.max_rate='2GiB'`)
run(`SET CLUSTER SETTING kv.snapshot_recovery.max_rate='2GiB'`)

t.Status("waiting for initial up-replication")
db := c.Conn(ctx, pinnedNode)
defer func() {
_ = db.Close()
}()
for {
fullReplicated := false
if err := db.QueryRow(
// Check if all ranges are fully replicated.
"SELECT min(array_length(replicas, 1)) >= $1 FROM crdb_internal.ranges",
defaultReplicationFactor,
).Scan(&fullReplicated); err != nil {
t.Fatal(err)
}
if fullReplicated {
break
}
time.Sleep(time.Second)
}

var m *errgroup.Group
m, ctx = errgroup.WithContext(ctx)
m.Go(
func() error {
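// Target only the first nodes-4 nodes, so the workload never connects to the nodes that will be drained or decommissioned.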
return c.RunE(ctx, c.Node(pinnedNode),
fmt.Sprintf("./cockroach workload run kv --max-rate 500 --tolerate-errors --duration=%s {pgurl:1-%d}",
duration.String(), nodes-4,
),
)
},
)

// Let the workload run for a small amount of time.
time.Sleep(1 * time.Minute)

// Drain the last 3 nodes from the cluster.
for nodeID := nodes - 2; nodeID <= nodes; nodeID++ {
id := nodeID
m.Go(func() error {
drain := func(id int) error {
t.Status(fmt.Sprintf("draining node %d", id))
return c.RunL(ctx, t.L(), c.Node(id), "./cockroach node drain --insecure")
}
return drain(id)
})
}
// Sleep for long enough that all the other nodes in the cluster learn about
// the draining status of the three nodes we just drained.
time.Sleep(30 * time.Second)

m.Go(func() error {
// Decommission the fourth-to-last node from the cluster.
id := nodes - 3
decom := func(id int) error {
t.Status(fmt.Sprintf("decommissioning node %d", id))
return c.RunL(ctx, t.L(), c.Node(id), "./cockroach node decommission --self --insecure")
}
return decom(id)
})
if err := m.Wait(); err != nil {
t.Fatal(err)
}
}

// runDecommission decommissions and wipes nodes in a cluster repeatedly,
// alternating between the node being shut down gracefully before and after the
// decommissioning operation, while some light load is running against the
