Skip to content

Commit

Permalink
Merge #79024 #79045
Browse files Browse the repository at this point in the history
79024: sql/schemachanger: rework gcjob creation for descriptions r=ajwerner a=ajwerner

This should also be a bit easier to extend in directions we'll need to go in
the future.

Part of dealing with #78030.

Release note: None

79045: sql/parser: remove mention of BUCKET_COUNT from help text r=ajwerner a=ajwerner

Fixes #79044

Release note (sql change): Help text for creating indexes or primary key
constraints no longer mentions BUCKET_COUNT because it can now be omitted
and a default is used.

Co-authored-by: Andrew Werner <[email protected]>
  • Loading branch information
craig[bot] and ajwerner committed Mar 30, 2022
3 parents 4bce84e + 039bb4c + 9416c64 commit a059691
Show file tree
Hide file tree
Showing 27 changed files with 455 additions and 167 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/partitionccl/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,6 @@ SELECT job_id

waitForJobDone(t, tdb, "GC for DROP INDEX%idx")
tdb.Exec(t, `ALTER RANGE default CONFIGURE ZONE USING gc.ttlseconds = 1`)
waitForJobDone(t, tdb, "GC for dropping descriptor %")
waitForJobDone(t, tdb, "GC for DROP TABLE%t")
})
}
4 changes: 2 additions & 2 deletions pkg/ccl/schemachangerccl/testdata/end_to_end/drop_multiregion
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ upsert descriptor #108
- version: "2"
+ version: "3"
write *eventpb.DropTable to event log for descriptor #108: DROP TABLE ‹multi_region_test_db›.‹public›.‹table_regional_by_row›
create job #2 (non-cancelable: true): "GC for dropping descriptor 108"
create job #2 (non-cancelable: true): "GC for DROP TABLE multi_region_test_db.public.table_regional_by_row"
descriptor IDs: [108]
update progress of schema change job #1
commit transaction #3
Expand Down Expand Up @@ -1208,7 +1208,7 @@ upsert descriptor #109
- version: "2"
+ version: "3"
write *eventpb.DropTable to event log for descriptor #109: DROP TABLE ‹multi_region_test_db›.‹public›.‹table_regional_by_table› CASCADE
create job #2 (non-cancelable: true): "GC for dropping descriptor 109"
create job #2 (non-cancelable: true): "GC for DROP TABLE multi_region_test_db.public.table_regional_by_table CASCADE"
descriptor IDs: [109]
update progress of schema change job #1
commit transaction #3
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/logictest/testdata/logic_test/drop_database
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,8 @@ WITH cte AS (
----
SCHEMA CHANGE CREATE TABLE constraint_db.public.t2 (t1_id INT8, CONSTRAINT fk FOREIGN KEY (t1_id) REFERENCES constraint_db.public.t1 (a), INDEX (t1_id)) succeeded
SCHEMA CHANGE updating referenced FK table t1(125) for table t2(126) succeeded
SCHEMA CHANGE GC GC for dropping descriptors 113 116 119 running
SCHEMA CHANGE GC GC for dropping descriptors 123 125 126 running
SCHEMA CHANGE GC GC for DROP DATABASE constraint_db CASCADE running
SCHEMA CHANGE GC GC for DROP DATABASE d2 CASCADE running

query TTTTT
SHOW DATABASES
Expand Down
5 changes: 1 addition & 4 deletions pkg/sql/logictest/testdata/logic_test/fk
Original file line number Diff line number Diff line change
Expand Up @@ -997,17 +997,14 @@ DELETE FROM a WHERE id > 2
statement ok
DELETE FROM b WHERE id = 2

let $a_table_id
SELECT 'a'::REGCLASS::INT::STRING

statement ok
DROP TABLE a

# Check proper GC job description formatting when removing FK back-references when dropping a table (#59221).
query T
SELECT job_type FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE GC' AND
(description LIKE 'GC for updating table "a"%' OR
description = 'GC for dropping descriptor ' || $a_table_id);
description = 'GC for DROP TABLE a');
----
SCHEMA CHANGE GC

Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/parser/sql.y
Original file line number Diff line number Diff line change
Expand Up @@ -7046,12 +7046,12 @@ alter_schema_stmt:
// Table elements:
// <name> <type> [<qualifiers...>]
// [UNIQUE | INVERTED] INDEX [<name>] ( <colname> [ASC | DESC] [, ...] )
// [USING HASH WITH BUCKET_COUNT = <shard_buckets>] [{STORING | INCLUDE | COVERING} ( <colnames...> )]
// [USING HASH] [{STORING | INCLUDE | COVERING} ( <colnames...> )]
// FAMILY [<name>] ( <colnames...> )
// [CONSTRAINT <name>] <constraint>
//
// Table constraints:
// PRIMARY KEY ( <colnames...> ) [USING HASH WITH BUCKET_COUNT = <shard_buckets>]
// PRIMARY KEY ( <colnames...> ) [USING HASH]
// FOREIGN KEY ( <colnames...> ) REFERENCES <tablename> [( <colnames...> )] [ON DELETE {NO ACTION | RESTRICT}] [ON UPDATE {NO ACTION | RESTRICT}]
// UNIQUE ( <colnames...> ) [{STORING | INCLUDE | COVERING} ( <colnames...> )]
// CHECK ( <expr> )
Expand Down Expand Up @@ -8478,7 +8478,7 @@ enum_val_list:
// %Text:
// CREATE [UNIQUE | INVERTED] INDEX [CONCURRENTLY] [IF NOT EXISTS] [<idxname>]
// ON <tablename> ( <colname> [ASC | DESC] [, ...] )
// [USING HASH WITH BUCKET_COUNT = <shard_buckets>] [STORING ( <colnames...> )]
// [USING HASH] [STORING ( <colnames...> )]
// [PARTITION BY <partition params>]
// [WITH <storage_parameter_list>] [WHERE <where_conds...>]
//
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/schemachanger/scexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"exec_mutation.go",
"exec_validation.go",
"executor.go",
"gc_jobs.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec",
visibility = ["//visibility:public"],
Expand All @@ -27,6 +28,7 @@ go_library(
"//pkg/sql/schemachanger/scpb",
"//pkg/sql/sem/catid",
"//pkg/sql/sessiondata",
"//pkg/util",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/log",
Expand Down
190 changes: 56 additions & 134 deletions pkg/sql/schemachanger/scexec/exec_mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ package scexec

import (
"context"
"fmt"
"sort"
"strings"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
Expand All @@ -28,7 +26,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

Expand All @@ -53,37 +50,60 @@ func executeDescriptorMutationOps(ctx context.Context, deps Dependencies, ops []
// liveness benefit because their entries are non-deterministic. The jobs
// writes are particularly bad because that table is constantly being
// scanned.
if err := performBatchedCatalogWrites(ctx, mvs, deps.Catalog()); err != nil {
dbZoneConfigsToDelete, gcJobRecords := mvs.gcJobs.makeRecords(
deps.TransactionalJobRegistry().MakeJobID,
)
if err := performBatchedCatalogWrites(
ctx,
mvs.descriptorsToDelete,
dbZoneConfigsToDelete,
mvs.checkedOutDescriptors,
mvs.drainedNames,
deps.Catalog(),
); err != nil {
return err
}
if err := logEvents(ctx, mvs, deps.EventLogger()); err != nil {
return err
}
if err := updateDescriptorMetadata(ctx, mvs, deps.DescriptorMetadataUpdater(ctx)); err != nil {
if err := updateDescriptorMetadata(
ctx, mvs, deps.DescriptorMetadataUpdater(ctx),
); err != nil {
return err
}
return updateOrDeleteJobs(ctx, deps.TransactionalJobRegistry(), mvs)
return manageJobs(
ctx,
gcJobRecords,
mvs.schemaChangerJob,
mvs.schemaChangerJobUpdates,
deps.TransactionalJobRegistry(),
)
}

func performBatchedCatalogWrites(
ctx context.Context, mvs *mutationVisitorState, cat Catalog,
ctx context.Context,
descriptorsToDelete catalog.DescriptorIDSet,
dbZoneConfigsToDelete catalog.DescriptorIDSet,
checkedOutDescriptors nstree.Map,
drainedNames map[descpb.ID][]descpb.NameInfo,
cat Catalog,
) error {
b := cat.NewCatalogChangeBatcher()
mvs.descriptorsToDelete.ForEach(func(id descpb.ID) {
mvs.checkedOutDescriptors.Remove(id)
descriptorsToDelete.ForEach(func(id descpb.ID) {
checkedOutDescriptors.Remove(id)
})
err := mvs.checkedOutDescriptors.IterateByID(func(entry catalog.NameEntry) error {
err := checkedOutDescriptors.IterateByID(func(entry catalog.NameEntry) error {
return b.CreateOrUpdateDescriptor(ctx, entry.(catalog.MutableDescriptor))
})
if err != nil {
return err
}
for _, id := range mvs.descriptorsToDelete.Ordered() {
for _, id := range descriptorsToDelete.Ordered() {
if err := b.DeleteDescriptor(ctx, id); err != nil {
return err
}
}
for id, drainedNames := range mvs.drainedNames {
for id, drainedNames := range drainedNames {
for _, name := range drainedNames {
if err := b.DeleteName(ctx, name, id); err != nil {
return err
Expand All @@ -92,14 +112,18 @@ func performBatchedCatalogWrites(
}
// Any database being GCed should have an entry even if none of its tables
// are being dropped. This entry will be used to generate the GC jobs below.
for _, dbID := range mvs.dbGCJobs.Ordered() {
if _, ok := mvs.descriptorGCJobs[dbID]; !ok {
// Zone config should now be safe to remove versus waiting for the GC job.
if err := b.DeleteZoneConfig(ctx, dbID); err != nil {
return err
{
var err error
dbZoneConfigsToDelete.ForEach(func(id descpb.ID) {
if err == nil {
err = b.DeleteZoneConfig(ctx, id)
}
})
if err != nil {
return err
}
}

return b.ValidateAndRun(ctx)
}

Expand Down Expand Up @@ -255,76 +279,26 @@ func updateDescriptorMetadata(
return nil
}

func updateOrDeleteJobs(
ctx context.Context, jr TransactionalJobRegistry, mvs *mutationVisitorState,
func manageJobs(
ctx context.Context,
gcJobs []jobs.Record,
scJob *jobs.Record,
scJobUpdates map[jobspb.JobID]schemaChangerJobUpdate,
jr TransactionalJobRegistry,
) error {
var dbIDs catalog.DescriptorIDSet
for dbID := range mvs.descriptorGCJobs {
dbIDs.Add(dbID)
}
for _, dbID := range dbIDs.Ordered() {
job := jobspb.SchemaChangeGCDetails{
Tables: mvs.descriptorGCJobs[dbID],
}
// Check if the database is also being cleaned up at the same time.
if mvs.dbGCJobs.Contains(dbID) {
job.ParentID = dbID
}
jobName := func() string {
var ids catalog.DescriptorIDSet
if job.ParentID != descpb.InvalidID {
ids.Add(job.ParentID)
}
for _, table := range mvs.descriptorGCJobs[dbID] {
ids.Add(table.ID)
}
var sb strings.Builder
if ids.Len() == 1 {
sb.WriteString("dropping descriptor")
} else {
sb.WriteString("dropping descriptors")
}
ids.ForEach(func(id descpb.ID) {
sb.WriteString(fmt.Sprintf(" %d", id))
})
return sb.String()
}

record := createGCJobRecord(jobName(), security.NodeUserName(), job)
record.JobID = jr.MakeJobID()
if err := jr.CreateJob(ctx, record); err != nil {
// TODO(ajwerner): Batch job creation. Should be easy, the registry has
// the needed API.
for _, j := range gcJobs {
if err := jr.CreateJob(ctx, j); err != nil {
return err
}
}
for tableID, indexes := range mvs.indexGCJobs {
job := jobspb.SchemaChangeGCDetails{
ParentID: tableID,
Indexes: indexes,
}
jobName := func() string {
if len(indexes) == 1 {
return fmt.Sprintf("dropping table %d index %d", tableID, indexes[0].IndexID)
}
var sb strings.Builder
sb.WriteString(fmt.Sprintf("dropping table %d indexes", tableID))
for _, index := range indexes {
sb.WriteString(fmt.Sprintf(" %d", index.IndexID))
}
return sb.String()
}

record := createGCJobRecord(jobName(), security.NodeUserName(), job)
record.JobID = jr.MakeJobID()
if err := jr.CreateJob(ctx, record); err != nil {
if scJob != nil {
if err := jr.CreateJob(ctx, *scJob); err != nil {
return err
}
}
if mvs.schemaChangerJob != nil {
if err := jr.CreateJob(ctx, *mvs.schemaChangerJob); err != nil {
return err
}
}
for id, update := range mvs.schemaChangerJobUpdates {
for id, update := range scJobUpdates {
if err := jr.UpdateSchemaChangeJob(ctx, id, func(
md jobs.JobMetadata, updateProgress func(*jobspb.Progress), setNonCancelable func(),
) error {
Expand All @@ -350,13 +324,12 @@ type mutationVisitorState struct {
tableCommentsToDelete catalog.DescriptorIDSet
constraintCommentsToUpdate []constraintCommentToUpdate
databaseRoleSettingsToDelete []databaseRoleSettingToDelete
dbGCJobs catalog.DescriptorIDSet
descriptorGCJobs map[descpb.ID][]jobspb.SchemaChangeGCDetails_DroppedID
indexGCJobs map[descpb.ID][]jobspb.SchemaChangeGCDetails_DroppedIndex
schemaChangerJob *jobs.Record
schemaChangerJobUpdates map[jobspb.JobID]schemaChangerJobUpdate
eventsByStatement map[uint32][]eventPayload
scheduleIDsToDelete []int64

gcJobs
}

type constraintCommentToUpdate struct {
Expand Down Expand Up @@ -406,8 +379,6 @@ func newMutationVisitorState(c Catalog) *mutationVisitorState {
return &mutationVisitorState{
c: c,
drainedNames: make(map[descpb.ID][]descpb.NameInfo),
indexGCJobs: make(map[descpb.ID][]jobspb.SchemaChangeGCDetails_DroppedIndex),
descriptorGCJobs: make(map[descpb.ID][]jobspb.SchemaChangeGCDetails_DroppedID),
eventsByStatement: make(map[uint32][]eventPayload),
}
}
Expand Down Expand Up @@ -499,29 +470,6 @@ func (mvs *mutationVisitorState) AddDrainedName(id descpb.ID, nameInfo descpb.Na
mvs.drainedNames[id] = append(mvs.drainedNames[id], nameInfo)
}

func (mvs *mutationVisitorState) AddNewGCJobForTable(table catalog.TableDescriptor) {
mvs.descriptorGCJobs[table.GetParentID()] = append(mvs.descriptorGCJobs[table.GetParentID()],
jobspb.SchemaChangeGCDetails_DroppedID{
ID: table.GetID(),
DropTime: timeutil.Now().UnixNano(),
})
}

func (mvs *mutationVisitorState) AddNewGCJobForDatabase(db catalog.DatabaseDescriptor) {
mvs.dbGCJobs.Add(db.GetID())
}

func (mvs *mutationVisitorState) AddNewGCJobForIndex(
tbl catalog.TableDescriptor, index catalog.Index,
) {
mvs.indexGCJobs[tbl.GetID()] = append(
mvs.indexGCJobs[tbl.GetID()],
jobspb.SchemaChangeGCDetails_DroppedIndex{
IndexID: index.GetID(),
DropTime: timeutil.Now().UnixNano(),
})
}

func (mvs *mutationVisitorState) AddNewSchemaChangerJob(
jobID jobspb.JobID,
stmts []scpb.Statement,
Expand Down Expand Up @@ -578,32 +526,6 @@ func MakeDeclarativeSchemaChangeJobRecord(
return rec
}

// createGCJobRecord creates the job record for a GC job, setting some
// properties which are common for all GC jobs.
func createGCJobRecord(
originalDescription string, username security.SQLUsername, details jobspb.SchemaChangeGCDetails,
) jobs.Record {
descriptorIDs := make([]descpb.ID, 0)
if len(details.Indexes) > 0 {
if len(descriptorIDs) == 0 {
descriptorIDs = []descpb.ID{details.ParentID}
}
} else {
for _, table := range details.Tables {
descriptorIDs = append(descriptorIDs, table.ID)
}
}
return jobs.Record{
Description: fmt.Sprintf("GC for %s", originalDescription),
Username: username,
DescriptorIDs: descriptorIDs,
Details: details,
Progress: jobspb.SchemaChangeGCProgress{},
RunningStatus: "waiting for GC TTL",
NonCancelable: true,
}
}

// EnqueueEvent implements the scmutationexec.MutationVisitorStateUpdater
// interface.
func (mvs *mutationVisitorState) EnqueueEvent(
Expand Down
Loading

0 comments on commit a059691

Please sign in to comment.