Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-24.1: jobs: limit number of retained dsp-diag-url info rows #126122

Merged
merged 4 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 49 additions & 8 deletions pkg/jobs/job_info_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,18 +255,59 @@ func (i InfoStorage) Delete(ctx context.Context, infoKey string) error {

// DeleteRange removes the info records between the provided
// start key (inclusive) and end key (exclusive).
func (i InfoStorage) DeleteRange(ctx context.Context, startInfoKey, endInfoKey string) error {
func (i InfoStorage) DeleteRange(
ctx context.Context, startInfoKey, endInfoKey string, limit int,
) error {
return i.doWrite(ctx, func(ctx context.Context, j *Job, txn isql.Txn) error {
_, err := txn.ExecEx(
ctx, "write-job-info-delete", txn.KV(),
sessiondata.NodeUserSessionDataOverride,
"DELETE FROM system.job_info WHERE job_id = $1 AND info_key >= $2 AND info_key < $3",
j.ID(), startInfoKey, endInfoKey,
)
return err
if limit > 0 {
_, err := txn.ExecEx(
ctx, "write-job-info-delete", txn.KV(),
sessiondata.NodeUserSessionDataOverride,
"DELETE FROM system.job_info WHERE job_id = $1 AND info_key >= $2 AND info_key < $3 "+
"ORDER BY info_key ASC LIMIT $4",
j.ID(), startInfoKey, endInfoKey, limit,
)
return err
} else {
_, err := txn.ExecEx(
ctx, "write-job-info-delete", txn.KV(),
sessiondata.NodeUserSessionDataOverride,
"DELETE FROM system.job_info WHERE job_id = $1 AND info_key >= $2 AND info_key < $3",
j.ID(), startInfoKey, endInfoKey,
)
return err
}
})
}

// Count returns the number of info records for this job whose keys fall in
// the half-open interval [startInfoKey, endInfoKey).
func (i InfoStorage) Count(ctx context.Context, startInfoKey, endInfoKey string) (int, error) {
	if i.txn == nil {
		return 0, errors.New("cannot access the job info table without an associated txn")
	}

	ctx, sp := tracing.ChildSpan(ctx, "count-job-info")
	defer sp.Finish()

	row, err := i.txn.QueryRowEx(
		ctx, "job-info-count", i.txn.KV(),
		sessiondata.NodeUserSessionDataOverride,
		"SELECT count(*) FROM system.job_info WHERE job_id = $1 AND info_key >= $2 AND info_key < $3",
		i.j.ID(), startInfoKey, endInfoKey,
	)
	if err != nil {
		return 0, err
	}
	if row == nil {
		// No row returned; report zero matching records.
		return 0, nil
	}

	count, ok := row[0].(*tree.DInt)
	if !ok {
		return 0, errors.AssertionFailedf("job info: expected value to be DInt (was %T)", row[0])
	}
	return int(*count), nil
}

// Iterate iterates though the info records for a given job and info key prefix.
func (i InfoStorage) Iterate(
ctx context.Context, infoPrefix string, fn func(infoKey string, value []byte) error,
Expand Down
71 changes: 69 additions & 2 deletions pkg/jobs/job_info_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestJobInfoAccessors(t *testing.T) {
job1 := createJob(1)
job2 := createJob(2)
job3 := createJob(3)
kPrefix, kA, kB, kC, kD := "🔑", "🔑A", "🔑B", "🔑C", "🔑D"
kPrefix, kA, kB, kC, kD, kE, kF, kG, kZ := "🔑", "🔑A", "🔑B", "🔑C", "🔑D", "🔑E", "🔑F", "🔑G", "🔑Z"
v1, v2, v3 := []byte("val1"), []byte("val2"), []byte("val3")

// Key doesn't exist yet.
Expand Down Expand Up @@ -156,6 +156,13 @@ func TestJobInfoAccessors(t *testing.T) {
}))
require.Equal(t, 3, i)

require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job2.InfoStorage(txn)
count, err := infoStorage.Count(ctx, kPrefix, kZ)
require.Equal(t, 3, count)
return err
}))

