Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
99458: jobs,*: stop writing payload and progress to system.jobs r=adityamaru a=adityamaru

This change introduces a cluster version after which the
payload and progress of a job will not be written to the
system.jobs table. This will ensure that the system.job_info
table is the single, source of truth for these two pieces of
information.

This cluster version has an associated upgrade that schema changes
the `payload` column of the `system.jobs` table to be nullable,
thereby allowing us to stop writing to it. This upgrade step
is necessary for a future patch where we will drop the payload
and progress columns. Without this intermediate upgrade step the
`ALTER TABLE ... DROP COLUMN` upgrade job will attempt to write
to dropped columns as part of its execution thereby failing to
run the upgrade.

Informs: cockroachdb#97762

Release note: None

99543: server: fix flaky drain test under race r=AlexTalks a=AlexTalks

While previously `TestDrain` would issue a drain request twice, and expect that after the second drain request there would be no remaining leases, we have seen in some race builds that a lease extension can occur before that second drain, leaving one lease remaining after the second drain request. This can be seen in the following log example:
```
I230325 00:39:18.151604 14728 1@server/drain.go:145 ⋮ [T1,n1] 383  drain request received with doDrain = true, shutdown = false
...
I230325 00:39:18.155547 986 kv/kvserver/replica_proposal.go:272 ⋮ [T1,n1,s1,r51/1:‹/Table/5{0-1}›,raft] 385  new range lease repl=(n1,s1):1 seq=1 start=0,0 exp=1679704764.152223164,0 pro=1679704758.152223164,0 following repl=(n1,s1):1 seq=1 start=0,0 exp=1679704746.135729956,0 pro=1679704740.135729956,0
I230325 00:39:18.172450 14728 1@server/drain.go:399 ⋮ [T1,n1] 386  (DEBUG) initiating kvserver node drain
I230325 00:39:18.172613 14728 1@kv/kvserver/store.go:1559 ⋮ [T1,drain,n1,s1] 387  (DEBUG) store marked as draining
I230325 00:39:18.182123 14728 1@server/drain.go:293 ⋮ [T1,n1] 388  drain remaining: 1
I230325 00:39:18.182249 14728 1@server/drain.go:295 ⋮ [T1,n1] 389  drain details: range lease iterations: 1
I230325 00:39:18.182404 14728 1@server/drain.go:175 ⋮ [T1,n1] 390  drain request completed without server shutdown
```
This change modifies the test to repeatedly issue drain requests until there is no remaining work, allowing the drain to complete upon subsequent requests.

Fixes: cockroachdb#86974

Release note: None

99665: sql/gc_job,sqlerrors: make GC job robust to missing descriptors r=fqazi a=ajwerner

### sql: do not drop table descriptor independently if we're in drop schema

If we have dropped schema IDs, we know that this is not an individual drop table
schema change. We only have more than one dropped table when we drop a database
or a schema. Before this change, we'd drop the table on its own, and then create
another GC job to drop all the tables. This is not actually a bug because we
should be robust to this, but it's also bad.

### sql/gc_job,sqlerrors: make GC job robust to missing descriptors

The check used for missing descriptors became incorrect in the course of
cockroachdb#94695. That change updated
the underlying error code used in getters by the GC job. The GC job would
subsequently retry forever when the descriptor was missing. This bug
has not been shipped yet, so not writing a release note.

Fixes: cockroachdb#99590

Release note (bug fix): DROP SCHEMA ... CASCADE could create multiple
GC jobs: one for every table and one for the cascaded drop itself. This has
been fixed.


Co-authored-by: adityamaru <[email protected]>
Co-authored-by: Alex Sarkesian <[email protected]>
Co-authored-by: ajwerner <[email protected]>
  • Loading branch information
