Skip to content

Commit

Permalink
sql: deprecate TableDescriptor.GCMutations
Browse files Browse the repository at this point in the history
This appears unused. While the schema changer adds entries that the gc
job subsequently removes, the only other code that made use of this
field (outside of tests) was FindIndexByID. FindIndexByID appears to
use it to return a special error that no one looks for.

Release note: None
  • Loading branch information
stevendanna committed Jan 24, 2022
1 parent 03556f9 commit c08ae50
Show file tree
Hide file tree
Showing 11 changed files with 100 additions and 255 deletions.
126 changes: 48 additions & 78 deletions pkg/jobs/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,38 +93,16 @@ func writeMutation(
}
}

func writeGCMutation(
t *testing.T,
kvDB *kv.DB,
tableDesc *tabledesc.Mutable,
m descpb.TableDescriptor_GCDescriptorMutation,
) {
tableDesc.GCMutations = append(tableDesc.GCMutations, m)
tableDesc.Version++
if err := catalog.ValidateSelf(tableDesc); err != nil {
t.Fatal(err)
}
if err := kvDB.Put(
context.Background(),
catalogkeys.MakeDescMetadataKey(keys.SystemSQLCodec, tableDesc.GetID()),
tableDesc.DescriptorProto(),
); err != nil {
t.Fatal(err)
}
}

type mutationOptions struct {
// Set if the desc should have any mutations of any sort.
hasMutation bool
// Set if the mutation being inserted is a GCMutation.
hasGCMutation bool
// Set if the desc should have a job that is dropping it.
hasDropJob bool
}

