From 7ccf2d0a2ba3c53b3b5e925183bbdfe42073158a Mon Sep 17 00:00:00 2001 From: Renato Costa Date: Wed, 25 Jan 2023 08:50:12 -0500 Subject: [PATCH 1/2] roachtestutil: introduce commandbuilder `commandbuilder.go` adds the `roachtestutil.Command` type, a thin wrapper that helps tests build complex commands to be run in a cluster. The way roachtests deal with this right now is by a combination of string concatenation and ad-hoc `fmt.Sprint` calls. This has at least two drawbacks: * it makes it difficult to piece together the command that is actually being run in a cluster, especially if conditionals and string formatters are involved; * if a function takes a command (string) as parameter, there's no easy way to either verify if a flag is present or to safely add a command line flag. The `roachtestutil.Command` type aims to solve both of the shortcomings. Over time, roachtests can move from raw strings to this type as needed. In addition, as `roachtest` starts to provide higher level APIs for test writers, having a more programmable interface to commands is desirable. Epic: CRDB-19321 Release note: None --- pkg/BUILD.bazel | 2 + pkg/cmd/roachtest/roachtestutil/BUILD.bazel | 11 +- .../roachtest/roachtestutil/commandbuilder.go | 131 ++++++++++++++++++ .../roachtestutil/commandbuilder_test.go | 77 ++++++++++ 4 files changed, 220 insertions(+), 1 deletion(-) create mode 100644 pkg/cmd/roachtest/roachtestutil/commandbuilder.go create mode 100644 pkg/cmd/roachtest/roachtestutil/commandbuilder_test.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 78f4a514ebec..d6c92a06e4ba 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -139,6 +139,7 @@ ALL_TESTS = [ "//pkg/cmd/roachprod-microbench:roachprod-microbench_test", "//pkg/cmd/roachtest/clusterstats:clusterstats_test", "//pkg/cmd/roachtest/roachtestutil/mixedversion:mixedversion_test", + "//pkg/cmd/roachtest/roachtestutil:roachtestutil_test", "//pkg/cmd/roachtest/tests:tests_test", "//pkg/cmd/roachtest:roachtest_test", "//pkg/cmd/teamcity-trigger:teamcity-trigger_test", @@ -1042,6 +1043,7 @@ GO_TARGETS = [ "//pkg/cmd/roachtest/roachtestutil/mixedversion:mixedversion", "//pkg/cmd/roachtest/roachtestutil/mixedversion:mixedversion_test", "//pkg/cmd/roachtest/roachtestutil:roachtestutil", + "//pkg/cmd/roachtest/roachtestutil:roachtestutil_test", "//pkg/cmd/roachtest/spec:spec", "//pkg/cmd/roachtest/test:test", "//pkg/cmd/roachtest/tests:tests", diff --git a/pkg/cmd/roachtest/roachtestutil/BUILD.bazel b/pkg/cmd/roachtest/roachtestutil/BUILD.bazel index 2472bf06ef2e..e41c839e01f1 100644 --- a/pkg/cmd/roachtest/roachtestutil/BUILD.bazel +++ b/pkg/cmd/roachtest/roachtestutil/BUILD.bazel @@ -1,9 +1,10 @@ load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "roachtestutil", srcs = [ + "commandbuilder.go", "jaeger.go", "validation_check.go", ], @@ -17,4 +18,12 @@ go_library( ], ) +go_test( + name = "roachtestutil_test", + srcs = ["commandbuilder_test.go"], + args = ["-test.timeout=295s"], + embed = [":roachtestutil"], + deps = ["@com_github_stretchr_testify//require"], +) + get_x_data(name = "get_x_data") diff --git a/pkg/cmd/roachtest/roachtestutil/commandbuilder.go b/pkg/cmd/roachtest/roachtestutil/commandbuilder.go new file mode 100644 index 000000000000..878dfcfb31a2 --- /dev/null +++ b/pkg/cmd/roachtest/roachtestutil/commandbuilder.go @@ -0,0 +1,131 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package roachtestutil + +import ( + "fmt" + "sort" + "strings" +) + +// Command wraps a command to be run in a cluster. It allows users to +// manipulate a command without having to perform string-based +// operations. +type Command struct { + Binary string + Arguments []string + Flags map[string]*string +} + +// NewCommand builds a command. The format parameter can take +// `fmt.Print` verbs. +// +// Examples: +// +// NewCommand("./cockroach version") +// NewCommand("%s version", binaryPath) +func NewCommand(format string, args ...interface{}) *Command { + cmd := fmt.Sprintf(format, args...) + parts := strings.Fields(cmd) + return &Command{ + Binary: parts[0], + Arguments: parts[1:], + Flags: make(map[string]*string), + } +} + +func (c *Command) Arg(format string, args ...interface{}) *Command { + c.Arguments = append(c.Arguments, fmt.Sprintf(format, args...)) + return c +} + +func (c *Command) HasFlag(name string) bool { + _, ok := c.Flags[name] + return ok +} + +func (c *Command) Flag(name string, val interface{}) *Command { + c.Flags[name] = stringP(fmt.Sprint(val)) + return c +} + +// MaybeFlag is a thin wrapper around Flag for the caller's +// convenience. The flag is added only if the `condition` parameter is +// true. +func (c *Command) MaybeFlag(condition bool, name string, val interface{}) *Command { + if condition { + return c.Flag(name, val) + } + + return c +} + +// Option adds a flag that doesn't have an associated value +func (c *Command) Option(name string) *Command { + c.Flags[name] = nil + return c +} + +func (c *Command) MaybeOption(condition bool, name string) *Command { + if condition { + return c.Option(name) + } + + return c +} + +// ITEFlag (if-then-else flag) adds a flag where the value depends on +// the `condition` parameter. `trueVal` is used if `condition` is +// true; `falseVal` is used otherwise. +func (c *Command) ITEFlag(condition bool, name string, trueVal, falseVal interface{}) *Command { + if condition { + return c.Flag(name, trueVal) + } + + return c.Flag(name, falseVal) +} + +// String returns a canonical string representation of the command +// which can be passed to `cluster.Run`. +func (c *Command) String() string { + flags := make([]string, 0, len(c.Flags)) + names := make([]string, 0, len(c.Flags)) + for name := range c.Flags { + names = append(names, name) + } + sort.Strings(names) + + for _, name := range names { + val := c.Flags[name] + prefix := "-" + if len(name) > 1 { + prefix = "--" + } + + prefixedName := prefix + name + parts := []string{prefixedName} + if val != nil { + parts = append(parts, *val) + } + flags = append(flags, strings.Join(parts, " ")) + } + + cmd := append( + []string{c.Binary}, + append(c.Arguments, flags...)..., + ) + + return strings.Join(cmd, " ") +} + +func stringP(s string) *string { + return &s +} diff --git a/pkg/cmd/roachtest/roachtestutil/commandbuilder_test.go b/pkg/cmd/roachtest/roachtestutil/commandbuilder_test.go new file mode 100644 index 000000000000..28ee2de2b9df --- /dev/null +++ b/pkg/cmd/roachtest/roachtestutil/commandbuilder_test.go @@ -0,0 +1,77 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package roachtestutil + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestCommand(t *testing.T) { + c := NewCommand("./cockroach") + require.Equal(t, "./cockroach", c.String()) + + c = NewCommand("./cockroach workload init") + require.Equal(t, "./cockroach workload init", c.String()) + + c = NewCommand("./cockroach").Arg("workload").Arg("init") + require.Equal(t, "./cockroach workload init", c.String()) + + baseCommand := NewCommand("./cockroach workload run bank").Arg("{pgurl:%d}", 1) + + c = clone(baseCommand) + c.Flag("max-ops", 10).Flag("path", "/some/path") + require.Equal(t, "./cockroach workload run bank {pgurl:1} --max-ops 10 --path /some/path", c.String()) + + c = clone(baseCommand) + c.MaybeFlag(true, "max-ops", 10) // included + c.MaybeFlag(false, "concurrency", 8) // not included + require.True(t, c.HasFlag("max-ops")) + require.False(t, c.HasFlag("concurrency")) + require.Equal(t, "./cockroach workload run bank {pgurl:1} --max-ops 10", c.String()) + + c = clone(baseCommand) + c.ITEFlag(true, "max-ops", 10, 20) + c.ITEFlag(false, "duration", 2*time.Hour, 10*time.Minute) + require.Equal(t, "./cockroach workload run bank {pgurl:1} --duration 10m0s --max-ops 10", c.String()) + + c = clone(baseCommand) + c.Option("local") + c.MaybeOption(true, "background") // included + c.MaybeOption(false, "dry-run") // not included + require.True(t, c.HasFlag("local")) + require.True(t, c.HasFlag("background")) + require.False(t, c.HasFlag("dry-run")) + require.Equal(t, "./cockroach workload run bank {pgurl:1} --background --local", c.String()) + + c = clone(baseCommand) + c.Flag("c", 10) + c.MaybeFlag(true, "n", "8") // included + c.MaybeFlag(false, "j", "yes") // not included + c.Option("x") + require.True(t, c.HasFlag("c")) + require.Equal(t, "./cockroach workload run bank {pgurl:1} -c 10 -n 8 -x", c.String()) +} + +func clone(cmd *Command) *Command { + flags := make(map[string]*string) + for k, v := range cmd.Flags { + flags[k] = v + } + + return &Command{ + Binary: cmd.Binary, + Arguments: append([]string{}, cmd.Arguments...), + Flags: flags, + } +} From b16ffe16686fbae81b58845950e492a15f2df328 Mon Sep 17 00:00:00 2001 From: Renato Costa Date: Fri, 27 Jan 2023 11:54:38 -0500 Subject: [PATCH 2/2] roachtest: support running steps in the background in `mixedversion` This adds a `BackgroundFunc` API to the `mixedversion` package in roachtest, allowing test writers to run tasks in the background during an upgrade test. The most common use-case for this functionality is running a workload while the cluster upgrades (other similar use-cases exist in a variety of tests); for this reason, a `Workload` convenience function is added that allows tests to add a workload to a mixed-version test with one function call. Currently, each test needs to devise their own mechanism to: spawn the background task; monitor its execution; and terminate the test on error. The current API aims to reduce copying and pasting of such logic, making for a more declarative test. In the future, the test planner itself could decide to run some steps in the background and it should be able to leverage the mechanisms introduced in this commit. Epic: CRDB-19321 Release note: None --- .../roachtestutil/mixedversion/BUILD.bazel | 5 +- .../mixedversion/mixedversion.go | 186 +++++++-- .../roachtestutil/mixedversion/planner.go | 9 +- .../mixedversion/planner_test.go | 154 +++++-- .../roachtestutil/mixedversion/runner.go | 383 ++++++++++++++---- pkg/cmd/roachtest/tests/versionupgrade.go | 62 +-- 6 files changed, 640 insertions(+), 159 deletions(-) diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel b/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel index 75f083b93aee..bd62b5924741 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel @@ -13,14 +13,15 @@ go_library( deps = [ "//pkg/cmd/roachtest/cluster", "//pkg/cmd/roachtest/option", + "//pkg/cmd/roachtest/roachtestutil", "//pkg/cmd/roachtest/roachtestutil/clusterupgrade", "//pkg/cmd/roachtest/test", "//pkg/roachpb", "//pkg/roachprod/logger", + "//pkg/util/ctxgroup", "//pkg/util/randutil", "//pkg/util/timeutil", "//pkg/util/version", - "@org_golang_x_sync//errgroup", ], ) @@ -30,7 +31,9 @@ go_test( args = ["-test.timeout=295s"], embed = [":mixedversion"], deps = [ + "//pkg/cmd/roachtest/cluster", "//pkg/cmd/roachtest/option", + "//pkg/cmd/roachtest/roachtestutil", "//pkg/cmd/roachtest/roachtestutil/clusterupgrade", "//pkg/roachprod/logger", "//pkg/util/version", diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go index 9bb0700e7440..737fd7045274 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go @@ -35,14 +35,18 @@ // // Typical usage: // -// mvt, err := NewMixedVersionTest(...) -// mvt.InMixedVersion("test my feature", func(l *logger.Logger, db *gosql.DB) error { +// mvt, err := mixedversion.NewTest(...) +// mvt.InMixedVersion("test my feature", func(l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error { // l.Printf("testing feature X") +// node, db := h.RandomDB(rng, c.All()) +// l.Printf("running query on node %d", node) // _, err := db.ExecContext(ctx, "SELECT * FROM test") // return err // }) -// mvt.InMixedVersion("test another feature", func(l *logger.Logger, db *gosql.DB) error { +// mvt.InMixedVersion("test another feature", func(l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error { // l.Printf("testing feature Y") +// node, db := h.RandomDB(rng, c.All()) +// l.Printf("running query on node %d", node) // _, err := db.ExecContext(ctx, "SELECT * FROM test2") // return err // }) @@ -72,6 +76,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" "github.com/cockroachdb/cockroach/pkg/roachprod/logger" @@ -82,6 +87,7 @@ import ( const ( logPrefix = "mixed-version-test" startupLabel = "run startup hooks" + backgroundLabel = "start background hooks" mixedVersionLabel = "run mixed-version hooks" afterTestLabel = "run after test hooks" @@ -92,6 +98,13 @@ const ( // of migration steps before the new cluster version can be // finalized. runWhileMigratingProbability = 0.5 + + // CurrentCockroachPath is the path to the binary where the current + // version of cockroach being tested is located. This file is + // uploaded before any user functions are run. The primary use case + // are tests that need long runnig background functions on startup + // (such as running a workload). + CurrentCockroachPath = "./cockroach-current" ) var ( @@ -125,7 +138,16 @@ type ( Finalizing bool } - userFunc func(*logger.Logger, *rand.Rand, *Helper) error + // userFunc is the signature for user-provided functions that run at + // various points in the test (synchronously or in the background). + // These functions run on the test runner node itself; i.e., any + // commands they wish to execute on the cluster need to go through + // the `cluster` methods as usual (cluster.RunE, cluster.PutE, + // etc). In addition, these functions should prefer returning an + // error over calling `t.Fatal` directly. The error is handled by + // the mixedversion framework and better error messages are produced + // as a result. + userFunc func(context.Context, *logger.Logger, *rand.Rand, *Helper) error predicateFunc func(Context) bool // versionUpgradeHook is a hook that can be called at any time @@ -157,6 +179,11 @@ type ( // for human-consumption. Displayed when pretty-printing the test // plan. Description() string + // Background indicates whether the step should be run in the + // background. When a step is *not* run in the background, the + // test will wait for it to finish before moving on. When a + // background step fails, the entire test fails. + Background() bool // Run implements the actual functionality of the step. Run(context.Context, *logger.Logger, cluster.Cluster, *Helper) error } @@ -167,6 +194,7 @@ type ( // its different stages: startup, mixed-version, and after-test. testHooks struct { startup hooks + background hooks mixedVersion hooks afterUpgradeFinalized hooks @@ -177,6 +205,7 @@ type ( // Test is the main struct callers of this package interact with. Test struct { ctx context.Context + cancel context.CancelFunc cluster cluster.Cluster logger *logger.Logger crdbNodes option.NodeListOption @@ -209,8 +238,10 @@ func NewTest( prng, seed := randutil.NewPseudoRand() testLogger.Printf("mixed-version random seed: %d", seed) + testCtx, cancel := context.WithCancel(ctx) return &Test{ - ctx: ctx, + ctx: testCtx, + cancel: cancel, cluster: c, logger: testLogger, crdbNodes: crdbNodes, @@ -248,7 +279,7 @@ func (t *Test) InMixedVersion(desc string, fn userFunc) { return len(testContext.ToVersionNodes) == numUpgradedNodes } - t.hooks.AddMixedVersion(versionUpgradeHook{desc, predicate, fn}) + t.hooks.AddMixedVersion(versionUpgradeHook{name: desc, predicate: predicate, fn: fn}) } // OnStartup registers a callback that is run once the cluster is @@ -260,7 +291,7 @@ func (t *Test) OnStartup(desc string, fn userFunc) { // Since the callbacks here are only referenced in the setup steps // of the planner, there is no need to have a predicate function // gating them. - t.hooks.AddStartup(versionUpgradeHook{desc, nil, fn}) + t.hooks.AddStartup(versionUpgradeHook{name: desc, fn: fn}) } // AfterUpgradeFinalized registers a callback that is run once the @@ -268,7 +299,55 @@ func (t *Test) OnStartup(desc string, fn userFunc) { // and allowed the upgrade to finalize successfully. If multiple such // hooks are passed, they will be executed concurrently. func (t *Test) AfterUpgradeFinalized(desc string, fn userFunc) { - t.hooks.AddAfterUpgradeFinalized(versionUpgradeHook{desc, nil, fn}) + t.hooks.AddAfterUpgradeFinalized(versionUpgradeHook{name: desc, fn: fn}) +} + +// BackgroundFunc runs the function passed as argument in the +// background during the test. Background functions are kicked off +// once the cluster has been initialized (i.e., after all startup +// steps have finished). If the `userFunc` returns an error, it will +// cause the test to fail. These functions can run indefinitely but +// should respect the context passed to them, which will be canceled +// when the test terminates (successfully or not). +func (t *Test) BackgroundFunc(desc string, fn userFunc) { + t.hooks.AddBackground(versionUpgradeHook{name: desc, fn: fn}) +} + +// BackgroundCommand is a convenience wrapper around `BackgroundFunc` +// that runs the command passed once the cluster is initialized. The +// node where the command runs is picked randomly. +// +// TODO: unfortunately, `cluster.Run()` does not allow the caller to +// pass a logger instance. It would be convenient if the output of the +// command itself lived within the `mixed-version/*.log` files. +func (t *Test) BackgroundCommand( + desc string, nodes option.NodeListOption, cmd *roachtestutil.Command, +) { + t.BackgroundFunc(desc, t.runCommandFunc(nodes, cmd.String())) +} + +// Workload is a convenience wrapper that allows callers to run +// workloads in the background during a mixed-version test. `initCmd`, +// if passed, is the command run to initialize the workload; it is run +// synchronously as a regular startup function. `runCmd` is the +// command to actually run the command; it is run in the background. +func (t *Test) Workload( + name string, node option.NodeListOption, initCmd, runCmd *roachtestutil.Command, +) { + seed := uint64(t.prng.Int63()) + addSeed := func(cmd *roachtestutil.Command) { + if !cmd.HasFlag("seed") { + cmd.Flag("seed", seed) + } + } + + if initCmd != nil { + addSeed(initCmd) + t.OnStartup(fmt.Sprintf("initialize %s workload", name), t.runCommandFunc(node, initCmd.String())) + } + + addSeed(runCmd) + t.BackgroundCommand(fmt.Sprintf("%s workload", name), node, runCmd) } // Run runs the mixed-version test. It should be called once all @@ -290,7 +369,7 @@ func (t *Test) Run() { func (t *Test) run(plan *TestPlan) error { return newTestRunner( - t.ctx, plan, t.logger, t.cluster, t.crdbNodes, t.seed, + t.ctx, t.cancel, plan, t.logger, t.cluster, t.crdbNodes, t.seed, ).run() } @@ -319,6 +398,13 @@ func (t *Test) buildVersion() version.Version { return *t.rt.BuildVersion() } +func (t *Test) runCommandFunc(nodes option.NodeListOption, cmd string) userFunc { + return func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *Helper) error { + l.Printf("running command `%s` on nodes %v", cmd, nodes) + return t.cluster.RunE(ctx, nodes, cmd) + } +} + // startFromCheckpointStep is the step that starts the cluster from a // specific `version`, using checked-in fixtures. type startFromCheckpointStep struct { @@ -328,7 +414,8 @@ type startFromCheckpointStep struct { crdbNodes option.NodeListOption } -func (s startFromCheckpointStep) ID() int { return s.id } +func (s startFromCheckpointStep) ID() int { return s.id } +func (s startFromCheckpointStep) Background() bool { return false } func (s startFromCheckpointStep) Description() string { return fmt.Sprintf("starting cluster from fixtures for version %q", s.version) @@ -349,12 +436,41 @@ func (s startFromCheckpointStep) Run( return err } - startOpts := option.DefaultStartOpts() + startOpts := option.DefaultStartOptsNoBackups() startOpts.RoachprodOpts.Sequential = false clusterupgrade.StartWithBinary(ctx, l, c, s.crdbNodes, binaryPath, startOpts) return nil } +// uploadCurrentVersionStep uploads the current cockroach binary to +// all DB nodes in the test. This is so that startup steps can use +// them (if, for instance, they need to run a workload). The binary +// will be located in `dest`. +type uploadCurrentVersionStep struct { + id int + rt test.Test + crdbNodes option.NodeListOption + dest string +} + +func (s uploadCurrentVersionStep) ID() int { return s.id } +func (s uploadCurrentVersionStep) Background() bool { return false } + +func (s uploadCurrentVersionStep) Description() string { + return fmt.Sprintf("upload current binary to all cockroach nodes (%v)", s.crdbNodes) +} + +func (s uploadCurrentVersionStep) Run( + ctx context.Context, l *logger.Logger, c cluster.Cluster, helper *Helper, +) error { + _, err := clusterupgrade.UploadVersion(ctx, s.rt, l, c, s.crdbNodes, clusterupgrade.MainVersion) + if err != nil { + return err + } + + return c.RunE(ctx, s.crdbNodes, fmt.Sprintf("mv ./cockroach %s", s.dest)) +} + // waitForStableClusterVersionStep implements the process of waiting // for the `version` cluster setting being the same on all nodes of // the cluster and equal to the binary version of the first node in @@ -364,7 +480,8 @@ type waitForStableClusterVersionStep struct { nodes option.NodeListOption } -func (s waitForStableClusterVersionStep) ID() int { return s.id } +func (s waitForStableClusterVersionStep) ID() int { return s.id } +func (s waitForStableClusterVersionStep) Background() bool { return false } func (s waitForStableClusterVersionStep) Description() string { return fmt.Sprintf( @@ -387,7 +504,8 @@ type preserveDowngradeOptionStep struct { prng *rand.Rand } -func (s preserveDowngradeOptionStep) ID() int { return s.id } +func (s preserveDowngradeOptionStep) ID() int { return s.id } +func (s preserveDowngradeOptionStep) Background() bool { return false } func (s preserveDowngradeOptionStep) Description() string { return "preventing auto-upgrades by setting `preserve_downgrade_option`" @@ -421,7 +539,8 @@ type restartWithNewBinaryStep struct { node int } -func (s restartWithNewBinaryStep) ID() int { return s.id } +func (s restartWithNewBinaryStep) ID() int { return s.id } +func (s restartWithNewBinaryStep) Background() bool { return false } func (s restartWithNewBinaryStep) Description() string { return fmt.Sprintf("restart node %d with binary version %s", s.node, versionMsg(s.version)) @@ -436,7 +555,12 @@ func (s restartWithNewBinaryStep) Run( l, c, c.Node(s.node), - option.DefaultStartOpts(), + // Disable regular backups in mixed-version tests, as some tests + // check for running jobs and the scheduled backup may make + // things non-deterministic. In the future, we should change the + // default and add an API for tests to opt-out of the default + // scheduled backup if necessary. + option.DefaultStartOptsNoBackups(), s.version, ) } @@ -450,7 +574,8 @@ type finalizeUpgradeStep struct { prng *rand.Rand } -func (s finalizeUpgradeStep) ID() int { return s.id } +func (s finalizeUpgradeStep) ID() int { return s.id } +func (s finalizeUpgradeStep) Background() bool { return false } func (s finalizeUpgradeStep) Description() string { return "finalize upgrade by resetting `preserve_downgrade_option`" @@ -472,9 +597,11 @@ type runHookStep struct { testContext Context prng *rand.Rand hook versionUpgradeHook + background bool } -func (s runHookStep) ID() int { return s.id } +func (s runHookStep) ID() int { return s.id } +func (s runHookStep) Background() bool { return s.background } func (s runHookStep) Description() string { return fmt.Sprintf("run %q", s.hook.name) @@ -484,7 +611,7 @@ func (s runHookStep) Run( ctx context.Context, l *logger.Logger, c cluster.Cluster, helper *Helper, ) error { helper.SetContext(&s.testContext) - return s.hook.fn(l, s.prng, helper) + return s.hook.fn(ctx, l, s.prng, helper) } // sequentialRunStep is a "meta-step" that indicates that a sequence @@ -559,13 +686,14 @@ func (h hooks) Filter(testContext Context) hooks { // returned. Otherwise, a `concurrentRunStep` is returned, where every // hook is run concurrently. func (h hooks) AsSteps( - label string, idGen func() int, prng *rand.Rand, nodes option.NodeListOption, testContext Context, + label string, idGen func() int, prng *rand.Rand, testContext Context, background bool, ) []testStep { steps := make([]testStep, 0, len(h)) for _, hook := range h { hookPrng := rngFromRNG(prng) - rhs := runHookStep{id: idGen(), prng: hookPrng, hook: hook, testContext: testContext} - steps = append(steps, rhs) + steps = append(steps, runHookStep{ + id: idGen(), prng: hookPrng, hook: hook, background: background, testContext: testContext, + }) } if len(steps) <= 1 { @@ -579,6 +707,10 @@ func (th *testHooks) AddStartup(hook versionUpgradeHook) { th.startup = append(th.startup, hook) } +func (th *testHooks) AddBackground(hook versionUpgradeHook) { + th.background = append(th.background, hook) +} + func (th *testHooks) AddMixedVersion(hook versionUpgradeHook) { th.mixedVersion = append(th.mixedVersion, hook) } @@ -588,15 +720,21 @@ func (th *testHooks) AddAfterUpgradeFinalized(hook versionUpgradeHook) { } func (th *testHooks) StartupSteps(idGen func() int, testContext Context) []testStep { - return th.startup.AsSteps(startupLabel, idGen, th.prng, th.crdbNodes, testContext) + return th.startup.AsSteps(startupLabel, idGen, th.prng, testContext, false) +} + +func (th *testHooks) BackgroundSteps(idGen func() int, testContext Context) []testStep { + return th.background.AsSteps(backgroundLabel, idGen, th.prng, testContext, true) } func (th *testHooks) MixedVersionSteps(testContext Context, idGen func() int) []testStep { - return th.mixedVersion.Filter(testContext).AsSteps(mixedVersionLabel, idGen, th.prng, th.crdbNodes, testContext) + return th.mixedVersion. + Filter(testContext). + AsSteps(mixedVersionLabel, idGen, th.prng, testContext, false) } func (th *testHooks) AfterUpgradeFinalizedSteps(idGen func() int, testContext Context) []testStep { - return th.afterUpgradeFinalized.AsSteps(afterTestLabel, idGen, th.prng, th.crdbNodes, testContext) + return th.afterUpgradeFinalized.AsSteps(afterTestLabel, idGen, th.prng, testContext, false) } func randomDelay(rng *rand.Rand) time.Duration { diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/planner.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/planner.go index 0fbabe2c39f3..720a7df06e63 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/planner.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/planner.go @@ -69,14 +69,16 @@ const ( // this is happening. // - run after-test hooks. // -// TODO(renato): further opportunities for random exploration: going -// back multiple releases instead of just one; picking a patch release -// randomly instead of just the latest release. +// TODO(renato): further opportunities for random exploration: +// - going back multiple releases instead of just one +// - picking a patch release randomly instead of just the latest release +// - inserting arbitrary delays (`sleep` calls) during the test. func (p *testPlanner) Plan() *TestPlan { var steps []testStep addSteps := func(ss []testStep) { steps = append(steps, ss...) } addSteps(p.initSteps()) + addSteps(p.hooks.BackgroundSteps(p.nextID, p.initialContext())) // previous -> current addSteps(p.upgradeSteps(p.initialVersion, clusterupgrade.MainVersion)) @@ -119,6 +121,7 @@ func (p *testPlanner) finalContext(finalizing bool) Context { func (p *testPlanner) initSteps() []testStep { return append([]testStep{ startFromCheckpointStep{id: p.nextID(), version: p.initialVersion, rt: p.rt, crdbNodes: p.crdbNodes}, + uploadCurrentVersionStep{id: p.nextID(), rt: p.rt, crdbNodes: p.crdbNodes, dest: CurrentCockroachPath}, waitForStableClusterVersionStep{id: p.nextID(), nodes: p.crdbNodes}, preserveDowngradeOptionStep{id: p.nextID(), prng: p.newRNG(), crdbNodes: p.crdbNodes}, }, p.hooks.StartupSteps(p.nextID, p.initialContext())...) diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/planner_test.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/planner_test.go index c078aaf62f74..9f4c83735fd5 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/planner_test.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/planner_test.go @@ -17,7 +17,9 @@ import ( "math/rand" "testing" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade" "github.com/cockroachdb/cockroach/pkg/roachprod/logger" "github.com/cockroachdb/cockroach/pkg/util/version" @@ -68,10 +70,17 @@ func TestTestPlanner(t *testing.T) { mvt := newTest(t) mvt.InMixedVersion("mixed-version 1", dummyHook) mvt.InMixedVersion("mixed-version 2", dummyHook) + initBank := roachtestutil.NewCommand("./cockroach workload bank init") + runBank := roachtestutil.NewCommand("./cockroach workload run bank").Flag("max-ops", 100) + mvt.Workload("bank", nodes, initBank, runBank) + runRand := roachtestutil.NewCommand("./cockroach run rand").Flag("seed", 321) + mvt.Workload("rand", nodes, nil /* initCmd */, runRand) + csvServer := roachtestutil.NewCommand("./cockroach workload csv-server").Flag("port", 9999) + mvt.BackgroundCommand("csv server", nodes, csvServer) plan, err := mvt.plan() require.NoError(t, err) - require.Len(t, plan.steps, 9) + require.Len(t, plan.steps, 12) // Assert on the pretty-printed version of the test plan as that // asserts the ordering of the steps we want to take, and as a bonus @@ -79,33 +88,42 @@ func TestTestPlanner(t *testing.T) { expectedPrettyPlan := fmt.Sprintf(` mixed-version test plan for upgrading from %[1]s to : ├── starting cluster from fixtures for version "%[1]s" (1) -├── wait for nodes :1-4 to all have the same cluster version (same as binary version of node 1) (2) -├── preventing auto-upgrades by setting `+"`preserve_downgrade_option`"+` (3) +├── upload current binary to all cockroach nodes (:1-4) (2) +├── wait for nodes :1-4 to all have the same cluster version (same as binary version of node 1) (3) +├── preventing auto-upgrades by setting `+"`preserve_downgrade_option`"+` (4) +├── run "initialize bank workload" (5) +├── start background hooks concurrently +│ ├── run "bank workload", after 50ms delay (6) +│ ├── run "rand workload", after 50ms delay (7) +│ └── run "csv server", after 200ms delay (8) ├── upgrade nodes :1-4 from "%[1]s" to "" -│ ├── restart node 2 with binary version (4) -│ ├── restart node 1 with binary version (5) -│ ├── run "mixed-version 1" (6) -│ ├── restart node 4 with binary version (7) -│ ├── restart node 3 with binary version (8) -│ └── run "mixed-version 2" (9) -├── downgrade nodes :1-4 from "" to "%[1]s" -│ ├── restart node 3 with binary version %[1]s (10) -│ ├── restart node 4 with binary version %[1]s (11) +│ ├── restart node 4 with binary version (9) │ ├── run mixed-version hooks concurrently -│ │ ├── run "mixed-version 1", after 200ms delay (12) -│ │ └── run "mixed-version 2", after 200ms delay (13) -│ ├── restart node 2 with binary version %[1]s (14) -│ └── restart node 1 with binary version %[1]s (15) +│ │ ├── run "mixed-version 1", after 100ms delay (10) +│ │ └── run "mixed-version 2", after 100ms delay (11) +│ ├── restart node 3 with binary version (12) +│ ├── restart node 2 with binary version (13) +│ └── restart node 1 with binary version (14) +├── downgrade nodes :1-4 from "" to "%[1]s" +│ ├── restart node 2 with binary version %[1]s (15) +│ ├── run "mixed-version 1" (16) +│ ├── restart node 1 with binary version %[1]s (17) +│ ├── run "mixed-version 2" (18) +│ ├── restart node 3 with binary version %[1]s (19) +│ └── restart node 4 with binary version %[1]s (20) ├── upgrade nodes :1-4 from "%[1]s" to "" -│ ├── restart node 3 with binary version (16) -│ ├── run "mixed-version 1" (17) -│ ├── restart node 4 with binary version (18) -│ ├── restart node 1 with binary version (19) -│ ├── restart node 2 with binary version (20) -│ └── run "mixed-version 2" (21) -├── finalize upgrade by resetting `+"`preserve_downgrade_option`"+` (22) -├── run "mixed-version 2" (23) -└── wait for nodes :1-4 to all have the same cluster version (same as binary version of node 1) (24) +│ ├── restart node 4 with binary version (21) +│ ├── restart node 3 with binary version (22) +│ ├── restart node 1 with binary version (23) +│ ├── run mixed-version hooks concurrently +│ │ ├── run "mixed-version 1", after 0s delay (24) +│ │ └── run "mixed-version 2", after 0s delay (25) +│ └── restart node 2 with binary version (26) +├── finalize upgrade by resetting `+"`preserve_downgrade_option`"+` (27) +├── run mixed-version hooks concurrently +│ ├── run "mixed-version 1", after 100ms delay (28) +│ └── run "mixed-version 2", after 0s delay (29) +└── wait for nodes :1-4 to all have the same cluster version (same as binary version of node 1) (30) `, previousVersion, ) @@ -120,7 +138,7 @@ mixed-version test plan for upgrading from %[1]s to : mvt.OnStartup("startup 2", dummyHook) plan, err = mvt.plan() require.NoError(t, err) - requireConcurrentHooks(t, plan.steps[3], "startup 1", "startup 2") + requireConcurrentHooks(t, plan.steps[4], "startup 1", "startup 2") // Assert that AfterUpgradeFinalized hooks are scheduled to run in // the last step of the test. @@ -130,8 +148,8 @@ mixed-version test plan for upgrading from %[1]s to : mvt.AfterUpgradeFinalized("finalizer 3", dummyHook) plan, err = mvt.plan() require.NoError(t, err) - require.Len(t, plan.steps, 9) - requireConcurrentHooks(t, plan.steps[8], "finalizer 1", "finalizer 2", "finalizer 3") + require.Len(t, plan.steps, 10) + requireConcurrentHooks(t, plan.steps[9], "finalizer 1", "finalizer 2", "finalizer 3") } // TestDeterministicTestPlan tests that generating a test plan with @@ -154,6 +172,84 @@ func TestDeterministicTestPlan(t *testing.T) { } } +var unused float64 + +// TestDeterministicHookSeeds ensures that user functions passed to +// `InMixedVersion` always see the same sequence of values even if the +// PRNG passed to the `Test` struct is perturbed during runs. In other +// words, this ensures that user functions have at their disposal a +// random number generator that is unique to them and concurrency with +// other functions should not change the sequence of values they see +// as long as the RNG is used deterministically in the user function +// itself. +func TestDeterministicHookSeeds(t *testing.T) { + generateData := func(generateMoreRandomNumbers bool) [][]int { + var generatedData [][]int + mvt := newTest(t) + mvt.InMixedVersion("do something", func(_ context.Context, _ *logger.Logger, rng *rand.Rand, _ *Helper) error { + var data []int + for j := 0; j < 5; j++ { + data = append(data, rng.Intn(100)) + } + + generatedData = append(generatedData, data) + + // Ensure that changing the top-level random number generator + // has no impact on the rng passed to the user function. + if generateMoreRandomNumbers { + for j := 0; j < 10; j++ { + unused = mvt.prng.Float64() + } + } + return nil + }) + + var ( + // these variables are not used by the hook so they can be nil + ctx = context.Background() + nilCluster cluster.Cluster + emptyHelper = &Helper{} + ) + + plan, err := mvt.plan() + require.NoError(t, err) + + // We can hardcode these paths since we are using a fixed seed in + // these tests. + firstRun := plan.steps[4].(sequentialRunStep).steps[2].(runHookStep) + require.Equal(t, "do something", firstRun.hook.name) + require.NoError(t, firstRun.Run(ctx, nilLogger, nilCluster, emptyHelper)) + + secondRun := plan.steps[5].(sequentialRunStep).steps[3].(runHookStep) + require.Equal(t, "do something", secondRun.hook.name) + require.NoError(t, secondRun.Run(ctx, nilLogger, nilCluster, emptyHelper)) + + thirdRun := plan.steps[6].(sequentialRunStep).steps[1].(runHookStep) + require.Equal(t, "do something", thirdRun.hook.name) + require.NoError(t, thirdRun.Run(ctx, nilLogger, nilCluster, emptyHelper)) + + fourthRun := plan.steps[8].(runHookStep) + require.Equal(t, "do something", fourthRun.hook.name) + require.NoError(t, fourthRun.Run(ctx, nilLogger, nilCluster, emptyHelper)) + + require.Len(t, generatedData, 4) + return generatedData + } + + expectedData := [][]int{ + {82, 1, 17, 3, 87}, + {73, 17, 6, 37, 43}, + {82, 35, 57, 54, 8}, + {7, 95, 26, 31, 65}, + } + const numRums = 50 + for j := 0; j < numRums; j++ { + for _, b := range []bool{true, false} { + require.Equal(t, expectedData, generateData(b), "j = %d | b = %t", j, b) + } + } +} + func newTest(t *testing.T) *Test { prng := rand.New(rand.NewSource(seed)) return &Test{ @@ -183,6 +279,6 @@ func requireConcurrentHooks(t *testing.T, step testStep, names ...string) { } } -func dummyHook(*logger.Logger, *rand.Rand, *Helper) error { +func dummyHook(context.Context, *logger.Logger, *rand.Rand, *Helper) error { return nil } diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go index 7f9826f6fe09..96978b8c41bb 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go @@ -16,10 +16,12 @@ import ( "fmt" "math/rand" "os" + "path" "path/filepath" "regexp" "strconv" "strings" + "sync/atomic" "time" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" @@ -27,8 +29,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/roachprod/logger" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/timeutil" - "golang.org/x/sync/errgroup" ) type ( @@ -37,22 +39,57 @@ type ( Helper struct { ctx context.Context testContext *Context - conns []*gosql.DB - crdbNodes option.NodeListOption - + // bgCount keeps track of the number of background tasks started + // with `helper.Background()`. The counter is used to generate + // unique log file names. + bgCount int64 + runner *testRunner stepLogger *logger.Logger } + // backgroundEvent is the struct sent by background steps when they + // finish (successfully or not). + backgroundEvent struct { + Name string + Err error + } + + backgroundRunner struct { + group ctxgroup.Group + ctx context.Context + events chan backgroundEvent + } + + testFailure struct { + summarized bool + description string + seed int64 + binaryVersions []roachpb.Version + // Cluster versions before and after the failure occurred. Before + // each step is executed, the test runner will cache each node's + // view of the cluster version; after a failure occurs, we'll try + // to read the cluster version from every node again. This context + // is added to the failure message displayed to the user with the + // intention of highlighting whether the cluster version changed + // during the failure, which is useful for test failures that + // happen while the upgrade is finalizing. + clusterVersionsBefore []roachpb.Version + clusterVersionsAfter []roachpb.Version + } + testRunner struct { ctx context.Context + cancel context.CancelFunc plan *TestPlan cluster cluster.Cluster crdbNodes option.NodeListOption seed int64 logger *logger.Logger - binaryVersions []roachpb.Version - clusterVersions []roachpb.Version + binaryVersions atomic.Value + clusterVersions atomic.Value + + background *backgroundRunner connCache []*gosql.DB } @@ -65,6 +102,7 @@ var ( func newTestRunner( ctx context.Context, + cancel context.CancelFunc, plan *TestPlan, l *logger.Logger, c cluster.Cluster, @@ -72,12 +110,14 @@ func newTestRunner( randomSeed int64, ) *testRunner { return &testRunner{ - ctx: ctx, - plan: plan, - logger: l, - cluster: c, - crdbNodes: crdbNodes, - seed: randomSeed, + ctx: ctx, + cancel: cancel, + plan: plan, + logger: l, + cluster: c, + crdbNodes: crdbNodes, + background: newBackgroundRunner(ctx), + seed: randomSeed, } } @@ -85,19 +125,44 @@ func newTestRunner( // each step in sequence. func (tr *testRunner) run() error { defer tr.closeConnections() + defer func() { + tr.logger.Printf("canceling mixed-version test context") + tr.cancel() + // Wait for some time so that any background tasks are properly + // canceled and their cancelation messages are displayed in the + // logs accordingly. + time.Sleep(5 * time.Second) + }() + + stepsErr := make(chan error) + go func() { + defer close(stepsErr) + for _, step := range tr.plan.steps { + if err := tr.runStep(tr.ctx, step); err != nil { + stepsErr <- err + return + } + } + }() - for _, step := range tr.plan.steps { - if err := tr.runStep(step); err != nil { + for { + select { + case err := <-stepsErr: return err + case event := <-tr.background.CompletedEvents(): + if event.Err == nil { + tr.logger.Printf("background step finished: %s", event.Name) + continue + } + + return fmt.Errorf("background step `%s` returned error: %w", event.Name, event.Err) } } - - return nil } // runStep contains the logic of running a single test step, called // recursively in the case of sequentialRunStep and concurrentRunStep. -func (tr *testRunner) runStep(step testStep) error { +func (tr *testRunner) runStep(ctx context.Context, step testStep) error { if ss, ok := step.(singleStep); ok { if ss.ID() == 1 { // if this is the first singleStep of the plan, ensure it is an @@ -126,25 +191,25 @@ func (tr *testRunner) runStep(step testStep) error { switch s := step.(type) { case sequentialRunStep: for _, ss := range s.steps { - if err := tr.runStep(ss); err != nil { + if err := tr.runStep(ctx, ss); err != nil { return err } } return nil case concurrentRunStep: - group, _ := errgroup.WithContext(tr.ctx) + group := ctxgroup.WithContext(tr.ctx) for _, cs := range s.delayedSteps { cs := cs - group.Go(func() error { - return tr.runStep(cs) + group.GoCtx(func(concurrentCtx context.Context) error { + return tr.runStep(concurrentCtx, cs) }) } return group.Wait() case delayedStep: time.Sleep(s.delay) - return tr.runStep(s.step) + return tr.runStep(ctx, s.step) default: ss := s.(singleStep) @@ -153,53 +218,87 @@ func (tr *testRunner) runStep(step testStep) error { return err } - tr.logStep("STARTING", ss, stepLogger) - tr.logVersions(stepLogger) - start := timeutil.Now() - defer func() { - prefix := fmt.Sprintf("FINISHED [%s]", timeutil.Since(start)) - tr.logStep(prefix, ss, stepLogger) - }() - if err := ss.Run(tr.ctx, stepLogger, tr.cluster, tr.newHelper(stepLogger)); err != nil { - return tr.reportError(err, ss, stepLogger) + if ss.Background() { + tr.startBackgroundStep(ss, stepLogger) + return nil } - return nil + return tr.runSingleStep(ctx, ss, stepLogger) } } -// reportError augments the error passed with extra -// information. Specifically, the error message will include the ID of -// the step that failed, the random seed used, the binary version on -// each node when the error occurred, and the cluster version before -// and after the step (in case the failure happened *while* the -// cluster version was updating). -func (tr *testRunner) reportError(err error, step singleStep, l *logger.Logger) error { - errMsg := fmt.Sprintf("mixed-version test failure while running step %d (%s): %s", +// runSingleStep takes care of the logic of running a `singleStep`, +// including logging start and finish times, wrapping the error (if +// any) with useful information, and renaming the log file to indicate +// failure. This logic is the same whether running a step in the +// background or not. +func (tr *testRunner) runSingleStep(ctx context.Context, ss singleStep, l *logger.Logger) error { + tr.logStep("STARTING", ss, l) + tr.logVersions(l) + start := timeutil.Now() + defer func() { + prefix := fmt.Sprintf("FINISHED [%s]", timeutil.Since(start)) + tr.logStep(prefix, ss, l) + }() + if err := ss.Run(ctx, l, tr.cluster, tr.newHelper(ctx, l)); err != nil { + if isContextCanceled(err) { + l.Printf("step terminated (context canceled)") + return nil + } + return tr.stepError(err, ss, l) + } + + return nil +} + +func (tr *testRunner) startBackgroundStep(ss singleStep, l *logger.Logger) { + tr.background.Start(ss.Description(), func(ctx context.Context) error { + return tr.runSingleStep(ctx, ss, l) + }) +} + +// stepError generates a `testFailure` error by augmenting the error +// passed with extra information. Specifically, the error message will +// include the ID of the step that failed, the random seed used, the +// binary version on each node when the error occurred, and the +// cluster version before and after the step (in case the failure +// happened *while* the cluster version was updating). +func (tr *testRunner) stepError(err error, step singleStep, l *logger.Logger) error { + desc := fmt.Sprintf("mixed-version test failure while running step %d (%s): %s", step.ID(), step.Description(), err, ) - debugInfo := func(label, value string) string { - return fmt.Sprintf("%-40s%s", label+":", value) - } - seedInfo := debugInfo("test random seed", strconv.FormatInt(tr.seed, 10)) - binaryVersions := debugInfo("binary versions", formatVersions(tr.binaryVersions)) - clusterVersionsBefore := debugInfo("cluster versions before failure", formatVersions(tr.clusterVersions)) - var clusterVersionsAfter string - if err := tr.refreshClusterVersions(); err == nil { - clusterVersionsBefore += "\n" - clusterVersionsAfter = debugInfo("cluster versions after failure", formatVersions(tr.clusterVersions)) - } else { + + return tr.testFailure(desc, l) +} + +// testFailure generates a `testFailure` with the given +// description. It logs the error to the logger passed, and renames +// the underlying file to include the "FAILED" prefix to help in +// debugging. +func (tr *testRunner) testFailure(desc string, l *logger.Logger) error { + clusterVersionsBefore := tr.clusterVersions + if err := tr.refreshClusterVersions(); err != nil { tr.logger.Printf("failed to fetch cluster versions after failure: %s", err) } + clusterVersionsAfter := tr.clusterVersions + + tf := &testFailure{ + description: desc, + seed: tr.seed, + binaryVersions: loadAtomicVersions(tr.binaryVersions), + clusterVersionsBefore: loadAtomicVersions(clusterVersionsBefore), + clusterVersionsAfter: loadAtomicVersions(clusterVersionsAfter), + } + + // Print the test failure on the step's logger for convenience, and + // to reduce cross referencing of logs. + l.Printf("%v", tf) if err := renameFailedLogger(l); err != nil { tr.logger.Printf("could not rename failed step logger: %v", err) } - return fmt.Errorf( - "%s\n%s\n%s\n%s%s", - errMsg, seedInfo, binaryVersions, clusterVersionsBefore, clusterVersionsAfter, - ) + return tf } func (tr *testRunner) logStep(prefix string, step singleStep, l *logger.Logger) { @@ -211,12 +310,15 @@ func (tr *testRunner) logStep(prefix string, step singleStep, l *logger.Logger) // cluster versions on each node. The cached versions should exist for // all steps but the first one (when we start the cluster itself). func (tr *testRunner) logVersions(l *logger.Logger) { - if tr.binaryVersions == nil || tr.clusterVersions == nil { + binaryVersions := loadAtomicVersions(tr.binaryVersions) + clusterVersions := loadAtomicVersions(tr.clusterVersions) + + if binaryVersions == nil || clusterVersions == nil { return } - l.Printf("binary versions: %s", formatVersions(tr.binaryVersions)) - l.Printf("cluster versions: %s", formatVersions(tr.clusterVersions)) + l.Printf("binary versions: %s", formatVersions(binaryVersions)) + l.Printf("cluster versions: %s", formatVersions(clusterVersions)) } // loggerFor creates a logger instance to be used by a test step. Logs @@ -227,22 +329,25 @@ func (tr *testRunner) loggerFor(step singleStep) (*logger.Logger, error) { name := invalidChars.ReplaceAllString(strings.ToLower(step.Description()), "") name = fmt.Sprintf("%d_%s", step.ID(), name) - prefix := fmt.Sprintf("%s/%s", logPrefix, name) + prefix := path.Join(logPrefix, name) return prefixedLogger(tr.logger, prefix) } // refreshBinaryVersions updates the internal `binaryVersions` field -// with the binary version running on each node of the cluster. +// with the binary version running on each node of the cluster. We use +// the `atomic` package here as this function may be called by two +// steps that are running concurrently. func (tr *testRunner) refreshBinaryVersions() error { - tr.binaryVersions = make([]roachpb.Version, 0, len(tr.crdbNodes)) + newBinaryVersions := make([]roachpb.Version, 0, len(tr.crdbNodes)) for _, node := range tr.crdbNodes { bv, err := clusterupgrade.BinaryVersion(tr.conn(node)) if err != nil { return fmt.Errorf("failed to get binary version for node %d: %w", node, err) } - tr.binaryVersions = append(tr.binaryVersions, bv) + newBinaryVersions = append(newBinaryVersions, bv) } + tr.binaryVersions.Store(newBinaryVersions) return nil } @@ -250,15 +355,16 @@ func (tr *testRunner) refreshBinaryVersions() error { // with the current view of the cluster version in each of the nodes // of the cluster. func (tr *testRunner) refreshClusterVersions() error { - tr.clusterVersions = make([]roachpb.Version, 0, len(tr.crdbNodes)) + newClusterVersions := make([]roachpb.Version, 0, len(tr.crdbNodes)) for _, node := range tr.crdbNodes { cv, err := clusterupgrade.ClusterVersion(tr.ctx, tr.conn(node)) if err != nil { return fmt.Errorf("failed to get cluster version for node %d: %w", node, err) } - tr.clusterVersions = append(tr.clusterVersions, cv) + newClusterVersions = append(newClusterVersions, cv) } + tr.clusterVersions.Store(newClusterVersions) return nil } @@ -291,11 +397,10 @@ func (tr *testRunner) maybeInitConnections() error { return nil } -func (tr *testRunner) newHelper(l *logger.Logger) *Helper { +func (tr *testRunner) newHelper(ctx context.Context, l *logger.Logger) *Helper { return &Helper{ - ctx: tr.ctx, - conns: tr.connCache, - crdbNodes: tr.crdbNodes, + ctx: ctx, + runner: tr, stepLogger: l, } } @@ -314,10 +419,41 @@ func (tr *testRunner) closeConnections() { } } +func newBackgroundRunner(ctx context.Context) *backgroundRunner { + g := ctxgroup.WithContext(ctx) + return &backgroundRunner{ + group: g, + ctx: ctx, + events: make(chan backgroundEvent), + } +} + +// Start will run the function `fn` in a goroutine. Any errors +// returned by that function are observable by reading from the +// channel returned by the `Events()` function. +func (br *backgroundRunner) Start(name string, fn func(context.Context) error) { + br.group.Go(func() error { + err := fn(br.ctx) + br.events <- backgroundEvent{ + Name: name, + Err: err, + } + return err + }) +} + +func (br *backgroundRunner) CompletedEvents() <-chan backgroundEvent { + return br.events +} + +func (h *Helper) RandomNode(prng *rand.Rand, nodes option.NodeListOption) int { + return nodes[prng.Intn(len(nodes))] +} + // RandomDB returns a (nodeID, connection) tuple for a randomly picked // cockroach node according to the parameters passed. func (h *Helper) RandomDB(prng *rand.Rand, nodes option.NodeListOption) (int, *gosql.DB) { - node := nodes[prng.Intn(len(nodes))] + node := h.RandomNode(prng, nodes) return node, h.Connect(node) } @@ -325,7 +461,7 @@ func (h *Helper) RandomDB(prng *rand.Rand, nodes option.NodeListOption) (int, *g // database node. The query and the node picked are logged in the logs // of the step that calls this function. func (h *Helper) QueryRow(rng *rand.Rand, query string, args ...interface{}) *gosql.Row { - node, db := h.RandomDB(rng, h.crdbNodes) + node, db := h.RandomDB(rng, h.runner.crdbNodes) h.stepLogger.Printf("running SQL statement:\n%s\nArgs: %v\nNode: %d", query, args, node) return db.QueryRowContext(h.ctx, query, args...) } @@ -334,14 +470,14 @@ func (h *Helper) QueryRow(rng *rand.Rand, query string, args ...interface{}) *go // The query and the node picked are logged in the logs of the step // that calls this function. func (h *Helper) Exec(rng *rand.Rand, query string, args ...interface{}) error { - node, db := h.RandomDB(rng, h.crdbNodes) + node, db := h.RandomDB(rng, h.runner.crdbNodes) h.stepLogger.Printf("running SQL statement:\n%s\nArgs: %v\nNode: %d", query, args, node) _, err := db.ExecContext(h.ctx, query, args...) return err } func (h *Helper) Connect(node int) *gosql.DB { - return h.conns[node-1] + return h.runner.conn(node) } // SetContext should be called by steps that need access to the test @@ -356,13 +492,99 @@ func (h *Helper) Context() *Context { return h.testContext } +// Background allows test authors to create functions that run in the +// background in mixed-version hooks. +func (h *Helper) Background(name string, fn func(context.Context, *logger.Logger) error) { + h.runner.background.Start(name, func(ctx context.Context) error { + bgLogger, err := h.loggerFor(name) + if err != nil { + return fmt.Errorf("failed to create logger for background function %q: %w", name, err) + } + + err = fn(ctx, bgLogger) + if err != nil { + if isContextCanceled(err) { + bgLogger.Printf("background function terminated (context canceled)") + return nil + } + + desc := fmt.Sprintf("error in background function %s: %s", name, err) + return h.runner.testFailure(desc, bgLogger) + } + + return nil + }) +} + +// BackgroundCommand has the same semantics of `Background()`; the +// command passed will run and the test will fail if the command is +// not successful. +func (h *Helper) BackgroundCommand(cmd string, nodes option.NodeListOption) { + desc := fmt.Sprintf("run command: %q", cmd) + h.Background(desc, func(ctx context.Context, l *logger.Logger) error { + l.Printf("running command `%s` on nodes %v in the background", cmd, nodes) + return h.runner.cluster.RunE(ctx, nodes, cmd) + }) +} + +// loggerFor creates a logger instance to be used by background +// functions (created by calling `Background` on the helper +// instance). It is similar to the logger instances created for +// mixed-version steps, but with the `background_` prefix. +func (h *Helper) loggerFor(name string) (*logger.Logger, error) { + atomic.AddInt64(&h.bgCount, 1) + + fileName := invalidChars.ReplaceAllString(strings.ToLower(name), "") + fileName = fmt.Sprintf("background_%s_%d", fileName, h.bgCount) + fileName = path.Join(logPrefix, fileName) + + return prefixedLogger(h.runner.logger, fileName) +} + +func (tf *testFailure) Error() string { + if tf.summarized { + return tf.description + } + + tf.summarized = true + debugInfo := func(label, value string) string { + return fmt.Sprintf("%-40s%s", label+":", value) + } + seedInfo := debugInfo("test random seed", strconv.FormatInt(tf.seed, 10)) + binaryVersions := debugInfo("binary versions", formatVersions(tf.binaryVersions)) + clusterVersionsBefore := debugInfo( + "cluster versions before failure", + formatVersions(tf.clusterVersionsBefore), + ) + var clusterVersionsAfter string + if cv := tf.clusterVersionsAfter; cv != nil { + clusterVersionsBefore += "\n" + clusterVersionsAfter = debugInfo("cluster versions after failure", formatVersions(cv)) + } + + return fmt.Sprintf( + "%s\n%s\n%s\n%s%s", + tf.description, seedInfo, binaryVersions, clusterVersionsBefore, clusterVersionsAfter, + ) +} + func renameFailedLogger(l *logger.Logger) error { currentFileName := l.File.Name() - newLogName := strings.TrimSuffix(currentFileName, filepath.Ext(currentFileName)) - newLogName += "_FAILED.log" + newLogName := path.Join( + filepath.Dir(currentFileName), + "FAILED_"+filepath.Base(currentFileName), + ) return os.Rename(currentFileName, newLogName) } +func loadAtomicVersions(v atomic.Value) []roachpb.Version { + if v.Load() == nil { + return nil + } + + return v.Load().([]roachpb.Version) +} + func formatVersions(versions []roachpb.Version) string { var pairs []string for idx, version := range versions { @@ -371,3 +593,16 @@ func formatVersions(versions []roachpb.Version) string { return fmt.Sprintf("[%s]", strings.Join(pairs, ", ")) } + +// isContextCanceled returns a boolean indicating whether the error +// given happened because some context was canceled. +func isContextCanceled(err error) bool { + // TODO(renato): unfortunately, we have to resort to string + // comparison here. The most common use case for this function is + // detecting cluster commands that fail when the test context is + // canceled (after test success or failure), and roachtest does not + // return an error that wraps the context cancelation (in other + // words, `errors.Is` doesn't work). Once we fix this behavior, we + // should use structured errors here. + return strings.Contains(err.Error(), context.Canceled.Error()) +} diff --git a/pkg/cmd/roachtest/tests/versionupgrade.go b/pkg/cmd/roachtest/tests/versionupgrade.go index cfc4faf8265f..68d4fda1d446 100644 --- a/pkg/cmd/roachtest/tests/versionupgrade.go +++ b/pkg/cmd/roachtest/tests/versionupgrade.go @@ -104,48 +104,54 @@ func runVersionUpgrade(ctx context.Context, t test.Test, c cluster.Cluster) { } mvt := mixedversion.NewTest(ctx, t, t.L(), c, c.All()) - mvt.InMixedVersion("run backup", func(l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error { + mvt.InMixedVersion("run backup", func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error { // Verify that backups can be created in various configurations. This is // important to test because changes in system tables might cause backups to // fail in mixed-version clusters. dest := fmt.Sprintf("nodelocal://0/%d", timeutil.Now().UnixNano()) return h.Exec(rng, `BACKUP TO $1`, dest) }) - mvt.InMixedVersion("test features", func(l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error { - for _, featureTest := range versionUpgradeTestFeatures { - l.Printf("running feature test %q", featureTest.name) - if err := h.Exec(rng, featureTest.statement); err != nil { - l.Printf("%q: ERROR (%s)", featureTest.name, err) - return err + mvt.InMixedVersion( + "test features", + func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error { + for _, featureTest := range versionUpgradeTestFeatures { + l.Printf("running feature test %q", featureTest.name) + if err := h.Exec(rng, featureTest.statement); err != nil { + l.Printf("%q: ERROR (%s)", featureTest.name, err) + return err + } + l.Printf("%q: OK", featureTest.name) } - l.Printf("%q: OK", featureTest.name) - } - return nil - }) - mvt.AfterUpgradeFinalized("check if GC TTL is pinned", func(l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error { - // TODO(irfansharif): This can be removed when the predecessor version - // in this test is v23.1, where the default is 4h. This test was only to - // make sure that existing clusters that upgrade to 23.1 retained their - // existing GC TTL. - l.Printf("checking if GC TTL is pinned to 24h") - var ttlSeconds int - query := ` + return nil + }, + ) + mvt.AfterUpgradeFinalized( + "check if GC TTL is pinned", + func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error { + // TODO(irfansharif): This can be removed when the predecessor version + // in this test is v23.1, where the default is 4h. This test was only to + // make sure that existing clusters that upgrade to 23.1 retained their + // existing GC TTL. + l.Printf("checking if GC TTL is pinned to 24h") + var ttlSeconds int + query := ` SELECT (crdb_internal.pb_to_json('cockroach.config.zonepb.ZoneConfig', raw_config_protobuf)->'gc'->'ttlSeconds')::INT FROM crdb_internal.zones WHERE target = 'RANGE default' LIMIT 1 ` - if err := h.QueryRow(rng, query).Scan(&ttlSeconds); err != nil { - return fmt.Errorf("error querying GC TTL: %w", err) - } - expectedTTL := 24 * 60 * 60 // NB: 24h is what's used in the fixture - if ttlSeconds != expectedTTL { - return fmt.Errorf("unexpected GC TTL: actual (%d) != expected (%d)", ttlSeconds, expectedTTL) - } - return nil - }) + if err := h.QueryRow(rng, query).Scan(&ttlSeconds); err != nil { + return fmt.Errorf("error querying GC TTL: %w", err) + } + expectedTTL := 24 * 60 * 60 // NB: 24h is what's used in the fixture + if ttlSeconds != expectedTTL { + return fmt.Errorf("unexpected GC TTL: actual (%d) != expected (%d)", ttlSeconds, expectedTTL) + } + return nil + }, + ) mvt.Run() }