From 79b00c9baf0e10c8363ede65802cb85fa76dec55 Mon Sep 17 00:00:00 2001
From: Faizan Qazi
Date: Mon, 22 Mar 2021 09:41:04 -0400
Subject: [PATCH] sql: lease acquisition of OFFLINE descs may starve bulk
 operations

Fixes: #61798

Previously, leases on offline descriptors were never cached; they were
released as soon as the reference count hit zero. This was inadequate
because, when attempting to bring these tables back online, the lease
acquisition could be pushed back by other operations, leading to
starvation / livelocks. To address this, this patch allows leases on
offline descriptors to be cached.

Release note (bug fix): Lease acquisitions of descriptors in an offline
state could previously starve out bulk operations (backup / restore).
---
 pkg/ccl/backupccl/backup_test.go      |   6 +-
 pkg/ccl/importccl/import_stmt.go      |  25 ++++-
 pkg/ccl/importccl/import_stmt_test.go |   2 +-
 pkg/sql/catalog/descs/collection.go   |   9 +-
 pkg/sql/catalog/lease/lease.go        |  52 +++++++---
 pkg/sql/catalog/lease/lease_test.go   | 140 +++++++++++++++++++++++++-
 6 files changed, 210 insertions(+), 24 deletions(-)

diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go
index c1e9d3ade54c..7ae6b80ee309 100644
--- a/pkg/ccl/backupccl/backup_test.go
+++ b/pkg/ccl/backupccl/backup_test.go
@@ -6394,10 +6394,10 @@ CREATE TYPE sc.typ AS ENUM ('hello');
   sqlDB.ExpectErr(t, `target database or schema does not exist`, `SHOW TABLES FROM d`)
   sqlDB.ExpectErr(t, `target database or schema does not exist`, `SHOW TABLES FROM d.sc`)
 
-  sqlDB.ExpectErr(t, `relation "d.sc.tb" does not exist`, `SELECT * FROM d.sc.tb`)
-  sqlDB.ExpectErr(t, `relation "d.sc.tb" does not exist`, `ALTER TABLE d.sc.tb ADD COLUMN b INT`)
+  sqlDB.ExpectErr(t, `database "d" is offline: restoring`, `SELECT * FROM d.sc.tb`)
+  sqlDB.ExpectErr(t, `database "d" is offline: restoring`, `ALTER TABLE d.sc.tb ADD COLUMN b INT`)
 
-  sqlDB.ExpectErr(t, `type "d.sc.typ" does not exist`, `ALTER TYPE d.sc.typ RENAME TO typ2`)
+  sqlDB.ExpectErr(t, `database "d" is offline: restoring`, `ALTER TYPE d.sc.typ RENAME TO typ2`)
 
   sqlDB.ExpectErr(t, `cannot create "d.sc.other" because the target database or schema does not exist`,
     `CREATE TABLE d.sc.other()`)
diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go
index bc7007b8ae3c..c50a37f91820 100644
--- a/pkg/ccl/importccl/import_stmt.go
+++ b/pkg/ccl/importccl/import_stmt.go
@@ -1407,11 +1407,18 @@ func (r *importResumer) publishTables(ctx context.Context, execCfg *sql.Executor
     }
     return nil
   })
-
   if err != nil {
     return err
   }
 
+  // Wait for the table to be public before completing.
+  for _, tbl := range details.Tables {
+    _, err := lm.WaitForOneVersion(ctx, tbl.Desc.ID, retry.Options{})
+    if err != nil {
+      return errors.Wrap(err, "publishing tables waiting for one version")
+    }
+  }
+
   // Initiate a run of CREATE STATISTICS. We don't know the actual number of
   // rows affected per table, so we use a large number because we want to make
   // sure that stats always get created/refreshed here.
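The hunk above makes IMPORT block, after publishing its tables, until every outstanding lease has converged on the new (public) descriptor version, so no node is still serving the old offline descriptor when the job reports success. A minimal sketch of that wait-for-one-version pattern in isolation follows; the helper name waitForPublishedVersions is assumed, while lease.Manager.WaitForOneVersion, descpb.ID, and retry.Options are the identifiers used in the diff itself.

package example // illustrative sketch, not part of this patch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
	"github.com/cockroachdb/cockroach/pkg/util/retry"
	"github.com/cockroachdb/errors"
)

// waitForPublishedVersions blocks until only a single version of each
// descriptor is leased across the cluster, i.e. until all leases on the old
// (offline) version have been released everywhere.
func waitForPublishedVersions(
	ctx context.Context, lm *lease.Manager, ids []descpb.ID,
) error {
	for _, id := range ids {
		if _, err := lm.WaitForOneVersion(ctx, id, retry.Options{}); err != nil {
			return errors.Wrapf(err, "waiting for descriptor %d to converge to one version", id)
		}
	}
	return nil
}

The next hunk applies the same wait on the rollback path in OnFailOrCancel, gated on details.PrepareComplete.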
@@ -1431,14 +1438,26 @@ func (r *importResumer) OnFailOrCancel(ctx context.Context, phs interface{}) err
   addToFileFormatTelemetry(details.Format.Format.String(), "failed")
   cfg := phs.(sql.PlanHookState).ExecCfg()
   lm, ie, db := cfg.LeaseManager, cfg.InternalExecutor, cfg.DB
-  return descs.Txn(ctx, cfg.Settings, lm, ie, db, func(
+  if err := descs.Txn(ctx, cfg.Settings, lm, ie, db, func(
     ctx context.Context, txn *kv.Txn, descsCol *descs.Collection,
   ) error {
     if err := r.dropTables(ctx, txn, descsCol, cfg); err != nil {
       return err
     }
     return r.releaseProtectedTimestamp(ctx, txn, cfg.ProtectedTimestampProvider)
-  })
+  }); err != nil {
+    return err
+  }
+  // Wait for the tables to become public before completing.
+  if details.PrepareComplete {
+    for _, tableDesc := range details.Tables {
+      _, err := cfg.LeaseManager.WaitForOneVersion(ctx, tableDesc.Desc.ID, retry.Options{})
+      if err != nil {
+        return errors.Wrap(err, "rolling back tables waiting for them to be public")
+      }
+    }
+  }
+  return nil
 }
 
 func (r *importResumer) releaseProtectedTimestamp(
diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go
index 7198ba7a5d0b..3edd04c6e765 100644
--- a/pkg/ccl/importccl/import_stmt_test.go
+++ b/pkg/ccl/importccl/import_stmt_test.go
@@ -2642,7 +2642,7 @@ func TestImportIntoCSV(t *testing.T) {
       <-importBodyFinished
 
       err := sqlDB.DB.QueryRowContext(ctx, `SELECT 1 FROM t`).Scan(&unused)
-      if !testutils.IsError(err, "relation \"t\" does not exist") {
+      if !testutils.IsError(err, "relation \"t\" is offline: importing") {
         return err
       }
       return nil
diff --git a/pkg/sql/catalog/descs/collection.go b/pkg/sql/catalog/descs/collection.go
index b77b12697ee5..7369ac5bbda3 100644
--- a/pkg/sql/catalog/descs/collection.go
+++ b/pkg/sql/catalog/descs/collection.go
@@ -278,7 +278,7 @@ func (tc *Collection) getLeasedDescriptorByName(
     // Read the descriptor from the store in the face of some specific errors
     // because of a known limitation of AcquireByName. See the known
     // limitations of AcquireByName for details.
-    if catalog.HasInactiveDescriptorError(err) ||
+    if (catalog.HasInactiveDescriptorError(err) && errors.Is(err, catalog.ErrDescriptorDropped)) ||
       errors.Is(err, catalog.ErrDescriptorNotFound) {
       return nil, true, nil
     }
@@ -941,6 +941,9 @@ func (tc *Collection) getDescriptorVersionByID(
   // First, look to see if we already have the table in the shared cache.
   if desc := tc.leasedDescriptors.getByID(id); desc != nil {
     log.VEventf(ctx, 2, "found descriptor %d in cache", id)
+    if err := catalog.FilterDescriptorState(desc, flags); err != nil {
+      return nil, err
+    }
     return desc, nil
   }
 
@@ -949,6 +952,10 @@ func (tc *Collection) getDescriptorVersionByID(
   if err != nil {
     return nil, err
   }
+  // Filter based on the descriptor state.
+  if err := catalog.FilterDescriptorState(desc, flags); err != nil {
+    return nil, err
+  }
   if expiration.LessEq(readTimestamp) {
     log.Fatalf(ctx, "bad descriptor for T=%s, expiration=%s",
       readTimestamp, expiration)
diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go
index 9650c03e3751..d2775ecc0b84 100644
--- a/pkg/sql/catalog/lease/lease.go
+++ b/pkg/sql/catalog/lease/lease.go
@@ -218,7 +218,7 @@ func (s storage) acquire(
       return err
     }
     if err := catalog.FilterDescriptorState(
-      desc, tree.CommonLookupFlags{}, // filter all non-public state
+      desc, tree.CommonLookupFlags{IncludeOffline: true}, // filter dropped only
     ); err != nil {
       return err
     }
@@ -972,7 +972,7 @@ func purgeOldVersions(
   ctx context.Context,
   db *kv.DB,
   id descpb.ID,
-  takenOffline bool,
+  dropped bool,
   minVersion descpb.DescriptorVersion,
   m *Manager,
 ) error {
@@ -983,15 +983,15 @@ func purgeOldVersions(
   t.mu.Lock()
   empty := len(t.mu.active.data) == 0 && t.mu.acquisitionsInProgress == 0
   t.mu.Unlock()
-  if empty {
+  if empty && !dropped {
     // We don't currently have a version on this descriptor, so no need to refresh
     // anything.
     return nil
   }
 
-  removeInactives := func(takenOffline bool) {
+  removeInactives := func(dropped bool) {
     t.mu.Lock()
-    t.mu.takenOffline = takenOffline
+    t.mu.takenOffline = dropped
     leases := t.removeInactiveVersions()
     t.mu.Unlock()
     for _, l := range leases {
     }
   }
 
-  if takenOffline {
-    removeInactives(takenOffline)
+  if dropped {
+    removeInactives(true /* dropped */)
     return nil
   }
@@ -1026,7 +1026,7 @@
     }
     return nil
   }
-  return err
+  return nil
 }
 
 // maybeQueueLeaseRenewal queues a lease renewal if there is not already a lease
@@ -1386,6 +1386,28 @@ func (m *Manager) AcquireByName(
   parentSchemaID descpb.ID,
   name string,
 ) (catalog.Descriptor, hlc.Timestamp, error) {
+  // When offline descriptor leases were not allowed to be cached, an
+  // attempt to acquire a lease on an offline descriptor would generate a
+  // descriptor-offline error. Recent changes allow offline descriptor
+  // leases to be cached, but callers still expect the offline error to be
+  // generated. This logic releases the lease (the lease manager will still
+  // cache it) and generates the offline descriptor error.
+  validateDescriptorForReturn := func(desc catalog.Descriptor,
+    expiration hlc.Timestamp) (catalog.Descriptor, hlc.Timestamp, error) {
+    if desc.Offline() {
+      if err := catalog.FilterDescriptorState(
+        desc, tree.CommonLookupFlags{},
+      ); err != nil {
+        releaseErr := m.Release(desc)
+        if releaseErr != nil {
+          log.Warningf(ctx, "error releasing lease: %s", releaseErr)
+        }
+        return nil, hlc.Timestamp{}, err
+      }
+    }
+    return desc, expiration, nil
+  }
+
   // Check if we have cached an ID for this name.
   descVersion := m.names.get(parentID, parentSchemaID, name, timestamp)
   if descVersion != nil {
@@ -1400,7 +1422,7 @@ func (m *Manager) AcquireByName(
         }
       }
     }
-    return descVersion.Descriptor, descVersion.expiration, nil
+    return validateDescriptorForReturn(descVersion.Descriptor, descVersion.expiration)
   }
   if err := m.Release(descVersion); err != nil {
     return nil, hlc.Timestamp{}, err
@@ -1410,7 +1432,7 @@ func (m *Manager) AcquireByName(
   if err != nil {
     return nil, hlc.Timestamp{}, err
   }
-  return desc, expiration, nil
+  return validateDescriptorForReturn(desc, expiration)
 }
 
 // We failed to find something in the cache, or what we found is not
@@ -1479,7 +1501,7 @@ func (m *Manager) AcquireByName(
       return nil, hlc.Timestamp{}, catalog.ErrDescriptorNotFound
     }
   }
-  return desc, expiration, nil
+  return validateDescriptorForReturn(desc, expiration)
 }
 
 // resolveName resolves a descriptor name to a descriptor ID at a particular
@@ -1691,12 +1713,12 @@ func (m *Manager) refreshLeases(
       }
       id, version, name, state := descpb.GetDescriptorMetadata(desc)
-      goingOffline := state == descpb.DescriptorState_DROP || state == descpb.DescriptorState_OFFLINE
+      dropped := state == descpb.DescriptorState_DROP
 
       // Try to refresh the lease to one >= this version.
-      log.VEventf(ctx, 2, "purging old version of descriptor %d@%d (offline %v)",
-        id, version, goingOffline)
+      log.VEventf(ctx, 2, "purging old version of descriptor %d@%d (dropped %v)",
+        id, version, dropped)
       if err := purgeOldVersions(
-        ctx, db, id, goingOffline, version, m); err != nil {
+        ctx, db, id, dropped, version, m); err != nil {
         log.Warningf(ctx, "error purging leases for descriptor %d(%s): %s",
           id, name, err)
       }
diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go
index d7e433d39ab1..f2d5f22d9e70 100644
--- a/pkg/sql/catalog/lease/lease_test.go
+++ b/pkg/sql/catalog/lease/lease_test.go
@@ -31,6 +31,7 @@ import (
   "github.com/cockroachdb/cockroach/pkg/kv/kvserver"
   "github.com/cockroachdb/cockroach/pkg/roachpb"
   "github.com/cockroachdb/cockroach/pkg/rpc"
+  "github.com/cockroachdb/cockroach/pkg/security"
   "github.com/cockroachdb/cockroach/pkg/server"
   "github.com/cockroachdb/cockroach/pkg/sql"
   "github.com/cockroachdb/cockroach/pkg/sql/catalog"
@@ -42,7 +43,9 @@ import (
   "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
   "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
   "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+  "github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
   "github.com/cockroachdb/cockroach/pkg/sql/sqltestutils"
+  "github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
   "github.com/cockroachdb/cockroach/pkg/sql/tests"
   "github.com/cockroachdb/cockroach/pkg/testutils"
   "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
@@ -53,9 +56,11 @@ import (
   "github.com/cockroachdb/cockroach/pkg/util/hlc"
   "github.com/cockroachdb/cockroach/pkg/util/leaktest"
   "github.com/cockroachdb/cockroach/pkg/util/log"
+  "github.com/cockroachdb/cockroach/pkg/util/retry"
   "github.com/cockroachdb/cockroach/pkg/util/syncutil"
   "github.com/cockroachdb/cockroach/pkg/util/timeutil"
   "github.com/cockroachdb/cockroach/pkg/util/tracing"
+  "github.com/cockroachdb/cockroach/pkg/util/uuid"
   "github.com/cockroachdb/errors"
   "github.com/cockroachdb/logtags"
   "github.com/lib/pq"
@@ -2544,9 +2549,16 @@ func TestLeaseWithOfflineTables(t *testing.T) {
   checkLeaseState(true /* shouldBePresent */)
 
   // Take the table offline and back online again.
-  // This should relinquish the lease.
+  // This should no longer relinquish the lease;
+  // leases on offline descriptors are now retained.
   setTableState(descpb.DescriptorState_PUBLIC, descpb.DescriptorState_OFFLINE)
   setTableState(descpb.DescriptorState_OFFLINE, descpb.DescriptorState_PUBLIC)
+  checkLeaseState(true /* shouldBePresent */)
+
+  // Drop the table and bring it back online again.
+  // This should relinquish the lease.
+  setTableState(descpb.DescriptorState_PUBLIC, descpb.DescriptorState_DROP)
+  setTableState(descpb.DescriptorState_DROP, descpb.DescriptorState_PUBLIC)
   checkLeaseState(false /* shouldBePresent */)
 
   // Query the table, thereby acquiring a lease once again.
@@ -2557,3 +2569,129 @@ func TestLeaseWithOfflineTables(t *testing.T) {
   setTableState(descpb.DescriptorState_PUBLIC, descpb.DescriptorState_PUBLIC)
   checkLeaseState(true /* shouldBePresent */)
 }
+
+// TestOfflineLeaseRefresh validates that no livelock can occur after a table
+// is brought offline. Specifically, a table will be brought offline, and then
+// one transaction will attempt to bring it back online while another
+// transaction attempts to read from it. The read transaction could previously
+// push back the lease of the transaction trying to bring the table online
+// perpetually (as seen in issue #61798).
+func TestOfflineLeaseRefresh(t *testing.T) {
+  defer leaktest.AfterTest(t)()
+  ctx := context.Background()
+  waitForTxn := make(chan chan struct{})
+  waitForRqstFilter := make(chan chan struct{})
+  errorChan := make(chan error)
+  var txnID uuid.UUID
+  var mu syncutil.RWMutex
+
+  knobs := &kvserver.StoreTestingKnobs{
+    TestingRequestFilter: func(ctx context.Context, req roachpb.BatchRequest) *roachpb.Error {
+      mu.RLock()
+      checkRequest := req.Txn != nil && req.Txn.ID.Equal(txnID)
+      mu.RUnlock()
+      if _, ok := req.GetArg(roachpb.EndTxn); checkRequest && ok {
+        notify := make(chan struct{})
+        waitForRqstFilter <- notify
+        <-notify
+      }
+      return nil
+    },
+  }
+  params := base.TestServerArgs{Knobs: base.TestingKnobs{Store: knobs}}
+  tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: params})
+  s := tc.Server(0)
+  defer tc.Stopper().Stop(ctx)
+  conn := tc.ServerConn(0)
+
+  // Create t1, which will be taken offline, and t2,
+  // which will serve inserts.
+  _, err := conn.Exec(`
+CREATE DATABASE d1;
+CREATE TABLE d1.t1 (name int);
+INSERT INTO d1.t1 values(5);
+INSERT INTO d1.t1 values(5);
+INSERT INTO d1.t1 values(5);
+CREATE TABLE d1.t2 (name int);
+`)
+  require.NoError(t, err)
+
+  desc := catalogkv.TestingGetTableDescriptor(s.DB(), keys.SystemSQLCodec, "d1", "t1")
+  tableID := desc.ID
+
+  // Force the table descriptor into an offline state.
+  err = descs.Txn(ctx, s.ClusterSettings(), s.LeaseManager().(*lease.Manager), s.InternalExecutor().(sqlutil.InternalExecutor), s.DB(),
+    func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error {
+      tableDesc, err := descriptors.GetMutableTableVersionByID(ctx, tableID, txn)
+      if err != nil {
+        return err
+      }
+      tableDesc.SetOffline("For unit test")
+      err = descriptors.WriteDesc(ctx, false, tableDesc, txn)
+      if err != nil {
+        return err
+      }
+      tableID = tableDesc.ID
+      return nil
+    })
+  require.NoError(t, err)
+
+  _, err = s.LeaseManager().(*lease.Manager).WaitForOneVersion(ctx, tableID, retry.Options{})
+  require.NoError(t, err)
+
+  go func() {
+    err := descs.Txn(ctx, s.ClusterSettings(), s.LeaseManager().(*lease.Manager),
+      s.InternalExecutor().(sqlutil.InternalExecutor), s.DB(),
+      func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error {
+        close(waitForRqstFilter)
+        mu.Lock()
+        waitForRqstFilter = make(chan chan struct{})
+        txnID = txn.ID()
+        mu.Unlock()
+
+        // Bring the descriptor back online by making it public.
+        tableDesc, err := descriptors.GetMutableTableVersionByID(ctx, tableID, txn)
+        if err != nil {
+          return err
+        }
+        tableDesc.SetPublic()
+        err = descriptors.WriteDesc(ctx, false, tableDesc, txn)
+        if err != nil {
+          return err
+        }
+        // Allow the select on the table to proceed,
+        // so that it waits on the channel at the appropriate
+        // moment.
+        notify := make(chan struct{})
+        waitForTxn <- notify
+        <-notify
+
+        // Insert into an unrelated table.
+        _, err = s.InternalExecutor().(sqlutil.InternalExecutor).ExecEx(ctx, "inline-exec", txn,
+          sessiondata.InternalExecutorOverride{User: security.RootUser},
+          "insert into d1.t2 values (10);")
+        return err
+
+      })
+    close(waitForTxn)
+    close(waitForRqstFilter)
+    errorChan <- err
+  }()
+
+  for notify := range waitForTxn {
+    close(notify)
+    mu.RLock()
+    rqstFilterChannel := waitForRqstFilter
+    mu.RUnlock()
+    for notify2 := range rqstFilterChannel {
+      // Push out the transaction trying to bring the table online by
+      // leasing the table again.
+      _, err = conn.Query("select * from d1.t1")
+      require.EqualError(t, err, "pq: relation \"t1\" is offline: For unit test",
+        "Table offline error was not generated as expected")
+      close(notify2)
+    }
+  }
+  require.NoError(t, <-errorChan)
+  close(errorChan)
+}
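Taken together, the lease.go and collection.go changes move the offline check rather than removing it: storage.acquire now leases offline descriptors (IncludeOffline: true means only dropped descriptors are rejected), and the "is offline" error is produced afterwards by FilterDescriptorState while the lease itself stays cached. The sketch below is illustrative only; the wrapper name acquireExpectingOfflineError is assumed, while AcquireByName, Release, FilterDescriptorState, and CommonLookupFlags are the identifiers used in the diff.

package example // illustrative sketch, not part of this patch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// acquireExpectingOfflineError acquires a lease by name and surfaces the
// offline state as an error, mirroring what AcquireByName now does
// internally: the lease on an offline descriptor stays in the manager's
// cache, so repeated name lookups no longer churn through acquire/release
// cycles that push back the transaction trying to bring the table online.
func acquireExpectingOfflineError(
	ctx context.Context,
	lm *lease.Manager,
	ts hlc.Timestamp,
	parentID, parentSchemaID descpb.ID,
	name string,
) (catalog.Descriptor, hlc.Timestamp, error) {
	desc, expiration, err := lm.AcquireByName(ctx, ts, parentID, parentSchemaID, name)
	if err != nil {
		// With this patch, an offline descriptor already surfaces here as an
		// error while its lease remains cached inside the manager.
		return nil, hlc.Timestamp{}, err
	}
	// Redundant with the filtering AcquireByName performs after this patch;
	// shown only to make the post-acquisition state check explicit.
	if ferr := catalog.FilterDescriptorState(desc, tree.CommonLookupFlags{}); ferr != nil {
		_ = lm.Release(desc)
		return nil, hlc.Timestamp{}, ferr
	}
	return desc, expiration, nil
}

This is the behavior TestOfflineLeaseRefresh exercises above: the SELECT keeps receiving `pq: relation "t1" is offline: For unit test` while the transaction bringing the table back online is still able to commit.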