Skip to content

Commit

Permalink
roachprod: add pprof command
Browse files Browse the repository at this point in the history
The new pprof command allows the user to collect CPU and heap profiles
from nodes in a roachprod cluster using pprof's HTTP endpoints.

Profiles are collected in parallel and stored in the current working
directory of the machine running pprof. The command can also
optionally open the profiles using `go tool pprof`.

Some usage examples:

    # Capture CPU profile for all nodes in the cluster
    roachprod pprof CLUSTERNAME
    # Capture CPU profile for the first node in the cluster for 60 seconds
    roachprod pprof CLUSTERNAME:1 --duration 60s
    # Capture a Heap profile for the first node in the cluster
    roachprod pprof CLUSTERNAME:1 --heap
    # Same as above
    roachprod pprof-heap CLUSTERNAME:1

Fixes cockroachdb#62309

Release note: None
  • Loading branch information
stevendanna committed Mar 30, 2021
1 parent 52d81ca commit 0a5f420
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 18 deletions.
4 changes: 4 additions & 0 deletions pkg/cmd/roachprod/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ go_library(
"//pkg/cmd/roachprod/vm/gce",
"//pkg/cmd/roachprod/vm/local",
"//pkg/util/flagutil",
"//pkg/util/httputil",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_errors//oserror",
"@com_github_spf13_cobra//:cobra",
"@org_golang_x_sys//unix",
"@org_golang_x_term//:term",
Expand Down
60 changes: 43 additions & 17 deletions pkg/cmd/roachprod/install/cluster_synced.go
Original file line number Diff line number Diff line change
Expand Up @@ -1623,23 +1623,53 @@ func (c *SyncedCluster) scp(src, dest string) error {
return nil
}

// Parallel TODO(peter): document
// ParallelResult captures the result of a user-defined function
// passed to Parallel or ParallelE.
type ParallelResult struct {
	Index int    // index of the node the function was invoked for
	Out   []byte // raw output produced by the invocation
	Err   error  // non-nil if the invocation failed
}

// Parallel runs a user-defined function across the nodes in the
// cluster. If any of the commands fail, Parallel will log an error
// and exit the program.
//
// See ParallelE for more information.
func (c *SyncedCluster) Parallel(
	display string, count, concurrency int, fn func(i int) ([]byte, error),
) {
	failed, err := c.ParallelE(display, count, concurrency, fn)
	if err == nil {
		return
	}
	// Report every failure in ascending node order before terminating.
	sort.Slice(failed, func(a, b int) bool {
		return failed[a].Index < failed[b].Index
	})
	for _, res := range failed {
		fmt.Fprintf(os.Stderr, "%d: %+v: %s\n", res.Index, res.Err, res.Out)
	}
	log.Fatal("command failed")
}

// ParallelE runs the given function in parallel across the given
// nodes, returning an error if function returns an error.
//
// ParallelE runs the user-defined functions on the first `count`
// nodes in the cluster. It runs at most `concurrency` (or
// `c.MaxConcurrency` if it is lower) in parallel. If `concurrency` is
// 0, then it defaults to `count`.
//
// If err is non-nil, the slice of ParallelResults will contain the
// results from any of the failed invocations.
func (c *SyncedCluster) ParallelE(
display string, count, concurrency int, fn func(i int) ([]byte, error),
) ([]ParallelResult, error) {
if concurrency == 0 || concurrency > count {
concurrency = count
}
if c.MaxConcurrency > 0 && concurrency > c.MaxConcurrency {
concurrency = c.MaxConcurrency
}
type result struct {
index int
out []byte
err error
}

results := make(chan result, count)
results := make(chan ParallelResult, count)
var wg sync.WaitGroup
wg.Add(count)

Expand All @@ -1648,7 +1678,7 @@ func (c *SyncedCluster) Parallel(
go func(i int) {
defer wg.Done()
out, err := fn(i)
results <- result{i, out, err}
results <- ParallelResult{i, out, err}
}(index)
index++
}
Expand All @@ -1675,10 +1705,9 @@ func (c *SyncedCluster) Parallel(
ticker = time.NewTicker(1000 * time.Millisecond)
fmt.Fprintf(out, "%s", display)
}

defer ticker.Stop()
complete := make([]bool, count)
var failed []result
var failed []ParallelResult

var spinner = []string{"|", "/", "-", "\\"}
spinnerIdx := 0
Expand All @@ -1690,12 +1719,12 @@ func (c *SyncedCluster) Parallel(
fmt.Fprintf(out, ".")
}
case r, ok := <-results:
if r.err != nil {
if r.Err != nil {
failed = append(failed, r)
}
done = !ok
if ok {
complete[r.index] = true
complete[r.Index] = true
}
if index < count {
startNext()
Expand Down Expand Up @@ -1725,12 +1754,9 @@ func (c *SyncedCluster) Parallel(
}

if len(failed) > 0 {
sort.Slice(failed, func(i, j int) bool { return failed[i].index < failed[j].index })
for _, f := range failed {
fmt.Fprintf(os.Stderr, "%d: %+v: %s\n", f.index, f.err, f.out)
}
log.Fatal("command failed")
return failed, errors.New("one or more parallel execution failure")
}
return nil, nil
}

func (c *SyncedCluster) escapedTag() string {
Expand Down
149 changes: 148 additions & 1 deletion pkg/cmd/roachprod/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@
package main

import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"os"
"os/exec"
"os/user"
Expand All @@ -39,7 +43,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachprod/vm/gce"
"github.com/cockroachdb/cockroach/pkg/cmd/roachprod/vm/local"
"github.com/cockroachdb/cockroach/pkg/util/flagutil"
"github.com/cockroachdb/cockroach/pkg/util/httputil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/errors/oserror"
"github.com/spf13/cobra"
"golang.org/x/sys/unix"
"golang.org/x/term"
Expand Down Expand Up @@ -1496,6 +1504,136 @@ var pgurlCmd = &cobra.Command{
}),
}

// pprofOptions holds the flag values for the pprof command; see the
// flag registrations in main for the user-facing descriptions.
var pprofOptions = struct {
	heap         bool          // capture a heap profile instead of a CPU profile
	open         bool          // open collected profiles with `go tool pprof -http`
	startingPort int           // first local port used when opening profiles
	duration     time.Duration // profile duration, passed as ?seconds= to the endpoint
}{}

// pprofCmd captures CPU or heap profiles from the nodes of a cluster
// via the pprof HTTP endpoints, writing one file per node into the
// current working directory. With --open, each profile is then served
// locally via `go tool pprof -http`.
var pprofCmd = &cobra.Command{
	Use:     "pprof <cluster>",
	Args:    cobra.ExactArgs(1),
	Aliases: []string{"pprof-cpu", "pprof-profile", "pprof-heap"},
	Short:   "capture a pprof profile from the specified nodes",
	Long: `Capture a pprof profile from the specified nodes.

Examples:

    # Capture CPU profile for all nodes in the cluster
    roachprod pprof CLUSTERNAME
    # Capture CPU profile for the first node in the cluster for 60 seconds
    roachprod pprof CLUSTERNAME:1 --duration 60s
    # Capture a Heap profile for the first node in the cluster
    roachprod pprof CLUSTERNAME:1 --heap
    # Same as above
    roachprod pprof-heap CLUSTERNAME:1
`,
	Run: wrap(func(cmd *cobra.Command, args []string) error {
		c, err := newCluster(args[0])
		if err != nil {
			return err
		}

		// The heap alias (or --heap) selects the heap endpoint; the
		// default is a CPU profile of pprofOptions.duration.
		var profType string
		var description string
		if cmd.CalledAs() == "pprof-heap" || pprofOptions.heap {
			description = "capturing heap profile"
			profType = "heap"
		} else {
			description = "capturing CPU profile"
			profType = "profile"
		}

		outputFiles := []string{}
		mu := &syncutil.Mutex{} // guards outputFiles across parallel workers
		pprofPath := fmt.Sprintf("debug/pprof/%s?seconds=%d", profType, int(pprofOptions.duration.Seconds()))
		// Allow generous headroom over the profile duration for transfer.
		httpClient := httputil.NewClientWithTimeout(5 * pprofOptions.duration)

		failed, err := c.ParallelE(description, len(c.ServerNodes()), 0, func(i int) ([]byte, error) {
			host := c.VMs[i]
			port := install.GetAdminUIPort(c.Impl.NodePort(c, i))
			scheme := "http"
			if c.Secure {
				scheme = "https"
			}
			// Download into a temp file in the destination directory and
			// rename into place so partially-downloaded profiles never
			// appear under the final name.
			outputFile := fmt.Sprintf("pprof-%s-%s-%04d-%d.out", profType, c.Name, i, timeutil.Now().Unix())
			outputDir := filepath.Dir(outputFile)
			file, err := ioutil.TempFile(outputDir, ".pprof")
			if err != nil {
				return nil, errors.Wrap(err, "create tmpfile for pprof download")
			}

			defer func() {
				// Best-effort cleanup: the temp file is already gone if
				// the rename below succeeded.
				err := os.Remove(file.Name())
				if err != nil && !oserror.IsNotExist(err) {
					fmt.Fprintf(os.Stderr, "warning: could not remove temporary file: %v\n", err)
				}
			}()

			pprofURL := fmt.Sprintf("%s://%s:%d/%s", scheme, host, port, pprofPath)
			resp, err := httpClient.Get(context.Background(), pprofURL)
			if err != nil {
				return nil, err
			}
			defer resp.Body.Close()

			if resp.StatusCode != http.StatusOK {
				return nil, errors.Newf("unexpected status from pprof endpoint: %s", resp.Status)
			}

			if _, err := io.Copy(file, resp.Body); err != nil {
				return nil, err
			}
			if err := file.Sync(); err != nil {
				return nil, err
			}
			// Close before renaming: avoids leaking a file descriptor per
			// node and keeps the rename portable.
			if err := file.Close(); err != nil {
				return nil, err
			}
			if err := os.Rename(file.Name(), outputFile); err != nil {
				return nil, err
			}

			mu.Lock()
			outputFiles = append(outputFiles, outputFile)
			mu.Unlock()
			return nil, nil
		})

		for _, s := range outputFiles {
			fmt.Printf("Created %s\n", s)
		}

		if err != nil {
			sort.Slice(failed, func(i, j int) bool { return failed[i].Index < failed[j].Index })
			for _, f := range failed {
				fmt.Fprintf(os.Stderr, "%d: %+v: %s\n", f.Index, f.Err, f.Out)
			}
			os.Exit(1)
		}

		if pprofOptions.open {
			// Start one pprof web UI per profile, each on its own port,
			// then wait for all of them so the files stay in use.
			waitCommands := []*exec.Cmd{}
			for i, file := range outputFiles {
				port := pprofOptions.startingPort + i
				cmd := exec.Command("go", "tool", "pprof",
					"-http", fmt.Sprintf(":%d", port),
					file)
				waitCommands = append(waitCommands, cmd)
				if err := cmd.Start(); err != nil {
					return err
				}
			}

			for _, cmd := range waitCommands {
				err := cmd.Wait()
				if err != nil {
					return err
				}
			}
		}
		return nil
	}),
}

var adminurlCmd = &cobra.Command{
Use: "adminurl <cluster>",
Aliases: []string{"admin", "adminui"},
Expand Down Expand Up @@ -1610,7 +1748,7 @@ func main() {
pgurlCmd,
adminurlCmd,
logsCmd,

pprofCmd,
cachedHostsCmd,
)
rootCmd.BashCompletionFunction = fmt.Sprintf(`__custom_func()
Expand Down Expand Up @@ -1715,6 +1853,15 @@ func main() {
pgurlCmd.Flags().BoolVar(
&external, "external", false, "return pgurls for external connections")

pprofCmd.Flags().DurationVar(
&pprofOptions.duration, "duration", 30*time.Second, "Duration of profile to capture")
pprofCmd.Flags().BoolVar(
&pprofOptions.heap, "heap", false, "Capture a heap profile instead of a CPU profile")
pprofCmd.Flags().BoolVar(
&pprofOptions.open, "open", false, "Open the profile using `go tool pprof -http`")
pprofCmd.Flags().IntVar(
&pprofOptions.startingPort, "starting-port", 9000, "Initial port to use when opening pprof's HTTP interface")

ipCmd.Flags().BoolVar(
&external, "external", false, "return external IP addresses")

Expand Down

0 comments on commit 0a5f420

Please sign in to comment.