diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go index 397e6f760b71..fd25035a1f10 100644 --- a/pkg/ccl/importccl/import_stmt.go +++ b/pkg/ccl/importccl/import_stmt.go @@ -2257,11 +2257,18 @@ func (r *importResumer) publishTables(ctx context.Context, execCfg *sql.Executor } return nil }) - if err != nil { return err } + // Wait for the table to be public before completing. + for _, tbl := range details.Tables { + _, err := lm.WaitForOneVersion(ctx, tbl.Desc.ID, retry.Options{}) + if err != nil { + return errors.Wrap(err, "publishing tables waiting for one version") + } + } + // Initiate a run of CREATE STATISTICS. We don't know the actual number of // rows affected per table, so we use a large number because we want to make // sure that stats always get created/refreshed here. @@ -2309,6 +2316,13 @@ func (r *importResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) }); err != nil { return err } + // Wait for the tables to become public before completing. + for _, tableDesc := range r.job.Details().(jobspb.ImportDetails).Tables { + _, err := cfg.LeaseManager.WaitForOneVersion(ctx, tableDesc.Desc.ID, retry.Options{}) + if err != nil { + return errors.Wrap(err, "rolling back tables waiting for them to be public") + } + } // Run any jobs which might have been queued when dropping the schemas. // This would be a job to drop all the schemas, and a job to update the parent diff --git a/pkg/sql/catalog/descs/collection.go b/pkg/sql/catalog/descs/collection.go index b0a3d874023f..8f89950641ed 100644 --- a/pkg/sql/catalog/descs/collection.go +++ b/pkg/sql/catalog/descs/collection.go @@ -282,7 +282,7 @@ func (tc *Collection) getLeasedDescriptorByName( // Read the descriptor from the store in the face of some specific errors // because of a known limitation of AcquireByName. See the known // limitations of AcquireByName for details. - if catalog.HasInactiveDescriptorError(err) || + if (catalog.HasInactiveDescriptorError(err) && errors.Is(err, catalog.ErrDescriptorDropped)) || errors.Is(err, catalog.ErrDescriptorNotFound) { return nil, true, nil } diff --git a/pkg/sql/catalog/lease/BUILD.bazel b/pkg/sql/catalog/lease/BUILD.bazel index 06189bdd49af..df90906f73fb 100644 --- a/pkg/sql/catalog/lease/BUILD.bazel +++ b/pkg/sql/catalog/lease/BUILD.bazel @@ -55,6 +55,7 @@ go_test( "//pkg/jobs/jobspb", "//pkg/keys", "//pkg/kv", + "//pkg/kv/kvserver", "//pkg/roachpb", "//pkg/security", "//pkg/security/securitytest", @@ -68,7 +69,9 @@ go_test( "//pkg/sql/catalog/tabledesc", "//pkg/sql/pgwire/pgcode", "//pkg/sql/sem/tree", + "//pkg/sql/sessiondata", "//pkg/sql/sqltestutils", + "//pkg/sql/sqlutil", "//pkg/sql/tests", "//pkg/testutils", "//pkg/testutils/serverutils", @@ -85,6 +88,7 @@ go_test( "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/tracing", + "//pkg/util/uuid", "@com_github_cockroachdb_cockroach_go//crdb", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_logtags//:logtags", diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go index fa1e79fecc7f..ef35f53238ce 100644 --- a/pkg/sql/catalog/lease/lease.go +++ b/pkg/sql/catalog/lease/lease.go @@ -217,7 +217,7 @@ func (s storage) acquire( return err } if err := catalog.FilterDescriptorState( - desc, tree.CommonLookupFlags{}, // filter all non-public state + desc, tree.CommonLookupFlags{IncludeOffline: true}, // filter dropped only ); err != nil { return err } @@ -985,7 +985,7 @@ func purgeOldVersions( ctx context.Context, db *kv.DB, id descpb.ID, - takenOffline bool, + dropped bool, minVersion descpb.DescriptorVersion, m *Manager, ) error { @@ -999,15 +999,15 @@ func purgeOldVersions( } empty := len(t.mu.active.data) == 0 && t.mu.acquisitionsInProgress == 0 t.mu.Unlock() - if empty && !takenOffline { + if empty && !dropped { // We don't currently have a version on this descriptor, so no need to refresh // anything. return nil } - removeInactives := func(takenOffline bool) { + removeInactives := func(dropped bool) { t.mu.Lock() - t.mu.takenOffline = takenOffline + t.mu.takenOffline = dropped leases := t.removeInactiveVersions() t.mu.Unlock() for _, l := range leases { @@ -1015,8 +1015,8 @@ func purgeOldVersions( } } - if takenOffline { - removeInactives(true /* takenOffline */) + if dropped { + removeInactives(true /* dropped */) return nil } @@ -1032,7 +1032,7 @@ func purgeOldVersions( return errRenewLease } newest.incRefcount() - removeInactives(false /* takenOffline */) + removeInactives(false /* dropped */) s, err := t.release(newest.Descriptor, m.removeOnceDereferenced()) if err != nil { return err @@ -1402,6 +1402,28 @@ func (m *Manager) AcquireByName( parentSchemaID descpb.ID, name string, ) (catalog.Descriptor, hlc.Timestamp, error) { + // When offline descriptor leases were not allowed to be cached, + // attempt to acquire a lease on them would generate a descriptor + // offline error. Recent changes allow offline descriptor leases + // to be cached, but callers still need the offline error generated. + // This logic will release the lease (the lease manager will still + // cache it), and generate the offline descriptor error. + validateDescriptorForReturn := func(desc catalog.Descriptor, + expiration hlc.Timestamp) (catalog.Descriptor, hlc.Timestamp, error) { + if desc.Offline() { + if err := catalog.FilterDescriptorState( + desc, tree.CommonLookupFlags{}, + ); err != nil { + err2 := m.Release(desc) + if err2 != nil { + log.Warningf(ctx, "error releasing lease: %s", err2) + } + return nil, hlc.Timestamp{}, err + } + } + return desc, expiration, nil + } + // Check if we have cached an ID for this name. descVersion := m.names.get(parentID, parentSchemaID, name, timestamp) if descVersion != nil { @@ -1416,7 +1438,7 @@ func (m *Manager) AcquireByName( } } } - return descVersion.Descriptor, descVersion.expiration, nil + return validateDescriptorForReturn(descVersion.Descriptor, descVersion.expiration) } if err := m.Release(descVersion); err != nil { return nil, hlc.Timestamp{}, err @@ -1426,7 +1448,7 @@ func (m *Manager) AcquireByName( if err != nil { return nil, hlc.Timestamp{}, err } - return desc, expiration, nil + return validateDescriptorForReturn(desc, expiration) } // We failed to find something in the cache, or what we found is not @@ -1495,7 +1517,7 @@ func (m *Manager) AcquireByName( return nil, hlc.Timestamp{}, catalog.ErrDescriptorNotFound } } - return desc, expiration, nil + return validateDescriptorForReturn(desc, expiration) } // resolveName resolves a descriptor name to a descriptor ID at a particular @@ -1698,11 +1720,11 @@ func (m *Manager) RefreshLeases(ctx context.Context, s *stop.Stopper, db *kv.DB) } id, version, name, state := descpb.GetDescriptorMetadata(desc) - goingOffline := state == descpb.DescriptorState_DROP || state == descpb.DescriptorState_OFFLINE + dropped := state == descpb.DescriptorState_DROP // Try to refresh the lease to one >= this version. - log.VEventf(ctx, 2, "purging old version of descriptor %d@%d (offline %v)", - id, version, goingOffline) - if err := purgeOldVersions(ctx, db, id, goingOffline, version, m); err != nil { + log.VEventf(ctx, 2, "purging old version of descriptor %d@%d (dropped %v)", + id, version, dropped) + if err := purgeOldVersions(ctx, db, id, dropped, version, m); err != nil { log.Warningf(ctx, "error purging leases for descriptor %d(%s): %s", id, name, err) } diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go index 742e60503cfb..aa5a5c14fbeb 100644 --- a/pkg/sql/catalog/lease/lease_test.go +++ b/pkg/sql/catalog/lease/lease_test.go @@ -27,7 +27,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" @@ -39,7 +41,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqltestutils" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -50,10 +54,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" "github.com/lib/pq" @@ -2355,6 +2361,7 @@ func TestLeaseWithOfflineTables(t *testing.T) { func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error { flags := tree.ObjectLookupFlagsWithRequiredTableKind(tree.ResolveRequireTableDesc) flags.CommonLookupFlags.IncludeOffline = true + flags.CommonLookupFlags.IncludeDropped = true desc, err := descsCol.GetMutableTableByID(ctx, txn, testTableID(), flags) require.NoError(t, err) require.Equal(t, desc.State, expected) @@ -2398,9 +2405,16 @@ func TestLeaseWithOfflineTables(t *testing.T) { checkLeaseState(true /* shouldBePresent */) // Take the table offline and back online again. - // This should relinquish the lease. + // This should not relinquish the lease anymore + // and offline ones will now be held. setTableState(descpb.DescriptorState_PUBLIC, descpb.DescriptorState_OFFLINE) setTableState(descpb.DescriptorState_OFFLINE, descpb.DescriptorState_PUBLIC) + checkLeaseState(true /* shouldBePresent */) + + // Take the table dropped and back online again. + // This should relinquish the lease. + setTableState(descpb.DescriptorState_PUBLIC, descpb.DescriptorState_DROP) + setTableState(descpb.DescriptorState_DROP, descpb.DescriptorState_PUBLIC) checkLeaseState(false /* shouldBePresent */) // Query the table, thereby acquiring a lease once again. @@ -2703,3 +2717,135 @@ func TestDropDescriptorRacesWithAcquisition(t *testing.T) { return true }) } + +// TestOfflineLeaseRefresh validates that no live lock can occur, +// after a table is brought offline. Specifically a table a will be +// brought offline, and then one transaction will attempt to bring it +// online while another transaction will attempt to do a read. The read +// transaction could previously push back the lease of transaction +// trying to online the table perpetually (as seen in issue #61798). +func TestOfflineLeaseRefresh(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + waitForTxn := make(chan chan struct{}) + waitForRqstFilter := make(chan chan struct{}) + errorChan := make(chan error) + var txnID uuid.UUID + var mu syncutil.RWMutex + + knobs := &kvserver.StoreTestingKnobs{ + TestingRequestFilter: func(ctx context.Context, req roachpb.BatchRequest) *roachpb.Error { + mu.RLock() + checkRequest := req.Txn != nil && req.Txn.ID.Equal(txnID) + mu.RUnlock() + if _, ok := req.GetArg(roachpb.EndTxn); checkRequest && ok { + notify := make(chan struct{}) + waitForRqstFilter <- notify + <-notify + } + return nil + }, + } + params := base.TestServerArgs{Knobs: base.TestingKnobs{Store: knobs}} + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: params}) + s := tc.Server(0) + defer tc.Stopper().Stop(ctx) + conn := tc.ServerConn(0) + + // Create t1 that will be offline, and t2, + // that will serve inserts. + _, err := conn.Exec(` +CREATE DATABASE d1; +CREATE TABLE d1.t1 (name int); +INSERT INTO d1.t1 values(5); +INSERT INTO d1.t1 values(5); +INSERT INTO d1.t1 values(5); +CREATE TABLE d1.t2 (name int); +`) + require.NoError(t, err) + + tableID := descpb.InvalidID + + // Force the table descriptor into a offline state + err = descs.Txn(ctx, s.ClusterSettings(), s.LeaseManager().(*lease.Manager), s.InternalExecutor().(sqlutil.InternalExecutor), s.DB(), + func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error { + _, tableDesc, err := descriptors.GetMutableTableByName(ctx, txn, tree.NewTableNameWithSchema("d1", "public", "t1"), tree.ObjectLookupFlagsWithRequired()) + if err != nil { + return err + } + tableDesc.SetOffline("For unit test") + err = descriptors.WriteDesc(ctx, false, tableDesc, txn) + if err != nil { + return err + } + tableID = tableDesc.ID + return nil + }) + require.NoError(t, err) + + _, err = s.LeaseManager().(*lease.Manager).WaitForOneVersion(ctx, tableID, retry.Options{}) + require.NoError(t, err) + + go func() { + err := descs.Txn(ctx, s.ClusterSettings(), s.LeaseManager().(*lease.Manager), + s.InternalExecutor().(sqlutil.InternalExecutor), s.DB(), + func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error { + close(waitForRqstFilter) + mu.Lock() + waitForRqstFilter = make(chan chan struct{}) + txnID = txn.ID() + mu.Unlock() + + // Online the descriptor by making it public + _, tableDesc, err := descriptors.GetMutableTableByName(ctx, txn, + tree.NewTableNameWithSchema("d1", "public", "t1"), + tree.ObjectLookupFlags{CommonLookupFlags: tree.CommonLookupFlags{ + Required: true, + RequireMutable: true, + IncludeOffline: true, + AvoidCached: true, + }}) + if err != nil { + return err + } + tableDesc.SetPublic() + err = descriptors.WriteDesc(ctx, false, tableDesc, txn) + if err != nil { + return err + } + // Allow the select on the table to proceed, + // so that it waits on the channel at the appropriate + // moment. + notify := make(chan struct{}) + waitForTxn <- notify + <-notify + + // Select from an unrelated table + _, err = s.InternalExecutor().(sqlutil.InternalExecutor).ExecEx(ctx, "inline-exec", txn, + sessiondata.InternalExecutorOverride{User: security.RootUserName()}, + "insert into d1.t2 values (10);") + return err + + }) + close(waitForTxn) + close(waitForRqstFilter) + errorChan <- err + }() + + for notify := range waitForTxn { + close(notify) + mu.RLock() + rqstFilterChannel := waitForRqstFilter + mu.RUnlock() + for notify2 := range rqstFilterChannel { + // Push the query trying to online the table out by + // leasing out the table again + _, err = conn.Query("select * from d1.t1") + require.EqualError(t, err, "pq: relation \"t1\" is offline: For unit test", + "Table offline error was not generated as expected") + close(notify2) + } + } + require.NoError(t, <-errorChan) + close(errorChan) +}