Skip to content

Commit

Permalink
server,sql: add status server endpoint to request profiler details
Browse files Browse the repository at this point in the history
This change introduces a new status server endpoint to
request job profiler details. This endpoint will redirect
the request to the current coordinator node of the job in
question. This will be useful because in a followup we will
load the resumer from the coordinator node's job registry
and trigger its specific job profiler detail collection logic.

This is the first step of a few to move to a "fetch model" rather
than have each resumer dump their execution details at some
arbitrary cadence.

The core logic involved in collecting profiler details has not changed,
it has been moved in its entirety from pkg/sql to pkg/server. The
`crdb_internal.request_job_execution_details` builtin now resolves
the job's coordinator ID and calls the new status server endpoint.

Informs: #109671
Release note: None
  • Loading branch information
adityamaru committed Sep 5, 2023
1 parent 2b71df6 commit a99de9f
Show file tree
Hide file tree
Showing 15 changed files with 430 additions and 126 deletions.
40 changes: 40 additions & 0 deletions docs/generated/http/full.md
Original file line number Diff line number Diff line change
Expand Up @@ -5204,6 +5204,46 @@ Support status: [reserved](#support-status)



## RequestJobProfilerExecutionDetails

`GET /_status/request_job_profiler_execution_details/{job_id}`



Support status: [reserved](#support-status)

#### Request Parameters







| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
| job_id | [int64](#cockroach.server.serverpb.RequestJobProfilerExecutionDetailsRequest-int64) | | | [reserved](#support-status) |







#### Response Parameters













## GetJobProfilerExecutionDetails

`GET /_status/job_profiler_execution_details/{job_id}`
Expand Down
18 changes: 18 additions & 0 deletions pkg/jobs/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,24 @@ func JobExists(
return row != nil, nil
}

// JobCoordinatorID returns the coordinator node ID of the job.
func JobCoordinatorID(
ctx context.Context, jobID jobspb.JobID, txn *kv.Txn, ex isql.Executor,
) (int32, error) {
row, err := ex.QueryRow(ctx, "fetch-job-coordinator", txn, `SELECT claim_instance_id FROM system.jobs WHERE id = $1`, jobID)
if err != nil {
return 0, err
}
if row == nil {
return 0, errors.Errorf("coordinator not found for job %d", jobID)
}
coordinatorID, ok := tree.AsDInt(row[0])
if !ok {
return 0, errors.AssertionFailedf("expected coordinator ID to be an int, got %T", row[0])
}
return int32(coordinatorID), nil
}

// isJobTypeColumnDoesNotExistError returns true if the error is of the form
// `column "job_type" does not exist`.
func isJobTypeColumnDoesNotExistError(err error) bool {
Expand Down
6 changes: 6 additions & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ go_library(
"init.go",
"init_handshake.go",
"initial_sql.go",
"job_profiler.go",
"key_visualizer_server.go",
"listen_and_update_addrs.go",
"load_endpoint.go",
Expand Down Expand Up @@ -336,6 +337,7 @@ go_library(
"//pkg/util/tracing/tracingpb",
"//pkg/util/tracing/tracingservicepb",
"//pkg/util/tracing/tracingui",
"//pkg/util/tracing/zipper",
"//pkg/util/uint128",
"//pkg/util/uuid",
"@com_github_cenkalti_backoff//:backoff",
Expand Down Expand Up @@ -434,6 +436,7 @@ go_test(
"index_usage_stats_test.go",
"init_handshake_test.go",
"intent_test.go",
"job_profiler_test.go",
"load_endpoint_test.go",
"main_test.go",
"migration_test.go",
Expand Down Expand Up @@ -478,10 +481,13 @@ go_test(
"//pkg/base/serverident",
"//pkg/build",
"//pkg/cli/exit",
"//pkg/cloud/impl:cloudimpl",
"//pkg/clusterversion",
"//pkg/config",
"//pkg/config/zonepb",
"//pkg/gossip",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/kvtenant",
Expand Down
175 changes: 175 additions & 0 deletions pkg/server/job_profiler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package server

import (
"context"
"fmt"
"strconv"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/server/srverrors"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing/zipper"
"github.com/cockroachdb/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// RequestJobProfilerExecutionDetails requests the profiler details for a job.
// This method ensures that the details are requested on the current coordinator
// node of the job to allow for the collection of Resumer specific details.
func (s *statusServer) RequestJobProfilerExecutionDetails(
ctx context.Context, req *serverpb.RequestJobProfilerExecutionDetailsRequest,
) (*serverpb.RequestJobProfilerExecutionDetailsResponse, error) {
ctx = s.AnnotateCtx(ctx)
// TODO(adityamaru): Figure out the correct privileges required to request execution details.
_, err := s.privilegeChecker.RequireAdminUser(ctx)
if err != nil {
return nil, err
}

execCfg := s.sqlServer.execCfg
var coordinatorID int32
err = execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Find the jobs' current coordinator node ID.
coordinatorID, err = jobs.JobCoordinatorID(ctx, jobspb.JobID(req.JobId),
txn, execCfg.InternalDB.Executor())
return err
})
if err != nil {
return nil, err
}

nodeID, local, err := s.parseNodeID(strconv.Itoa(int(coordinatorID)))
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, err.Error())
}

// If this node is the current coordinator of the job then we can collect the
// profiler details.
if local {
jobID := jobspb.JobID(req.JobId)
if !execCfg.Settings.Version.IsActive(ctx, clusterversion.V23_2) {
return nil, errors.Newf("execution details can only be requested on a cluster with version >= %s",
clusterversion.V23_2.String())
}
e := makeJobProfilerExecutionDetailsBuilder(execCfg.SQLStatusServer, execCfg.InternalDB, jobID, execCfg.JobRegistry)

// TODO(adityamaru): When we start collecting more information we can consider
// parallelize the collection of the various pieces.
e.addDistSQLDiagram(ctx)
e.addLabelledGoroutines(ctx)
e.addClusterWideTraces(ctx)

// TODO(dt,adityamaru): add logic to reach out the registry and call resumer
// specific execution details collection logic.

return &serverpb.RequestJobProfilerExecutionDetailsResponse{}, nil
}

// Forward the request to the coordinator node
status, err := s.dialNode(ctx, nodeID)
if err != nil {
return nil, srverrors.ServerError(ctx, err)
}
return status.RequestJobProfilerExecutionDetails(ctx, req)
}

// executionDetailsBuilder can be used to read and write execution details corresponding
// to a job.
type executionDetailsBuilder struct {
srv serverpb.SQLStatusServer
db isql.DB
jobID jobspb.JobID
registry *jobs.Registry
}

// makeJobProfilerExecutionDetailsBuilder returns an instance of an executionDetailsBuilder.
func makeJobProfilerExecutionDetailsBuilder(
srv serverpb.SQLStatusServer, db isql.DB, jobID jobspb.JobID, registry *jobs.Registry,
) executionDetailsBuilder {
e := executionDetailsBuilder{
srv: srv, db: db, jobID: jobID, registry: registry,
}
return e
}

// addLabelledGoroutines collects and persists goroutines from all nodes in the
// cluster that have a pprof label tying it to the job whose execution details
// are being collected.
func (e *executionDetailsBuilder) addLabelledGoroutines(ctx context.Context) {
profileRequest := serverpb.ProfileRequest{
NodeId: "all",
Type: serverpb.ProfileRequest_GOROUTINE,
Labels: true,
LabelFilter: fmt.Sprintf("%d", e.jobID),
}
resp, err := e.srv.Profile(ctx, &profileRequest)
if err != nil {
log.Errorf(ctx, "failed to collect goroutines for job %d: %v", e.jobID, err.Error())
return
}
filename := fmt.Sprintf("goroutines.%s.txt", timeutil.Now().Format("20060102_150405.00"))
if err := jobs.WriteExecutionDetailFile(ctx, filename, resp.Data, e.db, e.jobID); err != nil {
log.Errorf(ctx, "failed to write goroutine for job %d: %v", e.jobID, err.Error())
}
}

// addDistSQLDiagram generates and persists a `distsql.<timestamp>.html` file.
func (e *executionDetailsBuilder) addDistSQLDiagram(ctx context.Context) {
query := `SELECT plan_diagram FROM [SHOW JOB $1 WITH EXECUTION DETAILS]`
row, err := e.db.Executor().QueryRowEx(ctx, "profiler-bundler-add-diagram", nil, /* txn */
sessiondata.NoSessionDataOverride, query, e.jobID)
if err != nil {
log.Errorf(ctx, "failed to write DistSQL diagram for job %d: %v", e.jobID, err.Error())
return
}
if row != nil && row[0] != tree.DNull {
dspDiagramURL := string(tree.MustBeDString(row[0]))
filename := fmt.Sprintf("distsql.%s.html", timeutil.Now().Format("20060102_150405.00"))
if err := jobs.WriteExecutionDetailFile(ctx, filename,
[]byte(fmt.Sprintf(`<meta http-equiv="Refresh" content="0; url=%s">`, dspDiagramURL)),
e.db, e.jobID); err != nil {
log.Errorf(ctx, "failed to write DistSQL diagram for job %d: %v", e.jobID, err.Error())
}
}
}

// addClusterWideTraces generates and persists a `trace.<timestamp>.zip` file
// that captures the active tracing spans of a job on all nodes in the cluster.
func (e *executionDetailsBuilder) addClusterWideTraces(ctx context.Context) {
z := zipper.MakeInternalExecutorInflightTraceZipper(e.db.Executor())

traceID, err := jobs.GetJobTraceID(ctx, e.db, e.jobID)
if err != nil {
log.Warningf(ctx, "failed to fetch job trace ID: %+v", err.Error())
return
}
zippedTrace, err := z.Zip(ctx, int64(traceID))
if err != nil {
log.Errorf(ctx, "failed to collect cluster wide traces for job %d: %v", e.jobID, err.Error())
return
}

filename := fmt.Sprintf("trace.%s.zip", timeutil.Now().Format("20060102_150405.00"))
if err := jobs.WriteExecutionDetailFile(ctx, filename, zippedTrace, e.db, e.jobID); err != nil {
log.Errorf(ctx, "failed to write traces for job %d: %v", e.jobID, err.Error())
}
}
Loading

0 comments on commit a99de9f

Please sign in to comment.