Skip to content

Commit

Permalink
sql: lease acquisition of OFFLINE descs may starve bulk operations
Browse files Browse the repository at this point in the history
Fixes: cockroachdb#61798

Previously, offline descriptors would never have their leases
cached and they would be released once the reference count hit zero.
This was inadequate because when attempting to online these tables
again the lease acquisition could be pushed back by other operations,
leading to starvation / live locks. To address this, this patch will
allow the leases of offline descriptors to be cached.

Release note (bug fix): Lease acquisitions of descriptor in a
offline state may starve out bulk operations (backup / restore)
  • Loading branch information
fqazi committed Apr 8, 2021
1 parent 65530d7 commit 79b00c9
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 24 deletions.
6 changes: 3 additions & 3 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6394,10 +6394,10 @@ CREATE TYPE sc.typ AS ENUM ('hello');
sqlDB.ExpectErr(t, `target database or schema does not exist`, `SHOW TABLES FROM d`)
sqlDB.ExpectErr(t, `target database or schema does not exist`, `SHOW TABLES FROM d.sc`)

sqlDB.ExpectErr(t, `relation "d.sc.tb" does not exist`, `SELECT * FROM d.sc.tb`)
sqlDB.ExpectErr(t, `relation "d.sc.tb" does not exist`, `ALTER TABLE d.sc.tb ADD COLUMN b INT`)
sqlDB.ExpectErr(t, `database "d" is offline: restoring`, `SELECT * FROM d.sc.tb`)
sqlDB.ExpectErr(t, `database "d" is offline: restoring`, `ALTER TABLE d.sc.tb ADD COLUMN b INT`)

sqlDB.ExpectErr(t, `type "d.sc.typ" does not exist`, `ALTER TYPE d.sc.typ RENAME TO typ2`)
sqlDB.ExpectErr(t, `database "d" is offline: restoring`, `ALTER TYPE d.sc.typ RENAME TO typ2`)

sqlDB.ExpectErr(t, `cannot create "d.sc.other" because the target database or schema does not exist`, `CREATE TABLE d.sc.other()`)

Expand Down
25 changes: 22 additions & 3 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1407,11 +1407,18 @@ func (r *importResumer) publishTables(ctx context.Context, execCfg *sql.Executor
}
return nil
})

if err != nil {
return err
}

// Wait for the table to be public before completing.
for _, tbl := range details.Tables {
_, err := lm.WaitForOneVersion(ctx, tbl.Desc.ID, retry.Options{})
if err != nil {
return errors.Wrap(err, "publishing tables waiting for one version")
}
}

// Initiate a run of CREATE STATISTICS. We don't know the actual number of
// rows affected per table, so we use a large number because we want to make
// sure that stats always get created/refreshed here.
Expand All @@ -1431,14 +1438,26 @@ func (r *importResumer) OnFailOrCancel(ctx context.Context, phs interface{}) err
addToFileFormatTelemetry(details.Format.Format.String(), "failed")
cfg := phs.(sql.PlanHookState).ExecCfg()
lm, ie, db := cfg.LeaseManager, cfg.InternalExecutor, cfg.DB
return descs.Txn(ctx, cfg.Settings, lm, ie, db, func(
if err := descs.Txn(ctx, cfg.Settings, lm, ie, db, func(
ctx context.Context, txn *kv.Txn, descsCol *descs.Collection,
) error {
if err := r.dropTables(ctx, txn, descsCol, cfg); err != nil {
return err
}
return r.releaseProtectedTimestamp(ctx, txn, cfg.ProtectedTimestampProvider)
})
}); err != nil {
return err
}
// Wait for the tables to become public before completing.
if details.PrepareComplete {
for _, tableDesc := range details.Tables {
_, err := cfg.LeaseManager.WaitForOneVersion(ctx, tableDesc.Desc.ID, retry.Options{})
if err != nil {
return errors.Wrap(err, "rolling back tables waiting for them to be public")
}
}
}
return nil
}