// Add a new revision to kC.
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job2.InfoStorage(txn)
Expand Down Expand Up @@ -189,7 +196,7 @@ func TestJobInfoAccessors(t *testing.T) {
// Delete kA-kB.
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job2.InfoStorage(txn)
return infoStorage.DeleteRange(ctx, kA, kC)
return infoStorage.DeleteRange(ctx, kA, kC, 0)
}))
// Verify only kC remains.
i = 0
Expand All @@ -202,6 +209,66 @@ func TestJobInfoAccessors(t *testing.T) {
})
}))
require.Equal(t, 1, i)
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job2.InfoStorage(txn)
count, err := infoStorage.Count(ctx, kPrefix, kZ)
require.Equal(t, 1, count)
return err
}))

// Write kE, kF, kG.
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job2.InfoStorage(txn)
for _, k := range []string{kE, kF, kG} {
if err := infoStorage.Write(ctx, k, v2); err != nil {
return err
}
}
return nil
}))

// Verify we see 4 rows (c, e, f, g) in the prefix.
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job2.InfoStorage(txn)
count, err := infoStorage.Count(ctx, kPrefix, kZ)
if err != nil {
return err
}
require.Equal(t, 4, count)
_, ok, err := infoStorage.Get(ctx, kC)
if err != nil {
return err
}
require.True(t, ok)
return nil
}))

// Delete [kC, kZ) but with a limit of 2 so only kC and kE are removed.
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job2.InfoStorage(txn)
return infoStorage.DeleteRange(ctx, kC, kZ, 2)
}))

// Verify we see 2 rows (F, G) in the prefix and C and E are missing.
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job2.InfoStorage(txn)
count, err := infoStorage.Count(ctx, kPrefix, kZ)
if err != nil {
return err
}
require.Equal(t, 2, count)
_, ok, err := infoStorage.Get(ctx, kC)
if err != nil {
return err
}
require.False(t, ok)
_, ok, err = infoStorage.Get(ctx, kF)
if err != nil {
return err
}
require.True(t, ok)
return nil
}))

