Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
106629: sql,server: add endpoint to list a job's execution details r=dt a=adityamaru

In cockroachdb#105384 we added infrastructure to request and store execution details for a job. This currently includes only the DistSQL diagram generated during a job's execution. Going forward, this will include several files such as traces, goroutines, profiles, etc.

This change introduces an endpoint that lists all such files that are available for consumption. This list will be displayed on the job details page, allowing the user to download any subset of the files collected during job execution.

Informs: cockroachdb#105076
Release note: None

Co-authored-by: adityamaru <[email protected]>
  • Loading branch information
craig[bot] and adityamaru committed Jul 13, 2023
2 parents bbfe3d0 + 62038ac commit 73bffc0
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 20 deletions.
44 changes: 44 additions & 0 deletions docs/generated/http/full.md
Original file line number Diff line number Diff line change
Expand Up @@ -5241,6 +5241,50 @@ Support status: [reserved](#support-status)



## ListJobProfilerExecutionDetails

`GET /_status/list_job_profiler_execution_details/{job_id}`



Support status: [reserved](#support-status)

#### Request Parameters







| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
| job_id | [int64](#cockroach.server.serverpb.ListJobProfilerExecutionDetailsRequest-int64) | | | [reserved](#support-status) |







#### Response Parameters







| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
| files | [string](#cockroach.server.serverpb.ListJobProfilerExecutionDetailsResponse-string) | repeated | | [reserved](#support-status) |







## RequestCA

`GET /_join/v1/ca`
Expand Down
10 changes: 5 additions & 5 deletions pkg/jobs/jobsprofiler/profilerconstants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ func MakeNodeProcessorProgressInfoKey(flowID string, instanceID string, processo
const ExecutionDetailsChunkKeyPrefix = "~profiler/"

// MakeProfilerExecutionDetailsChunkKeyPrefix returns the info key prefix shared
// by all chunks of the job execution detail file named filename, i.e.
// ExecutionDetailsChunkKeyPrefix immediately followed by filename.
func MakeProfilerExecutionDetailsChunkKeyPrefix(filename string) string {
	// Both operands are strings, so fmt.Sprint joins them without a separator.
	keyPrefix := fmt.Sprint(ExecutionDetailsChunkKeyPrefix, filename)
	return keyPrefix
}

// MakeProfilerBundleChunkKey is the info key used to store a chunk of a job's
// execution details for a given filename.
func MakeProfilerBundleChunkKey(filename string, chunkCounter int) string {
return fmt.Sprintf("%s%s#%04d", ExecutionDetailsChunkKeyPrefix, filename, chunkCounter)
// MakeProfilerExecutionDetailsChunkKey returns the info key under which a
// single chunk of a job's execution details is stored: the
// ExecutionDetailsChunkKeyPrefix immediately followed by chunkName.
func MakeProfilerExecutionDetailsChunkKey(chunkName string) string {
	// Both operands are strings, so fmt.Sprint joins them without a separator.
	return fmt.Sprint(ExecutionDetailsChunkKeyPrefix, chunkName)
}

// GetNodeProcessorProgressInfoKeyParts deconstructs the passed in info key and
Expand Down
15 changes: 15 additions & 0 deletions pkg/server/serverpb/status.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2064,6 +2064,14 @@ message NetworkConnectivityResponse {
bytes data = 1;
}

// ListJobProfilerExecutionDetailsRequest is the request message for the
// ListJobProfilerExecutionDetails RPC.
message ListJobProfilerExecutionDetailsRequest {
// job_id is the ID of the job whose execution detail files are listed.
int64 job_id = 1;
}

// ListJobProfilerExecutionDetailsResponse is the response message for the
// ListJobProfilerExecutionDetails RPC.
message ListJobProfilerExecutionDetailsResponse {
// files lists the execution detail files stored for the requested job.
repeated string files = 1;
}


service Status {
// Certificates retrieves a copy of the TLS certificates.
Expand Down Expand Up @@ -2540,4 +2548,11 @@ service Status {
get: "/_status/job_profiler_execution_details/{job_id}/{filename}"
};
}

// ListJobProfilerExecutionDetails lists the execution detail files that have
// been stored for the job identified by job_id.
rpc ListJobProfilerExecutionDetails(ListJobProfilerExecutionDetailsRequest) returns
(ListJobProfilerExecutionDetailsResponse) {
option (google.api.http) = {
get: "/_status/list_job_profiler_execution_details/{job_id}"
};
}
}
22 changes: 22 additions & 0 deletions pkg/server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -4146,3 +4146,25 @@ func (s *statusServer) GetJobProfilerExecutionDetails(
}
return &serverpb.GetJobProfilerExecutionDetailResponse{Data: data}, nil
}

// ListJobProfilerExecutionDetails handles the list endpoint for a job's stored
// execution details. It returns whatever ListExecutionDetailFiles reports for
// the job ID carried by the request; the call is rejected with the error from
// requireAdminUser when that check fails.
func (s *statusServer) ListJobProfilerExecutionDetails(
	ctx context.Context, req *serverpb.ListJobProfilerExecutionDetailsRequest,
) (*serverpb.ListJobProfilerExecutionDetailsResponse, error) {
	ctx = s.AnnotateCtx(ctx)
	// TODO(adityamaru): Figure out the correct privileges required to get execution details.
	if _, err := s.privilegeChecker.requireAdminUser(ctx); err != nil {
		return nil, err
	}

	cfg := s.sqlServer.execCfg
	builder := sql.MakeJobProfilerExecutionDetailsBuilder(
		cfg.SQLStatusServer, cfg.InternalDB, jobspb.JobID(req.JobId),
	)
	names, err := builder.ListExecutionDetailFiles(ctx)
	if err != nil {
		return nil, err
	}
	resp := &serverpb.ListJobProfilerExecutionDetailsResponse{Files: names}
	return resp, nil
}
19 changes: 12 additions & 7 deletions pkg/sql/jobs_execution_details_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,23 @@ func (d fakeExecResumer) OnFailOrCancel(ctx context.Context, _ interface{}, _ er

// checkForPlanDiagrams is a helper used in tests to wait until exactly
// expectedNumDiagrams DSP diagrams exist for the provided jobID.
func checkForPlanDiagram(ctx context.Context, t *testing.T, db isql.DB, jobID jobspb.JobID) {
func checkForPlanDiagrams(
ctx context.Context, t *testing.T, db isql.DB, jobID jobspb.JobID, expectedNumDiagrams int,
) {
testutils.SucceedsSoon(t, func() error {
return db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := jobs.InfoStorageForJob(txn, jobID)
var found bool
err := infoStorage.GetLast(ctx, profilerconstants.DSPDiagramInfoKeyPrefix,
var found int
err := infoStorage.Iterate(ctx, profilerconstants.DSPDiagramInfoKeyPrefix,
func(infoKey string, value []byte) error {
found = true
found++
return nil
})
if err != nil || !found {
return errors.New("not found")
if err != nil {
return err
}
if found != expectedNumDiagrams {
return errors.Newf("found %d diagrams, expected %d", found, expectedNumDiagrams)
}
return nil
})
Expand Down Expand Up @@ -107,7 +112,7 @@ func TestJobsExecutionDetails(t *testing.T) {
infra := physicalplan.NewPhysicalInfrastructure(uuid.FastMakeV4(), base.SQLInstanceID(1))
p.PhysicalInfrastructure = infra
jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, s.InternalDB().(isql.DB), j.ID())
checkForPlanDiagram(ctx, t, s.InternalDB().(isql.DB), j.ID())
checkForPlanDiagrams(ctx, t, s.InternalDB().(isql.DB), j.ID(), 1)
return nil
},
}
Expand Down
45 changes: 38 additions & 7 deletions pkg/sql/jobs_profiler_bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
)

const (
	// bundleChunkSize is the maximum number of bytes of a file that are placed
	// in a single chunk before the chunk is compressed and written.
	bundleChunkSize = 1 << 20 // 1 MiB
	// finalChunkSuffix is appended to a file's name to form the name of its
	// last chunk; readers use it to recognize that a file is complete.
	finalChunkSuffix = "#_final"
)

// RequestExecutionDetails implements the JobProfiler interface.
func (p *planner) RequestExecutionDetails(ctx context.Context, jobID jobspb.JobID) error {
Expand Down Expand Up @@ -81,30 +82,31 @@ func (e *ExecutionDetailsBuilder) WriteExecutionDetail(
jobInfo := jobs.InfoStorageForJob(txn, e.jobID)

var chunkCounter int
chunkFileName := filename
var chunkName string
for len(chunkData) > 0 {
chunkSize := bundleChunkSize
chunk := chunkData
if len(chunk) > chunkSize {
chunkName = fmt.Sprintf("%s#%04d", filename, chunkCounter)
chunk = chunk[:chunkSize]
} else {
// This is the last chunk we will write, assign it a sentinel file name.
chunkFileName = chunkFileName + "_final"
chunkName = filename + finalChunkSuffix
}
chunkData = chunkData[len(chunk):]
var err error
chunk, err = compressChunk(chunk)
if err != nil {
return errors.Wrapf(err, "failed to compress chunk for file %s", chunkFileName)
return errors.Wrapf(err, "failed to compress chunk for file %s", filename)
}

// On listing we want the info_key of each chunk to sort after the
// previous chunk of the same file so that the chunks can be reassembled
// on download. For this reason we use a monotonically increasing
// chunk counter as the suffix.
err = jobInfo.Write(ctx, profilerconstants.MakeProfilerBundleChunkKey(chunkFileName, chunkCounter), chunk)
err = jobInfo.Write(ctx, profilerconstants.MakeProfilerExecutionDetailsChunkKey(chunkName), chunk)
if err != nil {
return errors.Wrapf(err, "failed to write chunk for file %s", chunkFileName)
return errors.Wrapf(err, "failed to write chunk for file %s", filename)
}
chunkCounter++
}
Expand Down Expand Up @@ -146,7 +148,7 @@ func (e *ExecutionDetailsBuilder) ReadExecutionDetail(
return errors.Wrapf(err, "failed to iterate over chunks for job %d", jobID)
}

if lastInfoKey != "" && !strings.Contains(lastInfoKey, "_final") {
if lastInfoKey != "" && !strings.Contains(lastInfoKey, finalChunkSuffix) {
return errors.Newf("failed to read all chunks for file %s, last info key read was %s", filename, lastInfoKey)
}

Expand All @@ -157,6 +159,35 @@ func (e *ExecutionDetailsBuilder) ReadExecutionDetail(
return buf.Bytes(), nil
}

// ListExecutionDetailFiles lists all the files that have been generated as part
// of a job's execution details.
//
// A file is reported once, when the chunk whose info key ends in
// finalChunkSuffix is seen. The returned names have both finalChunkSuffix and
// profilerconstants.ExecutionDetailsChunkKeyPrefix stripped, so they are file
// names rather than info keys. When no file is found the result is an empty,
// non-nil slice. Any error from the transaction or the iteration is returned
// with a nil slice.
func (e *ExecutionDetailsBuilder) ListExecutionDetailFiles(ctx context.Context) ([]string, error) {
	var res []string
	if err := e.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
		jobInfo := jobs.InfoStorageForJob(txn, e.jobID)

		// Iterate over all the files that have been stored as part of the job's
		// execution details. Names are collected in a slice local to the
		// closure and only published to res on success, so every invocation of
		// the closure starts from an empty list.
		files := make([]string, 0)
		if err := jobInfo.Iterate(ctx, profilerconstants.ExecutionDetailsChunkKeyPrefix,
			func(infoKey string, value []byte) error {
				// Look for the final chunk of each file to find the unique file name.
				if !strings.HasSuffix(infoKey, finalChunkSuffix) {
					return nil
				}
				name := strings.TrimSuffix(infoKey, finalChunkSuffix)
				// Chunk keys are written as the storage prefix followed by the
				// chunk name, so drop the prefix to hand back the bare file
				// name. TrimPrefix is a no-op if the key does not carry it.
				name = strings.TrimPrefix(name, profilerconstants.ExecutionDetailsChunkKeyPrefix)
				files = append(files, name)
				return nil
			}); err != nil {
			// Report the ID of the job this builder operates on.
			return errors.Wrapf(err, "failed to iterate over execution detail files for job %d", e.jobID)
		}
		res = files
		return nil
	}); err != nil {
		return nil, err
	}

	return res, nil
}

// MakeJobProfilerExecutionDetailsBuilder returns an instance of an ExecutionDetailsBuilder.
func MakeJobProfilerExecutionDetailsBuilder(
srv serverpb.SQLStatusServer, db isql.DB, jobID jobspb.JobID,
Expand Down Expand Up @@ -199,7 +230,7 @@ func (e *ExecutionDetailsBuilder) addDistSQLDiagram(ctx context.Context) {
}
if row[0] != tree.DNull {
dspDiagramURL := string(tree.MustBeDString(row[0]))
filename := fmt.Sprintf("distsql.%s.html", timeutil.Now().Format("20060102_150405"))
filename := fmt.Sprintf("distsql.%s.html", timeutil.Now().Format("20060102_150405.00"))
if err := e.WriteExecutionDetail(ctx, filename,
[]byte(fmt.Sprintf(`<meta http-equiv="Refresh" content="0; url=%s">`, dspDiagramURL))); err != nil {
log.Errorf(ctx, "failed to write DistSQL diagram for job %d: %+v", e.jobID, err.Error())
Expand Down
97 changes: 96 additions & 1 deletion pkg/sql/jobs_profiler_bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"io"
"net/http"
"runtime/pprof"
"sort"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -72,7 +73,7 @@ func TestReadWriteProfilerExecutionDetails(t *testing.T) {
infra := physicalplan.NewPhysicalInfrastructure(uuid.FastMakeV4(), base.SQLInstanceID(1))
p.PhysicalInfrastructure = infra
jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, s.InternalDB().(isql.DB), j.ID())
checkForPlanDiagram(ctx, t, s.InternalDB().(isql.DB), j.ID())
checkForPlanDiagrams(ctx, t, s.InternalDB().(isql.DB), j.ID(), 1)
return nil
},
}
Expand Down Expand Up @@ -115,6 +116,100 @@ func TestReadWriteProfilerExecutionDetails(t *testing.T) {
})
}

// TestListProfilerExecutionDetails verifies that the list endpoint reports the
// execution detail files requested for a job, first while the job is paused
// at a pausepoint and then again after the job has been resumed and has
// succeeded.
func TestListProfilerExecutionDetails(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Time out the test after two minutes if it hasn't succeeded.
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, time.Minute*2)
defer cancel()

params, _ := tests.CreateTestServerParams()
params.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
defer jobs.ResetConstructors()()
s, sqlDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)

runner := sqlutils.MakeSQLRunner(sqlDB)
execCfg := s.ExecutorConfig().(sql.ExecutorConfig)

// expectedDiagrams is the number of plan diagrams the resumer waits for after
// storing one. Every OnResume invocation stores another diagram, so the
// subtest bumps this value before resuming the job.
// NOTE(review): the variable is written by the subtest and read from
// OnResume; confirm whether that access needs synchronization.
expectedDiagrams := 1
jobs.RegisterConstructor(jobspb.TypeImport, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return fakeExecResumer{
OnResume: func(ctx context.Context) error {
p := sql.PhysicalPlan{}
infra := physicalplan.NewPhysicalInfrastructure(uuid.FastMakeV4(), base.SQLInstanceID(1))
p.PhysicalInfrastructure = infra
jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, s.InternalDB().(isql.DB), j.ID())
checkForPlanDiagrams(ctx, t, s.InternalDB().(isql.DB), j.ID(), expectedDiagrams)
// Returns an error while the "fakeresumer.pause" pausepoint is configured,
// which the subtest relies on to observe the job in a paused state.
if err := execCfg.JobRegistry.CheckPausepoint("fakeresumer.pause"); err != nil {
return err
}
return nil
},
}
}, jobs.UsesTenantCostControl)

runner.Exec(t, `CREATE TABLE t (id INT)`)
runner.Exec(t, `INSERT INTO t SELECT generate_series(1, 100)`)

t.Run("list execution detail files", func(t *testing.T) {
runner.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = 'fakeresumer.pause'`)
var importJobID int
runner.QueryRow(t, `IMPORT INTO t CSV DATA ('nodelocal://1/foo') WITH DETACHED`).Scan(&importJobID)
jobutils.WaitForJobToPause(t, runner, jobspb.JobID(importJobID))

// The first request is expected to produce one DistSQL diagram file and one
// goroutines file. listExecutionDetails returns the names sorted, so the
// distsql file precedes the goroutines file.
runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID)
files := listExecutionDetails(t, s, jobspb.JobID(importJobID))
require.Len(t, files, 2)
require.Regexp(t, "distsql\\..*\\.html", files[0])
require.Regexp(t, "goroutines\\..*\\.txt", files[1])

// Resume the job, so it can write another DistSQL diagram and goroutine
// snapshot.
runner.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = ''`)
expectedDiagrams = 2
runner.Exec(t, `RESUME JOB $1`, importJobID)
jobutils.WaitForJobToSucceed(t, runner, jobspb.JobID(importJobID))
// A second request adds one more file of each kind, for four in total.
runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID)
files = listExecutionDetails(t, s, jobspb.JobID(importJobID))
require.Len(t, files, 4)
require.Regexp(t, "distsql\\..*\\.html", files[0])
require.Regexp(t, "distsql\\..*\\.html", files[1])
require.Regexp(t, "goroutines\\..*\\.txt", files[2])
require.Regexp(t, "goroutines\\..*\\.txt", files[3])
})
}

