diff --git a/Gopkg.lock b/Gopkg.lock index 4307b071a626..8e6ca25233f2 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -148,6 +148,14 @@ revision = "327ebb6c2b6df8bf075da02ef45a2a034e9b79ba" version = "0.11.0" +[[projects]] + branch = "master" + digest = "1:752f1e64a07686d354c797fbde07c21f3d0e5a24f096858dbb731765a7e6a449" + name = "github.com/armon/circbuf" + packages = ["."] + pruneopts = "UT" + revision = "bbbad097214e2918d8543d5201d12bfd7bca254d" + [[projects]] branch = "master" digest = "1:9fd3a6ab34bb103ba228eefd044d3f9aa476237ea95a46d12e8cccd3abf3fea2" @@ -1614,6 +1622,7 @@ "github.com/VividCortex/ewma", "github.com/abourget/teamcity", "github.com/andy-kimball/arenaskl", + "github.com/armon/circbuf", "github.com/aws/aws-sdk-go/aws", "github.com/aws/aws-sdk-go/aws/awsutil", "github.com/aws/aws-sdk-go/aws/credentials", diff --git a/pkg/cmd/roachtest/cluster.go b/pkg/cmd/roachtest/cluster.go index 1a1a6e322dca..a36627ec9b4c 100644 --- a/pkg/cmd/roachtest/cluster.go +++ b/pkg/cmd/roachtest/cluster.go @@ -38,14 +38,15 @@ import ( "sync/atomic" "time" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" + // "postgres" gosql driver + _ "github.com/lib/pq" + + "github.com/armon/circbuf" "github.com/pkg/errors" "golang.org/x/sync/errgroup" - // "postgres" gosql driver - - _ "github.com/lib/pq" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) var ( @@ -209,12 +210,83 @@ func unregisterCluster(c *cluster) bool { } func execCmd(ctx context.Context, l *logger, args ...string) error { + // NB: It is important that this waitgroup Waits after cancel() below. + var wg sync.WaitGroup + defer wg.Wait() + + var cancel func() + ctx, cancel = context.WithCancel(ctx) + defer cancel() + l.printf("> %s\n", strings.Join(args, " ")) cmd := exec.CommandContext(ctx, args[0], args[1:]...) - cmd.Stdout = l.stdout - cmd.Stderr = l.stderr + + debugStdoutBuffer, _ := circbuf.NewBuffer(1024) + debugStderrBuffer, _ := circbuf.NewBuffer(1024) + + // Do a dance around https://github.com/golang/go/issues/23019. + // Briefly put, passing os.Std{out,err} to subprocesses isn't great for + // context cancellation as Run() will wait for any subprocesses to finish. + // For example, "roachprod run x -- sleep 20" would wait 20 seconds, even + // if the context got canceled right away. Work around the problem by passing + // pipes to the command on which we set aggressive deadlines once the context + // expires. + { + rOut, wOut, err := os.Pipe() + if err != nil { + return err + } + defer rOut.Close() + defer wOut.Close() + rErr, wErr, err := os.Pipe() + if err != nil { + return err + } + defer rErr.Close() + defer wErr.Close() + + cmd.Stdout = wOut + wg.Add(3) + go func() { + defer wg.Done() + _, _ = io.Copy(l.stdout, io.TeeReader(rOut, debugStdoutBuffer)) + }() + + if l.stderr == l.stdout { + // If l.stderr == l.stdout, we use only one pipe to avoid + // duplicating everything. + wg.Done() + cmd.Stderr = wOut + } else { + cmd.Stderr = wErr + go func() { + defer wg.Done() + _, _ = io.Copy(l.stderr, io.TeeReader(rErr, debugStderrBuffer)) + }() + } + + go func() { + defer wg.Done() + <-ctx.Done() + // NB: setting a more aggressive deadline here makes TestClusterMonitor flaky. + now := timeutil.Now().Add(3 * time.Second) + _ = rOut.SetDeadline(now) + _ = wOut.SetDeadline(now) + _ = rErr.SetDeadline(now) + _ = wErr.SetDeadline(now) + }() + } + if err := cmd.Run(); err != nil { - return errors.Wrapf(err, `%s`, strings.Join(args, ` `)) + cancel() + wg.Wait() // synchronize access to ring buffer + return errors.Wrapf( + err, + "%s returned:\nstderr:\n%s\nstdout:\n%s", + strings.Join(args, " "), + debugStderrBuffer.String(), + debugStdoutBuffer.String(), + ) } return nil } @@ -1083,24 +1155,62 @@ func (m *monitor) ExpectDeaths(count int32) { atomic.AddInt32(&m.expDeaths, count) } +var errGoexit = errors.New("Goexit() was called") + func (m *monitor) Go(fn func(context.Context) error) { - m.g.Go(func() error { + m.g.Go(func() (err error) { + var returned bool + defer func() { + if returned { + return + } + if r := recover(); r != errGoexit && r != nil { + // Pass any regular panics through. + panic(r) + } else { + // If the invoked method called runtime.Goexit (such as it + // happens when it calls t.Fatal), exit with a sentinel error + // here so that the wrapped errgroup cancels itself. + // + // Note that the trick here is that we panicked explicitly below, + // which somehow "overrides" the Goexit which is supposed to be + // un-recoverable, but we do need to recover to return an error. + err = errGoexit + } + }() if impl, ok := m.t.(*test); ok { // Automatically clear the worker status message when the goroutine exits. - defer impl.Status() + defer impl.WorkerStatus() } - return fn(m.ctx) + defer func() { + if !returned { + if r := recover(); r != nil { + panic(r) + } + panic(errGoexit) + } + }() + err = fn(m.ctx) + returned = true + return err }) } +func (m *monitor) WaitE() error { + if m.t.Failed() { + // If the test has failed, don't try to limp along. + return errors.New("already failed") + } + + return m.wait(roachprod, "monitor", m.nodes) +} + func (m *monitor) Wait() { if m.t.Failed() { // If the test has failed, don't try to limp along. return } - - err := m.wait(roachprod, "monitor", m.nodes) - if err != nil { + if err := m.WaitE(); err != nil { m.t.Fatal(err) } } diff --git a/pkg/cmd/roachtest/cluster_test.go b/pkg/cmd/roachtest/cluster_test.go index 263003496a08..cfed3f082ad7 100644 --- a/pkg/cmd/roachtest/cluster_test.go +++ b/pkg/cmd/roachtest/cluster_test.go @@ -19,7 +19,10 @@ import ( "context" "fmt" "os" + "regexp" + "runtime" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/pkg/errors" @@ -60,7 +63,7 @@ func TestClusterMonitor(t *testing.T) { c := &cluster{t: t, l: logger} m := newMonitor(context.Background(), c) m.Go(func(context.Context) error { return nil }) - if err := m.wait(`sleep`, `100`); err != nil { + if err := m.wait(`echo`, `1`); err != nil { t.Fatal(err) } }) @@ -92,10 +95,62 @@ func TestClusterMonitor(t *testing.T) { return ctx.Err() }) - err := m.wait(`sleep`, `100`) + err := m.wait(`echo`, `1`) expectedErr := `worker-fail` if !testutils.IsError(err, expectedErr) { t.Errorf(`expected %s err got: %+v`, expectedErr, err) } }) + + // NB: the forker sleeps in these tests actually get leaked, so it's important to let + // them finish pretty soon (think stress testing). As a matter of fact, `make test` waits + // for these child goroutines to finish (so these tests take seconds). + t.Run(`worker-fd-error`, func(t *testing.T) { + c := &cluster{t: t, l: logger} + m := newMonitor(context.Background(), c) + m.Go(func(ctx context.Context) error { + defer func() { + fmt.Println("sleep returns") + }() + return execCmd(ctx, logger, "/bin/bash", "-c", "sleep 3& wait") + }) + m.Go(func(ctx context.Context) error { + defer func() { + fmt.Println("failure returns") + }() + time.Sleep(30 * time.Millisecond) + return execCmd(ctx, logger, "/bin/bash", "-c", "echo hi && notthere") + }) + expectedErr := regexp.QuoteMeta(`/bin/bash -c echo hi && notthere returned: +stderr: +/bin/bash: notthere: command not found + +stdout: +hi +: exit status 127`) + if err := m.wait("sleep", "100"); !testutils.IsError(err, expectedErr) { + t.Error(err) + } + }) + t.Run(`worker-fd-fatal`, func(t *testing.T) { + c := &cluster{t: t, l: logger} + m := newMonitor(context.Background(), c) + m.Go(func(ctx context.Context) error { + err := execCmd(ctx, logger, "/bin/bash", "-c", "echo foo && sleep 3& wait") + return err + }) + m.Go(func(ctx context.Context) error { + time.Sleep(30 * time.Millisecond) + // Simulate c.t.Fatal for which there isn't enough mocking here. + // In reality t.Fatal adds text that is returned when the test fails, + // so the failing goroutine will be referenced (not like in the expected + // error below, where all you see is the other one being canceled). + runtime.Goexit() + return errors.New("unreachable") + }) + expectedErr := regexp.QuoteMeta(`Goexit() was called`) + if err := m.wait("sleep", "100"); !testutils.IsError(err, expectedErr) { + t.Error(err) + } + }) } diff --git a/pkg/cmd/roachtest/main.go b/pkg/cmd/roachtest/main.go index d925710ae9dd..7e7757097e50 100644 --- a/pkg/cmd/roachtest/main.go +++ b/pkg/cmd/roachtest/main.go @@ -117,7 +117,7 @@ Use 'roachtest bench -n' to see a list of all benchmarks. cmd.Flags().IntVar( &count, "count", 1, "the number of times to run each test") cmd.Flags().BoolVarP( - &debug, "debug", "d", debug, "don't wipe and destroy cluster if test fails") + &debugEnabled, "debug", "d", debugEnabled, "don't wipe and destroy cluster if test fails") cmd.Flags().BoolVarP( &dryrun, "dry-run", "n", dryrun, "dry run (don't run tests)") cmd.Flags().IntVarP( @@ -151,7 +151,7 @@ Cockroach cluster with existing data. &stores, "stores", "n", stores, "number of stores to distribute data across") storeGenCmd.Flags().SetInterspersed(false) // ignore workload flags storeGenCmd.Flags().BoolVarP( - &debug, "debug", "d", debug, "don't wipe and destroy cluster if test fails") + &debugEnabled, "debug", "d", debugEnabled, "don't wipe and destroy cluster if test fails") rootCmd.AddCommand(runCmd) rootCmd.AddCommand(benchCmd) diff --git a/pkg/cmd/roachtest/test.go b/pkg/cmd/roachtest/test.go index 018aeb97c0fe..b0442adc451f 100644 --- a/pkg/cmd/roachtest/test.go +++ b/pkg/cmd/roachtest/test.go @@ -41,7 +41,7 @@ import ( var ( parallelism = 10 count = 1 - debug = false + debugEnabled = false dryrun = false postIssues = true clusterNameRE = regexp.MustCompile(`^[a-z](?:[-a-z0-9]{0,61}[a-z0-9])?$`) @@ -395,7 +395,7 @@ func (r *registry) Run(filter []string) int { r.status.Unlock() case <-sig: - if !debug { + if !debugEnabled { destroyAllClusters() } } @@ -725,7 +725,7 @@ func (r *registry) run(spec *testSpec, filter *regexp.Regexp, c *cluster, done f c = newCluster(ctx, t, t.spec.Nodes) if c != nil { defer func() { - if !debug || !t.Failed() { + if !debugEnabled || !t.Failed() { c.Destroy(ctx) } else { c.l.printf("not destroying cluster to allow debugging\n") @@ -772,7 +772,7 @@ func (r *registry) run(spec *testSpec, filter *regexp.Regexp, c *cluster, done f if c != nil { c.FetchLogs(ctx) // NB: c.destroyed is nil for cloned clusters (i.e. in subtests). - if !debug && c.destroyed != nil { + if !debugEnabled && c.destroyed != nil { c.Destroy(ctx) } } diff --git a/vendor b/vendor index 588ce25caa39..7dd5f189c28f 160000 --- a/vendor +++ b/vendor @@ -1 +1 @@ -Subproject commit 588ce25caa391636ac84bdbb683d3f6957b8003d +Subproject commit 7dd5f189c28f661387afca139a84c35efcf853df