Skip to content

Commit

Permalink
roachtest: refactor mixedversion background runner
Browse files Browse the repository at this point in the history
Previously, the mixedversion framework had its own implementation for managing
background tasks. This implementation performed well, but in its current state
it is difficult to share with tests that do not require the mixedversion
framework.

This change replace the mixedversion framework background runner with the newly
implemented task manager. The implementation details are relatively similar
making it close to a drop-in replacement. Additionally, some utilities relating
to tasks have been replaced by the task package.

Informs: cockroachdb#118214

Epic: None
Release note: None
  • Loading branch information
herkolategan committed Oct 23, 2024
1 parent 825bb5e commit c3ce4d9
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 140 deletions.
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"//pkg/cmd/roachtest/registry",
"//pkg/cmd/roachtest/roachtestutil",
"//pkg/cmd/roachtest/roachtestutil/clusterupgrade",
"//pkg/cmd/roachtest/roachtestutil/task",
"//pkg/cmd/roachtest/spec",
"//pkg/cmd/roachtest/test",
"//pkg/roachpb",
Expand Down
44 changes: 32 additions & 12 deletions pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/testutils/release"
Expand Down Expand Up @@ -226,27 +227,46 @@ func (h *Helper) ExecWithGateway(

// Background allows test authors to create functions that run in the
// background in mixed-version hooks.
func (h *Helper) Background(
name string, fn func(context.Context, *logger.Logger) error,
) context.CancelFunc {
return h.runner.background.Start(name, func(ctx context.Context) error {
func (h *Helper) Background(name string, fn task.Func, opts ...task.Option) context.CancelFunc {
loggerFuncOpt := task.LoggerFunc(func() (*logger.Logger, error) {
bgLogger, err := h.loggerFor(name)
if err != nil {
return fmt.Errorf("failed to create logger for background function %q: %w", name, err)
return nil, fmt.Errorf("failed to create logger for background function %q: %w", name, err)
}

err = panicAsError(bgLogger, func() error { return fn(ctx, bgLogger) })
return bgLogger, nil
})
panicOpt := task.PanicHandler(func(_ context.Context, l *logger.Logger, r interface{}) error {
return logPanicToErr(l, r)
})
errHandlerOpt := task.ErrorHandler(func(ctx context.Context, l *logger.Logger, err error) error {
if err != nil {
if isContextCanceled(ctx) {
if task.IsContextCanceled(ctx) {
return err
}

err := errors.Wrapf(err, "error in background function %s", name)
return h.runner.testFailure(ctx, err, bgLogger, nil)
errWrapped := errors.Wrapf(err, "error in background function %s", name)
return h.runner.testFailure(ctx, errWrapped, l, nil)
}

return nil
})
return h.runner.background.GoWithCancel(
fn, task.OptionList(opts...), task.Name(name), loggerFuncOpt, panicOpt, errHandlerOpt,
)
}

// Go implements the Tasker interface. It is a wrapper around the `Background`
// function that allows the helper to be used as a Tasker.
func (h *Helper) Go(fn task.Func, opts ...task.Option) {
h.Background("task", func(ctx context.Context, l *logger.Logger) error {
return fn(ctx, l)
}, opts...)
}

// GoWithCancel implements the Tasker interface. It is a wrapper around the `Background`
// function that allows the helper to be used as a Tasker.
func (h *Helper) GoWithCancel(fn task.Func, opts ...task.Option) context.CancelFunc {
return h.Background("task", func(ctx context.Context, l *logger.Logger) error {
return fn(ctx, l)
}, opts...)
}

// BackgroundCommand has the same semantics of `Background()`; the
Expand Down
145 changes: 17 additions & 128 deletions pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
Expand All @@ -36,22 +37,6 @@ import (
)

type (
// backgroundEvent is the struct sent by background steps when they
// finish (successfully or not).
backgroundEvent struct {
Name string
Err error
TriggeredByTest bool
}

backgroundRunner struct {
group ctxgroup.Group
ctx context.Context
events chan backgroundEvent
logger *logger.Logger
stopFuncs []StopFunc
}

// crdbMonitor is a thin wrapper around the roachtest monitor API
// (cluster.NewMonitor) that produces error events through a channel
// whenever an unexpected node death happens. It also allows us to
Expand Down Expand Up @@ -86,7 +71,7 @@ type (
tenantService *serviceRuntime
logger *logger.Logger

background *backgroundRunner
background task.Manager
monitor *crdbMonitor

// ranUserHooks keeps track of whether the runner has run any
Expand Down Expand Up @@ -156,7 +141,7 @@ func newTestRunner(
systemService: systemService,
tenantService: tenantService,
cluster: c,
background: newBackgroundRunner(ctx, l),
background: task.NewManager(ctx, l),
monitor: newCRDBMonitor(ctx, c, maps.Keys(allCRDBNodes)),
ranUserHooks: &ranUserHooks,
}
Expand Down Expand Up @@ -202,7 +187,7 @@ func (tr *testRunner) run() (retErr error) {
if event.Err == nil {
tr.logger.Printf("background step finished: %s", event.Name)
continue
} else if event.TriggeredByTest {
} else if event.ExpectedCancel {
tr.logger.Printf("background step canceled by test: %s", event.Name)
continue
}
Expand Down Expand Up @@ -298,7 +283,7 @@ func (tr *testRunner) runSingleStep(ctx context.Context, ss *singleStep, l *logg
if err := panicAsError(l, func() error {
return ss.impl.Run(ctx, l, ss.rng, tr.newHelper(ctx, l, ss.context))
}); err != nil {
if isContextCanceled(ctx) {
if task.IsContextCanceled(ctx) {
l.Printf("step terminated (context canceled)")
// Avoid creating a `stepError` (which involves querying binary
// and cluster versions) when the context was canceled as the
Expand All @@ -316,9 +301,9 @@ func (tr *testRunner) runSingleStep(ctx context.Context, ss *singleStep, l *logg
}

func (tr *testRunner) startBackgroundStep(ss *singleStep, l *logger.Logger, stopChan shouldStop) {
stop := tr.background.Start(ss.impl.Description(), func(ctx context.Context) error {
stop := tr.background.GoWithCancel(func(ctx context.Context, l *logger.Logger) error {
return tr.runSingleStep(ctx, ss, l)
})
}, task.Logger(l), task.Name(ss.impl.Description()))

// We start a goroutine to listen for user-requests to stop the
// background function.
Expand Down Expand Up @@ -415,7 +400,7 @@ func (tr *testRunner) teardown(stepsChan chan error, testFailed bool) {
// artifacts, which would be confusing.
if testFailed {
tr.logger.Printf("waiting for all steps to finish after context cancelation")
waitForChannel(stepsChan, "test steps", tr.logger)
task.WaitForChannel(stepsChan, "test steps", tr.logger)
}

tr.logger.Printf("closing database connections")
Expand Down Expand Up @@ -803,77 +788,6 @@ func (cm *crdbMonitor) Stop() error {
return cm.monitor.WaitE()
}

func newBackgroundRunner(ctx context.Context, l *logger.Logger) *backgroundRunner {
g := ctxgroup.WithContext(ctx)
return &backgroundRunner{
group: g,
ctx: ctx,
logger: l,
events: make(chan backgroundEvent),
}
}

// Start will run the function `fn` in a goroutine. Any errors
// returned by that function are observable by reading from the
// channel returned by the `Events()` function. Returns a function
// that can be called to stop the background function (canceling the
// context passed to it).
func (br *backgroundRunner) Start(name string, fn func(context.Context) error) context.CancelFunc {
bgCtx, cancel := context.WithCancel(br.ctx)
var expectedContextCancelation bool
br.group.Go(func() error {
err := fn(bgCtx)
event := backgroundEvent{
Name: name,
Err: err,
TriggeredByTest: err != nil && isContextCanceled(bgCtx) && expectedContextCancelation,
}

select {
case br.events <- event:
// exit goroutine
case <-br.ctx.Done():
// Test already finished, exit goroutine.
return nil
}

return err
})

stopBgFunc := func() {
expectedContextCancelation = true
cancel()
}
// Collect all stopFuncs so that we can explicitly stop all
// background functions when the test finishes.
br.stopFuncs = append(br.stopFuncs, stopBgFunc)
return stopBgFunc
}

// Terminate will call the stop functions for every background function
// started during the test. This includes background functions created
// during test runtime (using `helper.Background()`), as well as
// background steps declared in the test setup (using
// `BackgroundFunc`, `Workload`, et al). Returns when all background
// functions have returned.
func (br *backgroundRunner) Terminate() {
for _, stop := range br.stopFuncs {
stop()
}

doneCh := make(chan error)
go func() {
defer close(doneCh)
_ = br.group.Wait()
}()

waitForChannel(doneCh, "background functions", br.logger)
}

func (br *backgroundRunner) CompletedEvents() <-chan backgroundEvent {
return br.events
}

// tableWriter is a thin wrapper around the `tabwriter` package used
// by the test runner to display logical and released binary versions
// in a tabular format.
Expand Down Expand Up @@ -939,37 +853,23 @@ func loadAtomicVersions(v *atomic.Value) []roachpb.Version {
return v.Load().([]roachpb.Version)
}

// panicAsError ensures that the any panics that might happen while
// the function passed runs are captured and returned as regular
// errors. A stack trace is included in the logs when that happens to
// facilitate debugging.
// panicAsError ensures that any panics that might happen while the function
// passed runs are captured and returned as regular errors. A stack trace is
// included in the logs when that happens to facilitate debugging.
func panicAsError(l *logger.Logger, f func() error) (retErr error) {
defer func() {
if r := recover(); r != nil {
l.Printf("panic stack trace:\n%s", string(debug.Stack()))
retErr = fmt.Errorf("panic (stack trace above): %v", r)
retErr = logPanicToErr(l, r)
}
}()
return f()
}

// waitForChannel waits for the given channel `ch` to close; returns
// when that happens. If the channel does not close within 5 minutes,
// the function logs a message and returns.
//
// The main use-case for this function is waiting for user-provided
// hooks to return after the context passed to them is canceled. We
// want to allow some time for them to finish, but we also don't want
// to block indefinitely if a function inadvertently ignores context
// cancelation.
func waitForChannel(ch chan error, desc string, l *logger.Logger) {
maxWait := 5 * time.Minute
select {
case <-ch:
// return
case <-time.After(maxWait):
l.Printf("waited for %s for %s to finish, giving up", maxWait, desc)
}
// logPanicToErr logs the panic stack trace and returns an error with the
// panic message.
func logPanicToErr(l *logger.Logger, r interface{}) error {
l.Printf("panic stack trace:\n%s", string(debug.Stack()))
return fmt.Errorf("panic (stack trace above): %v", r)
}

func toString[T fmt.Stringer](xs []T) []string {
Expand All @@ -980,14 +880,3 @@ func toString[T fmt.Stringer](xs []T) []string {

return result
}

// isContextCanceled returns a boolean indicating whether the context
// passed is canceled.
func isContextCanceled(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}

0 comments on commit c3ce4d9

Please sign in to comment.