func (m mutationOptions) string() string {
return fmt.Sprintf("hasMutation=%s_hasGCMutation=%s_hasDropJob=%s",
strconv.FormatBool(m.hasMutation), strconv.FormatBool(m.hasGCMutation),
return fmt.Sprintf("hasMutation=%s_hasDropJob=%s",
strconv.FormatBool(m.hasMutation),
strconv.FormatBool(m.hasDropJob))
}

Expand Down Expand Up @@ -172,7 +150,7 @@ func TestRegistryGC(t *testing.T) {
writeJob := func(name string, created, finished time.Time, status Status, mutOptions mutationOptions) string {
tableName := constructTableName(name, mutOptions)
if _, err := sqlDB.Exec(fmt.Sprintf(`
CREATE DATABASE IF NOT EXISTS t;
CREATE DATABASE IF NOT EXISTS t;
CREATE TABLE t."%s" (k VARCHAR PRIMARY KEY DEFAULT 'default', v VARCHAR,i VARCHAR NOT NULL DEFAULT 'i');
INSERT INTO t."%s" VALUES('a', 'foo');
`, tableName, tableName)); err != nil {
Expand All @@ -187,10 +165,6 @@ INSERT INTO t."%s" VALUES('a', 'foo');
writeColumnMutation(t, kvDB, tableDesc, "i", descpb.DescriptorMutation{State: descpb.
DescriptorMutation_DELETE_AND_WRITE_ONLY, Direction: descpb.DescriptorMutation_DROP})
}
if mutOptions.hasGCMutation {
writeGCMutation(t, kvDB, tableDesc, descpb.TableDescriptor_GCDescriptorMutation{})
}

payload, err := protoutil.Marshal(&jobspb.Payload{
Description: name,
// register a mutation on the table so that jobs that reference
Expand Down Expand Up @@ -222,58 +196,54 @@ INSERT INTO t."%s" VALUES('a', 'foo');

// Test the descriptor when any of the following are set.
// 1. Mutations
// 2. GC Mutations
// 3. A drop job
// 2. A drop job
for _, hasMutation := range []bool{true, false} {
for _, hasGCMutation := range []bool{true, false} {
for _, hasDropJob := range []bool{true, false} {
if !hasMutation && !hasGCMutation && !hasDropJob {
continue
}
mutOptions := mutationOptions{
hasMutation: hasMutation,
hasGCMutation: hasGCMutation,
hasDropJob: hasDropJob,
}
oldRunningJob := writeJob("old_running", muchEarlier, time.Time{}, StatusRunning, mutOptions)
oldSucceededJob := writeJob("old_succeeded", muchEarlier, muchEarlier.Add(time.Minute), StatusSucceeded, mutOptions)
oldFailedJob := writeJob("old_failed", muchEarlier, muchEarlier.Add(time.Minute),
StatusFailed, mutOptions)
oldRevertFailedJob := writeJob("old_revert_failed", muchEarlier, muchEarlier.Add(time.Minute),
StatusRevertFailed, mutOptions)
oldCanceledJob := writeJob("old_canceled", muchEarlier, muchEarlier.Add(time.Minute),
StatusCanceled, mutOptions)
newRunningJob := writeJob("new_running", earlier, earlier.Add(time.Minute), StatusRunning,
mutOptions)
newSucceededJob := writeJob("new_succeeded", earlier, earlier.Add(time.Minute), StatusSucceeded, mutOptions)
newFailedJob := writeJob("new_failed", earlier, earlier.Add(time.Minute), StatusFailed, mutOptions)
newRevertFailedJob := writeJob("new_revert_failed", earlier, earlier.Add(time.Minute), StatusRevertFailed, mutOptions)
newCanceledJob := writeJob("new_canceled", earlier, earlier.Add(time.Minute),
StatusCanceled, mutOptions)

db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
{oldRunningJob}, {oldSucceededJob}, {oldFailedJob}, {oldRevertFailedJob}, {oldCanceledJob},
{newRunningJob}, {newSucceededJob}, {newFailedJob}, {newRevertFailedJob}, {newCanceledJob}})

if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, earlier); err != nil {
t.Fatal(err)
}
db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
{oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newSucceededJob},
{newFailedJob}, {newRevertFailedJob}, {newCanceledJob}})
for _, hasDropJob := range []bool{true, false} {
if !hasMutation && !hasDropJob {
continue
}
mutOptions := mutationOptions{
hasMutation: hasMutation,
hasDropJob: hasDropJob,
}
oldRunningJob := writeJob("old_running", muchEarlier, time.Time{}, StatusRunning, mutOptions)
oldSucceededJob := writeJob("old_succeeded", muchEarlier, muchEarlier.Add(time.Minute), StatusSucceeded, mutOptions)
oldFailedJob := writeJob("old_failed", muchEarlier, muchEarlier.Add(time.Minute),
StatusFailed, mutOptions)
oldRevertFailedJob := writeJob("old_revert_failed", muchEarlier, muchEarlier.Add(time.Minute),
StatusRevertFailed, mutOptions)
oldCanceledJob := writeJob("old_canceled", muchEarlier, muchEarlier.Add(time.Minute),
StatusCanceled, mutOptions)
newRunningJob := writeJob("new_running", earlier, earlier.Add(time.Minute), StatusRunning,
mutOptions)
newSucceededJob := writeJob("new_succeeded", earlier, earlier.Add(time.Minute), StatusSucceeded, mutOptions)
newFailedJob := writeJob("new_failed", earlier, earlier.Add(time.Minute), StatusFailed, mutOptions)
newRevertFailedJob := writeJob("new_revert_failed", earlier, earlier.Add(time.Minute), StatusRevertFailed, mutOptions)
newCanceledJob := writeJob("new_canceled", earlier, earlier.Add(time.Minute),
StatusCanceled, mutOptions)

db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
{oldRunningJob}, {oldSucceededJob}, {oldFailedJob}, {oldRevertFailedJob}, {oldCanceledJob},
{newRunningJob}, {newSucceededJob}, {newFailedJob}, {newRevertFailedJob}, {newCanceledJob}})

if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, earlier); err != nil {
t.Fatal(err)
}
db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
{oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newSucceededJob},
{newFailedJob}, {newRevertFailedJob}, {newCanceledJob}})