// Iterate a different job.
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
Expand Down
6 changes: 4 additions & 2 deletions pkg/jobs/job_info_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ func WriteChunkedFileToJobInfo(
jobInfo := InfoStorageForJob(txn, jobID)

// Clear any existing chunks with the same filename before writing new chunks.
// We clear all rows that with info keys in [filename, filename#_final~).
if err := jobInfo.DeleteRange(ctx, filename, finalChunkName+"~"); err != nil {
// We clear all rows with info keys in [filename, filename#_final~). The
// trailing "~" makes the exclusive end key cover all possible chunks,
// since "~" sorts after all digits.
if err := jobInfo.DeleteRange(ctx, filename, finalChunkName+"~", 0); err != nil {
return err
}

Expand Down
1 change: 1 addition & 0 deletions pkg/jobs/jobsprofiler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ go_test(
"//pkg/sql",
"//pkg/sql/execinfrapb",
"//pkg/sql/isql",
"//pkg/sql/physicalplan",
"//pkg/testutils",
"//pkg/testutils/jobutils",
"//pkg/testutils/serverutils",
Expand Down
25 changes: 25 additions & 0 deletions pkg/jobs/jobsprofiler/profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// MaxRetainedDSPDiagramsPerJob is the maximum number of DSP diagram info rows
// retained per job; writing a new diagram culls older rows to stay at or
// under this limit.
const MaxRetainedDSPDiagramsPerJob = 5

// dspDiagMaxCulledPerWrite limits how many old diagrams writing a new one will
// cull to try to maintain the limit of MaxRetainedDSPDiagramsPerJob; typically
// it would cull no more than one in the steady-state but an upgrading cluster
// that has accumulated many old rows might try to cull more, so we bound how
// many are eligible at a time to some large but finite upper-bound.
const dspDiagMaxCulledPerWrite = 100

// StorePlanDiagram stores the DistSQL diagram generated from p in the job info
// table. The generation of the plan diagram and persistence to the info table
// are done asynchronously and this method does not block on their completion.
Expand All @@ -46,6 +55,22 @@ func StorePlanDiagram(

dspKey := profilerconstants.MakeDSPDiagramInfoKey(timeutil.Now().UnixNano())
infoStorage := jobs.InfoStorageForJob(txn, jobID)

// Limit total retained diagrams by culling older ones as needed.
count, err := infoStorage.Count(ctx, profilerconstants.DSPDiagramInfoKeyPrefix, profilerconstants.DSPDiagramInfoKeyMax)
if err != nil {
return err
}
const keep = MaxRetainedDSPDiagramsPerJob - 1
if toCull := min(count-keep, dspDiagMaxCulledPerWrite); toCull > 0 {
if err := infoStorage.DeleteRange(
ctx, profilerconstants.DSPDiagramInfoKeyPrefix, profilerconstants.DSPDiagramInfoKeyMax, toCull,
); err != nil {
return err
}
}

// Write the new diagram.
return infoStorage.Write(ctx, dspKey, []byte(diagURL.String()))
})
// Don't log the error if the context has been canceled. This will likely be
Expand Down
24 changes: 24 additions & 0 deletions pkg/jobs/jobsprofiler/profiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand All @@ -51,6 +52,29 @@ func TestProfilerStorePlanDiagram(t *testing.T) {
ctx := context.Background()
defer s.Stopper().Stop(ctx)

// First verify that directly calling StorePlanDiagram n times causes the
// expected number of persisted rows in job_info, respecting the limit.
db := s.ExecutorConfig().(sql.ExecutorConfig).InternalDB
const fakeJobID = 4567
plan := &sql.PhysicalPlan{PhysicalPlan: physicalplan.MakePhysicalPlan(&physicalplan.PhysicalInfrastructure{})}
for i := 1; i < 10; i++ {
jobsprofiler.StorePlanDiagram(ctx, s.ApplicationLayer().AppStopper(), plan, db, fakeJobID)
testutils.SucceedsSoon(t, func() error {
var count int
if err := sqlDB.QueryRow(
`SELECT count(*) FROM system.job_info WHERE job_id = $1`, fakeJobID,
).Scan(&count); err != nil {
return err
}
if expected := min(i, jobsprofiler.MaxRetainedDSPDiagramsPerJob); count != expected {
return errors.Errorf("expected %d rows, got %d", expected, count)
}
return nil
})
}

// Now run various jobs that have been extended to persist diagrams and make
// sure that they also create persisted diagram rows.
_, err := sqlDB.Exec(`CREATE DATABASE test`)
require.NoError(t, err)
_, err = sqlDB.Exec(`CREATE TABLE foo (id INT PRIMARY KEY)`)
Expand Down
3 changes: 3 additions & 0 deletions pkg/jobs/jobsprofiler/profilerconstants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (

// DSPDiagramInfoKeyPrefix is the info key prefix under which ephemeral DSP
// diagram entries are stored for a job.
const DSPDiagramInfoKeyPrefix = "~dsp-diag-url-"

// DSPDiagramInfoKeyMax sorts after any diagram info key, because ":" > [0-9]
// in ASCII and every key under the prefix ends in a decimal timestamp.
const DSPDiagramInfoKeyMax = DSPDiagramInfoKeyPrefix + ":"

// MakeDSPDiagramInfoKey constructs an ephemeral DSP diagram info key.
func MakeDSPDiagramInfoKey(timestampInNanos int64) string {
return fmt.Sprintf("%s%d", DSPDiagramInfoKeyPrefix, timestampInNanos)
Expand Down