Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
107529: roachtest: make upgrade timeouts configurable r=herkolategan a=renatolabs

This commit makes the timeout applied when waiting for a cluster
upgrade configurable. Previously, a fixed 10-minute timeout applied
regardless of the test.

A new option is added to `mixedversion` tests to specify custom
upgrade timeouts; this is especially useful for tests that operate on
larger volumes of data and for upgrades that may include expensive
migrations.

Informs: cockroachdb#107414

Release note: None

Co-authored-by: Renato Costa <[email protected]>
  • Loading branch information
craig[bot] and renatolabs committed Jul 27, 2023
2 parents c566dbc + de9539a commit 400105c
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 16 deletions.
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/roachtestutil/clusterupgrade/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ go_library(
"//pkg/roachprod/logger",
"//pkg/util/retry",
"//pkg/util/version",
"@com_github_cockroachdb_errors//:errors",
],
)
54 changes: 46 additions & 8 deletions pkg/cmd/roachtest/roachtestutil/clusterupgrade/clusterupgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/version"
"github.com/cockroachdb/errors"
)

const (
Expand Down Expand Up @@ -233,34 +234,71 @@ func RestartNodesWithNewBinary(
return nil
}

// DefaultUpgradeTimeout is the default timeout used when waiting for
// an upgrade to finish (i.e., for all migrations to run and for the
// cluster version to propagate). This timeout should be sufficient
// for simple tests where there isn't a lot of data; in other
// situations, a custom timeout can be passed to
// `WaitForClusterUpgrade`.
var DefaultUpgradeTimeout = 10 * time.Minute

// WaitForClusterUpgrade waits for the cluster version to reach the
// first node's binary version. This function should only be called if
// every node in the cluster has been restarted to run the same binary
// version. We rely on the cluster's internal self-upgrading
// mechanism to update the underlying cluster version.
func WaitForClusterUpgrade(
ctx context.Context, l *logger.Logger, nodes option.NodeListOption, dbFunc func(int) *gosql.DB,
ctx context.Context,
l *logger.Logger,
nodes option.NodeListOption,
dbFunc func(int) *gosql.DB,
timeout time.Duration,
) error {
newVersion, err := BinaryVersion(dbFunc(nodes[0]))
firstNode := nodes[0]
newVersion, err := BinaryVersion(dbFunc(firstNode))
if err != nil {
return err
}

l.Printf("waiting for cluster to auto-upgrade to %s", newVersion)
for _, node := range nodes {
err := retry.ForDuration(10*time.Minute, func() error {
// waitForUpgrade will wait for the given `node` to have the
// expected cluster version within the given timeout.
waitForUpgrade := func(node int, timeout time.Duration) error {
var latestVersion roachpb.Version
err := retry.ForDuration(timeout, func() error {
currentVersion, err := ClusterVersion(ctx, dbFunc(node))
if err != nil {
return err
}

latestVersion = currentVersion
if currentVersion != newVersion {
return fmt.Errorf("%d: expected cluster version %s, got %s", node, newVersion, currentVersion)
return fmt.Errorf("not upgraded yet")
}
l.Printf("%s: acked by n%d", currentVersion, node)
return nil
})
if err != nil {
return err
return errors.Wrapf(err,
"timed out after %s: expected n%d to be at cluster version %s, but is still at %s",
timeout, node, newVersion, latestVersion,
)
}
l.Printf("%s: acked by n%d", newVersion, node)
return nil
}

l.Printf("waiting for cluster to auto-upgrade to %s for %s", newVersion, timeout)
if err := waitForUpgrade(firstNode, timeout); err != nil {
return err
}

// Wait for `propagationTimeout` for all other nodes to also
// acknowledge the same cluster version as the first node. This
// should happen much faster, as migrations should already have
// finished at this point.
propagationTimeout := 3 * time.Minute
for _, node := range nodes[1:] {
if err := waitForUpgrade(node, propagationTimeout); err != nil {
return fmt.Errorf("n%d is already at %s: %w", firstNode, newVersion, err)
}
}

Expand Down
19 changes: 15 additions & 4 deletions pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ var (
// We use fixtures more often than not as they are more likely to
// detect bugs, especially in migrations.
useFixturesProbability: 0.7,
upgradeTimeout: clusterupgrade.DefaultUpgradeTimeout,
}
)

Expand Down Expand Up @@ -232,9 +233,10 @@ type (
}

// testOptions contains some options that can be changed by the user
// that expose some control over the generated test plan.
// that expose some control over the generated test plan and behaviour.
testOptions struct {
useFixturesProbability float64
upgradeTimeout time.Duration
}

customOption func(*testOptions)
Expand Down Expand Up @@ -298,6 +300,14 @@ func AlwaysUseFixtures(opts *testOptions) {
opts.useFixturesProbability = 1
}

// UpgradeTimeout returns a customOption that overrides the
// `upgradeTimeout` field of the test options with the given
// `timeout`, i.e., the timeout applied when waiting for an upgrade
// to finish.
func UpgradeTimeout(timeout time.Duration) customOption {
	setTimeout := func(o *testOptions) {
		o.upgradeTimeout = timeout
	}
	return setTimeout
}

// NewTest creates a Test struct that users can use to create and run
// a mixed-version roachtest.
func NewTest(
Expand Down Expand Up @@ -605,8 +615,9 @@ func (s uploadCurrentVersionStep) Run(
// the cluster and equal to the binary version of the first node in
// the `nodes` field.
type waitForStableClusterVersionStep struct {
id int
nodes option.NodeListOption
id int
nodes option.NodeListOption
timeout time.Duration
}

func (s waitForStableClusterVersionStep) ID() int { return s.id }
Expand All @@ -622,7 +633,7 @@ func (s waitForStableClusterVersionStep) Description() string {
func (s waitForStableClusterVersionStep) Run(
ctx context.Context, l *logger.Logger, c cluster.Cluster, h *Helper,
) error {
return clusterupgrade.WaitForClusterUpgrade(ctx, l, s.nodes, h.Connect)
return clusterupgrade.WaitForClusterUpgrade(ctx, l, s.nodes, h.Connect, s.timeout)
}

// preserveDowngradeOptionStep sets the `preserve_downgrade_option`
Expand Down
4 changes: 2 additions & 2 deletions pkg/cmd/roachtest/roachtestutil/mixedversion/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (p *testPlanner) initSteps() []testStep {
return append(
append(steps,
uploadCurrentVersionStep{id: p.nextID(), rt: p.rt, crdbNodes: p.crdbNodes, dest: CurrentCockroachPath},
waitForStableClusterVersionStep{id: p.nextID(), nodes: p.crdbNodes},
waitForStableClusterVersionStep{id: p.nextID(), nodes: p.crdbNodes, timeout: p.options.upgradeTimeout},
preserveDowngradeOptionStep{id: p.nextID(), prng: p.newRNG(), crdbNodes: p.crdbNodes},
),
p.hooks.StartupSteps(p.nextID, p.initialContext())...,
Expand All @@ -146,7 +146,7 @@ func (p *testPlanner) initSteps() []testStep {
// user may have provided.
func (p *testPlanner) finalSteps() []testStep {
return append([]testStep{
waitForStableClusterVersionStep{id: p.nextID(), nodes: p.crdbNodes},
waitForStableClusterVersionStep{id: p.nextID(), nodes: p.crdbNodes, timeout: p.options.upgradeTimeout},
}, p.hooks.AfterUpgradeFinalizedSteps(p.nextID, p.finalContext(false /* finalizing */))...)
}

Expand Down
34 changes: 34 additions & 0 deletions pkg/cmd/roachtest/roachtestutil/mixedversion/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"io"
"math/rand"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
Expand Down Expand Up @@ -258,6 +259,39 @@ func Test_startClusterID(t *testing.T) {
require.Equal(t, 2, plan.startClusterID)
}

// Test_upgradeTimeout checks which timeout ends up on the
// waitForStableClusterVersionStep steps of a generated plan: 10
// minutes when no option is passed, and the custom value when the
// UpgradeTimeout option is supplied.
func Test_upgradeTimeout(t *testing.T) {
	// upgradeWaitSteps collects every waitForStableClusterVersionStep
	// in the plan, failing the test if the plan has none.
	upgradeWaitSteps := func(plan *TestPlan) []waitForStableClusterVersionStep {
		var found []waitForStableClusterVersionStep
		for _, planStep := range plan.steps {
			waitStep, ok := planStep.(waitForStableClusterVersionStep)
			if !ok {
				continue
			}
			found = append(found, waitStep)
		}
		if len(found) == 0 {
			require.Fail(t, "could not find any waitForStableClusterVersionStep in the plan")
		}
		return found
	}

	// checkTimeout plans a test with the given options and requires
	// every upgrade-wait step to carry the `want` timeout.
	checkTimeout := func(want time.Duration, opts ...customOption) {
		mvt := newTest(opts...)
		plan, err := mvt.plan()
		require.NoError(t, err)

		for _, waitStep := range upgradeWaitSteps(plan) {
			require.Equal(t, want, waitStep.timeout)
		}
	}

	// With default settings, the default timeout applies.
	checkTimeout(10 * time.Minute)
	// A custom timeout overrides the default.
	checkTimeout(30*time.Minute, UpgradeTimeout(30*time.Minute))
}

func newTest(options ...customOption) *Test {
testOptions := defaultTestOptions
for _, fn := range options {
Expand Down
9 changes: 8 additions & 1 deletion pkg/cmd/roachtest/tests/mixed_version_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2101,7 +2101,14 @@ func registerBackupMixedVersion(r registry.Registry) {

roachNodes := c.Range(1, c.Spec().NodeCount-1)
workloadNode := c.Node(c.Spec().NodeCount)
mvt := mixedversion.NewTest(ctx, t, t.L(), c, roachNodes)
mvt := mixedversion.NewTest(
ctx, t, t.L(), c, roachNodes,
// We use a longer upgrade timeout in this test to give the
// migrations enough time to finish considering all the data
// that might exist in the cluster by the time the upgrade is
// attempted.
mixedversion.UpgradeTimeout(30*time.Minute),
)
testRNG := mvt.RNG()

uploadVersion(ctx, t, c, workloadNode, clusterupgrade.MainVersion)
Expand Down
4 changes: 3 additions & 1 deletion pkg/cmd/roachtest/tests/versionupgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,9 @@ func allowAutoUpgradeStep(node int) versionStep {
func waitForUpgradeStep(nodes option.NodeListOption) versionStep {
return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
dbFunc := func(node int) *gosql.DB { return u.conn(ctx, t, node) }
if err := clusterupgrade.WaitForClusterUpgrade(ctx, t.L(), nodes, dbFunc); err != nil {
if err := clusterupgrade.WaitForClusterUpgrade(
ctx, t.L(), nodes, dbFunc, clusterupgrade.DefaultUpgradeTimeout,
); err != nil {
t.Fatal(err)
}
}
Expand Down

0 comments on commit 400105c

Please sign in to comment.