From c618d4461c798d00b08af18d014763392480250b Mon Sep 17 00:00:00 2001 From: adityamaru Date: Fri, 30 Jun 2023 13:46:29 -0400 Subject: [PATCH] pprofui: add support for collecting goroutines with labels This change refactors some of the logic in the pprofui/. Additionally, we add support for collecting cluster-wide goroutine profiles. These goroutine profiles can be collected with or without labels. When collected without labels we request pprof profiles with debug=0, which generates Profile protobufs on every node and `pprof.Merge`s them before redirecting the client to a flamegraph. When collected with labels we request pprof profiles with debug=1, which generates a legacy text format with comments translating addresses to function names and line numbers, so that a programmer can read the profile without tools. These profiles cannot be `pprof.Merge`d and so we manually stitch them together on a per-node basis before downloading them for the client as a txt file. This change also adds support for filtering the aforementioned goroutines by pprof label. This will be used by the Jobs Profiler to collect cluster-wide stacks relevant to the running job. 
Informs: #105440 Release note: None --- docs/generated/http/full.md | 5 +- pkg/cli/zip_per_node.go | 8 +- pkg/server/debug/pprofui/BUILD.bazel | 3 +- pkg/server/debug/pprofui/server.go | 296 +++++++++++++--------- pkg/server/debug/pprofui/server_test.go | 188 ++++++++++++-- pkg/server/debug/pprofui/storage.go | 8 +- pkg/server/debug/pprofui/storage_mem.go | 15 +- pkg/server/serverpb/status.proto | 15 +- pkg/server/status.go | 122 +++++---- pkg/server/status_local_file_retrieval.go | 53 +++- 10 files changed, 517 insertions(+), 196 deletions(-) diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md index df40a033666c..5040dff63258 100644 --- a/docs/generated/http/full.md +++ b/docs/generated/http/full.md @@ -3043,10 +3043,11 @@ Support status: [reserved](#support-status) | Field | Type | Label | Description | Support status | | ----- | ---- | ----- | ----------- | -------------- | -| node_id | [string](#cockroach.server.serverpb.ProfileRequest-string) | | node_id is a string so that "local" can be used to specify that no forwarding is necessary. node_id translates to a KV node ID on a storage server and SQL instance ID on a SQL only server. | [reserved](#support-status) | +| node_id | [string](#cockroach.server.serverpb.ProfileRequest-string) | | node_id is a string so that "local" or "all" can be used to specify that no forwarding is necessary. node_id translates to a KV node ID on a storage server and SQL instance ID on a SQL only server. | [reserved](#support-status) | | type | [ProfileRequest.Type](#cockroach.server.serverpb.ProfileRequest-cockroach.server.serverpb.ProfileRequest.Type) | | The type of profile to retrieve. 
| [reserved](#support-status) | | seconds | [int32](#cockroach.server.serverpb.ProfileRequest-int32) | | applies only to Type=CPU, defaults to 30 | [reserved](#support-status) | -| labels | [bool](#cockroach.server.serverpb.ProfileRequest-bool) | | applies only to Type=CPU, defaults to false | [reserved](#support-status) | +| with_labels | [bool](#cockroach.server.serverpb.ProfileRequest-bool) | | WithLabels can be specified for Type=CPU or Type=GOROUTINE.

- If true for CPU profiles, we request a CPU profile with pprof labels.

- If true for GOROUTINE profiles, we request an aggregated goroutine profile with debug=1. | [reserved](#support-status) | +| label_filter | [string](#cockroach.server.serverpb.ProfileRequest-string) | | LabelFilter only applies to Type=GOROUTINE. Only goroutines with a pprof label matching the filter will be returned. | [reserved](#support-status) | | sender_server_version | [cockroach.roachpb.Version](#cockroach.server.serverpb.ProfileRequest-cockroach.roachpb.Version) | | SenderServerVersion is the server version of the node sending the Profile request. If this field is set then the node processing the request will only collect the profile if its server version is equal to the sender's server version.

Currently, this is only used when collecting profiles that will be merged using pprof.Merge as all the samples must be from the same binary version. | [reserved](#support-status) | diff --git a/pkg/cli/zip_per_node.go b/pkg/cli/zip_per_node.go index 48c3d65546d1..62c9a4c07d7a 100644 --- a/pkg/cli/zip_per_node.go +++ b/pkg/cli/zip_per_node.go @@ -104,10 +104,10 @@ func (zc *debugZipContext) collectCPUProfiles( var pd profData err := timeutil.RunWithTimeout(ctx, "fetch cpu profile", zc.timeout+zipCtx.cpuProfDuration, func(ctx context.Context) error { resp, err := zc.status.Profile(ctx, &serverpb.ProfileRequest{ - NodeId: fmt.Sprintf("%d", nodeID), - Type: serverpb.ProfileRequest_CPU, - Seconds: secs, - Labels: true, + NodeId: fmt.Sprintf("%d", nodeID), + Type: serverpb.ProfileRequest_CPU, + Seconds: secs, + WithLabels: true, }) if err != nil { return err diff --git a/pkg/server/debug/pprofui/BUILD.bazel b/pkg/server/debug/pprofui/BUILD.bazel index 8e826810fcf0..05cf589c167c 100644 --- a/pkg/server/debug/pprofui/BUILD.bazel +++ b/pkg/server/debug/pprofui/BUILD.bazel @@ -31,9 +31,10 @@ go_test( srcs = ["server_test.go"], args = ["-test.timeout=55s"], data = glob(["testdata/**"]), - embed = [":pprofui"], deps = [ + ":pprofui", "//pkg/build/bazel", + "//pkg/server", "//pkg/server/serverpb", "//pkg/testutils/datapathutils", "//pkg/testutils/skip", diff --git a/pkg/server/debug/pprofui/server.go b/pkg/server/debug/pprofui/server.go index 525d89f5b4a3..8fc57156493e 100644 --- a/pkg/server/debug/pprofui/server.go +++ b/pkg/server/debug/pprofui/server.go @@ -97,6 +97,162 @@ func (s *Server) parsePath(reqPath string) (profType string, id string, remainin +// validateProfileRequest validates that the request does not contain any +// conflicting or invalid configurations. 
+func validateProfileRequest(req *serverpb.ProfileRequest) error { + switch req.Type { + case serverpb.ProfileRequest_GOROUTINE: + case serverpb.ProfileRequest_CPU: + if req.LabelFilter != "" { + return errors.Newf("filtering using a pprof label is unsupported for %s", req.Type.String()) + } + default: + if req.NodeId == "all" { + return errors.Newf("cluster-wide collection is unsupported for %s", req.Type.String()) + } + + if req.WithLabels { + return errors.Newf("profiling with labels is unsupported for %s", req.Type.String()) + } + + if req.LabelFilter != "" { + return errors.Newf("filtering using a pprof label is unsupported for %s", req.Type.String()) + } + } + + return nil +} + +func constructProfileReq(r *http.Request, profileName string) (*serverpb.ProfileRequest, error) { + req, err := http.NewRequest("GET", "/unused", bytes.NewReader(nil)) + if err != nil { + return nil, err + } + + profileType, ok := serverpb.ProfileRequest_Type_value[strings.ToUpper(profileName)] + if !ok && profileName != "profile" { + return nil, errors.Newf("unknown profile name: %s", profileName) + } + // There is a discrepancy between the usage of the "CPU" and + // "profile" names to refer to the CPU profile in the + // implementations. The URL to the CPU profile has been modified + // on the Advanced Debug page to point to /pprof/ui/cpu but this + // is retained for backwards compatibility. + if profileName == "profile" { + profileType = int32(serverpb.ProfileRequest_CPU) + } + profileReq := &serverpb.ProfileRequest{ + NodeId: "local", + Type: serverpb.ProfileRequest_Type(profileType), + } + + // Pass through any parameters. Most notably, allow ?seconds=10 for + // CPU profiles. 
+ _ = r.ParseForm() + req.Form = r.Form + + if r.Form.Get("seconds") != "" { + sec, err := strconv.ParseInt(r.Form.Get("seconds"), 10, 32) + if err != nil { + return nil, err + } + profileReq.Seconds = int32(sec) + } + if r.Form.Get("node") != "" { + profileReq.NodeId = r.Form.Get("node") + } + if r.Form.Get("labels") != "" { + withLabels, err := strconv.ParseBool(r.Form.Get("labels")) + if err != nil { + return nil, err + } + profileReq.WithLabels = withLabels + } + if labelFilter := r.Form.Get("labelfilter"); labelFilter != "" { + profileReq.WithLabels = true + profileReq.LabelFilter = labelFilter + } + if err := validateProfileRequest(profileReq); err != nil { + return nil, err + } + return profileReq, nil +} + +func (s *Server) serveCachedProfile( + id string, remainingPath string, profileName string, w http.ResponseWriter, r *http.Request, +) { + // Catch nonexistent IDs early or pprof will do a worse job at + // giving an informative error. + var isProfileProto bool + if err := s.storage.Get(id, func(b bool, _ io.Reader) error { + isProfileProto = b + return nil + }); err != nil { + msg := fmt.Sprintf("profile for id %s not found: %s", id, err) + http.Error(w, msg, http.StatusNotFound) + return + } + + // If the profile is not in a protobuf format, downloading it is the only + // option. + if !isProfileProto || r.URL.Query().Get("download") != "" { + // TODO(tbg): this has zero discoverability. 
+ w.Header().Set("Content-Disposition", + fmt.Sprintf("attachment; filename=%s_%s.pb.gz", profileName, id)) + w.Header().Set("Content-Type", "application/octet-stream") + if err := s.storage.Get(id, func(_ bool, r io.Reader) error { + _, err := io.Copy(w, r) + return err + }); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + return + } + + server := func(args *driver.HTTPServerArgs) error { + handler, ok := args.Handlers[remainingPath] + if !ok { + return errors.Errorf("unknown endpoint %s", remainingPath) + } + handler.ServeHTTP(w, r) + return nil + } + + storageFetcher := func(_ string, _, _ time.Duration) (*profile.Profile, string, error) { + var p *profile.Profile + if err := s.storage.Get(id, func(_ bool, reader io.Reader) error { + var err error + p, err = profile.Parse(reader) + return err + }); err != nil { + return nil, "", err + } + return p, "", nil + } + + // Invoke the (library version) of `pprof` with a number of stubs. + // Specifically, we pass a fake FlagSet that plumbs through the + // given args, a UI that logs any errors pprof may emit, a fetcher + // that simply reads the profile we downloaded earlier, and a + // HTTPServer that pprof will pass the web ui handlers to at the + // end (and we let it handle this client request). 
+ if err := driver.PProf(&driver.Options{ + Flagset: &pprofFlags{ + FlagSet: pflag.NewFlagSet("pprof", pflag.ExitOnError), + args: []string{ + "--symbolize", "none", + "--http", "localhost:0", + "", // we inject our own target + }, + }, + UI: &fakeUI{}, + Fetch: fetcherFn(storageFetcher), + HTTPServer: server, + }); err != nil { + _, _ = w.Write([]byte(err.Error())) + } +} + func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { profileName, id, remainingPath := s.parsePath(r.URL.Path) @@ -113,124 +269,24 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { } if id != "" { - // Catch nonexistent IDs early or pprof will do a worse job at - // giving an informative error. - if err := s.storage.Get(id, func(io.Reader) error { return nil }); err != nil { - msg := fmt.Sprintf("profile for id %s not found: %s", id, err) - http.Error(w, msg, http.StatusNotFound) - return - } - - if r.URL.Query().Get("download") != "" { - // TODO(tbg): this has zero discoverability. - w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s_%s.pb.gz", profileName, id)) - w.Header().Set("Content-Type", "application/octet-stream") - if err := s.storage.Get(id, func(r io.Reader) error { - _, err := io.Copy(w, r) - return err - }); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } - return - } - - server := func(args *driver.HTTPServerArgs) error { - handler, ok := args.Handlers[remainingPath] - if !ok { - return errors.Errorf("unknown endpoint %s", remainingPath) - } - handler.ServeHTTP(w, r) - return nil - } - - storageFetcher := func(_ string, _, _ time.Duration) (*profile.Profile, string, error) { - var p *profile.Profile - if err := s.storage.Get(id, func(reader io.Reader) error { - var err error - p, err = profile.Parse(reader) - return err - }); err != nil { - return nil, "", err - } - return p, "", nil - } - - // Invoke the (library version) of `pprof` with a number of stubs. 
- // Specifically, we pass a fake FlagSet that plumbs through the - // given args, a UI that logs any errors pprof may emit, a fetcher - // that simply reads the profile we downloaded earlier, and a - // HTTPServer that pprof will pass the web ui handlers to at the - // end (and we let it handle this client request). - if err := driver.PProf(&driver.Options{ - Flagset: &pprofFlags{ - FlagSet: pflag.NewFlagSet("pprof", pflag.ExitOnError), - args: []string{ - "--symbolize", "none", - "--http", "localhost:0", - "", // we inject our own target - }, - }, - UI: &fakeUI{}, - Fetch: fetcherFn(storageFetcher), - HTTPServer: server, - }); err != nil { - _, _ = w.Write([]byte(err.Error())) - } - + s.serveCachedProfile(id, remainingPath, profileName, w, r) return } // Create and save new profile, then redirect client to corresponding ui URL. - id = s.storage.ID() - - if err := s.storage.Store(id, func(w io.Writer) error { - req, err := http.NewRequest("GET", "/unused", bytes.NewReader(nil)) - if err != nil { - return err - } - - profileType, ok := serverpb.ProfileRequest_Type_value[strings.ToUpper(profileName)] - if !ok && profileName != "profile" { - return errors.Newf("unknown profile name: %s", profileName) - } - // There is a discrepancy between the usage of the "CPU" and - // "profile" names to refer to the CPU profile in the - // implementations. The URL to the CPU profile has been modified - // on the Advanced Debug page to point to /pprof/ui/cpu but this - // is retained for backwards compatibility. - if profileName == "profile" { - profileType = int32(serverpb.ProfileRequest_CPU) - } - var resp *serverpb.JSONResponse - profileReq := &serverpb.ProfileRequest{ - NodeId: "local", - Type: serverpb.ProfileRequest_Type(profileType), - } - - // Pass through any parameters. Most notably, allow ?seconds=10 for - // CPU profiles. 
- _ = r.ParseForm() - req.Form = r.Form - - if r.Form.Get("seconds") != "" { - sec, err := strconv.ParseInt(r.Form.Get("seconds"), 10, 32) - if err != nil { - return err - } - profileReq.Seconds = int32(sec) - } - if r.Form.Get("node") != "" { - profileReq.NodeId = r.Form.Get("node") - } - if r.Form.Get("labels") != "" { - labels, err := strconv.ParseBool(r.Form.Get("labels")) - if err != nil { - return err - } - profileReq.Labels = labels - } - resp, err = s.profiler.Profile(r.Context(), profileReq) + profileReq, err := constructProfileReq(r, profileName) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + // A GOROUTINE profile with labels is not generated in a protobuf format. It + // is outputted in a "legacy text format with comments translating addresses + // to function names and line numbers, so that a programmer can read the + // profile without tools." + isProfileProto := !(profileReq.Type == serverpb.ProfileRequest_GOROUTINE && profileReq.WithLabels) + if err := s.storage.Store(id, isProfileProto, func(w io.Writer) error { + resp, err := s.profiler.Profile(r.Context(), profileReq) if err != nil { return err } @@ -244,6 +300,18 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + // If the profile is not in a protobuf format, downloading it is the only + // option. + if !isProfileProto { + w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s_%s.txt", profileName, id)) + w.Header().Set("Content-Type", "text/plain") + _ = s.storage.Get(id, func(isProtoProfile bool, r io.Reader) error { + _, err := io.Copy(w, r) + return err + }) + return + } + // NB: direct straight to the flamegraph. This is because `pprof` // shells out to `dot` for the default landing page and thus works // only on hosts that have graphviz installed. 
You can still navigate @@ -263,7 +331,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !isGoPProf { http.Redirect(w, r, origURL.String(), http.StatusTemporaryRedirect) } else { - _ = s.storage.Get(id, func(r io.Reader) error { + _ = s.storage.Get(id, func(isProtoProfile bool, r io.Reader) error { _, err := io.Copy(w, r) return err }) diff --git a/pkg/server/debug/pprofui/server_test.go b/pkg/server/debug/pprofui/server_test.go index 90028593bdf4..3a6a63f991c3 100644 --- a/pkg/server/debug/pprofui/server_test.go +++ b/pkg/server/debug/pprofui/server_test.go @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package pprofui +package pprofui_test import ( "context" @@ -22,6 +22,8 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/build/bazel" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/server/debug/pprofui" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" @@ -50,39 +52,49 @@ func init() { } } +var supportedProfiles = []string{ + "cpu", "goroutine", "heap", "threadcreate", "block", "mutex", "allocs", +} + func TestServer(t *testing.T) { expectedNodeID := "local" + withLabels := false mockProfile := func(ctx context.Context, req *serverpb.ProfileRequest) (*serverpb.JSONResponse, error) { require.Equal(t, expectedNodeID, req.NodeId) + require.Equal(t, withLabels, req.WithLabels) b, err := os.ReadFile(datapathutils.TestDataPath(t, "heap.profile")) require.NoError(t, err) return &serverpb.JSONResponse{Data: b}, nil } - storage := NewMemStorage(1, 0) - s := NewServer(storage, ProfilerFunc(mockProfile)) + storage := pprofui.NewMemStorage(1, 0) + s := pprofui.NewServer(storage, ProfilerFunc(mockProfile)) + count := 1 for i := 0; i < 3; i++ { - t.Run(fmt.Sprintf("request local profile %d", i), func(t *testing.T) { - r := 
httptest.NewRequest("GET", "/heap/", nil) - w := httptest.NewRecorder() - s.ServeHTTP(w, r) + for _, profileType := range supportedProfiles { + t.Run(fmt.Sprintf("request local profile %s:%d", profileType, i), func(t *testing.T) { + r := httptest.NewRequest("GET", fmt.Sprintf("/%s/", profileType), nil) + w := httptest.NewRecorder() + s.ServeHTTP(w, r) - require.Equal(t, http.StatusTemporaryRedirect, w.Code) + require.Equal(t, http.StatusTemporaryRedirect, w.Code) - loc := w.Result().Header.Get("Location") - require.Equal(t, fmt.Sprintf("/heap/%d/flamegraph", i+1), loc) + loc := w.Result().Header.Get("Location") + require.Equal(t, fmt.Sprintf("/%s/%d/flamegraph", profileType, count), loc) + count++ - r = httptest.NewRequest("GET", loc, nil) - w = httptest.NewRecorder() + r = httptest.NewRequest("GET", loc, nil) + w = httptest.NewRecorder() - s.ServeHTTP(w, r) + s.ServeHTTP(w, r) - require.Equal(t, http.StatusOK, w.Code) - require.Contains(t, w.Body.String(), "pprof") - }) - require.Equal(t, 1, len(storage.getRecords()), + require.Equal(t, http.StatusOK, w.Code) + require.Contains(t, w.Body.String(), "pprof") + }) + } + require.Equal(t, 1, len(storage.GetRecords()), "storage did not expunge records") } @@ -95,7 +107,97 @@ func TestServer(t *testing.T) { require.Equal(t, http.StatusTemporaryRedirect, w.Code) loc := w.Result().Header.Get("Location") - require.Equal(t, "/heap/4/flamegraph?node=3", loc) + require.Equal(t, fmt.Sprintf("/heap/%d/flamegraph?node=3", count), loc) + }) + + t.Run("request profile with labels", func(t *testing.T) { + expectedNodeID = "3" + withLabels = true + defer func() { + withLabels = false + }() + + // Labels are only supported for CPU and GOROUTINE profiles. 
+ r := httptest.NewRequest("GET", "/heap/?node=3&labels=true", nil) + w := httptest.NewRecorder() + s.ServeHTTP(w, r) + count++ + require.Equal(t, http.StatusInternalServerError, w.Code) + require.Equal(t, "profiling with labels is unsupported for HEAP\n", w.Body.String()) + + r = httptest.NewRequest("GET", "/cpu/?node=3&labels=true", nil) + w = httptest.NewRecorder() + s.ServeHTTP(w, r) + count++ + require.Equal(t, http.StatusTemporaryRedirect, w.Code) + loc := w.Result().Header.Get("Location") + require.Equal(t, fmt.Sprintf("/cpu/%d/flamegraph?node=3&labels=true", count), loc) + + // A GOROUTINE profile with a label will trigger a download since it is not + // generated a Profile protobuf format. + r = httptest.NewRequest("GET", "/goroutine/?node=3&labels=true", nil) + w = httptest.NewRecorder() + s.ServeHTTP(w, r) + count++ + require.Equal(t, http.StatusOK, w.Code) + require.Equal(t, "attachment; filename=goroutine_25.txt", w.Header().Get("Content-Disposition")) + require.Equal(t, "text/plain", w.Header().Get("Content-Type")) + }) + + t.Run("request cluster-wide profiles", func(t *testing.T) { + expectedNodeID = "all" + + // Cluster-wide profiles are only supported for CPU and GOROUTINE profiles. + r := httptest.NewRequest("GET", "/heap/?node=all", nil) + w := httptest.NewRecorder() + s.ServeHTTP(w, r) + count++ + require.Equal(t, http.StatusInternalServerError, w.Code) + require.Equal(t, "cluster-wide collection is unsupported for HEAP\n", w.Body.String()) + + r = httptest.NewRequest("GET", "/cpu/?node=all", nil) + w = httptest.NewRecorder() + s.ServeHTTP(w, r) + count++ + require.Equal(t, http.StatusTemporaryRedirect, w.Code) + loc := w.Result().Header.Get("Location") + require.Equal(t, fmt.Sprintf("/cpu/%d/flamegraph?node=all", count), loc) + + // A GOROUTINE profile with a label will trigger a download since it is not + // generated a Profile protobuf format. 
+ r = httptest.NewRequest("GET", "/goroutine/?node=all", nil) + w = httptest.NewRecorder() + s.ServeHTTP(w, r) + count++ + require.Equal(t, http.StatusTemporaryRedirect, w.Code) + loc = w.Result().Header.Get("Location") + require.Equal(t, fmt.Sprintf("/goroutine/%d/flamegraph?node=all", count), loc) + }) + + t.Run("request profile with label filters", func(t *testing.T) { + expectedNodeID = "3" + withLabels = true + defer func() { + withLabels = false + }() + + // Labels are only supported for CPU and GOROUTINE profiles. + r := httptest.NewRequest("GET", "/heap/?node=3&labels=true&labelfilter=foo:bar", nil) + w := httptest.NewRecorder() + s.ServeHTTP(w, r) + count++ + require.Equal(t, http.StatusInternalServerError, w.Code) + require.Equal(t, "profiling with labels is unsupported for HEAP\n", w.Body.String()) + + // A GOROUTINE profile with a label will trigger a download since it is not + // generated a Profile protobuf format. + r = httptest.NewRequest("GET", "/goroutine/?node=3&labelfilter=foo:bar", nil) + w = httptest.NewRecorder() + s.ServeHTTP(w, r) + count++ + require.Equal(t, http.StatusOK, w.Code) + require.Equal(t, "attachment; filename=goroutine_30.txt", w.Header().Get("Content-Disposition")) + require.Equal(t, "text/plain", w.Header().Get("Content-Type")) }) } @@ -104,7 +206,7 @@ func TestServerConcurrentAccess(t *testing.T) { skip.UnderRace(t, "test fails under race due to known race condition with profiles") const ( runsPerWorker = 1 - workers = ProfileConcurrency + workers = pprofui.ProfileConcurrency ) mockProfile := func(ctx context.Context, req *serverpb.ProfileRequest) (*serverpb.JSONResponse, error) { require.Equal(t, expectedNodeID, req.NodeId) @@ -117,7 +219,7 @@ func TestServerConcurrentAccess(t *testing.T) { return &serverpb.JSONResponse{Data: b}, nil } - s := NewServer(NewMemStorage(ProfileConcurrency, ProfileExpiry), ProfilerFunc(mockProfile)) + s := pprofui.NewServer(pprofui.NewMemStorage(pprofui.ProfileConcurrency, 
pprofui.ProfileExpiry), ProfilerFunc(mockProfile)) getProfile := func(profile string, t *testing.T) { t.Helper() @@ -154,3 +256,49 @@ func TestServerConcurrentAccess(t *testing.T) { } wg.Wait() } + +func TestFilterStacksWithLabels(t *testing.T) { + testStacks := ` +8 @ 0x4a13d6 0x46b3bb 0x46aef8 0x12fd53f 0xe9bc43 0x12fd478 0x4d3101 +# labels: {"foo":"baz", "bar":"biz"} +# 0x12fd53e github.com/cockroachdb/pebble.(*tableCacheShard).releaseLoop.func1+0x9e github.com/cockroachdb/pebble/external/com_github_cockroachdb_pebble/table_cache.go:324 + +10 @ 0x4a13d6 0x4b131c 0x17969e6 0x4d3101 +# 0x17969e5 github.com/cockroachdb/cockroach/pkg/util/admission.initWorkQueue.func2+0x85 github.com/cockroachdb/cockroach/pkg/util/admission/work_queue.go:388 + +8 @ 0x4a13d6 0x4b131c 0x1796e96 0x4d3101 +# labels: {"bar":"biz"} +# 0x1796e95 github.com/cockroachdb/cockroach/pkg/util/admission.(*WorkQueue).startClosingEpochs.func1+0x1d5 github.com/cockroachdb/cockroach/pkg +/util/admission/work_queue.go:462 +` + t.Run("empty filter", func(t *testing.T) { + res := server.FilterStacksWithLabels([]byte(testStacks), "") + require.Equal(t, ` +8 @ 0x4a13d6 0x46b3bb 0x46aef8 0x12fd53f 0xe9bc43 0x12fd478 0x4d3101 +# labels: {"foo":"baz", "bar":"biz"} +# 0x12fd53e github.com/cockroachdb/pebble.(*tableCacheShard).releaseLoop.func1+0x9e github.com/cockroachdb/pebble/external/com_github_cockroachdb_pebble/table_cache.go:324 + +10 @ 0x4a13d6 0x4b131c 0x17969e6 0x4d3101 +# 0x17969e5 github.com/cockroachdb/cockroach/pkg/util/admission.initWorkQueue.func2+0x85 github.com/cockroachdb/cockroach/pkg/util/admission/work_queue.go:388 + +8 @ 0x4a13d6 0x4b131c 0x1796e96 0x4d3101 +# labels: {"bar":"biz"} +# 0x1796e95 github.com/cockroachdb/cockroach/pkg/util/admission.(*WorkQueue).startClosingEpochs.func1+0x1d5 github.com/cockroachdb/cockroach/pkg +/util/admission/work_queue.go:462 +`, string(res)) + }) + + t.Run("bar-biz filter", func(t *testing.T) { + res := server.FilterStacksWithLabels([]byte(testStacks), 
"\"bar\":\"biz\"") + require.Equal(t, ` +8 @ 0x4a13d6 0x46b3bb 0x46aef8 0x12fd53f 0xe9bc43 0x12fd478 0x4d3101 +# labels: {"foo":"baz", "bar":"biz"} +# 0x12fd53e github.com/cockroachdb/pebble.(*tableCacheShard).releaseLoop.func1+0x9e github.com/cockroachdb/pebble/external/com_github_cockroachdb_pebble/table_cache.go:324 + +8 @ 0x4a13d6 0x4b131c 0x1796e96 0x4d3101 +# labels: {"bar":"biz"} +# 0x1796e95 github.com/cockroachdb/cockroach/pkg/util/admission.(*WorkQueue).startClosingEpochs.func1+0x1d5 github.com/cockroachdb/cockroach/pkg +/util/admission/work_queue.go:462 +`, string(res)) + }) +} diff --git a/pkg/server/debug/pprofui/storage.go b/pkg/server/debug/pprofui/storage.go index 90f7659722f4..4958c369b3f9 100644 --- a/pkg/server/debug/pprofui/storage.go +++ b/pkg/server/debug/pprofui/storage.go @@ -17,8 +17,12 @@ type Storage interface { // ID generates a unique ID for use in Store. ID() string // Store invokes the passed-in closure with a writer that stores its input. - Store(id string, write func(io.Writer) error) error + // IsProfileProto determines whether the input will be in the protobuf format + // outlined in https://github.com/google/pprof/tree/main/proto#overview. + Store(id string, isProfileProto bool, write func(io.Writer) error) error // Get invokes the passed-in closure with a reader for the data at the given id. // An error is returned when no data is found. - Get(id string, read func(io.Reader) error) error + // The boolean indicates whether the stored record is in protobuf format + // outlined in https://github.com/google/pprof/tree/main/proto#overview. 
+ Get(id string, read func(bool, io.Reader) error) error } diff --git a/pkg/server/debug/pprofui/storage_mem.go b/pkg/server/debug/pprofui/storage_mem.go index f60c5c1adc2b..489734257797 100644 --- a/pkg/server/debug/pprofui/storage_mem.go +++ b/pkg/server/debug/pprofui/storage_mem.go @@ -26,7 +26,10 @@ import ( type record struct { id string t time.Time - b []byte + // isProfileProto is true when the profile record is stored in protobuf format + // outlined in https://github.com/google/pprof/tree/main/proto#overview. + isProfileProto bool + b []byte } // A MemStorage is a Storage implementation that holds recent profiles in memory. @@ -40,7 +43,7 @@ type MemStorage struct { keepNumber int // zero for disabled } -func (s *MemStorage) getRecords() []record { +func (s *MemStorage) GetRecords() []record { s.mu.Lock() defer s.mu.Unlock() return append([]record(nil), s.mu.records...) @@ -78,14 +81,14 @@ func (s *MemStorage) cleanLocked() { } // Store implements Storage. -func (s *MemStorage) Store(id string, write func(io.Writer) error) error { +func (s *MemStorage) Store(id string, isProfileProto bool, write func(io.Writer) error) error { var b bytes.Buffer if err := write(&b); err != nil { return err } s.mu.Lock() defer s.mu.Unlock() - s.mu.records = append(s.mu.records, record{id: id, t: timeutil.Now(), b: b.Bytes()}) + s.mu.records = append(s.mu.records, record{id: id, t: timeutil.Now(), b: b.Bytes(), isProfileProto: isProfileProto}) sort.Slice(s.mu.records, func(i, j int) bool { return s.mu.records[i].t.Before(s.mu.records[j].t) }) @@ -94,12 +97,12 @@ func (s *MemStorage) Store(id string, write func(io.Writer) error) error { } // Get implements Storage. 
-func (s *MemStorage) Get(id string, read func(io.Reader) error) error { +func (s *MemStorage) Get(id string, read func(bool, io.Reader) error) error { s.mu.Lock() defer s.mu.Unlock() for _, v := range s.mu.records { if v.id == id { - return read(bytes.NewReader(v.b)) + return read(v.isProfileProto, bytes.NewReader(v.b)) } } return errors.Errorf("profile not found; it may have expired, please regenerate the profile.\n" + diff --git a/pkg/server/serverpb/status.proto b/pkg/server/serverpb/status.proto index f3eab6f08eab..72a89145ccbc 100644 --- a/pkg/server/serverpb/status.proto +++ b/pkg/server/serverpb/status.proto @@ -753,7 +753,7 @@ message GetFilesResponse { } message ProfileRequest { - // node_id is a string so that "local" can be used to specify that no + // node_id is a string so that "local" or "all" can be used to specify that no // forwarding is necessary. node_id translates to a KV node ID on a storage // server and SQL instance ID on a SQL only server. string node_id = 1; @@ -771,7 +771,18 @@ message ProfileRequest { Type type = 5; int32 seconds = 6; // applies only to Type=CPU, defaults to 30 - bool labels = 7; // applies only to Type=CPU, defaults to false + + // WithLabels can be specified for Type=CPU or Type=GOROUTINE. + // + // - If true for CPU profiles, we request a CPU profile with pprof labels. + // + // - If true for GOROUTINE profiles, we request an aggregated goroutine + // profile with debug=1. + bool with_labels = 7; + + // LabelFilter only applies to Type=GOROUTINE. Only goroutines with a pprof + // label matching the filter will be returned. + string label_filter = 9; // SenderServerVersion is the server version of the node sending the Profile // request. 
If this field is set then the node processing the request will diff --git a/pkg/server/status.go b/pkg/server/status.go index 309c67ac8efc..7198d60ee250 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -1461,18 +1461,73 @@ func (s *statusServer) Stacks( return stacksLocal(req) } +func (s *statusServer) processGoroutineProfilesFromAllNodes( + ctx context.Context, response profDataResponse, +) ([]byte, error) { + res := bytes.NewBuffer(nil) + for nodeID, pd := range response.profDataByNodeID { + if len(pd.data) == 0 && pd.err == nil { + log.Warningf(ctx, "no stacks collected for node %d", nodeID) + continue // skipped node + } + + if pd.err != nil { + log.Warningf(ctx, "failed to collect stacks for node %d: %v", nodeID, pd.err) + continue + } + res.Write(pd.data) + } + + return res.Bytes(), nil +} + +func (s *statusServer) processCPUProfilesFromAllNodes( + ctx context.Context, response profDataResponse, +) ([]byte, error) { + profs := make([]*profile.Profile, 0, len(response.profDataByNodeID)) + for nodeID, pd := range response.profDataByNodeID { + if len(pd.data) == 0 && pd.err == nil { + log.Warningf(ctx, "no profile collected for node %d", nodeID) + continue // skipped node + } + + if pd.err != nil { + log.Warningf(ctx, "failed to collect profile for node %d: %v", nodeID, pd.err) + continue + } + + p, err := profile.ParseData(pd.data) + if err != nil { + return nil, err + } + p.Comments = append(p.Comments, fmt.Sprintf("n%d", nodeID)) + profs = append(profs, p) + } + mergedProfiles, err := profile.Merge(profs) + if err != nil { + return nil, err + } + + var buf bytes.Buffer + if err := mergedProfiles.Write(&buf); err != nil { + return nil, status.Errorf(codes.Internal, err.Error()) + } + return buf.Bytes(), nil +} + +type profData struct { + data []byte + err error +} +type profDataResponse struct { + profDataByNodeID map[roachpb.NodeID]*profData +} + // fetchProfileFromAllNodes fetches the CPU profiles from all live nodes in the // cluster and 
merges the samples across all profiles. func (s *statusServer) fetchProfileFromAllNodes( ctx context.Context, req *serverpb.ProfileRequest, ) (*serverpb.JSONResponse, error) { - type profData struct { - data []byte - err error - } - type profDataResponse struct { - profDataByNodeID map[roachpb.NodeID]*profData - } response := profDataResponse{profDataByNodeID: make(map[roachpb.NodeID]*profData)} resp, err := s.Node(ctx, &serverpb.NodeRequest{NodeId: "local"}) @@ -1484,20 +1539,17 @@ func (s *statusServer) fetchProfileFromAllNodes( client, err := s.dialNode(ctx, nodeID) return client, err } + opName := fmt.Sprintf("fetch cluster-wide %s profile", req.Type.String()) nodeFn := func(ctx context.Context, client interface{}, nodeID roachpb.NodeID) (interface{}, error) { statusClient := client.(serverpb.StatusClient) - resp, err := statusClient.Node(ctx, &serverpb.NodeRequest{NodeId: "local"}) - if err != nil { - return nodeID, err - } var pd *profData - err = timeutil.RunWithTimeout(ctx, "fetch cpu profile", 1*time.Minute, func(ctx context.Context) error { - log.Infof(ctx, "fetching a CPU profile for %d", resp.Desc.NodeID) + err = timeutil.RunWithTimeout(ctx, opName, 1*time.Minute, func(ctx context.Context) error { resp, err := statusClient.Profile(ctx, &serverpb.ProfileRequest{ NodeId: fmt.Sprintf("%d", nodeID), - Type: serverpb.ProfileRequest_CPU, + Type: req.Type, Seconds: req.Seconds, - Labels: req.Labels, + WithLabels: req.WithLabels, + LabelFilter: req.LabelFilter, SenderServerVersion: &senderServerVersion, }) if err != nil { @@ -1515,39 +1567,23 @@ func (s *statusServer) fetchProfileFromAllNodes( errorFn := func(nodeID roachpb.NodeID, err error) { response.profDataByNodeID[nodeID] = &profData{err: err} } - if err := s.iterateNodes(ctx, "cluster-wide CPU profile", dialFn, nodeFn, responseFn, errorFn); err != nil { + if err := s.iterateNodes(ctx, opName, dialFn, nodeFn, responseFn, errorFn); err != nil { return nil, serverError(ctx, err) } - - profs := 
make([]*profile.Profile, 0, len(response.profDataByNodeID)) - for nodeID, pd := range response.profDataByNodeID { - if len(pd.data) == 0 && pd.err == nil { - log.Warningf(ctx, "no profile collected for node %d", nodeID) - continue // skipped node - } - - if pd.err != nil { - log.Warningf(ctx, "failed to collect profile for node %d: %v", nodeID, pd.err) - continue - } - - p, err := profile.ParseData(pd.data) - if err != nil { - return nil, err - } - p.Comments = append(p.Comments, fmt.Sprintf("n%d", nodeID)) - profs = append(profs, p) + var data []byte + switch req.Type { + case serverpb.ProfileRequest_CPU: + data, err = s.processCPUProfilesFromAllNodes(ctx, response) + case serverpb.ProfileRequest_GOROUTINE: + data, err = s.processGoroutineProfilesFromAllNodes(ctx, response) + default: + return nil, errors.Newf("cluster-wide collection of %s is unsupported", req.Type.String()) } - mergedProfiles, err := profile.Merge(profs) if err != nil { return nil, err } - var buf bytes.Buffer - if err := mergedProfiles.Write(&buf); err != nil { - return nil, status.Errorf(codes.Internal, err.Error()) - } - return &serverpb.JSONResponse{Data: buf.Bytes()}, nil + return &serverpb.JSONResponse{Data: data}, nil } // TODO(tschottdorf): significant overlap with /debug/pprof/heap, except that @@ -1568,7 +1604,7 @@ func (s *statusServer) Profile( } // If the request is for "all" nodes then we collect profiles from all nodes - // in the cluster and merge them before returning to the user. + // in the cluster and process them before returning to the user. if req.NodeId == "all" { return s.fetchProfileFromAllNodes(ctx, req) } @@ -1595,7 +1631,7 @@ func (s *statusServer) Profile( serverVersion.String(), req.SenderServerVersion.String()) } } - return profileLocal(ctx, req, s.st) + return profileLocal(ctx, req, s.st, nodeID) } // Regions implements the serverpb.StatusServer interface. 
diff --git a/pkg/server/status_local_file_retrieval.go b/pkg/server/status_local_file_retrieval.go index 34bff54c5ae7..644e2128ba09 100644 --- a/pkg/server/status_local_file_retrieval.go +++ b/pkg/server/status_local_file_retrieval.go @@ -13,12 +13,15 @@ package server import ( "bytes" "context" + "fmt" "os" "path/filepath" + "regexp" "runtime/pprof" "strings" "time" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/debug" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -30,13 +33,13 @@ import ( // profileLocal runs a performance profile of the requested type (heap, cpu etc). // on the local node. This method returns a gRPC error to the caller. func profileLocal( - ctx context.Context, req *serverpb.ProfileRequest, st *cluster.Settings, + ctx context.Context, req *serverpb.ProfileRequest, st *cluster.Settings, nodeID roachpb.NodeID, ) (*serverpb.JSONResponse, error) { switch req.Type { case serverpb.ProfileRequest_CPU: var buf bytes.Buffer profileType := cluster.CPUProfileDefault - if req.Labels { + if req.WithLabels { profileType = cluster.CPUProfileWithLabels } if err := debug.CPUProfileDo(st, profileType, func() error { @@ -59,6 +62,24 @@ func profileLocal( return nil, err } return &serverpb.JSONResponse{Data: buf.Bytes()}, nil + case serverpb.ProfileRequest_GOROUTINE: + p := pprof.Lookup("goroutine") + if p == nil { + return nil, status.Error(codes.InvalidArgument, "unable to find goroutine profile") + } + var debug int + buf := bytes.NewBuffer(nil) + if req.WithLabels { + debug = 1 + buf.WriteString(fmt.Sprintf("Stacks for node: %d\n\n", nodeID)) + } + if err := p.WriteTo(buf, debug); err != nil { + return nil, status.Errorf(codes.Internal, err.Error()) + } + if req.LabelFilter != "" { + return &serverpb.JSONResponse{Data: FilterStacksWithLabels(buf.Bytes(), req.LabelFilter)}, nil + } + return &serverpb.JSONResponse{Data: buf.Bytes()}, nil default: 
name, ok := serverpb.ProfileRequest_Type_name[int32(req.Type)] if !ok { @@ -97,6 +118,34 @@ func stacksLocal(req *serverpb.StacksRequest) (*serverpb.JSONResponse, error) { return &serverpb.JSONResponse{Data: buf.Bytes()}, nil } +// FilterStacksWithLabels writes all the stacks that have pprof labels matching +// any one of the expectedLabels to resBuf. +func FilterStacksWithLabels(stacks []byte, labelFilter string) []byte { + if labelFilter == "" { + return stacks + } + res := bytes.NewBuffer(nil) + goroutines := strings.Split(string(stacks), "\n\n") + // The first element is the nodeID which we want to always keep. + res.WriteString(goroutines[0]) + goroutines = goroutines[1:] + for i, g := range goroutines { + // pprof.Lookup("goroutine") with debug=1 renders the pprof labels + // corresponding to a stack as: + // # labels: {"foo":"bar", "baz":"biz"}. + regex := regexp.MustCompile(fmt.Sprintf(`labels: {.*%s.*}`, labelFilter)) + match := regex.MatchString(g) + if match { + if i != 0 { + res.WriteString("\n\n") + } + res.WriteString(g) + } + } + + return res.Bytes() +} + // getLocalFiles retrieves the requested files for the local node. This method // returns a gRPC error to the caller. func getLocalFiles(