diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel b/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel index 75f083b93aee..37f780e5a2ca 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel @@ -13,6 +13,7 @@ go_library( deps = [ "//pkg/cmd/roachtest/cluster", "//pkg/cmd/roachtest/option", + "//pkg/cmd/roachtest/roachtestutil", "//pkg/cmd/roachtest/roachtestutil/clusterupgrade", "//pkg/cmd/roachtest/test", "//pkg/roachpb", @@ -30,7 +31,9 @@ go_test( args = ["-test.timeout=295s"], embed = [":mixedversion"], deps = [ + "//pkg/cmd/roachtest/cluster", "//pkg/cmd/roachtest/option", + "//pkg/cmd/roachtest/roachtestutil", "//pkg/cmd/roachtest/roachtestutil/clusterupgrade", "//pkg/roachprod/logger", "//pkg/util/version", diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go index 9bb0700e7440..cbfd554107ec 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go @@ -35,14 +35,18 @@ // // Typical usage: // -// mvt, err := NewMixedVersionTest(...) -// mvt.InMixedVersion("test my feature", func(l *logger.Logger, db *gosql.DB) error { +// mvt, err := mixedversion.NewTest(...) +// mvt.InMixedVersion("test my feature", func(l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error { // l.Printf("testing feature X") +// node, db := h.RandomDB(rng, c.All()) +// l.Printf("running query on node %d", node) // _, err := db.ExecContext(ctx, "SELECT * FROM test") // return err // }) -// mvt.InMixedVersion("test another feature", func(l *logger.Logger, db *gosql.DB) error { +// mvt.InMixedVersion("test another feature", func(l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error { // l.Printf("testing feature Y") +// node, db := h.RandomDB(rng, c.All()) +// l.Printf("running query on node %d", node) // _, err := db.ExecContext(ctx, "SELECT * FROM test2") // return err // }) @@ -72,6 +76,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" "github.com/cockroachdb/cockroach/pkg/roachprod/logger" @@ -82,6 +87,7 @@ import ( const ( logPrefix = "mixed-version-test" startupLabel = "run startup hooks" + backgroundLabel = "start background hooks" mixedVersionLabel = "run mixed-version hooks" afterTestLabel = "run after test hooks" @@ -92,6 +98,13 @@ const ( // of migration steps before the new cluster version can be // finalized. runWhileMigratingProbability = 0.5 + + // CurrentCockroachPath is the path to the binary where the current + // version of cockroach being tested is located. This file is + // uploaded before any user functions are run. The primary use case + // are tests that need long runnig background functions on startup + // (such as running a workload). + CurrentCockroachPath = "./cockroach-current" ) var ( @@ -157,6 +170,11 @@ type ( // for human-consumption. Displayed when pretty-printing the test // plan. Description() string + // Background indicates whether the step should be run in the + // background. When a step is *not* run in the background, the + // test will wait for it to finish before moving on. When a + // background step fails, the entire test fails. + Background() bool // Run implements the actual functionality of the step. Run(context.Context, *logger.Logger, cluster.Cluster, *Helper) error } @@ -167,6 +185,7 @@ type ( // its different stages: startup, mixed-version, and after-test. testHooks struct { startup hooks + background hooks mixedVersion hooks afterUpgradeFinalized hooks @@ -177,6 +196,7 @@ type ( // Test is the main struct callers of this package interact with. Test struct { ctx context.Context + cancel context.CancelFunc cluster cluster.Cluster logger *logger.Logger crdbNodes option.NodeListOption @@ -209,8 +229,10 @@ func NewTest( prng, seed := randutil.NewPseudoRand() testLogger.Printf("mixed-version random seed: %d", seed) + testCtx, cancel := context.WithCancel(ctx) return &Test{ - ctx: ctx, + ctx: testCtx, + cancel: cancel, cluster: c, logger: testLogger, crdbNodes: crdbNodes, @@ -248,7 +270,7 @@ func (t *Test) InMixedVersion(desc string, fn userFunc) { return len(testContext.ToVersionNodes) == numUpgradedNodes } - t.hooks.AddMixedVersion(versionUpgradeHook{desc, predicate, fn}) + t.hooks.AddMixedVersion(versionUpgradeHook{name: desc, predicate: predicate, fn: fn}) } // OnStartup registers a callback that is run once the cluster is @@ -260,7 +282,7 @@ func (t *Test) OnStartup(desc string, fn userFunc) { // Since the callbacks here are only referenced in the setup steps // of the planner, there is no need to have a predicate function // gating them. - t.hooks.AddStartup(versionUpgradeHook{desc, nil, fn}) + t.hooks.AddStartup(versionUpgradeHook{name: desc, fn: fn}) } // AfterUpgradeFinalized registers a callback that is run once the @@ -268,7 +290,52 @@ func (t *Test) OnStartup(desc string, fn userFunc) { // and allowed the upgrade to finalize successfully. If multiple such // hooks are passed, they will be executed concurrently. func (t *Test) AfterUpgradeFinalized(desc string, fn userFunc) { - t.hooks.AddAfterUpgradeFinalized(versionUpgradeHook{desc, nil, fn}) + t.hooks.AddAfterUpgradeFinalized(versionUpgradeHook{name: desc, fn: fn}) +} + +// BackgroundFunc runs the function passed as argument in the +// background during the test. Background functions are kicked off +// once the cluster has been initialized (i.e., after all startup +// steps have finished). +func (t *Test) BackgroundFunc(desc string, fn userFunc) { + t.hooks.AddBackground(versionUpgradeHook{name: desc, fn: fn}) +} + +// BackgroundCommand is a convenience wrapper around `BackgroundFunc` +// that runs the command passed once the cluster is initialized. The +// node where the command runs is picked randomly. +// +// TODO: unfortunately, `cluster.Run()` does not allow the caller to +// pass a logger instance. It would be convenient if the output of the +// command itself lived within the `mixed-version/*.log` files. +func (t *Test) BackgroundCommand( + desc string, nodes option.NodeListOption, cmd *roachtestutil.Command, +) { + t.BackgroundFunc(desc, t.runCommandFunc(nodes, cmd.String())) +} + +// Workload is a convenience wrapper that allows callers to run +// workloads in the background during a mixed-version test. `initCmd`, +// if passed, is the command run to initialize the workload; it is run +// synchronously as a regular startup function. `runCmd` is the +// command to actually run the command; it is run in the background. +func (t *Test) Workload( + name string, node option.NodeListOption, initCmd, runCmd *roachtestutil.Command, +) { + seed := uint64(t.prng.Int63()) + addSeed := func(cmd *roachtestutil.Command) { + if !cmd.HasFlag("seed") { + cmd.Flag("seed", seed) + } + } + + if initCmd != nil { + addSeed(initCmd) + t.OnStartup(fmt.Sprintf("initialize %s workload", name), t.runCommandFunc(node, initCmd.String())) + } + + addSeed(runCmd) + t.BackgroundCommand(fmt.Sprintf("%s workload", name), node, runCmd) } // Run runs the mixed-version test. It should be called once all @@ -290,7 +357,7 @@ func (t *Test) Run() { func (t *Test) run(plan *TestPlan) error { return newTestRunner( - t.ctx, plan, t.logger, t.cluster, t.crdbNodes, t.seed, + t.ctx, t.cancel, plan, t.logger, t.cluster, t.crdbNodes, t.seed, ).run() } @@ -319,6 +386,13 @@ func (t *Test) buildVersion() version.Version { return *t.rt.BuildVersion() } +func (t *Test) runCommandFunc(nodes option.NodeListOption, cmd string) userFunc { + return func(l *logger.Logger, rng *rand.Rand, h *Helper) error { + l.Printf("running command `%s` on nodes %v", cmd, nodes) + return t.cluster.RunE(t.ctx, nodes, cmd) + } +} + // startFromCheckpointStep is the step that starts the cluster from a // specific `version`, using checked-in fixtures. type startFromCheckpointStep struct { @@ -328,7 +402,8 @@ type startFromCheckpointStep struct { crdbNodes option.NodeListOption } -func (s startFromCheckpointStep) ID() int { return s.id } +func (s startFromCheckpointStep) ID() int { return s.id } +func (s startFromCheckpointStep) Background() bool { return false } func (s startFromCheckpointStep) Description() string { return fmt.Sprintf("starting cluster from fixtures for version %q", s.version) @@ -349,12 +424,41 @@ func (s startFromCheckpointStep) Run( return err } - startOpts := option.DefaultStartOpts() + startOpts := option.DefaultStartOptsNoBackups() startOpts.RoachprodOpts.Sequential = false clusterupgrade.StartWithBinary(ctx, l, c, s.crdbNodes, binaryPath, startOpts) return nil } +// uploadCurrentVersionStep uploads the current cockroach binary to +// all DB nodes in the test. This is so that startup steps can use +// them (if, for instance, they need to run a workload). The binary +// will be located in `dest`. +type uploadCurrentVersionStep struct { + id int + rt test.Test + crdbNodes option.NodeListOption + dest string +} + +func (s uploadCurrentVersionStep) ID() int { return s.id } +func (s uploadCurrentVersionStep) Background() bool { return false } + +func (s uploadCurrentVersionStep) Description() string { + return fmt.Sprintf("upload current binary to all cockroach nodes (%v)", s.crdbNodes) +} + +func (s uploadCurrentVersionStep) Run( + ctx context.Context, l *logger.Logger, c cluster.Cluster, helper *Helper, +) error { + _, err := clusterupgrade.UploadVersion(ctx, s.rt, l, c, s.crdbNodes, clusterupgrade.MainVersion) + if err != nil { + return err + } + + return c.RunE(ctx, s.crdbNodes, fmt.Sprintf("mv ./cockroach %s", s.dest)) +} + // waitForStableClusterVersionStep implements the process of waiting // for the `version` cluster setting being the same on all nodes of // the cluster and equal to the binary version of the first node in @@ -364,7 +468,8 @@ type waitForStableClusterVersionStep struct { nodes option.NodeListOption } -func (s waitForStableClusterVersionStep) ID() int { return s.id } +func (s waitForStableClusterVersionStep) ID() int { return s.id } +func (s waitForStableClusterVersionStep) Background() bool { return false } func (s waitForStableClusterVersionStep) Description() string { return fmt.Sprintf( @@ -387,7 +492,8 @@ type preserveDowngradeOptionStep struct { prng *rand.Rand } -func (s preserveDowngradeOptionStep) ID() int { return s.id } +func (s preserveDowngradeOptionStep) ID() int { return s.id } +func (s preserveDowngradeOptionStep) Background() bool { return false } func (s preserveDowngradeOptionStep) Description() string { return "preventing auto-upgrades by setting `preserve_downgrade_option`" @@ -421,7 +527,8 @@ type restartWithNewBinaryStep struct { node int } -func (s restartWithNewBinaryStep) ID() int { return s.id } +func (s restartWithNewBinaryStep) ID() int { return s.id } +func (s restartWithNewBinaryStep) Background() bool { return false } func (s restartWithNewBinaryStep) Description() string { return fmt.Sprintf("restart node %d with binary version %s", s.node, versionMsg(s.version)) @@ -436,7 +543,12 @@ func (s restartWithNewBinaryStep) Run( l, c, c.Node(s.node), - option.DefaultStartOpts(), + // Disable regular backups in mixed-version tests, as some tests + // check for running jobs and the scheduled backup may make + // things non-deterministic. In the future, we should change the + // default and add an API for tests to opt-out of the default + // scheduled backup if necessary. + option.DefaultStartOptsNoBackups(), s.version, ) } @@ -450,7 +562,8 @@ type finalizeUpgradeStep struct { prng *rand.Rand } -func (s finalizeUpgradeStep) ID() int { return s.id } +func (s finalizeUpgradeStep) ID() int { return s.id } +func (s finalizeUpgradeStep) Background() bool { return false } func (s finalizeUpgradeStep) Description() string { return "finalize upgrade by resetting `preserve_downgrade_option`" @@ -472,9 +585,11 @@ type runHookStep struct { testContext Context prng *rand.Rand hook versionUpgradeHook + background bool } -func (s runHookStep) ID() int { return s.id } +func (s runHookStep) ID() int { return s.id } +func (s runHookStep) Background() bool { return s.background } func (s runHookStep) Description() string { return fmt.Sprintf("run %q", s.hook.name) @@ -559,13 +674,19 @@ func (h hooks) Filter(testContext Context) hooks { // returned. Otherwise, a `concurrentRunStep` is returned, where every // hook is run concurrently. func (h hooks) AsSteps( - label string, idGen func() int, prng *rand.Rand, nodes option.NodeListOption, testContext Context, + label string, + idGen func() int, + prng *rand.Rand, + nodes option.NodeListOption, + testContext Context, + background bool, ) []testStep { steps := make([]testStep, 0, len(h)) for _, hook := range h { hookPrng := rngFromRNG(prng) - rhs := runHookStep{id: idGen(), prng: hookPrng, hook: hook, testContext: testContext} - steps = append(steps, rhs) + steps = append(steps, runHookStep{ + id: idGen(), prng: hookPrng, hook: hook, background: background, testContext: testContext, + }) } if len(steps) <= 1 { @@ -579,6 +700,10 @@ func (th *testHooks) AddStartup(hook versionUpgradeHook) { th.startup = append(th.startup, hook) } +func (th *testHooks) AddBackground(hook versionUpgradeHook) { + th.background = append(th.background, hook) +} + func (th *testHooks) AddMixedVersion(hook versionUpgradeHook) { th.mixedVersion = append(th.mixedVersion, hook) } @@ -588,15 +713,21 @@ func (th *testHooks) AddAfterUpgradeFinalized(hook versionUpgradeHook) { } func (th *testHooks) StartupSteps(idGen func() int, testContext Context) []testStep { - return th.startup.AsSteps(startupLabel, idGen, th.prng, th.crdbNodes, testContext) + return th.startup.AsSteps(startupLabel, idGen, th.prng, th.crdbNodes, testContext, false) +} + +func (th *testHooks) BackgroundSteps(idGen func() int, testContext Context) []testStep { + return th.background.AsSteps(backgroundLabel, idGen, th.prng, th.crdbNodes, testContext, true) } func (th *testHooks) MixedVersionSteps(testContext Context, idGen func() int) []testStep { - return th.mixedVersion.Filter(testContext).AsSteps(mixedVersionLabel, idGen, th.prng, th.crdbNodes, testContext) + return th.mixedVersion. + Filter(testContext). + AsSteps(mixedVersionLabel, idGen, th.prng, th.crdbNodes, testContext, false) } func (th *testHooks) AfterUpgradeFinalizedSteps(idGen func() int, testContext Context) []testStep { - return th.afterUpgradeFinalized.AsSteps(afterTestLabel, idGen, th.prng, th.crdbNodes, testContext) + return th.afterUpgradeFinalized.AsSteps(afterTestLabel, idGen, th.prng, th.crdbNodes, testContext, false) } func randomDelay(rng *rand.Rand) time.Duration { diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/planner.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/planner.go index 0fbabe2c39f3..50a233f08d9b 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/planner.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/planner.go @@ -69,14 +69,16 @@ const ( // this is happening. // - run after-test hooks. // -// TODO(renato): further opportunities for random exploration: going -// back multiple releases instead of just one; picking a patch release -// randomly instead of just the latest release. +// TODO(renato): further opportunities for random exploration: +// - going back multiple releases instead of just one +// - picking a patch release randomly instead of just the latest release +// - inserting arbitrary delays (`sleep` calls) during the test. func (p *testPlanner) Plan() *TestPlan { var steps []testStep addSteps := func(ss []testStep) { steps = append(steps, ss...) } addSteps(p.initSteps()) + addSteps(p.hooks.BackgroundSteps(p.nextID, p.initialContext())) // previous -> current addSteps(p.upgradeSteps(p.initialVersion, clusterupgrade.MainVersion)) @@ -119,6 +121,7 @@ func (p *testPlanner) finalContext(finalizing bool) Context { func (p *testPlanner) initSteps() []testStep { return append([]testStep{ startFromCheckpointStep{id: p.nextID(), version: p.initialVersion, rt: p.rt, crdbNodes: p.crdbNodes}, + uploadCurrentVersionStep{id: p.nextID(), rt: p.rt, crdbNodes: p.crdbNodes, dest: CurrentCockroachPath}, waitForStableClusterVersionStep{id: p.nextID(), nodes: p.crdbNodes}, preserveDowngradeOptionStep{id: p.nextID(), prng: p.newRNG(), crdbNodes: p.crdbNodes}, }, p.hooks.StartupSteps(p.nextID, p.initialContext())...) @@ -244,7 +247,10 @@ func (plan *TestPlan) prettyPrintStep(out *strings.Builder, step testStep, prefi case concurrentRunStep: writeNested(s.Description(), s.delayedSteps) case delayedStep: - delayStr := fmt.Sprintf("after %s delay", s.delay) + var delayStr string + if s.delay.Milliseconds() > 0 { + delayStr = fmt.Sprintf("after %s delay", s.delay) + } writeSingle(s.step.(singleStep), delayStr) default: writeSingle(s.(singleStep)) diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/planner_test.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/planner_test.go index c078aaf62f74..5b920c64f8b2 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/planner_test.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/planner_test.go @@ -17,7 +17,9 @@ import ( "math/rand" "testing" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade" "github.com/cockroachdb/cockroach/pkg/roachprod/logger" "github.com/cockroachdb/cockroach/pkg/util/version" @@ -68,10 +70,17 @@ func TestTestPlanner(t *testing.T) { mvt := newTest(t) mvt.InMixedVersion("mixed-version 1", dummyHook) mvt.InMixedVersion("mixed-version 2", dummyHook) + initBank := roachtestutil.NewCommand("./cockroach workload bank init") + runBank := roachtestutil.NewCommand("./cockroach workload run bank").Flag("max-ops", 100) + mvt.Workload("bank", nodes, initBank, runBank) + runRand := roachtestutil.NewCommand("./cockroach run rand").Flag("seed", 321) + mvt.Workload("rand", nodes, nil /* initCmd */, runRand) + csvServer := roachtestutil.NewCommand("./cockroach workload csv-server").Flag("port", 9999) + mvt.BackgroundCommand("csv server", nodes, csvServer) plan, err := mvt.plan() require.NoError(t, err) - require.Len(t, plan.steps, 9) + require.Len(t, plan.steps, 12) // Assert on the pretty-printed version of the test plan as that // asserts the ordering of the steps we want to take, and as a bonus @@ -79,33 +88,42 @@ func TestTestPlanner(t *testing.T) { expectedPrettyPlan := fmt.Sprintf(` mixed-version test plan for upgrading from %[1]s to : ├── starting cluster from fixtures for version "%[1]s" (1) -├── wait for nodes :1-4 to all have the same cluster version (same as binary version of node 1) (2) -├── preventing auto-upgrades by setting `+"`preserve_downgrade_option`"+` (3) +├── upload current binary to all cockroach nodes (:1-4) (2) +├── wait for nodes :1-4 to all have the same cluster version (same as binary version of node 1) (3) +├── preventing auto-upgrades by setting `+"`preserve_downgrade_option`"+` (4) +├── run "initialize bank workload" (5) +├── start background hooks concurrently +│ ├── run "bank workload", after 50ms delay (6) +│ ├── run "rand workload", after 50ms delay (7) +│ └── run "csv server", after 200ms delay (8) ├── upgrade nodes :1-4 from "%[1]s" to "" -│ ├── restart node 2 with binary version (4) -│ ├── restart node 1 with binary version (5) -│ ├── run "mixed-version 1" (6) -│ ├── restart node 4 with binary version (7) -│ ├── restart node 3 with binary version (8) -│ └── run "mixed-version 2" (9) -├── downgrade nodes :1-4 from "" to "%[1]s" -│ ├── restart node 3 with binary version %[1]s (10) -│ ├── restart node 4 with binary version %[1]s (11) +│ ├── restart node 4 with binary version (9) │ ├── run mixed-version hooks concurrently -│ │ ├── run "mixed-version 1", after 200ms delay (12) -│ │ └── run "mixed-version 2", after 200ms delay (13) -│ ├── restart node 2 with binary version %[1]s (14) -│ └── restart node 1 with binary version %[1]s (15) +│ │ ├── run "mixed-version 1", after 100ms delay (10) +│ │ └── run "mixed-version 2", after 100ms delay (11) +│ ├── restart node 3 with binary version (12) +│ ├── restart node 2 with binary version (13) +│ └── restart node 1 with binary version (14) +├── downgrade nodes :1-4 from "" to "%[1]s" +│ ├── restart node 2 with binary version %[1]s (15) +│ ├── run "mixed-version 1" (16) +│ ├── restart node 1 with binary version %[1]s (17) +│ ├── run "mixed-version 2" (18) +│ ├── restart node 3 with binary version %[1]s (19) +│ └── restart node 4 with binary version %[1]s (20) ├── upgrade nodes :1-4 from "%[1]s" to "" -│ ├── restart node 3 with binary version (16) -│ ├── run "mixed-version 1" (17) -│ ├── restart node 4 with binary version (18) -│ ├── restart node 1 with binary version (19) -│ ├── restart node 2 with binary version (20) -│ └── run "mixed-version 2" (21) -├── finalize upgrade by resetting `+"`preserve_downgrade_option`"+` (22) -├── run "mixed-version 2" (23) -└── wait for nodes :1-4 to all have the same cluster version (same as binary version of node 1) (24) +│ ├── restart node 4 with binary version (21) +│ ├── restart node 3 with binary version (22) +│ ├── restart node 1 with binary version (23) +│ ├── run mixed-version hooks concurrently +│ │ ├── run "mixed-version 1" (24) +│ │ └── run "mixed-version 2" (25) +│ └── restart node 2 with binary version (26) +├── finalize upgrade by resetting `+"`preserve_downgrade_option`"+` (27) +├── run mixed-version hooks concurrently +│ ├── run "mixed-version 1", after 100ms delay (28) +│ └── run "mixed-version 2" (29) +└── wait for nodes :1-4 to all have the same cluster version (same as binary version of node 1) (30) `, previousVersion, ) @@ -120,7 +138,7 @@ mixed-version test plan for upgrading from %[1]s to : mvt.OnStartup("startup 2", dummyHook) plan, err = mvt.plan() require.NoError(t, err) - requireConcurrentHooks(t, plan.steps[3], "startup 1", "startup 2") + requireConcurrentHooks(t, plan.steps[4], "startup 1", "startup 2") // Assert that AfterUpgradeFinalized hooks are scheduled to run in // the last step of the test. @@ -130,8 +148,8 @@ mixed-version test plan for upgrading from %[1]s to : mvt.AfterUpgradeFinalized("finalizer 3", dummyHook) plan, err = mvt.plan() require.NoError(t, err) - require.Len(t, plan.steps, 9) - requireConcurrentHooks(t, plan.steps[8], "finalizer 1", "finalizer 2", "finalizer 3") + require.Len(t, plan.steps, 10) + requireConcurrentHooks(t, plan.steps[9], "finalizer 1", "finalizer 2", "finalizer 3") } // TestDeterministicTestPlan tests that generating a test plan with @@ -154,6 +172,84 @@ func TestDeterministicTestPlan(t *testing.T) { } } +var unused float64 + +// TestDeterministicHookSeeds ensures that user functions passed to +// `InMixedVersion` always see the same sequence of values even if the +// PRNG passed to the `Test` struct is perturbed during runs. In other +// words, this ensures that user functions have at their disposal a +// random number generator that is unique to them and concurrency with +// other functions should not change the sequence of values they see +// as long as the RNG is used deterministically in the user function +// itself. +func TestDeterministicHookSeeds(t *testing.T) { + generateData := func(generateMoreRandomNumbers bool) [][]int { + var generatedData [][]int + mvt := newTest(t) + mvt.InMixedVersion("do something", func(_ *logger.Logger, rng *rand.Rand, _ *Helper) error { + var data []int + for j := 0; j < 5; j++ { + data = append(data, rng.Intn(100)) + } + + generatedData = append(generatedData, data) + + // Ensure that changing the top-level random number generator + // has no impact on the rng passed to the user function. + if generateMoreRandomNumbers { + for j := 0; j < 10; j++ { + unused = mvt.prng.Float64() + } + } + return nil + }) + + var ( + // these variables are not used by the hook so they can be nil + ctx = context.Background() + nilCluster cluster.Cluster + emptyHelper = &Helper{} + ) + + plan, err := mvt.plan() + require.NoError(t, err) + + // We can hardcode these paths since we are using a fixed seed in + // these tests. + firstRun := plan.steps[4].(sequentialRunStep).steps[2].(runHookStep) + require.Equal(t, "do something", firstRun.hook.name) + require.NoError(t, firstRun.Run(ctx, nilLogger, nilCluster, emptyHelper)) + + secondRun := plan.steps[5].(sequentialRunStep).steps[3].(runHookStep) + require.Equal(t, "do something", secondRun.hook.name) + require.NoError(t, secondRun.Run(ctx, nilLogger, nilCluster, emptyHelper)) + + thirdRun := plan.steps[6].(sequentialRunStep).steps[1].(runHookStep) + require.Equal(t, "do something", thirdRun.hook.name) + require.NoError(t, thirdRun.Run(ctx, nilLogger, nilCluster, emptyHelper)) + + fourthRun := plan.steps[8].(runHookStep) + require.Equal(t, "do something", fourthRun.hook.name) + require.NoError(t, fourthRun.Run(ctx, nilLogger, nilCluster, emptyHelper)) + + require.Len(t, generatedData, 4) + return generatedData + } + + expectedData := [][]int{ + {82, 1, 17, 3, 87}, + {73, 17, 6, 37, 43}, + {82, 35, 57, 54, 8}, + {7, 95, 26, 31, 65}, + } + const numRums = 50 + for j := 0; j < numRums; j++ { + for _, b := range []bool{true, false} { + require.Equal(t, expectedData, generateData(b), "j = %d | b = %t", j, b) + } + } +} + func newTest(t *testing.T) *Test { prng := rand.New(rand.NewSource(seed)) return &Test{ diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go index 7f9826f6fe09..8e6aec140b96 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go @@ -16,10 +16,12 @@ import ( "fmt" "math/rand" "os" + "path" "path/filepath" "regexp" "strconv" "strings" + "sync/atomic" "time" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" @@ -35,24 +37,48 @@ type ( // Helper is the struct passed to user-functions providing helper // functions that mixed-version tests can use. Helper struct { - ctx context.Context - testContext *Context - conns []*gosql.DB - crdbNodes option.NodeListOption + ctx context.Context + testContext *Context + backgroundCount int64 + runner *testRunner + stepLogger *logger.Logger + } + + // backgroundEvent is the struct sent by background steps when they + // finish (successfully or not). + backgroundEvent struct { + Name string + Err error + } - stepLogger *logger.Logger + backgroundRunner struct { + group *errgroup.Group + ctx context.Context + events chan backgroundEvent + } + + testFailure struct { + displayed bool + description string + seed int64 + binaryVersions []roachpb.Version + clusterVersionsBefore []roachpb.Version + clusterVersionsAfter []roachpb.Version } testRunner struct { ctx context.Context + cancel context.CancelFunc plan *TestPlan cluster cluster.Cluster crdbNodes option.NodeListOption seed int64 logger *logger.Logger - binaryVersions []roachpb.Version - clusterVersions []roachpb.Version + binaryVersions atomic.Value + clusterVersions atomic.Value + + background *backgroundRunner connCache []*gosql.DB } @@ -65,6 +91,7 @@ var ( func newTestRunner( ctx context.Context, + cancel context.CancelFunc, plan *TestPlan, l *logger.Logger, c cluster.Cluster, @@ -72,12 +99,14 @@ func newTestRunner( randomSeed int64, ) *testRunner { return &testRunner{ - ctx: ctx, - plan: plan, - logger: l, - cluster: c, - crdbNodes: crdbNodes, - seed: randomSeed, + ctx: ctx, + cancel: cancel, + plan: plan, + logger: l, + cluster: c, + crdbNodes: crdbNodes, + background: newBackgroundRunner(ctx), + seed: randomSeed, } } @@ -85,14 +114,39 @@ func newTestRunner( // each step in sequence. func (tr *testRunner) run() error { defer tr.closeConnections() + defer func() { + tr.logger.Printf("canceling mixed-version test context") + tr.cancel() + // Wait for some time so that any background tasks are properly + // canceled and their cancelation messages are displayed in the + // logs accordingly. + time.Sleep(5 * time.Second) + }() + + stepsErr := make(chan error) + go func() { + defer close(stepsErr) + for _, step := range tr.plan.steps { + if err := tr.runStep(step); err != nil { + stepsErr <- err + return + } + } + }() - for _, step := range tr.plan.steps { - if err := tr.runStep(step); err != nil { + for { + select { + case err := <-stepsErr: return err + case event := <-tr.background.Events(): + if event.Err == nil { + tr.logger.Printf("background step finished: %s", event.Name) + continue + } + + return fmt.Errorf("background step `%s` returned error: %w", event.Name, event.Err) } } - - return nil } // runStep contains the logic of running a single test step, called @@ -153,53 +207,87 @@ func (tr *testRunner) runStep(step testStep) error { return err } - tr.logStep("STARTING", ss, stepLogger) - tr.logVersions(stepLogger) - start := timeutil.Now() - defer func() { - prefix := fmt.Sprintf("FINISHED [%s]", timeutil.Since(start)) - tr.logStep(prefix, ss, stepLogger) - }() - if err := ss.Run(tr.ctx, stepLogger, tr.cluster, tr.newHelper(stepLogger)); err != nil { - return tr.reportError(err, ss, stepLogger) + if ss.Background() { + tr.startBackgroundStep(ss, stepLogger) + return nil } - return nil + return tr.runSingleStep(tr.ctx, ss, stepLogger) + } +} + +// runSingleStep takes care of the logic of running a `singleStep`, +// including logging start and finish times, wrapping the error (if +// any) with useful information, and renaming the log file to indicate +// failure. This logic is the same whether running a step in the +// background or not. +func (tr *testRunner) runSingleStep(ctx context.Context, ss singleStep, l *logger.Logger) error { + tr.logStep("STARTING", ss, l) + tr.logVersions(l) + start := timeutil.Now() + defer func() { + prefix := fmt.Sprintf("FINISHED [%s]", timeutil.Since(start)) + tr.logStep(prefix, ss, l) + }() + if err := ss.Run(ctx, l, tr.cluster, tr.newHelper(l)); err != nil { + if isContextCanceled(err) { + l.Printf("step terminated (context canceled)") + return nil + } + return tr.stepError(err, ss, l) } + + return nil +} + +func (tr *testRunner) startBackgroundStep(ss singleStep, l *logger.Logger) { + tr.background.Start(ss.Description(), func(ctx context.Context) error { + return tr.runSingleStep(ctx, ss, l) + }) } -// reportError augments the error passed with extra -// information. Specifically, the error message will include the ID of -// the step that failed, the random seed used, the binary version on -// each node when the error occurred, and the cluster version before -// and after the step (in case the failure happened *while* the -// cluster version was updating). -func (tr *testRunner) reportError(err error, step singleStep, l *logger.Logger) error { - errMsg := fmt.Sprintf("mixed-version test failure while running step %d (%s): %s", +// stepError augments generates a `testFailure` error by augmenting +// the error passed with extra information. Specifically, the error +// message will include the ID of the step that failed, the random +// seed used, the binary version on each node when the error occurred, +// and the cluster version before and after the step (in case the +// failure happened *while* the cluster version was updating). +func (tr *testRunner) stepError(err error, step singleStep, l *logger.Logger) error { + desc := fmt.Sprintf("mixed-version test failure while running step %d (%s): %s", step.ID(), step.Description(), err, ) - debugInfo := func(label, value string) string { - return fmt.Sprintf("%-40s%s", label+":", value) - } - seedInfo := debugInfo("test random seed", strconv.FormatInt(tr.seed, 10)) - binaryVersions := debugInfo("binary versions", formatVersions(tr.binaryVersions)) - clusterVersionsBefore := debugInfo("cluster versions before failure", formatVersions(tr.clusterVersions)) - var clusterVersionsAfter string - if err := tr.refreshClusterVersions(); err == nil { - clusterVersionsBefore += "\n" - clusterVersionsAfter = debugInfo("cluster versions after failure", formatVersions(tr.clusterVersions)) - } else { + + return tr.testFailure(desc, l) +} + +// testFailure generates a `testFailure` with the given +// description. It logs the error to the logger passed, and renames +// the underlying file to include the "FAILED" prefix to help in +// debugging. +func (tr *testRunner) testFailure(desc string, l *logger.Logger) error { + clusterVersionsBefore := tr.clusterVersions + if err := tr.refreshClusterVersions(); err != nil { tr.logger.Printf("failed to fetch cluster versions after failure: %s", err) } + clusterVersionsAfter := tr.clusterVersions + + tf := &testFailure{ + description: desc, + seed: tr.seed, + binaryVersions: loadAtomicVersions(tr.binaryVersions), + clusterVersionsBefore: loadAtomicVersions(clusterVersionsBefore), + clusterVersionsAfter: loadAtomicVersions(clusterVersionsAfter), + } + + // print the test failure on the step's logger for convenience, and + // to reduce cross referencing of logs. + l.Printf("%v", tf) if err := renameFailedLogger(l); err != nil { tr.logger.Printf("could not rename failed step logger: %v", err) } - return fmt.Errorf( - "%s\n%s\n%s\n%s%s", - errMsg, seedInfo, binaryVersions, clusterVersionsBefore, clusterVersionsAfter, - ) + return tf } func (tr *testRunner) logStep(prefix string, step singleStep, l *logger.Logger) { @@ -211,12 +299,15 @@ func (tr *testRunner) logStep(prefix string, step singleStep, l *logger.Logger) // cluster versions on each node. The cached versions should exist for // all steps but the first one (when we start the cluster itself). func (tr *testRunner) logVersions(l *logger.Logger) { - if tr.binaryVersions == nil || tr.clusterVersions == nil { + binaryVersions := loadAtomicVersions(tr.binaryVersions) + clusterVersions := loadAtomicVersions(tr.clusterVersions) + + if binaryVersions == nil || clusterVersions == nil { return } - l.Printf("binary versions: %s", formatVersions(tr.binaryVersions)) - l.Printf("cluster versions: %s", formatVersions(tr.clusterVersions)) + l.Printf("binary versions: %s", formatVersions(binaryVersions)) + l.Printf("cluster versions: %s", formatVersions(clusterVersions)) } // loggerFor creates a logger instance to be used by a test step. Logs @@ -227,22 +318,25 @@ func (tr *testRunner) loggerFor(step singleStep) (*logger.Logger, error) { name := invalidChars.ReplaceAllString(strings.ToLower(step.Description()), "") name = fmt.Sprintf("%d_%s", step.ID(), name) - prefix := fmt.Sprintf("%s/%s", logPrefix, name) + prefix := path.Join(logPrefix, name) return prefixedLogger(tr.logger, prefix) } // refreshBinaryVersions updates the internal `binaryVersions` field -// with the binary version running on each node of the cluster. +// with the binary version running on each node of the cluster. We use +// the `atomic` package here as this function may be called by two +// steps that are running concurrently. func (tr *testRunner) refreshBinaryVersions() error { - tr.binaryVersions = make([]roachpb.Version, 0, len(tr.crdbNodes)) + newBinaryVersions := make([]roachpb.Version, 0, len(tr.crdbNodes)) for _, node := range tr.crdbNodes { bv, err := clusterupgrade.BinaryVersion(tr.conn(node)) if err != nil { return fmt.Errorf("failed to get binary version for node %d: %w", node, err) } - tr.binaryVersions = append(tr.binaryVersions, bv) + newBinaryVersions = append(newBinaryVersions, bv) } + tr.binaryVersions.Store(newBinaryVersions) return nil } @@ -250,15 +344,16 @@ func (tr *testRunner) refreshBinaryVersions() error { // with the current view of the cluster version in each of the nodes // of the cluster. func (tr *testRunner) refreshClusterVersions() error { - tr.clusterVersions = make([]roachpb.Version, 0, len(tr.crdbNodes)) + newClusterVersions := make([]roachpb.Version, 0, len(tr.crdbNodes)) for _, node := range tr.crdbNodes { cv, err := clusterupgrade.ClusterVersion(tr.ctx, tr.conn(node)) if err != nil { return fmt.Errorf("failed to get cluster version for node %d: %w", node, err) } - tr.clusterVersions = append(tr.clusterVersions, cv) + newClusterVersions = append(newClusterVersions, cv) } + tr.clusterVersions.Store(newClusterVersions) return nil } @@ -294,8 +389,7 @@ func (tr *testRunner) maybeInitConnections() error { func (tr *testRunner) newHelper(l *logger.Logger) *Helper { return &Helper{ ctx: tr.ctx, - conns: tr.connCache, - crdbNodes: tr.crdbNodes, + runner: tr, stepLogger: l, } } @@ -314,10 +408,41 @@ func (tr *testRunner) closeConnections() { } } +func newBackgroundRunner(ctx context.Context) *backgroundRunner { + g, _ := errgroup.WithContext(ctx) + return &backgroundRunner{ + group: g, + ctx: ctx, + events: make(chan backgroundEvent), + } +} + +// Start will run the function `fn` in a goroutine. Any errors +// returned by that function are observable by reading from the +// channel returned by the `Events()` function. +func (br *backgroundRunner) Start(name string, fn func(context.Context) error) { + br.group.Go(func() error { + err := fn(br.ctx) + br.events <- backgroundEvent{ + Name: name, + Err: err, + } + return err + }) +} + +func (br *backgroundRunner) Events() <-chan backgroundEvent { + return br.events +} + +func (h *Helper) RandomNode(prng *rand.Rand, nodes option.NodeListOption) int { + return nodes[prng.Intn(len(nodes))] +} + // RandomDB returns a (nodeID, connection) tuple for a randomly picked // cockroach node according to the parameters passed. func (h *Helper) RandomDB(prng *rand.Rand, nodes option.NodeListOption) (int, *gosql.DB) { - node := nodes[prng.Intn(len(nodes))] + node := h.RandomNode(prng, nodes) return node, h.Connect(node) } @@ -325,7 +450,7 @@ func (h *Helper) RandomDB(prng *rand.Rand, nodes option.NodeListOption) (int, *g // database node. The query and the node picked are logged in the logs // of the step that calls this function. func (h *Helper) QueryRow(rng *rand.Rand, query string, args ...interface{}) *gosql.Row { - node, db := h.RandomDB(rng, h.crdbNodes) + node, db := h.RandomDB(rng, h.runner.crdbNodes) h.stepLogger.Printf("running SQL statement:\n%s\nArgs: %v\nNode: %d", query, args, node) return db.QueryRowContext(h.ctx, query, args...) } @@ -334,14 +459,14 @@ func (h *Helper) QueryRow(rng *rand.Rand, query string, args ...interface{}) *go // The query and the node picked are logged in the logs of the step // that calls this function. func (h *Helper) Exec(rng *rand.Rand, query string, args ...interface{}) error { - node, db := h.RandomDB(rng, h.crdbNodes) + node, db := h.RandomDB(rng, h.runner.crdbNodes) h.stepLogger.Printf("running SQL statement:\n%s\nArgs: %v\nNode: %d", query, args, node) _, err := db.ExecContext(h.ctx, query, args...) return err } func (h *Helper) Connect(node int) *gosql.DB { - return h.conns[node-1] + return h.runner.conn(node) } // SetContext should be called by steps that need access to the test @@ -356,13 +481,99 @@ func (h *Helper) Context() *Context { return h.testContext } +// Background allows test writers to create functions that run in the +// background in mixed-version hooks. +func (h *Helper) Background(name string, fn func(context.Context, *logger.Logger) error) { + h.runner.background.Start(name, func(ctx context.Context) error { + bgLogger, err := h.loggerFor(name) + if err != nil { + return fmt.Errorf("failed to create logger for background function %q: %w", name, err) + } + + err = fn(ctx, bgLogger) + if err != nil { + if isContextCanceled(err) { + bgLogger.Printf("background function terminated (context canceled)") + return nil + } + + desc := fmt.Sprintf("error in background function: %s", err) + return h.runner.testFailure(desc, bgLogger) + } + + return nil + }) +} + +// BackgroundCommand has the same semantics of `Background()`; the +// command passed will run and the test will fail if the command is +// not successful. +func (h *Helper) BackgroundCommand(cmd string, nodes option.NodeListOption) { + desc := fmt.Sprintf("run command: %q", cmd) + h.Background(desc, func(ctx context.Context, l *logger.Logger) error { + l.Printf("running command `%s` on nodes %v in the background", cmd, nodes) + return h.runner.cluster.RunE(ctx, nodes, cmd) + }) +} + +// loggerFor creates a logger instance to be used by background +// functions (created by calling `Background` on the helper +// instance). It is similar to the logger instances created for +// mixed-version steps, but with the `background_` prefix. +func (h *Helper) loggerFor(name string) (*logger.Logger, error) { + atomic.AddInt64(&h.backgroundCount, 1) + + fileName := invalidChars.ReplaceAllString(strings.ToLower(name), "") + fileName = fmt.Sprintf("background_%s_%d", fileName, h.backgroundCount) + fileName = path.Join(logPrefix, fileName) + + return prefixedLogger(h.runner.logger, fileName) +} + +func (tf *testFailure) Error() string { + if tf.displayed { + return tf.description + } + + tf.displayed = true + debugInfo := func(label, value string) string { + return fmt.Sprintf("%-40s%s", label+":", value) + } + seedInfo := debugInfo("test random seed", strconv.FormatInt(tf.seed, 10)) + binaryVersions := debugInfo("binary versions", formatVersions(tf.binaryVersions)) + clusterVersionsBefore := debugInfo( + "cluster versions before failure", + formatVersions(tf.clusterVersionsBefore), + ) + var clusterVersionsAfter string + if cv := tf.clusterVersionsAfter; cv != nil { + clusterVersionsBefore += "\n" + clusterVersionsAfter = debugInfo("cluster versions after failure", formatVersions(cv)) + } + + return fmt.Sprintf( + "%s\n%s\n%s\n%s%s", + tf.description, seedInfo, binaryVersions, clusterVersionsBefore, clusterVersionsAfter, + ) +} + func renameFailedLogger(l *logger.Logger) error { currentFileName := l.File.Name() - newLogName := strings.TrimSuffix(currentFileName, filepath.Ext(currentFileName)) - newLogName += "_FAILED.log" + newLogName := path.Join( + filepath.Dir(currentFileName), + "FAILED_"+filepath.Base(currentFileName), + ) return os.Rename(currentFileName, newLogName) } +func loadAtomicVersions(v atomic.Value) []roachpb.Version { + if v.Load() == nil { + return nil + } + + return v.Load().([]roachpb.Version) +} + func formatVersions(versions []roachpb.Version) string { var pairs []string for idx, version := range versions { @@ -371,3 +582,16 @@ func formatVersions(versions []roachpb.Version) string { return fmt.Sprintf("[%s]", strings.Join(pairs, ", ")) } + +// isContextCanceled returns a boolean indicating whether the error +// given happened because some context was canceled. +func isContextCanceled(err error) bool { + // TODO(renato): unfortunately, we have to resort to string + // comparison here. The most common use case for this function is + // detecting cluster commands that fail when the test context is + // canceled (after test success or failure), and roachtest does not + // return an error that wraps the context cancelation (in other + // words, `errors.Is` doesn't work). Once we fix this behavior, we + // should use structured errors here. + return strings.Contains(err.Error(), context.Canceled.Error()) +}