Skip to content

Commit

Permalink
Merge #62640 #62796
Browse files Browse the repository at this point in the history
62640: colexecutils: optimize the deselector a bit for zero batch r=yuzefovich a=yuzefovich

Zero length batch is special in the vectorized engine. Once we receive
it from the input, in the vast majority of cases we don't need to do any
processing in the operator. Previously, the deselector would still do
some stuff even for zero batch, and now it'll short-circuit as most
operators do.

Release note: None

62796: roachprod: add pprof command r=erikgrinaker a=stevendanna

The new pprof command allows to user to collect CPU and heap profiles
from nodes in a roachprod cluster using pprof's HTTP endpoints.

Profiles are collected in parallel and stored in the current working
directory of the machine running pprof. The command can also
optionally open the profiles using `go tool pprof`.

Some usage examples:

    # Capture CPU profile for all nodes in the cluster
    roachprod pprof CLUSTERNAME
    # Capture CPU profile for the first node in the cluster for 60 seconds
    roachprod pprof CLUSTERNAME:1 --duration 60s
    # Capture a Heap profile for the first node in the cluster
    roachprod pprof CLUSTERNAME:1 --heap
    # Same as above
    roachprod pprof-heap CLUSTERNAME:1

Fixes #62309

Note that I've skipped the `pprofurl` command in the original issue. I noticed that the `adminurl` 
command has an option to append any string you'd like to the URL, which is probably most of what one
needs from that command in most cases. But, I can add it if others feel differently.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Steven Danna <[email protected]>
  • Loading branch information
