diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md index a6e7b772c5d8..6905863a0652 100644 --- a/docs/generated/http/full.md +++ b/docs/generated/http/full.md @@ -5241,6 +5241,50 @@ Support status: [reserved](#support-status) +## ListJobProfilerExecutionDetails + +`GET /_status/list_job_profiler_execution_details/{job_id}` + + + +Support status: [reserved](#support-status) + +#### Request Parameters + + + + + + + +| Field | Type | Label | Description | Support status | +| ----- | ---- | ----- | ----------- | -------------- | +| job_id | [int64](#cockroach.server.serverpb.ListJobProfilerExecutionDetailsRequest-int64) | | | [reserved](#support-status) | + + + + + + + +#### Response Parameters + + + + + + + +| Field | Type | Label | Description | Support status | +| ----- | ---- | ----- | ----------- | -------------- | +| files | [string](#cockroach.server.serverpb.ListJobProfilerExecutionDetailsResponse-string) | repeated | | [reserved](#support-status) | + + + + + + + ## RequestCA `GET /_join/v1/ca` diff --git a/pkg/jobs/jobsprofiler/profilerconstants/constants.go b/pkg/jobs/jobsprofiler/profilerconstants/constants.go index 12933f39b71f..32623fcec04e 100644 --- a/pkg/jobs/jobsprofiler/profilerconstants/constants.go +++ b/pkg/jobs/jobsprofiler/profilerconstants/constants.go @@ -42,15 +42,15 @@ func MakeNodeProcessorProgressInfoKey(flowID string, instanceID string, processo const ExecutionDetailsChunkKeyPrefix = "~profiler/" // MakeProfilerExecutionDetailsChunkKeyPrefix is the prefix of the info key used to store all -// chunks of a job's execution details for a given filename. +// chunks of a job's execution details. func MakeProfilerExecutionDetailsChunkKeyPrefix(filename string) string { return fmt.Sprintf("%s%s", ExecutionDetailsChunkKeyPrefix, filename) } -// MakeProfilerBundleChunkKey is the info key used to store a chunk of a job's -// execution details for a given filename. 
-func MakeProfilerBundleChunkKey(filename string, chunkCounter int) string { - return fmt.Sprintf("%s%s#%04d", ExecutionDetailsChunkKeyPrefix, filename, chunkCounter) +// MakeProfilerExecutionDetailsChunkKey is the info key used to store a chunk of +// a job's execution details. +func MakeProfilerExecutionDetailsChunkKey(chunkName string) string { + return fmt.Sprintf("%s%s", ExecutionDetailsChunkKeyPrefix, chunkName) } // GetNodeProcessorProgressInfoKeyParts deconstructs the passed in info key and diff --git a/pkg/server/serverpb/status.proto b/pkg/server/serverpb/status.proto index 567504c3b877..d56a9c0088f8 100644 --- a/pkg/server/serverpb/status.proto +++ b/pkg/server/serverpb/status.proto @@ -2064,6 +2064,14 @@ message NetworkConnectivityResponse { bytes data = 1; } + message ListJobProfilerExecutionDetailsRequest { + int64 job_id = 1; + } + + message ListJobProfilerExecutionDetailsResponse { + repeated string files = 1; + } + service Status { // Certificates retrieves a copy of the TLS certificates. @@ -2540,4 +2548,11 @@ service Status { get: "/_status/job_profiler_execution_details/{job_id}/{filename}" }; } + + rpc ListJobProfilerExecutionDetails(ListJobProfilerExecutionDetailsRequest) returns + (ListJobProfilerExecutionDetailsResponse) { + option (google.api.http) = { + get: "/_status/list_job_profiler_execution_details/{job_id}" + }; + } } diff --git a/pkg/server/status.go b/pkg/server/status.go index 6217e2052a6a..6949903a3e9f 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -4146,3 +4146,25 @@ func (s *statusServer) GetJobProfilerExecutionDetails( } return &serverpb.GetJobProfilerExecutionDetailResponse{Data: data}, nil } + +// ListJobProfilerExecutionDetails lists all the stored execution details for a +// given job ID. 
+func (s *statusServer) ListJobProfilerExecutionDetails( + ctx context.Context, req *serverpb.ListJobProfilerExecutionDetailsRequest, +) (*serverpb.ListJobProfilerExecutionDetailsResponse, error) { + ctx = s.AnnotateCtx(ctx) + // TODO(adityamaru): Figure out the correct privileges required to get execution details. + _, err := s.privilegeChecker.requireAdminUser(ctx) + if err != nil { + return nil, err + } + + jobID := jobspb.JobID(req.JobId) + execCfg := s.sqlServer.execCfg + eb := sql.MakeJobProfilerExecutionDetailsBuilder(execCfg.SQLStatusServer, execCfg.InternalDB, jobID) + files, err := eb.ListExecutionDetailFiles(ctx) + if err != nil { + return nil, err + } + return &serverpb.ListJobProfilerExecutionDetailsResponse{Files: files}, nil +} diff --git a/pkg/sql/jobs_execution_details_test.go b/pkg/sql/jobs_execution_details_test.go index 5034e3c95e7b..7ea0677c0c34 100644 --- a/pkg/sql/jobs_execution_details_test.go +++ b/pkg/sql/jobs_execution_details_test.go @@ -63,18 +63,23 @@ func (d fakeExecResumer) OnFailOrCancel(ctx context.Context, _ interface{}, _ er // checkForPlanDiagram is a method used in tests to wait for the existence of a // DSP diagram for the provided jobID. 
-func checkForPlanDiagram(ctx context.Context, t *testing.T, db isql.DB, jobID jobspb.JobID) { +func checkForPlanDiagrams( + ctx context.Context, t *testing.T, db isql.DB, jobID jobspb.JobID, expectedNumDiagrams int, +) { testutils.SucceedsSoon(t, func() error { return db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { infoStorage := jobs.InfoStorageForJob(txn, jobID) - var found bool - err := infoStorage.GetLast(ctx, profilerconstants.DSPDiagramInfoKeyPrefix, + var found int + err := infoStorage.Iterate(ctx, profilerconstants.DSPDiagramInfoKeyPrefix, func(infoKey string, value []byte) error { - found = true + found++ return nil }) - if err != nil || !found { - return errors.New("not found") + if err != nil { + return err + } + if found != expectedNumDiagrams { + return errors.Newf("found %d diagrams, expected %d", found, expectedNumDiagrams) } return nil }) @@ -107,7 +112,7 @@ func TestJobsExecutionDetails(t *testing.T) { infra := physicalplan.NewPhysicalInfrastructure(uuid.FastMakeV4(), base.SQLInstanceID(1)) p.PhysicalInfrastructure = infra jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, s.InternalDB().(isql.DB), j.ID()) - checkForPlanDiagram(ctx, t, s.InternalDB().(isql.DB), j.ID()) + checkForPlanDiagrams(ctx, t, s.InternalDB().(isql.DB), j.ID(), 1) return nil }, } diff --git a/pkg/sql/jobs_profiler_bundle.go b/pkg/sql/jobs_profiler_bundle.go index eacc76db80ce..22a9fa7a3556 100644 --- a/pkg/sql/jobs_profiler_bundle.go +++ b/pkg/sql/jobs_profiler_bundle.go @@ -32,6 +32,7 @@ import ( ) const bundleChunkSize = 1 << 20 // 1 MiB +const finalChunkSuffix = "#_final" // RequestExecutionDetails implements the JobProfiler interface. 
func (p *planner) RequestExecutionDetails(ctx context.Context, jobID jobspb.JobID) error { @@ -81,30 +82,31 @@ func (e *ExecutionDetailsBuilder) WriteExecutionDetail( jobInfo := jobs.InfoStorageForJob(txn, e.jobID) var chunkCounter int - chunkFileName := filename + var chunkName string for len(chunkData) > 0 { chunkSize := bundleChunkSize chunk := chunkData if len(chunk) > chunkSize { + chunkName = fmt.Sprintf("%s#%04d", filename, chunkCounter) chunk = chunk[:chunkSize] } else { // This is the last chunk we will write, assign it a sentinel file name. - chunkFileName = chunkFileName + "_final" + chunkName = filename + finalChunkSuffix } chunkData = chunkData[len(chunk):] var err error chunk, err = compressChunk(chunk) if err != nil { - return errors.Wrapf(err, "failed to compress chunk for file %s", chunkFileName) + return errors.Wrapf(err, "failed to compress chunk for file %s", filename) } // On listing we want the info_key of each chunk to sort after the // previous chunk of the same file so that the chunks can be reassembled // on download. For this reason we use a monotonically increasing // chunk counter as the suffix. 
- err = jobInfo.Write(ctx, profilerconstants.MakeProfilerBundleChunkKey(chunkFileName, chunkCounter), chunk) + err = jobInfo.Write(ctx, profilerconstants.MakeProfilerExecutionDetailsChunkKey(chunkName), chunk) if err != nil { - return errors.Wrapf(err, "failed to write chunk for file %s", chunkFileName) + return errors.Wrapf(err, "failed to write chunk for file %s", filename) } chunkCounter++ } @@ -146,7 +148,7 @@ func (e *ExecutionDetailsBuilder) ReadExecutionDetail( return errors.Wrapf(err, "failed to iterate over chunks for job %d", jobID) } - if lastInfoKey != "" && !strings.Contains(lastInfoKey, "_final") { + if lastInfoKey != "" && !strings.Contains(lastInfoKey, finalChunkSuffix) { return errors.Newf("failed to read all chunks for file %s, last info key read was %s", filename, lastInfoKey) } @@ -157,6 +159,35 @@ func (e *ExecutionDetailsBuilder) ReadExecutionDetail( return buf.Bytes(), nil } +// ListExecutionDetailFiles lists all the files that have been generated as part +// of a job's execution details. +func (e *ExecutionDetailsBuilder) ListExecutionDetailFiles(ctx context.Context) ([]string, error) { + var res []string + if err := e.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + jobInfo := jobs.InfoStorageForJob(txn, e.jobID) + + // Iterate over all the files that have been stored as part of the job's + // execution details. + files := make([]string, 0) + if err := jobInfo.Iterate(ctx, profilerconstants.ExecutionDetailsChunkKeyPrefix, + func(infoKey string, value []byte) error { + // Look for the final chunk of each file to find the unique file name. 
+ if strings.HasSuffix(infoKey, finalChunkSuffix) { + files = append(files, strings.TrimSuffix(infoKey, finalChunkSuffix)) + } + return nil + }); err != nil { + return errors.Wrapf(err, "failed to iterate over execution detail files for job %d", e.jobID) + } + res = files + return nil + }); err != nil { + return nil, err + } + + return res, nil +} + // MakeJobProfilerExecutionDetailsBuilder returns an instance of an ExecutionDetailsBuilder. func MakeJobProfilerExecutionDetailsBuilder( srv serverpb.SQLStatusServer, db isql.DB, jobID jobspb.JobID, @@ -199,7 +230,7 @@ func (e *ExecutionDetailsBuilder) addDistSQLDiagram(ctx context.Context) { } if row[0] != tree.DNull { dspDiagramURL := string(tree.MustBeDString(row[0])) - filename := fmt.Sprintf("distsql.%s.html", timeutil.Now().Format("20060102_150405")) + filename := fmt.Sprintf("distsql.%s.html", timeutil.Now().Format("20060102_150405.00")) if err := e.WriteExecutionDetail(ctx, filename, []byte(fmt.Sprintf(`<meta http-equiv="Refresh" content="0; url=%s">`, dspDiagramURL))); err != nil { log.Errorf(ctx, "failed to write DistSQL diagram for job %d: %+v", e.jobID, err.Error()) diff --git a/pkg/sql/jobs_profiler_bundle_test.go b/pkg/sql/jobs_profiler_bundle_test.go index 4968548f5b7d..0d670d27e554 100644 --- a/pkg/sql/jobs_profiler_bundle_test.go +++ b/pkg/sql/jobs_profiler_bundle_test.go @@ -17,6 +17,7 @@ import ( "io" "net/http" "runtime/pprof" + "sort" "strings" "testing" "time" @@ -72,7 +73,7 @@ func TestReadWriteProfilerExecutionDetails(t *testing.T) { infra := physicalplan.NewPhysicalInfrastructure(uuid.FastMakeV4(), base.SQLInstanceID(1)) p.PhysicalInfrastructure = infra jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, s.InternalDB().(isql.DB), j.ID()) - checkForPlanDiagram(ctx, t, s.InternalDB().(isql.DB), j.ID()) + checkForPlanDiagrams(ctx, t, s.InternalDB().(isql.DB), j.ID(), 1) return nil }, } @@ -115,6 +116,100 @@ func TestReadWriteProfilerExecutionDetails(t *testing.T) { }) } +func TestListProfilerExecutionDetails(t *testing.T) { + defer 
leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Timeout the test in a few minutes if it hasn't succeeded. + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, time.Minute*2) + defer cancel() + + params, _ := tests.CreateTestServerParams() + params.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals() + defer jobs.ResetConstructors()() + s, sqlDB, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + + runner := sqlutils.MakeSQLRunner(sqlDB) + execCfg := s.ExecutorConfig().(sql.ExecutorConfig) + + expectedDiagrams := 1 + jobs.RegisterConstructor(jobspb.TypeImport, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer { + return fakeExecResumer{ + OnResume: func(ctx context.Context) error { + p := sql.PhysicalPlan{} + infra := physicalplan.NewPhysicalInfrastructure(uuid.FastMakeV4(), base.SQLInstanceID(1)) + p.PhysicalInfrastructure = infra + jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, s.InternalDB().(isql.DB), j.ID()) + checkForPlanDiagrams(ctx, t, s.InternalDB().(isql.DB), j.ID(), expectedDiagrams) + if err := execCfg.JobRegistry.CheckPausepoint("fakeresumer.pause"); err != nil { + return err + } + return nil + }, + } + }, jobs.UsesTenantCostControl) + + runner.Exec(t, `CREATE TABLE t (id INT)`) + runner.Exec(t, `INSERT INTO t SELECT generate_series(1, 100)`) + + t.Run("list execution detail files", func(t *testing.T) { + runner.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = 'fakeresumer.pause'`) + var importJobID int + runner.QueryRow(t, `IMPORT INTO t CSV DATA ('nodelocal://1/foo') WITH DETACHED`).Scan(&importJobID) + jobutils.WaitForJobToPause(t, runner, jobspb.JobID(importJobID)) + + runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID) + files := listExecutionDetails(t, s, jobspb.JobID(importJobID)) + require.Len(t, files, 2) + require.Regexp(t, "distsql\\..*\\.html", files[0]) + require.Regexp(t, "goroutines\\..*\\.txt", files[1]) + + // Resume the job, 
so it can write another DistSQL diagram and goroutine + // snapshot. + runner.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = ''`) + expectedDiagrams = 2 + runner.Exec(t, `RESUME JOB $1`, importJobID) + jobutils.WaitForJobToSucceed(t, runner, jobspb.JobID(importJobID)) + runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID) + files = listExecutionDetails(t, s, jobspb.JobID(importJobID)) + require.Len(t, files, 4) + require.Regexp(t, "distsql\\..*\\.html", files[0]) + require.Regexp(t, "distsql\\..*\\.html", files[1]) + require.Regexp(t, "goroutines\\..*\\.txt", files[2]) + require.Regexp(t, "goroutines\\..*\\.txt", files[3]) + }) +} + +func listExecutionDetails( + t *testing.T, s serverutils.TestServerInterface, jobID jobspb.JobID, +) []string { + t.Helper() + + client, err := s.GetAdminHTTPClient() + require.NoError(t, err) + + url := s.AdminURL().String() + fmt.Sprintf("/_status/list_job_profiler_execution_details/%d", jobID) + req, err := http.NewRequest("GET", url, nil) + require.NoError(t, err) + + req.Header.Set("Content-Type", httputil.ProtoContentType) + resp, err := client.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + edResp := serverpb.ListJobProfilerExecutionDetailsResponse{} + require.NoError(t, protoutil.Unmarshal(body, &edResp)) + sort.Slice(edResp.Files, func(i, j int) bool { + return edResp.Files[i] < edResp.Files[j] + }) + return edResp.Files +} + func checkExecutionDetails( t *testing.T, s serverutils.TestServerInterface, jobID jobspb.JobID, filename string, ) []byte {