From c676964a1c06d03f8920f65c4cded93723479f92 Mon Sep 17 00:00:00 2001
From: Renato Costa
Date: Fri, 27 Jan 2023 11:54:38 -0500
Subject: [PATCH] roachtest: support running steps in the background in
 `mixedversion`

This adds a `BackgroundFunc` API to the `mixedversion` package in
roachtest, allowing test writers to run tasks in the background during
an upgrade test. The most common use case for this functionality is
running a workload while the cluster upgrades (other similar use cases
exist in a variety of tests); for this reason, a `Workload` convenience
function is added that allows tests to add a workload to a
mixed-version test with one function call.

Currently, each test needs to devise its own mechanism to spawn the
background task, monitor its execution, and terminate the test on
error. The API introduced here aims to reduce the copying and pasting
of such logic, making for more declarative tests.

In the future, the test planner itself could decide to run some steps
in the background, and it should be able to leverage the mechanisms
introduced in this commit.

Epic: CRDB-19321

Release note: None
---
 .../roachtestutil/mixedversion/BUILD.bazel    |   3 +
 .../mixedversion/mixedversion.go              | 175 +++++++--
 .../roachtestutil/mixedversion/planner.go     |  14 +-
 .../mixedversion/planner_test.go              | 152 ++++++--
 .../roachtestutil/mixedversion/runner.go      | 358 ++++++++++++++----
 5 files changed, 581 insertions(+), 121 deletions(-)

diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel b/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel
index 75f083b93aee..37f780e5a2ca 100644
--- a/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel
+++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel
@@ -13,6 +13,7 @@ go_library(
     deps = [
         "//pkg/cmd/roachtest/cluster",
         "//pkg/cmd/roachtest/option",
+        "//pkg/cmd/roachtest/roachtestutil",
         "//pkg/cmd/roachtest/roachtestutil/clusterupgrade",
         "//pkg/cmd/roachtest/test",
         "//pkg/roachpb",
@@ -30,7 +31,9 @@ go_test(
     args = ["-test.timeout=295s"],
     embed = [":mixedversion"],
    deps = [
+        "//pkg/cmd/roachtest/cluster",
         "//pkg/cmd/roachtest/option",
+        "//pkg/cmd/roachtest/roachtestutil",
         "//pkg/cmd/roachtest/roachtestutil/clusterupgrade",
         "//pkg/roachprod/logger",
         "//pkg/util/version",
diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go
index 9bb0700e7440..cbfd554107ec 100644
--- a/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go
+++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go
@@ -35,14 +35,18 @@
 //
 // Typical usage:
 //
-//	mvt, err := NewMixedVersionTest(...)
-//	mvt.InMixedVersion("test my feature", func(l *logger.Logger, db *gosql.DB) error {
+//	mvt, err := mixedversion.NewTest(...)
+//	mvt.InMixedVersion("test my feature", func(l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
 //		l.Printf("testing feature X")
+//		node, db := h.RandomDB(rng, c.All())
+//		l.Printf("running query on node %d", node)
 //		_, err := db.ExecContext(ctx, "SELECT * FROM test")
 //		return err
 //	})
-//	mvt.InMixedVersion("test another feature", func(l *logger.Logger, db *gosql.DB) error {
+//	mvt.InMixedVersion("test another feature", func(l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
 //		l.Printf("testing feature Y")
+//		node, db := h.RandomDB(rng, c.All())
+//		l.Printf("running query on node %d", node)
 //		_, err := db.ExecContext(ctx, "SELECT * FROM test2")
 //		return err
 //	})
@@ -72,6 +76,7 @@ import (
 
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
@@ -82,6 +87,7 @@ import (
 const (
 	logPrefix         = "mixed-version-test"
 	startupLabel      = "run startup hooks"
+	backgroundLabel   = "start background hooks"
 	mixedVersionLabel = "run mixed-version hooks"
 	afterTestLabel    = "run after test hooks"
 
@@ -92,6 +98,13 @@ const (
 	// of migration steps before the new cluster version can be
 	// finalized.
 	runWhileMigratingProbability = 0.5
+
+	// CurrentCockroachPath is the path where the binary for the
+	// current version of cockroach being tested is located. This
+	// file is uploaded before any user functions are run. The
+	// primary use case is tests that need long-running background
+	// functions on startup (such as running a workload).
+	CurrentCockroachPath = "./cockroach-current"
 )
 
 var (
@@ -157,6 +170,11 @@ type (
 		// for human-consumption. Displayed when pretty-printing the test
 		// plan.
 		Description() string
+		// Background indicates whether the step should be run in the
+		// background. When a step is *not* run in the background, the
+		// test will wait for it to finish before moving on. When a
+		// background step fails, the entire test fails.
+		Background() bool
 		// Run implements the actual functionality of the step.
 		Run(context.Context, *logger.Logger, cluster.Cluster, *Helper) error
 	}
@@ -167,6 +185,7 @@
 	// its different stages: startup, mixed-version, and after-test.
 	testHooks struct {
 		startup               hooks
+		background            hooks
 		mixedVersion          hooks
 		afterUpgradeFinalized hooks
 
@@ -177,6 +196,7 @@
 	// Test is the main struct callers of this package interact with.
 	Test struct {
 		ctx       context.Context
+		cancel    context.CancelFunc
 		cluster   cluster.Cluster
 		logger    *logger.Logger
 		crdbNodes option.NodeListOption
@@ -209,8 +229,10 @@ func NewTest(
 	prng, seed := randutil.NewPseudoRand()
 	testLogger.Printf("mixed-version random seed: %d", seed)
 
+	testCtx, cancel := context.WithCancel(ctx)
 	return &Test{
-		ctx:       ctx,
+		ctx:       testCtx,
+		cancel:    cancel,
 		cluster:   c,
 		logger:    testLogger,
 		crdbNodes: crdbNodes,
@@ -248,7 +270,7 @@ func (t *Test) InMixedVersion(desc string, fn userFunc) {
 		return len(testContext.ToVersionNodes) == numUpgradedNodes
 	}
 
-	t.hooks.AddMixedVersion(versionUpgradeHook{desc, predicate, fn})
+	t.hooks.AddMixedVersion(versionUpgradeHook{name: desc, predicate: predicate, fn: fn})
 }
 
 // OnStartup registers a callback that is run once the cluster is
@@ -260,7 +282,7 @@ func (t *Test) OnStartup(desc string, fn userFunc) {
 	// Since the callbacks here are only referenced in the setup steps
 	// of the planner, there is no need to have a predicate function
 	// gating them.
-	t.hooks.AddStartup(versionUpgradeHook{desc, nil, fn})
+	t.hooks.AddStartup(versionUpgradeHook{name: desc, fn: fn})
 }
 
 // AfterUpgradeFinalized registers a callback that is run once the
@@ -268,7 +290,52 @@ func (t *Test) OnStartup(desc string, fn userFunc) {
 // and allowed the upgrade to finalize successfully. If multiple such
 // hooks are passed, they will be executed concurrently.
 func (t *Test) AfterUpgradeFinalized(desc string, fn userFunc) {
-	t.hooks.AddAfterUpgradeFinalized(versionUpgradeHook{desc, nil, fn})
+	t.hooks.AddAfterUpgradeFinalized(versionUpgradeHook{name: desc, fn: fn})
+}
+
+// BackgroundFunc runs the function passed as argument in the
+// background during the test. Background functions are kicked off
+// once the cluster has been initialized (i.e., after all startup
+// steps have finished).
+func (t *Test) BackgroundFunc(desc string, fn userFunc) {
+	t.hooks.AddBackground(versionUpgradeHook{name: desc, fn: fn})
+}
+
+// BackgroundCommand is a convenience wrapper around `BackgroundFunc`
+// that runs the command passed once the cluster is initialized. The
+// command is run on the nodes passed in.
+//
+// TODO: unfortunately, `cluster.Run()` does not allow the caller to
+// pass a logger instance. It would be convenient if the output of the
+// command itself lived within the `mixed-version/*.log` files.
+func (t *Test) BackgroundCommand(
+	desc string, nodes option.NodeListOption, cmd *roachtestutil.Command,
+) {
+	t.BackgroundFunc(desc, t.runCommandFunc(nodes, cmd.String()))
+}
+
+// Workload is a convenience wrapper that allows callers to run
+// workloads in the background during a mixed-version test. `initCmd`,
+// if passed, is the command run to initialize the workload; it is run
+// synchronously as a regular startup function. `runCmd` is the
+// command that actually runs the workload; it is run in the
+// background.
+func (t *Test) Workload(
+	name string, node option.NodeListOption, initCmd, runCmd *roachtestutil.Command,
+) {
+	seed := uint64(t.prng.Int63())
+	addSeed := func(cmd *roachtestutil.Command) {
+		if !cmd.HasFlag("seed") {
+			cmd.Flag("seed", seed)
+		}
+	}
+
+	if initCmd != nil {
+		addSeed(initCmd)
+		t.OnStartup(fmt.Sprintf("initialize %s workload", name), t.runCommandFunc(node, initCmd.String()))
+	}
+
+	addSeed(runCmd)
+	t.BackgroundCommand(fmt.Sprintf("%s workload", name), node, runCmd)
 }
 
 // Run runs the mixed-version test. It should be called once all
@@ -290,7 +357,7 @@ func (t *Test) Run() {
 
 func (t *Test) run(plan *TestPlan) error {
 	return newTestRunner(
-		t.ctx, plan, t.logger, t.cluster, t.crdbNodes, t.seed,
+		t.ctx, t.cancel, plan, t.logger, t.cluster, t.crdbNodes, t.seed,
 	).run()
 }
 
@@ -319,6 +386,13 @@ func (t *Test) buildVersion() version.Version {
 	return *t.rt.BuildVersion()
 }
 
+func (t *Test) runCommandFunc(nodes option.NodeListOption, cmd string) userFunc {
+	return func(l *logger.Logger, rng *rand.Rand, h *Helper) error {
+		l.Printf("running command `%s` on nodes %v", cmd, nodes)
+		return t.cluster.RunE(t.ctx, nodes, cmd)
+	}
+}
+
 // startFromCheckpointStep is the step that starts the cluster from a
 // specific `version`, using checked-in fixtures.
 type startFromCheckpointStep struct {
@@ -328,7 +402,8 @@ type startFromCheckpointStep struct {
 	crdbNodes option.NodeListOption
 }
 
-func (s startFromCheckpointStep) ID() int { return s.id }
+func (s startFromCheckpointStep) ID() int          { return s.id }
+func (s startFromCheckpointStep) Background() bool { return false }
 
 func (s startFromCheckpointStep) Description() string {
 	return fmt.Sprintf("starting cluster from fixtures for version %q", s.version)
@@ -349,12 +424,41 @@ func (s startFromCheckpointStep) Run(
 		return err
 	}
 
-	startOpts := option.DefaultStartOpts()
+	startOpts := option.DefaultStartOptsNoBackups()
 	startOpts.RoachprodOpts.Sequential = false
 	clusterupgrade.StartWithBinary(ctx, l, c, s.crdbNodes, binaryPath, startOpts)
 	return nil
 }
 
+// uploadCurrentVersionStep uploads the current cockroach binary to
+// all DB nodes in the test. This is so that startup steps can use it
+// (if, for instance, they need to run a workload). The binary will be
+// located in `dest`.
+type uploadCurrentVersionStep struct {
+	id        int
+	rt        test.Test
+	crdbNodes option.NodeListOption
+	dest      string
+}
+
+func (s uploadCurrentVersionStep) ID() int          { return s.id }
+func (s uploadCurrentVersionStep) Background() bool { return false }
+
+func (s uploadCurrentVersionStep) Description() string {
+	return fmt.Sprintf("upload current binary to all cockroach nodes (%v)", s.crdbNodes)
+}
+
+func (s uploadCurrentVersionStep) Run(
+	ctx context.Context, l *logger.Logger, c cluster.Cluster, helper *Helper,
+) error {
+	_, err := clusterupgrade.UploadVersion(ctx, s.rt, l, c, s.crdbNodes, clusterupgrade.MainVersion)
+	if err != nil {
+		return err
+	}
+
+	return c.RunE(ctx, s.crdbNodes, fmt.Sprintf("mv ./cockroach %s", s.dest))
+}
+
 // waitForStableClusterVersionStep implements the process of waiting
 // for the `version` cluster setting being the same on all nodes of
 // the cluster and equal to the binary version of the first node in
@@ -364,7 +468,8 @@ type waitForStableClusterVersionStep struct {
 	nodes option.NodeListOption
 }
 
-func (s waitForStableClusterVersionStep) ID() int { return s.id }
+func (s waitForStableClusterVersionStep) ID() int          { return s.id }
+func (s waitForStableClusterVersionStep) Background() bool { return false }
 
 func (s waitForStableClusterVersionStep) Description() string {
 	return fmt.Sprintf(
@@ -387,7 +492,8 @@ type preserveDowngradeOptionStep struct {
 	prng *rand.Rand
 }
 
-func (s preserveDowngradeOptionStep) ID() int { return s.id }
+func (s preserveDowngradeOptionStep) ID() int          { return s.id }
+func (s preserveDowngradeOptionStep) Background() bool { return false }
 
 func (s preserveDowngradeOptionStep) Description() string {
 	return "preventing auto-upgrades by setting `preserve_downgrade_option`"
@@ -421,7 +527,8 @@ type restartWithNewBinaryStep struct {
 	node
int } -func (s restartWithNewBinaryStep) ID() int { return s.id } +func (s restartWithNewBinaryStep) ID() int { return s.id } +func (s restartWithNewBinaryStep) Background() bool { return false } func (s restartWithNewBinaryStep) Description() string { return fmt.Sprintf("restart node %d with binary version %s", s.node, versionMsg(s.version)) @@ -436,7 +543,12 @@ func (s restartWithNewBinaryStep) Run( l, c, c.Node(s.node), - option.DefaultStartOpts(), + // Disable regular backups in mixed-version tests, as some tests + // check for running jobs and the scheduled backup may make + // things non-deterministic. In the future, we should change the + // default and add an API for tests to opt-out of the default + // scheduled backup if necessary. + option.DefaultStartOptsNoBackups(), s.version, ) } @@ -450,7 +562,8 @@ type finalizeUpgradeStep struct { prng *rand.Rand } -func (s finalizeUpgradeStep) ID() int { return s.id } +func (s finalizeUpgradeStep) ID() int { return s.id } +func (s finalizeUpgradeStep) Background() bool { return false } func (s finalizeUpgradeStep) Description() string { return "finalize upgrade by resetting `preserve_downgrade_option`" @@ -472,9 +585,11 @@ type runHookStep struct { testContext Context prng *rand.Rand hook versionUpgradeHook + background bool } -func (s runHookStep) ID() int { return s.id } +func (s runHookStep) ID() int { return s.id } +func (s runHookStep) Background() bool { return s.background } func (s runHookStep) Description() string { return fmt.Sprintf("run %q", s.hook.name) @@ -559,13 +674,19 @@ func (h hooks) Filter(testContext Context) hooks { // returned. Otherwise, a `concurrentRunStep` is returned, where every // hook is run concurrently. func (h hooks) AsSteps( - label string, idGen func() int, prng *rand.Rand, nodes option.NodeListOption, testContext Context, + label string, + idGen func() int, + prng *rand.Rand, + nodes option.NodeListOption, + testContext Context, + background bool, ) []testStep { steps := make([]testStep, 0, len(h)) for _, hook := range h { hookPrng := rngFromRNG(prng) - rhs := runHookStep{id: idGen(), prng: hookPrng, hook: hook, testContext: testContext} - steps = append(steps, rhs) + steps = append(steps, runHookStep{ + id: idGen(), prng: hookPrng, hook: hook, background: background, testContext: testContext, + }) } if len(steps) <= 1 { @@ -579,6 +700,10 @@ func (th *testHooks) AddStartup(hook versionUpgradeHook) { th.startup = append(th.startup, hook) } +func (th *testHooks) AddBackground(hook versionUpgradeHook) { + th.background = append(th.background, hook) +} + func (th *testHooks) AddMixedVersion(hook versionUpgradeHook) { th.mixedVersion = append(th.mixedVersion, hook) } @@ -588,15 +713,21 @@ func (th *testHooks) AddAfterUpgradeFinalized(hook versionUpgradeHook) { } func (th *testHooks) StartupSteps(idGen func() int, testContext Context) []testStep { - return th.startup.AsSteps(startupLabel, idGen, th.prng, th.crdbNodes, testContext) + return th.startup.AsSteps(startupLabel, idGen, th.prng, th.crdbNodes, testContext, false) +} + +func (th *testHooks) BackgroundSteps(idGen func() int, testContext Context) []testStep { + return th.background.AsSteps(backgroundLabel, idGen, th.prng, th.crdbNodes, testContext, true) } func (th *testHooks) MixedVersionSteps(testContext Context, idGen func() int) []testStep { - return th.mixedVersion.Filter(testContext).AsSteps(mixedVersionLabel, idGen, th.prng, th.crdbNodes, testContext) + return th.mixedVersion. + Filter(testContext). 
+ AsSteps(mixedVersionLabel, idGen, th.prng, th.crdbNodes, testContext, false) } func (th *testHooks) AfterUpgradeFinalizedSteps(idGen func() int, testContext Context) []testStep { - return th.afterUpgradeFinalized.AsSteps(afterTestLabel, idGen, th.prng, th.crdbNodes, testContext) + return th.afterUpgradeFinalized.AsSteps(afterTestLabel, idGen, th.prng, th.crdbNodes, testContext, false) } func randomDelay(rng *rand.Rand) time.Duration { diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/planner.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/planner.go index 0fbabe2c39f3..50a233f08d9b 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/planner.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/planner.go @@ -69,14 +69,16 @@ const ( // this is happening. // - run after-test hooks. // -// TODO(renato): further opportunities for random exploration: going -// back multiple releases instead of just one; picking a patch release -// randomly instead of just the latest release. +// TODO(renato): further opportunities for random exploration: +// - going back multiple releases instead of just one +// - picking a patch release randomly instead of just the latest release +// - inserting arbitrary delays (`sleep` calls) during the test. func (p *testPlanner) Plan() *TestPlan { var steps []testStep addSteps := func(ss []testStep) { steps = append(steps, ss...) } addSteps(p.initSteps()) + addSteps(p.hooks.BackgroundSteps(p.nextID, p.initialContext())) // previous -> current addSteps(p.upgradeSteps(p.initialVersion, clusterupgrade.MainVersion)) @@ -119,6 +121,7 @@ func (p *testPlanner) finalContext(finalizing bool) Context { func (p *testPlanner) initSteps() []testStep { return append([]testStep{ startFromCheckpointStep{id: p.nextID(), version: p.initialVersion, rt: p.rt, crdbNodes: p.crdbNodes}, + uploadCurrentVersionStep{id: p.nextID(), rt: p.rt, crdbNodes: p.crdbNodes, dest: CurrentCockroachPath}, waitForStableClusterVersionStep{id: p.nextID(), nodes: p.crdbNodes}, preserveDowngradeOptionStep{id: p.nextID(), prng: p.newRNG(), crdbNodes: p.crdbNodes}, }, p.hooks.StartupSteps(p.nextID, p.initialContext())...) 
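(For illustration only — not part of the patch: a test using the API added
above might be structured as the following sketch. The `NewTest` arguments
are elided, `c` is the roachtest cluster, and the command strings mirror
the ones used in planner_test.go below.)

	mvt, err := mixedversion.NewTest(...)
	// `Workload` initializes the workload synchronously on startup, then
	// keeps it running in the background while the cluster upgrades.
	mvt.Workload(
		"bank", c.Node(1),
		roachtestutil.NewCommand("./cockroach workload bank init"),
		roachtestutil.NewCommand("./cockroach workload run bank").Flag("max-ops", 100),
	)
	// Arbitrary user functions can also run in the background; if any of
	// them returns a non-nil (and non-context-canceled) error, the test fails.
	mvt.BackgroundFunc("background queries", func(l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
		return h.Exec(rng, "SELECT 1")
	})
	mvt.Run()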
@@ -244,7 +247,10 @@ func (plan *TestPlan) prettyPrintStep(out *strings.Builder, step testStep, prefi
 	case concurrentRunStep:
 		writeNested(s.Description(), s.delayedSteps)
 	case delayedStep:
-		delayStr := fmt.Sprintf("after %s delay", s.delay)
+		var delayStr string
+		if s.delay.Milliseconds() > 0 {
+			delayStr = fmt.Sprintf("after %s delay", s.delay)
+		}
 		writeSingle(s.step.(singleStep), delayStr)
 	default:
 		writeSingle(s.(singleStep))
diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/planner_test.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/planner_test.go
index c078aaf62f74..5b920c64f8b2 100644
--- a/pkg/cmd/roachtest/roachtestutil/mixedversion/planner_test.go
+++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/planner_test.go
@@ -17,7 +17,9 @@ import (
 	"math/rand"
 	"testing"
 
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
 	"github.com/cockroachdb/cockroach/pkg/util/version"
@@ -68,10 +70,17 @@ func TestTestPlanner(t *testing.T) {
 	mvt := newTest(t)
 	mvt.InMixedVersion("mixed-version 1", dummyHook)
 	mvt.InMixedVersion("mixed-version 2", dummyHook)
+	initBank := roachtestutil.NewCommand("./cockroach workload bank init")
+	runBank := roachtestutil.NewCommand("./cockroach workload run bank").Flag("max-ops", 100)
+	mvt.Workload("bank", nodes, initBank, runBank)
+	runRand := roachtestutil.NewCommand("./cockroach run rand").Flag("seed", 321)
+	mvt.Workload("rand", nodes, nil /* initCmd */, runRand)
+	csvServer := roachtestutil.NewCommand("./cockroach workload csv-server").Flag("port", 9999)
+	mvt.BackgroundCommand("csv server", nodes, csvServer)
 
 	plan, err := mvt.plan()
 	require.NoError(t, err)
-	require.Len(t, plan.steps, 9)
+	require.Len(t, plan.steps, 12)
 
 	// Assert on the pretty-printed version of the test plan as that
 	// asserts the ordering of the steps we want to take, and as a bonus
@@ -79,33 +88,42 @@ func TestTestPlanner(t *testing.T) {
 	expectedPrettyPlan := fmt.Sprintf(`
mixed-version test plan for upgrading from %[1]s to <current>:
├── starting cluster from fixtures for version "%[1]s" (1)
-├── wait for nodes :1-4 to all have the same cluster version (same as binary version of node 1) (2)
-├── preventing auto-upgrades by setting `+"`preserve_downgrade_option`"+` (3)
+├── upload current binary to all cockroach nodes (:1-4) (2)
+├── wait for nodes :1-4 to all have the same cluster version (same as binary version of node 1) (3)
+├── preventing auto-upgrades by setting `+"`preserve_downgrade_option`"+` (4)
+├── run "initialize bank workload" (5)
+├── start background hooks concurrently
+│   ├── run "bank workload", after 50ms delay (6)
+│   ├── run "rand workload", after 50ms delay (7)
+│   └── run "csv server", after 200ms delay (8)
 ├── upgrade nodes :1-4 from "%[1]s" to "<current>"
-│   ├── restart node 2 with binary version <current> (4)
-│   ├── restart node 1 with binary version <current> (5)
-│   ├── run "mixed-version 1" (6)
-│   ├── restart node 4 with binary version <current> (7)
-│   ├── restart node 3 with binary version <current> (8)
-│   └── run "mixed-version 2" (9)
-├── downgrade nodes :1-4 from "<current>" to "%[1]s"
-│   ├── restart node 3 with binary version %[1]s (10)
-│   ├── restart node 4 with binary version %[1]s (11)
+│   ├── restart node 4 with binary version <current> (9)
 │   ├── run mixed-version hooks concurrently
-│   │   ├── run "mixed-version 1", after 200ms delay (12)
-│   │   └── run "mixed-version 2", after 200ms delay (13)
"mixed-version 2", after 200ms delay (13) -│ ├── restart node 2 with binary version %[1]s (14) -│ └── restart node 1 with binary version %[1]s (15) +│ │ ├── run "mixed-version 1", after 100ms delay (10) +│ │ └── run "mixed-version 2", after 100ms delay (11) +│ ├── restart node 3 with binary version (12) +│ ├── restart node 2 with binary version (13) +│ └── restart node 1 with binary version (14) +├── downgrade nodes :1-4 from "" to "%[1]s" +│ ├── restart node 2 with binary version %[1]s (15) +│ ├── run "mixed-version 1" (16) +│ ├── restart node 1 with binary version %[1]s (17) +│ ├── run "mixed-version 2" (18) +│ ├── restart node 3 with binary version %[1]s (19) +│ └── restart node 4 with binary version %[1]s (20) ├── upgrade nodes :1-4 from "%[1]s" to "" -│ ├── restart node 3 with binary version (16) -│ ├── run "mixed-version 1" (17) -│ ├── restart node 4 with binary version (18) -│ ├── restart node 1 with binary version (19) -│ ├── restart node 2 with binary version (20) -│ └── run "mixed-version 2" (21) -├── finalize upgrade by resetting `+"`preserve_downgrade_option`"+` (22) -├── run "mixed-version 2" (23) -└── wait for nodes :1-4 to all have the same cluster version (same as binary version of node 1) (24) +│ ├── restart node 4 with binary version (21) +│ ├── restart node 3 with binary version (22) +│ ├── restart node 1 with binary version (23) +│ ├── run mixed-version hooks concurrently +│ │ ├── run "mixed-version 1" (24) +│ │ └── run "mixed-version 2" (25) +│ └── restart node 2 with binary version (26) +├── finalize upgrade by resetting `+"`preserve_downgrade_option`"+` (27) +├── run mixed-version hooks concurrently +│ ├── run "mixed-version 1", after 100ms delay (28) +│ └── run "mixed-version 2" (29) +└── wait for nodes :1-4 to all have the same cluster version (same as binary version of node 1) (30) `, previousVersion, ) @@ -120,7 +138,7 @@ mixed-version test plan for upgrading from %[1]s to : mvt.OnStartup("startup 2", dummyHook) plan, err = mvt.plan() require.NoError(t, err) - requireConcurrentHooks(t, plan.steps[3], "startup 1", "startup 2") + requireConcurrentHooks(t, plan.steps[4], "startup 1", "startup 2") // Assert that AfterUpgradeFinalized hooks are scheduled to run in // the last step of the test. @@ -130,8 +148,8 @@ mixed-version test plan for upgrading from %[1]s to : mvt.AfterUpgradeFinalized("finalizer 3", dummyHook) plan, err = mvt.plan() require.NoError(t, err) - require.Len(t, plan.steps, 9) - requireConcurrentHooks(t, plan.steps[8], "finalizer 1", "finalizer 2", "finalizer 3") + require.Len(t, plan.steps, 10) + requireConcurrentHooks(t, plan.steps[9], "finalizer 1", "finalizer 2", "finalizer 3") } // TestDeterministicTestPlan tests that generating a test plan with @@ -154,6 +172,84 @@ func TestDeterministicTestPlan(t *testing.T) { } } +var unused float64 + +// TestDeterministicHookSeeds ensures that user functions passed to +// `InMixedVersion` always see the same sequence of values even if the +// PRNG passed to the `Test` struct is perturbed during runs. In other +// words, this ensures that user functions have at their disposal a +// random number generator that is unique to them and concurrency with +// other functions should not change the sequence of values they see +// as long as the RNG is used deterministically in the user function +// itself. 
+func TestDeterministicHookSeeds(t *testing.T) {
+	generateData := func(generateMoreRandomNumbers bool) [][]int {
+		var generatedData [][]int
+		mvt := newTest(t)
+		mvt.InMixedVersion("do something", func(_ *logger.Logger, rng *rand.Rand, _ *Helper) error {
+			var data []int
+			for j := 0; j < 5; j++ {
+				data = append(data, rng.Intn(100))
+			}
+
+			generatedData = append(generatedData, data)
+
+			// Ensure that changing the top-level random number generator
+			// has no impact on the rng passed to the user function.
+			if generateMoreRandomNumbers {
+				for j := 0; j < 10; j++ {
+					unused = mvt.prng.Float64()
+				}
+			}
+			return nil
+		})
+
+		var (
+			// these variables are not used by the hook so they can be nil
+			ctx         = context.Background()
+			nilCluster  cluster.Cluster
+			emptyHelper = &Helper{}
+		)
+
+		plan, err := mvt.plan()
+		require.NoError(t, err)
+
+		// We can hardcode these paths since we are using a fixed seed in
+		// these tests.
+		firstRun := plan.steps[4].(sequentialRunStep).steps[2].(runHookStep)
+		require.Equal(t, "do something", firstRun.hook.name)
+		require.NoError(t, firstRun.Run(ctx, nilLogger, nilCluster, emptyHelper))
+
+		secondRun := plan.steps[5].(sequentialRunStep).steps[3].(runHookStep)
+		require.Equal(t, "do something", secondRun.hook.name)
+		require.NoError(t, secondRun.Run(ctx, nilLogger, nilCluster, emptyHelper))
+
+		thirdRun := plan.steps[6].(sequentialRunStep).steps[1].(runHookStep)
+		require.Equal(t, "do something", thirdRun.hook.name)
+		require.NoError(t, thirdRun.Run(ctx, nilLogger, nilCluster, emptyHelper))
+
+		fourthRun := plan.steps[8].(runHookStep)
+		require.Equal(t, "do something", fourthRun.hook.name)
+		require.NoError(t, fourthRun.Run(ctx, nilLogger, nilCluster, emptyHelper))
+
+		require.Len(t, generatedData, 4)
+		return generatedData
+	}
+
+	expectedData := [][]int{
+		{82, 1, 17, 3, 87},
+		{73, 17, 6, 37, 43},
+		{82, 35, 57, 54, 8},
+		{7, 95, 26, 31, 65},
+	}
+	const numRuns = 50
+	for j := 0; j < numRuns; j++ {
+		for _, b := range []bool{true, false} {
+			require.Equal(t, expectedData, generateData(b), "j = %d | b = %t", j, b)
+		}
+	}
+}
+
 func newTest(t *testing.T) *Test {
 	prng := rand.New(rand.NewSource(seed))
 	return &Test{
diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go
index 7f9826f6fe09..8e6aec140b96 100644
--- a/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go
+++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go
@@ -16,10 +16,12 @@ import (
 	"fmt"
 	"math/rand"
 	"os"
+	"path"
 	"path/filepath"
 	"regexp"
 	"strconv"
 	"strings"
+	"sync/atomic"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
@@ -35,24 +37,48 @@ type (
 	// Helper is the struct passed to user-functions providing helper
 	// functions that mixed-version tests can use.
 	Helper struct {
-		ctx         context.Context
-		testContext *Context
-		conns       []*gosql.DB
-		crdbNodes   option.NodeListOption
+		ctx             context.Context
+		testContext     *Context
+		backgroundCount int64
+		runner          *testRunner
+		stepLogger      *logger.Logger
+	}
+
+	// backgroundEvent is the struct sent by background steps when they
+	// finish (successfully or not).
+ backgroundEvent struct { + Name string + Err error + } - stepLogger *logger.Logger + backgroundRunner struct { + group *errgroup.Group + ctx context.Context + events chan backgroundEvent + } + + testFailure struct { + displayed bool + description string + seed int64 + binaryVersions []roachpb.Version + clusterVersionsBefore []roachpb.Version + clusterVersionsAfter []roachpb.Version } testRunner struct { ctx context.Context + cancel context.CancelFunc plan *TestPlan cluster cluster.Cluster crdbNodes option.NodeListOption seed int64 logger *logger.Logger - binaryVersions []roachpb.Version - clusterVersions []roachpb.Version + binaryVersions atomic.Value + clusterVersions atomic.Value + + background *backgroundRunner connCache []*gosql.DB } @@ -65,6 +91,7 @@ var ( func newTestRunner( ctx context.Context, + cancel context.CancelFunc, plan *TestPlan, l *logger.Logger, c cluster.Cluster, @@ -72,12 +99,14 @@ func newTestRunner( randomSeed int64, ) *testRunner { return &testRunner{ - ctx: ctx, - plan: plan, - logger: l, - cluster: c, - crdbNodes: crdbNodes, - seed: randomSeed, + ctx: ctx, + cancel: cancel, + plan: plan, + logger: l, + cluster: c, + crdbNodes: crdbNodes, + background: newBackgroundRunner(ctx), + seed: randomSeed, } } @@ -85,14 +114,39 @@ func newTestRunner( // each step in sequence. func (tr *testRunner) run() error { defer tr.closeConnections() + defer func() { + tr.logger.Printf("canceling mixed-version test context") + tr.cancel() + // Wait for some time so that any background tasks are properly + // canceled and their cancelation messages are displayed in the + // logs accordingly. + time.Sleep(5 * time.Second) + }() + + stepsErr := make(chan error) + go func() { + defer close(stepsErr) + for _, step := range tr.plan.steps { + if err := tr.runStep(step); err != nil { + stepsErr <- err + return + } + } + }() - for _, step := range tr.plan.steps { - if err := tr.runStep(step); err != nil { + for { + select { + case err := <-stepsErr: return err + case event := <-tr.background.Events(): + if event.Err == nil { + tr.logger.Printf("background step finished: %s", event.Name) + continue + } + + return fmt.Errorf("background step `%s` returned error: %w", event.Name, event.Err) } } - - return nil } // runStep contains the logic of running a single test step, called @@ -153,53 +207,87 @@ func (tr *testRunner) runStep(step testStep) error { return err } - tr.logStep("STARTING", ss, stepLogger) - tr.logVersions(stepLogger) - start := timeutil.Now() - defer func() { - prefix := fmt.Sprintf("FINISHED [%s]", timeutil.Since(start)) - tr.logStep(prefix, ss, stepLogger) - }() - if err := ss.Run(tr.ctx, stepLogger, tr.cluster, tr.newHelper(stepLogger)); err != nil { - return tr.reportError(err, ss, stepLogger) + if ss.Background() { + tr.startBackgroundStep(ss, stepLogger) + return nil } - return nil + return tr.runSingleStep(tr.ctx, ss, stepLogger) + } +} + +// runSingleStep takes care of the logic of running a `singleStep`, +// including logging start and finish times, wrapping the error (if +// any) with useful information, and renaming the log file to indicate +// failure. This logic is the same whether running a step in the +// background or not. 
+func (tr *testRunner) runSingleStep(ctx context.Context, ss singleStep, l *logger.Logger) error {
+	tr.logStep("STARTING", ss, l)
+	tr.logVersions(l)
+	start := timeutil.Now()
+	defer func() {
+		prefix := fmt.Sprintf("FINISHED [%s]", timeutil.Since(start))
+		tr.logStep(prefix, ss, l)
+	}()
+	if err := ss.Run(ctx, l, tr.cluster, tr.newHelper(l)); err != nil {
+		if isContextCanceled(err) {
+			l.Printf("step terminated (context canceled)")
+			return nil
+		}
+		return tr.stepError(err, ss, l)
 	}
+
+	return nil
+}
+
+func (tr *testRunner) startBackgroundStep(ss singleStep, l *logger.Logger) {
+	tr.background.Start(ss.Description(), func(ctx context.Context) error {
+		return tr.runSingleStep(ctx, ss, l)
+	})
 }
 
-// reportError augments the error passed with extra
-// information. Specifically, the error message will include the ID of
-// the step that failed, the random seed used, the binary version on
-// each node when the error occurred, and the cluster version before
-// and after the step (in case the failure happened *while* the
-// cluster version was updating).
-func (tr *testRunner) reportError(err error, step singleStep, l *logger.Logger) error {
-	errMsg := fmt.Sprintf("mixed-version test failure while running step %d (%s): %s",
+// stepError generates a `testFailure` error by augmenting the
+// error passed with extra information. Specifically, the error
+// message will include the ID of the step that failed, the random
+// seed used, the binary version on each node when the error
+// occurred, and the cluster version before and after the step (in
+// case the failure happened *while* the cluster version was
+// updating).
+func (tr *testRunner) stepError(err error, step singleStep, l *logger.Logger) error {
+	desc := fmt.Sprintf("mixed-version test failure while running step %d (%s): %s",
 		step.ID(), step.Description(), err,
 	)
-	debugInfo := func(label, value string) string {
-		return fmt.Sprintf("%-40s%s", label+":", value)
-	}
-	seedInfo := debugInfo("test random seed", strconv.FormatInt(tr.seed, 10))
-	binaryVersions := debugInfo("binary versions", formatVersions(tr.binaryVersions))
-	clusterVersionsBefore := debugInfo("cluster versions before failure", formatVersions(tr.clusterVersions))
-	var clusterVersionsAfter string
-	if err := tr.refreshClusterVersions(); err == nil {
-		clusterVersionsBefore += "\n"
-		clusterVersionsAfter = debugInfo("cluster versions after failure", formatVersions(tr.clusterVersions))
-	} else {
+
+	return tr.testFailure(desc, l)
+}
+
+// testFailure generates a `testFailure` with the given
+// description. It logs the error to the logger passed, and renames
+// the underlying file to include the "FAILED" prefix to help in
+// debugging.
+func (tr *testRunner) testFailure(desc string, l *logger.Logger) error {
+	clusterVersionsBefore := tr.clusterVersions
+	if err := tr.refreshClusterVersions(); err != nil {
 		tr.logger.Printf("failed to fetch cluster versions after failure: %s", err)
 	}
+	clusterVersionsAfter := tr.clusterVersions
+
+	tf := &testFailure{
+		description:           desc,
+		seed:                  tr.seed,
+		binaryVersions:        loadAtomicVersions(tr.binaryVersions),
+		clusterVersionsBefore: loadAtomicVersions(clusterVersionsBefore),
+		clusterVersionsAfter:  loadAtomicVersions(clusterVersionsAfter),
+	}
+
+	// print the test failure on the step's logger for convenience, and
+	// to reduce cross-referencing of logs.
+ l.Printf("%v", tf) if err := renameFailedLogger(l); err != nil { tr.logger.Printf("could not rename failed step logger: %v", err) } - return fmt.Errorf( - "%s\n%s\n%s\n%s%s", - errMsg, seedInfo, binaryVersions, clusterVersionsBefore, clusterVersionsAfter, - ) + return tf } func (tr *testRunner) logStep(prefix string, step singleStep, l *logger.Logger) { @@ -211,12 +299,15 @@ func (tr *testRunner) logStep(prefix string, step singleStep, l *logger.Logger) // cluster versions on each node. The cached versions should exist for // all steps but the first one (when we start the cluster itself). func (tr *testRunner) logVersions(l *logger.Logger) { - if tr.binaryVersions == nil || tr.clusterVersions == nil { + binaryVersions := loadAtomicVersions(tr.binaryVersions) + clusterVersions := loadAtomicVersions(tr.clusterVersions) + + if binaryVersions == nil || clusterVersions == nil { return } - l.Printf("binary versions: %s", formatVersions(tr.binaryVersions)) - l.Printf("cluster versions: %s", formatVersions(tr.clusterVersions)) + l.Printf("binary versions: %s", formatVersions(binaryVersions)) + l.Printf("cluster versions: %s", formatVersions(clusterVersions)) } // loggerFor creates a logger instance to be used by a test step. Logs @@ -227,22 +318,25 @@ func (tr *testRunner) loggerFor(step singleStep) (*logger.Logger, error) { name := invalidChars.ReplaceAllString(strings.ToLower(step.Description()), "") name = fmt.Sprintf("%d_%s", step.ID(), name) - prefix := fmt.Sprintf("%s/%s", logPrefix, name) + prefix := path.Join(logPrefix, name) return prefixedLogger(tr.logger, prefix) } // refreshBinaryVersions updates the internal `binaryVersions` field -// with the binary version running on each node of the cluster. +// with the binary version running on each node of the cluster. We use +// the `atomic` package here as this function may be called by two +// steps that are running concurrently. func (tr *testRunner) refreshBinaryVersions() error { - tr.binaryVersions = make([]roachpb.Version, 0, len(tr.crdbNodes)) + newBinaryVersions := make([]roachpb.Version, 0, len(tr.crdbNodes)) for _, node := range tr.crdbNodes { bv, err := clusterupgrade.BinaryVersion(tr.conn(node)) if err != nil { return fmt.Errorf("failed to get binary version for node %d: %w", node, err) } - tr.binaryVersions = append(tr.binaryVersions, bv) + newBinaryVersions = append(newBinaryVersions, bv) } + tr.binaryVersions.Store(newBinaryVersions) return nil } @@ -250,15 +344,16 @@ func (tr *testRunner) refreshBinaryVersions() error { // with the current view of the cluster version in each of the nodes // of the cluster. 
func (tr *testRunner) refreshClusterVersions() error { - tr.clusterVersions = make([]roachpb.Version, 0, len(tr.crdbNodes)) + newClusterVersions := make([]roachpb.Version, 0, len(tr.crdbNodes)) for _, node := range tr.crdbNodes { cv, err := clusterupgrade.ClusterVersion(tr.ctx, tr.conn(node)) if err != nil { return fmt.Errorf("failed to get cluster version for node %d: %w", node, err) } - tr.clusterVersions = append(tr.clusterVersions, cv) + newClusterVersions = append(newClusterVersions, cv) } + tr.clusterVersions.Store(newClusterVersions) return nil } @@ -294,8 +389,7 @@ func (tr *testRunner) maybeInitConnections() error { func (tr *testRunner) newHelper(l *logger.Logger) *Helper { return &Helper{ ctx: tr.ctx, - conns: tr.connCache, - crdbNodes: tr.crdbNodes, + runner: tr, stepLogger: l, } } @@ -314,10 +408,41 @@ func (tr *testRunner) closeConnections() { } } +func newBackgroundRunner(ctx context.Context) *backgroundRunner { + g, _ := errgroup.WithContext(ctx) + return &backgroundRunner{ + group: g, + ctx: ctx, + events: make(chan backgroundEvent), + } +} + +// Start will run the function `fn` in a goroutine. Any errors +// returned by that function are observable by reading from the +// channel returned by the `Events()` function. +func (br *backgroundRunner) Start(name string, fn func(context.Context) error) { + br.group.Go(func() error { + err := fn(br.ctx) + br.events <- backgroundEvent{ + Name: name, + Err: err, + } + return err + }) +} + +func (br *backgroundRunner) Events() <-chan backgroundEvent { + return br.events +} + +func (h *Helper) RandomNode(prng *rand.Rand, nodes option.NodeListOption) int { + return nodes[prng.Intn(len(nodes))] +} + // RandomDB returns a (nodeID, connection) tuple for a randomly picked // cockroach node according to the parameters passed. func (h *Helper) RandomDB(prng *rand.Rand, nodes option.NodeListOption) (int, *gosql.DB) { - node := nodes[prng.Intn(len(nodes))] + node := h.RandomNode(prng, nodes) return node, h.Connect(node) } @@ -325,7 +450,7 @@ func (h *Helper) RandomDB(prng *rand.Rand, nodes option.NodeListOption) (int, *g // database node. The query and the node picked are logged in the logs // of the step that calls this function. func (h *Helper) QueryRow(rng *rand.Rand, query string, args ...interface{}) *gosql.Row { - node, db := h.RandomDB(rng, h.crdbNodes) + node, db := h.RandomDB(rng, h.runner.crdbNodes) h.stepLogger.Printf("running SQL statement:\n%s\nArgs: %v\nNode: %d", query, args, node) return db.QueryRowContext(h.ctx, query, args...) } @@ -334,14 +459,14 @@ func (h *Helper) QueryRow(rng *rand.Rand, query string, args ...interface{}) *go // The query and the node picked are logged in the logs of the step // that calls this function. func (h *Helper) Exec(rng *rand.Rand, query string, args ...interface{}) error { - node, db := h.RandomDB(rng, h.crdbNodes) + node, db := h.RandomDB(rng, h.runner.crdbNodes) h.stepLogger.Printf("running SQL statement:\n%s\nArgs: %v\nNode: %d", query, args, node) _, err := db.ExecContext(h.ctx, query, args...) return err } func (h *Helper) Connect(node int) *gosql.DB { - return h.conns[node-1] + return h.runner.conn(node) } // SetContext should be called by steps that need access to the test @@ -356,13 +481,99 @@ func (h *Helper) Context() *Context { return h.testContext } +// Background allows test writers to create functions that run in the +// background in mixed-version hooks. 
+func (h *Helper) Background(name string, fn func(context.Context, *logger.Logger) error) {
+	h.runner.background.Start(name, func(ctx context.Context) error {
+		bgLogger, err := h.loggerFor(name)
+		if err != nil {
+			return fmt.Errorf("failed to create logger for background function %q: %w", name, err)
+		}
+
+		err = fn(ctx, bgLogger)
+		if err != nil {
+			if isContextCanceled(err) {
+				bgLogger.Printf("background function terminated (context canceled)")
+				return nil
+			}
+
+			desc := fmt.Sprintf("error in background function: %s", err)
+			return h.runner.testFailure(desc, bgLogger)
+		}
+
+		return nil
+	})
+}
+
+// BackgroundCommand has the same semantics as `Background()`; the
+// command passed will run in the background, and the test will fail
+// if the command is not successful.
+func (h *Helper) BackgroundCommand(cmd string, nodes option.NodeListOption) {
+	desc := fmt.Sprintf("run command: %q", cmd)
+	h.Background(desc, func(ctx context.Context, l *logger.Logger) error {
+		l.Printf("running command `%s` on nodes %v in the background", cmd, nodes)
+		return h.runner.cluster.RunE(ctx, nodes, cmd)
+	})
+}
+
+// loggerFor creates a logger instance to be used by background
+// functions (created by calling `Background` on the helper
+// instance). It is similar to the logger instances created for
+// mixed-version steps, but with the `background_` prefix.
+func (h *Helper) loggerFor(name string) (*logger.Logger, error) {
+	bgCount := atomic.AddInt64(&h.backgroundCount, 1)
+
+	fileName := invalidChars.ReplaceAllString(strings.ToLower(name), "")
+	fileName = fmt.Sprintf("background_%s_%d", fileName, bgCount)
+	fileName = path.Join(logPrefix, fileName)
+
+	return prefixedLogger(h.runner.logger, fileName)
+}
+
+func (tf *testFailure) Error() string {
+	if tf.displayed {
+		return tf.description
+	}
+
+	tf.displayed = true
+	debugInfo := func(label, value string) string {
+		return fmt.Sprintf("%-40s%s", label+":", value)
+	}
+	seedInfo := debugInfo("test random seed", strconv.FormatInt(tf.seed, 10))
+	binaryVersions := debugInfo("binary versions", formatVersions(tf.binaryVersions))
+	clusterVersionsBefore := debugInfo(
+		"cluster versions before failure",
+		formatVersions(tf.clusterVersionsBefore),
+	)
+	var clusterVersionsAfter string
+	if cv := tf.clusterVersionsAfter; cv != nil {
+		clusterVersionsBefore += "\n"
+		clusterVersionsAfter = debugInfo("cluster versions after failure", formatVersions(cv))
+	}
+
+	return fmt.Sprintf(
+		"%s\n%s\n%s\n%s%s",
+		tf.description, seedInfo, binaryVersions, clusterVersionsBefore, clusterVersionsAfter,
+	)
+}
+
 func renameFailedLogger(l *logger.Logger) error {
 	currentFileName := l.File.Name()
-	newLogName := strings.TrimSuffix(currentFileName, filepath.Ext(currentFileName))
-	newLogName += "_FAILED.log"
+	newLogName := path.Join(
+		filepath.Dir(currentFileName),
+		"FAILED_"+filepath.Base(currentFileName),
+	)
 	return os.Rename(currentFileName, newLogName)
 }
 
+func loadAtomicVersions(v atomic.Value) []roachpb.Version {
+	if v.Load() == nil {
+		return nil
+	}
+
+	return v.Load().([]roachpb.Version)
+}
+
 func formatVersions(versions []roachpb.Version) string {
 	var pairs []string
 	for idx, version := range versions {
@@ -371,3 +582,16 @@ func formatVersions(versions []roachpb.Version) string {
 
 	return fmt.Sprintf("[%s]", strings.Join(pairs, ", "))
 }
+
+// isContextCanceled returns a boolean indicating whether the error
+// given happened because some context was canceled.
+func isContextCanceled(err error) bool { + // TODO(renato): unfortunately, we have to resort to string + // comparison here. The most common use case for this function is + // detecting cluster commands that fail when the test context is + // canceled (after test success or failure), and roachtest does not + // return an error that wraps the context cancelation (in other + // words, `errors.Is` doesn't work). Once we fix this behavior, we + // should use structured errors here. + return strings.Contains(err.Error(), context.Canceled.Error()) +}
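For reference, the background-runner pattern introduced in runner.go can be
distilled into a small self-contained program (a sketch with simplified
names, no roachtest dependencies; it assumes only golang.org/x/sync/errgroup).
It shows how a failure in a background goroutine surfaces as an event that
the main loop can turn into a test failure:

	package main

	import (
		"context"
		"fmt"
		"time"

		"golang.org/x/sync/errgroup"
	)

	type event struct {
		name string
		err  error
	}

	type runner struct {
		group  *errgroup.Group
		ctx    context.Context
		events chan event
	}

	func newRunner(ctx context.Context) *runner {
		g, _ := errgroup.WithContext(ctx)
		return &runner{group: g, ctx: ctx, events: make(chan event)}
	}

	// Start runs fn in a goroutine; its outcome is reported on the
	// events channel, mirroring backgroundRunner.Start above.
	func (r *runner) Start(name string, fn func(context.Context) error) {
		r.group.Go(func() error {
			err := fn(r.ctx)
			r.events <- event{name: name, err: err}
			return err
		})
	}

	func main() {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		r := newRunner(ctx)
		r.Start("workload", func(ctx context.Context) error {
			select {
			case <-time.After(50 * time.Millisecond):
				return fmt.Errorf("workload failed")
			case <-ctx.Done():
				return ctx.Err()
			}
		})

		// Multiplex background events the way testRunner.run does: a nil
		// error just logs completion; a non-nil error fails the test.
		ev := <-r.events
		if ev.err != nil {
			fmt.Printf("background step `%s` returned error: %v\n", ev.name, ev.err)
		}
	}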