4 people committed Mar 27, 2023
4 parents a5b41ca + 5d9b97a + 9b2229b + fb2a3cc commit 97fee54
Show file tree
Hide file tree
Showing 55 changed files with 332 additions and 204 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -290,4 +290,4 @@ trace.opentelemetry.collector string address of an OpenTelemetry trace collecto
trace.snapshot.rate duration 0s if non-zero, interval at which background trace snapshots are captured
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 1000023.1-2 set the active cluster version in the format '<major>.<minor>'
version version 1000023.1-4 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,6 @@
<tr><td><div id="setting-trace-snapshot-rate" class="anchored"><code>trace.snapshot.rate</code></div></td><td>duration</td><td><code>0s</code></td><td>if non-zero, interval at which background trace snapshots are captured</td></tr>
<tr><td><div id="setting-trace-span-registry-enabled" class="anchored"><code>trace.span_registry.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://&lt;ui&gt;/#/debug/tracez</td></tr>
<tr><td><div id="setting-trace-zipkin-collector" class="anchored"><code>trace.zipkin.collector</code></div></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 9411 will be used.</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000023.1-2</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000023.1-4</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td></tr>
</tbody>
</table>
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1755,8 +1755,8 @@ func createAndWaitForJob(

var jobID jobspb.JobID
db.QueryRow(
t, `INSERT INTO system.jobs (created, status, payload, progress) VALUES ($1, $2, $3, $4) RETURNING id`,
timeutil.FromUnixMicros(now), jobs.StatusRunning, payload, progressBytes,
t, `INSERT INTO system.jobs (created, status) VALUES ($1, $2) RETURNING id`,
timeutil.FromUnixMicros(now), jobs.StatusRunning,
).Scan(&jobID)
db.Exec(
t, `INSERT INTO system.job_info (job_id, info_key, value) VALUES ($1, $2, $3)`, jobID, jobs.GetLegacyPayloadKey(), payload,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import (
func getHighWaterMark(ingestionJobID int, sqlDB *gosql.DB) (*hlc.Timestamp, error) {
var progressBytes []byte
if err := sqlDB.QueryRow(
`SELECT progress FROM system.jobs WHERE id = $1`, ingestionJobID,
`SELECT progress FROM crdb_internal.system_jobs WHERE id = $1`, ingestionJobID,
).Scan(&progressBytes); err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamingest/testdata/alter_tenant
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ ALTER TENANT "destination" SET REPLICATION RETENTION = '42s'

query-sql as=destination-system
SELECT crdb_internal.pb_to_json('payload', payload)->'streamIngestion'->'replicationTtlSeconds' as retention_ttl_seconds
FROM system.jobs
FROM crdb_internal.system_jobs
WHERE id = (SELECT replication_job_id FROM [SHOW TENANT "destination" WITH REPLICATION STATUS])
----
42
2 changes: 1 addition & 1 deletion pkg/cli/testdata/declarative-rules/deprules
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
dep
----
debug declarative-print-rules 1000023.1-2 dep
debug declarative-print-rules 1000023.1-4 dep
deprules
----
- name: 'CheckConstraint transitions to ABSENT uphold 2-version invariant: PUBLIC->VALIDATED'
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/testdata/declarative-rules/oprules
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
op
----
debug declarative-print-rules 1000023.1-2 op
debug declarative-print-rules 1000023.1-4 op
rules
----
[]
7 changes: 1 addition & 6 deletions pkg/cli/zip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
Expand Down Expand Up @@ -591,13 +590,9 @@ func TestToHex(t *testing.T) {
}
// Stores index and type of marshaled messages in the table row.
// Negative indices work from the end - this is needed because parsing the
// fields is not alway s precise as there can be spaces in the fields but the
// fields is not always precise as there can be spaces in the fields but the
// hex fields are always in the end of the row and they don't contain spaces.
hexFiles := map[string][]hexField{
"debug/system.jobs.txt": {
{idx: -2, msg: &jobspb.Payload{}},
{idx: -1, msg: &jobspb.Progress{}},
},
"debug/system.descriptor.txt": {
{idx: 2, msg: &descpb.Descriptor{}},
},
Expand Down
8 changes: 8 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,10 @@ const (
// the process of upgrading from previous supported releases to 23.2.
V23_2Start

// V23_2StopWritingPayloadAndProgressToSystemJobs is the version where the
// payload and progress columns are no longer written to system.jobs.
V23_2StopWritingPayloadAndProgressToSystemJobs

// *************************************************
// Step (1): Add new versions here.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -898,6 +902,10 @@ var rawVersionsSingleton = keyedVersions{
Key: V23_2Start,
Version: roachpb.Version{Major: 23, Minor: 1, Internal: 2},
},
{
Key: V23_2StopWritingPayloadAndProgressToSystemJobs,
Version: roachpb.Version{Major: 23, Minor: 1, Internal: 4},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
5 changes: 3 additions & 2 deletions pkg/cmd/roachtest/tests/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
Expand Down Expand Up @@ -942,8 +943,8 @@ revert=2'`)
var status string
var payloadBytes, progressBytes []byte
require.NoError(t, conn.QueryRowContext(
ctx, `SELECT status, progress, payload FROM system.jobs WHERE id = $1`, jobID).
Scan(&status, &progressBytes, &payloadBytes))
ctx, jobutils.InternalSystemJobsBaseQuery, jobID).
Scan(&status, &payloadBytes, &progressBytes))
if jobs.Status(status) == jobs.StatusFailed {
var payload jobspb.Payload
require.NoError(t, protoutil.Unmarshal(payloadBytes, &payload))
Expand Down
4 changes: 3 additions & 1 deletion pkg/cmd/roachtest/tests/mixed_version_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
Expand Down Expand Up @@ -300,7 +301,8 @@ func (mvb *mixedVersionBackup) waitForJobSuccess(
for r := retry.StartWithCtx(ctx, backupCompletionRetryOptions); r.Next(); {
var status string
var payloadBytes []byte
err := db.QueryRow(`SELECT status, payload FROM system.jobs WHERE id = $1`, jobID).Scan(&status, &payloadBytes)
err := db.QueryRow(fmt.Sprintf(`SELECT status, payload FROM (%s)`,
jobutils.InternalSystemJobsBaseQuery), jobID).Scan(&status, &payloadBytes)
if err != nil {
lastErr = fmt.Errorf("error reading (status, payload) for job %d: %w", jobID, err)
l.Printf("%v", lastErr)
Expand Down
1 change: 1 addition & 0 deletions pkg/jobs/delegate_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ func TestJobControlByType(t *testing.T) {

// Clear the system.jobs table for the next test run.
th.sqlDB.Exec(t, fmt.Sprintf("DELETE FROM system.jobs WHERE id IN (%s)", jobIdsClause))
th.sqlDB.Exec(t, fmt.Sprintf("DELETE FROM system.job_info WHERE job_id IN (%s)", jobIdsClause))
})
}
}
Expand Down
16 changes: 9 additions & 7 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
Expand Down Expand Up @@ -91,7 +92,7 @@ func (expected *expectation) verify(id jobspb.JobID, expectedStatus jobs.Status)
var payloadBytes []byte
var progressBytes []byte
if err := expected.DB.QueryRow(
`SELECT status, created, payload, progress FROM system.jobs WHERE id = $1`, id,
`SELECT status, created, payload, progress FROM crdb_internal.system_jobs WHERE id = $1`, id,
).Scan(
&statusString, &created, &payloadBytes, &progressBytes,
); err != nil {
Expand Down Expand Up @@ -1970,8 +1971,8 @@ func TestShowJobs(t *testing.T) {
t.Fatal(err)
}
sqlDB.Exec(t,
`INSERT INTO system.jobs (id, status, created, payload, progress, claim_session_id, claim_instance_id) VALUES ($1, $2, $3, $4, $5, $6, $7)`,
in.id, in.status, in.created, inPayload, inProgress, session.ID().UnsafeBytes(), instanceID,
`INSERT INTO system.jobs (id, status, created, claim_session_id, claim_instance_id) VALUES ($1, $2, $3, $4, $5)`,
in.id, in.status, in.created, session.ID().UnsafeBytes(), instanceID,
)
sqlDB.Exec(t, `INSERT INTO system.job_info (job_id, info_key, value) VALUES ($1, $2, $3)`, in.id, jobs.GetLegacyPayloadKey(), inPayload)
sqlDB.Exec(t, `INSERT INTO system.job_info (job_id, info_key, value) VALUES ($1, $2, $3)`, in.id, jobs.GetLegacyProgressKey(), inProgress)
Expand Down Expand Up @@ -2181,7 +2182,7 @@ SELECT id, payload, progress FROM "".crdb_internal.system_jobs ORDER BY id DESC

// Create the second row with a corrupted progress field.
if _, err := sqlDB.Exec(`
INSERT INTO system.jobs(id, status, payload, progress) SELECT id+2, status, payload, '\xaaaa'::BYTES FROM system.jobs WHERE id = $1;
INSERT INTO system.jobs(id, status) SELECT id+2, status FROM system.jobs WHERE id = $1;
`, jobID); err != nil {
t.Fatal(err)
}
Expand All @@ -2200,7 +2201,7 @@ SELECT id, payload, progress FROM "".crdb_internal.system_jobs ORDER BY id DESC

// Test what happens with a NULL progress field (which is a valid value).
if _, err := sqlDB.Exec(`
INSERT INTO system.jobs(id, status, payload, progress) SELECT id+4, status, payload, NULL::BYTES FROM system.jobs WHERE id = $1;
INSERT INTO system.jobs(id, status) SELECT id+4, status FROM system.jobs WHERE id = $1;
`, jobID); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -3059,7 +3060,8 @@ func TestMetrics(t *testing.T) {
var payloadBytes []byte
var payload jobspb.Payload
var status string
tdb.QueryRow(t, fmt.Sprintf("SELECT status, payload FROM system.jobs where id = %d", jobID)).Scan(
tdb.QueryRow(t, fmt.Sprintf("SELECT status, payload FROM (%s)",
jobutils.InternalSystemJobsBaseQuery), jobID).Scan(
&status, &payloadBytes)
require.Equal(t, "paused", status)
require.NoError(t, protoutil.Unmarshal(payloadBytes, &payload))
Expand Down Expand Up @@ -3344,7 +3346,7 @@ func TestPauseReason(t *testing.T) {
var payloadBytes []byte
var payload jobspb.Payload
var status string
tdb.QueryRow(t, "SELECT status, payload FROM system.jobs where id = $1", jobID).Scan(
tdb.QueryRow(t, "SELECT status, payload FROM crdb_internal.system_jobs where id = $1", jobID).Scan(
&status, &payloadBytes)
require.NoError(t, protoutil.Unmarshal(payloadBytes, &payload))

Expand Down
4 changes: 2 additions & 2 deletions pkg/jobs/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func TestJobsTableClaimFamily(t *testing.T) {

now := timeutil.Now()
_ = sqlDB.Query(t, `
INSERT INTO system.jobs (id, status, payload, claim_session_id, claim_instance_id, num_runs, last_run)
VALUES (1, 'running', '@!%$%45', 'foo', 101, 100, $1)`, now)
INSERT INTO system.jobs (id, status, claim_session_id, claim_instance_id, num_runs, last_run)
VALUES (1, 'running', 'foo', 101, 100, $1)`, now)
var status, sessionID string
var instanceID, numRuns int64
var lastRun time.Time
Expand Down
48 changes: 38 additions & 10 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,9 +464,6 @@ func batchJobInsertStmt(
jobs []*Job,
modifiedMicros int64,
) (string, []interface{}, []jobspb.JobID, error) {
instanceID := r.ID()
columns := []string{`id`, `created`, `status`, `payload`, `progress`, `claim_session_id`, `claim_instance_id`, `job_type`}
numColumns := len(columns)
marshalPanic := func(m protoutil.Message) []byte {
data, err := protoutil.Marshal(m)
if err != nil {
Expand All @@ -479,7 +476,8 @@ func batchJobInsertStmt(
if err != nil {
return "", nil, nil, errors.NewAssertionErrorWithWrappedErrf(err, "failed to make timestamp for creation of job")
}

instanceID := r.ID()
columns := []string{`id`, `created`, `status`, `payload`, `progress`, `claim_session_id`, `claim_instance_id`, `job_type`}
valueFns := map[string]func(*Job) (interface{}, error){
`id`: func(job *Job) (interface{}, error) { return job.ID(), nil },
`created`: func(job *Job) (interface{}, error) { return created, nil },
Expand All @@ -501,6 +499,24 @@ func batchJobInsertStmt(
},
}

// TODO(adityamaru: Remove this once we are outside the compatability
// window for 22.2.
if r.settings.Version.IsActive(ctx, clusterversion.V23_2StopWritingPayloadAndProgressToSystemJobs) {
columns = []string{`id`, `created`, `status`, `claim_session_id`, `claim_instance_id`, `job_type`}
valueFns = map[string]func(*Job) (interface{}, error){
`id`: func(job *Job) (interface{}, error) { return job.ID(), nil },
`created`: func(job *Job) (interface{}, error) { return created, nil },
`status`: func(job *Job) (interface{}, error) { return StatusRunning, nil },
`claim_session_id`: func(job *Job) (interface{}, error) { return sessionID.UnsafeBytes(), nil },
`claim_instance_id`: func(job *Job) (interface{}, error) { return instanceID, nil },
`job_type`: func(job *Job) (interface{}, error) {
payload := job.Payload()
return payload.Type().String(), nil
},
}
}
numColumns := len(columns)

// TODO(jayant): remove this version gate in 24.1
// To run the upgrade below, migration and schema change jobs will need to be
// created using the old schema, which does not have the job_type column.
Expand Down Expand Up @@ -595,9 +611,15 @@ func (r *Registry) CreateJobWithTxn(
if err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err, "failed to construct job created timestamp")
}
cols := [...]string{"id", "created", "status", "payload", "progress", "claim_session_id", "claim_instance_id", "job_type"}
const totalNumCols = len(cols)
vals := [totalNumCols]interface{}{jobID, created, StatusRunning, payloadBytes, progressBytes, s.ID().UnsafeBytes(), r.ID(), jobType.String()}

cols := []string{"id", "created", "status", "payload", "progress", "claim_session_id", "claim_instance_id", "job_type"}
vals := []interface{}{jobID, created, StatusRunning, payloadBytes, progressBytes, s.ID().UnsafeBytes(), r.ID(), jobType.String()}
log.Infof(ctx, "active version is %s", r.settings.Version.ActiveVersion(ctx))
if r.settings.Version.IsActive(ctx, clusterversion.V23_2StopWritingPayloadAndProgressToSystemJobs) {
cols = []string{"id", "created", "status", "claim_session_id", "claim_instance_id", "job_type"}
vals = []interface{}{jobID, created, StatusRunning, s.ID().UnsafeBytes(), r.ID(), jobType.String()}
}
totalNumCols := len(cols)
numCols := totalNumCols
placeholders := func() string {
var p strings.Builder
Expand Down Expand Up @@ -711,12 +733,18 @@ func (r *Registry) CreateAdoptableJobWithTxn(
typ := j.mu.payload.Type().String()

nCols := 7
cols := [7]string{"id", "status", "payload", "progress", "created_by_type", "created_by_id", "job_type"}
placeholders := [7]string{"$1", "$2", "$3", "$4", "$5", "$6", "$7"}
values := [7]interface{}{jobID, StatusRunning, payloadBytes, progressBytes, createdByType, createdByID, typ}
cols := []string{"id", "status", "payload", "progress", "created_by_type", "created_by_id", "job_type"}
placeholders := []string{"$1", "$2", "$3", "$4", "$5", "$6", "$7"}
values := []interface{}{jobID, StatusRunning, payloadBytes, progressBytes, createdByType, createdByID, typ}
if !r.settings.Version.IsActive(ctx, clusterversion.V23_1AddTypeColumnToJobsTable) {
nCols -= 1
}
if r.settings.Version.IsActive(ctx, clusterversion.V23_2StopWritingPayloadAndProgressToSystemJobs) {
cols = []string{"id", "status", "created_by_type", "created_by_id", "job_type"}
placeholders = []string{"$1", "$2", "$3", "$4", "$5"}
values = []interface{}{jobID, StatusRunning, createdByType, createdByID, typ}
nCols = 5
}
// Insert the job row, but do not set a `claim_session_id`. By not
// setting the claim, the job can be adopted by any node and will
// be adopted by the node which next runs the adoption loop.
Expand Down
25 changes: 3 additions & 22 deletions pkg/jobs/registry_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -94,31 +93,15 @@ func TestExpiringSessionsAndClaimJobsDoesNotTouchTerminalJobs(t *testing.T) {
s, sqlDB, _ := serverutils.StartServer(t, args)
defer s.Stopper().Stop(ctx)

payload, err := protoutil.Marshal(&jobspb.Payload{
Details: jobspb.WrapPayloadDetails(jobspb.BackupDetails{}),
})
if err != nil {
t.Fatal(err)
}

progress, err := protoutil.Marshal(&jobspb.Progress{
Details: jobspb.WrapProgressDetails(jobspb.BackupProgress{}),
})
if err != nil {
t.Fatal(err)
}

tdb := sqlutils.MakeSQLRunner(sqlDB)
const insertQuery = `
INSERT
INTO system.jobs (
status,
payload,
progress,
claim_session_id,
claim_instance_id
)
VALUES ($1, $2, $3, $4, $5)
VALUES ($1, $2, $3)
RETURNING id;
`
// Disallow clean up of claimed jobs
Expand All @@ -128,12 +111,10 @@ RETURNING id;
terminalClaims := make([][]byte, len(terminalStatuses))
for i, s := range terminalStatuses {
terminalClaims[i] = uuid.MakeV4().GetBytes() // bogus claim
tdb.QueryRow(t, insertQuery, s, payload, progress, terminalClaims[i], 42).
Scan(&terminalIDs[i])
tdb.QueryRow(t, insertQuery, s, terminalClaims[i], 42).Scan(&terminalIDs[i])
}
var nonTerminalID jobspb.JobID
tdb.QueryRow(t, insertQuery, jobs.StatusRunning, payload, progress, uuid.MakeV4().GetBytes(), 42).
Scan(&nonTerminalID)
tdb.QueryRow(t, insertQuery, jobs.StatusRunning, uuid.MakeV4().GetBytes(), 42).Scan(&nonTerminalID)

checkClaimEqual := func(id jobspb.JobID, exp []byte) error {
const getClaimQuery = `SELECT claim_session_id FROM system.jobs WHERE id = $1`
Expand Down
Loading

0 comments on commit 97fee54

Please sign in to comment.