func (r *importResumer) releaseProtectedTimestamp(
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2642,7 +2642,7 @@ func TestImportIntoCSV(t *testing.T) {
<-importBodyFinished

err := sqlDB.DB.QueryRowContext(ctx, `SELECT 1 FROM t`).Scan(&unused)
if !testutils.IsError(err, "relation \"t\" does not exist") {
if !testutils.IsError(err, "relation \"t\" is offline: importing") {
return err
}
return nil
Expand Down
9 changes: 8 additions & 1 deletion pkg/sql/catalog/descs/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (tc *Collection) getLeasedDescriptorByName(
// Read the descriptor from the store in the face of some specific errors
// because of a known limitation of AcquireByName. See the known
// limitations of AcquireByName for details.
if catalog.HasInactiveDescriptorError(err) ||
if (catalog.HasInactiveDescriptorError(err) && errors.Is(err, catalog.ErrDescriptorDropped)) ||
errors.Is(err, catalog.ErrDescriptorNotFound) {
return nil, true, nil
}
Expand Down Expand Up @@ -941,6 +941,9 @@ func (tc *Collection) getDescriptorVersionByID(
// First, look to see if we already have the table in the shared cache.
if desc := tc.leasedDescriptors.getByID(id); desc != nil {
log.VEventf(ctx, 2, "found descriptor %d in cache", id)
if err := catalog.FilterDescriptorState(desc, flags); err != nil {
return nil, err
}
return desc, nil
}

Expand All @@ -949,6 +952,10 @@ func (tc *Collection) getDescriptorVersionByID(
if err != nil {
return nil, err
}
// Filter based on the state
if err := catalog.FilterDescriptorState(desc, flags); err != nil {
return nil, err
}

if expiration.LessEq(readTimestamp) {
log.Fatalf(ctx, "bad descriptor for T=%s, expiration=%s", readTimestamp, expiration)
Expand Down
52 changes: 37 additions & 15 deletions pkg/sql/catalog/lease/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (s storage) acquire(
return err
}
if err := catalog.FilterDescriptorState(
desc, tree.CommonLookupFlags{}, // filter all non-public state
desc, tree.CommonLookupFlags{IncludeOffline: true}, // filter dropped only
); err != nil {
return err
}
Expand Down Expand Up @@ -972,7 +972,7 @@ func purgeOldVersions(
ctx context.Context,
db *kv.DB,
id descpb.ID,
takenOffline bool,
dropped bool,
minVersion descpb.DescriptorVersion,
m *Manager,
) error {
Expand All @@ -983,24 +983,24 @@ func purgeOldVersions(
t.mu.Lock()
empty := len(t.mu.active.data) == 0 && t.mu.acquisitionsInProgress == 0
t.mu.Unlock()
if empty {
if empty && !dropped {
// We don't currently have a version on this descriptor, so no need to refresh
// anything.
return nil
}

removeInactives := func(takenOffline bool) {
removeInactives := func(dropped bool) {
t.mu.Lock()
t.mu.takenOffline = takenOffline
t.mu.takenOffline = dropped
leases := t.removeInactiveVersions()
t.mu.Unlock()
for _, l := range leases {
releaseLease(l, m)
}
}

if takenOffline {
removeInactives(takenOffline)
if dropped {
removeInactives(true /* dropped */)
return nil
}

Expand All @@ -1026,7 +1026,7 @@ func purgeOldVersions(
}
return nil
}
return err
return nil
}

// maybeQueueLeaseRenewal queues a lease renewal if there is not already a lease
Expand Down Expand Up @@ -1386,6 +1386,28 @@ func (m *Manager) AcquireByName(
parentSchemaID descpb.ID,
name string,
) (catalog.Descriptor, hlc.Timestamp, error) {
// When offline descriptor leases were not allowed to be cached,
// attempt to acquire a lease on them would generate a descriptor
// offline error. Recent changes allow offline descriptor leases
// to be cached, but callers still need the offline error generated.
// This logic will release the lease (the lease manager will still
// cache it), and generate the offline descriptor error.
validateDescriptorForReturn := func(desc catalog.Descriptor,
expiration hlc.Timestamp) (catalog.Descriptor, hlc.Timestamp, error) {
if desc.Offline() {
if err := catalog.FilterDescriptorState(
desc, tree.CommonLookupFlags{},
); err != nil {
releaseErr := m.Release(desc)
if releaseErr != nil {
log.Warningf(ctx, "error releasing lease: %s", releaseErr)
}
return nil, hlc.Timestamp{}, err
}
}
return desc, expiration, nil
}

// Check if we have cached an ID for this name.
descVersion := m.names.get(parentID, parentSchemaID, name, timestamp)
if descVersion != nil {
Expand All @@ -1400,7 +1422,7 @@ func (m *Manager) AcquireByName(
}
}
}
return descVersion.Descriptor, descVersion.expiration, nil
return validateDescriptorForReturn(descVersion.Descriptor, descVersion.expiration)
}
if err := m.Release(descVersion); err != nil {
return nil, hlc.Timestamp{}, err
Expand All @@ -1410,7 +1432,7 @@ func (m *Manager) AcquireByName(
if err != nil {
return nil, hlc.Timestamp{}, err
}
return desc, expiration, nil
return validateDescriptorForReturn(desc, expiration)
}

// We failed to find something in the cache, or what we found is not
Expand Down Expand Up @@ -1479,7 +1501,7 @@ func (m *Manager) AcquireByName(
return nil, hlc.Timestamp{}, catalog.ErrDescriptorNotFound
}
}
return desc, expiration, nil
return validateDescriptorForReturn(desc, expiration)
}

// resolveName resolves a descriptor name to a descriptor ID at a particular
Expand Down Expand Up @@ -1691,12 +1713,12 @@ func (m *Manager) refreshLeases(
}

id, version, name, state := descpb.GetDescriptorMetadata(desc)
goingOffline := state == descpb.DescriptorState_DROP || state == descpb.DescriptorState_OFFLINE
dropped := state == descpb.DescriptorState_DROP
// Try to refresh the lease to one >= this version.
log.VEventf(ctx, 2, "purging old version of descriptor %d@%d (offline %v)",
id, version, goingOffline)
log.VEventf(ctx, 2, "purging old version of descriptor %d@%d (dropped %v)",
id, version, dropped)
if err := purgeOldVersions(
ctx, db, id, goingOffline, version, m); err != nil {
ctx, db, id, dropped, version, m); err != nil {
log.Warningf(ctx, "error purging leases for descriptor %d(%s): %s",
id, name, err)
}
Expand Down
Loading

0 comments on commit 79b00c9

Please sign in to comment.