diff --git a/pkg/sql/catalog/descs/collection.go b/pkg/sql/catalog/descs/collection.go index 670c3f78f66d..69f9a225bcb1 100644 --- a/pkg/sql/catalog/descs/collection.go +++ b/pkg/sql/catalog/descs/collection.go @@ -282,7 +282,7 @@ func (tc *Collection) getLeasedDescriptorByName( // Read the descriptor from the store in the face of some specific errors // because of a known limitation of AcquireByName. See the known // limitations of AcquireByName for details. - if (catalog.HasInactiveDescriptorError(err) && errors.Is(err, catalog.ErrDescriptorDropped)) || + if catalog.HasInactiveDescriptorError(err) || errors.Is(err, catalog.ErrDescriptorNotFound) { return nil, true, nil } diff --git a/pkg/sql/catalog/lease/BUILD.bazel b/pkg/sql/catalog/lease/BUILD.bazel index b76963ccff67..15fe9bc50ae1 100644 --- a/pkg/sql/catalog/lease/BUILD.bazel +++ b/pkg/sql/catalog/lease/BUILD.bazel @@ -54,7 +54,6 @@ go_test( "//pkg/jobs/jobspb", "//pkg/keys", "//pkg/kv", - "//pkg/kv/kvserver", "//pkg/roachpb", "//pkg/security", "//pkg/security/securitytest", @@ -68,9 +67,7 @@ go_test( "//pkg/sql/catalog/tabledesc", "//pkg/sql/pgwire/pgcode", "//pkg/sql/sem/tree", - "//pkg/sql/sessiondata", "//pkg/sql/sqltestutils", - "//pkg/sql/sqlutil", "//pkg/sql/tests", "//pkg/testutils", "//pkg/testutils/serverutils", @@ -87,7 +84,6 @@ go_test( "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/tracing", - "//pkg/util/uuid", "@com_github_cockroachdb_cockroach_go//crdb", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_logtags//:logtags", diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go index 572af1db8e38..b121babb6112 100644 --- a/pkg/sql/catalog/lease/lease.go +++ b/pkg/sql/catalog/lease/lease.go @@ -216,7 +216,7 @@ func (s storage) acquire( return err } if err := catalog.FilterDescriptorState( - desc, tree.CommonLookupFlags{IncludeOffline: true}, // filter dropped only + desc, tree.CommonLookupFlags{}, // filter all non-public state ); err != nil { return err } @@ -981,7 +981,7 @@ func purgeOldVersions( ctx context.Context, db *kv.DB, id descpb.ID, - dropped bool, + takenOffline bool, minVersion descpb.DescriptorVersion, m *Manager, ) error { @@ -995,15 +995,15 @@ func purgeOldVersions( } empty := len(t.mu.active.data) == 0 && t.mu.acquisitionsInProgress == 0 t.mu.Unlock() - if empty && !dropped { + if empty && !takenOffline { // We don't currently have a version on this descriptor, so no need to refresh // anything. return nil } - removeInactives := func(dropped bool) { + removeInactives := func(takenOffline bool) { t.mu.Lock() - t.mu.takenOffline = dropped + t.mu.takenOffline = takenOffline leases := t.removeInactiveVersions() t.mu.Unlock() for _, l := range leases { @@ -1011,8 +1011,8 @@ func purgeOldVersions( } } - if dropped { - removeInactives(true /* dropped */) + if takenOffline { + removeInactives(true /* takenOffline */) return nil } @@ -1028,7 +1028,7 @@ func purgeOldVersions( return errRenewLease } newest.incRefcount() - removeInactives(false /* dropped */) + removeInactives(false /* takenOffline */) s, err := t.release(newest.Descriptor, m.removeOnceDereferenced()) if err != nil { return err @@ -1398,28 +1398,6 @@ func (m *Manager) AcquireByName( parentSchemaID descpb.ID, name string, ) (catalog.Descriptor, hlc.Timestamp, error) { - // When offline descriptor leases were not allowed to be cached, - // attempt to acquire a lease on them would generate a descriptor - // offline error. Recent changes allow offline descriptor leases - // to be cached, but callers still need the offline error generated. - // This logic will release the lease (the lease manager will still - // cache it), and generate the offline descriptor error. - validateDescriptorForReturn := func(desc catalog.Descriptor, - expiration hlc.Timestamp) (catalog.Descriptor, hlc.Timestamp, error) { - if desc.Offline() { - if err := catalog.FilterDescriptorState( - desc, tree.CommonLookupFlags{}, - ); err != nil { - err2 := m.Release(desc) - if err2 != nil { - log.Warningf(ctx, "error releasing lease: %s", err2) - } - return nil, hlc.Timestamp{}, err - } - } - return desc, expiration, nil - } - // Check if we have cached an ID for this name. descVersion := m.names.get(parentID, parentSchemaID, name, timestamp) if descVersion != nil { @@ -1434,7 +1412,7 @@ func (m *Manager) AcquireByName( } } } - return validateDescriptorForReturn(descVersion.Descriptor, descVersion.expiration) + return descVersion.Descriptor, descVersion.expiration, nil } if err := m.Release(descVersion); err != nil { return nil, hlc.Timestamp{}, err @@ -1444,7 +1422,7 @@ func (m *Manager) AcquireByName( if err != nil { return nil, hlc.Timestamp{}, err } - return validateDescriptorForReturn(desc, expiration) + return desc, expiration, nil } // We failed to find something in the cache, or what we found is not @@ -1513,7 +1491,7 @@ func (m *Manager) AcquireByName( return nil, hlc.Timestamp{}, catalog.ErrDescriptorNotFound } } - return validateDescriptorForReturn(desc, expiration) + return desc, expiration, nil } // resolveName resolves a descriptor name to a descriptor ID at a particular @@ -1716,11 +1694,11 @@ func (m *Manager) RefreshLeases(ctx context.Context, s *stop.Stopper, db *kv.DB) } id, version, name, state := descpb.GetDescriptorMetadata(desc) - dropped := state == descpb.DescriptorState_DROP + goingOffline := state == descpb.DescriptorState_DROP || state == descpb.DescriptorState_OFFLINE // Try to refresh the lease to one >= this version. - log.VEventf(ctx, 2, "purging old version of descriptor %d@%d (dropped %v)", - id, version, dropped) - if err := purgeOldVersions(ctx, db, id, dropped, version, m); err != nil { + log.VEventf(ctx, 2, "purging old version of descriptor %d@%d (offline %v)", + id, version, goingOffline) + if err := purgeOldVersions(ctx, db, id, goingOffline, version, m); err != nil { log.Warningf(ctx, "error purging leases for descriptor %d(%s): %s", id, name, err) } diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go index aa5a5c14fbeb..742e60503cfb 100644 --- a/pkg/sql/catalog/lease/lease_test.go +++ b/pkg/sql/catalog/lease/lease_test.go @@ -27,9 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" @@ -41,9 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqltestutils" - "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -54,12 +50,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" - "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" "github.com/lib/pq" @@ -2361,7 +2355,6 @@ func TestLeaseWithOfflineTables(t *testing.T) { func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error { flags := tree.ObjectLookupFlagsWithRequiredTableKind(tree.ResolveRequireTableDesc) flags.CommonLookupFlags.IncludeOffline = true - flags.CommonLookupFlags.IncludeDropped = true desc, err := descsCol.GetMutableTableByID(ctx, txn, testTableID(), flags) require.NoError(t, err) require.Equal(t, desc.State, expected) @@ -2405,16 +2398,9 @@ func TestLeaseWithOfflineTables(t *testing.T) { checkLeaseState(true /* shouldBePresent */) // Take the table offline and back online again. - // This should not relinquish the lease anymore - // and offline ones will now be held. + // This should relinquish the lease. setTableState(descpb.DescriptorState_PUBLIC, descpb.DescriptorState_OFFLINE) setTableState(descpb.DescriptorState_OFFLINE, descpb.DescriptorState_PUBLIC) - checkLeaseState(true /* shouldBePresent */) - - // Take the table dropped and back online again. - // This should relinquish the lease. - setTableState(descpb.DescriptorState_PUBLIC, descpb.DescriptorState_DROP) - setTableState(descpb.DescriptorState_DROP, descpb.DescriptorState_PUBLIC) checkLeaseState(false /* shouldBePresent */) // Query the table, thereby acquiring a lease once again. @@ -2717,135 +2703,3 @@ func TestDropDescriptorRacesWithAcquisition(t *testing.T) { return true }) } - -// TestOfflineLeaseRefresh validates that no live lock can occur, -// after a table is brought offline. Specifically a table a will be -// brought offline, and then one transaction will attempt to bring it -// online while another transaction will attempt to do a read. The read -// transaction could previously push back the lease of transaction -// trying to online the table perpetually (as seen in issue #61798). -func TestOfflineLeaseRefresh(t *testing.T) { - defer leaktest.AfterTest(t)() - ctx := context.Background() - waitForTxn := make(chan chan struct{}) - waitForRqstFilter := make(chan chan struct{}) - errorChan := make(chan error) - var txnID uuid.UUID - var mu syncutil.RWMutex - - knobs := &kvserver.StoreTestingKnobs{ - TestingRequestFilter: func(ctx context.Context, req roachpb.BatchRequest) *roachpb.Error { - mu.RLock() - checkRequest := req.Txn != nil && req.Txn.ID.Equal(txnID) - mu.RUnlock() - if _, ok := req.GetArg(roachpb.EndTxn); checkRequest && ok { - notify := make(chan struct{}) - waitForRqstFilter <- notify - <-notify - } - return nil - }, - } - params := base.TestServerArgs{Knobs: base.TestingKnobs{Store: knobs}} - tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: params}) - s := tc.Server(0) - defer tc.Stopper().Stop(ctx) - conn := tc.ServerConn(0) - - // Create t1 that will be offline, and t2, - // that will serve inserts. - _, err := conn.Exec(` -CREATE DATABASE d1; -CREATE TABLE d1.t1 (name int); -INSERT INTO d1.t1 values(5); -INSERT INTO d1.t1 values(5); -INSERT INTO d1.t1 values(5); -CREATE TABLE d1.t2 (name int); -`) - require.NoError(t, err) - - tableID := descpb.InvalidID - - // Force the table descriptor into a offline state - err = descs.Txn(ctx, s.ClusterSettings(), s.LeaseManager().(*lease.Manager), s.InternalExecutor().(sqlutil.InternalExecutor), s.DB(), - func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error { - _, tableDesc, err := descriptors.GetMutableTableByName(ctx, txn, tree.NewTableNameWithSchema("d1", "public", "t1"), tree.ObjectLookupFlagsWithRequired()) - if err != nil { - return err - } - tableDesc.SetOffline("For unit test") - err = descriptors.WriteDesc(ctx, false, tableDesc, txn) - if err != nil { - return err - } - tableID = tableDesc.ID - return nil - }) - require.NoError(t, err) - - _, err = s.LeaseManager().(*lease.Manager).WaitForOneVersion(ctx, tableID, retry.Options{}) - require.NoError(t, err) - - go func() { - err := descs.Txn(ctx, s.ClusterSettings(), s.LeaseManager().(*lease.Manager), - s.InternalExecutor().(sqlutil.InternalExecutor), s.DB(), - func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error { - close(waitForRqstFilter) - mu.Lock() - waitForRqstFilter = make(chan chan struct{}) - txnID = txn.ID() - mu.Unlock() - - // Online the descriptor by making it public - _, tableDesc, err := descriptors.GetMutableTableByName(ctx, txn, - tree.NewTableNameWithSchema("d1", "public", "t1"), - tree.ObjectLookupFlags{CommonLookupFlags: tree.CommonLookupFlags{ - Required: true, - RequireMutable: true, - IncludeOffline: true, - AvoidCached: true, - }}) - if err != nil { - return err - } - tableDesc.SetPublic() - err = descriptors.WriteDesc(ctx, false, tableDesc, txn) - if err != nil { - return err - } - // Allow the select on the table to proceed, - // so that it waits on the channel at the appropriate - // moment. - notify := make(chan struct{}) - waitForTxn <- notify - <-notify - - // Select from an unrelated table - _, err = s.InternalExecutor().(sqlutil.InternalExecutor).ExecEx(ctx, "inline-exec", txn, - sessiondata.InternalExecutorOverride{User: security.RootUserName()}, - "insert into d1.t2 values (10);") - return err - - }) - close(waitForTxn) - close(waitForRqstFilter) - errorChan <- err - }() - - for notify := range waitForTxn { - close(notify) - mu.RLock() - rqstFilterChannel := waitForRqstFilter - mu.RUnlock() - for notify2 := range rqstFilterChannel { - // Push the query trying to online the table out by - // leasing out the table again - _, err = conn.Query("select * from d1.t1") - require.EqualError(t, err, "pq: relation \"t1\" is offline: For unit test", - "Table offline error was not generated as expected") - close(notify2) - } - } - require.NoError(t, <-errorChan) - close(errorChan) -}