Skip to content

Commit

Permalink
jobsprofiler: enable requesting a job's execution details
Browse files Browse the repository at this point in the history
Similar to statement bundles, this change introduces the
infrastructure to request, collect and read the execution
details for a particular job.
Right now, the execution details will only contain the
latest DSP diagram for a job, but going forward this will
give us a place to dump raw files such as:
- cluster-wide job traces
- cpu profiles
- trace-driven aggregated stats
- raw payload and progress protos

Downloading some or all of these execution details will be
exposed in a future patch in all of the places where
statement bundles are today:
- DBConsole
- CLI shell
- SQL shell

This change introduces a builtin that allows the caller
to request the collection and persistence of a job's
current execution details.

This change also introduces a new endpoint on the status
server to read the data corresponding to the execution details
persisted for a job. The next set of
PRs will add the necessary components to allow downloading
the files from the DBConsole.

Informs: cockroachdb#105076

Release note: None
  • Loading branch information
adityamaru committed Jul 11, 2023
1 parent c3a0838 commit 0558434
Show file tree
Hide file tree
Showing 14 changed files with 456 additions and 20 deletions.
45 changes: 45 additions & 0 deletions docs/generated/http/full.md
Original file line number Diff line number Diff line change
Expand Up @@ -5196,6 +5196,51 @@ Support status: [reserved](#support-status)



## GetJobProfilerExecutionDetails

`GET /_status/job_profiler_execution_details/{job_id}/{filename}`



Support status: [reserved](#support-status)

#### Request Parameters







| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
| job_id | [int64](#cockroach.server.serverpb.GetJobProfilerExecutionDetailRequest-int64) | | | [reserved](#support-status) |
| filename | [string](#cockroach.server.serverpb.GetJobProfilerExecutionDetailRequest-string) | | | [reserved](#support-status) |







#### Response Parameters







| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
| data | [bytes](#cockroach.server.serverpb.GetJobProfilerExecutionDetailResponse-bytes) | | | [reserved](#support-status) |







## RequestCA

`GET /_join/v1/ca`
Expand Down
2 changes: 2 additions & 0 deletions docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -3276,6 +3276,8 @@ active for the current transaction.</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.repair_ttl_table_scheduled_job"></a><code>crdb_internal.repair_ttl_table_scheduled_job(oid: oid) &rarr; void</code></td><td><span class="funcdesc"><p>Repairs the scheduled job for a TTL table if it is missing.</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.request_job_execution_details"></a><code>crdb_internal.request_job_execution_details(jobID: <a href="int.html">int</a>) &rarr; <a href="bool.html">bool</a></code></td><td><span class="funcdesc"><p>Used to request the collection of execution details for a given job ID</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.request_statement_bundle"></a><code>crdb_internal.request_statement_bundle(stmtFingerprint: <a href="string.html">string</a>, samplingProbability: <a href="float.html">float</a>, minExecutionLatency: <a href="interval.html">interval</a>, expiresAfter: <a href="interval.html">interval</a>) &rarr; <a href="bool.html">bool</a></code></td><td><span class="funcdesc"><p>Used to request statement bundle for a given statement fingerprint
that has execution latency greater than the ‘minExecutionLatency’. If the
‘expiresAfter’ argument is empty, then the statement bundle request never
Expand Down
1 change: 0 additions & 1 deletion pkg/jobs/jobsprofiler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ go_library(
"//pkg/util/log",
"//pkg/util/stop",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
],
)

Expand Down
11 changes: 2 additions & 9 deletions pkg/jobs/jobsprofiler/profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

// StorePlanDiagram stores the DistSQL diagram generated from p in the job info
Expand All @@ -45,10 +44,7 @@ func StorePlanDiagram(
return err
}

dspKey, err := profilerconstants.MakeDSPDiagramInfoKey(timeutil.Now().UnixNano())
if err != nil {
return errors.Wrap(err, "failed to construct DSP diagram info key")
}
dspKey := profilerconstants.MakeDSPDiagramInfoKey(timeutil.Now().UnixNano())
infoStorage := jobs.InfoStorageForJob(txn, jobID)
return infoStorage.Write(ctx, dspKey, []byte(diagURL.String()))
})
Expand All @@ -75,11 +71,8 @@ func StorePerNodeProcessorProgressFraction(
if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := jobs.InfoStorageForJob(txn, jobID)
for componentID, fraction := range perComponentProgress {
key, err := profilerconstants.MakeNodeProcessorProgressInfoKey(componentID.FlowID.String(),
key := profilerconstants.MakeNodeProcessorProgressInfoKey(componentID.FlowID.String(),
componentID.SQLInstanceID.String(), componentID.ID)
if err != nil {
return errors.Wrap(err, "failed to construct progress info key")
}
fractionBytes := []byte(fmt.Sprintf("%f", fraction))
return infoStorage.Write(ctx, key, fractionBytes)
}
Expand Down
26 changes: 20 additions & 6 deletions pkg/jobs/jobsprofiler/profilerconstants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
const DSPDiagramInfoKeyPrefix = "~dsp-diag-url-"

// MakeDSPDiagramInfoKey constructs an ephemeral DSP diagram info key.
func MakeDSPDiagramInfoKey(timestampInNanos int64) (string, error) {
return fmt.Sprintf("%s%d", DSPDiagramInfoKeyPrefix, timestampInNanos), nil
func MakeDSPDiagramInfoKey(timestampInNanos int64) string {
return fmt.Sprintf("%s%d", DSPDiagramInfoKeyPrefix, timestampInNanos)
}

// NodeProcessorProgressInfoKeyPrefix is the prefix of the info key used for
Expand All @@ -32,11 +32,25 @@ const NodeProcessorProgressInfoKeyPrefix = "~node-processor-progress-"

// MakeNodeProcessorProgressInfoKey returns the info_key used for rows that
// store the per node, per processor progress for a job.
func MakeNodeProcessorProgressInfoKey(
flowID string, instanceID string, processorID int32,
) (string, error) {
func MakeNodeProcessorProgressInfoKey(flowID string, instanceID string, processorID int32) string {
// The info key is of the form: <prefix>-<flowID>,<instanceID>,<processorID>.
return fmt.Sprintf("%s%s,%s,%d", NodeProcessorProgressInfoKeyPrefix, flowID, instanceID, processorID), nil
return fmt.Sprintf("%s%s,%s,%d", NodeProcessorProgressInfoKeyPrefix, flowID, instanceID, processorID)
}

// ExecutionDetailsChunkKeyPrefix is the prefix of the info key used for rows that
// store chunks of a job's execution details.
const ExecutionDetailsChunkKeyPrefix = "~profiler/"

// MakeProfilerExecutionDetailsChunkKeyPrefix is the prefix of the info key used to store all
// chunks of a job's execution details for a given filename.
func MakeProfilerExecutionDetailsChunkKeyPrefix(filename string) string {
return fmt.Sprintf("%s%s", ExecutionDetailsChunkKeyPrefix, filename)
}

// MakeProfilerBundleChunkKey returns the info key under which a single chunk
// of a job's execution details is stored for the given filename. The key has
// the form <ExecutionDetailsChunkKeyPrefix><filename>#<chunkCounter>, where
// chunkCounter is zero-padded to at least four digits.
func MakeProfilerBundleChunkKey(filename string, chunkCounter int) string {
	chunkPrefix := ExecutionDetailsChunkKeyPrefix + filename
	return fmt.Sprintf("%s#%04d", chunkPrefix, chunkCounter)
}

// GetNodeProcessorProgressInfoKeyParts deconstructs the passed in info key and
Expand Down
17 changes: 17 additions & 0 deletions pkg/server/serverpb/status.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2055,6 +2055,16 @@ message NetworkConnectivityResponse {
];
}

// GetJobProfilerExecutionDetailRequest is the request for the
// GetJobProfilerExecutionDetails RPC. It names one execution detail file of
// one job.
message GetJobProfilerExecutionDetailRequest {
// job_id is the ID of the job whose execution details are being read.
int64 job_id = 1;
// filename is the name of the execution detail file to read for the job.
string filename = 2;
}

// GetJobProfilerExecutionDetailResponse is the response for the
// GetJobProfilerExecutionDetails RPC.
message GetJobProfilerExecutionDetailResponse {
// data holds the bytes read for the requested execution detail file.
bytes data = 1;
}


service Status {
// Certificates retrieves a copy of the TLS certificates.
rpc Certificates(CertificatesRequest) returns (CertificatesResponse) {
Expand Down Expand Up @@ -2523,4 +2533,11 @@ service Status {
get: "/_status/connectivity"
};
}


// GetJobProfilerExecutionDetails retrieves the execution detail data stored
// for the job and filename given in the request path.
rpc GetJobProfilerExecutionDetails(GetJobProfilerExecutionDetailRequest) returns (GetJobProfilerExecutionDetailResponse) {
option (google.api.http) = {
get: "/_status/job_profiler_execution_details/{job_id}/{filename}"
};
}
}
22 changes: 22 additions & 0 deletions pkg/server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -4118,3 +4118,25 @@ func (s *statusServer) TransactionContentionEvents(

return resp, nil
}

// GetJobProfilerExecutionDetails returns the stored execution detail named by
// req.Filename for the job identified by req.JobId. The request fails with the
// error from requireAdminUser if that check does not pass, and with the error
// from ReadExecutionDetail if the read does not succeed.
func (s *statusServer) GetJobProfilerExecutionDetails(
	ctx context.Context, req *serverpb.GetJobProfilerExecutionDetailRequest,
) (*serverpb.GetJobProfilerExecutionDetailResponse, error) {
	ctx = s.AnnotateCtx(ctx)
	// TODO(adityamaru): Figure out the correct privileges required to get execution details.
	if _, err := s.privilegeChecker.requireAdminUser(ctx); err != nil {
		return nil, err
	}

	// Build a reader scoped to this job and fetch the requested file's data.
	builder := sql.MakeJobProfilerExecutionDetailsBuilder(
		s.sqlServer.execCfg.InternalDB, jobspb.JobID(req.JobId),
	)
	data, err := builder.ReadExecutionDetail(ctx, req.Filename)
	if err != nil {
		return nil, err
	}
	return &serverpb.GetJobProfilerExecutionDetailResponse{Data: data}, nil
}
3 changes: 3 additions & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ go_library(
"job_exec_context_test_util.go",
"jobs_collection.go",
"jobs_execution_details.go",
"jobs_profiler_bundle.go",
"join.go",
"join_predicate.go",
"join_token.go",
Expand Down Expand Up @@ -576,6 +577,7 @@ go_library(
"@com_github_dustin_go_humanize//:go-humanize",
"@com_github_gogo_protobuf//proto",
"@com_github_gogo_protobuf//types",
"@com_github_klauspost_compress//gzip",
"@com_github_lib_pq//:pq",
"@com_github_lib_pq//oid",
"@com_github_prometheus_client_model//go",
Expand Down Expand Up @@ -648,6 +650,7 @@ go_test(
"instrumentation_test.go",
"internal_test.go",
"jobs_execution_details_test.go",
"jobs_profiler_bundle_test.go",
"join_token_test.go",
"main_test.go",
"materialized_view_test.go",
Expand Down
5 changes: 1 addition & 4 deletions pkg/sql/jobs_execution_details.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,7 @@ func constructBackupExecutionDetails(
if err != nil {
return err
}
key, err := profilerconstants.MakeDSPDiagramInfoKey(timeutil.Now().UnixNano())
if err != nil {
return errors.Wrap(err, "failed to construct DSP info key")
}
key := profilerconstants.MakeDSPDiagramInfoKey(timeutil.Now().UnixNano())
if err := infoStorage.Write(ctx, key, []byte(annotatedURL.String())); err != nil {
return err
}
Expand Down
Loading

0 comments on commit 0558434

Please sign in to comment.