From c9ab246967de5ad41b4afff2f7ce436058f209ba Mon Sep 17 00:00:00 2001 From: Herko Lategan Date: Wed, 23 Oct 2024 16:54:13 +0100 Subject: [PATCH] roachtest: refactor mixedversion background runner Previously, the mixedversion framework had its own implementation for managing background tasks. This implementation performed well, but in its current state it is difficult to share with tests that do not require the mixedversion framework. This change replaces the mixedversion framework background runner with the newly implemented task manager. The implementation details are relatively similar, making it close to a drop-in replacement. Additionally, some utilities relating to tasks have been replaced by the task package. Informs: #118214 Epic: None Release note: None --- .../roachtestutil/mixedversion/BUILD.bazel | 1 + .../roachtestutil/mixedversion/helper.go | 44 ++++-- .../roachtestutil/mixedversion/runner.go | 138 ++---------------- 3 files changed, 47 insertions(+), 136 deletions(-) diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel b/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel index 5a9bf046fecc..7fd66171e353 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "//pkg/cmd/roachtest/registry", "//pkg/cmd/roachtest/roachtestutil", "//pkg/cmd/roachtest/roachtestutil/clusterupgrade", + "//pkg/cmd/roachtest/roachtestutil/task", "//pkg/cmd/roachtest/spec", "//pkg/cmd/roachtest/test", "//pkg/roachpb", diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go index a58d9c163df8..9f1b53cbb16a 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" 
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/roachprod/logger" "github.com/cockroachdb/cockroach/pkg/testutils/release" @@ -226,27 +227,46 @@ func (h *Helper) ExecWithGateway( // Background allows test authors to create functions that run in the // background in mixed-version hooks. -func (h *Helper) Background( - name string, fn func(context.Context, *logger.Logger) error, -) context.CancelFunc { - return h.runner.background.Start(name, func(ctx context.Context) error { +func (h *Helper) Background(name string, fn task.Func, opts ...task.Option) context.CancelFunc { + loggerFuncOpt := task.LoggerFunc(func() (*logger.Logger, error) { bgLogger, err := h.loggerFor(name) if err != nil { - return fmt.Errorf("failed to create logger for background function %q: %w", name, err) + return nil, fmt.Errorf("failed to create logger for background function %q: %w", name, err) } - - err = panicAsError(bgLogger, func() error { return fn(ctx, bgLogger) }) + return bgLogger, nil + }) + panicOpt := task.PanicHandler(func(_ context.Context, l *logger.Logger, r interface{}) error { + return logPanicToErr(l, r) + }) + errHandlerOpt := task.ErrorHandler(func(ctx context.Context, l *logger.Logger, err error) error { if err != nil { - if isContextCanceled(ctx) { + if task.IsContextCanceled(ctx) { return err } - - err := errors.Wrapf(err, "error in background function %s", name) - return h.runner.testFailure(ctx, err, bgLogger, nil) + errWrapped := errors.Wrapf(err, "error in background function %s", name) + return h.runner.testFailure(ctx, errWrapped, l, nil) } - return nil }) + return h.runner.background.GoWithCancel( + fn, task.OptionList(opts...), task.Name(name), loggerFuncOpt, panicOpt, errHandlerOpt, + ) +} + +// Go implements the Tasker interface. 
It is a wrapper around the `Background` +// function that allows the helper to be used as a Tasker. +func (h *Helper) Go(fn task.Func, opts ...task.Option) { + h.Background("task", func(ctx context.Context, l *logger.Logger) error { + return fn(ctx, l) + }, opts...) +} + +// GoWithCancel implements the Tasker interface. It is a wrapper around the `Background` +// function that allows the helper to be used as a Tasker. +func (h *Helper) GoWithCancel(fn task.Func, opts ...task.Option) context.CancelFunc { + return h.Background("task", func(ctx context.Context, l *logger.Logger) error { + return fn(ctx, l) + }, opts...) } // BackgroundCommand has the same semantics of `Background()`; the diff --git a/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go b/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go index 3dc39d0f0fbe..aa4f360ba757 100644 --- a/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go +++ b/pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/roachprod/install" "github.com/cockroachdb/cockroach/pkg/roachprod/logger" @@ -36,22 +37,6 @@ import ( ) type ( - // backgroundEvent is the struct sent by background steps when they - // finish (successfully or not). 
- backgroundEvent struct { - Name string - Err error - TriggeredByTest bool - } - - backgroundRunner struct { - group ctxgroup.Group - ctx context.Context - events chan backgroundEvent - logger *logger.Logger - stopFuncs []StopFunc - } - // crdbMonitor is a thin wrapper around the roachtest monitor API // (cluster.NewMonitor) that produces error events through a channel // whenever an unexpected node death happens. It also allows us to @@ -86,7 +71,7 @@ type ( tenantService *serviceRuntime logger *logger.Logger - background *backgroundRunner + background task.Manager monitor *crdbMonitor // ranUserHooks keeps track of whether the runner has run any @@ -156,7 +141,7 @@ func newTestRunner( systemService: systemService, tenantService: tenantService, cluster: c, - background: newBackgroundRunner(ctx, l), + background: task.NewManager(ctx, l), monitor: newCRDBMonitor(ctx, c, maps.Keys(allCRDBNodes)), ranUserHooks: &ranUserHooks, } @@ -202,7 +187,7 @@ func (tr *testRunner) run() (retErr error) { if event.Err == nil { tr.logger.Printf("background step finished: %s", event.Name) continue - } else if event.TriggeredByTest { + } else if event.ExpectedCancel { tr.logger.Printf("background step canceled by test: %s", event.Name) continue } @@ -298,7 +283,7 @@ func (tr *testRunner) runSingleStep(ctx context.Context, ss *singleStep, l *logg if err := panicAsError(l, func() error { return ss.impl.Run(ctx, l, ss.rng, tr.newHelper(ctx, l, ss.context)) }); err != nil { - if isContextCanceled(ctx) { + if task.IsContextCanceled(ctx) { l.Printf("step terminated (context canceled)") // Avoid creating a `stepError` (which involves querying binary // and cluster versions) when the context was canceled as the @@ -316,9 +301,9 @@ func (tr *testRunner) runSingleStep(ctx context.Context, ss *singleStep, l *logg } func (tr *testRunner) startBackgroundStep(ss *singleStep, l *logger.Logger, stopChan shouldStop) { - stop := tr.background.Start(ss.impl.Description(), func(ctx context.Context) 
error { + stop := tr.background.GoWithCancel(func(ctx context.Context, l *logger.Logger) error { return tr.runSingleStep(ctx, ss, l) - }) + }, task.Logger(l), task.Name(ss.impl.Description())) // We start a goroutine to listen for user-requests to stop the // background function. @@ -415,7 +400,7 @@ func (tr *testRunner) teardown(stepsChan chan error, testFailed bool) { // artifacts, which would be confusing. if testFailed { tr.logger.Printf("waiting for all steps to finish after context cancelation") - waitForChannel(stepsChan, "test steps", tr.logger) + task.WaitForChannel(stepsChan, "test steps", tr.logger) } tr.logger.Printf("closing database connections") @@ -803,77 +788,6 @@ func (cm *crdbMonitor) Stop() error { return cm.monitor.WaitE() } -func newBackgroundRunner(ctx context.Context, l *logger.Logger) *backgroundRunner { - g := ctxgroup.WithContext(ctx) - return &backgroundRunner{ - group: g, - ctx: ctx, - logger: l, - events: make(chan backgroundEvent), - } -} - -// Start will run the function `fn` in a goroutine. Any errors -// returned by that function are observable by reading from the -// channel returned by the `Events()` function. Returns a function -// that can be called to stop the background function (canceling the -// context passed to it). -func (br *backgroundRunner) Start(name string, fn func(context.Context) error) context.CancelFunc { - bgCtx, cancel := context.WithCancel(br.ctx) - var expectedContextCancelation bool - br.group.Go(func() error { - err := fn(bgCtx) - event := backgroundEvent{ - Name: name, - Err: err, - TriggeredByTest: err != nil && isContextCanceled(bgCtx) && expectedContextCancelation, - } - - select { - case br.events <- event: - // exit goroutine - case <-br.ctx.Done(): - // Test already finished, exit goroutine. 
- return nil - } - - return err - }) - - stopBgFunc := func() { - expectedContextCancelation = true - cancel() - } - // Collect all stopFuncs so that we can explicitly stop all - // background functions when the test finishes. - br.stopFuncs = append(br.stopFuncs, stopBgFunc) - return stopBgFunc -} - -// Terminate will call the stop functions for every background function -// started during the test. This includes background functions created -// during test runtime (using `helper.Background()`), as well as -// background steps declared in the test setup (using -// `BackgroundFunc`, `Workload`, et al). Returns when all background -// functions have returned. -func (br *backgroundRunner) Terminate() { - for _, stop := range br.stopFuncs { - stop() - } - - doneCh := make(chan error) - go func() { - defer close(doneCh) - _ = br.group.Wait() - }() - - waitForChannel(doneCh, "background functions", br.logger) -} - -func (br *backgroundRunner) CompletedEvents() <-chan backgroundEvent { - return br.events -} - // tableWriter is a thin wrapper around the `tabwriter` package used // by the test runner to display logical and released binary versions // in a tabular format. @@ -945,30 +859,17 @@ func loadAtomicVersions(v *atomic.Value) []roachpb.Version { func panicAsError(l *logger.Logger, f func() error) (retErr error) { defer func() { if r := recover(); r != nil { - l.Printf("panic stack trace:\n%s", string(debug.Stack())) - retErr = fmt.Errorf("panic (stack trace above): %v", r) + retErr = logPanicToErr(l, r) } }() return f() } -// waitForChannel waits for the given channel `ch` to close; returns -// when that happens. If the channel does not close within 5 minutes, -// the function logs a message and returns. -// -// The main use-case for this function is waiting for user-provided -// hooks to return after the context passed to them is canceled. 
We -// want to allow some time for them to finish, but we also don't want -// to block indefinitely if a function inadvertently ignores context -// cancelation. -func waitForChannel(ch chan error, desc string, l *logger.Logger) { - maxWait := 5 * time.Minute - select { - case <-ch: - // return - case <-time.After(maxWait): - l.Printf("waited for %s for %s to finish, giving up", maxWait, desc) - } +// logPanicToErr logs the panic stack trace and returns an error with the +// panic message. +func logPanicToErr(l *logger.Logger, r interface{}) error { + l.Printf("panic stack trace:\n%s", string(debug.Stack())) + return fmt.Errorf("panic (stack trace above): %v", r) } func toString[T fmt.Stringer](xs []T) []string { @@ -979,14 +880,3 @@ func toString[T fmt.Stringer](xs []T) []string { return result } - -// isContextCanceled returns a boolean indicating whether the context -// passed is canceled. -func isContextCanceled(ctx context.Context) bool { - select { - case <-ctx.Done(): - return true - default: - return false - } -}