From b38951a3e12c134371b6d42bf1b3adbe19e0a39a Mon Sep 17 00:00:00 2001 From: adityamaru Date: Fri, 30 Jun 2023 13:46:29 -0400 Subject: [PATCH] pprofui: add support for collecting goroutines with labels This change refactors some of the logic in the pprofui/. Additionally, we add support for collecting cluster-wide goroutine profiles. These goroutine profiles can be collected with or without labels. When collected without labels we request pprof profiles with debug=0, which generates Profile protobufs on every node and `pprof.Merge`s them before redirecting the client to a flamegraph. When collected with labels we request pprof profiles with debug=1, which generates a legacy text format with comments translating addresses to function names and line numbers, so that a programmer can read the profile without tools. These profiles cannot be `pprof.Merge`d and so we manually stitch them together on a per-node basis before downloading them for the client as a txt file. This change also adds support for filtering the aforementioned goroutines by pprof label. This will be used by the Jobs Profiler to collect cluster-wide stacks relevant to the running job. 
Informs: #105440 Release note: None --- docs/generated/http/full.md | 5 +- pkg/server/BUILD.bazel | 1 + pkg/server/debug/pprofui/server.go | 326 ++++++++++++++-------- pkg/server/debug/pprofui/server_test.go | 207 +++++++++++++- pkg/server/debug/pprofui/storage.go | 8 +- pkg/server/debug/pprofui/storage_mem.go | 15 +- pkg/server/serverpb/status.proto | 15 +- pkg/server/status.go | 192 ++++++++++--- pkg/server/status_local_file_retrieval.go | 28 +- 9 files changed, 611 insertions(+), 186 deletions(-) diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md index df40a033666c..7a264d6add83 100644 --- a/docs/generated/http/full.md +++ b/docs/generated/http/full.md @@ -3043,10 +3043,11 @@ Support status: [reserved](#support-status) | Field | Type | Label | Description | Support status | | ----- | ---- | ----- | ----------- | -------------- | -| node_id | [string](#cockroach.server.serverpb.ProfileRequest-string) | | node_id is a string so that "local" can be used to specify that no forwarding is necessary. node_id translates to a KV node ID on a storage server and SQL instance ID on a SQL only server. | [reserved](#support-status) | +| node_id | [string](#cockroach.server.serverpb.ProfileRequest-string) | | node_id is a string so that "local" or "all" can be used to specify that no forwarding is necessary. node_id translates to a KV node ID on a storage server and SQL instance ID on a SQL only server. | [reserved](#support-status) | | type | [ProfileRequest.Type](#cockroach.server.serverpb.ProfileRequest-cockroach.server.serverpb.ProfileRequest.Type) | | The type of profile to retrieve. 
| [reserved](#support-status) | | seconds | [int32](#cockroach.server.serverpb.ProfileRequest-int32) | | applies only to Type=CPU, defaults to 30 | [reserved](#support-status) | -| labels | [bool](#cockroach.server.serverpb.ProfileRequest-bool) | | applies only to Type=CPU, defaults to false | [reserved](#support-status) | +| labels | [bool](#cockroach.server.serverpb.ProfileRequest-bool) | | Labels can be specified for Type=CPU or Type=GOROUTINE.

- If true for CPU profiles, we request a CPU profile with pprof labels.

- If true for GOROUTINE profiles, we request an aggregated goroutine profile with debug=1. | [reserved](#support-status) | +| label_filter | [string](#cockroach.server.serverpb.ProfileRequest-string) | | LabelFilter only applies to Type=GOROUTINE. Only goroutines with a pprof label matching the filter will be returned. | [reserved](#support-status) | | sender_server_version | [cockroach.roachpb.Version](#cockroach.server.serverpb.ProfileRequest-cockroach.roachpb.Version) | | SenderServerVersion is the server version of the node sending the Profile request. If this field is set then the node processing the request will only collect the profile if its server version is equal to the sender's server version.

Currently, this is only used when collecting profiles that will be merged using pprof.Merge as all the samples must be from the same binary version. | [reserved](#support-status) | diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index d58264665c1f..b91422aaa272 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -173,6 +173,7 @@ go_library( "//pkg/server/autoconfig", "//pkg/server/autoconfig/acprovider", "//pkg/server/debug", + "//pkg/server/debug/pprofui", "//pkg/server/diagnostics", "//pkg/server/diagnostics/diagnosticspb", "//pkg/server/goroutinedumper", diff --git a/pkg/server/debug/pprofui/server.go b/pkg/server/debug/pprofui/server.go index 525d89f5b4a3..3997c31d7260 100644 --- a/pkg/server/debug/pprofui/server.go +++ b/pkg/server/debug/pprofui/server.go @@ -18,6 +18,7 @@ import ( "net/http" "net/url" "path" + "regexp" runtimepprof "runtime/pprof" "sort" "strconv" @@ -97,6 +98,162 @@ func (s *Server) parsePath(reqPath string) (profType string, id string, remainin } } +// validateProfileRequest validates that the request does not contain any +// conflicting or invalid configurations. 
+func validateProfileRequest(req *serverpb.ProfileRequest) error { + switch req.Type { + case serverpb.ProfileRequest_GOROUTINE: + case serverpb.ProfileRequest_CPU: + if req.LabelFilter != "" { + return errors.Newf("filtering using a pprof label is unsupported for %s", req.Type) + } + default: + if req.NodeId == "all" { + return errors.Newf("cluster-wide collection is unsupported for %s", req.Type) + } + + if req.Labels { + return errors.Newf("profiling with labels is unsupported for %s", req.Type) + } + + if req.LabelFilter != "" { + return errors.Newf("filtering using a pprof label is unsupported for %s", req.Type) + } + } + + return nil +} + +func constructProfileReq(r *http.Request, profileName string) (*serverpb.ProfileRequest, error) { + req, err := http.NewRequest("GET", "/unused", bytes.NewReader(nil)) + if err != nil { + return nil, err + } + + profileType, ok := serverpb.ProfileRequest_Type_value[strings.ToUpper(profileName)] + if !ok && profileName != "profile" { + return nil, errors.Newf("unknown profile name: %s", profileName) + } + // There is a discrepancy between the usage of the "CPU" and + // "profile" names to refer to the CPU profile in the + // implementations. The URL to the CPU profile has been modified + // on the Advanced Debug page to point to /pprof/ui/cpu but this + // is retained for backwards compatibility. + if profileName == "profile" { + profileType = int32(serverpb.ProfileRequest_CPU) + } + profileReq := &serverpb.ProfileRequest{ + NodeId: "local", + Type: serverpb.ProfileRequest_Type(profileType), + } + + // Pass through any parameters. Most notably, allow ?seconds=10 for + // CPU profiles. 
+ _ = r.ParseForm() + req.Form = r.Form + + if r.Form.Get("seconds") != "" { + sec, err := strconv.ParseInt(r.Form.Get("seconds"), 10, 32) + if err != nil { + return nil, err + } + profileReq.Seconds = int32(sec) + } + if r.Form.Get("node") != "" { + profileReq.NodeId = r.Form.Get("node") + } + if r.Form.Get("labels") != "" { + withLabels, err := strconv.ParseBool(r.Form.Get("labels")) + if err != nil { + return nil, err + } + profileReq.Labels = withLabels + } + if labelFilter := r.Form.Get("labelfilter"); labelFilter != "" { + profileReq.Labels = true + profileReq.LabelFilter = labelFilter + } + if err := validateProfileRequest(profileReq); err != nil { + return nil, err + } + return profileReq, nil +} + +func (s *Server) serveCachedProfile( + id string, remainingPath string, profileName string, w http.ResponseWriter, r *http.Request, +) { + // Catch nonexistent IDs early or pprof will do a worse job at + // giving an informative error. + var isProfileProto bool + if err := s.storage.Get(id, func(b bool, _ io.Reader) error { + isProfileProto = b + return nil + }); err != nil { + msg := fmt.Sprintf("profile for id %s not found: %s", id, err) + http.Error(w, msg, http.StatusNotFound) + return + } + + // If the profile is not in a protobuf format, downloading it is the only + // option. + if !isProfileProto || r.URL.Query().Get("download") != "" { + // TODO(tbg): this has zero discoverability. 
+ w.Header().Set("Content-Disposition", + fmt.Sprintf("attachment; filename=%s_%s.pb.gz", profileName, id)) + w.Header().Set("Content-Type", "application/octet-stream") + if err := s.storage.Get(id, func(_ bool, r io.Reader) error { + _, err := io.Copy(w, r) + return err + }); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + return + } + + server := func(args *driver.HTTPServerArgs) error { + handler, ok := args.Handlers[remainingPath] + if !ok { + return errors.Errorf("unknown endpoint %s", remainingPath) + } + handler.ServeHTTP(w, r) + return nil + } + + storageFetcher := func(_ string, _, _ time.Duration) (*profile.Profile, string, error) { + var p *profile.Profile + if err := s.storage.Get(id, func(_ bool, reader io.Reader) error { + var err error + p, err = profile.Parse(reader) + return err + }); err != nil { + return nil, "", err + } + return p, "", nil + } + + // Invoke the (library version) of `pprof` with a number of stubs. + // Specifically, we pass a fake FlagSet that plumbs through the + // given args, a UI that logs any errors pprof may emit, a fetcher + // that simply reads the profile we downloaded earlier, and a + // HTTPServer that pprof will pass the web ui handlers to at the + // end (and we let it handle this client request). 
+ if err := driver.PProf(&driver.Options{ + Flagset: &pprofFlags{ + FlagSet: pflag.NewFlagSet("pprof", pflag.ExitOnError), + args: []string{ + "--symbolize", "none", + "--http", "localhost:0", + "", // we inject our own target + }, + }, + UI: &fakeUI{}, + Fetch: fetcherFn(storageFetcher), + HTTPServer: server, + }); err != nil { + _, _ = w.Write([]byte(err.Error())) + } +} + func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { profileName, id, remainingPath := s.parsePath(r.URL.Path) @@ -113,124 +270,24 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { } if id != "" { - // Catch nonexistent IDs early or pprof will do a worse job at - // giving an informative error. - if err := s.storage.Get(id, func(io.Reader) error { return nil }); err != nil { - msg := fmt.Sprintf("profile for id %s not found: %s", id, err) - http.Error(w, msg, http.StatusNotFound) - return - } - - if r.URL.Query().Get("download") != "" { - // TODO(tbg): this has zero discoverability. - w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s_%s.pb.gz", profileName, id)) - w.Header().Set("Content-Type", "application/octet-stream") - if err := s.storage.Get(id, func(r io.Reader) error { - _, err := io.Copy(w, r) - return err - }); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } - return - } - - server := func(args *driver.HTTPServerArgs) error { - handler, ok := args.Handlers[remainingPath] - if !ok { - return errors.Errorf("unknown endpoint %s", remainingPath) - } - handler.ServeHTTP(w, r) - return nil - } - - storageFetcher := func(_ string, _, _ time.Duration) (*profile.Profile, string, error) { - var p *profile.Profile - if err := s.storage.Get(id, func(reader io.Reader) error { - var err error - p, err = profile.Parse(reader) - return err - }); err != nil { - return nil, "", err - } - return p, "", nil - } - - // Invoke the (library version) of `pprof` with a number of stubs. 
- // Specifically, we pass a fake FlagSet that plumbs through the - // given args, a UI that logs any errors pprof may emit, a fetcher - // that simply reads the profile we downloaded earlier, and a - // HTTPServer that pprof will pass the web ui handlers to at the - // end (and we let it handle this client request). - if err := driver.PProf(&driver.Options{ - Flagset: &pprofFlags{ - FlagSet: pflag.NewFlagSet("pprof", pflag.ExitOnError), - args: []string{ - "--symbolize", "none", - "--http", "localhost:0", - "", // we inject our own target - }, - }, - UI: &fakeUI{}, - Fetch: fetcherFn(storageFetcher), - HTTPServer: server, - }); err != nil { - _, _ = w.Write([]byte(err.Error())) - } - + s.serveCachedProfile(id, remainingPath, profileName, w, r) return } // Create and save new profile, then redirect client to corresponding ui URL. - id = s.storage.ID() - - if err := s.storage.Store(id, func(w io.Writer) error { - req, err := http.NewRequest("GET", "/unused", bytes.NewReader(nil)) - if err != nil { - return err - } - - profileType, ok := serverpb.ProfileRequest_Type_value[strings.ToUpper(profileName)] - if !ok && profileName != "profile" { - return errors.Newf("unknown profile name: %s", profileName) - } - // There is a discrepancy between the usage of the "CPU" and - // "profile" names to refer to the CPU profile in the - // implementations. The URL to the CPU profile has been modified - // on the Advanced Debug page to point to /pprof/ui/cpu but this - // is retained for backwards compatibility. - if profileName == "profile" { - profileType = int32(serverpb.ProfileRequest_CPU) - } - var resp *serverpb.JSONResponse - profileReq := &serverpb.ProfileRequest{ - NodeId: "local", - Type: serverpb.ProfileRequest_Type(profileType), - } - - // Pass through any parameters. Most notably, allow ?seconds=10 for - // CPU profiles. 
- _ = r.ParseForm() - req.Form = r.Form - - if r.Form.Get("seconds") != "" { - sec, err := strconv.ParseInt(r.Form.Get("seconds"), 10, 32) - if err != nil { - return err - } - profileReq.Seconds = int32(sec) - } - if r.Form.Get("node") != "" { - profileReq.NodeId = r.Form.Get("node") - } - if r.Form.Get("labels") != "" { - labels, err := strconv.ParseBool(r.Form.Get("labels")) - if err != nil { - return err - } - profileReq.Labels = labels - } - resp, err = s.profiler.Profile(r.Context(), profileReq) + profileReq, err := constructProfileReq(r, profileName) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + // A GOROUTINE profile with labels is not generated in a protobuf format. It + // is outputted in a "legacy text format with comments translating addresses + // to function names and line numbers, so that a programmer can read the + // profile without tools." + isProfileProto := !(profileReq.Type == serverpb.ProfileRequest_GOROUTINE && profileReq.Labels) + if err := s.storage.Store(id, isProfileProto, func(w io.Writer) error { + resp, err := s.profiler.Profile(r.Context(), profileReq) if err != nil { return err } @@ -244,6 +301,18 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + // If the profile is not in a protobuf format, downloading it is the only + // option. + if !isProfileProto { + w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s_%s.txt", profileName, id)) + w.Header().Set("Content-Type", "text/plain") + _ = s.storage.Get(id, func(isProtoProfile bool, r io.Reader) error { + _, err := io.Copy(w, r) + return err + }) + return + } + // NB: direct straight to the flamegraph. This is because `pprof` // shells out to `dot` for the default landing page and thus works // only on hosts that have graphviz installed. 
You can still navigate @@ -263,13 +332,42 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !isGoPProf { http.Redirect(w, r, origURL.String(), http.StatusTemporaryRedirect) } else { - _ = s.storage.Get(id, func(r io.Reader) error { + _ = s.storage.Get(id, func(isProtoProfile bool, r io.Reader) error { _, err := io.Copy(w, r) return err }) } } +// FilterStacksWithLabels writes all the stacks that have pprof labels matching +// any one of the expectedLabels to resBuf. +// +// The input must be in the format used by debug/pprof/goroutines?debug=1. +func FilterStacksWithLabels(stacks []byte, labelFilter string) []byte { + if labelFilter == "" { + return stacks + } + res := bytes.NewBuffer(nil) + goroutines := strings.Split(string(stacks), "\n\n") + // The first element is the nodeID which we want to always keep. + res.WriteString(goroutines[0]) + goroutines = goroutines[1:] + regex := regexp.MustCompile(fmt.Sprintf(`labels: {.*%s.*}`, labelFilter)) + for _, g := range goroutines { + // pprof.Lookup("goroutine") with debug=1 renders the pprof labels + // corresponding to a stack as: + // # labels: {"foo":"bar", "baz":"biz"}. 
+ match := regex.MatchString(g) + if match { + res.WriteString("\n\n") + res.WriteString(g) + } + } + res.WriteString("\n\n") + + return res.Bytes() +} + type fetcherFn func(_ string, _, _ time.Duration) (*profile.Profile, string, error) func (f fetcherFn) Fetch(s string, d, t time.Duration) (*profile.Profile, string, error) { diff --git a/pkg/server/debug/pprofui/server_test.go b/pkg/server/debug/pprofui/server_test.go index 90028593bdf4..010e192c4231 100644 --- a/pkg/server/debug/pprofui/server_test.go +++ b/pkg/server/debug/pprofui/server_test.go @@ -50,11 +50,17 @@ func init() { } } +var supportedProfiles = []string{ + "cpu", "goroutine", "heap", "threadcreate", "block", "mutex", "allocs", +} + func TestServer(t *testing.T) { expectedNodeID := "local" + withLabels := false mockProfile := func(ctx context.Context, req *serverpb.ProfileRequest) (*serverpb.JSONResponse, error) { require.Equal(t, expectedNodeID, req.NodeId) + require.Equal(t, withLabels, req.Labels) b, err := os.ReadFile(datapathutils.TestDataPath(t, "heap.profile")) require.NoError(t, err) return &serverpb.JSONResponse{Data: b}, nil @@ -63,26 +69,30 @@ func TestServer(t *testing.T) { storage := NewMemStorage(1, 0) s := NewServer(storage, ProfilerFunc(mockProfile)) + count := 1 for i := 0; i < 3; i++ { - t.Run(fmt.Sprintf("request local profile %d", i), func(t *testing.T) { - r := httptest.NewRequest("GET", "/heap/", nil) - w := httptest.NewRecorder() - s.ServeHTTP(w, r) + for _, profileType := range supportedProfiles { + t.Run(fmt.Sprintf("request local profile %s:%d", profileType, i), func(t *testing.T) { + r := httptest.NewRequest("GET", fmt.Sprintf("/%s/", profileType), nil) + w := httptest.NewRecorder() + s.ServeHTTP(w, r) - require.Equal(t, http.StatusTemporaryRedirect, w.Code) + require.Equal(t, http.StatusTemporaryRedirect, w.Code) - loc := w.Result().Header.Get("Location") - require.Equal(t, fmt.Sprintf("/heap/%d/flamegraph", i+1), loc) + loc := w.Result().Header.Get("Location") + 
require.Equal(t, fmt.Sprintf("/%s/%d/flamegraph", profileType, count), loc) + count++ - r = httptest.NewRequest("GET", loc, nil) - w = httptest.NewRecorder() + r = httptest.NewRequest("GET", loc, nil) + w = httptest.NewRecorder() - s.ServeHTTP(w, r) + s.ServeHTTP(w, r) - require.Equal(t, http.StatusOK, w.Code) - require.Contains(t, w.Body.String(), "pprof") - }) - require.Equal(t, 1, len(storage.getRecords()), + require.Equal(t, http.StatusOK, w.Code) + require.Contains(t, w.Body.String(), "pprof") + }) + } + require.Equal(t, 1, len(storage.GetRecords()), "storage did not expunge records") } @@ -95,7 +105,97 @@ func TestServer(t *testing.T) { require.Equal(t, http.StatusTemporaryRedirect, w.Code) loc := w.Result().Header.Get("Location") - require.Equal(t, "/heap/4/flamegraph?node=3", loc) + require.Equal(t, fmt.Sprintf("/heap/%d/flamegraph?node=3", count), loc) + }) + + t.Run("request profile with labels", func(t *testing.T) { + expectedNodeID = "3" + withLabels = true + defer func() { + withLabels = false + }() + + // Labels are only supported for CPU and GOROUTINE profiles. + r := httptest.NewRequest("GET", "/heap/?node=3&labels=true", nil) + w := httptest.NewRecorder() + s.ServeHTTP(w, r) + count++ + require.Equal(t, http.StatusInternalServerError, w.Code) + require.Equal(t, "profiling with labels is unsupported for HEAP\n", w.Body.String()) + + r = httptest.NewRequest("GET", "/cpu/?node=3&labels=true", nil) + w = httptest.NewRecorder() + s.ServeHTTP(w, r) + count++ + require.Equal(t, http.StatusTemporaryRedirect, w.Code) + loc := w.Result().Header.Get("Location") + require.Equal(t, fmt.Sprintf("/cpu/%d/flamegraph?node=3&labels=true", count), loc) + + // A GOROUTINE profile with a label will trigger a download since it is not + // generated a Profile protobuf format. 
+ r = httptest.NewRequest("GET", "/goroutine/?node=3&labels=true", nil) + w = httptest.NewRecorder() + s.ServeHTTP(w, r) + count++ + require.Equal(t, http.StatusOK, w.Code) + require.Equal(t, "attachment; filename=goroutine_25.txt", w.Header().Get("Content-Disposition")) + require.Equal(t, "text/plain", w.Header().Get("Content-Type")) + }) + + t.Run("request cluster-wide profiles", func(t *testing.T) { + expectedNodeID = "all" + + // Cluster-wide profiles are only supported for CPU and GOROUTINE profiles. + r := httptest.NewRequest("GET", "/heap/?node=all", nil) + w := httptest.NewRecorder() + s.ServeHTTP(w, r) + count++ + require.Equal(t, http.StatusInternalServerError, w.Code) + require.Equal(t, "cluster-wide collection is unsupported for HEAP\n", w.Body.String()) + + r = httptest.NewRequest("GET", "/cpu/?node=all", nil) + w = httptest.NewRecorder() + s.ServeHTTP(w, r) + count++ + require.Equal(t, http.StatusTemporaryRedirect, w.Code) + loc := w.Result().Header.Get("Location") + require.Equal(t, fmt.Sprintf("/cpu/%d/flamegraph?node=all", count), loc) + + // A GOROUTINE profile with a label will trigger a download since it is not + // generated a Profile protobuf format. + r = httptest.NewRequest("GET", "/goroutine/?node=all", nil) + w = httptest.NewRecorder() + s.ServeHTTP(w, r) + count++ + require.Equal(t, http.StatusTemporaryRedirect, w.Code) + loc = w.Result().Header.Get("Location") + require.Equal(t, fmt.Sprintf("/goroutine/%d/flamegraph?node=all", count), loc) + }) + + t.Run("request profile with label filters", func(t *testing.T) { + expectedNodeID = "3" + withLabels = true + defer func() { + withLabels = false + }() + + // Labels are only supported for CPU and GOROUTINE profiles. 
+ r := httptest.NewRequest("GET", "/heap/?node=3&labels=true&labelfilter=foo:bar", nil) + w := httptest.NewRecorder() + s.ServeHTTP(w, r) + count++ + require.Equal(t, http.StatusInternalServerError, w.Code) + require.Equal(t, "profiling with labels is unsupported for HEAP\n", w.Body.String()) + + // A GOROUTINE profile with a label will trigger a download since it is not + // generated a Profile protobuf format. + r = httptest.NewRequest("GET", "/goroutine/?node=3&labelfilter=foo:bar", nil) + w = httptest.NewRecorder() + s.ServeHTTP(w, r) + count++ + require.Equal(t, http.StatusOK, w.Code) + require.Equal(t, "attachment; filename=goroutine_30.txt", w.Header().Get("Content-Disposition")) + require.Equal(t, "text/plain", w.Header().Get("Content-Type")) }) } @@ -154,3 +254,80 @@ func TestServerConcurrentAccess(t *testing.T) { } wg.Wait() } + +func TestFilterStacksWithLabels(t *testing.T) { + testStacks := ` +Stacks for node 1: + +8 @ 0x4a13d6 0x46b3bb 0x46aef8 0x12fd53f 0xe9bc43 0x12fd478 0x4d3101 +# labels: {"foo":"baz", "bar":"biz"} +# 0x12fd53e github.com/cockroachdb/pebble.(*tableCacheShard).releaseLoop.func1+0x9e github.com/cockroachdb/pebble/external/com_github_cockroachdb_pebble/table_cache.go:324 + +10 @ 0x4a13d6 0x4b131c 0x17969e6 0x4d3101 +# 0x17969e5 github.com/cockroachdb/cockroach/pkg/util/admission.initWorkQueue.func2+0x85 github.com/cockroachdb/cockroach/pkg/util/admission/work_queue.go:388 + +8 @ 0x4a13d6 0x4b131c 0x1796e96 0x4d3101 +# labels: {"bar":"biz"} +# 0x1796e95 github.com/cockroachdb/cockroach/pkg/util/admission.(*WorkQueue).startClosingEpochs.func1+0x1d5 github.com/cockroachdb/cockroach/pkg +/util/admission/work_queue.go:462 + +1 @ 0x100907fc4 0x100919c8c 0x100919c69 0x1009354d8 0x1009459c0 0x101e2015c 0x101ec4b08 0x101ec4fc8 0x101f0b78c 0x101f0addc 0x1015319fc 0x100939fb4 +# labels: {"range_str":"12419/2:/Table/136/1/"{NHCH-…-PWN-a"}", "n":"1", "rangefeed":"sql-watcher-descriptor-rangefeed"} +# 0x1009354d7 sync.runtime_Semacquire+0x27 
GOROOT/src/runtime/sema.go:62 + + +` + t.Run("empty filter", func(t *testing.T) { + res := FilterStacksWithLabels([]byte(testStacks), "") + require.Equal(t, ` +Stacks for node 1: + +8 @ 0x4a13d6 0x46b3bb 0x46aef8 0x12fd53f 0xe9bc43 0x12fd478 0x4d3101 +# labels: {"foo":"baz", "bar":"biz"} +# 0x12fd53e github.com/cockroachdb/pebble.(*tableCacheShard).releaseLoop.func1+0x9e github.com/cockroachdb/pebble/external/com_github_cockroachdb_pebble/table_cache.go:324 + +10 @ 0x4a13d6 0x4b131c 0x17969e6 0x4d3101 +# 0x17969e5 github.com/cockroachdb/cockroach/pkg/util/admission.initWorkQueue.func2+0x85 github.com/cockroachdb/cockroach/pkg/util/admission/work_queue.go:388 + +8 @ 0x4a13d6 0x4b131c 0x1796e96 0x4d3101 +# labels: {"bar":"biz"} +# 0x1796e95 github.com/cockroachdb/cockroach/pkg/util/admission.(*WorkQueue).startClosingEpochs.func1+0x1d5 github.com/cockroachdb/cockroach/pkg +/util/admission/work_queue.go:462 + +1 @ 0x100907fc4 0x100919c8c 0x100919c69 0x1009354d8 0x1009459c0 0x101e2015c 0x101ec4b08 0x101ec4fc8 0x101f0b78c 0x101f0addc 0x1015319fc 0x100939fb4 +# labels: {"range_str":"12419/2:/Table/136/1/"{NHCH-…-PWN-a"}", "n":"1", "rangefeed":"sql-watcher-descriptor-rangefeed"} +# 0x1009354d7 sync.runtime_Semacquire+0x27 GOROOT/src/runtime/sema.go:62 + + +`, string(res)) + }) + + t.Run("bar-biz filter", func(t *testing.T) { + res := FilterStacksWithLabels([]byte(testStacks), "\"bar\":\"biz\"") + require.Equal(t, ` +Stacks for node 1: + +8 @ 0x4a13d6 0x46b3bb 0x46aef8 0x12fd53f 0xe9bc43 0x12fd478 0x4d3101 +# labels: {"foo":"baz", "bar":"biz"} +# 0x12fd53e github.com/cockroachdb/pebble.(*tableCacheShard).releaseLoop.func1+0x9e github.com/cockroachdb/pebble/external/com_github_cockroachdb_pebble/table_cache.go:324 + +8 @ 0x4a13d6 0x4b131c 0x1796e96 0x4d3101 +# labels: {"bar":"biz"} +# 0x1796e95 github.com/cockroachdb/cockroach/pkg/util/admission.(*WorkQueue).startClosingEpochs.func1+0x1d5 github.com/cockroachdb/cockroach/pkg +/util/admission/work_queue.go:462 + +`, 
string(res)) + }) + + t.Run("filter non-alphanumeric characters", func(t *testing.T) { + res := FilterStacksWithLabels([]byte(testStacks), "\"{NHCH") + require.Equal(t, ` +Stacks for node 1: + +1 @ 0x100907fc4 0x100919c8c 0x100919c69 0x1009354d8 0x1009459c0 0x101e2015c 0x101ec4b08 0x101ec4fc8 0x101f0b78c 0x101f0addc 0x1015319fc 0x100939fb4 +# labels: {"range_str":"12419/2:/Table/136/1/"{NHCH-…-PWN-a"}", "n":"1", "rangefeed":"sql-watcher-descriptor-rangefeed"} +# 0x1009354d7 sync.runtime_Semacquire+0x27 GOROOT/src/runtime/sema.go:62 + +`, string(res)) + }) +} diff --git a/pkg/server/debug/pprofui/storage.go b/pkg/server/debug/pprofui/storage.go index 90f7659722f4..4958c369b3f9 100644 --- a/pkg/server/debug/pprofui/storage.go +++ b/pkg/server/debug/pprofui/storage.go @@ -17,8 +17,12 @@ type Storage interface { // ID generates a unique ID for use in Store. ID() string // Store invokes the passed-in closure with a writer that stores its input. - Store(id string, write func(io.Writer) error) error + // IsProfileProto determines whether the input will be in the protobuf format + // outlined in https://github.com/google/pprof/tree/main/proto#overview. + Store(id string, isProfileProto bool, write func(io.Writer) error) error // Get invokes the passed-in closure with a reader for the data at the given id. // An error is returned when no data is found. - Get(id string, read func(io.Reader) error) error + // The boolean indicates whether the stored record is in protobuf format + // outlined in https://github.com/google/pprof/tree/main/proto#overview. 
+ Get(id string, read func(bool, io.Reader) error) error } diff --git a/pkg/server/debug/pprofui/storage_mem.go b/pkg/server/debug/pprofui/storage_mem.go index f60c5c1adc2b..489734257797 100644 --- a/pkg/server/debug/pprofui/storage_mem.go +++ b/pkg/server/debug/pprofui/storage_mem.go @@ -26,7 +26,10 @@ import ( type record struct { id string t time.Time - b []byte + // isProfileProto is true when the profile record is stored in protobuf format + // outlined in https://github.com/google/pprof/tree/main/proto#overview. + isProfileProto bool + b []byte } // A MemStorage is a Storage implementation that holds recent profiles in memory. @@ -40,7 +43,7 @@ type MemStorage struct { keepNumber int // zero for disabled } -func (s *MemStorage) getRecords() []record { +func (s *MemStorage) GetRecords() []record { s.mu.Lock() defer s.mu.Unlock() return append([]record(nil), s.mu.records...) @@ -78,14 +81,14 @@ func (s *MemStorage) cleanLocked() { } // Store implements Storage. -func (s *MemStorage) Store(id string, write func(io.Writer) error) error { +func (s *MemStorage) Store(id string, isProfileProto bool, write func(io.Writer) error) error { var b bytes.Buffer if err := write(&b); err != nil { return err } s.mu.Lock() defer s.mu.Unlock() - s.mu.records = append(s.mu.records, record{id: id, t: timeutil.Now(), b: b.Bytes()}) + s.mu.records = append(s.mu.records, record{id: id, t: timeutil.Now(), b: b.Bytes(), isProfileProto: isProfileProto}) sort.Slice(s.mu.records, func(i, j int) bool { return s.mu.records[i].t.Before(s.mu.records[j].t) }) @@ -94,12 +97,12 @@ func (s *MemStorage) Store(id string, write func(io.Writer) error) error { } // Get implements Storage. 
-func (s *MemStorage) Get(id string, read func(io.Reader) error) error { +func (s *MemStorage) Get(id string, read func(bool, io.Reader) error) error { s.mu.Lock() defer s.mu.Unlock() for _, v := range s.mu.records { if v.id == id { - return read(bytes.NewReader(v.b)) + return read(v.isProfileProto, bytes.NewReader(v.b)) } } return errors.Errorf("profile not found; it may have expired, please regenerate the profile.\n" + diff --git a/pkg/server/serverpb/status.proto b/pkg/server/serverpb/status.proto index f3eab6f08eab..afd1d292f71c 100644 --- a/pkg/server/serverpb/status.proto +++ b/pkg/server/serverpb/status.proto @@ -753,7 +753,7 @@ message GetFilesResponse { } message ProfileRequest { - // node_id is a string so that "local" can be used to specify that no + // node_id is a string so that "local" or "all" can be used to specify that no // forwarding is necessary. node_id translates to a KV node ID on a storage // server and SQL instance ID on a SQL only server. string node_id = 1; @@ -771,7 +771,18 @@ message ProfileRequest { Type type = 5; int32 seconds = 6; // applies only to Type=CPU, defaults to 30 - bool labels = 7; // applies only to Type=CPU, defaults to false + + // Labels can be specified for Type=CPU or Type=GOROUTINE. + // + // - If true for CPU profiles, we request a CPU profile with pprof labels. + // + // - If true for GOROUTINE profiles, we request an aggregated goroutine + // profile with debug=1. + bool labels = 7; + + // LabelFilter only applies to Type=GOROUTINE. Only goroutines with a pprof + // label matching the filter will be returned. + string label_filter = 9; // SenderServerVersion is the server version of the node sending the Profile // request. 
If this field is set then the node processing the request will diff --git a/pkg/server/status.go b/pkg/server/status.go index 309c67ac8efc..2fad36d60b2d 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -1461,18 +1461,142 @@ func (s *statusServer) Stacks( return stacksLocal(req) } -// fetchProfileFromAllNodes fetches the CPU profiles from all live nodes in the +func (s *statusServer) processRawGoroutines( + _ context.Context, response profDataResponse, +) ([]byte, error) { + res := bytes.NewBuffer(nil) + for nodeID, pd := range response.profDataByNodeID { + if len(pd.data) == 0 && pd.err == nil { + res.WriteString(fmt.Sprintf("No goroutines collected for node %d\n", nodeID)) + continue // skipped node + } + + if pd.err != nil { + res.WriteString(fmt.Sprintf("Failed to collect goroutines for node %d: %v\n", nodeID, pd.err)) + continue + } + + res.Write(pd.data) + } + return res.Bytes(), nil +} + +func (s *statusServer) processProfileProtoGoroutines( + _ context.Context, response profDataResponse, +) ([]byte, error) { + profileErrs := make([]string, 0) + res := bytes.NewBuffer(nil) + profs := make([]*profile.Profile, 0, len(response.profDataByNodeID)) + for nodeID, pd := range response.profDataByNodeID { + if len(pd.data) == 0 && pd.err == nil { + profileErrs = append(profileErrs, fmt.Sprintf("No goroutines collected for node %d", nodeID)) + continue // skipped node + } + + if pd.err != nil { + profileErrs = append(profileErrs, fmt.Sprintf("Failed to collect goroutines for node %d: %v", nodeID, pd.err)) + continue + } + + p, err := profile.ParseData(pd.data) + if err != nil { + return nil, err + } + p.Comments = append(p.Comments, fmt.Sprintf("n%d", nodeID)) + profs = append(profs, p) + } + + errMsg := "Errors while collecting profiles:\n" + for _, pErr := range profileErrs { + errMsg += fmt.Sprintf("%s\n", pErr) + } + if len(profs) == 0 { + return nil, errors.Newf("no profiles could be collected: %s", errMsg) + } + + mergedProfiles, err := 
profile.Merge(profs) + if err != nil { + return nil, errors.Wrap(err, "failed to merge profiles") + } + if len(profileErrs) > 0 { + mergedProfiles.Comments = append(mergedProfiles.Comments, errMsg) + } + + if err := mergedProfiles.Write(res); err != nil { + return nil, status.Errorf(codes.Internal, err.Error()) + } + + return res.Bytes(), nil +} + +func (s *statusServer) processGoroutineProfilesFromAllNodes( + ctx context.Context, request *serverpb.ProfileRequest, response profDataResponse, +) ([]byte, error) { + if request.Labels { + return s.processRawGoroutines(ctx, response) + } + return s.processProfileProtoGoroutines(ctx, response) +} + +func (s *statusServer) processCPUProfilesFromAllNodes( + _ context.Context, response profDataResponse, +) ([]byte, error) { + profileErrs := make([]string, 0) + profs := make([]*profile.Profile, 0, len(response.profDataByNodeID)) + for nodeID, pd := range response.profDataByNodeID { + if len(pd.data) == 0 && pd.err == nil { + profileErrs = append(profileErrs, fmt.Sprintf("No profile collected for node %d", nodeID)) + continue // skipped node + } + + if pd.err != nil { + profileErrs = append(profileErrs, fmt.Sprintf("Failed to collect profile for node %d: %v", nodeID, pd.err)) + continue + } + + p, err := profile.ParseData(pd.data) + if err != nil { + return nil, err + } + p.Comments = append(p.Comments, fmt.Sprintf("n%d", nodeID)) + profs = append(profs, p) + } + + errMsg := "Errors while collecting profiles:\n" + for _, pErr := range profileErrs { + errMsg += fmt.Sprintf("%s\n", pErr) + } + if len(profs) == 0 { + return nil, errors.Newf("no profiles could be collected: %s", errMsg) + } + mergedProfiles, err := profile.Merge(profs) + if err != nil { + return nil, errors.Wrap(err, "failed to merge profiles") + } + if len(profileErrs) > 0 { + mergedProfiles.Comments = append(mergedProfiles.Comments, errMsg) + } + + var buf bytes.Buffer + if err := mergedProfiles.Write(&buf); err != nil { + return nil, 
status.Errorf(codes.Internal, err.Error()) + } + return buf.Bytes(), nil +} + +type profData struct { + data []byte + err error +} +type profDataResponse struct { + profDataByNodeID map[roachpb.NodeID]*profData +} + +// fetchProfileFromAllNodes fetches the profile from all live nodes in the // cluster and merges the samples across all profiles. func (s *statusServer) fetchProfileFromAllNodes( ctx context.Context, req *serverpb.ProfileRequest, ) (*serverpb.JSONResponse, error) { - type profData struct { - data []byte - err error - } - type profDataResponse struct { - profDataByNodeID map[roachpb.NodeID]*profData - } response := profDataResponse{profDataByNodeID: make(map[roachpb.NodeID]*profData)} resp, err := s.Node(ctx, &serverpb.NodeRequest{NodeId: "local"}) @@ -1484,20 +1608,17 @@ func (s *statusServer) fetchProfileFromAllNodes( client, err := s.dialNode(ctx, nodeID) return client, err } + opName := fmt.Sprintf("fetch cluster-wide %s profile", req.Type) nodeFn := func(ctx context.Context, client interface{}, nodeID roachpb.NodeID) (interface{}, error) { statusClient := client.(serverpb.StatusClient) - resp, err := statusClient.Node(ctx, &serverpb.NodeRequest{NodeId: "local"}) - if err != nil { - return nodeID, err - } var pd *profData - err = timeutil.RunWithTimeout(ctx, "fetch cpu profile", 1*time.Minute, func(ctx context.Context) error { - log.Infof(ctx, "fetching a CPU profile for %d", resp.Desc.NodeID) + err = timeutil.RunWithTimeout(ctx, opName, 1*time.Minute, func(ctx context.Context) error { resp, err := statusClient.Profile(ctx, &serverpb.ProfileRequest{ NodeId: fmt.Sprintf("%d", nodeID), - Type: serverpb.ProfileRequest_CPU, + Type: req.Type, Seconds: req.Seconds, Labels: req.Labels, + LabelFilter: req.LabelFilter, SenderServerVersion: &senderServerVersion, }) if err != nil { @@ -1515,39 +1636,22 @@ func (s *statusServer) fetchProfileFromAllNodes( errorFn := func(nodeID roachpb.NodeID, err error) { response.profDataByNodeID[nodeID] = &profData{err: err} 
} - if err := s.iterateNodes(ctx, "cluster-wide CPU profile", dialFn, nodeFn, responseFn, errorFn); err != nil { + if err := s.iterateNodes(ctx, opName, dialFn, nodeFn, responseFn, errorFn); err != nil { return nil, serverError(ctx, err) } - - profs := make([]*profile.Profile, 0, len(response.profDataByNodeID)) - for nodeID, pd := range response.profDataByNodeID { - if len(pd.data) == 0 && pd.err == nil { - log.Warningf(ctx, "no profile collected for node %d", nodeID) - continue // skipped node - } - - if pd.err != nil { - log.Warningf(ctx, "failed to collect profile for node %d: %v", nodeID, pd.err) - continue - } - - p, err := profile.ParseData(pd.data) - if err != nil { - return nil, err - } - p.Comments = append(p.Comments, fmt.Sprintf("n%d", nodeID)) - profs = append(profs, p) + var data []byte + switch req.Type { + case serverpb.ProfileRequest_CPU: + data, err = s.processCPUProfilesFromAllNodes(ctx, response) + case serverpb.ProfileRequest_GOROUTINE: + data, err = s.processGoroutineProfilesFromAllNodes(ctx, req, response) + default: + return nil, errors.Newf("cluster-wide collection of %s is unsupported", req.Type) } - mergedProfiles, err := profile.Merge(profs) if err != nil { return nil, err } - - var buf bytes.Buffer - if err := mergedProfiles.Write(&buf); err != nil { - return nil, status.Errorf(codes.Internal, err.Error()) - } - return &serverpb.JSONResponse{Data: buf.Bytes()}, nil + return &serverpb.JSONResponse{Data: data}, nil } // TODO(tschottdorf): significant overlap with /debug/pprof/heap, except that @@ -1568,7 +1672,7 @@ func (s *statusServer) Profile( } // If the request is for "all" nodes then we collect profiles from all nodes - // in the cluster and merge them before returning to the user. + // in the cluster and process them before returning to the user. 
if req.NodeId == "all" { return s.fetchProfileFromAllNodes(ctx, req) } @@ -1595,7 +1699,7 @@ func (s *statusServer) Profile( serverVersion.String(), req.SenderServerVersion.String()) } } - return profileLocal(ctx, req, s.st) + return profileLocal(ctx, req, s.st, nodeID) } // Regions implements the serverpb.StatusServer interface. diff --git a/pkg/server/status_local_file_retrieval.go b/pkg/server/status_local_file_retrieval.go index 34bff54c5ae7..2d3c6363c68d 100644 --- a/pkg/server/status_local_file_retrieval.go +++ b/pkg/server/status_local_file_retrieval.go @@ -13,13 +13,16 @@ package server import ( "bytes" "context" + "fmt" "os" "path/filepath" "runtime/pprof" "strings" "time" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/debug" + "github.com/cockroachdb/cockroach/pkg/server/debug/pprofui" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/allstacks" @@ -30,7 +33,7 @@ import ( // profileLocal runs a performance profile of the requested type (heap, cpu etc). // on the local node. This method returns a gRPC error to the caller. 
func profileLocal( - ctx context.Context, req *serverpb.ProfileRequest, st *cluster.Settings, + ctx context.Context, req *serverpb.ProfileRequest, st *cluster.Settings, nodeID roachpb.NodeID, ) (*serverpb.JSONResponse, error) { switch req.Type { case serverpb.ProfileRequest_CPU: @@ -59,6 +62,29 @@ func profileLocal( return nil, err } return &serverpb.JSONResponse{Data: buf.Bytes()}, nil + case serverpb.ProfileRequest_GOROUTINE: + p := pprof.Lookup("goroutine") + if p == nil { + return nil, status.Error(codes.Internal, "unable to find goroutine profile") + } + var buf bytes.Buffer + if req.Labels { + buf.WriteString(fmt.Sprintf("Stacks for node: %d\n\n", nodeID)) + if err := p.WriteTo(&buf, 1); err != nil { + return nil, status.Errorf(codes.Internal, err.Error()) + } + buf.WriteString("\n\n") + + // Now check if we need to filter the goroutines by the provided label filter. + if req.LabelFilter != "" { + return &serverpb.JSONResponse{Data: pprofui.FilterStacksWithLabels(buf.Bytes(), req.LabelFilter)}, nil + } + } else { + if err := p.WriteTo(&buf, 0); err != nil { + return nil, status.Errorf(codes.Internal, err.Error()) + } + } + return &serverpb.JSONResponse{Data: buf.Bytes()}, nil default: name, ok := serverpb.ProfileRequest_Type_name[int32(req.Type)] if !ok {