// listExecutionDetails issues a GET against the
// list_job_profiler_execution_details status endpoint for jobID using the
// server's admin HTTP client. It requires a 200 response, decodes the body as
// a ListJobProfilerExecutionDetailsResponse, and returns the listed files
// sorted in ascending order.
func listExecutionDetails(
	t *testing.T, s serverutils.TestServerInterface, jobID jobspb.JobID,
) []string {
	t.Helper()

	httpClient, err := s.GetAdminHTTPClient()
	require.NoError(t, err)

	endpoint := fmt.Sprintf(
		"%s/_status/list_job_profiler_execution_details/%d", s.AdminURL().String(), jobID,
	)
	req, err := http.NewRequest(http.MethodGet, endpoint, nil)
	require.NoError(t, err)

	req.Header.Set("Content-Type", httputil.ProtoContentType)
	resp, err := httpClient.Do(req)
	require.NoError(t, err)
	defer resp.Body.Close()

	payload, err := io.ReadAll(resp.Body)
	require.NoError(t, err)
	require.Equal(t, http.StatusOK, resp.StatusCode)

	var listResp serverpb.ListJobProfilerExecutionDetailsResponse
	require.NoError(t, protoutil.Unmarshal(payload, &listResp))
	// Sort so callers can make positional assertions on the result.
	sort.Strings(listResp.Files)
	return listResp.Files
}

func checkExecutionDetails(
t *testing.T, s serverutils.TestServerInterface, jobID jobspb.JobID, filename string,
) []byte {
Expand Down

0 comments on commit 73bffc0

Please sign in to comment.