jobsprofiler: stringify protobin files when requested #107198

Merged 1 commit on Jul 27, 2023.
pkg/jobs/adopt.go (2 changes: 1 addition & 1 deletion)

@@ -82,7 +82,7 @@ func (r *Registry) maybeDumpTrace(resumerCtx context.Context, resumer Resumer, j
 		return
 	}
 
-	resumerTraceFilename := fmt.Sprintf("resumer-trace-n%s.%s.txt",
+	resumerTraceFilename := fmt.Sprintf("resumer-trace-n%s.%s.binpb",
 		r.ID().String(), timeutil.Now().Format("20060102_150405.00"))
 	td := jobspb.TraceData{CollectedSpans: sp.GetConfiguredRecording()}
 	b, err := protoutil.Marshal(&td)
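The only functional change above is the extension: resumer trace dumps are now written as `.binpb` (binary protobuf) rather than `.txt`. A minimal sketch of the filename this format string produces, using the standard library clock in place of CockroachDB's `timeutil` wrapper and a hypothetical node ID:

package main

import (
	"fmt"
	"time"
)

func main() {
	// "20060102_150405.00" is Go's reference-time layout:
	// YYYYMMDD_HHMMSS plus two fractional-second digits.
	nodeID := "1" // hypothetical; the real code uses r.ID().String()
	filename := fmt.Sprintf("resumer-trace-n%s.%s.binpb",
		nodeID, time.Now().Format("20060102_150405.00"))
	fmt.Println(filename) // e.g. resumer-trace-n1.20230727_153045.12.binpb
}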
pkg/jobs/execution_detail_utils.go (92 changes: 64 additions & 28 deletions)

@@ -20,6 +20,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerconstants"
 	"github.com/cockroachdb/cockroach/pkg/sql/isql"
+	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
 	"github.com/cockroachdb/errors"
 	"github.com/klauspost/compress/gzip"
 )
@@ -82,6 +84,19 @@ func WriteExecutionDetailFile(
 	})
 }
 
+func stringifyProtobinFile(filename string, fileContents *bytes.Buffer) ([]byte, error) {
+	if strings.HasPrefix(filename, "resumer-trace") {
+		td := &jobspb.TraceData{}
+		if err := protoutil.Unmarshal(fileContents.Bytes(), td); err != nil {
+			return nil, err
+		}
+		rec := tracingpb.Recording(td.CollectedSpans)
+		return []byte(rec.String()), nil
+	} else {
+		return nil, errors.AssertionFailedf("unknown file %s", filename)
+	}
+}
+
 // ReadExecutionDetailFile will stitch together all the chunks corresponding to the
 // filename and return the uncompressed data of the file.
 func ReadExecutionDetailFile(
@@ -91,37 +106,52 @@ func ReadExecutionDetailFile(
 	// to the job's execution details and return the zipped bundle instead.
 
 	buf := bytes.NewBuffer([]byte{})
-	if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
-		// Reset the buf inside the txn closure to guard against txn retries.
-		buf.Reset()
-		jobInfo := InfoStorageForJob(txn, jobID)
-
-		// Iterate over all the chunks of the requested file and return the unzipped
-		// chunks of data.
-		var lastInfoKey string
-		if err := jobInfo.Iterate(ctx, profilerconstants.MakeProfilerExecutionDetailsChunkKeyPrefix(filename),
-			func(infoKey string, value []byte) error {
-				lastInfoKey = infoKey
-				r, err := gzip.NewReader(bytes.NewBuffer(value))
-				if err != nil {
-					return err
-				}
-				decompressed, err := io.ReadAll(r)
-				if err != nil {
-					return err
-				}
-				buf.Write(decompressed)
-				return nil
-			}); err != nil {
-			return errors.Wrapf(err, "failed to iterate over chunks for job %d", jobID)
-		}
-
-		if lastInfoKey != "" && !strings.Contains(lastInfoKey, finalChunkSuffix) {
-			return errors.Newf("failed to read all chunks for file %s, last info key read was %s", filename, lastInfoKey)
-		}
-
-		return nil
-	}); err != nil {
+	fetchFileContent := func(file string) error {
+		return db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+			// Reset the buf inside the txn closure to guard against txn retries.
+			buf.Reset()
+			jobInfo := InfoStorageForJob(txn, jobID)
+
+			// Iterate over all the chunks of the requested file and return the unzipped
+			// chunks of data.
+			var lastInfoKey string
+			if err := jobInfo.Iterate(ctx, profilerconstants.MakeProfilerExecutionDetailsChunkKeyPrefix(file),
+				func(infoKey string, value []byte) error {
+					lastInfoKey = infoKey
+					r, err := gzip.NewReader(bytes.NewBuffer(value))
+					if err != nil {
+						return err
+					}
+					decompressed, err := io.ReadAll(r)
+					if err != nil {
+						return err
+					}
+					buf.Write(decompressed)
+					return nil
+				}); err != nil {
+				return errors.Wrapf(err, "failed to iterate over chunks for job %d", jobID)
+			}
+			if lastInfoKey != "" && !strings.Contains(lastInfoKey, finalChunkSuffix) {
+				return errors.Newf("failed to read all chunks for file %s, last info key read was %s", file, lastInfoKey)
+			}
+
+			return nil
+		})
+	}
+
+	// If the file requested is the `binpb.txt` format of a `binpb` file, we must
+	// fetch the `binpb` version of the file and stringify the contents before
+	// returning the response.
+	if strings.HasSuffix(filename, "binpb.txt") {
+		trimmedFilename := strings.TrimSuffix(filename, ".txt")
+		if err := fetchFileContent(trimmedFilename); err != nil {
+			return nil, err
+		}
+		return stringifyProtobinFile(filename, buf)
+	}
+
+	if err := fetchFileContent(filename); err != nil {
 		return nil, err
 	}
 	return buf.Bytes(), nil
Expand All @@ -143,7 +173,13 @@ func ListExecutionDetailFiles(
func(infoKey string, value []byte) error {
// Look for the final chunk of each file to find the unique file name.
if strings.HasSuffix(infoKey, finalChunkSuffix) {
files = append(files, strings.TrimPrefix(strings.TrimSuffix(infoKey, finalChunkSuffix), profilerconstants.ExecutionDetailsChunkKeyPrefix))
filename := strings.TrimPrefix(strings.TrimSuffix(infoKey, finalChunkSuffix), profilerconstants.ExecutionDetailsChunkKeyPrefix)
// If we see a `.binpb` file we also want to make the string version of
// the file available for consumption.
if strings.HasSuffix(filename, ".binpb") {
files = append(files, filename+".txt")
}
files = append(files, filename)
}
return nil
}); err != nil {
Expand Down
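Net effect of the two hunks above: a `.binpb.txt` file is never stored. `ListExecutionDetailFiles` merely advertises it next to each stored `.binpb` file, and `ReadExecutionDetailFile` synthesizes it on request by fetching the `.binpb` chunks and stringifying them. This is also why the file counts double in the test changes below: each resumer trace now contributes two listing entries. A self-contained sketch of the dispatch pattern, where the `fetchRaw` and `stringify` parameters are hypothetical stand-ins for the chunk-stitching and trace-rendering code:

package main

import (
	"fmt"
	"strings"
)

// readFile mirrors ReadExecutionDetailFile's dispatch: a request for
// "<name>.binpb.txt" is served by fetching "<name>.binpb" and converting
// it to text; any other filename is fetched as-is.
func readFile(
	filename string,
	fetchRaw func(string) ([]byte, error),
	stringify func([]byte) ([]byte, error),
) ([]byte, error) {
	if strings.HasSuffix(filename, ".binpb.txt") {
		raw, err := fetchRaw(strings.TrimSuffix(filename, ".txt"))
		if err != nil {
			return nil, err
		}
		return stringify(raw)
	}
	return fetchRaw(filename)
}

// listFiles mirrors ListExecutionDetailFiles: every stored ".binpb" file
// also advertises a synthesized ".binpb.txt" variant.
func listFiles(stored []string) []string {
	var files []string
	for _, f := range stored {
		if strings.HasSuffix(f, ".binpb") {
			files = append(files, f+".txt")
		}
		files = append(files, f)
	}
	return files
}

func main() {
	fmt.Println(listFiles([]string{"resumer-trace-n1.binpb", "goroutines.txt"}))
	// Output: [resumer-trace-n1.binpb.txt resumer-trace-n1.binpb goroutines.txt]
}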
pkg/jobs/jobs_test.go (22 changes: 14 additions & 8 deletions)

@@ -1064,9 +1064,9 @@ func TestRegistryLifecycle(t *testing.T) {
 		rts.check(t, jobs.StatusSucceeded)
 
 		<-completeCh
-		checkTraceFiles(ctx, t, expectedNumFiles+1, j.ID(), rts.s)
+		checkTraceFiles(ctx, t, expectedNumFiles+2, j.ID(), rts.s)
 	}
-	pauseUnpauseJob(1)
+	pauseUnpauseJob(2)
 })

t.Run("dump traces on fail", func(t *testing.T) {
Expand Down Expand Up @@ -1103,7 +1103,7 @@ func TestRegistryLifecycle(t *testing.T) {
checkTraceFiles(ctx, t, expectedNumFiles, j.ID(), rts.s)
}

runJobAndFail(1)
runJobAndFail(2)
})

t.Run("dump traces on cancel", func(t *testing.T) {
Expand All @@ -1126,7 +1126,7 @@ func TestRegistryLifecycle(t *testing.T) {
rts.sqlDB.Exec(t, "CANCEL JOB $1", j.ID())

<-completeCh
checkTraceFiles(rts.ctx, t, 1, j.ID(), rts.s)
checkTraceFiles(rts.ctx, t, 2, j.ID(), rts.s)

rts.mu.e.OnFailOrCancelStart = true
rts.check(t, jobs.StatusReverting)
@@ -1189,17 +1189,23 @@ func checkTraceFiles(
 ) {
 	t.Helper()
 
-	recordings := make([]jobspb.TraceData, 0)
+	recordings := make([][]byte, 0)
 	execCfg := s.TenantOrServer().ExecutorConfig().(sql.ExecutorConfig)
 	edFiles, err := jobs.ListExecutionDetailFiles(ctx, execCfg.InternalDB, jobID)
 	require.NoError(t, err)
 	require.Len(t, edFiles, expectedNumFiles)
 
 	for _, f := range edFiles {
 		data, err := jobs.ReadExecutionDetailFile(ctx, f, execCfg.InternalDB, jobID)
 		require.NoError(t, err)
-		td := jobspb.TraceData{}
-		require.NoError(t, protoutil.Unmarshal(data, &td))
-		recordings = append(recordings, td)
+		// Trace files are dumped in `binpb` and `binpb.txt` format. The former
+		// should be unmarshal-able.
+		if strings.HasSuffix(f, "binpb") {
+			td := jobspb.TraceData{}
+			require.NoError(t, protoutil.Unmarshal(data, &td))
+			require.NotEmpty(t, td.CollectedSpans)
+		}
+		recordings = append(recordings, data)
 	}
 	if len(recordings) != expectedNumFiles {
 		t.Fatalf("expected %d entries but found %d", expectedNumFiles, len(recordings))
pkg/jobs/jobsprofiler/profiler_test.go (15 changes: 9 additions & 6 deletions)

@@ -214,7 +214,7 @@ func TestTraceRecordingOnResumerCompletion(t *testing.T) {
 	// At this point there should have been two resumers, and so we expect two
 	// trace recordings.
 	testutils.SucceedsSoon(t, func() error {
-		recordings := make([]jobspb.TraceData, 0)
+		recordings := make([][]byte, 0)
 		execCfg := s.TenantOrServer().ExecutorConfig().(sql.ExecutorConfig)
 		edFiles, err := jobs.ListExecutionDetailFiles(ctx, execCfg.InternalDB, jobspb.JobID(jobID))
 		if err != nil {
@@ -232,13 +232,16 @@ func TestTraceRecordingOnResumerCompletion(t *testing.T) {
 			if err != nil {
 				return err
 			}
-			td := jobspb.TraceData{}
-			if err := protoutil.Unmarshal(data, &td); err != nil {
-				return err
+			recordings = append(recordings, data)
+			if strings.HasSuffix(f, "binpb") {
+				td := jobspb.TraceData{}
+				if err := protoutil.Unmarshal(data, &td); err != nil {
+					return err
+				}
+				require.NotEmpty(t, td.CollectedSpans)
 			}
-			recordings = append(recordings, td)
 		}
-		if len(recordings) != 2 {
+		if len(recordings) != 4 {
 			return errors.Newf("expected 2 entries but found %d", len(recordings))
 		}
 		return nil
pkg/jobs/jobsprofiler/profilerconstants/constants.go (8 changes: 0 additions & 8 deletions)

@@ -37,14 +37,6 @@ func MakeNodeProcessorProgressInfoKey(flowID string, instanceID string, processo
 	return fmt.Sprintf("%s%s,%s,%d", NodeProcessorProgressInfoKeyPrefix, flowID, instanceID, processorID)
 }
 
-const ResumerTraceInfoKeyPrefix = "~resumer-trace-"
-
-// MakeResumerTraceInfoKey returns the info_key used for rows that store the
-// traces on completion of a resumer's execution.
-func MakeResumerTraceInfoKey(traceID uint64, nodeID string) string {
-	return fmt.Sprintf("%s%d-%s", ResumerTraceInfoKeyPrefix, traceID, nodeID)
-}
-
 // ExecutionDetailsChunkKeyPrefix is the prefix of the info key used for rows that
 // store chunks of a job's execution details.
 const ExecutionDetailsChunkKeyPrefix = "~profiler/"
pkg/sql/jobs_profiler_execution_details_test.go (19 changes: 11 additions & 8 deletions)

@@ -349,11 +349,12 @@ func TestListProfilerExecutionDetails(t *testing.T) {
 	runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID)
 	files := listExecutionDetails(t, s, jobspb.JobID(importJobID))
-	require.Len(t, files, 4)
+	require.Len(t, files, 5)
 	require.Regexp(t, "distsql\\..*\\.html", files[0])
 	require.Regexp(t, "goroutines\\..*\\.txt", files[1])
-	require.Regexp(t, "resumer-trace-n[0-9]\\..*\\.txt", files[2])
-	require.Regexp(t, "trace\\..*\\.zip", files[3])
+	require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb", files[2])
+	require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb\\.txt", files[3])
+	require.Regexp(t, "trace\\..*\\.zip", files[4])
 
 	// Resume the job, so it can write another DistSQL diagram and goroutine
 	// snapshot.
@@ -363,15 +364,17 @@ func TestListProfilerExecutionDetails(t *testing.T) {
 	jobutils.WaitForJobToSucceed(t, runner, jobspb.JobID(importJobID))
 	runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID)
 	files = listExecutionDetails(t, s, jobspb.JobID(importJobID))
-	require.Len(t, files, 8)
+	require.Len(t, files, 10)
 	require.Regexp(t, "distsql\\..*\\.html", files[0])
 	require.Regexp(t, "distsql\\..*\\.html", files[1])
 	require.Regexp(t, "goroutines\\..*\\.txt", files[2])
 	require.Regexp(t, "goroutines\\..*\\.txt", files[3])
-	require.Regexp(t, "resumer-trace-n[0-9]\\..*\\.txt", files[4])
-	require.Regexp(t, "resumer-trace-n[0-9]\\..*\\.txt", files[5])
-	require.Regexp(t, "trace\\..*\\.zip", files[6])
-	require.Regexp(t, "trace\\..*\\.zip", files[7])
+	require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb", files[4])
+	require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb\\.txt", files[5])
+	require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb", files[6])
+	require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb\\.txt", files[7])
+	require.Regexp(t, "trace\\..*\\.zip", files[8])
+	require.Regexp(t, "trace\\..*\\.zip", files[9])
 })
}
