From 371e7c5609c8529e0469ce76a79068c432f66909 Mon Sep 17 00:00:00 2001
From: adityamaru
Date: Fri, 27 Oct 2023 15:08:12 -0400
Subject: [PATCH] jobs: fix bug in WriteChunkedFileToJobInfo during overwriting

Previously, `WriteChunkedFileToJobInfo` would chunk up the passed-in byte
slice and write the chunks to the job_info table with info keys constructed
from the filename. If the method was invoked again with the same filename
and the number of chunks changed, then, owing to the delete-before-write
semantics of the job_info table, we would end up with a corrupt file in
which chunks from the first and second writes were mixed.

This change fixes the bug by first deleting all the chunks that correspond
to the filename before writing the new data. This is in line with how you
would expect an overwrite operation to work. This change also adds a
regression test.

Fixes: #113232

Release note (bug fix): Fixes a bug in a method used by some of the jobs
observability infrastructure that could be triggered if a file was
overwritten with a different chunking strategy.
---
 pkg/jobs/execution_detail_utils.go |  3 +
 pkg/jobs/job_info_utils.go         | 14 ++++-
 pkg/jobs/job_info_utils_test.go    | 96 ++++++++++++++++++++++++++++++
 3 files changed, 111 insertions(+), 2 deletions(-)

diff --git a/pkg/jobs/execution_detail_utils.go b/pkg/jobs/execution_detail_utils.go
index 46a7c3121341..b1a13bb5ff81 100644
--- a/pkg/jobs/execution_detail_utils.go
+++ b/pkg/jobs/execution_detail_utils.go
@@ -60,6 +60,9 @@ func WriteProtobinExecutionDetailFile(
 // `~profiler/` prefix. Files written using this method can be read using the
 // `ReadExecutionDetailFile` and will show up in the list of files displayed on
 // the jobs' Advanced Debugging DBConsole page.
+//
+// This method clears any existing file with the same filename before writing a
+// new one.
 func WriteExecutionDetailFile(
 	ctx context.Context, filename string, data []byte, txn isql.Txn, jobID jobspb.JobID,
 ) error {
diff --git a/pkg/jobs/job_info_utils.go b/pkg/jobs/job_info_utils.go
index 60c059125c6b..9420ed324601 100644
--- a/pkg/jobs/job_info_utils.go
+++ b/pkg/jobs/job_info_utils.go
@@ -27,12 +27,22 @@ const bundleChunkSize = 1 << 20 // 1 MiB
 
 const finalChunkSuffix = "#_final"
 
 // WriteChunkedFileToJobInfo will break up data into chunks of a fixed size, and
-// gzip compress them before writing them to the job_info table
+// gzip compress them before writing them to the job_info table. This method
+// clears any existing chunks with the same filename before writing the new
+// chunks and so if the caller wishes to preserve history they must use a
+// unique filename.
 func WriteChunkedFileToJobInfo(
 	ctx context.Context, filename string, data []byte, txn isql.Txn, jobID jobspb.JobID,
 ) error {
+	finalChunkName := filename + finalChunkSuffix
 	jobInfo := InfoStorageForJob(txn, jobID)
+	// Clear any existing chunks with the same filename before writing new chunks.
+	// We clear all rows with info keys in [filename, filename#_final~).
+	if err := jobInfo.DeleteRange(ctx, filename, finalChunkName+"~"); err != nil {
+		return err
+	}
+
 	var chunkCounter int
 	var chunkName string
 	for len(data) > 0 {
@@ -46,7 +56,7 @@ func WriteChunkedFileToJobInfo(
 			chunk = chunk[:chunkSize]
 		} else {
 			// This is the last chunk we will write, assign it a sentinel file name.
-			chunkName = filename + finalChunkSuffix
+			chunkName = finalChunkName
 		}
 		data = data[len(chunk):]
 		var err error
diff --git a/pkg/jobs/job_info_utils_test.go b/pkg/jobs/job_info_utils_test.go
index e66b35d730a6..f16c6a40ba92 100644
--- a/pkg/jobs/job_info_utils_test.go
+++ b/pkg/jobs/job_info_utils_test.go
@@ -55,6 +55,10 @@ func TestReadWriteChunkedFileToJobInfo(t *testing.T) {
 			name: "file greater than 1MiB",
 			data: make([]byte, 1<<20+1), // 1 MiB + 1 byte
 		},
+		{
+			name: "file much greater than 1MiB",
+			data: make([]byte, 10<<20), // 10 MiB
+		},
 	}
 
 	db := s.InternalDB().(isql.DB)
@@ -78,3 +82,95 @@ func TestReadWriteChunkedFileToJobInfo(t *testing.T) {
 		})
 	}
 }
+
+func TestOverwriteChunkingWithVariableLengths(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	rng, _ := randutil.NewTestRand()
+	ctx := context.Background()
+	params := base.TestServerArgs{}
+	params.Knobs.JobsTestingKnobs = NewTestingKnobsWithShortIntervals()
+	s := serverutils.StartServerOnly(t, params)
+	defer s.Stopper().Stop(ctx)
+
+	tests := []struct {
+		name       string
+		numChunks  int
+		data       []byte
+		moreChunks []byte
+		lessChunks []byte
+	}{
+		{
+			name:      "zero chunks",
+			data:      []byte{},
+			numChunks: 0,
+		},
+		{
+			name:      "one chunk",
+			numChunks: 1,
+		},
+		{
+			name:      "two chunks",
+			numChunks: 2,
+		},
+		{
+			name:      "five chunks",
+			numChunks: 5,
+		},
+	}
+
+	db := s.InternalDB().(isql.DB)
+	generateData := func(numChunks int) []byte {
+		data := make([]byte, (1<<20)*numChunks)
+		if len(data) > 0 {
+			randutil.ReadTestdataBytes(rng, data)
+		}
+		return data
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			tt.data = generateData(tt.numChunks)
+			// Write the first file; this will generate a certain number of chunks.
+			require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+				return WriteChunkedFileToJobInfo(ctx, tt.name, tt.data, txn, jobspb.JobID(123))
+			}))
+
+			// Overwrite the file with fewer chunks; this should delete the
+			// extra chunks before writing the new ones.
+			t.Run("overwrite with fewer chunks", func(t *testing.T) {
+				lessChunks := tt.numChunks - 1
+				if lessChunks >= 0 {
+					tt.data = generateData(lessChunks)
+					var got []byte
+					require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+						err := WriteChunkedFileToJobInfo(ctx, tt.name, tt.data, txn, jobspb.JobID(123))
+						if err != nil {
+							return err
+						}
+						got, err = ReadChunkedFileToJobInfo(ctx, tt.name, txn, jobspb.JobID(123))
+						return err
+					}))
+					require.Equal(t, tt.data, got)
+				}
+			})
+
+			// Overwrite the file with more chunks; this should delete all the
+			// old chunks before writing the new ones.
+			t.Run("overwrite with more chunks", func(t *testing.T) {
+				moreChunks := tt.numChunks + 1
+				tt.data = generateData(moreChunks)
+				var got []byte
+				require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+					err := WriteChunkedFileToJobInfo(ctx, tt.name, tt.data, txn, jobspb.JobID(123))
+					if err != nil {
+						return err
+					}
+					got, err = ReadChunkedFileToJobInfo(ctx, tt.name, txn, jobspb.JobID(123))
+					return err
+				}))
+				require.Equal(t, tt.data, got)
+			})
+		})
+	}
+}
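
For readers unfamiliar with the job_info table's semantics, the following is a
minimal, self-contained sketch of the failure mode and the fix. It is not
CockroachDB code: the map stands in for the job_info table, writeChunked for
WriteChunkedFileToJobInfo, and a prefix scan for DeleteRange; all names are
illustrative only. Note that "~" (0x7E) sorts after the digits and "_" used in
chunk keys, which is why the range [filename, filename#_final~) in the patch
covers every chunk key for a file, including the "#_final" sentinel.

package main

import (
	"fmt"
	"sort"
	"strings"
)

const finalChunkSuffix = "#_final"

// writeChunked models chunked writes against a toy key-value "table". A
// single-key write behaves like the job_info table's delete-before-write: it
// only replaces the row with the same key, so a shorter overwrite leaves
// behind chunk keys it never touches.
func writeChunked(table map[string]string, filename string, chunks []string, clearFirst bool) {
	if clearFirst {
		// The fix: drop every existing chunk for this filename up front. The
		// real patch does this with a DeleteRange over
		// [filename, filename#_final~); a prefix scan approximates it here.
		for k := range table {
			if strings.HasPrefix(k, filename) {
				delete(table, k)
			}
		}
	}
	for i, chunk := range chunks {
		key := fmt.Sprintf("%s#%d", filename, i)
		if i == len(chunks)-1 {
			// The last chunk gets the sentinel name, as in the patch.
			key = filename + finalChunkSuffix
		}
		table[key] = chunk // delete-before-write on one key is a plain overwrite
	}
}

func sortedKeys(table map[string]string) []string {
	var ks []string
	for k := range table {
		ks = append(ks, k)
	}
	sort.Strings(ks)
	return ks
}

func main() {
	three := []string{"a", "b", "c"}
	one := []string{"z"}

	// Pre-fix behavior: overwriting three chunks with one leaves two stale
	// rows, so a reader stitching chunks together in key order would see a
	// corrupt file mixing both writes.
	buggy := map[string]string{}
	writeChunked(buggy, "bundle", three, false)
	writeChunked(buggy, "bundle", one, false)
	fmt.Println(sortedKeys(buggy)) // [bundle#0 bundle#1 bundle#_final]

	// Post-fix behavior: clearing the file's key range first leaves only the
	// chunks from the latest write.
	fixed := map[string]string{}
	writeChunked(fixed, "bundle", three, true)
	writeChunked(fixed, "bundle", one, true)
	fmt.Println(sortedKeys(fixed)) // [bundle#_final]
}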