Skip to content

Commit

Permalink
roachtest: support running steps in the background in mixedversion
Browse files Browse the repository at this point in the history
This adds a `BackgroundFunc` API to the `mixedversion` package in
roachtest, allowing test writers to run tasks in the background during
an upgrade test. The most common use-case for this functionality is
running a workload while the cluster upgrades (other similar use-cases
exist in a variety of tests); for this reason, a `Workload`
convenience function is added that allows tests to add a workload to a
mixed-version test with one function call.

Currently, each test needs to devise their own mechanism to: spawn the
background task; monitor its execution; and terminate the test on
error.

The current API aims to reduce copying and pasting of such logic,
making for a more declarative test. In the future, the test planner
itself could decide to run some steps in the background and it should
be able to leverage the mechanisms introduced in this commit.

Epic: CRDB-19321

Release note: None
  • Loading branch information
renatolabs committed Feb 24, 2023
1 parent e31f893 commit e70dac8
Show file tree
Hide file tree
Showing 5 changed files with 581 additions and 121 deletions.
3 changes: 3 additions & 0 deletions pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
deps = [
"//pkg/cmd/roachtest/cluster",
"//pkg/cmd/roachtest/option",
"//pkg/cmd/roachtest/roachtestutil",
"//pkg/cmd/roachtest/roachtestutil/clusterupgrade",
"//pkg/cmd/roachtest/test",
"//pkg/roachpb",
Expand All @@ -30,7 +31,9 @@ go_test(
args = ["-test.timeout=295s"],
embed = [":mixedversion"],
deps = [
"//pkg/cmd/roachtest/cluster",
"//pkg/cmd/roachtest/option",
"//pkg/cmd/roachtest/roachtestutil",
"//pkg/cmd/roachtest/roachtestutil/clusterupgrade",
"//pkg/roachprod/logger",
"//pkg/util/version",
Expand Down
175 changes: 153 additions & 22 deletions pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,18 @@
//
// Typical usage:
//
// mvt, err := NewMixedVersionTest(...)
// mvt.InMixedVersion("test my feature", func(l *logger.Logger, db *gosql.DB) error {
// mvt, err := mixedversion.NewTest(...)
// mvt.InMixedVersion("test my feature", func(l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
// l.Printf("testing feature X")
// node, db := h.RandomDB(rng, c.All())
// l.Printf("running query on node %d", node)
// _, err := db.ExecContext(ctx, "SELECT * FROM test")
// return err
// })
// mvt.InMixedVersion("test another feature", func(l *logger.Logger, db *gosql.DB) error {
// mvt.InMixedVersion("test another feature", func(l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
// l.Printf("testing feature Y")
// node, db := h.RandomDB(rng, c.All())
// l.Printf("running query on node %d", node)
// _, err := db.ExecContext(ctx, "SELECT * FROM test2")
// return err
// })
Expand Down Expand Up @@ -72,6 +76,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
Expand All @@ -82,6 +87,7 @@ import (
const (
logPrefix = "mixed-version-test"
startupLabel = "run startup hooks"
backgroundLabel = "start background hooks"
mixedVersionLabel = "run mixed-version hooks"
afterTestLabel = "run after test hooks"

Expand All @@ -92,6 +98,13 @@ const (
// of migration steps before the new cluster version can be
// finalized.
runWhileMigratingProbability = 0.5

// CurrentCockroachPath is the path to the binary where the current
// version of cockroach being tested is located. This file is
// uploaded before any user functions are run. The primary use case
// are tests that need long runnig background functions on startup
// (such as running a workload).
CurrentCockroachPath = "./cockroach-current"
)

var (
Expand Down Expand Up @@ -157,6 +170,11 @@ type (
// for human-consumption. Displayed when pretty-printing the test
// plan.
Description() string
// Background indicates whether the step should be run in the
// background. When a step is *not* run in the background, the
// test will wait for it to finish before moving on. When a
// background step fails, the entire test fails.
Background() bool
// Run implements the actual functionality of the step.
Run(context.Context, *logger.Logger, cluster.Cluster, *Helper) error
}
Expand All @@ -167,6 +185,7 @@ type (
// its different stages: startup, mixed-version, and after-test.
testHooks struct {
startup hooks
background hooks
mixedVersion hooks
afterUpgradeFinalized hooks

Expand All @@ -177,6 +196,7 @@ type (
// Test is the main struct callers of this package interact with.
Test struct {
ctx context.Context
cancel context.CancelFunc
cluster cluster.Cluster
logger *logger.Logger
crdbNodes option.NodeListOption
Expand Down Expand Up @@ -209,8 +229,10 @@ func NewTest(
prng, seed := randutil.NewPseudoRand()
testLogger.Printf("mixed-version random seed: %d", seed)

testCtx, cancel := context.WithCancel(ctx)
return &Test{
ctx: ctx,
ctx: testCtx,
cancel: cancel,
cluster: c,
logger: testLogger,
crdbNodes: crdbNodes,
Expand Down Expand Up @@ -248,7 +270,7 @@ func (t *Test) InMixedVersion(desc string, fn userFunc) {
return len(testContext.ToVersionNodes) == numUpgradedNodes
}

t.hooks.AddMixedVersion(versionUpgradeHook{desc, predicate, fn})
t.hooks.AddMixedVersion(versionUpgradeHook{name: desc, predicate: predicate, fn: fn})
}

// OnStartup registers a callback that is run once the cluster is
Expand All @@ -260,15 +282,60 @@ func (t *Test) OnStartup(desc string, fn userFunc) {
// Since the callbacks here are only referenced in the setup steps
// of the planner, there is no need to have a predicate function
// gating them.
t.hooks.AddStartup(versionUpgradeHook{desc, nil, fn})
t.hooks.AddStartup(versionUpgradeHook{name: desc, fn: fn})
}

// AfterUpgradeFinalized registers a callback that is run once the
// mixed-version test has brought the cluster to the latest version,
// and allowed the upgrade to finalize successfully. If multiple such
// hooks are passed, they will be executed concurrently.
func (t *Test) AfterUpgradeFinalized(desc string, fn userFunc) {
t.hooks.AddAfterUpgradeFinalized(versionUpgradeHook{desc, nil, fn})
t.hooks.AddAfterUpgradeFinalized(versionUpgradeHook{name: desc, fn: fn})
}

// BackgroundFunc runs the function passed as argument in the
// background during the test. Background functions are kicked off
// once the cluster has been initialized (i.e., after all startup
// steps have finished).
func (t *Test) BackgroundFunc(desc string, fn userFunc) {
t.hooks.AddBackground(versionUpgradeHook{name: desc, fn: fn})
}

// BackgroundCommand is a convenience wrapper around `BackgroundFunc`
// that runs the command passed once the cluster is initialized. The
// node where the command runs is picked randomly.
//
// TODO: unfortunately, `cluster.Run()` does not allow the caller to
// pass a logger instance. It would be convenient if the output of the
// command itself lived within the `mixed-version/*.log` files.
func (t *Test) BackgroundCommand(
desc string, nodes option.NodeListOption, cmd *roachtestutil.Command,
) {
t.BackgroundFunc(desc, t.runCommandFunc(nodes, cmd.String()))
}

// Workload is a convenience wrapper that allows callers to run
// workloads in the background during a mixed-version test. `initCmd`,
// if passed, is the command run to initialize the workload; it is run
// synchronously as a regular startup function. `runCmd` is the
// command to actually run the command; it is run in the background.
func (t *Test) Workload(
name string, node option.NodeListOption, initCmd, runCmd *roachtestutil.Command,
) {
seed := uint64(t.prng.Int63())
addSeed := func(cmd *roachtestutil.Command) {
if !cmd.HasFlag("seed") {
cmd.Flag("seed", seed)
}
}

if initCmd != nil {
addSeed(initCmd)
t.OnStartup(fmt.Sprintf("initialize %s workload", name), t.runCommandFunc(node, initCmd.String()))
}

addSeed(runCmd)
t.BackgroundCommand(fmt.Sprintf("%s workload", name), node, runCmd)
}

// Run runs the mixed-version test. It should be called once all
Expand All @@ -290,7 +357,7 @@ func (t *Test) Run() {

func (t *Test) run(plan *TestPlan) error {
return newTestRunner(
t.ctx, plan, t.logger, t.cluster, t.crdbNodes, t.seed,
t.ctx, t.cancel, plan, t.logger, t.cluster, t.crdbNodes, t.seed,
).run()
}

Expand Down Expand Up @@ -319,6 +386,13 @@ func (t *Test) buildVersion() version.Version {
return *t.rt.BuildVersion()
}

func (t *Test) runCommandFunc(nodes option.NodeListOption, cmd string) userFunc {
return func(l *logger.Logger, rng *rand.Rand, h *Helper) error {
l.Printf("running command `%s` on nodes %v", cmd, nodes)
return t.cluster.RunE(t.ctx, nodes, cmd)
}
}

// startFromCheckpointStep is the step that starts the cluster from a
// specific `version`, using checked-in fixtures.
type startFromCheckpointStep struct {
Expand All @@ -328,7 +402,8 @@ type startFromCheckpointStep struct {
crdbNodes option.NodeListOption
}

func (s startFromCheckpointStep) ID() int { return s.id }
func (s startFromCheckpointStep) ID() int { return s.id }
func (s startFromCheckpointStep) Background() bool { return false }

func (s startFromCheckpointStep) Description() string {
return fmt.Sprintf("starting cluster from fixtures for version %q", s.version)
Expand All @@ -349,12 +424,41 @@ func (s startFromCheckpointStep) Run(
return err
}

startOpts := option.DefaultStartOpts()
startOpts := option.DefaultStartOptsNoBackups()
startOpts.RoachprodOpts.Sequential = false
clusterupgrade.StartWithBinary(ctx, l, c, s.crdbNodes, binaryPath, startOpts)
return nil
}

// uploadCurrentVersionStep uploads the current cockroach binary to
// all DB nodes in the test. This is so that startup steps can use
// them (if, for instance, they need to run a workload). The binary
// will be located in `dest`.
type uploadCurrentVersionStep struct {
id int
rt test.Test
crdbNodes option.NodeListOption
dest string
}

func (s uploadCurrentVersionStep) ID() int { return s.id }
func (s uploadCurrentVersionStep) Background() bool { return false }

func (s uploadCurrentVersionStep) Description() string {
return fmt.Sprintf("upload current binary to all cockroach nodes (%v)", s.crdbNodes)
}

func (s uploadCurrentVersionStep) Run(
ctx context.Context, l *logger.Logger, c cluster.Cluster, helper *Helper,
) error {
_, err := clusterupgrade.UploadVersion(ctx, s.rt, l, c, s.crdbNodes, clusterupgrade.MainVersion)
if err != nil {
return err
}

return c.RunE(ctx, s.crdbNodes, fmt.Sprintf("mv ./cockroach %s", s.dest))
}

// waitForStableClusterVersionStep implements the process of waiting
// for the `version` cluster setting being the same on all nodes of
// the cluster and equal to the binary version of the first node in
Expand All @@ -364,7 +468,8 @@ type waitForStableClusterVersionStep struct {
nodes option.NodeListOption
}

func (s waitForStableClusterVersionStep) ID() int { return s.id }
func (s waitForStableClusterVersionStep) ID() int { return s.id }
func (s waitForStableClusterVersionStep) Background() bool { return false }

func (s waitForStableClusterVersionStep) Description() string {
return fmt.Sprintf(
Expand All @@ -387,7 +492,8 @@ type preserveDowngradeOptionStep struct {
prng *rand.Rand
}

func (s preserveDowngradeOptionStep) ID() int { return s.id }
func (s preserveDowngradeOptionStep) ID() int { return s.id }
func (s preserveDowngradeOptionStep) Background() bool { return false }

func (s preserveDowngradeOptionStep) Description() string {
return "preventing auto-upgrades by setting `preserve_downgrade_option`"
Expand Down Expand Up @@ -421,7 +527,8 @@ type restartWithNewBinaryStep struct {
node int
}

func (s restartWithNewBinaryStep) ID() int { return s.id }
func (s restartWithNewBinaryStep) ID() int { return s.id }
func (s restartWithNewBinaryStep) Background() bool { return false }

func (s restartWithNewBinaryStep) Description() string {
return fmt.Sprintf("restart node %d with binary version %s", s.node, versionMsg(s.version))
Expand All @@ -436,7 +543,12 @@ func (s restartWithNewBinaryStep) Run(
l,
c,
c.Node(s.node),
option.DefaultStartOpts(),
// Disable regular backups in mixed-version tests, as some tests
// check for running jobs and the scheduled backup may make
// things non-deterministic. In the future, we should change the
// default and add an API for tests to opt-out of the default
// scheduled backup if necessary.
option.DefaultStartOptsNoBackups(),
s.version,
)
}
Expand All @@ -450,7 +562,8 @@ type finalizeUpgradeStep struct {
prng *rand.Rand
}

func (s finalizeUpgradeStep) ID() int { return s.id }
func (s finalizeUpgradeStep) ID() int { return s.id }
func (s finalizeUpgradeStep) Background() bool { return false }

func (s finalizeUpgradeStep) Description() string {
return "finalize upgrade by resetting `preserve_downgrade_option`"
Expand All @@ -472,9 +585,11 @@ type runHookStep struct {
testContext Context
prng *rand.Rand
hook versionUpgradeHook
background bool
}

func (s runHookStep) ID() int { return s.id }
func (s runHookStep) ID() int { return s.id }
func (s runHookStep) Background() bool { return s.background }

func (s runHookStep) Description() string {
return fmt.Sprintf("run %q", s.hook.name)
Expand Down Expand Up @@ -559,13 +674,19 @@ func (h hooks) Filter(testContext Context) hooks {
// returned. Otherwise, a `concurrentRunStep` is returned, where every
// hook is run concurrently.
func (h hooks) AsSteps(
label string, idGen func() int, prng *rand.Rand, nodes option.NodeListOption, testContext Context,
label string,
idGen func() int,
prng *rand.Rand,
nodes option.NodeListOption,
testContext Context,
background bool,
) []testStep {
steps := make([]testStep, 0, len(h))
for _, hook := range h {
hookPrng := rngFromRNG(prng)
rhs := runHookStep{id: idGen(), prng: hookPrng, hook: hook, testContext: testContext}
steps = append(steps, rhs)
steps = append(steps, runHookStep{
id: idGen(), prng: hookPrng, hook: hook, background: background, testContext: testContext,
})
}

if len(steps) <= 1 {
Expand All @@ -579,6 +700,10 @@ func (th *testHooks) AddStartup(hook versionUpgradeHook) {
th.startup = append(th.startup, hook)
}

func (th *testHooks) AddBackground(hook versionUpgradeHook) {
th.background = append(th.background, hook)
}

func (th *testHooks) AddMixedVersion(hook versionUpgradeHook) {
th.mixedVersion = append(th.mixedVersion, hook)
}
Expand All @@ -588,15 +713,21 @@ func (th *testHooks) AddAfterUpgradeFinalized(hook versionUpgradeHook) {
}

func (th *testHooks) StartupSteps(idGen func() int, testContext Context) []testStep {
return th.startup.AsSteps(startupLabel, idGen, th.prng, th.crdbNodes, testContext)
return th.startup.AsSteps(startupLabel, idGen, th.prng, th.crdbNodes, testContext, false)
}

func (th *testHooks) BackgroundSteps(idGen func() int, testContext Context) []testStep {
return th.background.AsSteps(backgroundLabel, idGen, th.prng, th.crdbNodes, testContext, true)
}

func (th *testHooks) MixedVersionSteps(testContext Context, idGen func() int) []testStep {
return th.mixedVersion.Filter(testContext).AsSteps(mixedVersionLabel, idGen, th.prng, th.crdbNodes, testContext)
return th.mixedVersion.
Filter(testContext).
AsSteps(mixedVersionLabel, idGen, th.prng, th.crdbNodes, testContext, false)
}

func (th *testHooks) AfterUpgradeFinalizedSteps(idGen func() int, testContext Context) []testStep {
return th.afterUpgradeFinalized.AsSteps(afterTestLabel, idGen, th.prng, th.crdbNodes, testContext)
return th.afterUpgradeFinalized.AsSteps(afterTestLabel, idGen, th.prng, th.crdbNodes, testContext, false)
}

func randomDelay(rng *rand.Rand) time.Duration {
Expand Down
Loading

0 comments on commit e70dac8

Please sign in to comment.