Skip to content

Commit

Permalink
Merge #134644
Browse files Browse the repository at this point in the history
134644: roachtest: remove direct go calls r=srosenberg,DarrylWong a=herkolategan

Previously, tests had to use bare goroutine calls to start a goroutine. This change removes all bare goroutine calls and replaces them with the new task APIs.

The use of `errgroup.Group` and `Monitor` remains and will be addressed in a different PR.

Informs: #118214

Epic: None
Release note: None

Co-authored-by: Herko Lategan <[email protected]>
  • Loading branch information
craig[bot] and herkolategan committed Nov 27, 2024
2 parents b77653f + fd90592 commit a86b127
Show file tree
Hide file tree
Showing 17 changed files with 108 additions and 81 deletions.
7 changes: 5 additions & 2 deletions pkg/cmd/roachtest/roachtestutil/task/tasker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ type Func func(context.Context, *logger.Logger) error
// Tasker is an interface for executing tasks (goroutines). It is intended for
// use in tests, enabling the test framework to manage panics and errors.
type Tasker interface {
// Go runs the given function in a goroutine.
// Go runs the given function in a goroutine. If an error is returned, it will
// fail the test. Panics are recovered and treated as errors.
Go(fn Func, opts ...Option)
// GoWithCancel runs the given function in a goroutine and returns a
// CancelFunc that can be used to cancel the function.
// CancelFunc that can be used to cancel the function. If an error is
// returned, it will fail the test. Panics are recovered and treated as
// errors.
GoWithCancel(fn Func, opts ...Option) context.CancelFunc
}
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ go_test(
deps = [
"//pkg/cmd/roachtest/option",
"//pkg/cmd/roachtest/registry",
"//pkg/cmd/roachtest/roachtestutil/task",
"//pkg/cmd/roachtest/spec",
"//pkg/roachprod/logger",
"//pkg/roachprod/prometheus",
Expand Down
15 changes: 4 additions & 11 deletions pkg/cmd/roachtest/tests/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
Expand Down Expand Up @@ -81,18 +82,10 @@ func registerAllocator(r registry.Registry) {
c.Start(ctx, t.L(), startOpts, install.MakeClusterSettings(), c.Range(start+1, nodes))
c.Run(ctx, option.WithNodes(c.Node(1)), "./cockroach workload init kv --drop {pgurl:1}")
for node := 1; node <= nodes; node++ {
node := node
// TODO(dan): Ideally, the test would fail if this queryload failed,
// but we can't put it in monitor as-is because the test deadlocks.
go func() {
t.Go(func(taskCtx context.Context, _ *logger.Logger) error {
cmd := fmt.Sprintf("./cockroach workload run kv --tolerate-errors --min-block-bytes=8 --max-block-bytes=127 {pgurl%s}", c.Node(node))
l, err := t.L().ChildLogger(fmt.Sprintf(`kv-%d`, node))
if err != nil {
t.Fatal(err)
}
defer l.Close()
_ = c.RunE(ctx, option.WithNodes(c.Node(node)), cmd)
}()
return c.RunE(taskCtx, option.WithNodes(c.Node(node)), cmd)
}, task.Name(fmt.Sprintf(`kv-%d`, node)))
}

// Wait for 3x replication, we record the time taken to achieve this.
Expand Down
16 changes: 10 additions & 6 deletions pkg/cmd/roachtest/tests/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -66,30 +68,32 @@ func registerCancel(r registry.Registry) {
// (either query execution error or an error indicating the
// absence of expected cancellation error).
errCh := make(chan error, 1)
go func(queryNum int) {
runnerConn := c.Conn(ctx, t.L(), 1)
t.Go(func(taskCtx context.Context, l *logger.Logger) error {
runnerConn := c.Conn(taskCtx, l, 1)
defer runnerConn.Close()
setupQueries := []string{"USE tpch;"}
if !useDistsql {
setupQueries = append(setupQueries, "SET distsql = off;")
}
for _, setupQuery := range setupQueries {
t.L().Printf("executing setup query %q", setupQuery)
l.Printf("executing setup query %q", setupQuery)
if _, err := runnerConn.Exec(setupQuery); err != nil {
errCh <- err
close(sem)
return
// Errors are handled in the main goroutine.
return nil //nolint:returnerrcheck
}
}
query := tpch.QueriesByNumber[queryNum]
t.L().Printf("executing q%d\n", queryNum)
l.Printf("executing q%d\n", queryNum)
close(sem)
_, err := runnerConn.Exec(query)
if err == nil {
err = errors.New("query completed before it could be canceled")
}
errCh <- err
}(queryNum)
return nil
}, task.Name("query-runner"))

// Wait for the query-runner goroutine to start as well as
// to execute setup queries.
Expand Down
9 changes: 5 additions & 4 deletions pkg/cmd/roachtest/tests/cluster_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/server/authserver"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/util/httputil"
Expand Down Expand Up @@ -87,11 +88,11 @@ func runClusterInit(ctx context.Context, t test.Test, c cluster.Cluster) {
t.L().Printf("checking that the SQL conns are not failing immediately")
errCh := make(chan error, len(dbs))
for _, db := range dbs {
db := db
go func() {
t.Go(func(taskCtx context.Context, _ *logger.Logger) error {
var val int
errCh <- db.QueryRow("SELECT 1").Scan(&val)
}()
errCh <- db.QueryRowContext(taskCtx, "SELECT 1").Scan(&val)
return nil
})
}

// Give them time to get a "connection refused" or similar error if
Expand Down
18 changes: 11 additions & 7 deletions pkg/cmd/roachtest/tests/decommissionbench.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
Expand Down Expand Up @@ -313,20 +314,22 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe
})
}

// fireAfter executes fn after the duration elapses. If the context expires
// first, it will not be executed.
func fireAfter(ctx context.Context, duration time.Duration, fn func()) {
go func() {
// fireAfter executes fn after the duration elapses. If the passed context, or
// tasker context, expires first, it will not be executed.
func fireAfter(ctx context.Context, t task.Tasker, duration time.Duration, fn func()) {
t.Go(func(taskCtx context.Context, _ *logger.Logger) error {
var fireTimer timeutil.Timer
defer fireTimer.Stop()
fireTimer.Reset(duration)
select {
case <-ctx.Done():
case <-taskCtx.Done():
case <-fireTimer.C:
fireTimer.Read = true
fn()
}
}()
return nil
})
}

// createDecommissionBenchPerfArtifacts initializes a histogram registry for
Expand Down Expand Up @@ -992,7 +995,7 @@ func runSingleDecommission(

if estimateDuration {
estimateDecommissionDuration(
ctx, h.t.L(), tickByName, snapshotRateMb, bytesUsed, candidateStores,
ctx, h.t, h.t.L(), tickByName, snapshotRateMb, bytesUsed, candidateStores,
rangeCount, avgBytesPerReplica,
)
}
Expand Down Expand Up @@ -1148,6 +1151,7 @@ func logLSMHealth(ctx context.Context, l *logger.Logger, c cluster.Cluster, targ
// recorded perf artifacts as ticks.
func estimateDecommissionDuration(
ctx context.Context,
t task.Tasker,
log *logger.Logger,
tickByName func(name string),
snapshotRateMb int,
Expand Down Expand Up @@ -1185,7 +1189,7 @@ func estimateDecommissionDuration(
rangeCount, humanizeutil.IBytes(avgBytesPerReplica), minDuration, estDuration,
)

fireAfter(ctx, estDuration, func() {
fireAfter(ctx, t, estDuration, func() {
tickByName(estimatedMetric)
})
}
8 changes: 5 additions & 3 deletions pkg/cmd/roachtest/tests/drt.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
"github.com/cockroachdb/cockroach/pkg/util/retry"
Expand Down Expand Up @@ -233,8 +234,8 @@ func (ep *tpccChaosEventProcessor) err() error {
return err
}

func (ep *tpccChaosEventProcessor) listen(ctx context.Context, l *logger.Logger) {
go func() {
func (ep *tpccChaosEventProcessor) listen(ctx context.Context, t task.Tasker, l *logger.Logger) {
t.Go(func(context.Context, *logger.Logger) error {
var prevTime time.Time
started := false
for ev := range ep.ch {
Expand Down Expand Up @@ -294,5 +295,6 @@ func (ep *tpccChaosEventProcessor) listen(ctx context.Context, l *logger.Logger)
}
prevTime = ev.Time
}
}()
return nil
})
}
4 changes: 3 additions & 1 deletion pkg/cmd/roachtest/tests/drt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
gomock "github.com/golang/mock/gomock"
Expand Down Expand Up @@ -530,7 +531,8 @@ func TestTPCCChaosEventProcessor(t *testing.T) {
l, err := (&logger.Config{}).NewLogger("")
require.NoError(t, err)

ep.listen(ctx, l)
tasker := task.NewManager(ctx, l)
ep.listen(ctx, tasker, l)
for _, chaosEvent := range tc.chaosEvents {
ch <- chaosEvent
}
Expand Down
16 changes: 11 additions & 5 deletions pkg/cmd/roachtest/tests/export_parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/grafana"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
)

Expand Down Expand Up @@ -89,7 +91,8 @@ func registerExportParquet(r registry.Registry) {
wg := sync.WaitGroup{}
for i := 0; i < numConcurrentExports; i++ {
wg.Add(1)
go func(i int, target string) {
target := allTpccTargets[i%len(allTpccTargets)]
t.Go(func(context.Context, *logger.Logger) error {
t.Status(fmt.Sprintf("worker %d/%d starting export of target %s", i+1, numConcurrentExports, target))
fileNum := 0
db := c.Conn(ctx, t.L(), 1)
Expand All @@ -103,7 +106,8 @@ func registerExportParquet(r registry.Registry) {
}
t.Status(fmt.Sprintf("worker %d/%d terminated", i+1, numConcurrentExports))
wg.Done()
}(i, allTpccTargets[i%len(allTpccTargets)])
return nil
})
}
wg.Wait()

Expand Down Expand Up @@ -150,17 +154,19 @@ func registerExportParquet(r registry.Registry) {
wg := sync.WaitGroup{}
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(i int, target string) {
target := allTpccTargets[i]
t.Go(func(taskCtx context.Context, l *logger.Logger) error {
t.Status(fmt.Sprintf("worker %d/%d starting export of target %s", i+1, numWorkers, target))
db := c.Conn(ctx, t.L(), 1)
db := c.Conn(taskCtx, l, 1)
_, err := db.Exec(
fmt.Sprintf("EXPORT INTO PARQUET 'nodelocal://1/outputfile%d' FROM SELECT * FROM %s", i, target))
if err != nil {
t.Fatal(err.Error())
}
t.Status(fmt.Sprintf("worker %d/%d terminated", i+1, numWorkers))
wg.Done()
}(i, allTpccTargets[i])
return nil
}, task.Name(fmt.Sprintf("parquet-export-worker-%d", i+1)))
}
wg.Wait()
},
Expand Down
11 changes: 7 additions & 4 deletions pkg/cmd/roachtest/tests/jepsen.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/util/retry"
)

Expand Down Expand Up @@ -279,17 +280,19 @@ func (j jepsenConfig) startTest(
}
t.Fatalf("error installing Jepsen deps: %+v", err)
}
go func() {
t.Go(func(context.Context, *logger.Logger) error {
errCh <- run("bash", "-e", "-c", fmt.Sprintf(
`"cd /mnt/data1/jepsen/cockroachdb && set -eo pipefail && ~/lein run %s > invoke.log 2>&1"`,
testArgs))
}()
return nil
})
} else {
go func() {
t.Go(func(context.Context, *logger.Logger) error {
errCh <- run("bash", "-e", "-c", fmt.Sprintf(
`"cd /mnt/data1/jepsen/cockroachdb && set -eo pipefail && java -jar %s %s > invoke.log 2>&1"`,
j.binaryName(), testArgs))
}()
return nil
})
}
return errCh
}
Expand Down
21 changes: 12 additions & 9 deletions pkg/cmd/roachtest/tests/mixed_version_c2c.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/mixedversion"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod"
Expand Down Expand Up @@ -71,7 +72,7 @@ func runC2CMixedVersions(ctx context.Context, t test.Test, c cluster.Cluster, sp
cm.WorkloadHook(ctx)
cm.LatencyHook(ctx)
cm.UpdateHook(ctx)
cm.Run(ctx, c)
cm.Run(t)
}

func InitC2CMixed(
Expand Down Expand Up @@ -451,31 +452,33 @@ func (cm *c2cMixed) sourceFingerprintAndCompare(
return nil
}

func (cm *c2cMixed) Run(ctx context.Context, c cluster.Cluster) {
func (cm *c2cMixed) Run(t task.Tasker) {
var wg sync.WaitGroup
wg.Add(2)

go func() {
t.Go(func(_ context.Context, l *logger.Logger) error {
defer func() {
if r := recover(); r != nil {
cm.t.L().Printf("source cluster upgrade failed: %v", r)
l.Printf("source cluster upgrade failed: %v", r)
}
}()
defer wg.Done()
cm.sourceMvt.Run()
}()
return nil
})

go func() {
t.Go(func(taskCtx context.Context, l *logger.Logger) error {
defer func() {
if r := recover(); r != nil {
cm.t.L().Printf("destination cluster upgrade failed: %v", r)
l.Printf("destination cluster upgrade failed: %v", r)
}
}()
defer wg.Done()

chanReadCtx(ctx, cm.sourceStartedChan)
chanReadCtx(taskCtx, cm.sourceStartedChan)
cm.destMvt.Run()
}()
return nil
})

wg.Wait()
}
Expand Down
Loading

0 comments on commit a86b127

Please sign in to comment.