Skip to content

Commit

Permalink
cmd/roachtest: add disagg-rebalance roachtest
Browse files Browse the repository at this point in the history
This test adds a roachtest that spins up a cluster with
3 nodes using S3 as the --experimental-shared-storage, and then
adds a fourth node after loading a tpcc fixture and with a foreground
workload running on it. It confirms the fourth node gets hydrated
without transferring all live bytes over the wire.

Epic: none
Fixes: #103030

Release note: None
  • Loading branch information
itsbilal committed Jul 25, 2023
1 parent e0235d0 commit bb24031
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ go_library(
"decommission.go",
"decommission_self.go",
"decommissionbench.go",
"disagg_rebalance.go",
"disk_full.go",
"disk_stall.go",
"django.go",
Expand Down
134 changes: 134 additions & 0 deletions pkg/cmd/roachtest/tests/disagg_rebalance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tests

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/dustin/go-humanize"
)

func registerDisaggRebalance(r registry.Registry) {
disaggRebalanceSpec := r.MakeClusterSpec(4)
r.Add(registry.TestSpec{
Name: fmt.Sprintf("disagg-rebalance/aws/%s", disaggRebalanceSpec),
Tags: registry.Tags("aws"),
Owner: registry.OwnerStorage,
Cluster: disaggRebalanceSpec,
EncryptionSupport: registry.EncryptionAlwaysDisabled,
Timeout: 1 * time.Hour,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
if c.Spec().Cloud != spec.AWS {
t.Skip("disagg-rebalance is only configured to run on AWS")
}
c.Put(ctx, t.Cockroach(), "./cockroach")
c.Put(ctx, t.DeprecatedWorkload(), "./workload")
s3dir := fmt.Sprintf("s3://%s/disagg-rebalance/%s?AUTH=implicit", testutils.BackupTestingBucket(), c.Name())
startOpts := option.DefaultStartOptsNoBackups()
startOpts.RoachprodOpts.ExtraArgs = append(startOpts.RoachprodOpts.ExtraArgs, fmt.Sprintf("--experimental-shared-storage=%s", s3dir))
c.Start(ctx, t.L(), startOpts, install.MakeClusterSettings(), c.Range(1, 3))

initialWaitDuration := 2 * time.Minute
warehouses := 20

t.Status("workload initialization")
cmd := []string{fmt.Sprintf(
"./workload fixtures import tpcc --warehouses=%d {pgurl:1}",
warehouses,
)}
m := c.NewMonitor(ctx, c.Range(1, 3))
m.Go(func(ctx context.Context) error {
return c.RunE(ctx, c.Node(1), cmd...)
})
m.Wait()

m.Go(func(ctx context.Context) error {
t.Status("run tpcc")

cmd := fmt.Sprintf(
"./workload run tpcc --warehouses=%d --duration=10m {pgurl:1-3}",
warehouses,
)

return c.RunE(ctx, c.Node(1), cmd)
})

select {
case <-time.After(initialWaitDuration):
case <-ctx.Done():
return
}

t.Status("starting fourth node")
// Start the fourth node.
c.Start(ctx, t.L(), startOpts, install.MakeClusterSettings(), c.Node(4))

t.Status("verify rebalance")

db := c.Conn(ctx, t.L(), 4)
defer func() {
_ = db.Close()
}()

if err := waitForRebalance(ctx, t.L(), db, 10 /* maxStdDev */, 20 /* stableSeconds */); err != nil {
t.Fatal(err)
}

var count int
if err := db.QueryRow(
// Check if the down node has any replicas.
"SELECT count(*) FROM crdb_internal.ranges WHERE array_position(replicas, $1) IS NOT NULL",
4,
).Scan(&count); err != nil {
t.Fatal(err)
}
if count < 10 {
t.Fatalf("did not replicate to n4 quickly enough, only found %d replicas", count)
}

var bytesInRanges int64
if err := db.QueryRow(
"SELECT sum(used) "+
"FROM crdb_internal.kv_store_status WHERE node_id = $1 GROUP BY node_id LIMIT 1",
4,
).Scan(&bytesInRanges); err != nil {
t.Fatal(err)
}
var bytesSnapshotted int64
if err := db.QueryRow(
"SELECT metrics['range.snapshots.rcvd-bytes'] FROM crdb_internal.kv_store_status WHERE node_id = $1 LIMIT 1",
4,
).Scan(&bytesSnapshotted); err != nil {
t.Fatal(err)
}

t.L().PrintfCtx(ctx, "got snapshot received bytes = %s, logical bytes in ranges = %s", humanize.IBytes(uint64(bytesSnapshotted)), humanize.IBytes(uint64(bytesInRanges)))
if bytesSnapshotted > bytesInRanges {
t.Fatalf("unexpected snapshot received bytes %d > bytes in all replicas on n4 %d, did not do a disaggregated rebalance?", bytesSnapshotted, bytesInRanges)
}

t.Status("continue tpcc")

if err := m.WaitE(); err != nil {
t.Fatal(err)
}
},
})
}
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func RegisterTests(r registry.Registry) {
registerCostFuzz(r)
registerDecommission(r)
registerDecommissionBench(r)
registerDisaggRebalance(r)
registerDiskFull(r)
registerDiskStalledDetection(r)
registerDjango(r)
Expand Down

0 comments on commit bb24031

Please sign in to comment.