-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
pprofui: add support for collecting goroutines with labels #105916
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ import ( | |
"net/http" | ||
"net/url" | ||
"path" | ||
"regexp" | ||
runtimepprof "runtime/pprof" | ||
"sort" | ||
"strconv" | ||
|
@@ -97,6 +98,162 @@ func (s *Server) parsePath(reqPath string) (profType string, id string, remainin | |
} | ||
} | ||
|
||
// validateProfileRequest validates that the request does not contain any | ||
// conflicting or invalid configurations. | ||
func validateProfileRequest(req *serverpb.ProfileRequest) error { | ||
switch req.Type { | ||
case serverpb.ProfileRequest_GOROUTINE: | ||
case serverpb.ProfileRequest_CPU: | ||
if req.LabelFilter != "" { | ||
return errors.Newf("filtering using a pprof label is unsupported for %s", req.Type) | ||
} | ||
default: | ||
if req.NodeId == "all" { | ||
return errors.Newf("cluster-wide collection is unsupported for %s", req.Type) | ||
} | ||
|
||
if req.Labels { | ||
return errors.Newf("profiling with labels is unsupported for %s", req.Type) | ||
} | ||
|
||
if req.LabelFilter != "" { | ||
return errors.Newf("filtering using a pprof label is unsupported for %s", req.Type) | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func constructProfileReq(r *http.Request, profileName string) (*serverpb.ProfileRequest, error) { | ||
req, err := http.NewRequest("GET", "/unused", bytes.NewReader(nil)) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
profileType, ok := serverpb.ProfileRequest_Type_value[strings.ToUpper(profileName)] | ||
if !ok && profileName != "profile" { | ||
return nil, errors.Newf("unknown profile name: %s", profileName) | ||
} | ||
// There is a discrepancy between the usage of the "CPU" and | ||
// "profile" names to refer to the CPU profile in the | ||
// implementations. The URL to the CPU profile has been modified | ||
// on the Advanced Debug page to point to /pprof/ui/cpu but this | ||
adityamaru marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// is retained for backwards compatibility. | ||
if profileName == "profile" { | ||
profileType = int32(serverpb.ProfileRequest_CPU) | ||
} | ||
profileReq := &serverpb.ProfileRequest{ | ||
NodeId: "local", | ||
Type: serverpb.ProfileRequest_Type(profileType), | ||
} | ||
|
||
// Pass through any parameters. Most notably, allow ?seconds=10 for | ||
// CPU profiles. | ||
_ = r.ParseForm() | ||
req.Form = r.Form | ||
|
||
if r.Form.Get("seconds") != "" { | ||
sec, err := strconv.ParseInt(r.Form.Get("seconds"), 10, 32) | ||
if err != nil { | ||
return nil, err | ||
} | ||
profileReq.Seconds = int32(sec) | ||
} | ||
if r.Form.Get("node") != "" { | ||
profileReq.NodeId = r.Form.Get("node") | ||
} | ||
if r.Form.Get("labels") != "" { | ||
withLabels, err := strconv.ParseBool(r.Form.Get("labels")) | ||
if err != nil { | ||
return nil, err | ||
} | ||
profileReq.Labels = withLabels | ||
} | ||
if labelFilter := r.Form.Get("labelfilter"); labelFilter != "" { | ||
profileReq.Labels = true | ||
profileReq.LabelFilter = labelFilter | ||
} | ||
if err := validateProfileRequest(profileReq); err != nil { | ||
return nil, err | ||
} | ||
return profileReq, nil | ||
} | ||
|
||
func (s *Server) serveCachedProfile( | ||
id string, remainingPath string, profileName string, w http.ResponseWriter, r *http.Request, | ||
) { | ||
// Catch nonexistent IDs early or pprof will do a worse job at | ||
// giving an informative error. | ||
var isProfileProto bool | ||
if err := s.storage.Get(id, func(b bool, _ io.Reader) error { | ||
isProfileProto = b | ||
return nil | ||
}); err != nil { | ||
msg := fmt.Sprintf("profile for id %s not found: %s", id, err) | ||
http.Error(w, msg, http.StatusNotFound) | ||
return | ||
} | ||
|
||
// If the profile is not in a protobuf format, downloading it is the only | ||
// option. | ||
if !isProfileProto || r.URL.Query().Get("download") != "" { | ||
// TODO(tbg): this has zero discoverability. | ||
adityamaru marked this conversation as resolved.
Show resolved
Hide resolved
|
||
w.Header().Set("Content-Disposition", | ||
fmt.Sprintf("attachment; filename=%s_%s.pb.gz", profileName, id)) | ||
w.Header().Set("Content-Type", "application/octet-stream") | ||
if err := s.storage.Get(id, func(_ bool, r io.Reader) error { | ||
_, err := io.Copy(w, r) | ||
return err | ||
}); err != nil { | ||
http.Error(w, err.Error(), http.StatusInternalServerError) | ||
} | ||
return | ||
} | ||
|
||
server := func(args *driver.HTTPServerArgs) error { | ||
handler, ok := args.Handlers[remainingPath] | ||
if !ok { | ||
return errors.Errorf("unknown endpoint %s", remainingPath) | ||
} | ||
handler.ServeHTTP(w, r) | ||
return nil | ||
} | ||
|
||
storageFetcher := func(_ string, _, _ time.Duration) (*profile.Profile, string, error) { | ||
var p *profile.Profile | ||
if err := s.storage.Get(id, func(_ bool, reader io.Reader) error { | ||
var err error | ||
p, err = profile.Parse(reader) | ||
return err | ||
}); err != nil { | ||
return nil, "", err | ||
} | ||
return p, "", nil | ||
} | ||
|
||
// Invoke the (library version) of `pprof` with a number of stubs. | ||
// Specifically, we pass a fake FlagSet that plumbs through the | ||
// given args, a UI that logs any errors pprof may emit, a fetcher | ||
// that simply reads the profile we downloaded earlier, and a | ||
// HTTPServer that pprof will pass the web ui handlers to at the | ||
// end (and we let it handle this client request). | ||
if err := driver.PProf(&driver.Options{ | ||
Flagset: &pprofFlags{ | ||
FlagSet: pflag.NewFlagSet("pprof", pflag.ExitOnError), | ||
args: []string{ | ||
"--symbolize", "none", | ||
"--http", "localhost:0", | ||
"", // we inject our own target | ||
}, | ||
}, | ||
UI: &fakeUI{}, | ||
Fetch: fetcherFn(storageFetcher), | ||
HTTPServer: server, | ||
}); err != nil { | ||
_, _ = w.Write([]byte(err.Error())) | ||
} | ||
} | ||
|
||
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { | ||
profileName, id, remainingPath := s.parsePath(r.URL.Path) | ||
|
||
|
@@ -113,124 +270,24 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { | |
} | ||
|
||
if id != "" { | ||
// Catch nonexistent IDs early or pprof will do a worse job at | ||
// giving an informative error. | ||
if err := s.storage.Get(id, func(io.Reader) error { return nil }); err != nil { | ||
msg := fmt.Sprintf("profile for id %s not found: %s", id, err) | ||
http.Error(w, msg, http.StatusNotFound) | ||
return | ||
} | ||
|
||
if r.URL.Query().Get("download") != "" { | ||
// TODO(tbg): this has zero discoverability. | ||
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s_%s.pb.gz", profileName, id)) | ||
w.Header().Set("Content-Type", "application/octet-stream") | ||
if err := s.storage.Get(id, func(r io.Reader) error { | ||
_, err := io.Copy(w, r) | ||
return err | ||
}); err != nil { | ||
http.Error(w, err.Error(), http.StatusInternalServerError) | ||
} | ||
return | ||
} | ||
|
||
server := func(args *driver.HTTPServerArgs) error { | ||
handler, ok := args.Handlers[remainingPath] | ||
if !ok { | ||
return errors.Errorf("unknown endpoint %s", remainingPath) | ||
} | ||
handler.ServeHTTP(w, r) | ||
return nil | ||
} | ||
|
||
storageFetcher := func(_ string, _, _ time.Duration) (*profile.Profile, string, error) { | ||
var p *profile.Profile | ||
if err := s.storage.Get(id, func(reader io.Reader) error { | ||
var err error | ||
p, err = profile.Parse(reader) | ||
return err | ||
}); err != nil { | ||
return nil, "", err | ||
} | ||
return p, "", nil | ||
} | ||
|
||
// Invoke the (library version) of `pprof` with a number of stubs. | ||
// Specifically, we pass a fake FlagSet that plumbs through the | ||
// given args, a UI that logs any errors pprof may emit, a fetcher | ||
// that simply reads the profile we downloaded earlier, and a | ||
// HTTPServer that pprof will pass the web ui handlers to at the | ||
// end (and we let it handle this client request). | ||
if err := driver.PProf(&driver.Options{ | ||
Flagset: &pprofFlags{ | ||
FlagSet: pflag.NewFlagSet("pprof", pflag.ExitOnError), | ||
args: []string{ | ||
"--symbolize", "none", | ||
"--http", "localhost:0", | ||
"", // we inject our own target | ||
}, | ||
}, | ||
UI: &fakeUI{}, | ||
Fetch: fetcherFn(storageFetcher), | ||
HTTPServer: server, | ||
}); err != nil { | ||
_, _ = w.Write([]byte(err.Error())) | ||
} | ||
|
||
s.serveCachedProfile(id, remainingPath, profileName, w, r) | ||
return | ||
} | ||
|
||
// Create and save new profile, then redirect client to corresponding ui URL. | ||
|
||
id = s.storage.ID() | ||
|
||
if err := s.storage.Store(id, func(w io.Writer) error { | ||
req, err := http.NewRequest("GET", "/unused", bytes.NewReader(nil)) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
profileType, ok := serverpb.ProfileRequest_Type_value[strings.ToUpper(profileName)] | ||
if !ok && profileName != "profile" { | ||
return errors.Newf("unknown profile name: %s", profileName) | ||
} | ||
// There is a discrepancy between the usage of the "CPU" and | ||
// "profile" names to refer to the CPU profile in the | ||
// implementations. The URL to the CPU profile has been modified | ||
// on the Advanced Debug page to point to /pprof/ui/cpu but this | ||
// is retained for backwards compatibility. | ||
if profileName == "profile" { | ||
profileType = int32(serverpb.ProfileRequest_CPU) | ||
} | ||
var resp *serverpb.JSONResponse | ||
profileReq := &serverpb.ProfileRequest{ | ||
NodeId: "local", | ||
Type: serverpb.ProfileRequest_Type(profileType), | ||
} | ||
|
||
// Pass through any parameters. Most notably, allow ?seconds=10 for | ||
// CPU profiles. | ||
_ = r.ParseForm() | ||
req.Form = r.Form | ||
|
||
if r.Form.Get("seconds") != "" { | ||
sec, err := strconv.ParseInt(r.Form.Get("seconds"), 10, 32) | ||
if err != nil { | ||
return err | ||
} | ||
profileReq.Seconds = int32(sec) | ||
} | ||
if r.Form.Get("node") != "" { | ||
profileReq.NodeId = r.Form.Get("node") | ||
} | ||
if r.Form.Get("labels") != "" { | ||
labels, err := strconv.ParseBool(r.Form.Get("labels")) | ||
if err != nil { | ||
return err | ||
} | ||
profileReq.Labels = labels | ||
} | ||
resp, err = s.profiler.Profile(r.Context(), profileReq) | ||
profileReq, err := constructProfileReq(r, profileName) | ||
if err != nil { | ||
http.Error(w, err.Error(), http.StatusInternalServerError) | ||
return | ||
} | ||
// A GOROUTINE profile with labels is not generated in a protobuf format. It | ||
// is outputted in a "legacy text format with comments translating addresses | ||
// to function names and line numbers, so that a programmer can read the | ||
// profile without tools." | ||
isProfileProto := !(profileReq.Type == serverpb.ProfileRequest_GOROUTINE && profileReq.Labels) | ||
if err := s.storage.Store(id, isProfileProto, func(w io.Writer) error { | ||
resp, err := s.profiler.Profile(r.Context(), profileReq) | ||
if err != nil { | ||
return err | ||
} | ||
|
@@ -244,6 +301,18 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { | |
return | ||
} | ||
|
||
// If the profile is not in a protobuf format, downloading it is the only | ||
// option. | ||
if !isProfileProto { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've seen this before, why do we have this twice? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This logic exists to serve a newly generated profile. The same logic also exists to serve a cached profile request by profile ID. |
||
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s_%s.txt", profileName, id)) | ||
w.Header().Set("Content-Type", "text/plain") | ||
_ = s.storage.Get(id, func(isProtoProfile bool, r io.Reader) error { | ||
_, err := io.Copy(w, r) | ||
return err | ||
}) | ||
return | ||
} | ||
|
||
// NB: direct straight to the flamegraph. This is because `pprof` | ||
// shells out to `dot` for the default landing page and thus works | ||
// only on hosts that have graphviz installed. You can still navigate | ||
|
@@ -263,13 +332,42 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { | |
if !isGoPProf { | ||
http.Redirect(w, r, origURL.String(), http.StatusTemporaryRedirect) | ||
} else { | ||
_ = s.storage.Get(id, func(r io.Reader) error { | ||
_ = s.storage.Get(id, func(isProtoProfile bool, r io.Reader) error { | ||
_, err := io.Copy(w, r) | ||
return err | ||
}) | ||
} | ||
} | ||
|
||
// FilterStacksWithLabels writes all the stacks that have pprof labels matching | ||
// any one of the expectedLabels to resBuf. | ||
adityamaru marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// | ||
// The input must be in the format used by debug/pprof/goroutines?debug=1. | ||
func FilterStacksWithLabels(stacks []byte, labelFilter string) []byte { | ||
if labelFilter == "" { | ||
return stacks | ||
} | ||
res := bytes.NewBuffer(nil) | ||
goroutines := strings.Split(string(stacks), "\n\n") | ||
// The first element is the nodeID which we want to always keep. | ||
res.WriteString(goroutines[0]) | ||
goroutines = goroutines[1:] | ||
regex := regexp.MustCompile(fmt.Sprintf(`labels: {.*%s.*}`, labelFilter)) | ||
tbg marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for _, g := range goroutines { | ||
// pprof.Lookup("goroutine") with debug=1 renders the pprof labels | ||
// corresponding to a stack as: | ||
// # labels: {"foo":"bar", "baz":"biz"}. | ||
match := regex.MatchString(g) | ||
if match { | ||
res.WriteString("\n\n") | ||
res.WriteString(g) | ||
} | ||
} | ||
res.WriteString("\n\n") | ||
|
||
return res.Bytes() | ||
} | ||
|
||
type fetcherFn func(_ string, _, _ time.Duration) (*profile.Profile, string, error) | ||
|
||
func (f fetcherFn) Fetch(s string, d, t time.Duration) (*profile.Profile, string, error) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this code movement? For future PRs, see if there is a way to have a separate commit that mechanically moves code (and the reviewer can trust that's all that's happening) and then layer additional modifications of the logic on top in separate commits. As is, I basically have to review from scratch, which is a lot more work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was just code movement. Yeah I realize that a separate refactor only commit would have made this a lot easier. The truth is I didn't realize this change would balloon into what it did but I should've made the call to switch to smaller commits before getting in so deep. Noted for next time!