From 5ba6c4c4d54b2502079aa1e49f36dbde6bdde97f Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Wed, 29 Aug 2018 12:20:52 +0200 Subject: [PATCH] roachtest: observe ctx cancellation in Run Before this patch, roachprod invocations would not observe ctx cancellation. Or rather they would, but due to the usual obscure passing of Stdout into the child process of roachprod the Run call would not return until the child had finished. As a result, the test would continue running, which is annoying and also costs money. Also fixes up the handling of calling `c.t.Fatal` on a monitor goroutine (using what is perhaps unspecified behavior of the Go runtime). Anyway, the result is that you can do basically whatever inside of a monitor and get away with it: ```go m.Go(func(ctx context.Context) error { // Make sure the context cancellation works (not true prior to the PR // adding this test). return c.RunE(ctx, c.Node(1), "sleep", "2000") }) m.Go(func(ctx context.Context) error { // This will call c.t.Fatal which also used to wreak havoc on the test // harness. Now it exits just fine (and all it took were some mean hacks). // Note how it will exit with stderr and stdout in the failure message, // which is extremely helpful. c.Run(ctx, c.Node(1), "echo foo && echo bar && notfound") return errors.New("impossible") }) m.Wait() ``` now returns ``` --- FAIL: tpmc/w=1/nodes=3 (0.24s) ...,errgroup.go:58: /Users/tschottdorf/go/bin/roachprod run local:1 -- echo foo && echo bar && notfound returned: stderr: bash: notfound: command not found Error: exit status 127 stdout: foo bar : exit status 1 ...,tpcc.go:661: Goexit() was called FAIL ``` Release note: None --- Gopkg.lock | 9 ++ pkg/cmd/roachtest/cluster.go | 138 +++++++++++++++++++++++++++--- pkg/cmd/roachtest/cluster_test.go | 59 ++++++++++++- pkg/cmd/roachtest/main.go | 4 +- pkg/cmd/roachtest/test.go | 8 +- vendor | 2 +- 6 files changed, 197 insertions(+), 23 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 4307b071a626..8e6ca25233f2 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -148,6 +148,14 @@ revision = "327ebb6c2b6df8bf075da02ef45a2a034e9b79ba" version = "0.11.0" +[[projects]] + branch = "master" + digest = "1:752f1e64a07686d354c797fbde07c21f3d0e5a24f096858dbb731765a7e6a449" + name = "github.com/armon/circbuf" + packages = ["."] + pruneopts = "UT" + revision = "bbbad097214e2918d8543d5201d12bfd7bca254d" + [[projects]] branch = "master" digest = "1:9fd3a6ab34bb103ba228eefd044d3f9aa476237ea95a46d12e8cccd3abf3fea2" @@ -1614,6 +1622,7 @@ "github.com/VividCortex/ewma", "github.com/abourget/teamcity", "github.com/andy-kimball/arenaskl", + "github.com/armon/circbuf", "github.com/aws/aws-sdk-go/aws", "github.com/aws/aws-sdk-go/aws/awsutil", "github.com/aws/aws-sdk-go/aws/credentials", diff --git a/pkg/cmd/roachtest/cluster.go b/pkg/cmd/roachtest/cluster.go index 1a1a6e322dca..a36627ec9b4c 100644 --- a/pkg/cmd/roachtest/cluster.go +++ b/pkg/cmd/roachtest/cluster.go @@ -38,14 +38,15 @@ import ( "sync/atomic" "time" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" + // "postgres" gosql driver + _ "github.com/lib/pq" + + "github.com/armon/circbuf" "github.com/pkg/errors" "golang.org/x/sync/errgroup" - // "postgres" gosql driver - - _ "github.com/lib/pq" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) var ( @@ -209,12 +210,83 @@ func unregisterCluster(c *cluster) bool { } func execCmd(ctx context.Context, l *logger, args ...string) error { + // NB: It is important that this waitgroup Waits after cancel() below. + var wg sync.WaitGroup + defer wg.Wait() + + var cancel func() + ctx, cancel = context.WithCancel(ctx) + defer cancel() + l.printf("> %s\n", strings.Join(args, " ")) cmd := exec.CommandContext(ctx, args[0], args[1:]...) - cmd.Stdout = l.stdout - cmd.Stderr = l.stderr + + debugStdoutBuffer, _ := circbuf.NewBuffer(1024) + debugStderrBuffer, _ := circbuf.NewBuffer(1024) + + // Do a dance around https://github.com/golang/go/issues/23019. + // Briefly put, passing os.Std{out,err} to subprocesses isn't great for + // context cancellation as Run() will wait for any subprocesses to finish. + // For example, "roachprod run x -- sleep 20" would wait 20 seconds, even + // if the context got canceled right away. Work around the problem by passing + // pipes to the command on which we set aggressive deadlines once the context + // expires. + { + rOut, wOut, err := os.Pipe() + if err != nil { + return err + } + defer rOut.Close() + defer wOut.Close() + rErr, wErr, err := os.Pipe() + if err != nil { + return err + } + defer rErr.Close() + defer wErr.Close() + + cmd.Stdout = wOut + wg.Add(3) + go func() { + defer wg.Done() + _, _ = io.Copy(l.stdout, io.TeeReader(rOut, debugStdoutBuffer)) + }() + + if l.stderr == l.stdout { + // If l.stderr == l.stdout, we use only one pipe to avoid + // duplicating everything. + wg.Done() + cmd.Stderr = wOut + } else { + cmd.Stderr = wErr + go func() { + defer wg.Done() + _, _ = io.Copy(l.stderr, io.TeeReader(rErr, debugStderrBuffer)) + }() + } + + go func() { + defer wg.Done() + <-ctx.Done() + // NB: setting a more aggressive deadline here makes TestClusterMonitor flaky. + now := timeutil.Now().Add(3 * time.Second) + _ = rOut.SetDeadline(now) + _ = wOut.SetDeadline(now) + _ = rErr.SetDeadline(now) + _ = wErr.SetDeadline(now) + }() + } + if err := cmd.Run(); err != nil { - return errors.Wrapf(err, `%s`, strings.Join(args, ` `)) + cancel() + wg.Wait() // synchronize access to ring buffer + return errors.Wrapf( + err, + "%s returned:\nstderr:\n%s\nstdout:\n%s", + strings.Join(args, " "), + debugStderrBuffer.String(), + debugStdoutBuffer.String(), + ) } return nil } @@ -1083,24 +1155,62 @@ func (m *monitor) ExpectDeaths(count int32) { atomic.AddInt32(&m.expDeaths, count) } +var errGoexit = errors.New("Goexit() was called") + func (m *monitor) Go(fn func(context.Context) error) { - m.g.Go(func() error { + m.g.Go(func() (err error) { + var returned bool + defer func() { + if returned { + return + } + if r := recover(); r != errGoexit && r != nil { + // Pass any regular panics through. + panic(r) + } else { + // If the invoked method called runtime.Goexit (such as it + // happens when it calls t.Fatal), exit with a sentinel error + // here so that the wrapped errgroup cancels itself. + // + // Note that the trick here is that we panicked explicitly below, + // which somehow "overrides" the Goexit which is supposed to be + // un-recoverable, but we do need to recover to return an error. + err = errGoexit + } + }() if impl, ok := m.t.(*test); ok { // Automatically clear the worker status message when the goroutine exits. - defer impl.Status() + defer impl.WorkerStatus() } - return fn(m.ctx) + defer func() { + if !returned { + if r := recover(); r != nil { + panic(r) + } + panic(errGoexit) + } + }() + err = fn(m.ctx) + returned = true + return err }) } +func (m *monitor) WaitE() error { + if m.t.Failed() { + // If the test has failed, don't try to limp along. + return errors.New("already failed") + } + + return m.wait(roachprod, "monitor", m.nodes) +} + func (m *monitor) Wait() { if m.t.Failed() { // If the test has failed, don't try to limp along. return } - - err := m.wait(roachprod, "monitor", m.nodes) - if err != nil { + if err := m.WaitE(); err != nil { m.t.Fatal(err) } } diff --git a/pkg/cmd/roachtest/cluster_test.go b/pkg/cmd/roachtest/cluster_test.go index 263003496a08..cfed3f082ad7 100644 --- a/pkg/cmd/roachtest/cluster_test.go +++ b/pkg/cmd/roachtest/cluster_test.go @@ -19,7 +19,10 @@ import ( "context" "fmt" "os" + "regexp" + "runtime" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/pkg/errors" @@ -60,7 +63,7 @@ func TestClusterMonitor(t *testing.T) { c := &cluster{t: t, l: logger} m := newMonitor(context.Background(), c) m.Go(func(context.Context) error { return nil }) - if err := m.wait(`sleep`, `100`); err != nil { + if err := m.wait(`echo`, `1`); err != nil { t.Fatal(err) } }) @@ -92,10 +95,62 @@ func TestClusterMonitor(t *testing.T) { return ctx.Err() }) - err := m.wait(`sleep`, `100`) + err := m.wait(`echo`, `1`) expectedErr := `worker-fail` if !testutils.IsError(err, expectedErr) { t.Errorf(`expected %s err got: %+v`, expectedErr, err) } }) + + // NB: the forker sleeps in these tests actually get leaked, so it's important to let + // them finish pretty soon (think stress testing). As a matter of fact, `make test` waits + // for these child goroutines to finish (so these tests take seconds). + t.Run(`worker-fd-error`, func(t *testing.T) { + c := &cluster{t: t, l: logger} + m := newMonitor(context.Background(), c) + m.Go(func(ctx context.Context) error { + defer func() { + fmt.Println("sleep returns") + }() + return execCmd(ctx, logger, "/bin/bash", "-c", "sleep 3& wait") + }) + m.Go(func(ctx context.Context) error { + defer func() { + fmt.Println("failure returns") + }() + time.Sleep(30 * time.Millisecond) + return execCmd(ctx, logger, "/bin/bash", "-c", "echo hi && notthere") + }) + expectedErr := regexp.QuoteMeta(`/bin/bash -c echo hi && notthere returned: +stderr: +/bin/bash: notthere: command not found + +stdout: +hi +: exit status 127`) + if err := m.wait("sleep", "100"); !testutils.IsError(err, expectedErr) { + t.Error(err) + } + }) + t.Run(`worker-fd-fatal`, func(t *testing.T) { + c := &cluster{t: t, l: logger} + m := newMonitor(context.Background(), c) + m.Go(func(ctx context.Context) error { + err := execCmd(ctx, logger, "/bin/bash", "-c", "echo foo && sleep 3& wait") + return err + }) + m.Go(func(ctx context.Context) error { + time.Sleep(30 * time.Millisecond) + // Simulate c.t.Fatal for which there isn't enough mocking here. + // In reality t.Fatal adds text that is returned when the test fails, + // so the failing goroutine will be referenced (not like in the expected + // error below, where all you see is the other one being canceled). + runtime.Goexit() + return errors.New("unreachable") + }) + expectedErr := regexp.QuoteMeta(`Goexit() was called`) + if err := m.wait("sleep", "100"); !testutils.IsError(err, expectedErr) { + t.Error(err) + } + }) } diff --git a/pkg/cmd/roachtest/main.go b/pkg/cmd/roachtest/main.go index d925710ae9dd..7e7757097e50 100644 --- a/pkg/cmd/roachtest/main.go +++ b/pkg/cmd/roachtest/main.go @@ -117,7 +117,7 @@ Use 'roachtest bench -n' to see a list of all benchmarks. cmd.Flags().IntVar( &count, "count", 1, "the number of times to run each test") cmd.Flags().BoolVarP( - &debug, "debug", "d", debug, "don't wipe and destroy cluster if test fails") + &debugEnabled, "debug", "d", debugEnabled, "don't wipe and destroy cluster if test fails") cmd.Flags().BoolVarP( &dryrun, "dry-run", "n", dryrun, "dry run (don't run tests)") cmd.Flags().IntVarP( @@ -151,7 +151,7 @@ Cockroach cluster with existing data. &stores, "stores", "n", stores, "number of stores to distribute data across") storeGenCmd.Flags().SetInterspersed(false) // ignore workload flags storeGenCmd.Flags().BoolVarP( - &debug, "debug", "d", debug, "don't wipe and destroy cluster if test fails") + &debugEnabled, "debug", "d", debugEnabled, "don't wipe and destroy cluster if test fails") rootCmd.AddCommand(runCmd) rootCmd.AddCommand(benchCmd) diff --git a/pkg/cmd/roachtest/test.go b/pkg/cmd/roachtest/test.go index 018aeb97c0fe..b0442adc451f 100644 --- a/pkg/cmd/roachtest/test.go +++ b/pkg/cmd/roachtest/test.go @@ -41,7 +41,7 @@ import ( var ( parallelism = 10 count = 1 - debug = false + debugEnabled = false dryrun = false postIssues = true clusterNameRE = regexp.MustCompile(`^[a-z](?:[-a-z0-9]{0,61}[a-z0-9])?$`) @@ -395,7 +395,7 @@ func (r *registry) Run(filter []string) int { r.status.Unlock() case <-sig: - if !debug { + if !debugEnabled { destroyAllClusters() } } @@ -725,7 +725,7 @@ func (r *registry) run(spec *testSpec, filter *regexp.Regexp, c *cluster, done f c = newCluster(ctx, t, t.spec.Nodes) if c != nil { defer func() { - if !debug || !t.Failed() { + if !debugEnabled || !t.Failed() { c.Destroy(ctx) } else { c.l.printf("not destroying cluster to allow debugging\n") @@ -772,7 +772,7 @@ func (r *registry) run(spec *testSpec, filter *regexp.Regexp, c *cluster, done f if c != nil { c.FetchLogs(ctx) // NB: c.destroyed is nil for cloned clusters (i.e. in subtests). - if !debug && c.destroyed != nil { + if !debugEnabled && c.destroyed != nil { c.Destroy(ctx) } } diff --git a/vendor b/vendor index 588ce25caa39..7dd5f189c28f 160000 --- a/vendor +++ b/vendor @@ -1 +1 @@ -Subproject commit 588ce25caa391636ac84bdbb683d3f6957b8003d +Subproject commit 7dd5f189c28f661387afca139a84c35efcf853df