Skip to content

Commit

Permalink
Merge #113241
Browse files Browse the repository at this point in the history
113241: jobs: fix bug in WriteChunkedFileToJobInfo during overwriting r=stevendanna a=adityamaru

Previously, `WriteChunkedFileToJobInfo` would chunk up the passed in byte slice and write the chunks to the job_info table with info keys constructed using the filename. If the method were to be invoked again with the same filename, due to the delete-before-write semantics of the job info table, if the number of chunks changed then we'd end up with a corrupt file, with chunks from the first and second write mixed together.

This change fixes the bug by first deleting all the chunks that correspond to the filename before writing the new data. This is in line with how you'd expect an overwrite operation to work. This change also adds a regression test for the same.

Fixes: #113232
Release note (bug fix): fixes a bug in a method that was used by some of the jobs observability infrastructure, that could be triggered if a file was overwritten with a different chunking strategy

Co-authored-by: adityamaru <[email protected]>
  • Loading branch information
craig[bot] and adityamaru committed Oct 30, 2023
2 parents b5ddb40 + 371e7c5 commit 785ae93
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 2 deletions.
3 changes: 3 additions & 0 deletions pkg/jobs/execution_detail_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ func WriteProtobinExecutionDetailFile(
// `~profiler/` prefix. Files written using this method can be read using the
// `ReadExecutionDetailFile` and will show up in the list of files displayed on
// the jobs' Advanced Debugging DBConsole page.
//
// This method clears any existing file with the same filename before writing a
// new one.
func WriteExecutionDetailFile(
ctx context.Context, filename string, data []byte, txn isql.Txn, jobID jobspb.JobID,
) error {
Expand Down
14 changes: 12 additions & 2 deletions pkg/jobs/job_info_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,22 @@ const bundleChunkSize = 1 << 20 // 1 MiB
const finalChunkSuffix = "#_final"

// WriteChunkedFileToJobInfo will break up data into chunks of a fixed size, and
// gzip compress them before writing them to the job_info table
// gzip compress them before writing them to the job_info table. This method
// clears any existing chunks with the same filename before writing the new
// chunks and so if the caller wishes to preserve history they must use a
// unique filename.
func WriteChunkedFileToJobInfo(
ctx context.Context, filename string, data []byte, txn isql.Txn, jobID jobspb.JobID,
) error {
finalChunkName := filename + finalChunkSuffix
jobInfo := InfoStorageForJob(txn, jobID)

// Clear any existing chunks with the same filename before writing new chunks.
// We clear all rows with info keys in [filename, filename#_final~).
if err := jobInfo.DeleteRange(ctx, filename, finalChunkName+"~"); err != nil {
return err
}

var chunkCounter int
var chunkName string
for len(data) > 0 {
Expand All @@ -46,7 +56,7 @@ func WriteChunkedFileToJobInfo(
chunk = chunk[:chunkSize]
} else {
// This is the last chunk we will write, assign it a sentinel file name.
chunkName = filename + finalChunkSuffix
chunkName = finalChunkName
}
data = data[len(chunk):]
var err error
Expand Down
96 changes: 96 additions & 0 deletions pkg/jobs/job_info_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func TestReadWriteChunkedFileToJobInfo(t *testing.T) {
name: "file greater than 1MiB",
data: make([]byte, 1<<20+1), // 1 MiB + 1 byte
},
{
name: "file much greater than 1MiB",
data: make([]byte, 10<<20), // 10 MiB
},
}

db := s.InternalDB().(isql.DB)
Expand All @@ -78,3 +82,95 @@ func TestReadWriteChunkedFileToJobInfo(t *testing.T) {
})
}
}

// TestOverwriteChunkingWithVariableLengths verifies that
// WriteChunkedFileToJobInfo fully replaces a previously written file when it
// is overwritten with data that splits into a different number of chunks.
// It writes a file of N chunks and then overwrites it with N-1 and N+1
// chunks, asserting that the read-back contents exactly match the most
// recent write (i.e. no stale chunks from an earlier write survive).
func TestOverwriteChunkingWithVariableLengths(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	rng, _ := randutil.NewTestRand()
	ctx := context.Background()
	params := base.TestServerArgs{}
	params.Knobs.JobsTestingKnobs = NewTestingKnobsWithShortIntervals()
	s := serverutils.StartServerOnly(t, params)
	defer s.Stopper().Stop(ctx)

	tests := []struct {
		name      string
		numChunks int
	}{
		{name: "zero chunks", numChunks: 0},
		{name: "one chunk", numChunks: 1},
		{name: "two chunks", numChunks: 2},
		{name: "five chunks", numChunks: 5},
	}

	db := s.InternalDB().(isql.DB)
	// generateData returns numChunks MiB of random bytes so that the chunked
	// writer (1 MiB chunk size) produces exactly numChunks chunks.
	generateData := func(numChunks int) []byte {
		data := make([]byte, (1<<20)*numChunks)
		if len(data) > 0 {
			randutil.ReadTestdataBytes(rng, data)
		}
		return data
	}
	// writeThenRead overwrites filename with data and reads it back, all in
	// one transaction, returning the bytes that were read.
	writeThenRead := func(t *testing.T, filename string, data []byte) []byte {
		t.Helper()
		var got []byte
		require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
			if err := WriteChunkedFileToJobInfo(ctx, filename, data, txn, jobspb.JobID(123)); err != nil {
				return err
			}
			var err error
			got, err = ReadChunkedFileToJobInfo(ctx, filename, txn, jobspb.JobID(123))
			return err
		}))
		return got
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Write the first file; this generates tt.numChunks chunks.
			data := generateData(tt.numChunks)
			require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
				return WriteChunkedFileToJobInfo(ctx, tt.name, data, txn, jobspb.JobID(123))
			}))

			// Overwrite the file with fewer chunks; the now-extra chunks from
			// the first write must be deleted before the new ones are written.
			t.Run("overwrite with fewer chunks", func(t *testing.T) {
				lessChunks := tt.numChunks - 1
				if lessChunks < 0 {
					// A zero-chunk file cannot shrink any further.
					return
				}
				overwrite := generateData(lessChunks)
				require.Equal(t, overwrite, writeThenRead(t, tt.name, overwrite))
			})

			// Overwrite the file with more chunks; the read-back must reflect
			// only the new, larger write.
			t.Run("overwrite with more chunks", func(t *testing.T) {
				overwrite := generateData(tt.numChunks + 1)
				require.Equal(t, overwrite, writeThenRead(t, tt.name, overwrite))
			})
		})
	}
}

0 comments on commit 785ae93

Please sign in to comment.