diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go
index f0e0da444cb4..9650c03e3751 100644
--- a/pkg/sql/catalog/lease/lease.go
+++ b/pkg/sql/catalog/lease/lease.go
@@ -550,10 +550,15 @@ type descriptorState struct {
 		// entry is created with the expiration time of the new lease and
 		// the older entry is removed.
 		active descriptorSet
-		// Indicates that the has been dropped, or is being dropped.
+
+		// Indicates that the descriptor has been, or is being, dropped or taken
+		// offline.
 		// If set, leases are released from the store as soon as their
 		// refcount drops to 0, as opposed to waiting until they expire.
-		dropped bool
+		// This flag will be unset by any subsequent lease acquisition, which can
+		// happen after the table came back online again after having been taken
+		// offline temporarily (as opposed to dropped).
+		takenOffline bool
 
 		// acquisitionsInProgress indicates that at least one caller is currently
 		// in the process of performing an acquisition. This tracking is critical
@@ -864,6 +869,7 @@ func acquireNodeLease(ctx context.Context, m *Manager, id descpb.ID) (bool, erro
 		}
 		t := m.findDescriptorState(id, false /* create */)
 		t.mu.Lock()
 		defer t.mu.Unlock()
+		t.mu.takenOffline = false
 		toRelease, err = t.upsertLocked(newCtx, desc)
 		if err != nil {
@@ -905,9 +911,9 @@ func (t *descriptorState) release(
 		// when the refcount drops to 0). If so, we'll need to mark the lease as
 		// invalid.
 		removeOnceDereferenced = removeOnceDereferenced ||
-			// Release from the store if the descriptor has been dropped; no leases
-			// can be acquired any more.
-			t.mu.dropped ||
+			// Release from the store if the descriptor has been dropped or taken
+			// offline.
+			t.mu.takenOffline ||
 			// Release from the store if the lease is not for the latest
 			// version; only leases for the latest version can be acquired.
 			s != t.mu.active.findNewest()
@@ -983,9 +989,9 @@ func purgeOldVersions(
 		return nil
 	}
 
-	removeInactives := func(drop bool) {
+	removeInactives := func(takenOffline bool) {
 		t.mu.Lock()
-		t.mu.dropped = drop
+		t.mu.takenOffline = takenOffline
 		leases := t.removeInactiveVersions()
 		t.mu.Unlock()
 		for _, l := range leases {
@@ -2037,9 +2043,9 @@ func (m *Manager) refreshSomeLeases(ctx context.Context) {
 			break
 		}
 		desc.mu.Lock()
-		dropped := desc.mu.dropped
+		takenOffline := desc.mu.takenOffline
 		desc.mu.Unlock()
-		if !dropped {
+		if !takenOffline {
 			ids = append(ids, k)
 		}
 	}
@@ -2140,7 +2146,7 @@ func (m *Manager) Codec() keys.SQLCodec {
 // TODO(ajwerner): consider refactoring the function to take a struct, maybe
 // called LeaseInfo.
 func (m *Manager) VisitLeases(
-	f func(desc catalog.Descriptor, dropped bool, refCount int, expiration tree.DTimestamp) (wantMore bool),
+	f func(desc catalog.Descriptor, takenOffline bool, refCount int, expiration tree.DTimestamp) (wantMore bool),
 ) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
@@ -2149,7 +2155,7 @@ func (m *Manager) VisitLeases(
 			ts.mu.Lock()
 			defer ts.mu.Unlock()
 
-			dropped := ts.mu.dropped
+			takenOffline := ts.mu.takenOffline
 
 			for _, state := range ts.mu.active.data {
 				state.mu.Lock()
@@ -2161,7 +2167,7 @@ func (m *Manager) VisitLeases(
 					continue
 				}
 
-				if !f(state.Descriptor, dropped, refCount, lease.expiration) {
+				if !f(state.Descriptor, takenOffline, refCount, lease.expiration) {
 					return false
 				}
 			}
diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go
index ca4254102f0c..d7e433d39ab1 100644
--- a/pkg/sql/catalog/lease/lease_test.go
+++ b/pkg/sql/catalog/lease/lease_test.go
@@ -37,9 +37,11 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
+	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqltestutils"
 	"github.com/cockroachdb/cockroach/pkg/sql/tests"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
@@ -2447,3 +2449,111 @@ func TestBackoffOnRangefeedFailure(t *testing.T) {
 	}
 	require.Greater(t, totalBackoff.Nanoseconds(), (3 * minimumBackoff).Nanoseconds())
 }
+
+// TestLeaseWithOfflineTables checks that leases on tables which had
+// previously gone offline at some point are not gratuitously dropped.
+// See #57834.
+func TestLeaseWithOfflineTables(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	var descID uint32
+	testTableID := func() descpb.ID {
+		return descpb.ID(atomic.LoadUint32(&descID))
+	}
+
+	var lmKnobs lease.ManagerTestingKnobs
+	blockDescRefreshed := make(chan struct{}, 1)
+	lmKnobs.TestingDescriptorRefreshedEvent = func(desc *descpb.Descriptor) {
+		tbl := descpb.TableFromDescriptor(desc, hlc.Timestamp{})
+		if tbl != nil && testTableID() == tbl.ID {
+			blockDescRefreshed <- struct{}{}
+		}
+	}
+
+	ctx := context.Background()
+	params, _ := tests.CreateTestServerParams()
+	params.Knobs.SQLLeaseManager = &lmKnobs
+	s, db, kvDB := serverutils.StartServer(t, params)
+	defer s.Stopper().Stop(ctx)
+	runner := sqlutils.MakeSQLRunner(db)
+
+	// This statement prevents timer issues due to periodic lease refreshing.
+	_, err := db.Exec(`
+		SET CLUSTER SETTING sql.tablecache.lease.refresh_limit = 0;
+	`)
+	require.NoError(t, err)
+
+	_, err = db.Exec(`
+		CREATE DATABASE t;
+		CREATE TABLE t.test(s STRING PRIMARY KEY);
+	`)
+	require.NoError(t, err)
+
+	desc := catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "test")
+	atomic.StoreUint32(&descID, uint32(desc.ID))
+
+	// Sets table descriptor state and waits for that change to propagate to the
+	// lease manager's refresh worker.
+	setTableState := func(expected descpb.DescriptorState, next descpb.DescriptorState) {
+		err := descs.Txn(
+			ctx, s.ClusterSettings(),
+			s.LeaseManager().(*lease.Manager),
+			s.InternalExecutor().(*sql.InternalExecutor),
+			kvDB,
+			func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error {
+				desc, err := descsCol.GetMutableTableVersionByID(ctx, testTableID(), txn)
+				require.NoError(t, err)
+				require.Equal(t, expected, desc.State)
+				desc.State = next
+				return descsCol.WriteDesc(ctx, false /* kvTrace */, desc, txn)
+			},
+		)
+		require.NoError(t, err)
+		// Wait for the lease manager's refresh worker to have processed the
+		// descriptor update.
+		<-blockDescRefreshed
+	}
+
+	// Checks that the lease manager state for `t.test` matches expectations.
+	checkLeaseState := func(shouldBePresent bool) {
+		var found bool
+		var wasTakenOffline bool
+		fn := func(desc catalog.Descriptor, takenOffline bool, _ int, _ tree.DTimestamp) bool {
+			if testTableID() != desc.GetID() {
+				return true
+			}
+			wasTakenOffline = takenOffline
+			found = true
+			return false
+		}
+		s.LeaseManager().(*lease.Manager).VisitLeases(fn)
+		if found && !wasTakenOffline {
+			require.Truef(t, shouldBePresent, "lease should not have been present but was")
+		} else if found {
+			require.Falsef(t, shouldBePresent, "lease should have been present but was marked as taken offline")
+		} else {
+			require.Falsef(t, shouldBePresent, "lease should have been present but wasn't")
+		}
+	}
+
+	// Check initial state.
+	checkLeaseState(false /* shouldBePresent */)
+
+	// Query the table, this should trigger a lease acquisition.
+	runner.CheckQueryResults(t, "SELECT s FROM t.test", [][]string{})
+	checkLeaseState(true /* shouldBePresent */)
+
+	// Take the table offline and back online again.
+	// This should relinquish the lease.
+	setTableState(descpb.DescriptorState_PUBLIC, descpb.DescriptorState_OFFLINE)
+	setTableState(descpb.DescriptorState_OFFLINE, descpb.DescriptorState_PUBLIC)
+	checkLeaseState(false /* shouldBePresent */)
+
+	// Query the table, thereby acquiring a lease once again.
+	runner.CheckQueryResults(t, "SELECT s FROM t.test", [][]string{})
+	checkLeaseState(true /* shouldBePresent */)
+
+	// Do a no-op descriptor update, lease should still be present.
+	setTableState(descpb.DescriptorState_PUBLIC, descpb.DescriptorState_PUBLIC)
+	checkLeaseState(true /* shouldBePresent */)
+}
diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go
index 8df989e0c6a4..f20778720026 100644
--- a/pkg/sql/crdb_internal.go
+++ b/pkg/sql/crdb_internal.go
@@ -503,7 +503,7 @@ CREATE TABLE crdb_internal.leases (
 		ctx context.Context, p *planner, _ *dbdesc.Immutable, addRow func(...tree.Datum) error,
 	) (err error) {
 		nodeID, _ := p.execCfg.NodeID.OptionalNodeID() // zero if not available
-		p.LeaseMgr().VisitLeases(func(desc catalog.Descriptor, dropped bool, _ int, expiration tree.DTimestamp) (wantMore bool) {
+		p.LeaseMgr().VisitLeases(func(desc catalog.Descriptor, takenOffline bool, _ int, expiration tree.DTimestamp) (wantMore bool) {
 			if p.CheckAnyPrivilege(ctx, desc) != nil {
 				// TODO(ajwerner): inspect what type of error got returned.
 				return true
@@ -515,7 +515,7 @@ CREATE TABLE crdb_internal.leases (
 				tree.NewDString(desc.GetName()),
 				tree.NewDInt(tree.DInt(int64(desc.GetParentID()))),
 				&expiration,
-				tree.MakeDBool(tree.DBool(dropped)),
+				tree.MakeDBool(tree.DBool(takenOffline)),
 			)
 			return err == nil
 		})