Skip to content

Commit

Permalink
cmd/roachtest: add disagg-rebalance roachtest
Browse files Browse the repository at this point in the history
This test adds a roachtest that spins up a cluster with
3 nodes using S3 as the --experimental-shared-storage, and then
adds a fourth node after loading a tpcc fixture and with a foreground
workload running on it. It confirms the fourth node gets hydrated
without transferring all live bytes over the wire.

Epic: none
Fixes: cockroachdb#103030

Release note: None
  • Loading branch information
itsbilal committed Jul 21, 2023
1 parent e0235d0 commit b2e861b
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ go_library(
"decommission.go",
"decommission_self.go",
"decommissionbench.go",
"disagg-rebalance.go",
"disk_full.go",
"disk_stall.go",
"django.go",
Expand Down
138 changes: 138 additions & 0 deletions pkg/cmd/roachtest/tests/disagg-rebalance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tests

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/dustin/go-humanize"
)

func registerDisaggRebalance(r registry.Registry) {
disaggRebalanceSpec := r.MakeClusterSpec(4)
r.Add(registry.TestSpec{
Name: fmt.Sprintf("disagg-rebalance/aws/%s", disaggRebalanceSpec),
Owner: registry.OwnerStorage,
Cluster: disaggRebalanceSpec,
EncryptionSupport: registry.EncryptionAlwaysDisabled,
Timeout: 1 * time.Hour,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
if c.Spec().Cloud != spec.AWS {
t.Skip("disagg-rebalance is only configured to run on AWS")
}
c.Put(ctx, t.Cockroach(), "./cockroach")
c.Put(ctx, t.DeprecatedWorkload(), "./workload")
s3dir := "s3://cockroachdb-backup-testing/disagg-rebalance/" + c.Name() + "?AUTH=implicit"
startOpts := option.DefaultStartOptsNoBackups()
startOpts.RoachprodOpts.ExtraArgs = append(startOpts.RoachprodOpts.ExtraArgs, fmt.Sprintf("--experimental-shared-storage=%s", s3dir))
c.Start(ctx, t.L(), startOpts, install.MakeClusterSettings(), c.Range(1, 3))

initialWaitDuration := 2 * time.Minute
warehouses := 20

t.Status(`workload initialization`)
cmd := []string{fmt.Sprintf(
"./workload fixtures import tpcc --warehouses=%d {pgurl:1}",
warehouses,
)}
c.Run(ctx, c.Node(1), cmd...)

t.Status(`run tpcc`)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

cmdDone := make(chan error)
go func() {
cmd := fmt.Sprintf(
"./workload run tpcc --warehouses=%d --duration=10m {pgurl:1-3}",
warehouses,
)

cmdDone <- c.RunE(ctx, c.Node(1), cmd)
}()

select {
case <-time.After(initialWaitDuration):
case <-ctx.Done():
return
}

t.Status(`starting fourth node`)
// Start the fourth node.
c.Start(ctx, t.L(), startOpts, install.MakeClusterSettings(), c.Node(4))

t.Status(`verify rebalance`)

db := c.Conn(ctx, t.L(), 4)
defer func() {
_ = db.Close()
}()

if err := waitForRebalance(ctx, t.L(), db, 10 /* maxStdDev */, 20 /* stableSeconds */); err != nil {
t.Fatal(err)
return
}

var count int
if err := db.QueryRow(
// Check if the down node has any replicas.
"SELECT count(*) FROM crdb_internal.ranges WHERE array_position(replicas, $1) IS NOT NULL",
4,
).Scan(&count); err != nil {
t.Fatal(err)
return
}
if count < 10 {
t.Fatalf("did not replicate to n4 quickly enough, only found %d replicas", count)
}

var bytesInRanges int64
if err := db.QueryRow(
"SELECT sum(used) "+
"FROM crdb_internal.kv_store_status WHERE node_id = $1 GROUP BY node_id LIMIT 1",
4,
).Scan(&bytesInRanges); err != nil {
t.Fatal(err)
}
var bytesSnapshotted int64
if err := db.QueryRow(
"SELECT metrics['range.snapshots.rcvd-bytes'] FROM crdb_internal.kv_store_status WHERE node_id = $1 LIMIT 1",
4,
).Scan(&bytesSnapshotted); err != nil {
t.Fatal(err)
}

t.L().PrintfCtx(ctx, "got snapshot received bytes = %s, logical bytes in ranges = %s", humanize.IBytes(uint64(bytesSnapshotted)), humanize.IBytes(uint64(bytesInRanges)))
if bytesSnapshotted > bytesInRanges {
t.Fatalf("unexpected snapshot received bytes %d > bytes in all replicas on n4 %d, did not do a disaggregated rebalance?", bytesSnapshotted, bytesInRanges)
}

t.Status(`continue tpcc`)
select {
case err := <-cmdDone:
if err != nil {
t.Fatal(err)
}
case <-ctx.Done():
return
}

},
})
}
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func RegisterTests(r registry.Registry) {
registerCostFuzz(r)
registerDecommission(r)
registerDecommissionBench(r)
registerDisaggRebalance(r)
registerDiskFull(r)
registerDiskStalledDetection(r)
registerDjango(r)
Expand Down

0 comments on commit b2e861b

Please sign in to comment.