Skip to content

Commit

Permalink
jobs: add limit support to infostorage.DeleteRange
Browse files Browse the repository at this point in the history
Release note: none.
Epic: none.
  • Loading branch information
dt committed Jun 24, 2024
1 parent 286ddec commit 0f6cdc6
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 11 deletions.
27 changes: 19 additions & 8 deletions pkg/jobs/job_info_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,15 +255,26 @@ func (i InfoStorage) Delete(ctx context.Context, infoKey string) error {

// DeleteRange removes the info records between the provided
// start key (inclusive) and end key (exclusive).
func (i InfoStorage) DeleteRange(ctx context.Context, startInfoKey, endInfoKey string) error {
func (i InfoStorage) DeleteRange(ctx context.Context, startInfoKey, endInfoKey string, limit int) error {
return i.doWrite(ctx, func(ctx context.Context, j *Job, txn isql.Txn) error {
_, err := txn.ExecEx(
ctx, "write-job-info-delete", txn.KV(),
sessiondata.NodeUserSessionDataOverride,
"DELETE FROM system.job_info WHERE job_id = $1 AND info_key >= $2 AND info_key < $3",
j.ID(), startInfoKey, endInfoKey,
)
return err
if limit > 0 {
_, err := txn.ExecEx(
ctx, "write-job-info-delete", txn.KV(),
sessiondata.NodeUserSessionDataOverride,
"DELETE FROM system.job_info WHERE job_id = $1 AND info_key >= $2 AND info_key < $3 "+
"ORDER BY info_key ASC LIMIT $4",
j.ID(), startInfoKey, endInfoKey, limit,
)
return err
} else {
_, err := txn.ExecEx(
ctx, "write-job-info-delete", txn.KV(),
sessiondata.NodeUserSessionDataOverride,
"DELETE FROM system.job_info WHERE job_id = $1 AND info_key >= $2 AND info_key < $3",
j.ID(), startInfoKey, endInfoKey,
)
return err
}
})
}

Expand Down
58 changes: 56 additions & 2 deletions pkg/jobs/job_info_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestJobInfoAccessors(t *testing.T) {
job1 := createJob(1)
job2 := createJob(2)
job3 := createJob(3)
kPrefix, kA, kB, kC, kD, kZ := "🔑", "🔑A", "🔑B", "🔑C", "🔑D", "🔑Z"
kPrefix, kA, kB, kC, kD, kE, kF, kG, kZ := "🔑", "🔑A", "🔑B", "🔑C", "🔑D", "🔑E", "🔑F", "🔑G", "🔑Z"
v1, v2, v3 := []byte("val1"), []byte("val2"), []byte("val3")

// Key doesn't exist yet.
Expand Down Expand Up @@ -196,7 +196,7 @@ func TestJobInfoAccessors(t *testing.T) {
// Delete kA-kB.
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job2.InfoStorage(txn)
return infoStorage.DeleteRange(ctx, kA, kC)
return infoStorage.DeleteRange(ctx, kA, kC, 0)
}))
// Verify only kC remains.
i = 0
Expand All @@ -216,6 +216,60 @@ func TestJobInfoAccessors(t *testing.T) {
return err
}))

// Write kE, kF, kG.
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job2.InfoStorage(txn)
for _, k := range []string{kE, kF, kG} {
if err := infoStorage.Write(ctx, k, v2); err != nil {
return err
}
}
return nil
}))

// Verify we see 4 rows (c, e, f, g) in the prefix.
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job2.InfoStorage(txn)
count, err := infoStorage.Count(ctx, kPrefix, kZ)
if err != nil {
return err
}
require.Equal(t, 4, count)
_, ok, err := infoStorage.Get(ctx, kC)
if err != nil {
return err
}
require.True(t, ok)
return nil
}))

// Delete [k, kZ) but with a limit of 2 so just kC and kE.
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job2.InfoStorage(txn)
return infoStorage.DeleteRange(ctx, kC, kZ, 2)
}))

// Verify we see 2 rows (F, G) in the prefix and C and E are missing.
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job2.InfoStorage(txn)
count, err := infoStorage.Count(ctx, kPrefix, kZ)
if err != nil {
return err
}
require.Equal(t, 2, count)
_, ok, err := infoStorage.Get(ctx, kC)
if err != nil {
return err
}
require.False(t, ok)
_, ok, err = infoStorage.Get(ctx, kF)
if err != nil {
return err
}
require.True(t, ok)
return nil
}))

// Iterate a different job.
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job3.InfoStorage(txn)
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/job_info_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func WriteChunkedFileToJobInfo(
// We clear all rows that with info keys in [filename, filename#_final~). The
// trailing "~" makes the exclusive end-key inclusive of all possible chunks
// as "~" sorts after all digit.
if err := jobInfo.DeleteRange(ctx, filename, finalChunkName+"~"); err != nil {
if err := jobInfo.DeleteRange(ctx, filename, finalChunkName+"~", 0); err != nil {
return err
}

Expand Down

0 comments on commit 0f6cdc6

Please sign in to comment.