3 people committed Mar 31, 2021
3 parents 46a8edd + 6a0589c + 1d2b57e commit 00e8d9c
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 19 deletions.
4 changes: 4 additions & 0 deletions pkg/cmd/roachprod/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ go_library(
"//pkg/cmd/roachprod/vm/gce",
"//pkg/cmd/roachprod/vm/local",
"//pkg/util/flagutil",
"//pkg/util/httputil",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_errors//oserror",
"@com_github_spf13_cobra//:cobra",
"@org_golang_x_sys//unix",
"@org_golang_x_term//:term",
Expand Down
60 changes: 43 additions & 17 deletions pkg/cmd/roachprod/install/cluster_synced.go
Original file line number Diff line number Diff line change
Expand Up @@ -1623,23 +1623,53 @@ func (c *SyncedCluster) scp(src, dest string) error {
return nil
}

// Parallel TODO(peter): document
// ParallelResult captures the result of a user-defined function
// passed to Parallel or ParallelE.
type ParallelResult struct {
Index int
Out []byte
Err error
}

// Parallel runs a user-defined function across the nodes in the
// cluster. If any of the commands fail, Parallel will log an error
// and exit the program.
//
// See ParallelE for more information.
func (c *SyncedCluster) Parallel(
display string, count, concurrency int, fn func(i int) ([]byte, error),
) {
failed, err := c.ParallelE(display, count, concurrency, fn)
if err != nil {
sort.Slice(failed, func(i, j int) bool { return failed[i].Index < failed[j].Index })
for _, f := range failed {
fmt.Fprintf(os.Stderr, "%d: %+v: %s\n", f.Index, f.Err, f.Out)
}
log.Fatal("command failed")
}
}

// ParallelE runs the given function in parallel across the given
// nodes, returning an error if function returns an error.
//
// ParallelE runs the user-defined functions on the first `count`
// nodes in the cluster. It runs at most `concurrency` (or
// `c.MaxConcurrency` if it is lower) in parallel. If `concurrency` is
// 0, then it defaults to `count`.
//
// If err is non-nil, the slice of ParallelResults will contain the
// results from any of the failed invocations.
func (c *SyncedCluster) ParallelE(
display string, count, concurrency int, fn func(i int) ([]byte, error),
) ([]ParallelResult, error) {
if concurrency == 0 || concurrency > count {
concurrency = count
}
if c.MaxConcurrency > 0 && concurrency > c.MaxConcurrency {
concurrency = c.MaxConcurrency
}
type result struct {
index int
out []byte
err error
}

results := make(chan result, count)
results := make(chan ParallelResult, count)
var wg sync.WaitGroup
wg.Add(count)

Expand All @@ -1648,7 +1678,7 @@ func (c *SyncedCluster) Parallel(
go func(i int) {
defer wg.Done()
out, err := fn(i)
results <- result{i, out, err}
results <- ParallelResult{i, out, err}
}(index)
index++
}
Expand All @@ -1675,10 +1705,9 @@ func (c *SyncedCluster) Parallel(
ticker = time.NewTicker(1000 * time.Millisecond)
fmt.Fprintf(out, "%s", display)
}

defer ticker.Stop()
complete := make([]bool, count)
var failed []result
var failed []ParallelResult

var spinner = []string{"|", "/", "-", "\\"}
spinnerIdx := 0
Expand All @@ -1690,12 +1719,12 @@ func (c *SyncedCluster) Parallel(
fmt.Fprintf(out, ".")
}
case r, ok := <-results:
if r.err != nil {
if r.Err != nil {
failed = append(failed, r)
}
done = !ok
if ok {
complete[r.index] = true
complete[r.Index] = true
}
if index < count {
startNext()
Expand Down Expand Up @@ -1725,12 +1754,9 @@ func (c *SyncedCluster) Parallel(
}

if len(failed) > 0 {
sort.Slice(failed, func(i, j int) bool { return failed[i].index < failed[j].index })
for _, f := range failed {
fmt.Fprintf(os.Stderr, "%d: %+v: %s\n", f.index, f.err, f.out)
}
log.Fatal("command failed")
return failed, errors.New("one or more parallel execution failure")
}
return nil, nil
}

func (c *SyncedCluster) escapedTag() string {
Expand Down
163 changes: 162 additions & 1 deletion pkg/cmd/roachprod/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@
package main

import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"os"
"os/exec"
"os/user"
Expand All @@ -39,7 +43,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachprod/vm/gce"
"github.com/cockroachdb/cockroach/pkg/cmd/roachprod/vm/local"
"github.com/cockroachdb/cockroach/pkg/util/flagutil"
"github.com/cockroachdb/cockroach/pkg/util/httputil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/errors/oserror"
"github.com/spf13/cobra"
"golang.org/x/sys/unix"
"golang.org/x/term"
Expand Down Expand Up @@ -1496,6 +1504,150 @@ var pgurlCmd = &cobra.Command{
}),
}

var pprofOptions = struct {
heap bool
open bool
startingPort int
duration time.Duration
}{}

var pprofCmd = &cobra.Command{
Use: "pprof <cluster>",
Args: cobra.ExactArgs(1),
Aliases: []string{"pprof-heap"},
Short: "capture a pprof profile from the specified nodes",
Long: `Capture a pprof profile from the specified nodes.
Examples:
# Capture CPU profile for all nodes in the cluster
roachprod pprof CLUSTERNAME
# Capture CPU profile for the first node in the cluster for 60 seconds
roachprod pprof CLUSTERNAME:1 --duration 60s
# Capture a Heap profile for the first node in the cluster
roachprod pprof CLUSTERNAME:1 --heap
# Same as above
roachprod pprof-heap CLUSTERNAME:1
`,
Run: wrap(func(cmd *cobra.Command, args []string) error {
c, err := newCluster(args[0])
if err != nil {
return err
}

var profType string
var description string
if cmd.CalledAs() == "pprof-heap" || pprofOptions.heap {
description = "capturing heap profile"
profType = "heap"
} else {
description = "capturing CPU profile"
profType = "profile"
}

outputFiles := []string{}
mu := &syncutil.Mutex{}
pprofPath := fmt.Sprintf("debug/pprof/%s?seconds=%d", profType, int(pprofOptions.duration.Seconds()))

minTimeout := 30 * time.Second
timeout := 2 * pprofOptions.duration
if timeout < minTimeout {
timeout = minTimeout
}

httpClient := httputil.NewClientWithTimeout(timeout)
startTime := timeutil.Now().Unix()
failed, err := c.ParallelE(description, len(c.ServerNodes()), 0, func(i int) ([]byte, error) {
host := c.VMs[i]
port := install.GetAdminUIPort(c.Impl.NodePort(c, i))
scheme := "http"
if c.Secure {
scheme = "https"
}
outputFile := fmt.Sprintf("pprof-%s-%d-%s-%04d.out", profType, startTime, c.Name, i+1)
outputDir := filepath.Dir(outputFile)
file, err := ioutil.TempFile(outputDir, ".pprof")
if err != nil {
return nil, errors.Wrap(err, "create tmpfile for pprof download")
}

defer func() {
err := file.Close()
if err != nil && !errors.Is(err, oserror.ErrClosed) {
fmt.Fprintf(os.Stderr, "warning: could not close temporary file")
}
err = os.Remove(file.Name())
if err != nil && !oserror.IsNotExist(err) {
fmt.Fprintf(os.Stderr, "warning: could not remove temporary file")
}
}()

pprofURL := fmt.Sprintf("%s://%s:%d/%s", scheme, host, port, pprofPath)
resp, err := httpClient.Get(context.Background(), pprofURL)
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nil, errors.Newf("unexpected status from pprof endpoint: %s", resp.Status)
}

if _, err := io.Copy(file, resp.Body); err != nil {
return nil, err
}
if err := file.Sync(); err != nil {
return nil, err
}
if err := file.Close(); err != nil {
return nil, err
}
if err := os.Rename(file.Name(), outputFile); err != nil {
return nil, err
}

mu.Lock()
outputFiles = append(outputFiles, outputFile)
mu.Unlock()
return nil, nil
})

for _, s := range outputFiles {
fmt.Printf("Created %s\n", s)
}

if err != nil {
sort.Slice(failed, func(i, j int) bool { return failed[i].Index < failed[j].Index })
for _, f := range failed {
fmt.Fprintf(os.Stderr, "%d: %+v: %s\n", f.Index, f.Err, f.Out)
}
os.Exit(1)
}

if pprofOptions.open {
waitCommands := []*exec.Cmd{}
for i, file := range outputFiles {
port := pprofOptions.startingPort + i
cmd := exec.Command("go", "tool", "pprof",
"-http", fmt.Sprintf(":%d", port),
file)
waitCommands = append(waitCommands, cmd)
if err := cmd.Start(); err != nil {
return err
}
}

for _, cmd := range waitCommands {
err := cmd.Wait()
if err != nil {
return err
}
}
}
return nil
}),
}

var adminurlCmd = &cobra.Command{
Use: "adminurl <cluster>",
Aliases: []string{"admin", "adminui"},
Expand Down Expand Up @@ -1610,7 +1762,7 @@ func main() {
pgurlCmd,
adminurlCmd,
logsCmd,

pprofCmd,
cachedHostsCmd,
)
rootCmd.BashCompletionFunction = fmt.Sprintf(`__custom_func()
Expand Down Expand Up @@ -1715,6 +1867,15 @@ func main() {
pgurlCmd.Flags().BoolVar(
&external, "external", false, "return pgurls for external connections")

pprofCmd.Flags().DurationVar(
&pprofOptions.duration, "duration", 30*time.Second, "Duration of profile to capture")
pprofCmd.Flags().BoolVar(
&pprofOptions.heap, "heap", false, "Capture a heap profile instead of a CPU profile")
pprofCmd.Flags().BoolVar(
&pprofOptions.open, "open", false, "Open the profile using `go tool pprof -http`")
pprofCmd.Flags().IntVar(
&pprofOptions.startingPort, "starting-port", 9000, "Initial port to use when opening pprof's HTTP interface")

ipCmd.Flags().BoolVar(
&external, "external", false, "return external IP addresses")

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecutils/deselector.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (p *deselectorOp) Next(ctx context.Context) coldata.Batch {
p.inputTypes, p.output, 1 /* minCapacity */, maxBatchMemSize,
)
batch := p.Input.Next(ctx)
if batch.Selection() == nil {
if batch.Selection() == nil || batch.Length() == 0 {
return batch
}
p.output, _ = p.allocator.ResetMaybeReallocate(
Expand Down

0 comments on commit 00e8d9c

Please sign in to comment.