Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
107198: jobsprofiler: stringify protobin files when requested r=dt a=adityamaru

This change is in preparation for a larger change that
will allow downloading debug files from the `Advanced Debugging`
tab on the job details page.

With this change a `binpb` file will have a `binpb.txt` version of the
file listed too. If the user requests to download
a `binpb.txt` file we unmarshal and stringify the contents
of the file before serving them to the user. Currently, there
is only one protobin file type written by a job resumer on
completion.

Informs: cockroachdb#105076
Release note: None

107700: netutil: fix a buglet r=erikgrinaker,stevendanna a=knz

I was noticing an excess number of conn objects remaining open after a test shutdown.

Release note: None
Epic: CRDB-28893

107711: backupccl: skip TestBackupRestoreTenant r=stevendanna a=adityamaru

Skip while we debug the timeouts in cockroachdb#107669.

Informs: cockroachdb#107669
Release note: None

Co-authored-by: adityamaru <[email protected]>
Co-authored-by: Raphael 'kena' Poss <[email protected]>
  • Loading branch information
3 people committed Jul 27, 2023
4 parents 3bbf620 + b4362a6 + 973d861 + 3e87abe commit 4413ec7
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 59 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6943,6 +6943,7 @@ func TestBackupRestoreInsideMultiPodTenant(t *testing.T) {
func TestBackupRestoreTenant(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.WithIssue(t, 107669)

params := base.TestClusterArgs{ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (r *Registry) maybeDumpTrace(resumerCtx context.Context, resumer Resumer, j
return
}

resumerTraceFilename := fmt.Sprintf("resumer-trace-n%s.%s.txt",
resumerTraceFilename := fmt.Sprintf("resumer-trace-n%s.%s.binpb",
r.ID().String(), timeutil.Now().Format("20060102_150405.00"))
td := jobspb.TraceData{CollectedSpans: sp.GetConfiguredRecording()}
b, err := protoutil.Marshal(&td)
Expand Down
92 changes: 64 additions & 28 deletions pkg/jobs/execution_detail_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerconstants"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"github.com/klauspost/compress/gzip"
)
Expand Down Expand Up @@ -82,6 +84,19 @@ func WriteExecutionDetailFile(
})
}

func stringifyProtobinFile(filename string, fileContents *bytes.Buffer) ([]byte, error) {
if strings.HasPrefix(filename, "resumer-trace") {
td := &jobspb.TraceData{}
if err := protoutil.Unmarshal(fileContents.Bytes(), td); err != nil {
return nil, err
}
rec := tracingpb.Recording(td.CollectedSpans)
return []byte(rec.String()), nil
} else {
return nil, errors.AssertionFailedf("unknown file %s", filename)
}
}

// ReadExecutionDetailFile will stitch together all the chunks corresponding to the
// filename and return the uncompressed data of the file.
func ReadExecutionDetailFile(
Expand All @@ -91,37 +106,52 @@ func ReadExecutionDetailFile(
// to the job's execution details and return the zipped bundle instead.

buf := bytes.NewBuffer([]byte{})
if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
// Reset the buf inside the txn closure to guard against txn retries.
buf.Reset()
jobInfo := InfoStorageForJob(txn, jobID)
fetchFileContent := func(file string) error {
return db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
// Reset the buf inside the txn closure to guard against txn retries.
buf.Reset()
jobInfo := InfoStorageForJob(txn, jobID)

// Iterate over all the chunks of the requested file and return the unzipped
// chunks of data.
var lastInfoKey string
if err := jobInfo.Iterate(ctx, profilerconstants.MakeProfilerExecutionDetailsChunkKeyPrefix(file),
func(infoKey string, value []byte) error {
lastInfoKey = infoKey
r, err := gzip.NewReader(bytes.NewBuffer(value))
if err != nil {
return err
}
decompressed, err := io.ReadAll(r)
if err != nil {
return err
}
buf.Write(decompressed)
return nil
}); err != nil {
return errors.Wrapf(err, "failed to iterate over chunks for job %d", jobID)
}

// Iterate over all the chunks of the requested file and return the unzipped
// chunks of data.
var lastInfoKey string
if err := jobInfo.Iterate(ctx, profilerconstants.MakeProfilerExecutionDetailsChunkKeyPrefix(filename),
func(infoKey string, value []byte) error {
lastInfoKey = infoKey
r, err := gzip.NewReader(bytes.NewBuffer(value))
if err != nil {
return err
}
decompressed, err := io.ReadAll(r)
if err != nil {
return err
}
buf.Write(decompressed)
return nil
}); err != nil {
return errors.Wrapf(err, "failed to iterate over chunks for job %d", jobID)
}
if lastInfoKey != "" && !strings.Contains(lastInfoKey, finalChunkSuffix) {
return errors.Newf("failed to read all chunks for file %s, last info key read was %s", file, lastInfoKey)
}

if lastInfoKey != "" && !strings.Contains(lastInfoKey, finalChunkSuffix) {
return errors.Newf("failed to read all chunks for file %s, last info key read was %s", filename, lastInfoKey)
return nil
})
}

// If the file requested is the `binpb.txt` format of a `binpb` file, we must
// fetch the `binpb` version of the file and stringify the contents before
// returning the response.
if strings.HasSuffix(filename, "binpb.txt") {
trimmedFilename := strings.TrimSuffix(filename, ".txt")
if err := fetchFileContent(trimmedFilename); err != nil {
return nil, err
}
return stringifyProtobinFile(filename, buf)
}

return nil
}); err != nil {
if err := fetchFileContent(filename); err != nil {
return nil, err
}
return buf.Bytes(), nil
Expand All @@ -143,7 +173,13 @@ func ListExecutionDetailFiles(
func(infoKey string, value []byte) error {
// Look for the final chunk of each file to find the unique file name.
if strings.HasSuffix(infoKey, finalChunkSuffix) {
files = append(files, strings.TrimPrefix(strings.TrimSuffix(infoKey, finalChunkSuffix), profilerconstants.ExecutionDetailsChunkKeyPrefix))
filename := strings.TrimPrefix(strings.TrimSuffix(infoKey, finalChunkSuffix), profilerconstants.ExecutionDetailsChunkKeyPrefix)
// If we see a `.binpb` file we also want to make the string version of
// the file available for consumption.
if strings.HasSuffix(filename, ".binpb") {
files = append(files, filename+".txt")
}
files = append(files, filename)
}
return nil
}); err != nil {
Expand Down
22 changes: 14 additions & 8 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1064,9 +1064,9 @@ func TestRegistryLifecycle(t *testing.T) {
rts.check(t, jobs.StatusSucceeded)

<-completeCh
checkTraceFiles(ctx, t, expectedNumFiles+1, j.ID(), rts.s)
checkTraceFiles(ctx, t, expectedNumFiles+2, j.ID(), rts.s)
}
pauseUnpauseJob(1)
pauseUnpauseJob(2)
})

