From b4362a6d1fd370ce7955e5ebb6bb1ce6c8583f71 Mon Sep 17 00:00:00 2001 From: adityamaru Date: Wed, 19 Jul 2023 15:09:24 -0400 Subject: [PATCH] jobsprofiler: stringify protobin files when requested This change is in preparation for a larger change that will allow downloading debug files from the `Advanced Debugging` tab on the job details page. With this change a `binpb` file will have a `binpb.txt` version of the file listed too. If the user requests to download a `binpb.txt` file we unmarshal and stringify the contents of the file before serving them to the user. Currently, there is only one protobin file type written by a job resumer on completion. Release note: None --- pkg/jobs/adopt.go | 2 +- pkg/jobs/execution_detail_utils.go | 92 +++++++++++++------ pkg/jobs/jobs_test.go | 22 +++-- pkg/jobs/jobsprofiler/profiler_test.go | 15 +-- .../profilerconstants/constants.go | 8 -- .../jobs_profiler_execution_details_test.go | 19 ++-- 6 files changed, 99 insertions(+), 59 deletions(-) diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index 24c6743b1d56..50db531b23b3 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -82,7 +82,7 @@ func (r *Registry) maybeDumpTrace(resumerCtx context.Context, resumer Resumer, j return } - resumerTraceFilename := fmt.Sprintf("resumer-trace-n%s.%s.txt", + resumerTraceFilename := fmt.Sprintf("resumer-trace-n%s.%s.binpb", r.ID().String(), timeutil.Now().Format("20060102_150405.00")) td := jobspb.TraceData{CollectedSpans: sp.GetConfiguredRecording()} b, err := protoutil.Marshal(&td) diff --git a/pkg/jobs/execution_detail_utils.go b/pkg/jobs/execution_detail_utils.go index 94aee1aed0ac..75e303aefff3 100644 --- a/pkg/jobs/execution_detail_utils.go +++ b/pkg/jobs/execution_detail_utils.go @@ -20,6 +20,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerconstants" "github.com/cockroachdb/cockroach/pkg/sql/isql" + 
"github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" "github.com/cockroachdb/errors" "github.com/klauspost/compress/gzip" ) @@ -82,6 +84,19 @@ func WriteExecutionDetailFile( }) } +func stringifyProtobinFile(filename string, fileContents *bytes.Buffer) ([]byte, error) { + if strings.HasPrefix(filename, "resumer-trace") { + td := &jobspb.TraceData{} + if err := protoutil.Unmarshal(fileContents.Bytes(), td); err != nil { + return nil, err + } + rec := tracingpb.Recording(td.CollectedSpans) + return []byte(rec.String()), nil + } else { + return nil, errors.AssertionFailedf("unknown file %s", filename) + } +} + // ReadExecutionDetailFile will stitch together all the chunks corresponding to the // filename and return the uncompressed data of the file. func ReadExecutionDetailFile( @@ -91,37 +106,52 @@ func ReadExecutionDetailFile( // to the job's execution details and return the zipped bundle instead. buf := bytes.NewBuffer([]byte{}) - if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - // Reset the buf inside the txn closure to guard against txn retries. - buf.Reset() - jobInfo := InfoStorageForJob(txn, jobID) + fetchFileContent := func(file string) error { + return db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + // Reset the buf inside the txn closure to guard against txn retries. + buf.Reset() + jobInfo := InfoStorageForJob(txn, jobID) + + // Iterate over all the chunks of the requested file and return the unzipped + // chunks of data. 
+ var lastInfoKey string + if err := jobInfo.Iterate(ctx, profilerconstants.MakeProfilerExecutionDetailsChunkKeyPrefix(file), + func(infoKey string, value []byte) error { + lastInfoKey = infoKey + r, err := gzip.NewReader(bytes.NewBuffer(value)) + if err != nil { + return err + } + decompressed, err := io.ReadAll(r) + if err != nil { + return err + } + buf.Write(decompressed) + return nil + }); err != nil { + return errors.Wrapf(err, "failed to iterate over chunks for job %d", jobID) + } - // Iterate over all the chunks of the requested file and return the unzipped - // chunks of data. - var lastInfoKey string - if err := jobInfo.Iterate(ctx, profilerconstants.MakeProfilerExecutionDetailsChunkKeyPrefix(filename), - func(infoKey string, value []byte) error { - lastInfoKey = infoKey - r, err := gzip.NewReader(bytes.NewBuffer(value)) - if err != nil { - return err - } - decompressed, err := io.ReadAll(r) - if err != nil { - return err - } - buf.Write(decompressed) - return nil - }); err != nil { - return errors.Wrapf(err, "failed to iterate over chunks for job %d", jobID) - } + if lastInfoKey != "" && !strings.Contains(lastInfoKey, finalChunkSuffix) { + return errors.Newf("failed to read all chunks for file %s, last info key read was %s", file, lastInfoKey) + } - if lastInfoKey != "" && !strings.Contains(lastInfoKey, finalChunkSuffix) { - return errors.Newf("failed to read all chunks for file %s, last info key read was %s", filename, lastInfoKey) + return nil + }) + } + + // If the file requested is the `binpb.txt` format of a `binpb` file, we must + // fetch the `binpb` version of the file and stringify the contents before + // returning the response. 
+ if strings.HasSuffix(filename, "binpb.txt") { + trimmedFilename := strings.TrimSuffix(filename, ".txt") + if err := fetchFileContent(trimmedFilename); err != nil { + return nil, err } + return stringifyProtobinFile(filename, buf) + } - return nil - }); err != nil { + if err := fetchFileContent(filename); err != nil { return nil, err } return buf.Bytes(), nil @@ -143,7 +173,13 @@ func ListExecutionDetailFiles( func(infoKey string, value []byte) error { // Look for the final chunk of each file to find the unique file name. if strings.HasSuffix(infoKey, finalChunkSuffix) { - files = append(files, strings.TrimPrefix(strings.TrimSuffix(infoKey, finalChunkSuffix), profilerconstants.ExecutionDetailsChunkKeyPrefix)) + filename := strings.TrimPrefix(strings.TrimSuffix(infoKey, finalChunkSuffix), profilerconstants.ExecutionDetailsChunkKeyPrefix) + // If we see a `.binpb` file we also want to make the string version of + // the file available for consumption. + if strings.HasSuffix(filename, ".binpb") { + files = append(files, filename+".txt") + } + files = append(files, filename) } return nil }); err != nil { diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index ab46cbf836d0..a9b28590a308 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -1064,9 +1064,9 @@ func TestRegistryLifecycle(t *testing.T) { rts.check(t, jobs.StatusSucceeded) <-completeCh - checkTraceFiles(ctx, t, expectedNumFiles+1, j.ID(), rts.s) + checkTraceFiles(ctx, t, expectedNumFiles+2, j.ID(), rts.s) } - pauseUnpauseJob(1) + pauseUnpauseJob(2) }) t.Run("dump traces on fail", func(t *testing.T) { @@ -1103,7 +1103,7 @@ func TestRegistryLifecycle(t *testing.T) { checkTraceFiles(ctx, t, expectedNumFiles, j.ID(), rts.s) } - runJobAndFail(1) + runJobAndFail(2) }) t.Run("dump traces on cancel", func(t *testing.T) { @@ -1126,7 +1126,7 @@ func TestRegistryLifecycle(t *testing.T) { rts.sqlDB.Exec(t, "CANCEL JOB $1", j.ID()) <-completeCh - checkTraceFiles(rts.ctx, t, 1, j.ID(), rts.s) + 
checkTraceFiles(rts.ctx, t, 2, j.ID(), rts.s) rts.mu.e.OnFailOrCancelStart = true rts.check(t, jobs.StatusReverting) @@ -1189,17 +1189,23 @@ func checkTraceFiles( ) { t.Helper() - recordings := make([]jobspb.TraceData, 0) + recordings := make([][]byte, 0) execCfg := s.TenantOrServer().ExecutorConfig().(sql.ExecutorConfig) edFiles, err := jobs.ListExecutionDetailFiles(ctx, execCfg.InternalDB, jobID) require.NoError(t, err) + require.Len(t, edFiles, expectedNumFiles) for _, f := range edFiles { data, err := jobs.ReadExecutionDetailFile(ctx, f, execCfg.InternalDB, jobID) require.NoError(t, err) - td := jobspb.TraceData{} - require.NoError(t, protoutil.Unmarshal(data, &td)) - recordings = append(recordings, td) + // Trace files are dumped in `binpb` and `binpb.txt` format. The former + // should be unmarshal-able. + if strings.HasSuffix(f, "binpb") { + td := jobspb.TraceData{} + require.NoError(t, protoutil.Unmarshal(data, &td)) + require.NotEmpty(t, td.CollectedSpans) + } + recordings = append(recordings, data) } if len(recordings) != expectedNumFiles { t.Fatalf("expected %d entries but found %d", expectedNumFiles, len(recordings)) diff --git a/pkg/jobs/jobsprofiler/profiler_test.go b/pkg/jobs/jobsprofiler/profiler_test.go index b731c822b7c9..0bc9b0cfdffd 100644 --- a/pkg/jobs/jobsprofiler/profiler_test.go +++ b/pkg/jobs/jobsprofiler/profiler_test.go @@ -214,7 +214,7 @@ func TestTraceRecordingOnResumerCompletion(t *testing.T) { // At this point there should have been two resumers, and so we expect two // trace recordings. 
testutils.SucceedsSoon(t, func() error { - recordings := make([]jobspb.TraceData, 0) + recordings := make([][]byte, 0) execCfg := s.TenantOrServer().ExecutorConfig().(sql.ExecutorConfig) edFiles, err := jobs.ListExecutionDetailFiles(ctx, execCfg.InternalDB, jobspb.JobID(jobID)) if err != nil { @@ -232,13 +232,16 @@ func TestTraceRecordingOnResumerCompletion(t *testing.T) { if err != nil { return err } - td := jobspb.TraceData{} - if err := protoutil.Unmarshal(data, &td); err != nil { - return err + recordings = append(recordings, data) + if strings.HasSuffix(f, "binpb") { + td := jobspb.TraceData{} + if err := protoutil.Unmarshal(data, &td); err != nil { + return err + } + require.NotEmpty(t, td.CollectedSpans) } - recordings = append(recordings, td) } - if len(recordings) != 2 { + if len(recordings) != 4 { return errors.Newf("expected 2 entries but found %d", len(recordings)) } return nil diff --git a/pkg/jobs/jobsprofiler/profilerconstants/constants.go b/pkg/jobs/jobsprofiler/profilerconstants/constants.go index e3c314c16012..32623fcec04e 100644 --- a/pkg/jobs/jobsprofiler/profilerconstants/constants.go +++ b/pkg/jobs/jobsprofiler/profilerconstants/constants.go @@ -37,14 +37,6 @@ func MakeNodeProcessorProgressInfoKey(flowID string, instanceID string, processo return fmt.Sprintf("%s%s,%s,%d", NodeProcessorProgressInfoKeyPrefix, flowID, instanceID, processorID) } -const ResumerTraceInfoKeyPrefix = "~resumer-trace-" - -// MakeResumerTraceInfoKey returns the info_key used for rows that store the -// traces on completion of a resumer's execution. -func MakeResumerTraceInfoKey(traceID uint64, nodeID string) string { - return fmt.Sprintf("%s%d-%s", ResumerTraceInfoKeyPrefix, traceID, nodeID) -} - // ExecutionDetailsChunkKeyPrefix is the prefix of the info key used for rows that // store chunks of a job's execution details. 
const ExecutionDetailsChunkKeyPrefix = "~profiler/" diff --git a/pkg/sql/jobs_profiler_execution_details_test.go b/pkg/sql/jobs_profiler_execution_details_test.go index 1d5adf044ce5..b1aad28aa0fe 100644 --- a/pkg/sql/jobs_profiler_execution_details_test.go +++ b/pkg/sql/jobs_profiler_execution_details_test.go @@ -349,11 +349,12 @@ func TestListProfilerExecutionDetails(t *testing.T) { runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID) files := listExecutionDetails(t, s, jobspb.JobID(importJobID)) - require.Len(t, files, 4) + require.Len(t, files, 5) require.Regexp(t, "distsql\\..*\\.html", files[0]) require.Regexp(t, "goroutines\\..*\\.txt", files[1]) - require.Regexp(t, "resumer-trace-n[0-9]\\..*\\.txt", files[2]) - require.Regexp(t, "trace\\..*\\.zip", files[3]) + require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb", files[2]) + require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb\\.txt", files[3]) + require.Regexp(t, "trace\\..*\\.zip", files[4]) // Resume the job, so it can write another DistSQL diagram and goroutine // snapshot. 
@@ -363,15 +364,17 @@ func TestListProfilerExecutionDetails(t *testing.T) { jobutils.WaitForJobToSucceed(t, runner, jobspb.JobID(importJobID)) runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID) files = listExecutionDetails(t, s, jobspb.JobID(importJobID)) - require.Len(t, files, 8) + require.Len(t, files, 10) require.Regexp(t, "distsql\\..*\\.html", files[0]) require.Regexp(t, "distsql\\..*\\.html", files[1]) require.Regexp(t, "goroutines\\..*\\.txt", files[2]) require.Regexp(t, "goroutines\\..*\\.txt", files[3]) - require.Regexp(t, "resumer-trace-n[0-9]\\..*\\.txt", files[4]) - require.Regexp(t, "resumer-trace-n[0-9]\\..*\\.txt", files[5]) - require.Regexp(t, "trace\\..*\\.zip", files[6]) - require.Regexp(t, "trace\\..*\\.zip", files[7]) + require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb", files[4]) + require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb\\.txt", files[5]) + require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb", files[6]) + require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb\\.txt", files[7]) + require.Regexp(t, "trace\\..*\\.zip", files[8]) + require.Regexp(t, "trace\\..*\\.zip", files[9]) }) }