Skip to content

Commit

Permalink
Merge #135831
Browse files Browse the repository at this point in the history
135831: roachtest: add groups to task manager r=DarrylWong a=herkolategan

Previously, a new API for managing tasks was introduced (see #133263). However, this did not address roachtests that want to manage groups of tasks. In an effort to replace `errgroup` and `monitor.Go` for task management, this change introduces a group provider in the task manager.

A group adds the ability to wait on a subset of tasks to finish before proceeding. The task handler will still report returned errors or panics to the test framework.

Informs: #118214

Epic: None
Release note: None

Co-authored-by: Herko Lategan <[email protected]>
  • Loading branch information
craig[bot] and herkolategan committed Dec 13, 2024
2 parents fada042 + 9b40ff3 commit ada0ea7
Show file tree
Hide file tree
Showing 10 changed files with 266 additions and 65 deletions.
4 changes: 4 additions & 0 deletions pkg/cmd/roachtest/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ func (t testWrapper) Go(_ task.Func, _ ...task.Option) {
panic("implement me")
}

// NewGroup is a stub: testWrapper does not support task groups, so any call
// panics.
func (t testWrapper) NewGroup() task.Group {
panic("implement me")
}

var _ test2.Test = testWrapper{}

// ArtifactsDir is part of the test.Test interface.
Expand Down
14 changes: 14 additions & 0 deletions pkg/cmd/roachtest/clusterstats/mock_test_generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 14 additions & 3 deletions pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,9 @@ func (h *Helper) ExecWithGateway(
return h.DefaultService().ExecWithGateway(rng, nodes, query, args...)
}

// GoWithCancel implements the Tasker interface.
func (h *Helper) GoWithCancel(fn task.Func, opts ...task.Option) context.CancelFunc {
// defaultTaskOptions returns the default options that are passed to all tasks
// started by the helper.
func (h *Helper) defaultTaskOptions() []task.Option {
loggerFuncOpt := task.LoggerFunc(func(name string) (*logger.Logger, error) {
bgLogger, err := h.loggerFor(name)
if err != nil {
Expand All @@ -246,8 +247,13 @@ func (h *Helper) GoWithCancel(fn task.Func, opts ...task.Option) context.CancelF
}
return nil
})
return []task.Option{loggerFuncOpt, panicOpt, errHandlerOpt}
}

// GoWithCancel implements the Tasker interface. It starts fn through
// h.runner.background and returns the function that cancels the task.
//
// The helper's default task options are passed first and the caller's opts
// last, so that an option supplied by the caller is listed after the
// corresponding default.
func (h *Helper) GoWithCancel(fn task.Func, opts ...task.Option) context.CancelFunc {
	return h.runner.background.GoWithCancel(
		fn, task.OptionList(h.defaultTaskOptions()...), task.OptionList(opts...),
	)
}

Expand All @@ -256,6 +262,11 @@ func (h *Helper) Go(fn task.Func, opts ...task.Option) {
h.GoWithCancel(fn, opts...)
}

// NewGroup returns a new task.Group created through h.runner.background. The
// options returned by defaultTaskOptions are passed to that NewGroup call, so
// the group is created with the helper's default task options.
func (h *Helper) NewGroup() task.Group {
return h.runner.background.NewGroup(h.defaultTaskOptions()...)
}

// GoCommand has the same semantics of `GoWithCancel()`; the command passed will
// run and the test will fail if the command is not successful. The task name is
// derived from the command passed.
Expand Down
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/roachtestutil/task/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "task",
srcs = [
"group.go",
"manager.go",
"options.go",
"tasker.go",
Expand Down
25 changes: 25 additions & 0 deletions pkg/cmd/roachtest/roachtestutil/task/group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package task

// Group is an interface for managing a group of tasks. It is intended for use
// in roachtests, for creating a group and waiting for all tasks in the group to
// complete.
type Group interface {
// Tasker is embedded so that tasks can be started directly on the group.
Tasker
// Wait waits for all tasks in the group to complete. Errors from tasks are
// reported to the test framework automatically and will cause the test to
// fail, which also cancels the context passed to the group. Wait itself
// returns nothing.
Wait()
}

// GroupProvider is an interface for creating new Group(s). Generally, the test
// framework will supply a GroupProvider to tests.
type GroupProvider interface {
// NewGroup creates a new Group to manage tasks. Any options passed to this
// function will be applied to all tasks started by the group, in addition to
// any options supplied when an individual task is started.
NewGroup(opts ...Option) Group
}
141 changes: 106 additions & 35 deletions pkg/cmd/roachtest/roachtestutil/task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ import (
type (
// Manager is responsible for managing a group of tasks initiated during
// tests. The interface is designed for the test framework to control tasks.
// Typically, tests will only interact, and be provided with the smaller
// Tasker interface to start tasks.
// Typically, tests will only be provided with, and interact through, the
// smaller Group and Tasker interfaces to start tasks or wait on groups of tasks.
Manager interface {
Tasker
GroupProvider
Terminate(*logger.Logger)
CompletedEvents() <-chan Event
}
Expand All @@ -34,26 +35,42 @@ type (
}

manager struct {
group ctxgroup.Group
ctx context.Context
logger *logger.Logger
events chan Event
id atomic.Uint32
mu struct {
group *group
}

group struct {
manager *manager
options []Option
ctxGroup ctxgroup.Group
groupMu struct {
syncutil.Mutex
groups []*group
}
cancelMu struct {
syncutil.Mutex
cancelFns []context.CancelFunc
}
}
)

// NewManager creates a new Manager. The context passed to the manager is used
// to control the lifetime of all tasks started by the manager. The logger is
// the default logger used by all tasks started by the manager.
func NewManager(ctx context.Context, l *logger.Logger) Manager {
g := ctxgroup.WithContext(ctx)
return &manager{
group: g,
m := &manager{
ctx: ctx,
logger: l,
events: make(chan Event),
}
m.group = &group{
manager: m,
ctxGroup: ctxgroup.WithContext(ctx),
}
return m
}

func (m *manager) defaultOptions() []Option {
Expand All @@ -73,9 +90,65 @@ func (m *manager) defaultOptions() []Option {
}
}

// Terminate stops every task started during the test and then waits for them
// to return. Cancellation is issued first, through the default group's
// cancelAll. The wait runs in its own goroutine so that WaitForChannel decides
// how long Terminate blocks: per this method's documented contract, it returns
// when all task functions have returned or after a 5-minute timeout, whichever
// comes first, logging a warning message if the timeout is reached.
func (m *manager) Terminate(l *logger.Logger) {
	m.group.cancelAll()

	// finished is never sent on; it is closed once every task has returned.
	finished := make(chan error)
	waitForTasks := func() {
		defer close(finished)
		m.group.Wait()
	}
	go waitForTasks()

	WaitForChannel(finished, "tasks", l)
}

// CompletedEvents returns a channel that will receive events for all tasks
// started by the manager. Every call returns the same underlying channel
// (m.events), exposed to callers as receive-only.
func (m *manager) CompletedEvents() <-chan Event {
return m.events
}

// NewGroup creates a new group of tasks as a subgroup under the manager's
// default group. The opts are forwarded unchanged to the default group's
// NewGroup.
func (m *manager) NewGroup(opts ...Option) Group {
return m.group.NewGroup(opts...)
}

// GoWithCancel starts fn on the manager's default group and returns the
// function that cancels the task.
//
// Merging opts with the manager's default options and deriving the task's
// cancelable context are both done by the group's GoWithCancel, so neither is
// repeated here.
func (m *manager) GoWithCancel(fn Func, opts ...Option) context.CancelFunc {
	return m.group.GoWithCancel(fn, opts...)
}

// Go starts fn on the manager's default group. It calls the group's
// GoWithCancel and deliberately discards the returned cancel function.
func (m *manager) Go(fn Func, opts ...Option) {
_ = m.group.GoWithCancel(fn, opts...)
}

// NewGroup creates a child group and records it on t, so that t's cancelAll
// and Wait also reach it. The child shares t's manager, keeps opts as its
// group-level options, and gets its own ctxgroup derived from the manager's
// context.
func (t *group) NewGroup(opts ...Option) Group {
	t.groupMu.Lock()
	defer t.groupMu.Unlock()

	child := &group{
		manager:  t.manager,
		options:  opts,
		ctxGroup: ctxgroup.WithContext(t.manager.ctx),
	}
	t.groupMu.groups = append(t.groupMu.groups, child)
	return child
}

func (t *group) GoWithCancel(fn Func, opts ...Option) context.CancelFunc {
// Combine options in order of precedence: the manager's default options, the
// group's options, and finally the options passed to GoWithCancel.
opt := CombineOptions(
OptionList(t.manager.defaultOptions()...),
OptionList(t.options...),
OptionList(opts...),
)
groupCtx, cancel := context.WithCancel(t.manager.ctx)
var expectedContextCancellation atomic.Bool

// internalFunc is a wrapper around the user-provided function that
Expand All @@ -91,7 +164,7 @@ func (m *manager) GoWithCancel(fn Func, opts ...Option) context.CancelFunc {
return retErr
}

m.group.Go(func() error {
t.ctxGroup.Go(func() error {
l, err := opt.L(opt.Name)
if err != nil {
return err
Expand All @@ -114,10 +187,10 @@ func (m *manager) GoWithCancel(fn Func, opts ...Option) context.CancelFunc {
// already aware of the cancelation and sending an event would be redundant.
// For instance, a call to test.Fatal would already have captured the error
// and canceled the context.
if IsContextCanceled(m.ctx) {
if IsContextCanceled(t.manager.ctx) {
return nil
}
m.events <- event
t.manager.events <- event
return err
})

Expand All @@ -127,38 +200,36 @@ func (m *manager) GoWithCancel(fn Func, opts ...Option) context.CancelFunc {
}
// Collect all taskCancelFn(s) so that we can explicitly stop all tasks when
// the tasker is terminated.
m.mu.Lock()
defer m.mu.Unlock()
m.mu.cancelFns = append(m.mu.cancelFns, taskCancelFn)
t.cancelMu.Lock()
defer t.cancelMu.Unlock()
t.cancelMu.cancelFns = append(t.cancelMu.cancelFns, taskCancelFn)
return taskCancelFn
}

func (m *manager) Go(fn Func, opts ...Option) {
_ = m.GoWithCancel(fn, opts...)
func (t *group) Go(fn Func, opts ...Option) {
_ = t.GoWithCancel(fn, opts...)
}

// Terminate will call the stop functions for every task started during the
// test. Returns when all task functions have returned, or after a 5-minute
// timeout, whichever comes first. If the timeout is reached, the function logs
// a warning message and returns.
func (m *manager) Terminate(l *logger.Logger) {
func (t *group) cancelAll() {
func() {
m.mu.Lock()
defer m.mu.Unlock()
for _, cancel := range m.mu.cancelFns {
t.cancelMu.Lock()
defer t.cancelMu.Unlock()
for _, cancel := range t.cancelMu.cancelFns {
cancel()
}
}()

doneCh := make(chan error)
go func() {
defer close(doneCh)
_ = m.group.Wait()
}()

WaitForChannel(doneCh, "tasks", l)
t.groupMu.Lock()
defer t.groupMu.Unlock()
for _, g := range t.groupMu.groups {
g.cancelAll()
}
}

func (m *manager) CompletedEvents() <-chan Event {
return m.events
func (t *group) Wait() {
t.groupMu.Lock()
defer t.groupMu.Unlock()
_ = t.ctxGroup.Wait()
for _, g := range t.groupMu.groups {
g.Wait()
}
}
Loading

0 comments on commit ada0ea7

Please sign in to comment.