Skip to content

Commit

Permalink
Merge #105384
Browse files Browse the repository at this point in the history
105384: jobsprofiler: enable requesting a job's execution details r=dt a=adityamaru

Similar to statement bundles this change introduces the
infrastructure to request, collect and read the execution
details for a particular job.
Right now, the execution details  will only contain the
latest DSP diagram for a job, but going forward this will
give us a place to dump raw files such as:
- cluster-wide job traces
- cpu profiles
- trace-driven aggregated stats
- raw payload and progress protos

Downloading some or all of these execution details will be
exposed in a future patch in all of the places where
statement bundles are today:
- DBConsole
- CLI shell
- SQL shell

This change introduces a builtin that allows the caller
to request the collection and persistence of a job's
current execution details.

This change also introduces a new endpoint on the status
server to read the data corresponding to the execution details
persisted for a job. The next set of
PRs will add the necessary components to allow downloading
the files from the DBConsole.

Informs: #105076

Release note: None

Co-authored-by: adityamaru <[email protected]>
  • Loading branch information
craig[bot] and adityamaru committed Jul 11, 2023
2 parents 92d5d0e + 0558434 commit 89d6fdd
Show file tree
Hide file tree
Showing 14 changed files with 456 additions and 20 deletions.
45 changes: 45 additions & 0 deletions docs/generated/http/full.md
Original file line number Diff line number Diff line change
Expand Up @@ -5196,6 +5196,51 @@ Support status: [reserved](#support-status)



## GetJobProfilerExecutionDetails

`GET /_status/job_profiler_execution_details/{job_id}/{filename}`



Support status: [reserved](#support-status)

#### Request Parameters







| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
| job_id | [int64](#cockroach.server.serverpb.GetJobProfilerExecutionDetailRequest-int64) | | | [reserved](#support-status) |
| filename | [string](#cockroach.server.serverpb.GetJobProfilerExecutionDetailRequest-string) | | | [reserved](#support-status) |







#### Response Parameters







| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
| data | [bytes](#cockroach.server.serverpb.GetJobProfilerExecutionDetailResponse-bytes) | | | [reserved](#support-status) |







## RequestCA

`GET /_join/v1/ca`
Expand Down
2 changes: 2 additions & 0 deletions docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -3276,6 +3276,8 @@ active for the current transaction.</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.repair_ttl_table_scheduled_job"></a><code>crdb_internal.repair_ttl_table_scheduled_job(oid: oid) &rarr; void</code></td><td><span class="funcdesc"><p>Repairs the scheduled job for a TTL table if it is missing.</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.request_job_execution_details"></a><code>crdb_internal.request_job_execution_details(jobID: <a href="int.html">int</a>) &rarr; <a href="bool.html">bool</a></code></td><td><span class="funcdesc"><p>Used to request the collection of execution details for a given job ID</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.request_statement_bundle"></a><code>crdb_internal.request_statement_bundle(stmtFingerprint: <a href="string.html">string</a>, samplingProbability: <a href="float.html">float</a>, minExecutionLatency: <a href="interval.html">interval</a>, expiresAfter: <a href="interval.html">interval</a>) &rarr; <a href="bool.html">bool</a></code></td><td><span class="funcdesc"><p>Used to request statement bundle for a given statement fingerprint
that has execution latency greater than the ‘minExecutionLatency’. If the
‘expiresAfter’ argument is empty, then the statement bundle request never
Expand Down
1 change: 0 additions & 1 deletion pkg/jobs/jobsprofiler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ go_library(
"//pkg/util/log",
"//pkg/util/stop",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
],
)

Expand Down
11 changes: 2 additions & 9 deletions pkg/jobs/jobsprofiler/profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

// StorePlanDiagram stores the DistSQL diagram generated from p in the job info
Expand All @@ -45,10 +44,7 @@ func StorePlanDiagram(
return err
}

dspKey, err := profilerconstants.MakeDSPDiagramInfoKey(timeutil.Now().UnixNano())
if err != nil {
return errors.Wrap(err, "failed to construct DSP diagram info key")
}
dspKey := profilerconstants.MakeDSPDiagramInfoKey(timeutil.Now().UnixNano())
infoStorage := jobs.InfoStorageForJob(txn, jobID)
return infoStorage.Write(ctx, dspKey, []byte(diagURL.String()))
})
Expand All @@ -75,11 +71,8 @@ func StorePerNodeProcessorProgressFraction(
if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := jobs.InfoStorageForJob(txn, jobID)
for componentID, fraction := range perComponentProgress {
key, err := profilerconstants.MakeNodeProcessorProgressInfoKey(componentID.FlowID.String(),
key := profilerconstants.MakeNodeProcessorProgressInfoKey(componentID.FlowID.String(),
componentID.SQLInstanceID.String(), componentID.ID)
if err != nil {
return errors.Wrap(err, "failed to construct progress info key")
}
fractionBytes := []byte(fmt.Sprintf("%f", fraction))
return infoStorage.Write(ctx, key, fractionBytes)
}
Expand Down
26 changes: 20 additions & 6 deletions pkg/jobs/jobsprofiler/profilerconstants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
const DSPDiagramInfoKeyPrefix = "~dsp-diag-url-"

// MakeDSPDiagramInfoKey constructs an ephemeral DSP diagram info key.
func MakeDSPDiagramInfoKey(timestampInNanos int64) (string, error) {
return fmt.Sprintf("%s%d", DSPDiagramInfoKeyPrefix, timestampInNanos), nil
func MakeDSPDiagramInfoKey(timestampInNanos int64) string {
return fmt.Sprintf("%s%d", DSPDiagramInfoKeyPrefix, timestampInNanos)
}

// NodeProcessorProgressInfoKeyPrefix is the prefix of the info key used for
Expand All @@ -32,11 +32,25 @@ const NodeProcessorProgressInfoKeyPrefix = "~node-processor-progress-"

// MakeNodeProcessorProgressInfoKey returns the info_key used for rows that
// store the per node, per processor progress for a job.
func MakeNodeProcessorProgressInfoKey(
flowID string, instanceID string, processorID int32,
) (string, error) {
func MakeNodeProcessorProgressInfoKey(flowID string, instanceID string, processorID int32) string {
// The info key is of the form: <prefix>-<flowID>,<instanceID>,<processorID>.
return fmt.Sprintf("%s%s,%s,%d", NodeProcessorProgressInfoKeyPrefix, flowID, instanceID, processorID), nil
return fmt.Sprintf("%s%s,%s,%d", NodeProcessorProgressInfoKeyPrefix, flowID, instanceID, processorID)
}

// ExecutionDetailsChunkKeyPrefix is the prefix of the info key used for rows that
// store chunks of a job's execution details.
const ExecutionDetailsChunkKeyPrefix = "~profiler/"

// MakeProfilerExecutionDetailsChunkKeyPrefix is the prefix of the info key used to store all
// chunks of a job's execution details for a given filename.
func MakeProfilerExecutionDetailsChunkKeyPrefix(filename string) string {
return fmt.Sprintf("%s%s", ExecutionDetailsChunkKeyPrefix, filename)
}

// MakeProfilerBundleChunkKey is the info key used to store a chunk of a job's
// execution details for a given filename.
func MakeProfilerBundleChunkKey(filename string, chunkCounter int) string {
return fmt.Sprintf("%s%s#%04d", ExecutionDetailsChunkKeyPrefix, filename, chunkCounter)
}

// GetNodeProcessorProgressInfoKeyParts deconstructs the passed in info key and
Expand Down
17 changes: 17 additions & 0 deletions pkg/server/serverpb/status.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2055,6 +2055,16 @@ message NetworkConnectivityResponse {
];
}

message GetJobProfilerExecutionDetailRequest {
int64 job_id = 1;
string filename = 2;
}

message GetJobProfilerExecutionDetailResponse {
bytes data = 1;
}


service Status {
// Certificates retrieves a copy of the TLS certificates.
rpc Certificates(CertificatesRequest) returns (CertificatesResponse) {
Expand Down Expand Up @@ -2523,4 +2533,11 @@ service Status {
get: "/_status/connectivity"
};
}


rpc GetJobProfilerExecutionDetails(GetJobProfilerExecutionDetailRequest) returns (GetJobProfilerExecutionDetailResponse) {
option (google.api.http) = {
get: "/_status/job_profiler_execution_details/{job_id}/{filename}"
};
}
}
22 changes: 22 additions & 0 deletions pkg/server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -4118,3 +4118,25 @@ func (s *statusServer) TransactionContentionEvents(

return resp, nil
}

// GetJobProfilerExecutionDetails reads all the stored execution details for a
// given job ID.
func (s *statusServer) GetJobProfilerExecutionDetails(
ctx context.Context, req *serverpb.GetJobProfilerExecutionDetailRequest,
) (*serverpb.GetJobProfilerExecutionDetailResponse, error) {
ctx = s.AnnotateCtx(ctx)
// TODO(adityamaru): Figure out the correct privileges required to get execution details.
_, err := s.privilegeChecker.requireAdminUser(ctx)
if err != nil {
return nil, err
}

jobID := jobspb.JobID(req.JobId)
execCfg := s.sqlServer.execCfg
eb := sql.MakeJobProfilerExecutionDetailsBuilder(execCfg.InternalDB, jobID)
data, err := eb.ReadExecutionDetail(ctx, req.Filename)
if err != nil {
return nil, err
}
return &serverpb.GetJobProfilerExecutionDetailResponse{Data: data}, nil
}
3 changes: 3 additions & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ go_library(
"job_exec_context_test_util.go",
"jobs_collection.go",
"jobs_execution_details.go",
"jobs_profiler_bundle.go",
"join.go",
"join_predicate.go",
"join_token.go",
Expand Down Expand Up @@ -576,6 +577,7 @@ go_library(
"@com_github_dustin_go_humanize//:go-humanize",
"@com_github_gogo_protobuf//proto",
"@com_github_gogo_protobuf//types",
"@com_github_klauspost_compress//gzip",
"@com_github_lib_pq//:pq",
"@com_github_lib_pq//oid",
"@com_github_prometheus_client_model//go",
Expand Down Expand Up @@ -648,6 +650,7 @@ go_test(
"instrumentation_test.go",
"internal_test.go",
"jobs_execution_details_test.go",
"jobs_profiler_bundle_test.go",
"join_token_test.go",
"main_test.go",
"materialized_view_test.go",
Expand Down
5 changes: 1 addition & 4 deletions pkg/sql/jobs_execution_details.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,7 @@ func constructBackupExecutionDetails(
if err != nil {
return err
}
key, err := profilerconstants.MakeDSPDiagramInfoKey(timeutil.Now().UnixNano())
if err != nil {
return errors.Wrap(err, "failed to construct DSP info key")
}
key := profilerconstants.MakeDSPDiagramInfoKey(timeutil.Now().UnixNano())
if err := infoStorage.Write(ctx, key, []byte(annotatedURL.String())); err != nil {
return err
}
Expand Down
Loading

0 comments on commit 89d6fdd

Please sign in to comment.