roachtest: observe ctx cancellation in Run
Before this patch, roachprod invocations would not observe ctx
cancellation. Or rather, they would, but due to the usual obscure
passing of Stdout into the roachprod child process, the Run call
would not return until the child had finished. As a result, the
test would keep running, which is annoying and also costs money.
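
For illustration, a minimal standalone sketch (not part of this
commit) of the underlying issue, golang/go#23019: when Stdout is not
an *os.File, os/exec copies the child's output through an internal
pipe, and Wait blocks until every process holding the write end has
exited, even after the context is canceled and the immediate child
is killed.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"os/exec"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// bash exits immediately, but the backgrounded sleep inherits the
	// stdout pipe. Since cmd.Stdout is not an *os.File, os/exec reads
	// from an internal pipe until it closes, i.e. until the grandchild
	// exits, so Run returns only after ~30s despite the 1s timeout.
	cmd := exec.CommandContext(ctx, "/bin/bash", "-c", "sleep 30 &")
	var buf bytes.Buffer
	cmd.Stdout = &buf

	start := time.Now()
	err := cmd.Run()
	fmt.Printf("Run returned after %s: %v\n", time.Since(start), err)
}
```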

Also fixes up the handling of calling `c.t.Fatal` from a monitor
goroutine (using what is perhaps unspecified behavior of the Go
runtime). Anyway, the result is that you can do basically whatever
you like inside a monitor goroutine and get away with it:

```go
m.Go(func(ctx context.Context) error {
	// Make sure the context cancellation works (not true prior to the PR
	// adding this test).
	return c.RunE(ctx, c.Node(1), "sleep", "2000")
})
m.Go(func(ctx context.Context) error {
	// This will call c.t.Fatal which also used to wreak havoc on the test
	// harness. Now it exits just fine (and all it took were some mean hacks).
	// Note how it will exit with stderr and stdout in the failure message,
	// which is extremely helpful.
	c.Run(ctx, c.Node(1), "echo foo && echo bar && notfound")
	return errors.New("impossible")
})
m.Wait()
```

now returns

```
--- FAIL: tpmc/w=1/nodes=3 (0.24s)
...,errgroup.go:58: /Users/tschottdorf/go/bin/roachprod run local:1 -- echo foo && echo bar && notfound returned:
	stderr:
	bash: notfound: command not found
	Error:  exit status 127

	stdout:
	foo
	bar
	: exit status 1
...,tpcc.go:661: Goexit() was called
FAIL
```

Release note: None
tbg committed Aug 29, 2018
1 parent d57a1f0 commit 5ba6c4c
Showing 6 changed files with 197 additions and 23 deletions.
9 changes: 9 additions & 0 deletions Gopkg.lock


138 changes: 124 additions & 14 deletions pkg/cmd/roachtest/cluster.go
@@ -38,14 +38,15 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
// "postgres" gosql driver
_ "github.com/lib/pq"

"github.com/armon/circbuf"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"

// "postgres" gosql driver

_ "github.com/lib/pq"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

var (
@@ -209,12 +210,83 @@ func unregisterCluster(c *cluster) bool {
}

func execCmd(ctx context.Context, l *logger, args ...string) error {
// NB: It is important that this waitgroup Waits after cancel() below.
var wg sync.WaitGroup
defer wg.Wait()

var cancel func()
ctx, cancel = context.WithCancel(ctx)
defer cancel()

l.printf("> %s\n", strings.Join(args, " "))
cmd := exec.CommandContext(ctx, args[0], args[1:]...)
cmd.Stdout = l.stdout
cmd.Stderr = l.stderr

debugStdoutBuffer, _ := circbuf.NewBuffer(1024)
debugStderrBuffer, _ := circbuf.NewBuffer(1024)

// Do a dance around https://github.com/golang/go/issues/23019.
// Briefly put, passing os.Std{out,err} to subprocesses isn't great for
// context cancellation as Run() will wait for any subprocesses to finish.
// For example, "roachprod run x -- sleep 20" would wait 20 seconds, even
// if the context got canceled right away. Work around the problem by passing
// pipes to the command on which we set aggressive deadlines once the context
// expires.
{
rOut, wOut, err := os.Pipe()
if err != nil {
return err
}
defer rOut.Close()
defer wOut.Close()
rErr, wErr, err := os.Pipe()
if err != nil {
return err
}
defer rErr.Close()
defer wErr.Close()

cmd.Stdout = wOut
wg.Add(3)
go func() {
defer wg.Done()
_, _ = io.Copy(l.stdout, io.TeeReader(rOut, debugStdoutBuffer))
}()

if l.stderr == l.stdout {
// If l.stderr == l.stdout, we use only one pipe to avoid
// duplicating everything.
wg.Done()
cmd.Stderr = wOut
} else {
cmd.Stderr = wErr
go func() {
defer wg.Done()
_, _ = io.Copy(l.stderr, io.TeeReader(rErr, debugStderrBuffer))
}()
}

go func() {
defer wg.Done()
<-ctx.Done()
// NB: setting a more aggressive deadline here makes TestClusterMonitor flaky.
now := timeutil.Now().Add(3 * time.Second)
_ = rOut.SetDeadline(now)
_ = wOut.SetDeadline(now)
_ = rErr.SetDeadline(now)
_ = wErr.SetDeadline(now)
}()
}

if err := cmd.Run(); err != nil {
return errors.Wrapf(err, `%s`, strings.Join(args, ` `))
cancel()
wg.Wait() // synchronize access to ring buffer
return errors.Wrapf(
err,
"%s returned:\nstderr:\n%s\nstdout:\n%s",
strings.Join(args, " "),
debugStderrBuffer.String(),
debugStdoutBuffer.String(),
)
}
return nil
}
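
The workaround above leans on *os.File supporting deadlines on pipes
(since Go 1.10). A minimal standalone sketch, not from the commit, of
how a deadline unblocks a Read that would otherwise hang forever:

```go
package main

import (
	"fmt"
	"os"
	"time"
)

func main() {
	r, w, err := os.Pipe()
	if err != nil {
		panic(err)
	}
	defer r.Close()
	defer w.Close()

	// Nothing ever writes to w, so this Read would block forever.
	// Setting a deadline forces it to return with a timeout error
	// (wrapping os.ErrDeadlineExceeded on modern Go) instead.
	go func() {
		time.Sleep(100 * time.Millisecond)
		_ = r.SetDeadline(time.Now())
	}()

	buf := make([]byte, 1)
	_, err = r.Read(buf)
	fmt.Println(err)
}
```
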
@@ -1083,24 +1155,62 @@ func (m *monitor) ExpectDeaths(count int32) {
atomic.AddInt32(&m.expDeaths, count)
}

var errGoexit = errors.New("Goexit() was called")

func (m *monitor) Go(fn func(context.Context) error) {
m.g.Go(func() error {
m.g.Go(func() (err error) {
var returned bool
defer func() {
if returned {
return
}
if r := recover(); r != errGoexit && r != nil {
// Pass any regular panics through.
panic(r)
} else {
// If the invoked method called runtime.Goexit (such as it
// happens when it calls t.Fatal), exit with a sentinel error
// here so that the wrapped errgroup cancels itself.
//
// Note that the trick here is that we panicked explicitly below,
// which somehow "overrides" the Goexit which is supposed to be
// un-recoverable, but we do need to recover to return an error.
err = errGoexit
}
}()
if impl, ok := m.t.(*test); ok {
// Automatically clear the worker status message when the goroutine exits.
defer impl.Status()
defer impl.WorkerStatus()
}
return fn(m.ctx)
defer func() {
if !returned {
if r := recover(); r != nil {
panic(r)
}
panic(errGoexit)
}
}()
err = fn(m.ctx)
returned = true
return err
})
}
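
For reference, a minimal standalone sketch of the double-defer trick
above (the names here, like classify, are made up for illustration).
Whether the recovered panic actually aborts the Goexit is the
"perhaps unspecified behavior" mentioned above (Go 1.14 later changed
the runtime so that Goexit wins again), so this sketch only reports
which of the three cases occurred:

```go
package main

import (
	"fmt"
	"runtime"
)

// classify runs fn on its own goroutine and reports whether fn
// returned, panicked, or called runtime.Goexit (as t.Fatal does).
func classify(fn func()) string {
	result := make(chan string, 1)
	go func() {
		var normalReturn, recovered bool
		// Runs last: if neither flag was set, fn must have called
		// runtime.Goexit, which skips the code after the inner
		// func() call but still runs deferred functions.
		defer func() {
			if !normalReturn && !recovered {
				result <- "goexit"
			}
		}()
		func() {
			defer func() {
				// recover() returns nil while unwinding due to
				// Goexit, so this only fires for real panics.
				if r := recover(); r != nil {
					recovered = true
					result <- fmt.Sprintf("panic: %v", r)
				}
			}()
			fn()
			normalReturn = true
			result <- "returned"
		}()
	}()
	return <-result
}

func main() {
	fmt.Println(classify(func() {}))                   // returned
	fmt.Println(classify(func() { panic("boom") }))    // panic: boom
	fmt.Println(classify(func() { runtime.Goexit() })) // goexit
}
```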

func (m *monitor) WaitE() error {
if m.t.Failed() {
// If the test has failed, don't try to limp along.
return errors.New("already failed")
}

return m.wait(roachprod, "monitor", m.nodes)
}

func (m *monitor) Wait() {
if m.t.Failed() {
// If the test has failed, don't try to limp along.
return
}

err := m.wait(roachprod, "monitor", m.nodes)
if err != nil {
if err := m.WaitE(); err != nil {
m.t.Fatal(err)
}
}
59 changes: 57 additions & 2 deletions pkg/cmd/roachtest/cluster_test.go
@@ -19,7 +19,10 @@ import (
"context"
"fmt"
"os"
"regexp"
"runtime"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/pkg/errors"
@@ -60,7 +63,7 @@ func TestClusterMonitor(t *testing.T) {
c := &cluster{t: t, l: logger}
m := newMonitor(context.Background(), c)
m.Go(func(context.Context) error { return nil })
if err := m.wait(`sleep`, `100`); err != nil {
if err := m.wait(`echo`, `1`); err != nil {
t.Fatal(err)
}
})
@@ -92,10 +95,62 @@
return ctx.Err()
})

err := m.wait(`sleep`, `100`)
err := m.wait(`echo`, `1`)
expectedErr := `worker-fail`
if !testutils.IsError(err, expectedErr) {
t.Errorf(`expected %s err got: %+v`, expectedErr, err)
}
})

// NB: the forked sleeps in these tests actually get leaked, so it's important to let
// them finish pretty soon (think stress testing). As a matter of fact, `make test` waits
// for these child goroutines to finish (so these tests take seconds).
t.Run(`worker-fd-error`, func(t *testing.T) {
c := &cluster{t: t, l: logger}
m := newMonitor(context.Background(), c)
m.Go(func(ctx context.Context) error {
defer func() {
fmt.Println("sleep returns")
}()
return execCmd(ctx, logger, "/bin/bash", "-c", "sleep 3& wait")
})
m.Go(func(ctx context.Context) error {
defer func() {
fmt.Println("failure returns")
}()
time.Sleep(30 * time.Millisecond)
return execCmd(ctx, logger, "/bin/bash", "-c", "echo hi && notthere")
})
expectedErr := regexp.QuoteMeta(`/bin/bash -c echo hi && notthere returned:
stderr:
/bin/bash: notthere: command not found
stdout:
hi
: exit status 127`)
if err := m.wait("sleep", "100"); !testutils.IsError(err, expectedErr) {
t.Error(err)
}
})
t.Run(`worker-fd-fatal`, func(t *testing.T) {
c := &cluster{t: t, l: logger}
m := newMonitor(context.Background(), c)
m.Go(func(ctx context.Context) error {
err := execCmd(ctx, logger, "/bin/bash", "-c", "echo foo && sleep 3& wait")
return err
})
m.Go(func(ctx context.Context) error {
time.Sleep(30 * time.Millisecond)
// Simulate c.t.Fatal for which there isn't enough mocking here.
// In reality t.Fatal adds text that is returned when the test fails,
// so the failing goroutine will be referenced (not like in the expected
// error below, where all you see is the other one being canceled).
runtime.Goexit()
return errors.New("unreachable")
})
expectedErr := regexp.QuoteMeta(`Goexit() was called`)
if err := m.wait("sleep", "100"); !testutils.IsError(err, expectedErr) {
t.Error(err)
}
})
}
4 changes: 2 additions & 2 deletions pkg/cmd/roachtest/main.go
@@ -117,7 +117,7 @@ Use 'roachtest bench -n' to see a list of all benchmarks.
cmd.Flags().IntVar(
&count, "count", 1, "the number of times to run each test")
cmd.Flags().BoolVarP(
&debug, "debug", "d", debug, "don't wipe and destroy cluster if test fails")
&debugEnabled, "debug", "d", debugEnabled, "don't wipe and destroy cluster if test fails")
cmd.Flags().BoolVarP(
&dryrun, "dry-run", "n", dryrun, "dry run (don't run tests)")
cmd.Flags().IntVarP(
Expand Down Expand Up @@ -151,7 +151,7 @@ Cockroach cluster with existing data.
&stores, "stores", "n", stores, "number of stores to distribute data across")
storeGenCmd.Flags().SetInterspersed(false) // ignore workload flags
storeGenCmd.Flags().BoolVarP(
&debug, "debug", "d", debug, "don't wipe and destroy cluster if test fails")
&debugEnabled, "debug", "d", debugEnabled, "don't wipe and destroy cluster if test fails")

rootCmd.AddCommand(runCmd)
rootCmd.AddCommand(benchCmd)
8 changes: 4 additions & 4 deletions pkg/cmd/roachtest/test.go
@@ -41,7 +41,7 @@ import (
var (
parallelism = 10
count = 1
debug = false
debugEnabled = false
dryrun = false
postIssues = true
clusterNameRE = regexp.MustCompile(`^[a-z](?:[-a-z0-9]{0,61}[a-z0-9])?$`)
@@ -395,7 +395,7 @@ func (r *registry) Run(filter []string) int {
r.status.Unlock()

case <-sig:
if !debug {
if !debugEnabled {
destroyAllClusters()
}
}
@@ -725,7 +725,7 @@ func (r *registry) run(spec *testSpec, filter *regexp.Regexp, c *cluster, done f
c = newCluster(ctx, t, t.spec.Nodes)
if c != nil {
defer func() {
if !debug || !t.Failed() {
if !debugEnabled || !t.Failed() {
c.Destroy(ctx)
} else {
c.l.printf("not destroying cluster to allow debugging\n")
@@ -772,7 +772,7 @@ func (r *registry) run(spec *testSpec, filter *regexp.Regexp, c *cluster, done f
if c != nil {
c.FetchLogs(ctx)
// NB: c.destroyed is nil for cloned clusters (i.e. in subtests).
if !debug && c.destroyed != nil {
if !debugEnabled && c.destroyed != nil {
c.Destroy(ctx)
}
}
