diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel b/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel index 5a9bf046fecc..7fd66171e353 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "//pkg/cmd/roachtest/registry", "//pkg/cmd/roachtest/roachtestutil", "//pkg/cmd/roachtest/roachtestutil/clusterupgrade", + "//pkg/cmd/roachtest/roachtestutil/task", "//pkg/cmd/roachtest/spec", "//pkg/cmd/roachtest/test", "//pkg/roachpb", diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go index a58d9c163df8..9f1b53cbb16a 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/roachprod/logger" "github.com/cockroachdb/cockroach/pkg/testutils/release" @@ -226,27 +227,46 @@ func (h *Helper) ExecWithGateway( // Background allows test authors to create functions that run in the // background in mixed-version hooks. -func (h *Helper) Background( - name string, fn func(context.Context, *logger.Logger) error, -) context.CancelFunc { - return h.runner.background.Start(name, func(ctx context.Context) error { +func (h *Helper) Background(name string, fn task.Func, opts ...task.Option) context.CancelFunc { + loggerFuncOpt := task.LoggerFunc(func() (*logger.Logger, error) { bgLogger, err := h.loggerFor(name) if err != nil { - return fmt.Errorf("failed to create logger for background function %q: %w", name, err) + return nil, fmt.Errorf("failed to create logger for background function %q: %w", name, err) } - - err = panicAsError(bgLogger, func() error { return fn(ctx, bgLogger) }) + return bgLogger, nil + }) + panicOpt := task.PanicHandler(func(_ context.Context, l *logger.Logger, r interface{}) error { + return logPanicToErr(l, r) + }) + errHandlerOpt := task.ErrorHandler(func(ctx context.Context, l *logger.Logger, err error) error { if err != nil { - if isContextCanceled(ctx) { + if task.IsContextCanceled(ctx) { return err } - - err := errors.Wrapf(err, "error in background function %s", name) - return h.runner.testFailure(ctx, err, bgLogger, nil) + errWrapped := errors.Wrapf(err, "error in background function %s", name) + return h.runner.testFailure(ctx, errWrapped, l, nil) } - return nil }) + return h.runner.background.GoWithCancel( + fn, task.OptionList(opts...), task.Name(name), loggerFuncOpt, panicOpt, errHandlerOpt, + ) +} + +// Go implements the Tasker interface. It is a wrapper around the `Background` +// function that allows the helper to be used as a Tasker. +func (h *Helper) Go(fn task.Func, opts ...task.Option) { + h.Background("task", func(ctx context.Context, l *logger.Logger) error { + return fn(ctx, l) + }, opts...) +} + +// GoWithCancel implements the Tasker interface. It is a wrapper around the `Background` +// function that allows the helper to be used as a Tasker. +func (h *Helper) GoWithCancel(fn task.Func, opts ...task.Option) context.CancelFunc { + return h.Background("task", func(ctx context.Context, l *logger.Logger) error { + return fn(ctx, l) + }, opts...) } // BackgroundCommand has the same semantics of `Background()`; the diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go index 3dc39d0f0fbe..c29c3c8e99b6 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/roachprod/install" "github.com/cockroachdb/cockroach/pkg/roachprod/logger" @@ -36,22 +37,6 @@ import ( ) type ( - // backgroundEvent is the struct sent by background steps when they - // finish (successfully or not). - backgroundEvent struct { - Name string - Err error - TriggeredByTest bool - } - - backgroundRunner struct { - group ctxgroup.Group - ctx context.Context - events chan backgroundEvent - logger *logger.Logger - stopFuncs []StopFunc - } - // crdbMonitor is a thin wrapper around the roachtest monitor API // (cluster.NewMonitor) that produces error events through a channel // whenever an unexpected node death happens. It also allows us to @@ -86,7 +71,7 @@ type ( tenantService *serviceRuntime logger *logger.Logger - background *backgroundRunner + background task.Manager monitor *crdbMonitor // ranUserHooks keeps track of whether the runner has run any @@ -156,7 +141,7 @@ func newTestRunner( systemService: systemService, tenantService: tenantService, cluster: c, - background: newBackgroundRunner(ctx, l), + background: task.NewManager(ctx, l), monitor: newCRDBMonitor(ctx, c, maps.Keys(allCRDBNodes)), ranUserHooks: &ranUserHooks, } @@ -202,7 +187,7 @@ func (tr *testRunner) run() (retErr error) { if event.Err == nil { tr.logger.Printf("background step finished: %s", event.Name) continue - } else if event.TriggeredByTest { + } else if event.ExpectedCancel { tr.logger.Printf("background step canceled by test: %s", event.Name) continue } @@ -298,7 +283,7 @@ func (tr *testRunner) runSingleStep(ctx context.Context, ss *singleStep, l *logg if err := panicAsError(l, func() error { return ss.impl.Run(ctx, l, ss.rng, tr.newHelper(ctx, l, ss.context)) }); err != nil { - if isContextCanceled(ctx) { + if task.IsContextCanceled(ctx) { l.Printf("step terminated (context canceled)") // Avoid creating a `stepError` (which involves querying binary // and cluster versions) when the context was canceled as the @@ -316,9 +301,9 @@ func (tr *testRunner) runSingleStep(ctx context.Context, ss *singleStep, l *logg } func (tr *testRunner) startBackgroundStep(ss *singleStep, l *logger.Logger, stopChan shouldStop) { - stop := tr.background.Start(ss.impl.Description(), func(ctx context.Context) error { + stop := tr.background.GoWithCancel(func(ctx context.Context, l *logger.Logger) error { return tr.runSingleStep(ctx, ss, l) - }) + }, task.Logger(l), task.Name(ss.impl.Description())) // We start a goroutine to listen for user-requests to stop the // background function. @@ -401,7 +386,7 @@ func (tr *testRunner) teardown(stepsChan chan error, testFailed bool) { // termination is marked `TriggeredByTest` (not necessary for // correctness, just for clarity). tr.logger.Printf("stopping background functions") - tr.background.Terminate() + tr.background.Terminate(tr.logger) tr.logger.Printf("stopping node monitor") if err := tr.monitor.Stop(); err != nil { @@ -415,7 +400,7 @@ func (tr *testRunner) teardown(stepsChan chan error, testFailed bool) { // artifacts, which would be confusing. if testFailed { tr.logger.Printf("waiting for all steps to finish after context cancelation") - waitForChannel(stepsChan, "test steps", tr.logger) + task.WaitForChannel(stepsChan, "test steps", tr.logger) } tr.logger.Printf("closing database connections") @@ -803,77 +788,6 @@ func (cm *crdbMonitor) Stop() error { return cm.monitor.WaitE() } -func newBackgroundRunner(ctx context.Context, l *logger.Logger) *backgroundRunner { - g := ctxgroup.WithContext(ctx) - return &backgroundRunner{ - group: g, - ctx: ctx, - logger: l, - events: make(chan backgroundEvent), - } -} - -// Start will run the function `fn` in a goroutine. Any errors -// returned by that function are observable by reading from the -// channel returned by the `Events()` function. Returns a function -// that can be called to stop the background function (canceling the -// context passed to it). -func (br *backgroundRunner) Start(name string, fn func(context.Context) error) context.CancelFunc { - bgCtx, cancel := context.WithCancel(br.ctx) - var expectedContextCancelation bool - br.group.Go(func() error { - err := fn(bgCtx) - event := backgroundEvent{ - Name: name, - Err: err, - TriggeredByTest: err != nil && isContextCanceled(bgCtx) && expectedContextCancelation, - } - - select { - case br.events <- event: - // exit goroutine - case <-br.ctx.Done(): - // Test already finished, exit goroutine. - return nil - } - - return err - }) - - stopBgFunc := func() { - expectedContextCancelation = true - cancel() - } - // Collect all stopFuncs so that we can explicitly stop all - // background functions when the test finishes. - br.stopFuncs = append(br.stopFuncs, stopBgFunc) - return stopBgFunc -} - -// Terminate will call the stop functions for every background function -// started during the test. This includes background functions created -// during test runtime (using `helper.Background()`), as well as -// background steps declared in the test setup (using -// `BackgroundFunc`, `Workload`, et al). Returns when all background -// functions have returned. -func (br *backgroundRunner) Terminate() { - for _, stop := range br.stopFuncs { - stop() - } - - doneCh := make(chan error) - go func() { - defer close(doneCh) - _ = br.group.Wait() - }() - - waitForChannel(doneCh, "background functions", br.logger) -} - -func (br *backgroundRunner) CompletedEvents() <-chan backgroundEvent { - return br.events -} - // tableWriter is a thin wrapper around the `tabwriter` package used // by the test runner to display logical and released binary versions // in a tabular format. @@ -945,30 +859,17 @@ func loadAtomicVersions(v *atomic.Value) []roachpb.Version { func panicAsError(l *logger.Logger, f func() error) (retErr error) { defer func() { if r := recover(); r != nil { - l.Printf("panic stack trace:\n%s", string(debug.Stack())) - retErr = fmt.Errorf("panic (stack trace above): %v", r) + retErr = logPanicToErr(l, r) } }() return f() } -// waitForChannel waits for the given channel `ch` to close; returns -// when that happens. If the channel does not close within 5 minutes, -// the function logs a message and returns. -// -// The main use-case for this function is waiting for user-provided -// hooks to return after the context passed to them is canceled. We -// want to allow some time for them to finish, but we also don't want -// to block indefinitely if a function inadvertently ignores context -// cancelation. -func waitForChannel(ch chan error, desc string, l *logger.Logger) { - maxWait := 5 * time.Minute - select { - case <-ch: - // return - case <-time.After(maxWait): - l.Printf("waited for %s for %s to finish, giving up", maxWait, desc) - } +// logPanicToErr logs the panic stack trace and returns an error with the +// panic message. +func logPanicToErr(l *logger.Logger, r interface{}) error { + l.Printf("panic stack trace:\n%s", string(debug.Stack())) + return fmt.Errorf("panic (stack trace above): %v", r) } func toString[T fmt.Stringer](xs []T) []string { @@ -979,14 +880,3 @@ func toString[T fmt.Stringer](xs []T) []string { return result } - -// isContextCanceled returns a boolean indicating whether the context -// passed is canceled. -func isContextCanceled(ctx context.Context) bool { - select { - case <-ctx.Done(): - return true - default: - return false - } -}