diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index d484e0cb4407..288f3f9cf3b1 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -139,6 +139,7 @@ ALL_TESTS = [ "//pkg/cmd/roachprod-microbench:roachprod-microbench_test", "//pkg/cmd/roachtest/clusterstats:clusterstats_test", "//pkg/cmd/roachtest/roachtestutil/mixedversion:mixedversion_test", + "//pkg/cmd/roachtest/roachtestutil:roachtestutil_test", "//pkg/cmd/roachtest/tests:tests_test", "//pkg/cmd/roachtest:roachtest_test", "//pkg/cmd/teamcity-trigger:teamcity-trigger_test", @@ -1043,6 +1044,7 @@ GO_TARGETS = [ "//pkg/cmd/roachtest/roachtestutil/mixedversion:mixedversion", "//pkg/cmd/roachtest/roachtestutil/mixedversion:mixedversion_test", "//pkg/cmd/roachtest/roachtestutil:roachtestutil", + "//pkg/cmd/roachtest/roachtestutil:roachtestutil_test", "//pkg/cmd/roachtest/spec:spec", "//pkg/cmd/roachtest/test:test", "//pkg/cmd/roachtest/tests:tests", diff --git a/pkg/cmd/roachtest/roachtestutil/BUILD.bazel b/pkg/cmd/roachtest/roachtestutil/BUILD.bazel index 2472bf06ef2e..e41c839e01f1 100644 --- a/pkg/cmd/roachtest/roachtestutil/BUILD.bazel +++ b/pkg/cmd/roachtest/roachtestutil/BUILD.bazel @@ -1,9 +1,10 @@ load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "roachtestutil", srcs = [ + "commandbuilder.go", "jaeger.go", "validation_check.go", ], @@ -17,4 +18,12 @@ go_library( ], ) +go_test( + name = "roachtestutil_test", + srcs = ["commandbuilder_test.go"], + args = ["-test.timeout=295s"], + embed = [":roachtestutil"], + deps = ["@com_github_stretchr_testify//require"], +) + get_x_data(name = "get_x_data") diff --git a/pkg/cmd/roachtest/roachtestutil/commandbuilder.go b/pkg/cmd/roachtest/roachtestutil/commandbuilder.go new file mode 100644 index 000000000000..878dfcfb31a2 --- /dev/null +++ b/pkg/cmd/roachtest/roachtestutil/commandbuilder.go @@ -0,0 +1,131 @@ +// 
Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package roachtestutil + +import ( + "fmt" + "sort" + "strings" +) + +// Command wraps a command to be run in a cluster. It allows users to +// manipulate a command without having to perform string-based +// operations. +type Command struct { + Binary string + Arguments []string + Flags map[string]*string +} + +// NewCommand builds a command. The format parameter can take +// `fmt.Printf` verbs. +// +// Examples: +// +// NewCommand("./cockroach version") +// NewCommand("%s version", binaryPath) +func NewCommand(format string, args ...interface{}) *Command { + cmd := fmt.Sprintf(format, args...) + parts := strings.Fields(cmd) + return &Command{ + Binary: parts[0], + Arguments: parts[1:], + Flags: make(map[string]*string), + } +} + +func (c *Command) Arg(format string, args ...interface{}) *Command { + c.Arguments = append(c.Arguments, fmt.Sprintf(format, args...)) + return c +} + +func (c *Command) HasFlag(name string) bool { + _, ok := c.Flags[name] + return ok +} + +func (c *Command) Flag(name string, val interface{}) *Command { + c.Flags[name] = stringP(fmt.Sprint(val)) + return c +} + +// MaybeFlag is a thin wrapper around Flag for the caller's +// convenience. The flag is added only if the `condition` parameter is +// true. 
+func (c *Command) MaybeFlag(condition bool, name string, val interface{}) *Command { + if condition { + return c.Flag(name, val) + } + + return c +} + +// Option adds a flag that doesn't have an associated value +func (c *Command) Option(name string) *Command { + c.Flags[name] = nil + return c +} + +func (c *Command) MaybeOption(condition bool, name string) *Command { + if condition { + return c.Option(name) + } + + return c +} + +// ITEFlag (if-then-else flag) adds a flag where the value depends on +// the `condition` parameter. `trueVal` is used if `condition` is +// true; `falseVal` is used otherwise. +func (c *Command) ITEFlag(condition bool, name string, trueVal, falseVal interface{}) *Command { + if condition { + return c.Flag(name, trueVal) + } + + return c.Flag(name, falseVal) +} + +// String returns a canonical string representation of the command +// which can be passed to `cluster.Run`. +func (c *Command) String() string { + flags := make([]string, 0, len(c.Flags)) + names := make([]string, 0, len(c.Flags)) + for name := range c.Flags { + names = append(names, name) + } + sort.Strings(names) + + for _, name := range names { + val := c.Flags[name] + prefix := "-" + if len(name) > 1 { + prefix = "--" + } + + prefixedName := prefix + name + parts := []string{prefixedName} + if val != nil { + parts = append(parts, *val) + } + flags = append(flags, strings.Join(parts, " ")) + } + + cmd := append( + []string{c.Binary}, + append(c.Arguments, flags...)..., + ) + + return strings.Join(cmd, " ") +} + +func stringP(s string) *string { + return &s +} diff --git a/pkg/cmd/roachtest/roachtestutil/commandbuilder_test.go b/pkg/cmd/roachtest/roachtestutil/commandbuilder_test.go new file mode 100644 index 000000000000..28ee2de2b9df --- /dev/null +++ b/pkg/cmd/roachtest/roachtestutil/commandbuilder_test.go @@ -0,0 +1,77 @@ +// Copyright 2023 The Cockroach Authors. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package roachtestutil + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestCommand(t *testing.T) { + c := NewCommand("./cockroach") + require.Equal(t, "./cockroach", c.String()) + + c = NewCommand("./cockroach workload init") + require.Equal(t, "./cockroach workload init", c.String()) + + c = NewCommand("./cockroach").Arg("workload").Arg("init") + require.Equal(t, "./cockroach workload init", c.String()) + + baseCommand := NewCommand("./cockroach workload run bank").Arg("{pgurl:%d}", 1) + + c = clone(baseCommand) + c.Flag("max-ops", 10).Flag("path", "/some/path") + require.Equal(t, "./cockroach workload run bank {pgurl:1} --max-ops 10 --path /some/path", c.String()) + + c = clone(baseCommand) + c.MaybeFlag(true, "max-ops", 10) // included + c.MaybeFlag(false, "concurrency", 8) // not included + require.True(t, c.HasFlag("max-ops")) + require.False(t, c.HasFlag("concurrency")) + require.Equal(t, "./cockroach workload run bank {pgurl:1} --max-ops 10", c.String()) + + c = clone(baseCommand) + c.ITEFlag(true, "max-ops", 10, 20) + c.ITEFlag(false, "duration", 2*time.Hour, 10*time.Minute) + require.Equal(t, "./cockroach workload run bank {pgurl:1} --duration 10m0s --max-ops 10", c.String()) + + c = clone(baseCommand) + c.Option("local") + c.MaybeOption(true, "background") // included + c.MaybeOption(false, "dry-run") // not included + require.True(t, c.HasFlag("local")) + require.True(t, c.HasFlag("background")) + require.False(t, c.HasFlag("dry-run")) + require.Equal(t, "./cockroach workload run bank {pgurl:1} --background --local", c.String()) + + c = clone(baseCommand) + c.Flag("c", 10) + 
c.MaybeFlag(true, "n", "8") // included + c.MaybeFlag(false, "j", "yes") // not included + c.Option("x") + require.True(t, c.HasFlag("c")) + require.Equal(t, "./cockroach workload run bank {pgurl:1} -c 10 -n 8 -x", c.String()) +} + +func clone(cmd *Command) *Command { + flags := make(map[string]*string) + for k, v := range cmd.Flags { + flags[k] = v + } + + return &Command{ + Binary: cmd.Binary, + Arguments: append([]string{}, cmd.Arguments...), + Flags: flags, + } +} diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel b/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel index 75f083b93aee..bd62b5924741 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel @@ -13,14 +13,15 @@ go_library( deps = [ "//pkg/cmd/roachtest/cluster", "//pkg/cmd/roachtest/option", + "//pkg/cmd/roachtest/roachtestutil", "//pkg/cmd/roachtest/roachtestutil/clusterupgrade", "//pkg/cmd/roachtest/test", "//pkg/roachpb", "//pkg/roachprod/logger", + "//pkg/util/ctxgroup", "//pkg/util/randutil", "//pkg/util/timeutil", "//pkg/util/version", - "@org_golang_x_sync//errgroup", ], ) @@ -30,7 +31,9 @@ go_test( args = ["-test.timeout=295s"], embed = [":mixedversion"], deps = [ + "//pkg/cmd/roachtest/cluster", "//pkg/cmd/roachtest/option", + "//pkg/cmd/roachtest/roachtestutil", "//pkg/cmd/roachtest/roachtestutil/clusterupgrade", "//pkg/roachprod/logger", "//pkg/util/version", diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go index 9bb0700e7440..737fd7045274 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go @@ -35,14 +35,18 @@ // // Typical usage: // -// mvt, err := NewMixedVersionTest(...) -// mvt.InMixedVersion("test my feature", func(l *logger.Logger, db *gosql.DB) error { +// mvt, err := mixedversion.NewTest(...) 
+// mvt.InMixedVersion("test my feature", func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error { // l.Printf("testing feature X") +// node, db := h.RandomDB(rng, c.All()) +// l.Printf("running query on node %d", node) // _, err := db.ExecContext(ctx, "SELECT * FROM test") // return err // }) -// mvt.InMixedVersion("test another feature", func(l *logger.Logger, db *gosql.DB) error { +// mvt.InMixedVersion("test another feature", func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error { // l.Printf("testing feature Y") +// node, db := h.RandomDB(rng, c.All()) +// l.Printf("running query on node %d", node) // _, err := db.ExecContext(ctx, "SELECT * FROM test2") // return err // }) @@ -72,6 +76,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" "github.com/cockroachdb/cockroach/pkg/roachprod/logger" @@ -82,6 +87,7 @@ import ( const ( logPrefix = "mixed-version-test" startupLabel = "run startup hooks" + backgroundLabel = "start background hooks" mixedVersionLabel = "run mixed-version hooks" afterTestLabel = "run after test hooks" @@ -92,6 +98,13 @@ const ( // of migration steps before the new cluster version can be // finalized. runWhileMigratingProbability = 0.5 + + // CurrentCockroachPath is the path to the binary where the current + // version of cockroach being tested is located. This file is + // uploaded before any user functions are run. The primary use case + // is tests that need long-running background functions on startup + // (such as running a workload). 
+ CurrentCockroachPath = "./cockroach-current" ) var ( @@ -125,7 +138,16 @@ type ( Finalizing bool } - userFunc func(*logger.Logger, *rand.Rand, *Helper) error + // userFunc is the signature for user-provided functions that run at + // various points in the test (synchronously or in the background). + // These functions run on the test runner node itself; i.e., any + // commands they wish to execute on the cluster need to go through + // the `cluster` methods as usual (cluster.RunE, cluster.PutE, + // etc). In addition, these functions should prefer returning an + // error over calling `t.Fatal` directly. The error is handled by + // the mixedversion framework and better error messages are produced + // as a result. + userFunc func(context.Context, *logger.Logger, *rand.Rand, *Helper) error predicateFunc func(Context) bool // versionUpgradeHook is a hook that can be called at any time @@ -157,6 +179,11 @@ type ( // for human-consumption. Displayed when pretty-printing the test // plan. Description() string + // Background indicates whether the step should be run in the + // background. When a step is *not* run in the background, the + // test will wait for it to finish before moving on. When a + // background step fails, the entire test fails. + Background() bool // Run implements the actual functionality of the step. Run(context.Context, *logger.Logger, cluster.Cluster, *Helper) error } @@ -167,6 +194,7 @@ type ( // its different stages: startup, mixed-version, and after-test. testHooks struct { startup hooks + background hooks mixedVersion hooks afterUpgradeFinalized hooks @@ -177,6 +205,7 @@ type ( // Test is the main struct callers of this package interact with. 
Test struct { ctx context.Context + cancel context.CancelFunc cluster cluster.Cluster logger *logger.Logger crdbNodes option.NodeListOption @@ -209,8 +238,10 @@ func NewTest( prng, seed := randutil.NewPseudoRand() testLogger.Printf("mixed-version random seed: %d", seed) + testCtx, cancel := context.WithCancel(ctx) return &Test{ - ctx: ctx, + ctx: testCtx, + cancel: cancel, cluster: c, logger: testLogger, crdbNodes: crdbNodes, @@ -248,7 +279,7 @@ func (t *Test) InMixedVersion(desc string, fn userFunc) { return len(testContext.ToVersionNodes) == numUpgradedNodes } - t.hooks.AddMixedVersion(versionUpgradeHook{desc, predicate, fn}) + t.hooks.AddMixedVersion(versionUpgradeHook{name: desc, predicate: predicate, fn: fn}) } // OnStartup registers a callback that is run once the cluster is @@ -260,7 +291,7 @@ func (t *Test) OnStartup(desc string, fn userFunc) { // Since the callbacks here are only referenced in the setup steps // of the planner, there is no need to have a predicate function // gating them. - t.hooks.AddStartup(versionUpgradeHook{desc, nil, fn}) + t.hooks.AddStartup(versionUpgradeHook{name: desc, fn: fn}) } // AfterUpgradeFinalized registers a callback that is run once the @@ -268,7 +299,55 @@ func (t *Test) OnStartup(desc string, fn userFunc) { // and allowed the upgrade to finalize successfully. If multiple such // hooks are passed, they will be executed concurrently. func (t *Test) AfterUpgradeFinalized(desc string, fn userFunc) { - t.hooks.AddAfterUpgradeFinalized(versionUpgradeHook{desc, nil, fn}) + t.hooks.AddAfterUpgradeFinalized(versionUpgradeHook{name: desc, fn: fn}) +} + +// BackgroundFunc runs the function passed as argument in the +// background during the test. Background functions are kicked off +// once the cluster has been initialized (i.e., after all startup +// steps have finished). If the `userFunc` returns an error, it will +// cause the test to fail. 
These functions can run indefinitely but +// should respect the context passed to them, which will be canceled +// when the test terminates (successfully or not). +func (t *Test) BackgroundFunc(desc string, fn userFunc) { + t.hooks.AddBackground(versionUpgradeHook{name: desc, fn: fn}) +} + +// BackgroundCommand is a convenience wrapper around `BackgroundFunc` +// that runs the command passed once the cluster is initialized. The +// command is run on the `nodes` passed by the caller. +// +// TODO: unfortunately, `cluster.Run()` does not allow the caller to +// pass a logger instance. It would be convenient if the output of the +// command itself lived within the `mixed-version/*.log` files. +func (t *Test) BackgroundCommand( + desc string, nodes option.NodeListOption, cmd *roachtestutil.Command, +) { + t.BackgroundFunc(desc, t.runCommandFunc(nodes, cmd.String())) +} + +// Workload is a convenience wrapper that allows callers to run +// workloads in the background during a mixed-version test. `initCmd`, +// if passed, is the command run to initialize the workload; it is run +// synchronously as a regular startup function. `runCmd` is the +// command to actually run the workload; it is run in the background. +func (t *Test) Workload( + name string, node option.NodeListOption, initCmd, runCmd *roachtestutil.Command, +) { + seed := uint64(t.prng.Int63()) + addSeed := func(cmd *roachtestutil.Command) { + if !cmd.HasFlag("seed") { + cmd.Flag("seed", seed) + } + } + + if initCmd != nil { + addSeed(initCmd) + t.OnStartup(fmt.Sprintf("initialize %s workload", name), t.runCommandFunc(node, initCmd.String())) + } + + addSeed(runCmd) + t.BackgroundCommand(fmt.Sprintf("%s workload", name), node, runCmd) +} // Run runs the mixed-version test. 
It should be called once all @@ -290,7 +369,7 @@ func (t *Test) Run() { func (t *Test) run(plan *TestPlan) error { return newTestRunner( - t.ctx, plan, t.logger, t.cluster, t.crdbNodes, t.seed, + t.ctx, t.cancel, plan, t.logger, t.cluster, t.crdbNodes, t.seed, ).run() } @@ -319,6 +398,13 @@ func (t *Test) buildVersion() version.Version { return *t.rt.BuildVersion() } +func (t *Test) runCommandFunc(nodes option.NodeListOption, cmd string) userFunc { + return func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *Helper) error { + l.Printf("running command `%s` on nodes %v", cmd, nodes) + return t.cluster.RunE(ctx, nodes, cmd) + } +} + // startFromCheckpointStep is the step that starts the cluster from a // specific `version`, using checked-in fixtures. type startFromCheckpointStep struct { @@ -328,7 +414,8 @@ type startFromCheckpointStep struct { crdbNodes option.NodeListOption } -func (s startFromCheckpointStep) ID() int { return s.id } +func (s startFromCheckpointStep) ID() int { return s.id } +func (s startFromCheckpointStep) Background() bool { return false } func (s startFromCheckpointStep) Description() string { return fmt.Sprintf("starting cluster from fixtures for version %q", s.version) @@ -349,12 +436,41 @@ func (s startFromCheckpointStep) Run( return err } - startOpts := option.DefaultStartOpts() + startOpts := option.DefaultStartOptsNoBackups() startOpts.RoachprodOpts.Sequential = false clusterupgrade.StartWithBinary(ctx, l, c, s.crdbNodes, binaryPath, startOpts) return nil } +// uploadCurrentVersionStep uploads the current cockroach binary to +// all DB nodes in the test. This is so that startup steps can use +// them (if, for instance, they need to run a workload). The binary +// will be located in `dest`. 
+type uploadCurrentVersionStep struct { + id int + rt test.Test + crdbNodes option.NodeListOption + dest string +} + +func (s uploadCurrentVersionStep) ID() int { return s.id } +func (s uploadCurrentVersionStep) Background() bool { return false } + +func (s uploadCurrentVersionStep) Description() string { + return fmt.Sprintf("upload current binary to all cockroach nodes (%v)", s.crdbNodes) +} + +func (s uploadCurrentVersionStep) Run( + ctx context.Context, l *logger.Logger, c cluster.Cluster, helper *Helper, +) error { + _, err := clusterupgrade.UploadVersion(ctx, s.rt, l, c, s.crdbNodes, clusterupgrade.MainVersion) + if err != nil { + return err + } + + return c.RunE(ctx, s.crdbNodes, fmt.Sprintf("mv ./cockroach %s", s.dest)) +} + // waitForStableClusterVersionStep implements the process of waiting // for the `version` cluster setting being the same on all nodes of // the cluster and equal to the binary version of the first node in @@ -364,7 +480,8 @@ type waitForStableClusterVersionStep struct { nodes option.NodeListOption } -func (s waitForStableClusterVersionStep) ID() int { return s.id } +func (s waitForStableClusterVersionStep) ID() int { return s.id } +func (s waitForStableClusterVersionStep) Background() bool { return false } func (s waitForStableClusterVersionStep) Description() string { return fmt.Sprintf( @@ -387,7 +504,8 @@ type preserveDowngradeOptionStep struct { prng *rand.Rand } -func (s preserveDowngradeOptionStep) ID() int { return s.id } +func (s preserveDowngradeOptionStep) ID() int { return s.id } +func (s preserveDowngradeOptionStep) Background() bool { return false } func (s preserveDowngradeOptionStep) Description() string { return "preventing auto-upgrades by setting `preserve_downgrade_option`" @@ -421,7 +539,8 @@ type restartWithNewBinaryStep struct { node int } -func (s restartWithNewBinaryStep) ID() int { return s.id } +func (s restartWithNewBinaryStep) ID() int { return s.id } +func (s restartWithNewBinaryStep) Background() bool { 
return false } func (s restartWithNewBinaryStep) Description() string { return fmt.Sprintf("restart node %d with binary version %s", s.node, versionMsg(s.version)) @@ -436,7 +555,12 @@ func (s restartWithNewBinaryStep) Run( l, c, c.Node(s.node), - option.DefaultStartOpts(), + // Disable regular backups in mixed-version tests, as some tests + // check for running jobs and the scheduled backup may make + // things non-deterministic. In the future, we should change the + // default and add an API for tests to opt-out of the default + // scheduled backup if necessary. + option.DefaultStartOptsNoBackups(), s.version, ) } @@ -450,7 +574,8 @@ type finalizeUpgradeStep struct { prng *rand.Rand } -func (s finalizeUpgradeStep) ID() int { return s.id } +func (s finalizeUpgradeStep) ID() int { return s.id } +func (s finalizeUpgradeStep) Background() bool { return false } func (s finalizeUpgradeStep) Description() string { return "finalize upgrade by resetting `preserve_downgrade_option`" @@ -472,9 +597,11 @@ type runHookStep struct { testContext Context prng *rand.Rand hook versionUpgradeHook + background bool } -func (s runHookStep) ID() int { return s.id } +func (s runHookStep) ID() int { return s.id } +func (s runHookStep) Background() bool { return s.background } func (s runHookStep) Description() string { return fmt.Sprintf("run %q", s.hook.name) @@ -484,7 +611,7 @@ func (s runHookStep) Run( ctx context.Context, l *logger.Logger, c cluster.Cluster, helper *Helper, ) error { helper.SetContext(&s.testContext) - return s.hook.fn(l, s.prng, helper) + return s.hook.fn(ctx, l, s.prng, helper) } // sequentialRunStep is a "meta-step" that indicates that a sequence @@ -559,13 +686,14 @@ func (h hooks) Filter(testContext Context) hooks { // returned. Otherwise, a `concurrentRunStep` is returned, where every // hook is run concurrently. 
func (h hooks) AsSteps( - label string, idGen func() int, prng *rand.Rand, nodes option.NodeListOption, testContext Context, + label string, idGen func() int, prng *rand.Rand, testContext Context, background bool, ) []testStep { steps := make([]testStep, 0, len(h)) for _, hook := range h { hookPrng := rngFromRNG(prng) - rhs := runHookStep{id: idGen(), prng: hookPrng, hook: hook, testContext: testContext} - steps = append(steps, rhs) + steps = append(steps, runHookStep{ + id: idGen(), prng: hookPrng, hook: hook, background: background, testContext: testContext, + }) } if len(steps) <= 1 { @@ -579,6 +707,10 @@ func (th *testHooks) AddStartup(hook versionUpgradeHook) { th.startup = append(th.startup, hook) } +func (th *testHooks) AddBackground(hook versionUpgradeHook) { + th.background = append(th.background, hook) +} + func (th *testHooks) AddMixedVersion(hook versionUpgradeHook) { th.mixedVersion = append(th.mixedVersion, hook) } @@ -588,15 +720,21 @@ func (th *testHooks) AddAfterUpgradeFinalized(hook versionUpgradeHook) { } func (th *testHooks) StartupSteps(idGen func() int, testContext Context) []testStep { - return th.startup.AsSteps(startupLabel, idGen, th.prng, th.crdbNodes, testContext) + return th.startup.AsSteps(startupLabel, idGen, th.prng, testContext, false) +} + +func (th *testHooks) BackgroundSteps(idGen func() int, testContext Context) []testStep { + return th.background.AsSteps(backgroundLabel, idGen, th.prng, testContext, true) } func (th *testHooks) MixedVersionSteps(testContext Context, idGen func() int) []testStep { - return th.mixedVersion.Filter(testContext).AsSteps(mixedVersionLabel, idGen, th.prng, th.crdbNodes, testContext) + return th.mixedVersion. + Filter(testContext). 
+ AsSteps(mixedVersionLabel, idGen, th.prng, testContext, false) } func (th *testHooks) AfterUpgradeFinalizedSteps(idGen func() int, testContext Context) []testStep { - return th.afterUpgradeFinalized.AsSteps(afterTestLabel, idGen, th.prng, th.crdbNodes, testContext) + return th.afterUpgradeFinalized.AsSteps(afterTestLabel, idGen, th.prng, testContext, false) } func randomDelay(rng *rand.Rand) time.Duration { diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/planner.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/planner.go index 0fbabe2c39f3..720a7df06e63 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/planner.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/planner.go @@ -69,14 +69,16 @@ const ( // this is happening. // - run after-test hooks. // -// TODO(renato): further opportunities for random exploration: going -// back multiple releases instead of just one; picking a patch release -// randomly instead of just the latest release. +// TODO(renato): further opportunities for random exploration: +// - going back multiple releases instead of just one +// - picking a patch release randomly instead of just the latest release +// - inserting arbitrary delays (`sleep` calls) during the test. func (p *testPlanner) Plan() *TestPlan { var steps []testStep addSteps := func(ss []testStep) { steps = append(steps, ss...) 
} addSteps(p.initSteps()) + addSteps(p.hooks.BackgroundSteps(p.nextID, p.initialContext())) // previous -> current addSteps(p.upgradeSteps(p.initialVersion, clusterupgrade.MainVersion)) @@ -119,6 +121,7 @@ func (p *testPlanner) finalContext(finalizing bool) Context { func (p *testPlanner) initSteps() []testStep { return append([]testStep{ startFromCheckpointStep{id: p.nextID(), version: p.initialVersion, rt: p.rt, crdbNodes: p.crdbNodes}, + uploadCurrentVersionStep{id: p.nextID(), rt: p.rt, crdbNodes: p.crdbNodes, dest: CurrentCockroachPath}, waitForStableClusterVersionStep{id: p.nextID(), nodes: p.crdbNodes}, preserveDowngradeOptionStep{id: p.nextID(), prng: p.newRNG(), crdbNodes: p.crdbNodes}, }, p.hooks.StartupSteps(p.nextID, p.initialContext())...) diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/planner_test.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/planner_test.go index c078aaf62f74..9f4c83735fd5 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/planner_test.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/planner_test.go @@ -17,7 +17,9 @@ import ( "math/rand" "testing" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade" "github.com/cockroachdb/cockroach/pkg/roachprod/logger" "github.com/cockroachdb/cockroach/pkg/util/version" @@ -68,10 +70,17 @@ func TestTestPlanner(t *testing.T) { mvt := newTest(t) mvt.InMixedVersion("mixed-version 1", dummyHook) mvt.InMixedVersion("mixed-version 2", dummyHook) + initBank := roachtestutil.NewCommand("./cockroach workload bank init") + runBank := roachtestutil.NewCommand("./cockroach workload run bank").Flag("max-ops", 100) + mvt.Workload("bank", nodes, initBank, runBank) + runRand := roachtestutil.NewCommand("./cockroach run rand").Flag("seed", 321) + mvt.Workload("rand", 
nodes, nil /* initCmd */, runRand) + csvServer := roachtestutil.NewCommand("./cockroach workload csv-server").Flag("port", 9999) + mvt.BackgroundCommand("csv server", nodes, csvServer) plan, err := mvt.plan() require.NoError(t, err) - require.Len(t, plan.steps, 9) + require.Len(t, plan.steps, 12) // Assert on the pretty-printed version of the test plan as that // asserts the ordering of the steps we want to take, and as a bonus @@ -79,33 +88,42 @@ func TestTestPlanner(t *testing.T) { expectedPrettyPlan := fmt.Sprintf(` mixed-version test plan for upgrading from %[1]s to : ├── starting cluster from fixtures for version "%[1]s" (1) -├── wait for nodes :1-4 to all have the same cluster version (same as binary version of node 1) (2) -├── preventing auto-upgrades by setting `+"`preserve_downgrade_option`"+` (3) +├── upload current binary to all cockroach nodes (:1-4) (2) +├── wait for nodes :1-4 to all have the same cluster version (same as binary version of node 1) (3) +├── preventing auto-upgrades by setting `+"`preserve_downgrade_option`"+` (4) +├── run "initialize bank workload" (5) +├── start background hooks concurrently +│ ├── run "bank workload", after 50ms delay (6) +│ ├── run "rand workload", after 50ms delay (7) +│ └── run "csv server", after 200ms delay (8) ├── upgrade nodes :1-4 from "%[1]s" to "" -│ ├── restart node 2 with binary version (4) -│ ├── restart node 1 with binary version (5) -│ ├── run "mixed-version 1" (6) -│ ├── restart node 4 with binary version (7) -│ ├── restart node 3 with binary version (8) -│ └── run "mixed-version 2" (9) -├── downgrade nodes :1-4 from "" to "%[1]s" -│ ├── restart node 3 with binary version %[1]s (10) -│ ├── restart node 4 with binary version %[1]s (11) +│ ├── restart node 4 with binary version (9) │ ├── run mixed-version hooks concurrently -│ │ ├── run "mixed-version 1", after 200ms delay (12) -│ │ └── run "mixed-version 2", after 200ms delay (13) -│ ├── restart node 2 with binary version %[1]s (14) -│ └── restart node 
1 with binary version %[1]s (15) +│ │ ├── run "mixed-version 1", after 100ms delay (10) +│ │ └── run "mixed-version 2", after 100ms delay (11) +│ ├── restart node 3 with binary version (12) +│ ├── restart node 2 with binary version (13) +│ └── restart node 1 with binary version (14) +├── downgrade nodes :1-4 from "" to "%[1]s" +│ ├── restart node 2 with binary version %[1]s (15) +│ ├── run "mixed-version 1" (16) +│ ├── restart node 1 with binary version %[1]s (17) +│ ├── run "mixed-version 2" (18) +│ ├── restart node 3 with binary version %[1]s (19) +│ └── restart node 4 with binary version %[1]s (20) ├── upgrade nodes :1-4 from "%[1]s" to "" -│ ├── restart node 3 with binary version (16) -│ ├── run "mixed-version 1" (17) -│ ├── restart node 4 with binary version (18) -│ ├── restart node 1 with binary version (19) -│ ├── restart node 2 with binary version (20) -│ └── run "mixed-version 2" (21) -├── finalize upgrade by resetting `+"`preserve_downgrade_option`"+` (22) -├── run "mixed-version 2" (23) -└── wait for nodes :1-4 to all have the same cluster version (same as binary version of node 1) (24) +│ ├── restart node 4 with binary version (21) +│ ├── restart node 3 with binary version (22) +│ ├── restart node 1 with binary version (23) +│ ├── run mixed-version hooks concurrently +│ │ ├── run "mixed-version 1", after 0s delay (24) +│ │ └── run "mixed-version 2", after 0s delay (25) +│ └── restart node 2 with binary version (26) +├── finalize upgrade by resetting `+"`preserve_downgrade_option`"+` (27) +├── run mixed-version hooks concurrently +│ ├── run "mixed-version 1", after 100ms delay (28) +│ └── run "mixed-version 2", after 0s delay (29) +└── wait for nodes :1-4 to all have the same cluster version (same as binary version of node 1) (30) `, previousVersion, ) @@ -120,7 +138,7 @@ mixed-version test plan for upgrading from %[1]s to : mvt.OnStartup("startup 2", dummyHook) plan, err = mvt.plan() require.NoError(t, err) - requireConcurrentHooks(t, plan.steps[3], 
"startup 1", "startup 2") + requireConcurrentHooks(t, plan.steps[4], "startup 1", "startup 2") // Assert that AfterUpgradeFinalized hooks are scheduled to run in // the last step of the test. @@ -130,8 +148,8 @@ mixed-version test plan for upgrading from %[1]s to : mvt.AfterUpgradeFinalized("finalizer 3", dummyHook) plan, err = mvt.plan() require.NoError(t, err) - require.Len(t, plan.steps, 9) - requireConcurrentHooks(t, plan.steps[8], "finalizer 1", "finalizer 2", "finalizer 3") + require.Len(t, plan.steps, 10) + requireConcurrentHooks(t, plan.steps[9], "finalizer 1", "finalizer 2", "finalizer 3") } // TestDeterministicTestPlan tests that generating a test plan with @@ -154,6 +172,84 @@ func TestDeterministicTestPlan(t *testing.T) { } } +var unused float64 + +// TestDeterministicHookSeeds ensures that user functions passed to +// `InMixedVersion` always see the same sequence of values even if the +// PRNG passed to the `Test` struct is perturbed during runs. In other +// words, this ensures that user functions have at their disposal a +// random number generator that is unique to them and concurrency with +// other functions should not change the sequence of values they see +// as long as the RNG is used deterministically in the user function +// itself. +func TestDeterministicHookSeeds(t *testing.T) { + generateData := func(generateMoreRandomNumbers bool) [][]int { + var generatedData [][]int + mvt := newTest(t) + mvt.InMixedVersion("do something", func(_ context.Context, _ *logger.Logger, rng *rand.Rand, _ *Helper) error { + var data []int + for j := 0; j < 5; j++ { + data = append(data, rng.Intn(100)) + } + + generatedData = append(generatedData, data) + + // Ensure that changing the top-level random number generator + // has no impact on the rng passed to the user function. 
+ if generateMoreRandomNumbers { + for j := 0; j < 10; j++ { + unused = mvt.prng.Float64() + } + } + return nil + }) + + var ( + // these variables are not used by the hook so they can be nil + ctx = context.Background() + nilCluster cluster.Cluster + emptyHelper = &Helper{} + ) + + plan, err := mvt.plan() + require.NoError(t, err) + + // We can hardcode these paths since we are using a fixed seed in + // these tests. + firstRun := plan.steps[4].(sequentialRunStep).steps[2].(runHookStep) + require.Equal(t, "do something", firstRun.hook.name) + require.NoError(t, firstRun.Run(ctx, nilLogger, nilCluster, emptyHelper)) + + secondRun := plan.steps[5].(sequentialRunStep).steps[3].(runHookStep) + require.Equal(t, "do something", secondRun.hook.name) + require.NoError(t, secondRun.Run(ctx, nilLogger, nilCluster, emptyHelper)) + + thirdRun := plan.steps[6].(sequentialRunStep).steps[1].(runHookStep) + require.Equal(t, "do something", thirdRun.hook.name) + require.NoError(t, thirdRun.Run(ctx, nilLogger, nilCluster, emptyHelper)) + + fourthRun := plan.steps[8].(runHookStep) + require.Equal(t, "do something", fourthRun.hook.name) + require.NoError(t, fourthRun.Run(ctx, nilLogger, nilCluster, emptyHelper)) + + require.Len(t, generatedData, 4) + return generatedData + } + + expectedData := [][]int{ + {82, 1, 17, 3, 87}, + {73, 17, 6, 37, 43}, + {82, 35, 57, 54, 8}, + {7, 95, 26, 31, 65}, + } + const numRums = 50 + for j := 0; j < numRums; j++ { + for _, b := range []bool{true, false} { + require.Equal(t, expectedData, generateData(b), "j = %d | b = %t", j, b) + } + } +} + func newTest(t *testing.T) *Test { prng := rand.New(rand.NewSource(seed)) return &Test{ @@ -183,6 +279,6 @@ func requireConcurrentHooks(t *testing.T, step testStep, names ...string) { } } -func dummyHook(*logger.Logger, *rand.Rand, *Helper) error { +func dummyHook(context.Context, *logger.Logger, *rand.Rand, *Helper) error { return nil } diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go 
b/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go index 7f9826f6fe09..96978b8c41bb 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go @@ -16,10 +16,12 @@ import ( "fmt" "math/rand" "os" + "path" "path/filepath" "regexp" "strconv" "strings" + "sync/atomic" "time" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" @@ -27,8 +29,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/roachprod/logger" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/timeutil" - "golang.org/x/sync/errgroup" ) type ( @@ -37,22 +39,57 @@ type ( Helper struct { ctx context.Context testContext *Context - conns []*gosql.DB - crdbNodes option.NodeListOption - + // bgCount keeps track of the number of background tasks started + // with `helper.Background()`. The counter is used to generate + // unique log file names. + bgCount int64 + runner *testRunner stepLogger *logger.Logger } + // backgroundEvent is the struct sent by background steps when they + // finish (successfully or not). + backgroundEvent struct { + Name string + Err error + } + + backgroundRunner struct { + group ctxgroup.Group + ctx context.Context + events chan backgroundEvent + } + + testFailure struct { + summarized bool + description string + seed int64 + binaryVersions []roachpb.Version + // Cluster versions before and after the failure occurred. Before + // each step is executed, the test runner will cache each node's + // view of the cluster version; after a failure occurs, we'll try + // to read the cluster version from every node again. 
This context + // is added to the failure message displayed to the user with the + // intention of highlighting whether the cluster version changed + // during the failure, which is useful for test failures that + // happen while the upgrade is finalizing. + clusterVersionsBefore []roachpb.Version + clusterVersionsAfter []roachpb.Version + } + testRunner struct { ctx context.Context + cancel context.CancelFunc plan *TestPlan cluster cluster.Cluster crdbNodes option.NodeListOption seed int64 logger *logger.Logger - binaryVersions []roachpb.Version - clusterVersions []roachpb.Version + binaryVersions atomic.Value + clusterVersions atomic.Value + + background *backgroundRunner connCache []*gosql.DB } @@ -65,6 +102,7 @@ var ( func newTestRunner( ctx context.Context, + cancel context.CancelFunc, plan *TestPlan, l *logger.Logger, c cluster.Cluster, @@ -72,12 +110,14 @@ func newTestRunner( randomSeed int64, ) *testRunner { return &testRunner{ - ctx: ctx, - plan: plan, - logger: l, - cluster: c, - crdbNodes: crdbNodes, - seed: randomSeed, + ctx: ctx, + cancel: cancel, + plan: plan, + logger: l, + cluster: c, + crdbNodes: crdbNodes, + background: newBackgroundRunner(ctx), + seed: randomSeed, } } @@ -85,19 +125,44 @@ func newTestRunner( // each step in sequence. func (tr *testRunner) run() error { defer tr.closeConnections() + defer func() { + tr.logger.Printf("canceling mixed-version test context") + tr.cancel() + // Wait for some time so that any background tasks are properly + // canceled and their cancelation messages are displayed in the + // logs accordingly. 
+ time.Sleep(5 * time.Second) + }() + + stepsErr := make(chan error) + go func() { + defer close(stepsErr) + for _, step := range tr.plan.steps { + if err := tr.runStep(tr.ctx, step); err != nil { + stepsErr <- err + return + } + } + }() - for _, step := range tr.plan.steps { - if err := tr.runStep(step); err != nil { + for { + select { + case err := <-stepsErr: return err + case event := <-tr.background.CompletedEvents(): + if event.Err == nil { + tr.logger.Printf("background step finished: %s", event.Name) + continue + } + + return fmt.Errorf("background step `%s` returned error: %w", event.Name, event.Err) } } - - return nil } // runStep contains the logic of running a single test step, called // recursively in the case of sequentialRunStep and concurrentRunStep. -func (tr *testRunner) runStep(step testStep) error { +func (tr *testRunner) runStep(ctx context.Context, step testStep) error { if ss, ok := step.(singleStep); ok { if ss.ID() == 1 { // if this is the first singleStep of the plan, ensure it is an @@ -126,25 +191,25 @@ func (tr *testRunner) runStep(step testStep) error { switch s := step.(type) { case sequentialRunStep: for _, ss := range s.steps { - if err := tr.runStep(ss); err != nil { + if err := tr.runStep(ctx, ss); err != nil { return err } } return nil case concurrentRunStep: - group, _ := errgroup.WithContext(tr.ctx) + group := ctxgroup.WithContext(tr.ctx) for _, cs := range s.delayedSteps { cs := cs - group.Go(func() error { - return tr.runStep(cs) + group.GoCtx(func(concurrentCtx context.Context) error { + return tr.runStep(concurrentCtx, cs) }) } return group.Wait() case delayedStep: time.Sleep(s.delay) - return tr.runStep(s.step) + return tr.runStep(ctx, s.step) default: ss := s.(singleStep) @@ -153,53 +218,87 @@ func (tr *testRunner) runStep(step testStep) error { return err } - tr.logStep("STARTING", ss, stepLogger) - tr.logVersions(stepLogger) - start := timeutil.Now() - defer func() { - prefix := fmt.Sprintf("FINISHED [%s]", 
timeutil.Since(start)) - tr.logStep(prefix, ss, stepLogger) - }() - if err := ss.Run(tr.ctx, stepLogger, tr.cluster, tr.newHelper(stepLogger)); err != nil { - return tr.reportError(err, ss, stepLogger) + if ss.Background() { + tr.startBackgroundStep(ss, stepLogger) + return nil } - return nil + return tr.runSingleStep(ctx, ss, stepLogger) } } -// reportError augments the error passed with extra -// information. Specifically, the error message will include the ID of -// the step that failed, the random seed used, the binary version on -// each node when the error occurred, and the cluster version before -// and after the step (in case the failure happened *while* the -// cluster version was updating). -func (tr *testRunner) reportError(err error, step singleStep, l *logger.Logger) error { - errMsg := fmt.Sprintf("mixed-version test failure while running step %d (%s): %s", +// runSingleStep takes care of the logic of running a `singleStep`, +// including logging start and finish times, wrapping the error (if +// any) with useful information, and renaming the log file to indicate +// failure. This logic is the same whether running a step in the +// background or not. 
+func (tr *testRunner) runSingleStep(ctx context.Context, ss singleStep, l *logger.Logger) error { + tr.logStep("STARTING", ss, l) + tr.logVersions(l) + start := timeutil.Now() + defer func() { + prefix := fmt.Sprintf("FINISHED [%s]", timeutil.Since(start)) + tr.logStep(prefix, ss, l) + }() + if err := ss.Run(ctx, l, tr.cluster, tr.newHelper(ctx, l)); err != nil { + if isContextCanceled(err) { + l.Printf("step terminated (context canceled)") + return nil + } + return tr.stepError(err, ss, l) + } + + return nil +} + +func (tr *testRunner) startBackgroundStep(ss singleStep, l *logger.Logger) { + tr.background.Start(ss.Description(), func(ctx context.Context) error { + return tr.runSingleStep(ctx, ss, l) + }) +} + +// stepError generates a `testFailure` error by augmenting the error +// passed with extra information. Specifically, the error message will +// include the ID of the step that failed, the random seed used, the +// binary version on each node when the error occurred, and the +// cluster version before and after the step (in case the failure +// happened *while* the cluster version was updating). 
+func (tr *testRunner) stepError(err error, step singleStep, l *logger.Logger) error { + desc := fmt.Sprintf("mixed-version test failure while running step %d (%s): %s", step.ID(), step.Description(), err, ) - debugInfo := func(label, value string) string { - return fmt.Sprintf("%-40s%s", label+":", value) - } - seedInfo := debugInfo("test random seed", strconv.FormatInt(tr.seed, 10)) - binaryVersions := debugInfo("binary versions", formatVersions(tr.binaryVersions)) - clusterVersionsBefore := debugInfo("cluster versions before failure", formatVersions(tr.clusterVersions)) - var clusterVersionsAfter string - if err := tr.refreshClusterVersions(); err == nil { - clusterVersionsBefore += "\n" - clusterVersionsAfter = debugInfo("cluster versions after failure", formatVersions(tr.clusterVersions)) - } else { + + return tr.testFailure(desc, l) +} + +// testFailure generates a `testFailure` with the given +// description. It logs the error to the logger passed, and renames +// the underlying file to include the "FAILED" prefix to help in +// debugging. +func (tr *testRunner) testFailure(desc string, l *logger.Logger) error { + clusterVersionsBefore := tr.clusterVersions + if err := tr.refreshClusterVersions(); err != nil { tr.logger.Printf("failed to fetch cluster versions after failure: %s", err) } + clusterVersionsAfter := tr.clusterVersions + + tf := &testFailure{ + description: desc, + seed: tr.seed, + binaryVersions: loadAtomicVersions(tr.binaryVersions), + clusterVersionsBefore: loadAtomicVersions(clusterVersionsBefore), + clusterVersionsAfter: loadAtomicVersions(clusterVersionsAfter), + } + + // Print the test failure on the step's logger for convenience, and + // to reduce cross referencing of logs. 
+ l.Printf("%v", tf) if err := renameFailedLogger(l); err != nil { tr.logger.Printf("could not rename failed step logger: %v", err) } - return fmt.Errorf( - "%s\n%s\n%s\n%s%s", - errMsg, seedInfo, binaryVersions, clusterVersionsBefore, clusterVersionsAfter, - ) + return tf } func (tr *testRunner) logStep(prefix string, step singleStep, l *logger.Logger) { @@ -211,12 +310,15 @@ func (tr *testRunner) logStep(prefix string, step singleStep, l *logger.Logger) // cluster versions on each node. The cached versions should exist for // all steps but the first one (when we start the cluster itself). func (tr *testRunner) logVersions(l *logger.Logger) { - if tr.binaryVersions == nil || tr.clusterVersions == nil { + binaryVersions := loadAtomicVersions(tr.binaryVersions) + clusterVersions := loadAtomicVersions(tr.clusterVersions) + + if binaryVersions == nil || clusterVersions == nil { return } - l.Printf("binary versions: %s", formatVersions(tr.binaryVersions)) - l.Printf("cluster versions: %s", formatVersions(tr.clusterVersions)) + l.Printf("binary versions: %s", formatVersions(binaryVersions)) + l.Printf("cluster versions: %s", formatVersions(clusterVersions)) } // loggerFor creates a logger instance to be used by a test step. Logs @@ -227,22 +329,25 @@ func (tr *testRunner) loggerFor(step singleStep) (*logger.Logger, error) { name := invalidChars.ReplaceAllString(strings.ToLower(step.Description()), "") name = fmt.Sprintf("%d_%s", step.ID(), name) - prefix := fmt.Sprintf("%s/%s", logPrefix, name) + prefix := path.Join(logPrefix, name) return prefixedLogger(tr.logger, prefix) } // refreshBinaryVersions updates the internal `binaryVersions` field -// with the binary version running on each node of the cluster. +// with the binary version running on each node of the cluster. We use +// the `atomic` package here as this function may be called by two +// steps that are running concurrently. 
func (tr *testRunner) refreshBinaryVersions() error { - tr.binaryVersions = make([]roachpb.Version, 0, len(tr.crdbNodes)) + newBinaryVersions := make([]roachpb.Version, 0, len(tr.crdbNodes)) for _, node := range tr.crdbNodes { bv, err := clusterupgrade.BinaryVersion(tr.conn(node)) if err != nil { return fmt.Errorf("failed to get binary version for node %d: %w", node, err) } - tr.binaryVersions = append(tr.binaryVersions, bv) + newBinaryVersions = append(newBinaryVersions, bv) } + tr.binaryVersions.Store(newBinaryVersions) return nil } @@ -250,15 +355,16 @@ func (tr *testRunner) refreshBinaryVersions() error { // with the current view of the cluster version in each of the nodes // of the cluster. func (tr *testRunner) refreshClusterVersions() error { - tr.clusterVersions = make([]roachpb.Version, 0, len(tr.crdbNodes)) + newClusterVersions := make([]roachpb.Version, 0, len(tr.crdbNodes)) for _, node := range tr.crdbNodes { cv, err := clusterupgrade.ClusterVersion(tr.ctx, tr.conn(node)) if err != nil { return fmt.Errorf("failed to get cluster version for node %d: %w", node, err) } - tr.clusterVersions = append(tr.clusterVersions, cv) + newClusterVersions = append(newClusterVersions, cv) } + tr.clusterVersions.Store(newClusterVersions) return nil } @@ -291,11 +397,10 @@ func (tr *testRunner) maybeInitConnections() error { return nil } -func (tr *testRunner) newHelper(l *logger.Logger) *Helper { +func (tr *testRunner) newHelper(ctx context.Context, l *logger.Logger) *Helper { return &Helper{ - ctx: tr.ctx, - conns: tr.connCache, - crdbNodes: tr.crdbNodes, + ctx: ctx, + runner: tr, stepLogger: l, } } @@ -314,10 +419,41 @@ func (tr *testRunner) closeConnections() { } } +func newBackgroundRunner(ctx context.Context) *backgroundRunner { + g := ctxgroup.WithContext(ctx) + return &backgroundRunner{ + group: g, + ctx: ctx, + events: make(chan backgroundEvent), + } +} + +// Start will run the function `fn` in a goroutine. 
Any errors +// returned by that function are observable by reading from the +// channel returned by the `Events()` function. +func (br *backgroundRunner) Start(name string, fn func(context.Context) error) { + br.group.Go(func() error { + err := fn(br.ctx) + br.events <- backgroundEvent{ + Name: name, + Err: err, + } + return err + }) +} + +func (br *backgroundRunner) CompletedEvents() <-chan backgroundEvent { + return br.events +} + +func (h *Helper) RandomNode(prng *rand.Rand, nodes option.NodeListOption) int { + return nodes[prng.Intn(len(nodes))] +} + // RandomDB returns a (nodeID, connection) tuple for a randomly picked // cockroach node according to the parameters passed. func (h *Helper) RandomDB(prng *rand.Rand, nodes option.NodeListOption) (int, *gosql.DB) { - node := nodes[prng.Intn(len(nodes))] + node := h.RandomNode(prng, nodes) return node, h.Connect(node) } @@ -325,7 +461,7 @@ func (h *Helper) RandomDB(prng *rand.Rand, nodes option.NodeListOption) (int, *g // database node. The query and the node picked are logged in the logs // of the step that calls this function. func (h *Helper) QueryRow(rng *rand.Rand, query string, args ...interface{}) *gosql.Row { - node, db := h.RandomDB(rng, h.crdbNodes) + node, db := h.RandomDB(rng, h.runner.crdbNodes) h.stepLogger.Printf("running SQL statement:\n%s\nArgs: %v\nNode: %d", query, args, node) return db.QueryRowContext(h.ctx, query, args...) } @@ -334,14 +470,14 @@ func (h *Helper) QueryRow(rng *rand.Rand, query string, args ...interface{}) *go // The query and the node picked are logged in the logs of the step // that calls this function. func (h *Helper) Exec(rng *rand.Rand, query string, args ...interface{}) error { - node, db := h.RandomDB(rng, h.crdbNodes) + node, db := h.RandomDB(rng, h.runner.crdbNodes) h.stepLogger.Printf("running SQL statement:\n%s\nArgs: %v\nNode: %d", query, args, node) _, err := db.ExecContext(h.ctx, query, args...) 
return err } func (h *Helper) Connect(node int) *gosql.DB { - return h.conns[node-1] + return h.runner.conn(node) } // SetContext should be called by steps that need access to the test @@ -356,13 +492,99 @@ func (h *Helper) Context() *Context { return h.testContext } +// Background allows test authors to create functions that run in the +// background in mixed-version hooks. +func (h *Helper) Background(name string, fn func(context.Context, *logger.Logger) error) { + h.runner.background.Start(name, func(ctx context.Context) error { + bgLogger, err := h.loggerFor(name) + if err != nil { + return fmt.Errorf("failed to create logger for background function %q: %w", name, err) + } + + err = fn(ctx, bgLogger) + if err != nil { + if isContextCanceled(err) { + bgLogger.Printf("background function terminated (context canceled)") + return nil + } + + desc := fmt.Sprintf("error in background function %s: %s", name, err) + return h.runner.testFailure(desc, bgLogger) + } + + return nil + }) +} + +// BackgroundCommand has the same semantics of `Background()`; the +// command passed will run and the test will fail if the command is +// not successful. +func (h *Helper) BackgroundCommand(cmd string, nodes option.NodeListOption) { + desc := fmt.Sprintf("run command: %q", cmd) + h.Background(desc, func(ctx context.Context, l *logger.Logger) error { + l.Printf("running command `%s` on nodes %v in the background", cmd, nodes) + return h.runner.cluster.RunE(ctx, nodes, cmd) + }) +} + +// loggerFor creates a logger instance to be used by background +// functions (created by calling `Background` on the helper +// instance). It is similar to the logger instances created for +// mixed-version steps, but with the `background_` prefix. 
+func (h *Helper) loggerFor(name string) (*logger.Logger, error) { + atomic.AddInt64(&h.bgCount, 1) + + fileName := invalidChars.ReplaceAllString(strings.ToLower(name), "") + fileName = fmt.Sprintf("background_%s_%d", fileName, h.bgCount) + fileName = path.Join(logPrefix, fileName) + + return prefixedLogger(h.runner.logger, fileName) +} + +func (tf *testFailure) Error() string { + if tf.summarized { + return tf.description + } + + tf.summarized = true + debugInfo := func(label, value string) string { + return fmt.Sprintf("%-40s%s", label+":", value) + } + seedInfo := debugInfo("test random seed", strconv.FormatInt(tf.seed, 10)) + binaryVersions := debugInfo("binary versions", formatVersions(tf.binaryVersions)) + clusterVersionsBefore := debugInfo( + "cluster versions before failure", + formatVersions(tf.clusterVersionsBefore), + ) + var clusterVersionsAfter string + if cv := tf.clusterVersionsAfter; cv != nil { + clusterVersionsBefore += "\n" + clusterVersionsAfter = debugInfo("cluster versions after failure", formatVersions(cv)) + } + + return fmt.Sprintf( + "%s\n%s\n%s\n%s%s", + tf.description, seedInfo, binaryVersions, clusterVersionsBefore, clusterVersionsAfter, + ) +} + func renameFailedLogger(l *logger.Logger) error { currentFileName := l.File.Name() - newLogName := strings.TrimSuffix(currentFileName, filepath.Ext(currentFileName)) - newLogName += "_FAILED.log" + newLogName := path.Join( + filepath.Dir(currentFileName), + "FAILED_"+filepath.Base(currentFileName), + ) return os.Rename(currentFileName, newLogName) } +func loadAtomicVersions(v atomic.Value) []roachpb.Version { + if v.Load() == nil { + return nil + } + + return v.Load().([]roachpb.Version) +} + func formatVersions(versions []roachpb.Version) string { var pairs []string for idx, version := range versions { @@ -371,3 +593,16 @@ func formatVersions(versions []roachpb.Version) string { return fmt.Sprintf("[%s]", strings.Join(pairs, ", ")) } + +// isContextCanceled returns a boolean indicating whether 
the error +// given happened because some context was canceled. +func isContextCanceled(err error) bool { + // TODO(renato): unfortunately, we have to resort to string + // comparison here. The most common use case for this function is + // detecting cluster commands that fail when the test context is + // canceled (after test success or failure), and roachtest does not + // return an error that wraps the context cancelation (in other + // words, `errors.Is` doesn't work). Once we fix this behavior, we + // should use structured errors here. + return strings.Contains(err.Error(), context.Canceled.Error()) +} diff --git a/pkg/cmd/roachtest/tests/versionupgrade.go b/pkg/cmd/roachtest/tests/versionupgrade.go index cfc4faf8265f..68d4fda1d446 100644 --- a/pkg/cmd/roachtest/tests/versionupgrade.go +++ b/pkg/cmd/roachtest/tests/versionupgrade.go @@ -104,48 +104,54 @@ func runVersionUpgrade(ctx context.Context, t test.Test, c cluster.Cluster) { } mvt := mixedversion.NewTest(ctx, t, t.L(), c, c.All()) - mvt.InMixedVersion("run backup", func(l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error { + mvt.InMixedVersion("run backup", func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error { // Verify that backups can be created in various configurations. This is // important to test because changes in system tables might cause backups to // fail in mixed-version clusters. 
dest := fmt.Sprintf("nodelocal://0/%d", timeutil.Now().UnixNano()) return h.Exec(rng, `BACKUP TO $1`, dest) }) - mvt.InMixedVersion("test features", func(l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error { - for _, featureTest := range versionUpgradeTestFeatures { - l.Printf("running feature test %q", featureTest.name) - if err := h.Exec(rng, featureTest.statement); err != nil { - l.Printf("%q: ERROR (%s)", featureTest.name, err) - return err + mvt.InMixedVersion( + "test features", + func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error { + for _, featureTest := range versionUpgradeTestFeatures { + l.Printf("running feature test %q", featureTest.name) + if err := h.Exec(rng, featureTest.statement); err != nil { + l.Printf("%q: ERROR (%s)", featureTest.name, err) + return err + } + l.Printf("%q: OK", featureTest.name) } - l.Printf("%q: OK", featureTest.name) - } - return nil - }) - mvt.AfterUpgradeFinalized("check if GC TTL is pinned", func(l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error { - // TODO(irfansharif): This can be removed when the predecessor version - // in this test is v23.1, where the default is 4h. This test was only to - // make sure that existing clusters that upgrade to 23.1 retained their - // existing GC TTL. - l.Printf("checking if GC TTL is pinned to 24h") - var ttlSeconds int - query := ` + return nil + }, + ) + mvt.AfterUpgradeFinalized( + "check if GC TTL is pinned", + func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error { + // TODO(irfansharif): This can be removed when the predecessor version + // in this test is v23.1, where the default is 4h. This test was only to + // make sure that existing clusters that upgrade to 23.1 retained their + // existing GC TTL. 
+ l.Printf("checking if GC TTL is pinned to 24h") + var ttlSeconds int + query := ` SELECT (crdb_internal.pb_to_json('cockroach.config.zonepb.ZoneConfig', raw_config_protobuf)->'gc'->'ttlSeconds')::INT FROM crdb_internal.zones WHERE target = 'RANGE default' LIMIT 1 ` - if err := h.QueryRow(rng, query).Scan(&ttlSeconds); err != nil { - return fmt.Errorf("error querying GC TTL: %w", err) - } - expectedTTL := 24 * 60 * 60 // NB: 24h is what's used in the fixture - if ttlSeconds != expectedTTL { - return fmt.Errorf("unexpected GC TTL: actual (%d) != expected (%d)", ttlSeconds, expectedTTL) - } - return nil - }) + if err := h.QueryRow(rng, query).Scan(&ttlSeconds); err != nil { + return fmt.Errorf("error querying GC TTL: %w", err) + } + expectedTTL := 24 * 60 * 60 // NB: 24h is what's used in the fixture + if ttlSeconds != expectedTTL { + return fmt.Errorf("unexpected GC TTL: actual (%d) != expected (%d)", ttlSeconds, expectedTTL) + } + return nil + }, + ) mvt.Run() }