From 0558434755416457c873859db0c39d005f11f176 Mon Sep 17 00:00:00 2001
From: adityamaru
Date: Thu, 22 Jun 2023 15:53:09 -0400
Subject: [PATCH] jobsprofiler: enable requesting a job's execution details
Similar to statement bundles, this change introduces the
infrastructure to request, collect, and read the execution
details for a particular job.
Right now, the execution details will only contain the
latest DSP (DistSQL plan) diagram for a job, but going
forward this will give us a place to dump raw files such as:
- cluster-wide job traces
- CPU profiles
- trace-driven aggregated stats
- raw payload and progress protos
A future patch will expose the ability to download some or
all of these execution details in all of the places where
statement bundles are available today:
- DBConsole
- CLI shell
- SQL shell
This change introduces a builtin that allows the caller
to request the collection and persistence of a job's
current execution details.
This change also introduces a new endpoint on the status
server to read the data corresponding to the execution details
persisted for a job. The next set of
PRs will add the necessary components to allow downloading
the files from the DBConsole.
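As a rough end-to-end sketch of the intended flow (the job ID,
filename, and connection details below are illustrative
placeholders, not part of this patch), a caller would request
collection via the builtin and then fetch an individual file from
the new status endpoint:

    package main

    import (
    	"database/sql"
    	"fmt"
    	"io"
    	"net/http"

    	_ "github.com/lib/pq"
    )

    func main() {
    	db, err := sql.Open("postgres",
    		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
    	if err != nil {
    		panic(err)
    	}
    	defer db.Close()

    	// Ask the cluster to collect and persist the job's current
    	// execution details (admin-only).
    	if _, err := db.Exec(
    		`SELECT crdb_internal.request_job_execution_details($1)`, 12345,
    	); err != nil {
    		panic(err)
    	}

    	// Read one collected file back through the new status endpoint.
    	resp, err := http.Get("http://localhost:8080" +
    		"/_status/job_profiler_execution_details/12345/distsql.20230622_120000.html")
    	if err != nil {
    		panic(err)
    	}
    	defer resp.Body.Close()
    	body, _ := io.ReadAll(resp.Body)
    	fmt.Println(resp.Status, len(body))
    }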
Informs: #105076
Release note: None
---
docs/generated/http/full.md | 45 +++++
docs/generated/sql/functions.md | 2 +
pkg/jobs/jobsprofiler/BUILD.bazel | 1 -
pkg/jobs/jobsprofiler/profiler.go | 11 +-
.../profilerconstants/constants.go | 26 ++-
pkg/server/serverpb/status.proto | 17 ++
pkg/server/status.go | 22 +++
pkg/sql/BUILD.bazel | 3 +
pkg/sql/jobs_execution_details.go | 5 +-
pkg/sql/jobs_profiler_bundle.go | 184 ++++++++++++++++++
pkg/sql/jobs_profiler_bundle_test.go | 115 +++++++++++
pkg/sql/sem/builtins/builtins.go | 37 ++++
pkg/sql/sem/builtins/fixed_oids.go | 1 +
pkg/sql/sem/eval/context.go | 7 +
14 files changed, 456 insertions(+), 20 deletions(-)
create mode 100644 pkg/sql/jobs_profiler_bundle.go
create mode 100644 pkg/sql/jobs_profiler_bundle_test.go
diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md
index 7a264d6add83..12312e521aea 100644
--- a/docs/generated/http/full.md
+++ b/docs/generated/http/full.md
@@ -5196,6 +5196,51 @@ Support status: [reserved](#support-status)
+## GetJobProfilerExecutionDetails
+
+`GET /_status/job_profiler_execution_details/{job_id}/{filename}`
+
+
+
+Support status: [reserved](#support-status)
+
+#### Request Parameters
+
+
+
+
+
+
+
+| Field | Type | Label | Description | Support status |
+| ----- | ---- | ----- | ----------- | -------------- |
+| job_id | [int64](#cockroach.server.serverpb.GetJobProfilerExecutionDetailRequest-int64) | | | [reserved](#support-status) |
+| filename | [string](#cockroach.server.serverpb.GetJobProfilerExecutionDetailRequest-string) | | | [reserved](#support-status) |
+
+
+
+
+
+
+
+#### Response Parameters
+
+
+
+
+
+
+
+| Field | Type | Label | Description | Support status |
+| ----- | ---- | ----- | ----------- | -------------- |
+| data | [bytes](#cockroach.server.serverpb.GetJobProfilerExecutionDetailResponse-bytes) | | | [reserved](#support-status) |
+
+
+
+
+
+
+
## RequestCA
`GET /_join/v1/ca`
diff --git a/docs/generated/sql/functions.md b/docs/generated/sql/functions.md
index 3ea806340263..d52dfd0a0cca 100644
--- a/docs/generated/sql/functions.md
+++ b/docs/generated/sql/functions.md
@@ -3276,6 +3276,8 @@ active for the current transaction.
crdb_internal.request_statement_bundle(stmtFingerprint: string, samplingProbability: float, minExecutionLatency: interval, expiresAfter: interval) → bool | Used to request statement bundle for a given statement fingerprint
that has execution latency greater than the ‘minExecutionLatency’. If the
‘expiresAfter’ argument is empty, then the statement bundle request never
diff --git a/pkg/jobs/jobsprofiler/BUILD.bazel b/pkg/jobs/jobsprofiler/BUILD.bazel
index 298eaee2dd28..6b345922d7ac 100644
--- a/pkg/jobs/jobsprofiler/BUILD.bazel
+++ b/pkg/jobs/jobsprofiler/BUILD.bazel
@@ -15,7 +15,6 @@ go_library(
"//pkg/util/log",
"//pkg/util/stop",
"//pkg/util/timeutil",
- "@com_github_cockroachdb_errors//:errors",
],
)
diff --git a/pkg/jobs/jobsprofiler/profiler.go b/pkg/jobs/jobsprofiler/profiler.go
index d39d97aaa711..43c37151a3ae 100644
--- a/pkg/jobs/jobsprofiler/profiler.go
+++ b/pkg/jobs/jobsprofiler/profiler.go
@@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
- "github.com/cockroachdb/errors"
)
// StorePlanDiagram stores the DistSQL diagram generated from p in the job info
@@ -45,10 +44,7 @@ func StorePlanDiagram(
return err
}
- dspKey, err := profilerconstants.MakeDSPDiagramInfoKey(timeutil.Now().UnixNano())
- if err != nil {
- return errors.Wrap(err, "failed to construct DSP diagram info key")
- }
+ dspKey := profilerconstants.MakeDSPDiagramInfoKey(timeutil.Now().UnixNano())
infoStorage := jobs.InfoStorageForJob(txn, jobID)
return infoStorage.Write(ctx, dspKey, []byte(diagURL.String()))
})
@@ -75,11 +71,8 @@ func StorePerNodeProcessorProgressFraction(
if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := jobs.InfoStorageForJob(txn, jobID)
for componentID, fraction := range perComponentProgress {
- key, err := profilerconstants.MakeNodeProcessorProgressInfoKey(componentID.FlowID.String(),
+ key := profilerconstants.MakeNodeProcessorProgressInfoKey(componentID.FlowID.String(),
componentID.SQLInstanceID.String(), componentID.ID)
- if err != nil {
- return errors.Wrap(err, "failed to construct progress info key")
- }
fractionBytes := []byte(fmt.Sprintf("%f", fraction))
return infoStorage.Write(ctx, key, fractionBytes)
}
diff --git a/pkg/jobs/jobsprofiler/profilerconstants/constants.go b/pkg/jobs/jobsprofiler/profilerconstants/constants.go
index 8a8e2107f06e..12933f39b71f 100644
--- a/pkg/jobs/jobsprofiler/profilerconstants/constants.go
+++ b/pkg/jobs/jobsprofiler/profilerconstants/constants.go
@@ -22,8 +22,8 @@ import (
const DSPDiagramInfoKeyPrefix = "~dsp-diag-url-"
// MakeDSPDiagramInfoKey constructs an ephemeral DSP diagram info key.
-func MakeDSPDiagramInfoKey(timestampInNanos int64) (string, error) {
- return fmt.Sprintf("%s%d", DSPDiagramInfoKeyPrefix, timestampInNanos), nil
+func MakeDSPDiagramInfoKey(timestampInNanos int64) string {
+ return fmt.Sprintf("%s%d", DSPDiagramInfoKeyPrefix, timestampInNanos)
}
// NodeProcessorProgressInfoKeyPrefix is the prefix of the info key used for
@@ -32,11 +32,25 @@ const NodeProcessorProgressInfoKeyPrefix = "~node-processor-progress-"
// MakeNodeProcessorProgressInfoKey returns the info_key used for rows that
// store the per node, per processor progress for a job.
-func MakeNodeProcessorProgressInfoKey(
- flowID string, instanceID string, processorID int32,
-) (string, error) {
+func MakeNodeProcessorProgressInfoKey(flowID string, instanceID string, processorID int32) string {
// The info key is of the form: <prefix>-<flowID>,<instanceID>,<processorID>.
- return fmt.Sprintf("%s%s,%s,%d", NodeProcessorProgressInfoKeyPrefix, flowID, instanceID, processorID), nil
+ return fmt.Sprintf("%s%s,%s,%d", NodeProcessorProgressInfoKeyPrefix, flowID, instanceID, processorID)
+}
+
+// ExecutionDetailsChunkKeyPrefix is the prefix of the info key used for rows that
+// store chunks of a job's execution details.
+const ExecutionDetailsChunkKeyPrefix = "~profiler/"
+
+// MakeProfilerExecutionDetailsChunkKeyPrefix returns the prefix of the info
+// key used to store all chunks of a job's execution details for a given
+// filename.
+func MakeProfilerExecutionDetailsChunkKeyPrefix(filename string) string {
+ return fmt.Sprintf("%s%s", ExecutionDetailsChunkKeyPrefix, filename)
+}
+
+// MakeProfilerBundleChunkKey returns the info key used to store a chunk of a
+// job's execution details for a given filename.
+func MakeProfilerBundleChunkKey(filename string, chunkCounter int) string {
+ return fmt.Sprintf("%s%s#%04d", ExecutionDetailsChunkKeyPrefix, filename, chunkCounter)
}
// GetNodeProcessorProgressInfoKeyParts deconstructs the passed in info key and
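To make the chunk key layout concrete: keys embed a zero-padded,
monotonically increasing counter so they sort in write order, and the
writer appends a "_final" suffix to the filename of the last chunk. A
minimal standalone sketch (the filename is hypothetical):

    package example

    import "fmt"

    // chunkKey mirrors MakeProfilerBundleChunkKey from this patch.
    func chunkKey(filename string, chunk int) string {
    	return fmt.Sprintf("~profiler/%s#%04d", filename, chunk)
    }

    func Example() {
    	f := "distsql.20230622_120000.html" // hypothetical filename
    	fmt.Println(chunkKey(f, 0)) // ~profiler/distsql.20230622_120000.html#0000
    	fmt.Println(chunkKey(f, 1)) // ~profiler/distsql.20230622_120000.html#0001
    	// The writer marks the last chunk by renaming the file portion:
    	fmt.Println(chunkKey(f+"_final", 2)) // ~profiler/distsql.20060102_150405.html_final#0002
    }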
diff --git a/pkg/server/serverpb/status.proto b/pkg/server/serverpb/status.proto
index afd1d292f71c..567504c3b877 100644
--- a/pkg/server/serverpb/status.proto
+++ b/pkg/server/serverpb/status.proto
@@ -2055,6 +2055,16 @@ message NetworkConnectivityResponse {
];
}
+ message GetJobProfilerExecutionDetailRequest {
+ int64 job_id = 1;
+ string filename = 2;
+ }
+
+ message GetJobProfilerExecutionDetailResponse {
+ bytes data = 1;
+ }
+
+
service Status {
// Certificates retrieves a copy of the TLS certificates.
rpc Certificates(CertificatesRequest) returns (CertificatesResponse) {
@@ -2523,4 +2533,11 @@ service Status {
get: "/_status/connectivity"
};
}
+
+
+ rpc GetJobProfilerExecutionDetails(GetJobProfilerExecutionDetailRequest) returns (GetJobProfilerExecutionDetailResponse) {
+ option (google.api.http) = {
+ get: "/_status/job_profiler_execution_details/{job_id}/{filename}"
+ };
+ }
}
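For reference, a hedged sketch of invoking the new RPC through the
generated Status client (connection setup is elided; the job ID and
filename are placeholders, and the helper name is hypothetical):

    package example

    import (
    	"context"

    	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
    )

    // fetchExecutionDetail shows the shape of a call to the new RPC; the
    // response carries the bytes of one persisted execution-detail file.
    func fetchExecutionDetail(
    	ctx context.Context, client serverpb.StatusClient, jobID int64, filename string,
    ) ([]byte, error) {
    	resp, err := client.GetJobProfilerExecutionDetails(ctx,
    		&serverpb.GetJobProfilerExecutionDetailRequest{JobId: jobID, Filename: filename})
    	if err != nil {
    		return nil, err
    	}
    	return resp.Data, nil
    }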
diff --git a/pkg/server/status.go b/pkg/server/status.go
index 2fad36d60b2d..a99c49f90db9 100644
--- a/pkg/server/status.go
+++ b/pkg/server/status.go
@@ -4118,3 +4118,25 @@ func (s *statusServer) TransactionContentionEvents(
return resp, nil
}
+
+// GetJobProfilerExecutionDetails reads all the stored execution details for a
+// given job ID.
+func (s *statusServer) GetJobProfilerExecutionDetails(
+ ctx context.Context, req *serverpb.GetJobProfilerExecutionDetailRequest,
+) (*serverpb.GetJobProfilerExecutionDetailResponse, error) {
+ ctx = s.AnnotateCtx(ctx)
+ // TODO(adityamaru): Figure out the correct privileges required to get execution details.
+ _, err := s.privilegeChecker.requireAdminUser(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ jobID := jobspb.JobID(req.JobId)
+ execCfg := s.sqlServer.execCfg
+ eb := sql.MakeJobProfilerExecutionDetailsBuilder(execCfg.InternalDB, jobID)
+ data, err := eb.ReadExecutionDetail(ctx, req.Filename)
+ if err != nil {
+ return nil, err
+ }
+ return &serverpb.GetJobProfilerExecutionDetailResponse{Data: data}, nil
+}
diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel
index c79a50cf5dfa..77722665b591 100644
--- a/pkg/sql/BUILD.bazel
+++ b/pkg/sql/BUILD.bazel
@@ -142,6 +142,7 @@ go_library(
"job_exec_context_test_util.go",
"jobs_collection.go",
"jobs_execution_details.go",
+ "jobs_profiler_bundle.go",
"join.go",
"join_predicate.go",
"join_token.go",
@@ -576,6 +577,7 @@ go_library(
"@com_github_dustin_go_humanize//:go-humanize",
"@com_github_gogo_protobuf//proto",
"@com_github_gogo_protobuf//types",
+ "@com_github_klauspost_compress//gzip",
"@com_github_lib_pq//:pq",
"@com_github_lib_pq//oid",
"@com_github_prometheus_client_model//go",
@@ -648,6 +650,7 @@ go_test(
"instrumentation_test.go",
"internal_test.go",
"jobs_execution_details_test.go",
+ "jobs_profiler_bundle_test.go",
"join_token_test.go",
"main_test.go",
"materialized_view_test.go",
diff --git a/pkg/sql/jobs_execution_details.go b/pkg/sql/jobs_execution_details.go
index 1483a1685c65..1a0238dfba2b 100644
--- a/pkg/sql/jobs_execution_details.go
+++ b/pkg/sql/jobs_execution_details.go
@@ -145,10 +145,7 @@ func constructBackupExecutionDetails(
if err != nil {
return err
}
- key, err := profilerconstants.MakeDSPDiagramInfoKey(timeutil.Now().UnixNano())
- if err != nil {
- return errors.Wrap(err, "failed to construct DSP info key")
- }
+ key := profilerconstants.MakeDSPDiagramInfoKey(timeutil.Now().UnixNano())
if err := infoStorage.Write(ctx, key, []byte(annotatedURL.String())); err != nil {
return err
}
diff --git a/pkg/sql/jobs_profiler_bundle.go b/pkg/sql/jobs_profiler_bundle.go
new file mode 100644
index 000000000000..ed6a87780b25
--- /dev/null
+++ b/pkg/sql/jobs_profiler_bundle.go
@@ -0,0 +1,184 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package sql
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "io"
+ "strings"
+
+ "github.com/cockroachdb/cockroach/pkg/clusterversion"
+ "github.com/cockroachdb/cockroach/pkg/jobs"
+ "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+ "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerconstants"
+ "github.com/cockroachdb/cockroach/pkg/sql/isql"
+ "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+ "github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
+ "github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/cockroachdb/cockroach/pkg/util/timeutil"
+ "github.com/cockroachdb/errors"
+ "github.com/klauspost/compress/gzip"
+)
+
+const bundleChunkSize = 1 << 20 // 1 MiB
+
+// RequestExecutionDetails implements the JobProfiler interface.
+func (p *planner) RequestExecutionDetails(ctx context.Context, jobID jobspb.JobID) error {
+ execCfg := p.ExecCfg()
+ if !execCfg.Settings.Version.IsActive(ctx, clusterversion.V23_1) {
+ return errors.Newf("execution details can only be requested on a cluster with version >= %s",
+ clusterversion.V23_1.String())
+ }
+
+ e := MakeJobProfilerExecutionDetailsBuilder(execCfg.InternalDB, jobID)
+ // TODO(adityamaru): When we start collecting more information we can
+ // consider parallelizing the collection of the various pieces.
+ e.addDistSQLDiagram(ctx)
+
+ return nil
+}
+
+// ExecutionDetailsBuilder can be used to read and write execution details corresponding
+// to a job.
+type ExecutionDetailsBuilder struct {
+ db isql.DB
+ jobID jobspb.JobID
+}
+
+func compressChunk(chunkBuf []byte) ([]byte, error) {
+ gzipBuf := bytes.NewBuffer([]byte{})
+ gz := gzip.NewWriter(gzipBuf)
+ if _, err := gz.Write(chunkBuf); err != nil {
+ return nil, err
+ }
+ if err := gz.Close(); err != nil {
+ return nil, err
+ }
+ return gzipBuf.Bytes(), nil
+}
+
+// WriteExecutionDetail will break up data into chunks of a fixed size, and
+// gzip compress them before writing them to the job_info table.
+func (e *ExecutionDetailsBuilder) WriteExecutionDetail(
+ ctx context.Context, filename string, data []byte,
+) error {
+ return e.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+ // Work on a re-slice of the data inside the txn closure (this copies the
+ // slice header, not the underlying bytes); a txn retry re-enters the
+ // closure and starts over from the full payload.
+ chunkData := data[:]
+ jobInfo := jobs.InfoStorageForJob(txn, e.jobID)
+
+ var chunkCounter int
+ chunkFileName := filename
+ for len(chunkData) > 0 {
+ chunkSize := bundleChunkSize
+ chunk := chunkData
+ if len(chunk) > chunkSize {
+ chunk = chunk[:chunkSize]
+ } else {
+ // This is the last chunk we will write, so assign it a sentinel file name.
+ chunkFileName = chunkFileName + "_final"
+ }
+ chunkData = chunkData[len(chunk):]
+ var err error
+ chunk, err = compressChunk(chunk)
+ if err != nil {
+ return errors.Wrapf(err, "failed to compress chunk for file %s", chunkFileName)
+ }
+
+ // On listing we want the info_key of each chunk to sort after the
+ // previous chunk of the same file so that the chunks can be reassembled
+ // on download. For this reason we use a monotonically increasing
+ // chunk counter as the suffix.
+ err = jobInfo.Write(ctx, profilerconstants.MakeProfilerBundleChunkKey(chunkFileName, chunkCounter), chunk)
+ if err != nil {
+ return errors.Wrapf(err, "failed to write chunk for file %s", chunkFileName)
+ }
+ chunkCounter++
+ }
+ return nil
+ })
+}
+
+// ReadExecutionDetail will stitch together all the chunks corresponding to the
+// filename and return the uncompressed data of the file.
+func (e *ExecutionDetailsBuilder) ReadExecutionDetail(
+ ctx context.Context, filename string,
+) ([]byte, error) {
+ // TODO(adityamaru): If filename=all add logic to zip up all the files corresponding
+ // to the job's execution details and return the zipped bundle instead.
+
+ buf := bytes.NewBuffer([]byte{})
+ if err := e.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+ // Reset the buf inside the txn closure to guard against txn retries.
+ buf.Reset()
+ jobInfo := jobs.InfoStorageForJob(txn, e.jobID)
+
+ // Iterate over all the chunks of the requested file and return the unzipped
+ // chunks of data.
+ var lastInfoKey string
+ if err := jobInfo.Iterate(ctx, profilerconstants.MakeProfilerExecutionDetailsChunkKeyPrefix(filename),
+ func(infoKey string, value []byte) error {
+ lastInfoKey = infoKey
+ r, err := gzip.NewReader(bytes.NewBuffer(value))
+ if err != nil {
+ return err
+ }
+ decompressed, err := io.ReadAll(r)
+ if err != nil {
+ return err
+ }
+ buf.Write(decompressed)
+ return nil
+ }); err != nil {
+ return errors.Wrapf(err, "failed to iterate over chunks for job %d", jobID)
+ }
+
+ if lastInfoKey != "" && !strings.Contains(lastInfoKey, "_final") {
+ return errors.Newf("failed to read all chunks for file %s, last info key read was %s", filename, lastInfoKey)
+ }
+
+ return nil
+ }); err != nil {
+ return nil, err
+ }
+ return buf.Bytes(), nil
+}
+
+// MakeJobProfilerExecutionDetailsBuilder returns an instance of an ExecutionDetailsBuilder.
+func MakeJobProfilerExecutionDetailsBuilder(
+ db isql.DB, jobID jobspb.JobID,
+) ExecutionDetailsBuilder {
+ e := ExecutionDetailsBuilder{
+ db: db, jobID: jobID,
+ }
+ return e
+}
+
+// addDistSQLDiagram generates and persists a `distsql.<timestamp>.html` file.
+func (e *ExecutionDetailsBuilder) addDistSQLDiagram(ctx context.Context) {
+ query := `SELECT plan_diagram FROM [SHOW JOB $1 WITH EXECUTION DETAILS]`
+ row, err := e.db.Executor().QueryRowEx(ctx, "profiler-bundler-add-diagram", nil, /* txn */
+ sessiondata.NoSessionDataOverride, query, e.jobID)
+ if err != nil {
+ log.Errorf(ctx, "failed to write DistSQL diagram for job %d: %+v", e.jobID, err.Error())
+ return
+ }
+ if row[0] != tree.DNull {
+ dspDiagramURL := string(tree.MustBeDString(row[0]))
+ filename := fmt.Sprintf("distsql.%s.html", timeutil.Now().Format("20060102_150405"))
+ if err := e.WriteExecutionDetail(ctx, filename,
+ []byte(fmt.Sprintf(`<meta http-equiv="Refresh" content="0; url=%s">`, dspDiagramURL))); err != nil {
+ log.Errorf(ctx, "failed to write DistSQL diagram for job %d: %+v", e.jobID, err)
+ }
+ }
+}
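Since each chunk is gzipped independently before being written to the
job_info table, the reader can decompress chunks one at a time and
concatenate the results, which is exactly what ReadExecutionDetail
does above. A self-contained round-trip sketch of that invariant,
using the same gzip package:

    package example

    import (
    	"bytes"
    	"io"

    	"github.com/klauspost/compress/gzip"
    )

    // roundTrip compresses a chunk the way compressChunk does and then
    // decompresses it the way ReadExecutionDetail does.
    func roundTrip(chunk []byte) ([]byte, error) {
    	var buf bytes.Buffer
    	gz := gzip.NewWriter(&buf)
    	if _, err := gz.Write(chunk); err != nil {
    		return nil, err
    	}
    	// Close flushes the gzip footer; skipping it yields a truncated stream.
    	if err := gz.Close(); err != nil {
    		return nil, err
    	}
    	r, err := gzip.NewReader(&buf)
    	if err != nil {
    		return nil, err
    	}
    	defer r.Close()
    	return io.ReadAll(r)
    }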
diff --git a/pkg/sql/jobs_profiler_bundle_test.go b/pkg/sql/jobs_profiler_bundle_test.go
new file mode 100644
index 000000000000..dca3fb483d93
--- /dev/null
+++ b/pkg/sql/jobs_profiler_bundle_test.go
@@ -0,0 +1,115 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package sql_test
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "io"
+ "net/http"
+ "testing"
+ "time"
+
+ "github.com/cockroachdb/cockroach/pkg/base"
+ "github.com/cockroachdb/cockroach/pkg/jobs"
+ "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+ "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
+ "github.com/cockroachdb/cockroach/pkg/server/serverpb"
+ "github.com/cockroachdb/cockroach/pkg/settings/cluster"
+ "github.com/cockroachdb/cockroach/pkg/sql"
+ "github.com/cockroachdb/cockroach/pkg/sql/isql"
+ "github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
+ "github.com/cockroachdb/cockroach/pkg/sql/tests"
+ "github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
+ "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+ "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+ "github.com/cockroachdb/cockroach/pkg/util/httputil"
+ "github.com/cockroachdb/cockroach/pkg/util/leaktest"
+ "github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/cockroachdb/cockroach/pkg/util/protoutil"
+ "github.com/cockroachdb/cockroach/pkg/util/uuid"
+ "github.com/stretchr/testify/require"
+)
+
+// TestReadWriteProfilerExecutionDetails is an end-to-end test of requesting and collecting
+// execution details for a job.
+func TestReadWriteProfilerExecutionDetails(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+
+ // Time out the test after a couple of minutes if it hasn't succeeded.
+ ctx := context.Background()
+ ctx, cancel := context.WithTimeout(ctx, time.Minute*2)
+ defer cancel()
+
+ params, _ := tests.CreateTestServerParams()
+ params.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
+ defer jobs.ResetConstructors()()
+ s, sqlDB, _ := serverutils.StartServer(t, params)
+ defer s.Stopper().Stop(ctx)
+
+ runner := sqlutils.MakeSQLRunner(sqlDB)
+
+ jobs.RegisterConstructor(jobspb.TypeImport, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer {
+ return fakeExecResumer{
+ OnResume: func(ctx context.Context) error {
+ p := sql.PhysicalPlan{}
+ infra := physicalplan.NewPhysicalInfrastructure(uuid.FastMakeV4(), base.SQLInstanceID(1))
+ p.PhysicalInfrastructure = infra
+ jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, s.InternalDB().(isql.DB), j.ID())
+ checkForPlanDiagram(ctx, t, s.InternalDB().(isql.DB), j.ID())
+ return nil
+ },
+ }
+ }, jobs.UsesTenantCostControl)
+
+ runner.Exec(t, `CREATE TABLE t (id INT)`)
+ runner.Exec(t, `INSERT INTO t SELECT generate_series(1, 100)`)
+
+ t.Run("read/write DistSQL diagram", func(t *testing.T) {
+ var importJobID int
+ runner.QueryRow(t, `IMPORT INTO t CSV DATA ('nodelocal://1/foo') WITH DETACHED`).Scan(&importJobID)
+ jobutils.WaitForJobToSucceed(t, runner, jobspb.JobID(importJobID))
+
+ runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID)
+ checkExecutionDetails(t, s, jobspb.JobID(importJobID), "distsql")
+ })
+}
+
+func checkExecutionDetails(
+ t *testing.T, s serverutils.TestServerInterface, jobID jobspb.JobID, filename string,
+) {
+ t.Helper()
+
+ client, err := s.GetAdminHTTPClient()
+ require.NoError(t, err)
+
+ url := s.AdminURL().String() + fmt.Sprintf("/_status/job_profiler_execution_details/%d/%s", jobID, filename)
+ req, err := http.NewRequest("GET", url, nil)
+ require.NoError(t, err)
+
+ req.Header.Set("Content-Type", httputil.ProtoContentType)
+ resp, err := client.Do(req)
+ require.NoError(t, err)
+ defer resp.Body.Close()
+ body, err := io.ReadAll(resp.Body)
+ require.NoError(t, err)
+ require.Equal(t, http.StatusOK, resp.StatusCode)
+
+ edResp := serverpb.GetJobProfilerExecutionDetailResponse{}
+ require.NoError(t, protoutil.Unmarshal(body, &edResp))
+
+ r := bytes.NewReader(edResp.Data)
+ data, err := io.ReadAll(r)
+ require.NoError(t, err)
+ require.NotEmpty(t, data)
+}
diff --git a/pkg/sql/sem/builtins/builtins.go b/pkg/sql/sem/builtins/builtins.go
index 7d9f9fdc1ad7..676ea0206fcc 100644
--- a/pkg/sql/sem/builtins/builtins.go
+++ b/pkg/sql/sem/builtins/builtins.go
@@ -7699,6 +7699,43 @@ specified store on the node it's run from. One of 'mvccGC', 'merge', 'split',
},
),
+ "crdb_internal.request_job_execution_details": makeBuiltin(
+ tree.FunctionProperties{
+ Category: builtinconstants.CategorySystemInfo,
+ DistsqlBlocklist: true, // applicable only on the gateway
+ },
+ tree.Overload{
+ Types: tree.ParamTypes{
+ {Name: "jobID", Typ: types.Int},
+ },
+ ReturnType: tree.FixedReturnType(types.Bool),
+ Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) {
+ // TODO(adityamaru): Figure out the correct permissions for collecting a
+ // job profiler bundle. For now only allow the admin role.
+ isAdmin, err := evalCtx.SessionAccessor.HasAdminRole(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ if !isAdmin {
+ return nil, errors.New("must be admin to request a job profiler bundle")
+ }
+
+ jobID := int(tree.MustBeDInt(args[0]))
+ if err := evalCtx.JobsProfiler.RequestExecutionDetails(
+ ctx,
+ jobspb.JobID(jobID),
+ ); err != nil {
+ return nil, err
+ }
+
+ return tree.DBoolTrue, nil
+ },
+ Volatility: volatility.Volatile,
+ Info: `Used to request the collection of execution details for a given job ID`,
+ },
+ ),
+
"crdb_internal.request_statement_bundle": makeBuiltin(
tree.FunctionProperties{
Category: builtinconstants.CategorySystemInfo,
diff --git a/pkg/sql/sem/builtins/fixed_oids.go b/pkg/sql/sem/builtins/fixed_oids.go
index 41ecb4d56de9..9e274a4f79c0 100644
--- a/pkg/sql/sem/builtins/fixed_oids.go
+++ b/pkg/sql/sem/builtins/fixed_oids.go
@@ -2427,6 +2427,7 @@ var builtinOidsArray = []string{
2454: `crdb_internal.sstable_metrics(node_id: int, store_id: int, start_key: bytes, end_key: bytes) -> tuple{int AS node_id, int AS store_id, int AS level, int AS file_num, jsonb AS metrics}`,
2455: `crdb_internal.repair_catalog_corruption(descriptor_id: int, corruption: string) -> bool`,
2456: `crdb_internal.merge_aggregated_stmt_metadata(input: jsonb[]) -> jsonb`,
+ 2457: `crdb_internal.request_job_execution_details(jobID: int) -> bool`,
}
var builtinOidsBySignature map[string]oid.Oid
diff --git a/pkg/sql/sem/eval/context.go b/pkg/sql/sem/eval/context.go
index 9778808948e7..7c0dd9248ddc 100644
--- a/pkg/sql/sem/eval/context.go
+++ b/pkg/sql/sem/eval/context.go
@@ -286,6 +286,13 @@ type JobsProfiler interface {
// GenerateExecutionDetailsJSON generates a JSON blob of the job specific
// execution details.
GenerateExecutionDetailsJSON(ctx context.Context, evalCtx *Context, jobID jobspb.JobID) ([]byte, error)
+
+ // RequestExecutionDetails triggers the collection of execution details for
+ // the specified jobID that are then persisted to `system.job_info`. This
+ // currently includes the following pieces of information:
+ //
+ // - Latest DistSQL diagram of the job
+ RequestExecutionDetails(ctx context.Context, jobID jobspb.JobID) error
}
// DescIDGenerator generates unique descriptor IDs.
|