Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-23.1: sql/gc_job,sqlerrors: make GC job robust to missing descriptors #99706

Merged
merged 3 commits into from
Mar 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/sql/catalog/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ go_library(
"//pkg/util/iterutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_cockroachdb_redact//interfaces",
],
)

Expand Down
14 changes: 14 additions & 0 deletions pkg/sql/catalog/descriptor_id_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,25 @@ package catalog
import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/util/intsets"
"github.com/cockroachdb/redact"
"github.com/cockroachdb/redact/interfaces"
)

// DescriptorIDSet efficiently stores an unordered set of descriptor ids.
type DescriptorIDSet struct {
// set is the backing integer set; ids are converted to int before being
// stored in it (see Add).
set intsets.Fast
}

// SafeFormat implements SafeFormatter for DescriptorIDSet.
//
// The set is rendered via String and handed to the printer wrapped in a
// redact.SafeString. The verb argument is not consulted.
func (d *DescriptorIDSet) SafeFormat(s interfaces.SafePrinter, verb rune) {
s.SafeString(redact.SafeString(d.String()))
}

// String implements fmt.Stringer for DescriptorIDSet by returning the string
// form of the underlying intsets.Fast.
func (d *DescriptorIDSet) String() string {
return d.set.String()
}

// MakeDescriptorIDSet returns a set initialized with the given values.
func MakeDescriptorIDSet(ids ...descpb.ID) DescriptorIDSet {
s := DescriptorIDSet{}
Expand All @@ -32,6 +44,8 @@ func MakeDescriptorIDSet(ids ...descpb.ID) DescriptorIDSet {
// Suppress the linter.
var _ = MakeDescriptorIDSet

var _ redact.SafeFormatter = (*DescriptorIDSet)(nil)

// Add adds an id to the set. No-op if the id is already in the set.
func (d *DescriptorIDSet) Add(id descpb.ID) {
d.set.Add(int(id))
Expand Down
14 changes: 13 additions & 1 deletion pkg/sql/gcjob/gc_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -137,7 +138,7 @@ func deleteTableData(
table, err = col.ByID(txn.KV()).Get().Table(ctx, droppedTable.ID)
return err
}); err != nil {
if errors.Is(err, catalog.ErrDescriptorNotFound) {
if isMissingDescriptorError(err) {
// This can happen if another GC job created for the same table got to
// the table first. See #50344.
log.Warningf(ctx, "table descriptor %d not found while attempting to GC, skipping", droppedTable.ID)
Expand Down Expand Up @@ -566,6 +567,17 @@ func waitForWork(
return ctx.Err()
}

// isMissingDescriptorError reports whether err signals a missing descriptor.
// It matches either the lower-level catalog.ErrDescriptorNotFound error or
// any error recognized by sqlerrors.IsMissingDescriptorError.
//
// TODO(ajwerner,postamar): Nail down when we expect the lower-level error
// and tighten up the collection.
func isMissingDescriptorError(err error) bool {
	// Check the lower-level catalog error first, then fall back to the
	// code-based classification.
	if errors.Is(err, catalog.ErrDescriptorNotFound) {
		return true
	}
	return sqlerrors.IsMissingDescriptorError(err)
}

// OnFailOrCancel is part of the jobs.Resumer interface.
func (r schemaChangeGCResumer) OnFailOrCancel(context.Context, interface{}, error) error {
return nil
Expand Down
11 changes: 4 additions & 7 deletions pkg/sql/gcjob/index_garbage_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
Expand All @@ -45,7 +44,7 @@ func deleteIndexData(
// are no longer in use. This is necessary in the case of truncate, where we
// schedule a GC Job in the transaction that commits the truncation.
parentDesc, err := sql.WaitToUpdateLeases(ctx, execCfg.LeaseManager, parentID)
if errors.Is(err, catalog.ErrDescriptorNotFound) {
if isMissingDescriptorError(err) {
handleTableDescriptorDeleted(ctx, parentID, progress)
return nil
}
Expand Down Expand Up @@ -94,7 +93,7 @@ func gcIndexes(
// are no longer in use. This is necessary in the case of truncate, where we
// schedule a GC Job in the transaction that commits the truncation.
parentDesc, err := sql.WaitToUpdateLeases(ctx, execCfg.LeaseManager, parentID)
if errors.Is(err, catalog.ErrDescriptorNotFound) {
if isMissingDescriptorError(err) {
handleTableDescriptorDeleted(ctx, parentID, progress)
return nil
}
Expand Down Expand Up @@ -131,8 +130,7 @@ func gcIndexes(
)
}
err := sql.DescsTxn(ctx, execCfg, removeIndexZoneConfigs)
if errors.Is(err, catalog.ErrDescriptorNotFound) ||
sqlerrors.IsUndefinedRelationError(err) {
if isMissingDescriptorError(err) {
handleTableDescriptorDeleted(ctx, parentID, progress)
return nil
}
Expand Down Expand Up @@ -213,8 +211,7 @@ func deleteIndexZoneConfigsAfterGC(
}
err := sql.DescsTxn(ctx, execCfg, removeIndexZoneConfigs)
switch {
case errors.Is(err, catalog.ErrDescriptorNotFound),
sqlerrors.IsUndefinedRelationError(err):
case isMissingDescriptorError(err):
log.Infof(ctx, "removing index %d zone config from table %d failed: %v",
index.IndexID, parentID, err)
case err != nil:
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/gcjob/refresh_statuses.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func updateStatusForGCElements(

return nil
}); err != nil {
if errors.Is(err, catalog.ErrDescriptorNotFound) {
if isMissingDescriptorError(err) {
log.Warningf(ctx, "table %d not found, marking as GC'd", tableID)
markTableGCed(ctx, tableID, progress, jobspb.SchemaChangeGCProgress_CLEARED)
return false, true, maxDeadline
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/gcjob/table_garbage_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func gcTables(
table, err = col.ByID(txn.KV()).Get().Table(ctx, droppedTable.ID)
return err
}); err != nil {
if errors.Is(err, catalog.ErrDescriptorNotFound) {
if isMissingDescriptorError(err) {
// This can happen if another GC job created for the same table got to
// the table first. See #50344.
log.Warningf(ctx, "table descriptor %d not found while attempting to GC, skipping", droppedTable.ID)
Expand Down Expand Up @@ -293,7 +293,7 @@ func deleteTableDescriptorsAfterGC(
table, err = col.ByID(txn.KV()).Get().Table(ctx, droppedTable.ID)
return err
}); err != nil {
if errors.Is(err, catalog.ErrDescriptorNotFound) {
if isMissingDescriptorError(err) {
// This can happen if another GC job created for the same table got to
// the table first. See #50344.
log.Warningf(ctx, "table descriptor %d not found while attempting to GC, skipping", droppedTable.ID)
Expand Down
40 changes: 26 additions & 14 deletions pkg/sql/gcjob_test/gc_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,23 +542,15 @@ func TestGCTenant(t *testing.T) {
})
}

// This test exercises code whereby an index GC job is running, and, in the
// This test exercises code whereby a GC job is running, and, in the
// meantime, the descriptor is removed. We want to ensure that the GC job
// finishes without an error.
func TestDropIndexWithDroppedDescriptor(t *testing.T) {
// finishes without an error. We want to test this both for index drops
// and for table drops.
func TestDropWithDeletedDescriptor(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// The way the GC job works is that it initially clears the index
// data, then it waits for the background MVCC GC to run and remove
// the underlying tombstone, and then finally it removes any relevant
// zone configurations for the index from system.zones. In the first
// and final phases, the job resolves the descriptor. This test ensures
// that the code is robust to the descriptor being removed both before
// the initial DelRange, and after, when going to remove the zone config.
testutils.RunTrueAndFalse(t, "before DelRange", func(
t *testing.T, beforeDelRange bool,
) {
runTest := func(t *testing.T, dropIndex bool, beforeDelRange bool) {
ctx, cancel := context.WithCancel(context.Background())
gcJobID := make(chan jobspb.JobID)
knobs := base.TestingKnobs{
Expand Down Expand Up @@ -628,7 +620,12 @@ SELECT descriptor_id, index_id
WHERE descriptor_name = 'foo'
AND index_name = 'foo_j_i_idx';`).Scan(&tableID, &indexID)
// Drop the index.
tdb.Exec(t, "DROP INDEX foo@foo_j_i_idx")
if dropIndex {
tdb.Exec(t, "DROP INDEX foo@foo_j_i_idx")
} else {
tdb.Exec(t, "DROP TABLE foo")
}

codec := s.ExecutorConfig().(sql.ExecutorConfig).Codec
tablePrefix.Store(codec.TablePrefix(uint32(tableID)))

Expand All @@ -654,5 +651,20 @@ SELECT descriptor_id, index_id
// Ensure that the job completes successfully in either case.
jr := s.JobRegistry().(*jobs.Registry)
require.NoError(t, jr.WaitForJobs(ctx, []jobspb.JobID{jobID}))
}

// The way the GC job works is that it initially clears the index
// data, then it waits for the background MVCC GC to run and remove
// the underlying tombstone, and then finally it removes any relevant
// zone configurations for the index from system.zones. In the first
// and final phases, the job resolves the descriptor. This test ensures
// that the code is robust to the descriptor being removed both before
// the initial DelRange, and after, when going to remove the zone config.
testutils.RunTrueAndFalse(t, "before DelRange", func(
t *testing.T, beforeDelRange bool,
) {
testutils.RunTrueAndFalse(t, "drop index", func(t *testing.T, dropIndex bool) {
runTest(t, dropIndex, beforeDelRange)
})
})
}
4 changes: 3 additions & 1 deletion pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,8 @@ func (sc *SchemaChanger) execLogTags() *logtags.Buffer {
}
if sc.droppedDatabaseID != descpb.InvalidID {
buf = buf.Add("db", sc.droppedDatabaseID)
} else if !sc.droppedSchemaIDs.Empty() {
buf = buf.Add("schema", sc.droppedSchemaIDs)
}
return buf
}
Expand Down Expand Up @@ -720,7 +722,7 @@ func (sc *SchemaChanger) exec(ctx context.Context) error {
}

// Otherwise, continue with the rest of the schema change state machine.
if tableDesc.Dropped() && sc.droppedDatabaseID == descpb.InvalidID {
if tableDesc.Dropped() && sc.droppedDatabaseID == descpb.InvalidID && sc.droppedSchemaIDs.Empty() {
if tableDesc.IsPhysicalTable() {
// We've dropped this physical table, let's kick off a GC job.
dropTime := timeutil.Now().UnixNano()
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sem/builtins/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ go_library(
"//pkg/sql/sem/volatility",
"//pkg/sql/sessiondata",
"//pkg/sql/sessiondatapb",
"//pkg/sql/sqlerrors",
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil",
"//pkg/sql/sqltelemetry",
Expand Down
9 changes: 4 additions & 5 deletions pkg/sql/sem/builtins/pg_builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sem/volatility"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/ipaddr"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -988,12 +989,10 @@ var pgBuiltins = map[string]builtinDefinition{
typ, err = evalCtx.Planner.ResolveTypeByOID(ctx, oid)
if err != nil {
// If the error is a descriptor does not exist error, then swallow it.
unknown := tree.NewDString(fmt.Sprintf("unknown (OID=%s)", oidArg))
switch {
case errors.Is(err, catalog.ErrDescriptorNotFound):
return unknown, nil
case pgerror.GetPGCode(err) == pgcode.UndefinedObject:
return unknown, nil
case sqlerrors.IsMissingDescriptorError(err),
errors.Is(err, catalog.ErrDescriptorNotFound):
return tree.NewDString(fmt.Sprintf("unknown (OID=%s)", oidArg)), nil
default:
return nil, err
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/sql/sqlerrors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,21 @@ func IsUndefinedSchemaError(err error) bool {
return errHasCode(err, pgcode.UndefinedSchema)
}

// IsMissingDescriptorError reports whether err gives any indication that it
// corresponds to a missing descriptor of any kind: it is true when any of
// IsUndefinedRelationError, IsUndefinedSchemaError or
// IsUndefinedDatabaseError holds, or when the error has the
// pgcode.UndefinedObject or pgcode.UndefinedFunction code.
//
// Note that this does not deal with the lower-level
// catalog.ErrDescriptorNotFound error. That error should be transformed
// by this package for all uses in the SQL layer and coming out of
// descs.Collection functions.
func IsMissingDescriptorError(err error) bool {
	// Case expressions are evaluated in order and evaluation stops at the
	// first match, mirroring a short-circuiting disjunction.
	switch {
	case IsUndefinedRelationError(err),
		IsUndefinedSchemaError(err),
		IsUndefinedDatabaseError(err),
		errHasCode(err, pgcode.UndefinedObject),
		errHasCode(err, pgcode.UndefinedFunction):
		return true
	default:
		return false
	}
}

func errHasCode(err error, code ...pgcode.Code) bool {
pgCode := pgerror.GetPGCode(err)
for _, c := range code {
Expand Down