if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, ts.Add(time.Minute*-10)); err != nil {
t.Fatal(err)
}
db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
{oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newRevertFailedJob}})

// Delete the revert failed, and running jobs for the next run of the
// test.
_, err := sqlDB.Exec(`DELETE FROM system.jobs WHERE id = $1 OR id = $2 OR id = $3 OR id = $4`,
oldRevertFailedJob, newRevertFailedJob, oldRunningJob, newRunningJob)
require.NoError(t, err)
if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, ts.Add(time.Minute*-10)); err != nil {
t.Fatal(err)
}
db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
{oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newRevertFailedJob}})

// Delete the revert failed, and running jobs for the next run of the
// test.
_, err := sqlDB.Exec(`DELETE FROM system.jobs WHERE id = $1 OR id = $2 OR id = $3 OR id = $4`,
oldRevertFailedJob, newRevertFailedJob, oldRunningJob, newRunningJob)
require.NoError(t, err)
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/sql/catalog/descpb/structured.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1108,15 +1108,20 @@ message TableDescriptor {
(gogoproto.customname) = "JobID", deprecated = true];
}

// Before 22.1:
// The schema elements that have been dropped and whose underlying
// data needs to be gc-ed. These schema elements have already transitioned
// through the drop state machine when they were in the above mutations
// list, and can be safely deleted. The names for these schema elements
// can be reused. This list is separate because mutations can
// lie in this list for a long time (gc deadline) and should not block
// the execution of other schema changes on the table.
//
// Since 22.1 this is field is deprecated and no longer maintained.
// The index GC job still removes mutations it finds in this list.
repeated GCDescriptorMutation gc_mutations = 33 [(gogoproto.nullable) = false,
(gogoproto.customname) = "GCMutations"];
(gogoproto.customname) = "GCMutations",
deprecated = true];

optional string create_query = 34 [(gogoproto.nullable) = false];

Expand Down
2 changes: 0 additions & 2 deletions pkg/sql/catalog/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,8 +533,6 @@ type TableDescriptor interface {
MakePublic() TableDescriptor
// AllMutations returns all of the table descriptor's mutations.
AllMutations() []Mutation
// GetGCMutations returns the table descriptor's GC mutations.
GetGCMutations() []descpb.TableDescriptor_GCDescriptorMutation
// GetMutationJobs returns the table descriptor's mutation jobs.
GetMutationJobs() []descpb.TableDescriptor_MutationJob

Expand Down
7 changes: 1 addition & 6 deletions pkg/sql/catalog/lease/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1118,14 +1118,9 @@ INSERT INTO t.kv VALUES ('a', 'b');
}

testutils.SucceedsSoon(t, func() error {
if tableDesc := catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "kv"); len(tableDesc.GetGCMutations()) != 0 {
return errors.Errorf("%d gc mutations remaining", len(tableDesc.GetGCMutations()))
}
return nil
return tests.CheckKeyCountE(t, kvDB, tableSpan, 2)
})

tests.CheckKeyCount(t, kvDB, tableSpan, 2)

// TODO(erik, vivek): Transactions using old descriptors should fail and
// rollback when the index keys have been removed by ClearRange
// and the consistency issue is resolved. See #31563.
Expand Down
5 changes: 0 additions & 5 deletions pkg/sql/catalog/tabledesc/structured.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,6 @@ var ErrMissingColumns = errors.New("table must contain at least 1 column")
// ErrMissingPrimaryKey indicates a table with no primary key.
var ErrMissingPrimaryKey = errors.New("table must contain a primary key")

// ErrIndexGCMutationsList is returned by FindIndexWithID to signal that the
// index with the given ID does not have a descriptor and is in the garbage
// collected mutations list.
var ErrIndexGCMutationsList = errors.New("index in GC mutations list")

