jobs: fix bug in WriteChunkedFileToJobInfo during overwriting
Previously, `WriteChunkedFileToJobInfo` would chunk up the
passed-in byte slice and write the chunks to the job_info
table with info keys constructed from the filename. Because
the job_info table has delete-before-write semantics on a
per-key basis, invoking the method again with the same
filename but a different number of chunks would leave behind
stale chunks from the first write, producing a corrupt file
with chunks from the first and second writes mixed together.

This change fixes the bug by deleting all chunks that
correspond to the filename before writing the new data, which
is how you would expect an overwrite operation to work. It
also adds a regression test for the same.

Fixes: #113232
Release note (bug fix): fixes a bug in a method used by some
of the jobs observability infrastructure that could be
triggered if a file was overwritten with a different chunking
strategy.
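
To make the failure mode concrete, here is a minimal runnable sketch of the
old behavior. The map-backed store and the numbered chunk-key format are
assumptions for illustration, not the actual job_info implementation; only
the "#_final" sentinel suffix is taken from the code below.

package main

import "fmt"

func main() {
	store := map[string][]byte{}

	// write mimics the old WriteChunkedFileToJobInfo: each chunk key is
	// written individually, so only that key's previous value is replaced.
	write := func(filename string, chunks [][]byte) {
		for i, c := range chunks {
			key := fmt.Sprintf("%s#%04d", filename, i)
			if i == len(chunks)-1 {
				key = filename + "#_final" // sentinel for the last chunk
			}
			store[key] = c
		}
	}

	write("profile", [][]byte{{1}, {2}, {3}}) // writes #0000, #0001, #_final
	write("profile", [][]byte{{9}})           // writes only #_final

	// profile#0000 and profile#0001 from the first write survive, so a
	// prefix scan now reassembles a corrupt file mixing both writes.
	for key := range store {
		fmt.Println(key)
	}
}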
adityamaru committed Oct 27, 2023
1 parent b8201bc commit 371e7c5
Showing 3 changed files with 111 additions and 2 deletions.
3 changes: 3 additions & 0 deletions pkg/jobs/execution_detail_utils.go
@@ -60,6 +60,9 @@ func WriteProtobinExecutionDetailFile(
// `~profiler/` prefix. Files written using this method can be read using
// `ReadExecutionDetailFile` and will show up in the list of files displayed on
// the jobs' Advanced Debugging DBConsole page.
//
// This method clears any existing file with the same filename before writing a
// new one.
func WriteExecutionDetailFile(
ctx context.Context, filename string, data []byte, txn isql.Txn, jobID jobspb.JobID,
) error {
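
With the fix, a second write to the same filename replaces the file
wholesale. A sketch of how a caller might drive this, assuming the packages
shown in this commit; the package name, helper name, filename, and payloads
are illustrative:

package jobsusage

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/jobs"
	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/sql/isql"
)

// overwriteTrace writes trace.txt twice in one transaction. The second call
// clears the first call's chunks before writing, even if v2 chunks
// differently than v1.
func overwriteTrace(
	ctx context.Context, db isql.DB, jobID jobspb.JobID, v1, v2 []byte,
) error {
	return db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
		if err := jobs.WriteExecutionDetailFile(ctx, "trace.txt", v1, txn, jobID); err != nil {
			return err
		}
		return jobs.WriteExecutionDetailFile(ctx, "trace.txt", v2, txn, jobID)
	})
}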
14 changes: 12 additions & 2 deletions pkg/jobs/job_info_utils.go
@@ -27,12 +27,22 @@ const bundleChunkSize = 1 << 20 // 1 MiB
const finalChunkSuffix = "#_final"

// WriteChunkedFileToJobInfo will break up data into chunks of a fixed size, and
// gzip compress them before writing them to the job_info table
// gzip compress them before writing them to the job_info table. This method
// clears any existing chunks with the same filename before writing the new
// chunks, so a caller that wishes to preserve history must use a unique
// filename.
func WriteChunkedFileToJobInfo(
ctx context.Context, filename string, data []byte, txn isql.Txn, jobID jobspb.JobID,
) error {
finalChunkName := filename + finalChunkSuffix
jobInfo := InfoStorageForJob(txn, jobID)

// Clear any existing chunks with the same filename before writing new chunks.
// We clear all rows with info keys in [filename, filename#_final~).
if err := jobInfo.DeleteRange(ctx, filename, finalChunkName+"~"); err != nil {
return err
}

var chunkCounter int
var chunkName string
for len(data) > 0 {
@@ -46,7 +56,7 @@ func WriteChunkedFileToJobInfo(
chunk = chunk[:chunkSize]
} else {
// This is the last chunk we will write, assign it a sentinel file name.
chunkName = filename + finalChunkSuffix
chunkName = finalChunkName
}
data = data[len(chunk):]
var err error
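
The end key of the DeleteRange relies on bytewise key ordering: appending "~"
(0x7E, which sorts after the characters used in the chunk suffixes) to the
final chunk name yields an exclusive upper bound that sits just above the
"#_final" sentinel and above every numbered chunk key. A small runnable
sketch of that ordering (the numbered chunk-key format is an assumption for
illustration):

package main

import "fmt"

func main() {
	start := "profile"
	end := "profile" + "#_final" + "~" // exclusive upper bound of the DeleteRange

	for _, key := range []string{
		"profile#0000",   // a numbered chunk
		"profile#0001",   // a numbered chunk
		"profile#_final", // the sentinel final chunk
	} {
		// All three fall in [start, end) and are cleared before the
		// new chunks are written.
		fmt.Println(key, start <= key && key < end)
	}
}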
96 changes: 96 additions & 0 deletions pkg/jobs/job_info_utils_test.go
@@ -55,6 +55,10 @@ func TestReadWriteChunkedFileToJobInfo(t *testing.T) {
name: "file greater than 1MiB",
data: make([]byte, 1<<20+1), // 1 MiB + 1 byte
},
{
name: "file much greater than 1MiB",
data: make([]byte, 10<<20), // 10 MiB
},
}

db := s.InternalDB().(isql.DB)
@@ -78,3 +82,95 @@ func TestReadWriteChunkedFileToJobInfo(t *testing.T) {
})
}
}

func TestOverwriteChunkingWithVariableLengths(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

rng, _ := randutil.NewTestRand()
ctx := context.Background()
params := base.TestServerArgs{}
params.Knobs.JobsTestingKnobs = NewTestingKnobsWithShortIntervals()
s := serverutils.StartServerOnly(t, params)
defer s.Stopper().Stop(ctx)

tests := []struct {
name string
numChunks int
data []byte
}{
{
name: "zero chunks",
data: []byte{},
numChunks: 0,
},
{
name: "one chunk",
numChunks: 1,
},
{
name: "two chunks",
numChunks: 2,
},
{
name: "five chunks",
numChunks: 5,
},
}

db := s.InternalDB().(isql.DB)
generateData := func(numChunks int) []byte {
data := make([]byte, (1<<20)*numChunks)
if len(data) > 0 {
randutil.ReadTestdataBytes(rng, data)
}
return data
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.data = generateData(tt.numChunks)
// Write the first file; this will generate a certain number of chunks.
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
return WriteChunkedFileToJobInfo(ctx, tt.name, tt.data, txn, jobspb.JobID(123))
}))

// Overwrite the file with fewer chunks; this should delete the extra
// chunks before writing the new ones.
t.Run("overwrite with fewer chunks", func(t *testing.T) {
lessChunks := tt.numChunks - 1
if lessChunks >= 0 {
tt.data = generateData(lessChunks)
var got []byte
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
err := WriteChunkedFileToJobInfo(ctx, tt.name, tt.data, txn, jobspb.JobID(123))
if err != nil {
return err
}
got, err = ReadChunkedFileToJobInfo(ctx, tt.name, txn, jobspb.JobID(123))
return err
}))
require.Equal(t, tt.data, got)
}
})

// Overwrite the file with more chunks; this should delete the extra
// chunks before writing the new ones.
t.Run("overwrite with more chunks", func(t *testing.T) {
moreChunks := tt.numChunks + 1
tt.data = generateData(moreChunks)
var got []byte
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
err := WriteChunkedFileToJobInfo(ctx, tt.name, tt.data, txn, jobspb.JobID(123))
if err != nil {
return err
}
got, err = ReadChunkedFileToJobInfo(ctx, tt.name, txn, jobspb.JobID(123))
return err
}))
require.Equal(t, tt.data, got)
})
})
}
}
