Skip to content

Commit

Permalink
jobs,catpb,descpb: move the definition of JobID down to catpb, forwar…
Browse files Browse the repository at this point in the history
…d up

This allows the JobID to be used in descriptors. Adds a little bit of type
safety here and there.

Release note: None
  • Loading branch information
ajwerner committed Feb 11, 2022
1 parent 23391d0 commit a204e69
Show file tree
Hide file tree
Showing 18 changed files with 58 additions and 27 deletions.
2 changes: 1 addition & 1 deletion docs/generated/redact_safe.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ pkg/base/node_id.go | `*NodeIDContainer`
pkg/base/node_id.go | `*SQLIDContainer`
pkg/base/node_id.go | `*StoreIDContainer`
pkg/cli/exit/exit.go | `Code`
pkg/jobs/jobspb/wrap.go | `JobID`
pkg/jobs/jobspb/wrap.go | `Type`
pkg/kv/kvserver/closedts/ctpb/service.go | `LAI`
pkg/kv/kvserver/closedts/ctpb/service.go | `SeqNum`
Expand All @@ -25,6 +24,7 @@ pkg/roachpb/method.go | `Method`
pkg/roachpb/tenant.go | `TenantID`
pkg/rpc/connection_class.go | `ConnectionClass`
pkg/sql/catalog/catpb/constraint.go | `ForeignKeyAction`
pkg/sql/catalog/catpb/job_id.go | `JobID`
pkg/sql/catalog/descpb/structured.go | `ConstraintType`
pkg/sql/catalog/descpb/structured.go | `ConstraintValidity`
pkg/sql/catalog/descpb/structured.go | `DescriptorMutation_Direction`
Expand Down
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ ALL_TESTS = [
"//pkg/spanconfig/spanconfigtestutils:spanconfigtestutils_test",
"//pkg/sql/catalog/catalogkeys:catalogkeys_test",
"//pkg/sql/catalog/catformat:catformat_test",
"//pkg/sql/catalog/catpb:catpb_test",
"//pkg/sql/catalog/catprivilege:catprivilege_test",
"//pkg/sql/catalog/colinfo:colinfo_test",
"//pkg/sql/catalog/dbdesc:dbdesc_test",
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_schema_change_creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func createSchemaChangeJobsFromMutations(
}
newMutationJob := descpb.TableDescriptor_MutationJob{
MutationID: mutationID,
JobID: int64(jobID),
JobID: jobID,
}
mutationJobs = append(mutationJobs, newMutationJob)

Expand Down
1 change: 1 addition & 0 deletions pkg/jobs/jobspb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
deps = [
"//pkg/base",
"//pkg/cloud",
"//pkg/sql/catalog/catpb",
"//pkg/sql/catalog/descpb",
"//pkg/sql/protoreflect",
"@com_github_cockroachdb_errors//:errors",
Expand Down
8 changes: 3 additions & 5 deletions pkg/jobs/jobspb/wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,18 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/protoreflect"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/jsonpb"
)

// JobID is the ID of a job.
type JobID int64
type JobID = catpb.JobID

// InvalidJobID is the zero value for JobID corresponding to no job.
const InvalidJobID JobID = 0

// SafeValue implements the redact.SafeValue interface.
func (j JobID) SafeValue() {}
const InvalidJobID = catpb.InvalidJobID

// Details is a marker interface for job details proto structs.
type Details interface{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func ValidateJobReferencesInDescriptor(
}

for _, m := range tbl.GetMutationJobs() {
j, err := jmg.GetJobMetadata(jobspb.JobID(m.JobID))
j, err := jmg.GetJobMetadata(m.JobID)
if err != nil {
errorAccFn(errors.WithAssertionFailure(errors.Wrapf(err, "mutation job %d", m.JobID)))
continue
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,7 @@ func getJobIDForMutationWithDescriptor(
) (jobspb.JobID, error) {
for _, job := range tableDesc.GetMutationJobs() {
if job.MutationID == mutationID {
return jobspb.JobID(job.JobID), nil
return job.JobID, nil
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sql/catalog/catpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
"constraint.go",
"default_privilege.go",
"doc.go",
"job_id.go",
"multiregion.go",
"privilege.go",
":gen-privilegedescversion-stringer", # keep
Expand Down
22 changes: 22 additions & 0 deletions pkg/sql/catalog/catpb/job_id.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package catpb

// JobID is the ID of a job. It is defined here and imported in jobspb because
// jobs are referenced in descriptors and also jobs reference parts of
// descriptors. This avoids any dependency cycles.
type JobID int64

// InvalidJobID is the zero value for JobID corresponding to no job.
const InvalidJobID JobID = 0

// SafeValue implements the redact.SafeValue interface.
func (j JobID) SafeValue() {}
23 changes: 16 additions & 7 deletions pkg/sql/catalog/descpb/structured.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1068,8 +1068,10 @@ message TableDescriptor {
// The job id for a mutation job is the id in the system.jobs table of the
// schema change job executing the mutation referenced by mutation_id.
// This is not a jobspb.JobID to avoid a dependency cycle.
optional int64 job_id = 2 [(gogoproto.nullable) = false,
(gogoproto.customname) = "JobID"];
optional int64 job_id = 2 [
(gogoproto.nullable) = false,
(gogoproto.customname) = "JobID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb.JobID"];
}

// Mutation jobs queued for execution in a FIFO order. Remains synchronized
Expand All @@ -1079,8 +1081,10 @@ message TableDescriptor {
// The job associated with a schema change job run in the new schema changer
// (in sql/schemachanger), if one exists. Only one such job can exist at a
// time.
optional int64 new_schema_change_job_id = 46 [(gogoproto.nullable) = false,
(gogoproto.customname) = "NewSchemaChangeJobID"];
optional int64 new_schema_change_job_id = 46 [
(gogoproto.nullable) = false,
(gogoproto.customname) = "NewSchemaChangeJobID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb.JobID"];

message SequenceOpts {
option (gogoproto.equal) = true;
Expand Down Expand Up @@ -1156,7 +1160,10 @@ message TableDescriptor {

// The job id for a drop job is the id in the system.jobs table of the
// dropping of this table.
optional int64 drop_job_id = 32 [(gogoproto.nullable) = false, (gogoproto.customname) = "DropJobID"];
optional int64 drop_job_id = 32 [
(gogoproto.nullable) = false,
(gogoproto.customname) = "DropJobID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb.JobID"];

message GCDescriptorMutation {
option (gogoproto.equal) = true;
Expand All @@ -1167,8 +1174,10 @@ message TableDescriptor {

// The job id for a mutation job is the id in the system.jobs table of the
// schema change job executing the mutation referenced by mutation_id.
optional int64 job_id = 3 [(gogoproto.nullable) = false,
(gogoproto.customname) = "JobID", deprecated = true];
optional int64 job_id = 3 [
(gogoproto.nullable) = false,
(gogoproto.customname) = "JobID", deprecated = true,
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb.JobID"];
}

// Before 22.1:
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -4465,7 +4465,7 @@ func collectMarshaledJobMetadataMap(
continue
}
for _, j := range tbl.GetMutationJobs() {
referencedJobIDs[jobspb.JobID(j.JobID)] = struct{}{}
referencedJobIDs[j.JobID] = struct{}{}
}
}
if len(referencedJobIDs) == 0 {
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/drop_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -460,7 +459,7 @@ func (p *planner) markTableMutationJobsSuccessful(
ctx context.Context, tableDesc *tabledesc.Mutable,
) error {
for _, mj := range tableDesc.MutationJobs {
jobID := jobspb.JobID(mj.JobID)
jobID := mj.JobID
// Jobs are only added in the cache during the transaction and are created
// in a batch only when the transaction commits. So, if a job's record exists
// in the cache, we can simply delete that record from cache because the
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/indexbackfiller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ INSERT INTO foo VALUES (1), (10), (100);
return err
}
mut.MutationJobs = append(mut.MutationJobs, descpb.TableDescriptor_MutationJob{
JobID: int64(jobID),
JobID: jobID,
MutationID: 1,
})
jobToBlock.Store(jobID)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func GetResumeSpans(
// know which job it's associated with.
for _, job := range tableDesc.GetMutationJobs() {
if job.MutationID == mutationID {
jobID = jobspb.JobID(job.JobID)
jobID = job.JobID
break
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/backfiller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestWriteResumeSpan(t *testing.T) {
if len(tableDesc.MutationJobs) > 0 {
for _, job := range tableDesc.MutationJobs {
if job.MutationID == mutationID {
jobID = jobspb.JobID(job.JobID)
jobID = job.JobID
break
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2470,7 +2470,7 @@ func (sc *SchemaChanger) queueCleanupJob(
log.Infof(ctx, "created job %d to drop previous columns and indexes", jobID)
scDesc.MutationJobs = append(scDesc.MutationJobs, descpb.TableDescriptor_MutationJob{
MutationID: mutationID,
JobID: int64(jobID),
JobID: jobID,
})
}
return jobID, nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/schemachanger/scexec/scmutationexec/scmutationexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,13 @@ func (m *visitor) swapSchemaChangeJobID(
if !ok {
return nil
}
if jobspb.JobID(mut.NewSchemaChangeJobID) != exp {
if mut.NewSchemaChangeJobID != exp {
return errors.AssertionFailedf(
"unexpected schema change job ID %d on table %d, expected %d",
mut.NewSchemaChangeJobID, descID, exp,
)
}
mut.NewSchemaChangeJobID = int64(to)
mut.NewSchemaChangeJobID = to
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (p *planner) createOrUpdateSchemaChangeJob(
// TODO (lucy): get rid of this when we get rid of MutationJobs.
if mutationID != descpb.InvalidMutationID {
tableDesc.MutationJobs = append(tableDesc.MutationJobs, descpb.TableDescriptor_MutationJob{
MutationID: mutationID, JobID: int64(newRecord.JobID)})
MutationID: mutationID, JobID: newRecord.JobID})
}
log.Infof(ctx, "queued new schema-change job %d for table %d, mutation %d",
newRecord.JobID, tableDesc.ID, mutationID)
Expand Down Expand Up @@ -204,7 +204,7 @@ func (p *planner) createOrUpdateSchemaChangeJob(
// Also add a MutationJob on the table descriptor.
// TODO (lucy): get rid of this when we get rid of MutationJobs.
tableDesc.MutationJobs = append(tableDesc.MutationJobs, descpb.TableDescriptor_MutationJob{
MutationID: mutationID, JobID: int64(record.JobID)})
MutationID: mutationID, JobID: record.JobID})
// For existing records, if a mutation ID ever gets assigned
// at a later point then mark it as cancellable again.
record.NonCancelable = false
Expand Down

0 comments on commit a204e69

Please sign in to comment.