From 21bba8789105449ac017d1957e7d82e778de9cf2 Mon Sep 17 00:00:00 2001
From: Faizan Qazi
Date: Mon, 22 Mar 2021 09:41:04 -0400
Subject: [PATCH] sql: lease acquisition of OFFLINE descs may starve bulk
 operations

Fixes: #61798

Previously, leases on offline descriptors were never cached, and they
were released as soon as the reference count hit zero. This was
inadequate because, when bringing such a table back online, the lease
acquisition could be repeatedly pushed back by other operations,
leading to starvation / livelock. To address this, this patch allows
the leases of offline descriptors to be cached.

Release note (bug fix): Lease acquisitions of descriptors in an
offline state could starve out bulk operations (backup / restore).
---
 pkg/sql/catalog/descs/collection.go |   2 +-
 pkg/sql/catalog/lease/lease.go      |  56 ++++++++---
 pkg/sql/catalog/lease/lease_test.go | 143 +++++++++++++++++++++++++++-
 3 files changed, 187 insertions(+), 14 deletions(-)

diff --git a/pkg/sql/catalog/descs/collection.go b/pkg/sql/catalog/descs/collection.go
index 69f9a225bcb1..670c3f78f66d 100644
--- a/pkg/sql/catalog/descs/collection.go
+++ b/pkg/sql/catalog/descs/collection.go
@@ -282,7 +282,7 @@ func (tc *Collection) getLeasedDescriptorByName(
 	// Read the descriptor from the store in the face of some specific errors
 	// because of a known limitation of AcquireByName. See the known
 	// limitations of AcquireByName for details.
-	if catalog.HasInactiveDescriptorError(err) ||
+	if (catalog.HasInactiveDescriptorError(err) && errors.Is(err, catalog.ErrDescriptorDropped)) ||
 		errors.Is(err, catalog.ErrDescriptorNotFound) {
 		return nil, true, nil
 	}
diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go
index 3395eaadd7c6..f89e53fddfd3 100644
--- a/pkg/sql/catalog/lease/lease.go
+++ b/pkg/sql/catalog/lease/lease.go
@@ -219,7 +219,7 @@ func (s storage) acquire(
 		return err
 	}
 	if err := catalog.FilterDescriptorState(
-		desc, tree.CommonLookupFlags{}, // filter all non-public state
+		desc, tree.CommonLookupFlags{IncludeOffline: true}, // filter dropped only
 	); err != nil {
 		return err
 	}
@@ -984,7 +984,7 @@ func purgeOldVersions(
 	ctx context.Context,
 	db *kv.DB,
 	id descpb.ID,
-	takenOffline bool,
+	dropped bool,
 	minVersion descpb.DescriptorVersion,
 	m *Manager,
 ) error {
@@ -998,15 +998,15 @@ func purgeOldVersions(
 	}
 	empty := len(t.mu.active.data) == 0 && t.mu.acquisitionsInProgress == 0
 	t.mu.Unlock()
-	if empty && !takenOffline {
+	if empty && !dropped {
 		// We don't currently have a version on this descriptor, so no need to refresh
 		// anything.
 		return nil
 	}

-	removeInactives := func(takenOffline bool) {
+	removeInactives := func(dropped bool) {
 		t.mu.Lock()
-		t.mu.takenOffline = takenOffline
+		t.mu.takenOffline = dropped
 		leases := t.removeInactiveVersions()
 		t.mu.Unlock()
 		for _, l := range leases {
@@ -1014,8 +1014,8 @@ func purgeOldVersions(
 		}
 	}

-	if takenOffline {
-		removeInactives(true /* takenOffline */)
+	if dropped {
+		removeInactives(true /* dropped */)
 		return nil
 	}

@@ -1031,7 +1031,7 @@ func purgeOldVersions(
 		return errRenewLease
 	}
 	newest.incRefcount()
-	removeInactives(false /* takenOffline */)
+	removeInactives(false /* dropped */)
 	s, err := t.release(newest.Descriptor, m.removeOnceDereferenced())
 	if err != nil {
 		return err
@@ -1419,6 +1419,16 @@ func (m *Manager) AcquireByName(
 			}
 		}
 	}
+	// For offline descriptors we still acquire the lease, but return
+	// an error to the caller indicating that the descriptor is offline.
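+	// Caching the lease here, instead of releasing it as soon as its
+	// reference count hits zero as was done previously, means repeated
+	// acquisitions no longer re-read the descriptor from the store,
+	// which could starve the transaction bringing it back online (#61798).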
+	if descVersion.Offline() {
+		if err := catalog.FilterDescriptorState(
+			descVersion, tree.CommonLookupFlags{},
+		); err != nil {
+			m.Release(descVersion)
+			return nil, hlc.Timestamp{}, err
+		}
+	}
 	return descVersion.Descriptor, descVersion.expiration, nil
 	}
 	if err := m.Release(descVersion); err != nil {
@@ -1429,6 +1439,17 @@ func (m *Manager) AcquireByName(
 	if err != nil {
 		return nil, hlc.Timestamp{}, err
 	}
+
+	// For offline descriptors we still acquire the lease, but return
+	// an error to the caller indicating that the descriptor is offline.
+	if desc.Offline() {
+		if err := catalog.FilterDescriptorState(
+			desc, tree.CommonLookupFlags{},
+		); err != nil {
+			m.Release(desc)
+			return nil, hlc.Timestamp{}, err
+		}
+	}
 	return desc, expiration, nil
 }

@@ -1498,6 +1519,17 @@ func (m *Manager) AcquireByName(
 			return nil, hlc.Timestamp{}, catalog.ErrDescriptorNotFound
 		}
 	}
+
+	// For offline descriptors we still acquire the lease, but return
+	// an error to the caller indicating that the descriptor is offline.
+	if desc.Offline() {
+		if err := catalog.FilterDescriptorState(
+			desc, tree.CommonLookupFlags{},
+		); err != nil {
+			m.Release(desc)
+			return nil, hlc.Timestamp{}, err
+		}
+	}
 	return desc, expiration, nil
 }

@@ -1713,11 +1745,11 @@ func (m *Manager) refreshLeases(
 			}

 			id, version, name, state := descpb.GetDescriptorMetadata(desc)
-			goingOffline := state == descpb.DescriptorState_DROP || state == descpb.DescriptorState_OFFLINE
+			dropped := state == descpb.DescriptorState_DROP
 			// Try to refresh the lease to one >= this version.
-			log.VEventf(ctx, 2, "purging old version of descriptor %d@%d (offline %v)",
-				id, version, goingOffline)
-			if err := purgeOldVersions(ctx, db, id, goingOffline, version, m); err != nil {
+			log.VEventf(ctx, 2, "purging old version of descriptor %d@%d (dropped %v)",
+				id, version, dropped)
+			if err := purgeOldVersions(ctx, db, id, dropped, version, m); err != nil {
 				log.Warningf(ctx, "error purging leases for descriptor %d(%s): %s",
 					id, name, err)
 			}
diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go
index 0d4e892e8c76..d1cbfa20c05f 100644
--- a/pkg/sql/catalog/lease/lease_test.go
+++ b/pkg/sql/catalog/lease/lease_test.go
@@ -27,7 +27,9 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/security"
 	"github.com/cockroachdb/cockroach/pkg/server"
 	"github.com/cockroachdb/cockroach/pkg/sql"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
@@ -39,7 +41,9 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqltestutils"
+	"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
 	"github.com/cockroachdb/cockroach/pkg/sql/tests"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
@@ -50,10 +54,12 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" "github.com/lib/pq" @@ -2361,6 +2367,7 @@ func TestLeaseWithOfflineTables(t *testing.T) { func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error { flags := tree.ObjectLookupFlagsWithRequiredTableKind(tree.ResolveRequireTableDesc) flags.CommonLookupFlags.IncludeOffline = true + flags.CommonLookupFlags.IncludeDropped = true desc, err := descsCol.GetMutableTableByID(ctx, txn, testTableID(), flags) require.NoError(t, err) require.Equal(t, desc.State, expected) @@ -2404,9 +2411,16 @@ func TestLeaseWithOfflineTables(t *testing.T) { checkLeaseState(true /* shouldBePresent */) // Take the table offline and back online again. - // This should relinquish the lease. + // This should not relinquish the lease anymore + // and offline ones will now be held. setTableState(descpb.DescriptorState_PUBLIC, descpb.DescriptorState_OFFLINE) setTableState(descpb.DescriptorState_OFFLINE, descpb.DescriptorState_PUBLIC) + checkLeaseState(true /* shouldBePresent */) + + // Take the table dropped and back online again. + // This should relinquish the lease. + setTableState(descpb.DescriptorState_PUBLIC, descpb.DescriptorState_DROP) + setTableState(descpb.DescriptorState_DROP, descpb.DescriptorState_PUBLIC) checkLeaseState(false /* shouldBePresent */) // Query the table, thereby acquiring a lease once again. @@ -2709,3 +2723,130 @@ func TestDropDescriptorRacesWithAcquisition(t *testing.T) { return true }) } + +// TestOfflineLeaseRefresh validates that no live lock can occur, +// after a table is brought offline. Specifically a table a will be +// brought offline, and then one transaction will attempt to bring it +// online while another transaction will attempt to do a read. The read +// transaction could previously push back the lease of transaction +// trying to online the table perpetually (as seen in issue #61798). +func TestOfflineLeaseRefresh(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + waitForTxn := make(chan chan struct{}) + waitForRqstFilter := make(chan chan struct{}) + errorChan := make(chan error) + var txnID uuid.UUID + var mu syncutil.RWMutex + + knobs := &kvserver.StoreTestingKnobs{ + TestingRequestFilter: func(ctx context.Context, req roachpb.BatchRequest) *roachpb.Error { + mu.RLock() + checkRequest := req.Txn != nil && req.Txn.ID.Equal(txnID) + mu.RUnlock() + if _, ok := req.GetArg(roachpb.EndTxn); checkRequest && ok { + notify := make(chan struct{}) + waitForRqstFilter <- notify + <-notify + } + return nil + }, + } + params := base.TestServerArgs{Knobs: base.TestingKnobs{Store: knobs}} + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: params}) + s := tc.Server(0) + defer tc.Stopper().Stop(ctx) + conn := tc.ServerConn(0) + + // Create t1 that will be offline, and t2, + // that will serve inserts. 
+	_, err := conn.Exec(`
+CREATE DATABASE d1;
+CREATE TABLE d1.t1 (name int);
+INSERT INTO d1.t1 values(5);
+INSERT INTO d1.t1 values(5);
+INSERT INTO d1.t1 values(5);
+CREATE TABLE d1.t2 (name int);
+`)
+	require.NoError(t, err)
+
+	tableID := descpb.InvalidID
+
+	// Force the table descriptor into an offline state.
+	err = descs.Txn(ctx, s.ClusterSettings(), s.LeaseManager().(*lease.Manager),
+		s.InternalExecutor().(sqlutil.InternalExecutor), s.DB(),
+		func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error {
+			_, tableDesc, err := descriptors.GetMutableTableByName(ctx, txn,
+				tree.NewTableNameWithSchema("d1", "public", "t1"),
+				tree.ObjectLookupFlagsWithRequired())
+			if err != nil {
+				return err
+			}
+			tableDesc.SetOffline("For unit test")
+			if err := descriptors.WriteDesc(ctx, false, tableDesc, txn); err != nil {
+				return err
+			}
+			tableID = tableDesc.ID
+			return nil
+		})
+	require.NoError(t, err)
+
+	_, err = s.LeaseManager().(*lease.Manager).WaitForOneVersion(ctx, tableID, retry.Options{})
+	require.NoError(t, err)
+
+	go func() {
+		err := descs.Txn(ctx, s.ClusterSettings(), s.LeaseManager().(*lease.Manager),
+			s.InternalExecutor().(sqlutil.InternalExecutor), s.DB(),
+			func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error {
+				close(waitForRqstFilter)
+				mu.Lock()
+				waitForRqstFilter = make(chan chan struct{})
+				txnID = txn.ID()
+				mu.Unlock()

+				// Bring the descriptor back online by making it public.
+				_, tableDesc, err := descriptors.GetMutableTableByName(ctx, txn,
+					tree.NewTableNameWithSchema("d1", "public", "t1"),
+					tree.ObjectLookupFlags{CommonLookupFlags: tree.CommonLookupFlags{
+						Required:       true,
+						RequireMutable: true,
+						IncludeOffline: true,
+						AvoidCached:    true,
+					}})
+				if err != nil {
+					return err
+				}
+				tableDesc.SetPublic()
+				if err := descriptors.WriteDesc(ctx, false, tableDesc, txn); err != nil {
+					return err
+				}

+				// Allow the read of the offline table in the main goroutine
+				// to proceed, so that it waits on the channel at the
+				// appropriate moment.
+				notify := make(chan struct{})
+				waitForTxn <- notify
+				<-notify

+				// Insert into an unrelated table.
+				_, err = s.InternalExecutor().(sqlutil.InternalExecutor).ExecEx(ctx, "inline-exec", txn,
+					sessiondata.InternalExecutorOverride{User: security.RootUserName()},
+					"insert into d1.t2 values (10);")
+				return err
+			})
+		close(waitForTxn)
+		close(waitForRqstFilter)
+		errorChan <- err
+	}()

+	for notify := range waitForTxn {
+		close(notify)
+		mu.RLock()
+		rqstFilterChannel := waitForRqstFilter
+		mu.RUnlock()
+		for notify2 := range rqstFilterChannel {
+			// Push back the transaction trying to bring the table online
+			// by acquiring a lease on the table again.
+			_, err = conn.Query("select * from d1.t1")
+			require.EqualError(t, err, "pq: relation \"t1\" is offline: For unit test",
+				"Table offline error was not generated as expected")
+			close(notify2)
+		}
+	}
+	require.NoError(t, <-errorChan)
+	close(errorChan)
+}
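
A minimal sketch (not part of the patch) of the caller-visible contract this change establishes. The scaffolding is assumed from a test like the one above: a test server handle `s`, a context `ctx`, the parent database ID `dbID`, and an offline table "t1"; `AcquireByName` is the real API, everything else here is illustrative:

	// After this patch, acquiring a lease on an offline table still
	// surfaces the offline error to the caller, but the lease itself is
	// now cached by the Manager. A subsequent acquisition is served from
	// the cache instead of re-reading the descriptor from the store at a
	// newer timestamp, which is what previously starved the transaction
	// bringing the table back online (#61798).
	lm := s.LeaseManager().(*lease.Manager)
	_, _, err := lm.AcquireByName(
		ctx, s.Clock().Now(), dbID, descpb.ID(keys.PublicSchemaID), "t1",
	)
	// err is expected to be non-nil here, e.g.
	// `relation "t1" is offline: For unit test`, even though the lease
	// remains cached.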