diff --git a/docs/generated/redact_safe.md b/docs/generated/redact_safe.md index bbb781069e10..d91bd73b2b09 100644 --- a/docs/generated/redact_safe.md +++ b/docs/generated/redact_safe.md @@ -6,7 +6,6 @@ pkg/base/node_id.go | `*NodeIDContainer` pkg/base/node_id.go | `*SQLIDContainer` pkg/base/node_id.go | `*StoreIDContainer` pkg/cli/exit/exit.go | `Code` -pkg/jobs/jobspb/wrap.go | `JobID` pkg/jobs/jobspb/wrap.go | `Type` pkg/kv/kvserver/closedts/ctpb/service.go | `LAI` pkg/kv/kvserver/closedts/ctpb/service.go | `SeqNum` @@ -25,6 +24,7 @@ pkg/roachpb/method.go | `Method` pkg/roachpb/tenant.go | `TenantID` pkg/rpc/connection_class.go | `ConnectionClass` pkg/sql/catalog/catpb/constraint.go | `ForeignKeyAction` +pkg/sql/catalog/catpb/job_id.go | `JobID` pkg/sql/catalog/descpb/structured.go | `ConstraintType` pkg/sql/catalog/descpb/structured.go | `ConstraintValidity` pkg/sql/catalog/descpb/structured.go | `DescriptorMutation_Direction` diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index bd03f36c9dd3..95d22b293a83 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -203,6 +203,7 @@ ALL_TESTS = [ "//pkg/spanconfig/spanconfigtestutils:spanconfigtestutils_test", "//pkg/sql/catalog/catalogkeys:catalogkeys_test", "//pkg/sql/catalog/catformat:catformat_test", + "//pkg/sql/catalog/catpb:catpb_test", "//pkg/sql/catalog/catprivilege:catprivilege_test", "//pkg/sql/catalog/colinfo:colinfo_test", "//pkg/sql/catalog/dbdesc:dbdesc_test", diff --git a/pkg/ccl/backupccl/restore_schema_change_creation.go b/pkg/ccl/backupccl/restore_schema_change_creation.go index b5e441ae5029..3815b79d4415 100644 --- a/pkg/ccl/backupccl/restore_schema_change_creation.go +++ b/pkg/ccl/backupccl/restore_schema_change_creation.go @@ -207,7 +207,7 @@ func createSchemaChangeJobsFromMutations( } newMutationJob := descpb.TableDescriptor_MutationJob{ MutationID: mutationID, - JobID: int64(jobID), + JobID: jobID, } mutationJobs = append(mutationJobs, newMutationJob) diff --git a/pkg/jobs/jobspb/BUILD.bazel b/pkg/jobs/jobspb/BUILD.bazel index c6983e1a7465..e76bbef111c2 100644 --- a/pkg/jobs/jobspb/BUILD.bazel +++ b/pkg/jobs/jobspb/BUILD.bazel @@ -11,6 +11,7 @@ go_library( deps = [ "//pkg/base", "//pkg/cloud", + "//pkg/sql/catalog/catpb", "//pkg/sql/catalog/descpb", "//pkg/sql/protoreflect", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/jobs/jobspb/wrap.go b/pkg/jobs/jobspb/wrap.go index a73fa7c8760c..2ccb2dc20826 100644 --- a/pkg/jobs/jobspb/wrap.go +++ b/pkg/jobs/jobspb/wrap.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/cloud" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/protoreflect" "github.com/cockroachdb/errors" @@ -24,13 +25,10 @@ import ( ) // JobID is the ID of a job. -type JobID int64 +type JobID = catpb.JobID // InvalidJobID is the zero value for JobID corresponding to no job. -const InvalidJobID JobID = 0 - -// SafeValue implements the redact.SafeValue interface. -func (j JobID) SafeValue() {} +const InvalidJobID = catpb.InvalidJobID // Details is a marker interface for job details proto structs. type Details interface{} diff --git a/pkg/jobs/validate.go b/pkg/jobs/validate.go index 8b251ba0586a..59ca91ad03c0 100644 --- a/pkg/jobs/validate.go +++ b/pkg/jobs/validate.go @@ -36,7 +36,7 @@ func ValidateJobReferencesInDescriptor( } for _, m := range tbl.GetMutationJobs() { - j, err := jmg.GetJobMetadata(jobspb.JobID(m.JobID)) + j, err := jmg.GetJobMetadata(m.JobID) if err != nil { errorAccFn(errors.WithAssertionFailure(errors.Wrapf(err, "mutation job %d", m.JobID))) continue diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index f0b5e45765d5..d9751567f9e1 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -789,7 +789,7 @@ func getJobIDForMutationWithDescriptor( ) (jobspb.JobID, error) { for _, job := range tableDesc.GetMutationJobs() { if job.MutationID == mutationID { - return jobspb.JobID(job.JobID), nil + return job.JobID, nil } } diff --git a/pkg/sql/catalog/catpb/BUILD.bazel b/pkg/sql/catalog/catpb/BUILD.bazel index de55ed1192ca..5e4f22587ae2 100644 --- a/pkg/sql/catalog/catpb/BUILD.bazel +++ b/pkg/sql/catalog/catpb/BUILD.bazel @@ -29,6 +29,7 @@ go_library( "constraint.go", "default_privilege.go", "doc.go", + "job_id.go", "multiregion.go", "privilege.go", ":gen-privilegedescversion-stringer", # keep diff --git a/pkg/sql/catalog/catpb/job_id.go b/pkg/sql/catalog/catpb/job_id.go new file mode 100644 index 000000000000..69b3234dd116 --- /dev/null +++ b/pkg/sql/catalog/catpb/job_id.go @@ -0,0 +1,22 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package catpb + +// JobID is the ID of a job. It is defined here and imported in jobspb because +// jobs are referenced in descriptors and also jobs reference parts of +// descriptors. This avoids any dependency cycles. +type JobID int64 + +// InvalidJobID is the zero value for JobID corresponding to no job. +const InvalidJobID JobID = 0 + +// SafeValue implements the redact.SafeValue interface. +func (j JobID) SafeValue() {} diff --git a/pkg/sql/catalog/descpb/structured.proto b/pkg/sql/catalog/descpb/structured.proto index 258d91f7e99b..e92ad3aabfda 100644 --- a/pkg/sql/catalog/descpb/structured.proto +++ b/pkg/sql/catalog/descpb/structured.proto @@ -1068,8 +1068,10 @@ message TableDescriptor { // The job id for a mutation job is the id in the system.jobs table of the // schema change job executing the mutation referenced by mutation_id. // This is not a jobspb.JobID to avoid a dependency cycle. - optional int64 job_id = 2 [(gogoproto.nullable) = false, - (gogoproto.customname) = "JobID"]; + optional int64 job_id = 2 [ + (gogoproto.nullable) = false, + (gogoproto.customname) = "JobID", + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb.JobID"]; } // Mutation jobs queued for execution in a FIFO order. Remains synchronized @@ -1079,8 +1081,10 @@ message TableDescriptor { // The job associated with a schema change job run in the new schema changer // (in sql/schemachanger), if one exists. Only one such job can exist at a // time. - optional int64 new_schema_change_job_id = 46 [(gogoproto.nullable) = false, - (gogoproto.customname) = "NewSchemaChangeJobID"]; + optional int64 new_schema_change_job_id = 46 [ + (gogoproto.nullable) = false, + (gogoproto.customname) = "NewSchemaChangeJobID", + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb.JobID"]; message SequenceOpts { option (gogoproto.equal) = true; @@ -1156,7 +1160,10 @@ message TableDescriptor { // The job id for a drop job is the id in the system.jobs table of the // dropping of this table. - optional int64 drop_job_id = 32 [(gogoproto.nullable) = false, (gogoproto.customname) = "DropJobID"]; + optional int64 drop_job_id = 32 [ + (gogoproto.nullable) = false, + (gogoproto.customname) = "DropJobID", + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb.JobID"]; message GCDescriptorMutation { option (gogoproto.equal) = true; @@ -1167,8 +1174,10 @@ message TableDescriptor { // The job id for a mutation job is the id in the system.jobs table of the // schema change job executing the mutation referenced by mutation_id. - optional int64 job_id = 3 [(gogoproto.nullable) = false, - (gogoproto.customname) = "JobID", deprecated = true]; + optional int64 job_id = 3 [ + (gogoproto.nullable) = false, + (gogoproto.customname) = "JobID", deprecated = true, + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb.JobID"]; } // Before 22.1: diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index e976c8e4f381..19784e468ea4 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -4465,7 +4465,7 @@ func collectMarshaledJobMetadataMap( continue } for _, j := range tbl.GetMutationJobs() { - referencedJobIDs[jobspb.JobID(j.JobID)] = struct{}{} + referencedJobIDs[j.JobID] = struct{}{} } } if len(referencedJobIDs) == 0 { diff --git a/pkg/sql/drop_table.go b/pkg/sql/drop_table.go index 5966c5b53ffe..17ccc156105c 100644 --- a/pkg/sql/drop_table.go +++ b/pkg/sql/drop_table.go @@ -17,7 +17,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" - "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -460,7 +459,7 @@ func (p *planner) markTableMutationJobsSuccessful( ctx context.Context, tableDesc *tabledesc.Mutable, ) error { for _, mj := range tableDesc.MutationJobs { - jobID := jobspb.JobID(mj.JobID) + jobID := mj.JobID // Jobs are only added in the cache during the transaction and are created // in a batch only when the transaction commits. So, if a job's record exists // in the cache, we can simply delete that record from cache because the diff --git a/pkg/sql/indexbackfiller_test.go b/pkg/sql/indexbackfiller_test.go index 7d43cd3a4a35..3e85aab8c154 100644 --- a/pkg/sql/indexbackfiller_test.go +++ b/pkg/sql/indexbackfiller_test.go @@ -502,7 +502,7 @@ INSERT INTO foo VALUES (1), (10), (100); return err } mut.MutationJobs = append(mut.MutationJobs, descpb.TableDescriptor_MutationJob{ - JobID: int64(jobID), + JobID: jobID, MutationID: 1, }) jobToBlock.Store(jobID) diff --git a/pkg/sql/rowexec/backfiller.go b/pkg/sql/rowexec/backfiller.go index 7fdf583d53d6..f47b3ab86993 100644 --- a/pkg/sql/rowexec/backfiller.go +++ b/pkg/sql/rowexec/backfiller.go @@ -229,7 +229,7 @@ func GetResumeSpans( // know which job it's associated with. for _, job := range tableDesc.GetMutationJobs() { if job.MutationID == mutationID { - jobID = jobspb.JobID(job.JobID) + jobID = job.JobID break } } diff --git a/pkg/sql/rowexec/backfiller_test.go b/pkg/sql/rowexec/backfiller_test.go index 80c24492b471..a2bc02397532 100644 --- a/pkg/sql/rowexec/backfiller_test.go +++ b/pkg/sql/rowexec/backfiller_test.go @@ -122,7 +122,7 @@ func TestWriteResumeSpan(t *testing.T) { if len(tableDesc.MutationJobs) > 0 { for _, job := range tableDesc.MutationJobs { if job.MutationID == mutationID { - jobID = jobspb.JobID(job.JobID) + jobID = job.JobID break } } diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index ce0429d28e8c..4f27f5dd7792 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -2470,7 +2470,7 @@ func (sc *SchemaChanger) queueCleanupJob( log.Infof(ctx, "created job %d to drop previous columns and indexes", jobID) scDesc.MutationJobs = append(scDesc.MutationJobs, descpb.TableDescriptor_MutationJob{ MutationID: mutationID, - JobID: int64(jobID), + JobID: jobID, }) } return jobID, nil diff --git a/pkg/sql/schemachanger/scexec/scmutationexec/scmutationexec.go b/pkg/sql/schemachanger/scexec/scmutationexec/scmutationexec.go index 147600460cec..4a7839f2f26f 100644 --- a/pkg/sql/schemachanger/scexec/scmutationexec/scmutationexec.go +++ b/pkg/sql/schemachanger/scexec/scmutationexec/scmutationexec.go @@ -184,13 +184,13 @@ func (m *visitor) swapSchemaChangeJobID( if !ok { return nil } - if jobspb.JobID(mut.NewSchemaChangeJobID) != exp { + if mut.NewSchemaChangeJobID != exp { return errors.AssertionFailedf( "unexpected schema change job ID %d on table %d, expected %d", mut.NewSchemaChangeJobID, descID, exp, ) } - mut.NewSchemaChangeJobID = int64(to) + mut.NewSchemaChangeJobID = to return nil } diff --git a/pkg/sql/table.go b/pkg/sql/table.go index 02a893a01fc6..3171f0088d5f 100644 --- a/pkg/sql/table.go +++ b/pkg/sql/table.go @@ -171,7 +171,7 @@ func (p *planner) createOrUpdateSchemaChangeJob( // TODO (lucy): get rid of this when we get rid of MutationJobs. if mutationID != descpb.InvalidMutationID { tableDesc.MutationJobs = append(tableDesc.MutationJobs, descpb.TableDescriptor_MutationJob{ - MutationID: mutationID, JobID: int64(newRecord.JobID)}) + MutationID: mutationID, JobID: newRecord.JobID}) } log.Infof(ctx, "queued new schema-change job %d for table %d, mutation %d", newRecord.JobID, tableDesc.ID, mutationID) @@ -204,7 +204,7 @@ func (p *planner) createOrUpdateSchemaChangeJob( // Also add a MutationJob on the table descriptor. // TODO (lucy): get rid of this when we get rid of MutationJobs. tableDesc.MutationJobs = append(tableDesc.MutationJobs, descpb.TableDescriptor_MutationJob{ - MutationID: mutationID, JobID: int64(record.JobID)}) + MutationID: mutationID, JobID: record.JobID}) // For existing records, if a mutation ID ever gets assigned // at a later point then mark it as cancellable again. record.NonCancelable = false