// PostDeserializationTableDescriptorChanges are a set of booleans to indicate
// which types of upgrades or fixes occurred when filling in the descriptor
// after deserialization.
Expand Down
5 changes: 0 additions & 5 deletions pkg/sql/catalog/tabledesc/table_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,11 +334,6 @@ func (desc *wrapper) FindIndexWithID(id descpb.IndexID) (catalog.Index, error) {
}); idx != nil {
return idx, nil
}
for _, m := range desc.GCMutations {
if m.IndexID == id {
return nil, ErrIndexGCMutationsList
}
}
return nil, errors.Errorf("index-id \"%d\" does not exist", id)
}

Expand Down
22 changes: 15 additions & 7 deletions pkg/sql/gcjob/descriptor_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,34 +23,42 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// updateDescriptorGCMutations removes the GCMutation for the given
// index ID. We no longer populate this field, but we still search it
// to remove existing entries.
func updateDescriptorGCMutations(
ctx context.Context,
execCfg *sql.ExecutorConfig,
tableID descpb.ID,
garbageCollectedIndexID descpb.IndexID,
) error {
log.Infof(ctx, "updating GCMutations for table %d after removing index %d",
tableID, garbageCollectedIndexID)
// Remove the mutation from the table descriptor.
return sql.DescsTxn(ctx, execCfg, func(
ctx context.Context, txn *kv.Txn, descsCol *descs.Collection,
) error {
tbl, err := descsCol.GetMutableTableVersionByID(ctx, tableID, txn)
if err != nil {
return err
}
found := false
for i := 0; i < len(tbl.GCMutations); i++ {
other := tbl.GCMutations[i]
if other.IndexID == garbageCollectedIndexID {
tbl.GCMutations = append(tbl.GCMutations[:i], tbl.GCMutations[i+1:]...)
found = true
break
}
}
b := txn.NewBatch()
if err := descsCol.WriteDescToBatch(ctx, false /* kvTrace */, tbl, b); err != nil {
return err
if found {
log.Infof(ctx, "updating GCMutations for table %d after removing index %d",
tableID, garbageCollectedIndexID)
// Remove the mutation from the table descriptor.
b := txn.NewBatch()
if err := descsCol.WriteDescToBatch(ctx, false /* kvTrace */, tbl, b); err != nil {
return err
}
return txn.Run(ctx, b)
}
return txn.Run(ctx, b)
return nil
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ WHERE message NOT LIKE '%Z/%' AND message NOT LIKE 'querying next range at%'
AND operation != 'dist sender send'
----
batch flow coordinator Del /NamespaceTable/30/1/56/57/"kv"/4/1
batch flow coordinator Put /Table/3/1/58/2/1 -> table:<name:"kv" id:58 version:8 modification_time:<> parent_id:56 unexposed_parent_schema_id:57 columns:<name:"k" id:1 type:<family: IntFamily width: 64 precision: 0 locale: "" visible_type: 0 oid: 20 time_precision_is_set: false > nullable:false hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > columns:<name:"v" id:2 type:<family: IntFamily width: 64 precision: 0 locale: "" visible_type: 0 oid: 20 time_precision_is_set: false > nullable:true hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > next_column_id:3 families:<name:"primary" id:0 column_names:"k" column_names:"v" column_ids:1 column_ids:2 default_column_id:2 > next_family_id:1 primary_index:<name:"kv_pkey" id:1 unique:true version:4 key_column_names:"k" key_column_directions:ASC store_column_names:"v" key_column_ids:1 store_column_ids:2 foreign_key:<table:0 index:0 name:"" validity:Validated shared_prefix_len:0 on_delete:NO_ACTION on_update:NO_ACTION match:SIMPLE > interleave:<> partitioning:<num_columns:0 num_implicit_columns:0 > type:FORWARD created_explicitly:false encoding_type:1 sharded:<is_sharded:false name:"" shard_buckets:0 > disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false > next_index_id:3 privileges:<users:<user_proto:"admin" privileges:2 with_grant_option:0 > users:<user_proto:"public" privileges:0 with_grant_option:0 > users:<user_proto:"root" privileges:2 with_grant_option:0 > owner_proto:"root" version:2 > next_mutation_id:3 format_version:3 state:DROP offline_reason:"" view_query:"" is_materialized_view:false new_schema_change_job_id:0 drop_time:... replacement_of:<id:0 time:<> > audit_mode:DISABLED drop_job_id:0 gc_mutations:<index_id:2 drop_time:... job_id:0 > create_query:"" create_as_of_time:<...> temporary:false partition_all_by:false >
batch flow coordinator Put /Table/3/1/58/2/1 -> table:<name:"kv" id:58 version:8 modification_time:<> parent_id:56 unexposed_parent_schema_id:57 columns:<name:"k" id:1 type:<family: IntFamily width: 64 precision: 0 locale: "" visible_type: 0 oid: 20 time_precision_is_set: false > nullable:false hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > columns:<name:"v" id:2 type:<family: IntFamily width: 64 precision: 0 locale: "" visible_type: 0 oid: 20 time_precision_is_set: false > nullable:true hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > next_column_id:3 families:<name:"primary" id:0 column_names:"k" column_names:"v" column_ids:1 column_ids:2 default_column_id:2 > next_family_id:1 primary_index:<name:"kv_pkey" id:1 unique:true version:4 key_column_names:"k" key_column_directions:ASC store_column_names:"v" key_column_ids:1 store_column_ids:2 foreign_key:<table:0 index:0 name:"" validity:Validated shared_prefix_len:0 on_delete:NO_ACTION on_update:NO_ACTION match:SIMPLE > interleave:<> partitioning:<num_columns:0 num_implicit_columns:0 > type:FORWARD created_explicitly:false encoding_type:1 sharded:<is_sharded:false name:"" shard_buckets:0 > disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false > next_index_id:3 privileges:<users:<user_proto:"admin" privileges:2 with_grant_option:0 > users:<user_proto:"public" privileges:0 with_grant_option:0 > users:<user_proto:"root" privileges:2 with_grant_option:0 > owner_proto:"root" version:2 > next_mutation_id:3 format_version:3 state:DROP offline_reason:"" view_query:"" is_materialized_view:false new_schema_change_job_id:0 drop_time:... replacement_of:<id:0 time:<> > audit_mode:DISABLED drop_job_id:0 create_query:"" create_as_of_time:<...> temporary:false partition_all_by:false >
exec stmt rows affected: 0

# Check that session tracing does not inhibit the fast path for inserts &
Expand Down
10 changes: 0 additions & 10 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1090,15 +1090,6 @@ func (sc *SchemaChanger) done(ctx context.Context) error {
}
isRollback = m.IsRollback()
if idx := m.AsIndex(); m.Dropped() && idx != nil {
// how we keep track of dropped index names (for, e.g., zone config
// lookups), even though in the absence of a GC job there's nothing to
// clean them up.
scTable.GCMutations = append(
scTable.GCMutations,
descpb.TableDescriptor_GCDescriptorMutation{
IndexID: idx.GetID(),
})

description := sc.job.Payload().Description
if isRollback {
description = "ROLLBACK of " + description
Expand All @@ -1107,7 +1098,6 @@ func (sc *SchemaChanger) done(ctx context.Context) error {
if err := sc.createIndexGCJob(ctx, idx.GetID(), txn, description); err != nil {
return err
}

}
if constraint := m.AsConstraint(); constraint != nil && constraint.Adding() {
if constraint.IsForeignKey() && constraint.ForeignKey().Validity == descpb.ConstraintValidity_Unvalidated {
Expand Down
Loading

0 comments on commit c08ae50

Please sign in to comment.