t.Run("dump traces on fail", func(t *testing.T) {
Expand Down Expand Up @@ -1103,7 +1103,7 @@ func TestRegistryLifecycle(t *testing.T) {
checkTraceFiles(ctx, t, expectedNumFiles, j.ID(), rts.s)
}

runJobAndFail(1)
runJobAndFail(2)
})

t.Run("dump traces on cancel", func(t *testing.T) {
Expand All @@ -1126,7 +1126,7 @@ func TestRegistryLifecycle(t *testing.T) {
rts.sqlDB.Exec(t, "CANCEL JOB $1", j.ID())

<-completeCh
checkTraceFiles(rts.ctx, t, 1, j.ID(), rts.s)
checkTraceFiles(rts.ctx, t, 2, j.ID(), rts.s)

rts.mu.e.OnFailOrCancelStart = true
rts.check(t, jobs.StatusReverting)
Expand Down Expand Up @@ -1189,17 +1189,23 @@ func checkTraceFiles(
) {
t.Helper()

recordings := make([]jobspb.TraceData, 0)
recordings := make([][]byte, 0)
execCfg := s.TenantOrServer().ExecutorConfig().(sql.ExecutorConfig)
edFiles, err := jobs.ListExecutionDetailFiles(ctx, execCfg.InternalDB, jobID)
require.NoError(t, err)
require.Len(t, edFiles, expectedNumFiles)

for _, f := range edFiles {
data, err := jobs.ReadExecutionDetailFile(ctx, f, execCfg.InternalDB, jobID)
require.NoError(t, err)
td := jobspb.TraceData{}
require.NoError(t, protoutil.Unmarshal(data, &td))
recordings = append(recordings, td)
// Trace files are dumped in `binpb` and `binpb.txt` format. The former
// should be unmarshal-able.
if strings.HasSuffix(f, "binpb") {
td := jobspb.TraceData{}
require.NoError(t, protoutil.Unmarshal(data, &td))
require.NotEmpty(t, td.CollectedSpans)
}
recordings = append(recordings, data)
}
if len(recordings) != expectedNumFiles {
t.Fatalf("expected %d entries but found %d", expectedNumFiles, len(recordings))
Expand Down
15 changes: 9 additions & 6 deletions pkg/jobs/jobsprofiler/profiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func TestTraceRecordingOnResumerCompletion(t *testing.T) {
// At this point there should have been two resumers, and so we expect two
// trace recordings.
testutils.SucceedsSoon(t, func() error {
recordings := make([]jobspb.TraceData, 0)
recordings := make([][]byte, 0)
execCfg := s.TenantOrServer().ExecutorConfig().(sql.ExecutorConfig)
edFiles, err := jobs.ListExecutionDetailFiles(ctx, execCfg.InternalDB, jobspb.JobID(jobID))
if err != nil {
Expand All @@ -232,13 +232,16 @@ func TestTraceRecordingOnResumerCompletion(t *testing.T) {
if err != nil {
return err
}
td := jobspb.TraceData{}
if err := protoutil.Unmarshal(data, &td); err != nil {
return err
recordings = append(recordings, data)
if strings.HasSuffix(f, "binpb") {
td := jobspb.TraceData{}
if err := protoutil.Unmarshal(data, &td); err != nil {
return err
}
require.NotEmpty(t, td.CollectedSpans)
}
recordings = append(recordings, td)
}
if len(recordings) != 2 {
if len(recordings) != 4 {
return errors.Newf("expected 2 entries but found %d", len(recordings))
}
return nil
Expand Down
8 changes: 0 additions & 8 deletions pkg/jobs/jobsprofiler/profilerconstants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,6 @@ func MakeNodeProcessorProgressInfoKey(flowID string, instanceID string, processo
return fmt.Sprintf("%s%s,%s,%d", NodeProcessorProgressInfoKeyPrefix, flowID, instanceID, processorID)
}

const ResumerTraceInfoKeyPrefix = "~resumer-trace-"

// MakeResumerTraceInfoKey returns the info_key used for rows that store the
// traces on completion of a resumer's execution.
func MakeResumerTraceInfoKey(traceID uint64, nodeID string) string {
return fmt.Sprintf("%s%d-%s", ResumerTraceInfoKeyPrefix, traceID, nodeID)
}

// ExecutionDetailsChunkKeyPrefix is the prefix of the info key used for rows that
// store chunks of a job's execution details.
const ExecutionDetailsChunkKeyPrefix = "~profiler/"
Expand Down
19 changes: 11 additions & 8 deletions pkg/sql/jobs_profiler_execution_details_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,11 +349,12 @@ func TestListProfilerExecutionDetails(t *testing.T) {

runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID)
files := listExecutionDetails(t, s, jobspb.JobID(importJobID))
require.Len(t, files, 4)
require.Len(t, files, 5)
require.Regexp(t, "distsql\\..*\\.html", files[0])
require.Regexp(t, "goroutines\\..*\\.txt", files[1])
require.Regexp(t, "resumer-trace-n[0-9]\\..*\\.txt", files[2])
require.Regexp(t, "trace\\..*\\.zip", files[3])
require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb", files[2])
require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb\\.txt", files[3])
require.Regexp(t, "trace\\..*\\.zip", files[4])

// Resume the job, so it can write another DistSQL diagram and goroutine
// snapshot.
Expand All @@ -363,15 +364,17 @@ func TestListProfilerExecutionDetails(t *testing.T) {
jobutils.WaitForJobToSucceed(t, runner, jobspb.JobID(importJobID))
runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID)
files = listExecutionDetails(t, s, jobspb.JobID(importJobID))
require.Len(t, files, 8)
require.Len(t, files, 10)
require.Regexp(t, "distsql\\..*\\.html", files[0])
require.Regexp(t, "distsql\\..*\\.html", files[1])
require.Regexp(t, "goroutines\\..*\\.txt", files[2])
require.Regexp(t, "goroutines\\..*\\.txt", files[3])
require.Regexp(t, "resumer-trace-n[0-9]\\..*\\.txt", files[4])
require.Regexp(t, "resumer-trace-n[0-9]\\..*\\.txt", files[5])
require.Regexp(t, "trace\\..*\\.zip", files[6])
require.Regexp(t, "trace\\..*\\.zip", files[7])
require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb", files[4])
require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb\\.txt", files[5])
require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb", files[6])
require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb\\.txt", files[7])
require.Regexp(t, "trace\\..*\\.zip", files[8])
require.Regexp(t, "trace\\..*\\.zip", files[9])
})
}

Expand Down
1 change: 1 addition & 0 deletions pkg/util/netutil/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func (s *TCPServer) ServeWith(
serveConn(ctx, rw)
})
if err != nil {
err = errors.CombineErrors(err, rw.Close())
return err
}
}
Expand Down

0 comments on commit 4413ec7

Please sign in to comment.