Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

roachtest: remove direct go calls #134644

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions pkg/cmd/roachtest/roachtestutil/task/tasker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ type Func func(context.Context, *logger.Logger) error
// Tasker is an interface for executing tasks (goroutines). It is intended for
// use in tests, enabling the test framework to manage panics and errors.
type Tasker interface {
// Go runs the given function in a goroutine.
// Go runs the given function in a goroutine. If an error is returned, it will
// fail the test. Panics are recovered and treated as errors.
Go(fn Func, opts ...Option)
// GoWithCancel runs the given function in a goroutine and returns a
// CancelFunc that can be used to cancel the function.
// CancelFunc that can be used to cancel the function. If an error is
// returned, it will fail the test. Panics are recovered and treated as
// errors.
GoWithCancel(fn Func, opts ...Option) context.CancelFunc
}
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ go_test(
deps = [
"//pkg/cmd/roachtest/option",
"//pkg/cmd/roachtest/registry",
"//pkg/cmd/roachtest/roachtestutil/task",
"//pkg/cmd/roachtest/spec",
"//pkg/roachprod/logger",
"//pkg/roachprod/prometheus",
Expand Down
15 changes: 4 additions & 11 deletions pkg/cmd/roachtest/tests/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
Expand Down Expand Up @@ -81,18 +82,10 @@ func registerAllocator(r registry.Registry) {
c.Start(ctx, t.L(), startOpts, install.MakeClusterSettings(), c.Range(start+1, nodes))
c.Run(ctx, option.WithNodes(c.Node(1)), "./cockroach workload init kv --drop {pgurl:1}")
for node := 1; node <= nodes; node++ {
node := node
// TODO(dan): Ideally, the test would fail if this queryload failed,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto!

// but we can't put it in monitor as-is because the test deadlocks.
go func() {
t.Go(func(taskCtx context.Context, _ *logger.Logger) error {
cmd := fmt.Sprintf("./cockroach workload run kv --tolerate-errors --min-block-bytes=8 --max-block-bytes=127 {pgurl%s}", c.Node(node))
l, err := t.L().ChildLogger(fmt.Sprintf(`kv-%d`, node))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this logger was previously unused?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, had to look twice before removing, and in actual fact was dead code.

if err != nil {
t.Fatal(err)
}
defer l.Close()
_ = c.RunE(ctx, option.WithNodes(c.Node(node)), cmd)
}()
return c.RunE(taskCtx, option.WithNodes(c.Node(node)), cmd)
}, task.Name(fmt.Sprintf(`kv-%d`, node)))
}

// Wait for 3x replication, we record the time taken to achieve this.
Expand Down
16 changes: 10 additions & 6 deletions pkg/cmd/roachtest/tests/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -66,30 +68,32 @@ func registerCancel(r registry.Registry) {
// (either query execution error or an error indicating the
// absence of expected cancellation error).
errCh := make(chan error, 1)
go func(queryNum int) {
runnerConn := c.Conn(ctx, t.L(), 1)
t.Go(func(taskCtx context.Context, l *logger.Logger) error {
runnerConn := c.Conn(taskCtx, l, 1)
defer runnerConn.Close()
setupQueries := []string{"USE tpch;"}
if !useDistsql {
setupQueries = append(setupQueries, "SET distsql = off;")
}
for _, setupQuery := range setupQueries {
t.L().Printf("executing setup query %q", setupQuery)
l.Printf("executing setup query %q", setupQuery)
if _, err := runnerConn.Exec(setupQuery); err != nil {
errCh <- err
close(sem)
return
// Errors are handled in the main goroutine.
return nil //nolint:returnerrcheck
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it's not immediately obvious that returning errors in t.Go fails the test. I think it makes sense for that to be the behavior but you have to go a couple layers and peek at the implementation to confirm that is the case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with Darryl's comment; we should make this explicit in the interface doc. Also, this test could technically be refactored further to explicitly return err instead of passing it into errCh, but I also understand if you want to reduce the footprint of all changes in this PR.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had the same thought, will need to make it clear that returning an error from a task will result in a test failure. We could alternatively provide a version of the Go/GoWithCancel methods that does not take an error return.

From the footprint perspective, I didn't want to fiddle too much with already working code. But we could definitely make some of these more ergonomic. I'll create an issue for it, since engineers tend to borrow from existing code, it would be better if the examples are improved.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could alternatively provide a version of the Go/GoWithCancel methods that does not take an error return.

No bother, it's not that hard to add return nil :)

}
}
query := tpch.QueriesByNumber[queryNum]
t.L().Printf("executing q%d\n", queryNum)
l.Printf("executing q%d\n", queryNum)
close(sem)
_, err := runnerConn.Exec(query)
if err == nil {
err = errors.New("query completed before it could be canceled")
}
errCh <- err
}(queryNum)
return nil
}, task.Name("query-runner"))

// Wait for the query-runner goroutine to start as well as
// to execute setup queries.
Expand Down
9 changes: 5 additions & 4 deletions pkg/cmd/roachtest/tests/cluster_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/server/authserver"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/util/httputil"
Expand Down Expand Up @@ -87,11 +88,11 @@ func runClusterInit(ctx context.Context, t test.Test, c cluster.Cluster) {
t.L().Printf("checking that the SQL conns are not failing immediately")
errCh := make(chan error, len(dbs))
for _, db := range dbs {
db := db
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good riddance :)

go func() {
t.Go(func(taskCtx context.Context, _ *logger.Logger) error {
var val int
errCh <- db.QueryRow("SELECT 1").Scan(&val)
}()
errCh <- db.QueryRowContext(taskCtx, "SELECT 1").Scan(&val)
return nil
})
}

// Give them time to get a "connection refused" or similar error if
Expand Down
18 changes: 11 additions & 7 deletions pkg/cmd/roachtest/tests/decommissionbench.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
Expand Down Expand Up @@ -313,20 +314,22 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe
})
}

// fireAfter executes fn after the duration elapses. If the context expires
// first, it will not be executed.
func fireAfter(ctx context.Context, duration time.Duration, fn func()) {
go func() {
// fireAfter executes fn after the duration elapses. If the passed context, or
// tasker context, expires first, it will not be executed.
func fireAfter(ctx context.Context, t task.Tasker, duration time.Duration, fn func()) {
t.Go(func(taskCtx context.Context, _ *logger.Logger) error {
var fireTimer timeutil.Timer
defer fireTimer.Stop()
fireTimer.Reset(duration)
select {
case <-ctx.Done():
case <-taskCtx.Done():
case <-fireTimer.C:
fireTimer.Read = true
fn()
}
}()
return nil
})
}

// createDecommissionBenchPerfArtifacts initializes a histogram registry for
Expand Down Expand Up @@ -977,7 +980,7 @@ func runSingleDecommission(

if estimateDuration {
estimateDecommissionDuration(
ctx, h.t.L(), tickByName, snapshotRateMb, bytesUsed, candidateStores,
ctx, h.t, h.t.L(), tickByName, snapshotRateMb, bytesUsed, candidateStores,
rangeCount, avgBytesPerReplica,
)
}
Expand Down Expand Up @@ -1133,6 +1136,7 @@ func logLSMHealth(ctx context.Context, l *logger.Logger, c cluster.Cluster, targ
// recorded perf artifacts as ticks.
func estimateDecommissionDuration(
ctx context.Context,
t task.Tasker,
log *logger.Logger,
tickByName func(name string),
snapshotRateMb int,
Expand Down Expand Up @@ -1170,7 +1174,7 @@ func estimateDecommissionDuration(
rangeCount, humanizeutil.IBytes(avgBytesPerReplica), minDuration, estDuration,
)

fireAfter(ctx, estDuration, func() {
fireAfter(ctx, t, estDuration, func() {
tickByName(estimatedMetric)
})
}
8 changes: 5 additions & 3 deletions pkg/cmd/roachtest/tests/drt.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
"github.com/cockroachdb/cockroach/pkg/util/retry"
Expand Down Expand Up @@ -233,8 +234,8 @@ func (ep *tpccChaosEventProcessor) err() error {
return err
}

func (ep *tpccChaosEventProcessor) listen(ctx context.Context, l *logger.Logger) {
go func() {
func (ep *tpccChaosEventProcessor) listen(ctx context.Context, t task.Tasker, l *logger.Logger) {
t.Go(func(context.Context, *logger.Logger) error {
var prevTime time.Time
started := false
for ev := range ep.ch {
Expand Down Expand Up @@ -294,5 +295,6 @@ func (ep *tpccChaosEventProcessor) listen(ctx context.Context, l *logger.Logger)
}
prevTime = ev.Time
}
}()
return nil
})
}
4 changes: 3 additions & 1 deletion pkg/cmd/roachtest/tests/drt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
gomock "github.com/golang/mock/gomock"
Expand Down Expand Up @@ -530,7 +531,8 @@ func TestTPCCChaosEventProcessor(t *testing.T) {
l, err := (&logger.Config{}).NewLogger("")
require.NoError(t, err)

ep.listen(ctx, l)
tasker := task.NewManager(ctx, l)
ep.listen(ctx, tasker, l)
for _, chaosEvent := range tc.chaosEvents {
ch <- chaosEvent
}
Expand Down
16 changes: 11 additions & 5 deletions pkg/cmd/roachtest/tests/export_parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/grafana"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
)

Expand Down Expand Up @@ -89,7 +91,8 @@ func registerExportParquet(r registry.Registry) {
wg := sync.WaitGroup{}
for i := 0; i < numConcurrentExports; i++ {
wg.Add(1)
go func(i int, target string) {
target := allTpccTargets[i%len(allTpccTargets)]
t.Go(func(context.Context, *logger.Logger) error {
t.Status(fmt.Sprintf("worker %d/%d starting export of target %s", i+1, numConcurrentExports, target))
fileNum := 0
db := c.Conn(ctx, t.L(), 1)
Expand All @@ -103,7 +106,8 @@ func registerExportParquet(r registry.Registry) {
}
t.Status(fmt.Sprintf("worker %d/%d terminated", i+1, numConcurrentExports))
wg.Done()
}(i, allTpccTargets[i%len(allTpccTargets)])
return nil
})
}
wg.Wait()

Expand Down Expand Up @@ -150,17 +154,19 @@ func registerExportParquet(r registry.Registry) {
wg := sync.WaitGroup{}
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(i int, target string) {
target := allTpccTargets[i]
t.Go(func(taskCtx context.Context, l *logger.Logger) error {
t.Status(fmt.Sprintf("worker %d/%d starting export of target %s", i+1, numWorkers, target))
db := c.Conn(ctx, t.L(), 1)
db := c.Conn(taskCtx, l, 1)
_, err := db.Exec(
fmt.Sprintf("EXPORT INTO PARQUET 'nodelocal://1/outputfile%d' FROM SELECT * FROM %s", i, target))
if err != nil {
t.Fatal(err.Error())
}
t.Status(fmt.Sprintf("worker %d/%d terminated", i+1, numWorkers))
wg.Done()
}(i, allTpccTargets[i])
return nil
}, task.Name(fmt.Sprintf("parquet-export-worker-%d", i+1)))
}
wg.Wait()
},
Expand Down
11 changes: 7 additions & 4 deletions pkg/cmd/roachtest/tests/jepsen.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/util/retry"
)

Expand Down Expand Up @@ -279,17 +280,19 @@ func (j jepsenConfig) startTest(
}
t.Fatalf("error installing Jepsen deps: %+v", err)
}
go func() {
t.Go(func(context.Context, *logger.Logger) error {
errCh <- run("bash", "-e", "-c", fmt.Sprintf(
`"cd /mnt/data1/jepsen/cockroachdb && set -eo pipefail && ~/lein run %s > invoke.log 2>&1"`,
testArgs))
}()
return nil
})
} else {
go func() {
t.Go(func(context.Context, *logger.Logger) error {
errCh <- run("bash", "-e", "-c", fmt.Sprintf(
`"cd /mnt/data1/jepsen/cockroachdb && set -eo pipefail && java -jar %s %s > invoke.log 2>&1"`,
j.binaryName(), testArgs))
}()
return nil
})
}
return errCh
}
Expand Down
21 changes: 12 additions & 9 deletions pkg/cmd/roachtest/tests/mixed_version_c2c.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/mixedversion"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod"
Expand Down Expand Up @@ -71,7 +72,7 @@ func runC2CMixedVersions(ctx context.Context, t test.Test, c cluster.Cluster, sp
cm.WorkloadHook(ctx)
cm.LatencyHook(ctx)
cm.UpdateHook(ctx)
cm.Run(ctx, c)
cm.Run(t)
}

func InitC2CMixed(
Expand Down Expand Up @@ -451,31 +452,33 @@ func (cm *c2cMixed) sourceFingerprintAndCompare(
return nil
}

func (cm *c2cMixed) Run(ctx context.Context, c cluster.Cluster) {
func (cm *c2cMixed) Run(t task.Tasker) {
var wg sync.WaitGroup
wg.Add(2)

go func() {
t.Go(func(_ context.Context, l *logger.Logger) error {
defer func() {
if r := recover(); r != nil {
cm.t.L().Printf("source cluster upgrade failed: %v", r)
l.Printf("source cluster upgrade failed: %v", r)
}
}()
defer wg.Done()
cm.sourceMvt.Run()
}()
return nil
})

go func() {
t.Go(func(taskCtx context.Context, l *logger.Logger) error {
defer func() {
if r := recover(); r != nil {
cm.t.L().Printf("destination cluster upgrade failed: %v", r)
l.Printf("destination cluster upgrade failed: %v", r)
}
}()
defer wg.Done()

chanReadCtx(ctx, cm.sourceStartedChan)
chanReadCtx(taskCtx, cm.sourceStartedChan)
cm.destMvt.Run()
}()
return nil
})

wg.Wait()
}
Expand Down
Loading