diff --git a/pkg/jobs/execution_detail_utils.go b/pkg/jobs/execution_detail_utils.go
index 46a7c3121341..b1a13bb5ff81 100644
--- a/pkg/jobs/execution_detail_utils.go
+++ b/pkg/jobs/execution_detail_utils.go
@@ -60,6 +60,9 @@ func WriteProtobinExecutionDetailFile(
 // `~profiler/` prefix. Files written using this method can be read using the
 // `ReadExecutionDetailFile` and will show up in the list of files displayed on
 // the jobs' Advanced Debugging DBConsole page.
+//
+// This method clears any existing file with the same filename before writing a
+// new one.
 func WriteExecutionDetailFile(
 	ctx context.Context, filename string, data []byte, txn isql.Txn, jobID jobspb.JobID,
 ) error {
diff --git a/pkg/jobs/job_info_utils.go b/pkg/jobs/job_info_utils.go
index 60c059125c6b..9420ed324601 100644
--- a/pkg/jobs/job_info_utils.go
+++ b/pkg/jobs/job_info_utils.go
@@ -27,12 +27,22 @@ const bundleChunkSize = 1 << 20 // 1 MiB
 const finalChunkSuffix = "#_final"
 
 // WriteChunkedFileToJobInfo will break up data into chunks of a fixed size, and
-// gzip compress them before writing them to the job_info table
+// gzip compress them before writing them to the job_info table. This method
+// clears any existing chunks with the same filename before writing the new
+// chunks, so a caller that wishes to preserve history must use a unique
+// filename.
 func WriteChunkedFileToJobInfo(
 	ctx context.Context, filename string, data []byte, txn isql.Txn, jobID jobspb.JobID,
 ) error {
+	finalChunkName := filename + finalChunkSuffix
 	jobInfo := InfoStorageForJob(txn, jobID)
+
+	// Clear any existing chunks with the same filename before writing new chunks.
+	// We clear all rows with info keys in [filename, filename#_final~).
+	if err := jobInfo.DeleteRange(ctx, filename, finalChunkName+"~"); err != nil {
+		return err
+	}
+
 	var chunkCounter int
 	var chunkName string
 	for len(data) > 0 {
@@ -46,7 +56,7 @@ func WriteChunkedFileToJobInfo(
 			chunk = chunk[:chunkSize]
 		} else {
 			// This is the last chunk we will write, assign it a sentinel file name.
-			chunkName = filename + finalChunkSuffix
+			chunkName = finalChunkName
 		}
 		data = data[len(chunk):]
 		var err error
diff --git a/pkg/jobs/job_info_utils_test.go b/pkg/jobs/job_info_utils_test.go
index e66b35d730a6..f16c6a40ba92 100644
--- a/pkg/jobs/job_info_utils_test.go
+++ b/pkg/jobs/job_info_utils_test.go
@@ -55,6 +55,10 @@ func TestReadWriteChunkedFileToJobInfo(t *testing.T) {
 			name: "file greater than 1MiB",
 			data: make([]byte, 1<<20+1), // 1 MiB + 1 byte
 		},
+		{
+			name: "file much greater than 1MiB",
+			data: make([]byte, 10<<20), // 10 MiB
+		},
 	}
 
 	db := s.InternalDB().(isql.DB)
@@ -78,3 +82,95 @@ func TestReadWriteChunkedFileToJobInfo(t *testing.T) {
 		})
 	}
 }
+
+func TestOverwriteChunkingWithVariableLengths(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	rng, _ := randutil.NewTestRand()
+	ctx := context.Background()
+	params := base.TestServerArgs{}
+	params.Knobs.JobsTestingKnobs = NewTestingKnobsWithShortIntervals()
+	s := serverutils.StartServerOnly(t, params)
+	defer s.Stopper().Stop(ctx)
+
+	tests := []struct {
+		name       string
+		numChunks  int
+		data       []byte
+		moreChunks []byte
+		lessChunks []byte
+	}{
+		{
+			name:      "zero chunks",
+			data:      []byte{},
+			numChunks: 0,
+		},
+		{
+			name:      "one chunk",
+			numChunks: 1,
+		},
+		{
+			name:      "two chunks",
+			numChunks: 2,
+		},
+		{
+			name:      "five chunks",
+			numChunks: 5,
+		},
+	}
+
+	db := s.InternalDB().(isql.DB)
+	generateData := func(numChunks int) []byte {
+		data := make([]byte, (1<<20)*numChunks)
+		if len(data) > 0 {
+			randutil.ReadTestdataBytes(rng, data)
+		}
+		return data
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			tt.data = generateData(tt.numChunks)
+			// Write the first file; this will generate a certain number of chunks.
+			require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+				return WriteChunkedFileToJobInfo(ctx, tt.name, tt.data, txn, jobspb.JobID(123))
+			}))
+
+			// Overwrite the file with fewer chunks; this should delete the extra
+			// chunks before writing the new ones.
+			t.Run("overwrite with fewer chunks", func(t *testing.T) {
+				lessChunks := tt.numChunks - 1
+				if lessChunks >= 0 {
+					tt.data = generateData(lessChunks)
+					var got []byte
+					require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+						err := WriteChunkedFileToJobInfo(ctx, tt.name, tt.data, txn, jobspb.JobID(123))
+						if err != nil {
+							return err
+						}
+						got, err = ReadChunkedFileToJobInfo(ctx, tt.name, txn, jobspb.JobID(123))
+						return err
+					}))
+					require.Equal(t, tt.data, got)
+				}
+			})
+
+			// Overwrite the file with more chunks; this should write the new,
+			// larger set of chunks in place of the old ones.
+			t.Run("overwrite with more chunks", func(t *testing.T) {
+				moreChunks := tt.numChunks + 1
+				tt.data = generateData(moreChunks)
+				var got []byte
+				require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+					err := WriteChunkedFileToJobInfo(ctx, tt.name, tt.data, txn, jobspb.JobID(123))
+					if err != nil {
+						return err
+					}
+					got, err = ReadChunkedFileToJobInfo(ctx, tt.name, txn, jobspb.JobID(123))
+					return err
+				}))
+				require.Equal(t, tt.data, got)
+			})
+		})
+	}
+}
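
A note on the DeleteRange bounds introduced in WriteChunkedFileToJobInfo: every chunk key starts with the filename, and the final chunk is always suffixed with "#_final", so the half-open span [filename, filename+"#_final~") covers all of a file's chunks while excluding keys that belong to other files. Below is a minimal, self-contained sketch of that ordering under Go's lexicographic string comparison; the "#0000"-style names for intermediate chunks are a hypothetical scheme for illustration only, since the diff shows how only the final chunk is named.

package main

import "fmt"

const finalChunkSuffix = "#_final" // mirrors pkg/jobs

// covered reports whether key falls in the half-open span [start, end)
// under lexicographic ordering, which is what the DeleteRange call above
// relies on.
func covered(key, start, end string) bool {
	return key >= start && key < end
}

func main() {
	const filename = "bundle"
	start, end := filename, filename+finalChunkSuffix+"~"
	for _, key := range []string{
		filename + "#0000",          // hypothetical intermediate chunk
		filename + "#0001",          // hypothetical intermediate chunk
		filename + finalChunkSuffix, // the sentinel final chunk
		"bundle2",                   // a different file's key: must survive
	} {
		fmt.Printf("%-18q in [%q, %q): %v\n", key, start, end, covered(key, start, end))
	}
}

The trailing "~" is what makes the end bound work: it sorts after "#_final", so the sentinel chunk falls inside the deleted span, while keys for other filenames (such as "bundle2" above, where '2' sorts after '#') compare above the end bound and are left alone.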
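
The new 10 MiB test case exercises multi-chunk writes. As a quick sketch of the chunk-count arithmetic implied by bundleChunkSize: the write loop runs once per started MiB, and empty data writes no chunks at all, matching the "zero chunks" test case.

package main

import "fmt"

const bundleChunkSize = 1 << 20 // 1 MiB, mirrors pkg/jobs

func main() {
	// Payload sizes drawn from the tests: empty, 1 MiB + 1 byte, 10 MiB.
	for _, n := range []int{0, 1<<20 + 1, 10 << 20} {
		chunks := (n + bundleChunkSize - 1) / bundleChunkSize // ceiling division
		fmt.Printf("%9d bytes -> %d chunk(s)\n", n, chunks)
	}
}

This prints 0, 2, and 10 chunks respectively, which is why the overwrite tests vary numChunks in both directions: shrinking rewrites must delete stale trailing chunks, and growing rewrites must not leave gaps.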