jobs,*: read job payload and progress from the job_info table
This change touches all the parts of the code that read the payload
and/or progress of a job from the `system.jobs` table. Once the
cluster version has advanced past V23_1JobInfoTableIsBackfilled, every
job in `system.jobs` has a payload and progress entry written to the
`system.job_info` table, so this change migrates callers to read the
payload and progress of a job from that table instead.
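
For illustration, a hedged before/after sketch of such a read (the
table and column names match this diff; the constant names and the Go
framing are illustrative only):

// Before: the payload and progress were read straight out of system.jobs.
const deprecatedJobStateQuery = `SELECT payload, progress FROM system.jobs WHERE id = $1`

// After: read via crdb_internal.system_jobs, which serves these columns from
// system.job_info once V23_1JobInfoTableIsBackfilled is active.
const jobStateQuery = `SELECT payload, progress FROM "".crdb_internal.system_jobs WHERE id = $1`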

The most important changes are in the logic that drives `crdb_internal.system_jobs`,
which in turn drives `crdb_internal.jobs` and `SHOW JOBS`. Additionally,
there are changes in how the registry resolves the progress and payload
when loading or resuming a job.
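
Sketched very roughly, the registry-side resolution now looks like the
following (the real logic is in the pkg/jobs/adopt.go hunk below; the
package name, helper name, signature, and error messages here are
illustrative and not part of this commit):

package jobsketch // illustrative only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/clusterversion"
	"github.com/cockroachdb/cockroach/pkg/jobs"
	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/sql/isql"
	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
	"github.com/cockroachdb/errors"
)

// loadJobStateSketch is a hedged paraphrase of how resumeJob now resolves a
// job's payload and progress; it is not code from this commit.
func loadJobStateSketch(
	ctx context.Context,
	st *cluster.Settings,
	job *jobs.Job,
	txn isql.Txn,
	legacyPayload, legacyProgress []byte, // the system.jobs columns
) (*jobspb.Payload, *jobspb.Progress, error) {
	payload, progress := &jobspb.Payload{}, &jobspb.Progress{}
	if st.Version.IsActive(ctx, clusterversion.V23_1JobInfoTableIsBackfilled) {
		// New path: the payload and progress rows live in system.job_info.
		infoStorage := job.InfoStorage(txn)
		payloadBytes, ok, err := infoStorage.GetLegacyPayload(ctx)
		if err != nil {
			return nil, nil, err
		}
		if !ok {
			return nil, nil, errors.Newf("job %d payload not found in system.job_info", job.ID())
		}
		if err := protoutil.Unmarshal(payloadBytes, payload); err != nil {
			return nil, nil, err
		}
		progressBytes, ok, err := infoStorage.GetLegacyProgress(ctx)
		if err != nil {
			return nil, nil, err
		}
		if !ok {
			return nil, nil, errors.Newf("job %d progress not found in system.job_info", job.ID())
		}
		if err := protoutil.Unmarshal(progressBytes, progress); err != nil {
			return nil, nil, err
		}
		return payload, progress, nil
	}
	// Legacy path: decode the columns read from system.jobs.
	if err := protoutil.Unmarshal(legacyPayload, payload); err != nil {
		return nil, nil, err
	}
	if err := protoutil.Unmarshal(legacyProgress, progress); err != nil {
		return nil, nil, err
	}
	return payload, progress, nil
}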

Several tests that read the payload and progress from `system.jobs` now
rely on `crdb_internal.system_jobs` to fetch this information from either
the jobs table or job_info table depending on the cluster version.
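
A typical updated test read then looks roughly like this (a sketch
only; `db` is assumed to be a *sqlutils.SQLRunner and `jobID` a job ID,
mirroring the test hunks below):

var payloadBytes []byte
db.QueryRow(t,
	`SELECT payload FROM "".crdb_internal.system_jobs WHERE id = $1`, jobID,
).Scan(&payloadBytes)
var payload jobspb.Payload
require.NoError(t, protoutil.Unmarshal(payloadBytes, &payload))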

Fixes: #97762

Release note: None
adityamaru committed Mar 16, 2023
1 parent 125aa4a commit 741873f
Showing 20 changed files with 413 additions and 101 deletions.
10 changes: 9 additions & 1 deletion pkg/ccl/backupccl/backup_test.go
@@ -954,7 +954,9 @@ func backupAndRestore(
}

found := false
const stmt = "SELECT payload FROM system.jobs ORDER BY created DESC LIMIT 10"
stmt := `
SELECT payload FROM "".crdb_internal.system_jobs ORDER BY created DESC LIMIT 10
`
rows := sqlDB.Query(t, stmt)
for rows.Next() {
var payloadBytes []byte
@@ -1673,6 +1675,12 @@ func createAndWaitForJob(
t, `INSERT INTO system.jobs (created, status, payload, progress) VALUES ($1, $2, $3, $4) RETURNING id`,
timeutil.FromUnixMicros(now), jobs.StatusRunning, payload, progressBytes,
).Scan(&jobID)
db.Exec(
t, `INSERT INTO system.job_info (job_id, info_key, value) VALUES ($1, $2, $3)`, jobID, jobs.GetLegacyPayloadKey(), payload,
)
db.Exec(
t, `INSERT INTO system.job_info (job_id, info_key, value) VALUES ($1, $2, $3)`, jobID, jobs.GetLegacyProgressKey(), progressBytes,
)
jobutils.WaitForJobToSucceed(t, db, jobID)
}

3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
@@ -733,7 +733,8 @@ func TestChangefeedCursor(t *testing.T) {
// statement timestamp, so only verify this for enterprise.
if e, ok := fooLogical.(cdctest.EnterpriseTestFeed); ok {
var bytes []byte
sqlDB.QueryRow(t, `SELECT payload FROM system.jobs WHERE id=$1`, e.JobID()).Scan(&bytes)
sqlDB.QueryRow(t, `SELECT payload FROM "".crdb_internal.system_jobs WHERE id = $1`,
e.JobID()).Scan(&bytes)
var payload jobspb.Payload
require.NoError(t, protoutil.Unmarshal(bytes, &payload))
require.Equal(t, parseTimeToHLC(t, tsLogical), payload.GetChangefeed().StatementTime)
7 changes: 4 additions & 3 deletions pkg/ccl/changefeedccl/testfeed_test.go
@@ -459,10 +459,11 @@ func (f *jobFeed) Resume() error {

// Details implements FeedJob interface.
func (f *jobFeed) Details() (*jobspb.ChangefeedDetails, error) {
stmt := `
SELECT payload FROM "".crdb_internal.system_jobs WHERE id = $1
`
var payloadBytes []byte
if err := f.db.QueryRow(
`SELECT payload FROM system.jobs WHERE id=$1`, f.jobID,
).Scan(&payloadBytes); err != nil {
if err := f.db.QueryRow(stmt, f.jobID).Scan(&payloadBytes); err != nil {
return nil, errors.Wrapf(err, "Details for job %d", f.jobID)
}
var payload jobspb.Payload
12 changes: 8 additions & 4 deletions pkg/ccl/streamingccl/replicationutils/utils.go
@@ -187,8 +187,10 @@ func TestingGetStreamIngestionStatsNoHeartbeatFromReplicationJob(
var progressBytes []byte
var payload jobspb.Payload
var progress jobspb.Progress
sqlRunner.QueryRow(t, "SELECT payload, progress FROM system.jobs WHERE id = $1",
ingestionJobID).Scan(&payloadBytes, &progressBytes)
stmt := `
SELECT payload, progress FROM "".crdb_internal.system_jobs WHERE id = $1
`
sqlRunner.QueryRow(t, stmt, ingestionJobID).Scan(&payloadBytes, &progressBytes)
require.NoError(t, protoutil.Unmarshal(payloadBytes, &payload))
require.NoError(t, protoutil.Unmarshal(progressBytes, &progress))
details := payload.GetStreamIngestion()
@@ -204,8 +206,10 @@ func TestingGetStreamIngestionStatsFromReplicationJob(
var progressBytes []byte
var payload jobspb.Payload
var progress jobspb.Progress
sqlRunner.QueryRow(t, "SELECT payload, progress FROM system.jobs WHERE id = $1",
ingestionJobID).Scan(&payloadBytes, &progressBytes)
stmt := `
SELECT payload, progress FROM "".crdb_internal.system_jobs WHERE id = $1
`
sqlRunner.QueryRow(t, stmt, ingestionJobID).Scan(&payloadBytes, &progressBytes)
require.NoError(t, protoutil.Unmarshal(payloadBytes, &payload))
require.NoError(t, protoutil.Unmarshal(progressBytes, &progress))
details := payload.GetStreamIngestion()
4 changes: 3 additions & 1 deletion pkg/cli/doctor.go
@@ -310,7 +310,9 @@ FROM system.namespace`
return nil, nil, nil, err
}

stmt = `SELECT id, status, payload, progress FROM system.jobs`
stmt = `
SELECT id, status, payload, progress FROM "".crdb_internal.system_jobs
`
jobsTable = make(doctor.JobsTable, 0)

if err := selectRowsMap(sqlConn, stmt, make([]driver.Value, 4), func(vals []driver.Value) error {
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
@@ -248,6 +248,7 @@ go_library(
"@com_github_aws_aws_sdk_go_v2_service_rds//:rds",
"@com_github_aws_aws_sdk_go_v2_service_rds//types",
"@com_github_aws_aws_sdk_go_v2_service_secretsmanager//:secretsmanager",
"@com_github_cockroachdb_apd_v3//:apd",
"@com_github_cockroachdb_cockroach_go_v2//crdb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_ttycolor//:ttycolor",
7 changes: 4 additions & 3 deletions pkg/cmd/roachtest/tests/cdc.go
@@ -2255,12 +2255,13 @@ func (c *changefeedInfo) GetError() string { return c.status }
var _ jobInfo = (*changefeedInfo)(nil)

func getChangefeedInfo(db *gosql.DB, jobID int) (*changefeedInfo, error) {
stmt := `
SELECT status, payload, progress FROM "".crdb_internal.system_jobs WHERE id = $1
`
var status string
var payloadBytes []byte
var progressBytes []byte
if err := db.QueryRow(
`SELECT status, payload, progress FROM system.jobs WHERE id = $1`, jobID,
).Scan(&status, &payloadBytes, &progressBytes); err != nil {
if err := db.QueryRow(stmt, jobID).Scan(&status, &payloadBytes, &progressBytes); err != nil {
return nil, err
}
var payload jobspb.Payload
35 changes: 16 additions & 19 deletions pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -20,6 +20,7 @@ import (
"sort"
"time"

"github.com/cockroachdb/apd/v3"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/clusterstats"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
@@ -471,7 +472,7 @@ func (sp *replicationTestSpec) stopReplicationStream(ingestionJob int, cutoverTi
err := retry.ForDuration(time.Minute*5, func() error {
var status string
var payloadBytes []byte
sp.setup.dst.sysSQL.QueryRow(sp.t, `SELECT status, payload FROM system.jobs WHERE id = $1`,
sp.setup.dst.sysSQL.QueryRow(sp.t, `SELECT status, payload FROM crdb_internal.jobs WHERE job_id = $1`,
ingestionJob).Scan(&status, &payloadBytes)
if jobs.Status(status) == jobs.StatusFailed {
payload := &jobspb.Payload{}
@@ -722,32 +723,28 @@ func (c *streamIngesitonJobInfo) GetError() string { return c.status }
var _ jobInfo = (*streamIngesitonJobInfo)(nil)

func getStreamIngestionJobInfo(db *gosql.DB, jobID int) (jobInfo, error) {
var status string
var payloadBytes []byte
var progressBytes []byte
var status, errMsg string
var decimalHighWater *apd.Decimal
var finished hlc.Timestamp
if err := db.QueryRow(
`SELECT status, payload, progress FROM system.jobs WHERE id = $1`, jobID,
).Scan(&status, &payloadBytes, &progressBytes); err != nil {
return nil, err
}
var payload jobspb.Payload
if err := protoutil.Unmarshal(payloadBytes, &payload); err != nil {
return nil, err
}
var progress jobspb.Progress
if err := protoutil.Unmarshal(progressBytes, &progress); err != nil {
`SELECT status, high_water_timestamp, error, finished FROM crdb_internal.jobs WHERE job_id = $1`, jobID,
).Scan(&status, &decimalHighWater, &errMsg, &finished); err != nil {
return nil, err
}
var highwaterTime time.Time
highwater := progress.GetHighWater()
if highwater != nil {
highwaterTime = highwater.GoTime()
if decimalHighWater != nil {
var err error
highwaterTimeHLC, err := hlc.DecimalToHLC(decimalHighWater)
if err != nil {
return nil, err
}
highwaterTime = highwaterTimeHLC.GoTime()
}
return &streamIngesitonJobInfo{
status: status,
errMsg: payload.Error,
errMsg: errMsg,
highwaterTime: highwaterTime,
finishedTime: time.UnixMicro(payload.FinishedMicros),
finishedTime: time.UnixMicro(finished.GoTime().UnixMicro()),
}, nil
}

71 changes: 59 additions & 12 deletions pkg/jobs/adopt.go
@@ -16,6 +16,7 @@ import (
"strconv"
"sync"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/multitenant"
@@ -25,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
@@ -144,10 +146,19 @@ COALESCE(last_run, created) + least(
processQueryWithBackoff = processQueryBase + ", " + canRunArgs +
" WHERE " + processQueryWhereBase + " AND " + canRunClause

resumeQueryBaseCols = "status, payload, progress, crdb_internal.sql_liveness_is_alive(claim_session_id)"
resumeQueryWhereBase = `id = $1 AND claim_session_id = $2`
// resumeQueryBaseCols selects NULL values for the payload and progress that
// will be read from the system.job_info table. This allows us to get results
// aligned with deprecatedResumeQueryBaseCols below.
resumeQueryBaseCols = "status, NULL, NULL, crdb_internal.sql_liveness_is_alive(claim_session_id)"
resumeQueryWithBackoff = `SELECT ` + resumeQueryBaseCols + `, ` + canRunClause + ` AS can_run,` +
` created_by_type, created_by_id FROM system.jobs, ` + canRunArgs + " WHERE " + resumeQueryWhereBase

// deprecatedResumeQueryBaseCols loads the payload and progress from
// system.jobs instead of the system.job_info table.
deprecatedResumeQueryBaseCols = "status, payload, progress, crdb_internal.sql_liveness_is_alive(claim_session_id)"
resumeQueryWhereBase = `id = $1 AND claim_session_id = $2`
deprecatedResumeQueryWithBackoff = `SELECT ` + deprecatedResumeQueryBaseCols + `, ` + canRunClause + ` AS can_run,` +
` created_by_type, created_by_id FROM system.jobs, ` + canRunArgs + " WHERE " + resumeQueryWhereBase
)

// getProcessQuery returns the query that selects the jobs that are claimed
@@ -245,7 +256,14 @@ func (r *Registry) resumeJob(
ctx context.Context, jobID jobspb.JobID, s sqlliveness.Session,
) (retErr error) {
log.Infof(ctx, "job %d: resuming execution", jobID)
resumeQuery := resumeQueryWithBackoff

readPayloadAndProgressFromJobInfo := r.settings.Version.IsActive(ctx, clusterversion.V23_1JobInfoTableIsBackfilled)
var resumeQuery string
if readPayloadAndProgressFromJobInfo {
resumeQuery = resumeQueryWithBackoff
} else {
resumeQuery = deprecatedResumeQueryWithBackoff
}
args := []interface{}{jobID, s.ID().UnsafeBytes(),
r.clock.Now().GoTime(), r.RetryInitialDelay(), r.RetryMaxDelay()}
row, err := r.db.Executor().QueryRowEx(
@@ -291,22 +309,51 @@ func (r *Registry) resumeJob(
return nil
}

payload, err := UnmarshalPayload(row[1])
createdBy, err := unmarshalCreatedBy(row[5], row[6])
if err != nil {
return err
}
job := &Job{id: jobID, registry: r, createdBy: createdBy}

progress, err := UnmarshalProgress(row[2])
if err != nil {
return err
}
payload := &jobspb.Payload{}
progress := &jobspb.Progress{}
if readPayloadAndProgressFromJobInfo {
if err := r.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job.InfoStorage(txn)
payloadBytes, exists, err := infoStorage.GetLegacyPayload(ctx)
if err != nil {
return err
}
if !exists {
return errors.Wrap(&JobNotFoundError{jobID: jobID}, "job not found in system.job_info")
}
if err := protoutil.Unmarshal(payloadBytes, payload); err != nil {
return err
}

createdBy, err := unmarshalCreatedBy(row[5], row[6])
if err != nil {
return err
progressBytes, exists, err := infoStorage.GetLegacyProgress(ctx)
if err != nil {
return err
}
if !exists {
return errors.Wrap(&JobNotFoundError{jobID: jobID}, "job not found in system.job_info")
}
return protoutil.Unmarshal(progressBytes, progress)
}); err != nil {
return err
}
} else {
payload, err = UnmarshalPayload(row[1])
if err != nil {
return err
}

progress, err = UnmarshalProgress(row[2])
if err != nil {
return err
}
}

job := &Job{id: jobID, registry: r, createdBy: createdBy}
job.mu.payload = *payload
job.mu.progress = *progress
job.session = s
6 changes: 5 additions & 1 deletion pkg/jobs/job_info_storage.go
@@ -66,10 +66,14 @@ func (i InfoStorage) get(ctx context.Context, infoKey []byte) ([]byte, bool, err
defer sp.Finish()

j := i.j

// We expect there to be only a single row for a given <job_id, info_key>.
// This is because all older revisions are deleted before a new one is
// inserted in `InfoStorage.Write`.
row, err := i.txn.QueryRowEx(
ctx, "job-info-get", i.txn.KV(),
sessiondata.NodeUserSessionDataOverride,
"SELECT value FROM system.job_info WHERE job_id = $1 AND info_key = $2 ORDER BY written DESC LIMIT 1",
"SELECT value FROM system.job_info WHERE job_id = $1 AND info_key = $2",
j.ID(), infoKey,
)
