diff --git a/pkg/jobs/job_info_storage.go b/pkg/jobs/job_info_storage.go index 4a0b963eb281..c50e9199b17f 100644 --- a/pkg/jobs/job_info_storage.go +++ b/pkg/jobs/job_info_storage.go @@ -255,15 +255,26 @@ func (i InfoStorage) Delete(ctx context.Context, infoKey string) error { // DeleteRange removes the info records between the provided // start key (inclusive) and end key (exclusive). -func (i InfoStorage) DeleteRange(ctx context.Context, startInfoKey, endInfoKey string) error { +func (i InfoStorage) DeleteRange(ctx context.Context, startInfoKey, endInfoKey string, limit int) error { return i.doWrite(ctx, func(ctx context.Context, j *Job, txn isql.Txn) error { - _, err := txn.ExecEx( - ctx, "write-job-info-delete", txn.KV(), - sessiondata.NodeUserSessionDataOverride, - "DELETE FROM system.job_info WHERE job_id = $1 AND info_key >= $2 AND info_key < $3", - j.ID(), startInfoKey, endInfoKey, - ) - return err + if limit > 0 { + _, err := txn.ExecEx( + ctx, "write-job-info-delete", txn.KV(), + sessiondata.NodeUserSessionDataOverride, + "DELETE FROM system.job_info WHERE job_id = $1 AND info_key >= $2 AND info_key < $3 "+ + "ORDER BY info_key ASC LIMIT $4", + j.ID(), startInfoKey, endInfoKey, limit, + ) + return err + } else { + _, err := txn.ExecEx( + ctx, "write-job-info-delete", txn.KV(), + sessiondata.NodeUserSessionDataOverride, + "DELETE FROM system.job_info WHERE job_id = $1 AND info_key >= $2 AND info_key < $3", + j.ID(), startInfoKey, endInfoKey, + ) + return err + } }) } diff --git a/pkg/jobs/job_info_storage_test.go b/pkg/jobs/job_info_storage_test.go index e5e9d629497d..d64d5dc6946c 100644 --- a/pkg/jobs/job_info_storage_test.go +++ b/pkg/jobs/job_info_storage_test.go @@ -58,7 +58,7 @@ func TestJobInfoAccessors(t *testing.T) { job1 := createJob(1) job2 := createJob(2) job3 := createJob(3) - kPrefix, kA, kB, kC, kD, kZ := "🔑", "🔑A", "🔑B", "🔑C", "🔑D", "🔑Z" + kPrefix, kA, kB, kC, kD, kE, kF, kG, kZ := "🔑", "🔑A", "🔑B", "🔑C", "🔑D", "🔑E", "🔑F", "🔑G", "🔑Z" v1, v2, v3 := []byte("val1"), []byte("val2"), []byte("val3") // Key doesn't exist yet. @@ -196,7 +196,7 @@ func TestJobInfoAccessors(t *testing.T) { // Delete kA-kB. require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { infoStorage := job2.InfoStorage(txn) - return infoStorage.DeleteRange(ctx, kA, kC) + return infoStorage.DeleteRange(ctx, kA, kC, 0) })) // Verify only kC remains. i = 0 @@ -216,6 +216,60 @@ func TestJobInfoAccessors(t *testing.T) { return err })) + // Write kE, kF, kG. + require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + infoStorage := job2.InfoStorage(txn) + for _, k := range []string{kE, kF, kG} { + if err := infoStorage.Write(ctx, k, v2); err != nil { + return err + } + } + return nil + })) + + // Verify we see 4 rows (c, e, f, g) in the prefix. + require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + infoStorage := job2.InfoStorage(txn) + count, err := infoStorage.Count(ctx, kPrefix, kZ) + if err != nil { + return err + } + require.Equal(t, 4, count) + _, ok, err := infoStorage.Get(ctx, kC) + if err != nil { + return err + } + require.True(t, ok) + return nil + })) + + // Delete [k, kZ) but with a limit of 2 so just kC and kE. + require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + infoStorage := job2.InfoStorage(txn) + return infoStorage.DeleteRange(ctx, kC, kZ, 2) + })) + + // Verify we see 2 rows (F, G) in the prefix and C and E are missing. + require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + infoStorage := job2.InfoStorage(txn) + count, err := infoStorage.Count(ctx, kPrefix, kZ) + if err != nil { + return err + } + require.Equal(t, 2, count) + _, ok, err := infoStorage.Get(ctx, kC) + if err != nil { + return err + } + require.False(t, ok) + _, ok, err = infoStorage.Get(ctx, kF) + if err != nil { + return err + } + require.True(t, ok) + return nil + })) + // Iterate a different job. require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { infoStorage := job3.InfoStorage(txn) diff --git a/pkg/jobs/job_info_utils.go b/pkg/jobs/job_info_utils.go index f999bd0cca11..3fff605f5ce6 100644 --- a/pkg/jobs/job_info_utils.go +++ b/pkg/jobs/job_info_utils.go @@ -41,7 +41,7 @@ func WriteChunkedFileToJobInfo( // We clear all rows that with info keys in [filename, filename#_final~). The // trailing "~" makes the exclusive end-key inclusive of all possible chunks // as "~" sorts after all digit. - if err := jobInfo.DeleteRange(ctx, filename, finalChunkName+"~"); err != nil { + if err := jobInfo.DeleteRange(ctx, filename, finalChunkName+"~", 0); err != nil { return err }