Skip to content

Commit

Permalink
jobs,*: stop writing payload and progress to system.jobs
Browse files Browse the repository at this point in the history
This change introduces a cluster version after which the
payload and progress of a job will not be written to the
system.jobs table. This will ensure that the system.job_info
table is the single source of truth for these two pieces of
information.

This cluster version has an associated upgrade that alters the
schema of the `system.jobs` table to make the `payload` column nullable,
thereby allowing us to stop writing to it. This upgrade step
is necessary for a future patch where we will drop the payload
and progress columns. Without this intermediate upgrade step, the
`ALTER TABLE ... DROP COLUMN` upgrade job will attempt to write
to dropped columns as part of its execution, thereby failing to
run the upgrade.

Informs: cockroachdb#97762

Release note: None
  • Loading branch information
adityamaru committed Mar 26, 2023
1 parent d107217 commit 5d9b97a
Show file tree
Hide file tree
Showing 43 changed files with 233 additions and 165 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -289,4 +289,4 @@ trace.opentelemetry.collector string address of an OpenTelemetry trace collecto
trace.snapshot.rate duration 0s if non-zero, interval at which background trace snapshots are captured
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 1000023.1-2 set the active cluster version in the format '<major>.<minor>'
version version 1000023.1-4 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,6 @@
<tr><td><div id="setting-trace-snapshot-rate" class="anchored"><code>trace.snapshot.rate</code></div></td><td>duration</td><td><code>0s</code></td><td>if non-zero, interval at which background trace snapshots are captured</td></tr>
<tr><td><div id="setting-trace-span-registry-enabled" class="anchored"><code>trace.span_registry.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://&lt;ui&gt;/#/debug/tracez</td></tr>
<tr><td><div id="setting-trace-zipkin-collector" class="anchored"><code>trace.zipkin.collector</code></div></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 9411 will be used.</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000023.1-2</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000023.1-4</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td></tr>
</tbody>
</table>
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1755,8 +1755,8 @@ func createAndWaitForJob(

var jobID jobspb.JobID
db.QueryRow(
t, `INSERT INTO system.jobs (created, status, payload, progress) VALUES ($1, $2, $3, $4) RETURNING id`,
timeutil.FromUnixMicros(now), jobs.StatusRunning, payload, progressBytes,
t, `INSERT INTO system.jobs (created, status) VALUES ($1, $2) RETURNING id`,
timeutil.FromUnixMicros(now), jobs.StatusRunning,
).Scan(&jobID)
db.Exec(
t, `INSERT INTO system.job_info (job_id, info_key, value) VALUES ($1, $2, $3)`, jobID, jobs.GetLegacyPayloadKey(), payload,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import (
func getHighWaterMark(ingestionJobID int, sqlDB *gosql.DB) (*hlc.Timestamp, error) {
var progressBytes []byte
if err := sqlDB.QueryRow(
`SELECT progress FROM system.jobs WHERE id = $1`, ingestionJobID,
`SELECT progress FROM crdb_internal.system_jobs WHERE id = $1`, ingestionJobID,
).Scan(&progressBytes); err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamingest/testdata/alter_tenant
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ ALTER TENANT "destination" SET REPLICATION RETENTION = '42s'

query-sql as=destination-system
SELECT crdb_internal.pb_to_json('payload', payload)->'streamIngestion'->'replicationTtlSeconds' as retention_ttl_seconds
FROM system.jobs
FROM crdb_internal.system_jobs
WHERE id = (SELECT replication_job_id FROM [SHOW TENANT "destination" WITH REPLICATION STATUS])
----
42
2 changes: 1 addition & 1 deletion pkg/cli/testdata/declarative-rules/deprules
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
dep
----
debug declarative-print-rules 1000023.1-2 dep
debug declarative-print-rules 1000023.1-4 dep
deprules
----
- name: 'CheckConstraint transitions to ABSENT uphold 2-version invariant: PUBLIC->VALIDATED'
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/testdata/declarative-rules/oprules
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
op
----
debug declarative-print-rules 1000023.1-2 op
debug declarative-print-rules 1000023.1-4 op
rules
----
[]
7 changes: 1 addition & 6 deletions pkg/cli/zip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
Expand Down Expand Up @@ -591,13 +590,9 @@ func TestToHex(t *testing.T) {
}
// Stores index and type of marshaled messages in the table row.
// Negative indices work from the end - this is needed because parsing the
// fields is not alway s precise as there can be spaces in the fields but the
// fields is not always precise as there can be spaces in the fields but the
// hex fields are always in the end of the row and they don't contain spaces.
hexFiles := map[string][]hexField{
"debug/system.jobs.txt": {
{idx: -2, msg: &jobspb.Payload{}},
{idx: -1, msg: &jobspb.Progress{}},
},
"debug/system.descriptor.txt": {
{idx: 2, msg: &descpb.Descriptor{}},
},
Expand Down
8 changes: 8 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,10 @@ const (
// the process of upgrading from previous supported releases to 23.2.
V23_2Start

// V23_2StopWritingPayloadAndProgressToSystemJobs is the version where the
// payload and progress columns are no longer written to system.jobs.
V23_2StopWritingPayloadAndProgressToSystemJobs

// *************************************************
// Step (1): Add new versions here.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -898,6 +902,10 @@ var rawVersionsSingleton = keyedVersions{
Key: V23_2Start,
Version: roachpb.Version{Major: 23, Minor: 1, Internal: 2},
},
{
Key: V23_2StopWritingPayloadAndProgressToSystemJobs,
Version: roachpb.Version{Major: 23, Minor: 1, Internal: 4},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
5 changes: 3 additions & 2 deletions pkg/cmd/roachtest/tests/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
Expand Down Expand Up @@ -942,8 +943,8 @@ revert=2'`)
var status string
var payloadBytes, progressBytes []byte
require.NoError(t, conn.QueryRowContext(
ctx, `SELECT status, progress, payload FROM system.jobs WHERE id = $1`, jobID).
Scan(&status, &progressBytes, &payloadBytes))
ctx, jobutils.InternalSystemJobsBaseQuery, jobID).
Scan(&status, &payloadBytes, &progressBytes))
if jobs.Status(status) == jobs.StatusFailed {
var payload jobspb.Payload
require.NoError(t, protoutil.Unmarshal(payloadBytes, &payload))
Expand Down
4 changes: 3 additions & 1 deletion pkg/cmd/roachtest/tests/mixed_version_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
Expand Down Expand Up @@ -300,7 +301,8 @@ func (mvb *mixedVersionBackup) waitForJobSuccess(
for r := retry.StartWithCtx(ctx, backupCompletionRetryOptions); r.Next(); {
var status string
var payloadBytes []byte
err := db.QueryRow(`SELECT status, payload FROM system.jobs WHERE id = $1`, jobID).Scan(&status, &payloadBytes)
err := db.QueryRow(fmt.Sprintf(`SELECT status, payload FROM (%s)`,
jobutils.InternalSystemJobsBaseQuery), jobID).Scan(&status, &payloadBytes)
if err != nil {
lastErr = fmt.Errorf("error reading (status, payload) for job %d: %w", jobID, err)
l.Printf("%v", lastErr)
Expand Down
1 change: 1 addition & 0 deletions pkg/jobs/delegate_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ func TestJobControlByType(t *testing.T) {

// Clear the system.jobs table for the next test run.
th.sqlDB.Exec(t, fmt.Sprintf("DELETE FROM system.jobs WHERE id IN (%s)", jobIdsClause))
th.sqlDB.Exec(t, fmt.Sprintf("DELETE FROM system.job_info WHERE job_id IN (%s)", jobIdsClause))
})
}
}
Expand Down
16 changes: 9 additions & 7 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
Expand Down Expand Up @@ -91,7 +92,7 @@ func (expected *expectation) verify(id jobspb.JobID, expectedStatus jobs.Status)
var payloadBytes []byte
var progressBytes []byte
if err := expected.DB.QueryRow(
`SELECT status, created, payload, progress FROM system.jobs WHERE id = $1`, id,
`SELECT status, created, payload, progress FROM crdb_internal.system_jobs WHERE id = $1`, id,
).Scan(
&statusString, &created, &payloadBytes, &progressBytes,
); err != nil {
Expand Down Expand Up @@ -1970,8 +1971,8 @@ func TestShowJobs(t *testing.T) {
t.Fatal(err)
}
sqlDB.Exec(t,
`INSERT INTO system.jobs (id, status, created, payload, progress, claim_session_id, claim_instance_id) VALUES ($1, $2, $3, $4, $5, $6, $7)`,
in.id, in.status, in.created, inPayload, inProgress, session.ID().UnsafeBytes(), instanceID,
`INSERT INTO system.jobs (id, status, created, claim_session_id, claim_instance_id) VALUES ($1, $2, $3, $4, $5)`,
in.id, in.status, in.created, session.ID().UnsafeBytes(), instanceID,
)
sqlDB.Exec(t, `INSERT INTO system.job_info (job_id, info_key, value) VALUES ($1, $2, $3)`, in.id, jobs.GetLegacyPayloadKey(), inPayload)
sqlDB.Exec(t, `INSERT INTO system.job_info (job_id, info_key, value) VALUES ($1, $2, $3)`, in.id, jobs.GetLegacyProgressKey(), inProgress)
Expand Down Expand Up @@ -2181,7 +2182,7 @@ SELECT id, payload, progress FROM "".crdb_internal.system_jobs ORDER BY id DESC

// Create the second row with a corrupted progress field.
if _, err := sqlDB.Exec(`
INSERT INTO system.jobs(id, status, payload, progress) SELECT id+2, status, payload, '\xaaaa'::BYTES FROM system.jobs WHERE id = $1;
INSERT INTO system.jobs(id, status) SELECT id+2, status FROM system.jobs WHERE id = $1;
`, jobID); err != nil {
t.Fatal(err)
}
Expand All @@ -2200,7 +2201,7 @@ SELECT id, payload, progress FROM "".crdb_internal.system_jobs ORDER BY id DESC

// Test what happens with a NULL progress field (which is a valid value).
if _, err := sqlDB.Exec(`
INSERT INTO system.jobs(id, status, payload, progress) SELECT id+4, status, payload, NULL::BYTES FROM system.jobs WHERE id = $1;
INSERT INTO system.jobs(id, status) SELECT id+4, status FROM system.jobs WHERE id = $1;
`, jobID); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -3059,7 +3060,8 @@ func TestMetrics(t *testing.T) {
var payloadBytes []byte
var payload jobspb.Payload
var status string
tdb.QueryRow(t, fmt.Sprintf("SELECT status, payload FROM system.jobs where id = %d", jobID)).Scan(
tdb.QueryRow(t, fmt.Sprintf("SELECT status, payload FROM (%s)",
jobutils.InternalSystemJobsBaseQuery), jobID).Scan(
&status, &payloadBytes)
require.Equal(t, "paused", status)
require.NoError(t, protoutil.Unmarshal(payloadBytes, &payload))
Expand Down Expand Up @@ -3344,7 +3346,7 @@ func TestPauseReason(t *testing.T) {
var payloadBytes []byte
var payload jobspb.Payload
var status string
tdb.QueryRow(t, "SELECT status, payload FROM system.jobs where id = $1", jobID).Scan(
tdb.QueryRow(t, "SELECT status, payload FROM crdb_internal.system_jobs where id = $1", jobID).Scan(
&status, &payloadBytes)
require.NoError(t, protoutil.Unmarshal(payloadBytes, &payload))

Expand Down
4 changes: 2 additions & 2 deletions pkg/jobs/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func TestJobsTableClaimFamily(t *testing.T) {

now := timeutil.Now()
_ = sqlDB.Query(t, `
INSERT INTO system.jobs (id, status, payload, claim_session_id, claim_instance_id, num_runs, last_run)
VALUES (1, 'running', '@!%$%45', 'foo', 101, 100, $1)`, now)
INSERT INTO system.jobs (id, status, claim_session_id, claim_instance_id, num_runs, last_run)
VALUES (1, 'running', 'foo', 101, 100, $1)`, now)
var status, sessionID string
var instanceID, numRuns int64
var lastRun time.Time
Expand Down
48 changes: 38 additions & 10 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,9 +464,6 @@ func batchJobInsertStmt(
jobs []*Job,
modifiedMicros int64,
) (string, []interface{}, []jobspb.JobID, error) {
instanceID := r.ID()
columns := []string{`id`, `created`, `status`, `payload`, `progress`, `claim_session_id`, `claim_instance_id`, `job_type`}
numColumns := len(columns)
marshalPanic := func(m protoutil.Message) []byte {
data, err := protoutil.Marshal(m)
if err != nil {
Expand All @@ -479,7 +476,8 @@ func batchJobInsertStmt(
if err != nil {
return "", nil, nil, errors.NewAssertionErrorWithWrappedErrf(err, "failed to make timestamp for creation of job")
}

instanceID := r.ID()
columns := []string{`id`, `created`, `status`, `payload`, `progress`, `claim_session_id`, `claim_instance_id`, `job_type`}
valueFns := map[string]func(*Job) (interface{}, error){
`id`: func(job *Job) (interface{}, error) { return job.ID(), nil },
`created`: func(job *Job) (interface{}, error) { return created, nil },
Expand All @@ -501,6 +499,24 @@ func batchJobInsertStmt(
},
}

// TODO(adityamaru): Remove this once we are outside the compatibility
// window for 22.2.
if r.settings.Version.IsActive(ctx, clusterversion.V23_2StopWritingPayloadAndProgressToSystemJobs) {
columns = []string{`id`, `created`, `status`, `claim_session_id`, `claim_instance_id`, `job_type`}
valueFns = map[string]func(*Job) (interface{}, error){
`id`: func(job *Job) (interface{}, error) { return job.ID(), nil },
`created`: func(job *Job) (interface{}, error) { return created, nil },
`status`: func(job *Job) (interface{}, error) { return StatusRunning, nil },
`claim_session_id`: func(job *Job) (interface{}, error) { return sessionID.UnsafeBytes(), nil },
`claim_instance_id`: func(job *Job) (interface{}, error) { return instanceID, nil },
`job_type`: func(job *Job) (interface{}, error) {
payload := job.Payload()
return payload.Type().String(), nil
},
}
}
numColumns := len(columns)

// TODO(jayant): remove this version gate in 24.1
// To run the upgrade below, migration and schema change jobs will need to be
// created using the old schema, which does not have the job_type column.
Expand Down Expand Up @@ -595,9 +611,15 @@ func (r *Registry) CreateJobWithTxn(
if err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err, "failed to construct job created timestamp")
}
cols := [...]string{"id", "created", "status", "payload", "progress", "claim_session_id", "claim_instance_id", "job_type"}
const totalNumCols = len(cols)
vals := [totalNumCols]interface{}{jobID, created, StatusRunning, payloadBytes, progressBytes, s.ID().UnsafeBytes(), r.ID(), jobType.String()}

cols := []string{"id", "created", "status", "payload", "progress", "claim_session_id", "claim_instance_id", "job_type"}
vals := []interface{}{jobID, created, StatusRunning, payloadBytes, progressBytes, s.ID().UnsafeBytes(), r.ID(), jobType.String()}
log.Infof(ctx, "active version is %s", r.settings.Version.ActiveVersion(ctx))
if r.settings.Version.IsActive(ctx, clusterversion.V23_2StopWritingPayloadAndProgressToSystemJobs) {
cols = []string{"id", "created", "status", "claim_session_id", "claim_instance_id", "job_type"}
vals = []interface{}{jobID, created, StatusRunning, s.ID().UnsafeBytes(), r.ID(), jobType.String()}
}
totalNumCols := len(cols)
numCols := totalNumCols
placeholders := func() string {
var p strings.Builder
Expand Down Expand Up @@ -711,12 +733,18 @@ func (r *Registry) CreateAdoptableJobWithTxn(
typ := j.mu.payload.Type().String()

nCols := 7
cols := [7]string{"id", "status", "payload", "progress", "created_by_type", "created_by_id", "job_type"}
placeholders := [7]string{"$1", "$2", "$3", "$4", "$5", "$6", "$7"}
values := [7]interface{}{jobID, StatusRunning, payloadBytes, progressBytes, createdByType, createdByID, typ}
cols := []string{"id", "status", "payload", "progress", "created_by_type", "created_by_id", "job_type"}
placeholders := []string{"$1", "$2", "$3", "$4", "$5", "$6", "$7"}
values := []interface{}{jobID, StatusRunning, payloadBytes, progressBytes, createdByType, createdByID, typ}
if !r.settings.Version.IsActive(ctx, clusterversion.V23_1AddTypeColumnToJobsTable) {
nCols -= 1
}
if r.settings.Version.IsActive(ctx, clusterversion.V23_2StopWritingPayloadAndProgressToSystemJobs) {
cols = []string{"id", "status", "created_by_type", "created_by_id", "job_type"}
placeholders = []string{"$1", "$2", "$3", "$4", "$5"}
values = []interface{}{jobID, StatusRunning, createdByType, createdByID, typ}
nCols = 5
}
// Insert the job row, but do not set a `claim_session_id`. By not
// setting the claim, the job can be adopted by any node and will
// be adopted by the node which next runs the adoption loop.
Expand Down
25 changes: 3 additions & 22 deletions pkg/jobs/registry_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -94,31 +93,15 @@ func TestExpiringSessionsAndClaimJobsDoesNotTouchTerminalJobs(t *testing.T) {
s, sqlDB, _ := serverutils.StartServer(t, args)
defer s.Stopper().Stop(ctx)

payload, err := protoutil.Marshal(&jobspb.Payload{
Details: jobspb.WrapPayloadDetails(jobspb.BackupDetails{}),
})
if err != nil {
t.Fatal(err)
}

progress, err := protoutil.Marshal(&jobspb.Progress{
Details: jobspb.WrapProgressDetails(jobspb.BackupProgress{}),
})
if err != nil {
t.Fatal(err)
}

tdb := sqlutils.MakeSQLRunner(sqlDB)
const insertQuery = `
INSERT
INTO system.jobs (
status,
payload,
progress,
claim_session_id,
claim_instance_id
)
VALUES ($1, $2, $3, $4, $5)
VALUES ($1, $2, $3)
RETURNING id;
`
// Disallow clean up of claimed jobs
Expand All @@ -128,12 +111,10 @@ RETURNING id;
terminalClaims := make([][]byte, len(terminalStatuses))
for i, s := range terminalStatuses {
terminalClaims[i] = uuid.MakeV4().GetBytes() // bogus claim
tdb.QueryRow(t, insertQuery, s, payload, progress, terminalClaims[i], 42).
Scan(&terminalIDs[i])
tdb.QueryRow(t, insertQuery, s, terminalClaims[i], 42).Scan(&terminalIDs[i])
}
var nonTerminalID jobspb.JobID
tdb.QueryRow(t, insertQuery, jobs.StatusRunning, payload, progress, uuid.MakeV4().GetBytes(), 42).
Scan(&nonTerminalID)
tdb.QueryRow(t, insertQuery, jobs.StatusRunning, uuid.MakeV4().GetBytes(), 42).Scan(&nonTerminalID)

checkClaimEqual := func(id jobspb.JobID, exp []byte) error {
const getClaimQuery = `SELECT claim_session_id FROM system.jobs WHERE id = $1`
Expand Down
Loading

0 comments on commit 5d9b97a

Please sign in to comment.