diff --git a/pkg/cmd/roachtest/tests/BUILD.bazel b/pkg/cmd/roachtest/tests/BUILD.bazel index b09eb6c0e559..b384859f8bd7 100644 --- a/pkg/cmd/roachtest/tests/BUILD.bazel +++ b/pkg/cmd/roachtest/tests/BUILD.bazel @@ -46,6 +46,7 @@ go_library( "decommission.go", "decommission_self.go", "decommissionbench.go", + "disagg-rebalance.go", "disk_full.go", "disk_stall.go", "django.go", diff --git a/pkg/cmd/roachtest/tests/disagg-rebalance.go b/pkg/cmd/roachtest/tests/disagg-rebalance.go new file mode 100644 index 000000000000..f0f9a23d0084 --- /dev/null +++ b/pkg/cmd/roachtest/tests/disagg-rebalance.go @@ -0,0 +1,138 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tests + +import ( + "context" + "fmt" + "time" + + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" + "github.com/cockroachdb/cockroach/pkg/roachprod/install" + "github.com/dustin/go-humanize" +) + +func registerDisaggRebalance(r registry.Registry) { + disaggRebalanceSpec := r.MakeClusterSpec(4) + r.Add(registry.TestSpec{ + Name: fmt.Sprintf("disagg-rebalance/aws/%s", disaggRebalanceSpec), + Owner: registry.OwnerStorage, + Cluster: disaggRebalanceSpec, + EncryptionSupport: registry.EncryptionAlwaysDisabled, + Timeout: 1 * time.Hour, + Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { + if c.Spec().Cloud != spec.AWS { + t.Skip("disagg-rebalance is only configured to run on AWS") + } + c.Put(ctx, t.Cockroach(), "./cockroach") + c.Put(ctx, t.DeprecatedWorkload(), "./workload") + s3dir := "s3://cockroachdb-backup-testing/disagg-rebalance/" + c.Name() + "?AUTH=implicit" + startOpts := option.DefaultStartOptsNoBackups() + startOpts.RoachprodOpts.ExtraArgs = append(startOpts.RoachprodOpts.ExtraArgs, fmt.Sprintf("--experimental-shared-storage=%s", s3dir)) + c.Start(ctx, t.L(), startOpts, install.MakeClusterSettings(), c.Range(1, 3)) + + initialWaitDuration := 2 * time.Minute + warehouses := 20 + + t.Status(`workload initialization`) + cmd := []string{fmt.Sprintf( + "./workload fixtures import tpcc --warehouses=%d {pgurl:1}", + warehouses, + )} + c.Run(ctx, c.Node(1), cmd...) + + t.Status(`run tpcc`) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + cmdDone := make(chan error) + go func() { + cmd := fmt.Sprintf( + "./workload run tpcc --warehouses=%d --duration=10m {pgurl:1-3}", + warehouses, + ) + + cmdDone <- c.RunE(ctx, c.Node(1), cmd) + }() + + select { + case <-time.After(initialWaitDuration): + case <-ctx.Done(): + return + } + + t.Status(`starting fourth node`) + // Start the fourth node. + c.Start(ctx, t.L(), startOpts, install.MakeClusterSettings(), c.Node(4)) + + t.Status(`verify rebalance`) + + db := c.Conn(ctx, t.L(), 4) + defer func() { + _ = db.Close() + }() + + if err := waitForRebalance(ctx, t.L(), db, 10 /* maxStdDev */, 20 /* stableSeconds */); err != nil { + t.Fatal(err) + return + } + + var count int + if err := db.QueryRow( + // Check if the down node has any replicas. + "SELECT count(*) FROM crdb_internal.ranges WHERE array_position(replicas, $1) IS NOT NULL", + 4, + ).Scan(&count); err != nil { + t.Fatal(err) + return + } + if count < 10 { + t.Fatalf("did not replicate to n4 quickly enough, only found %d replicas", count) + } + + var bytesInRanges int64 + if err := db.QueryRow( + "SELECT sum(used) "+ + "FROM crdb_internal.kv_store_status WHERE node_id = $1 GROUP BY node_id LIMIT 1", + 4, + ).Scan(&bytesInRanges); err != nil { + t.Fatal(err) + } + var bytesSnapshotted int64 + if err := db.QueryRow( + "SELECT metrics['range.snapshots.rcvd-bytes'] FROM crdb_internal.kv_store_status WHERE node_id = $1 LIMIT 1", + 4, + ).Scan(&bytesSnapshotted); err != nil { + t.Fatal(err) + } + + t.L().PrintfCtx(ctx, "got snapshot received bytes = %s, logical bytes in ranges = %s", humanize.IBytes(uint64(bytesSnapshotted)), humanize.IBytes(uint64(bytesInRanges))) + if bytesSnapshotted > bytesInRanges { + t.Fatalf("unexpected snapshot received bytes %d > bytes in all replicas on n4 %d, did not do a disaggregated rebalance?", bytesSnapshotted, bytesInRanges) + } + + t.Status(`continue tpcc`) + select { + case err := <-cmdDone: + if err != nil { + t.Fatal(err) + } + case <-ctx.Done(): + return + } + + }, + }) +} diff --git a/pkg/cmd/roachtest/tests/registry.go b/pkg/cmd/roachtest/tests/registry.go index 192301529011..e8ff29dff3aa 100644 --- a/pkg/cmd/roachtest/tests/registry.go +++ b/pkg/cmd/roachtest/tests/registry.go @@ -44,6 +44,7 @@ func RegisterTests(r registry.Registry) { registerCostFuzz(r) registerDecommission(r) registerDecommissionBench(r) + registerDisaggRebalance(r) registerDiskFull(r) registerDiskStalledDetection(r) registerDjango(r)