diff --git a/pkg/sql/catalog/lease/descriptor_state.go b/pkg/sql/catalog/lease/descriptor_state.go index 59d50e1aece2..424a14561127 100644 --- a/pkg/sql/catalog/lease/descriptor_state.go +++ b/pkg/sql/catalog/lease/descriptor_state.go @@ -305,7 +305,7 @@ func (t *descriptorState) startLeaseRenewal( log.VEventf(ctx, 1, "background lease renewal beginning for id=%d name=%q", id, name) - if _, err := acquireNodeLease(ctx, m, id); err != nil { + if _, err := acquireNodeLease(ctx, m, id, AcquireBackground); err != nil { log.Errorf(ctx, "background lease renewal for id=%d name=%q failed: %s", id, name, err) diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go index 75ba492fa6a6..4d9329723f07 100644 --- a/pkg/sql/catalog/lease/lease.go +++ b/pkg/sql/catalog/lease/lease.go @@ -464,10 +464,7 @@ func (m *Manager) AcquireFreshestFromStore(ctx context.Context, id descpb.ID) er attemptsMade := 0 for { // Acquire a fresh lease. - didAcquire, err := acquireNodeLease(ctx, m, id) - if m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent != nil { - m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent(AcquireFreshestBlock, id) - } + didAcquire, err := acquireNodeLease(ctx, m, id, AcquireFreshestBlock) if err != nil { return err } @@ -490,7 +487,9 @@ func (m *Manager) AcquireFreshestFromStore(ctx context.Context, id descpb.ID) er // being dropped or offline, the error will be of type inactiveTableError. // The boolean returned is true if this call was actually responsible for the // lease acquisition. -func acquireNodeLease(ctx context.Context, m *Manager, id descpb.ID) (bool, error) { +func acquireNodeLease( + ctx context.Context, m *Manager, id descpb.ID, typ AcquireType, +) (bool, error) { start := timeutil.Now() log.VEventf(ctx, 2, "acquiring lease for descriptor %d", id) var toRelease *storedLease @@ -531,6 +530,9 @@ func acquireNodeLease(ctx context.Context, m *Manager, id descpb.ID) (bool, erro } return true, nil }) + if m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent != nil { + m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent(typ, id) + } select { case <-ctx.Done(): return false, ctx.Err() @@ -634,17 +636,19 @@ func purgeOldVersions( return err } -// AcquireBlockType is the type of blocking result event when +// AcquireType is the type of blocking result event when // calling LeaseAcquireResultBlockEvent. -type AcquireBlockType int +type AcquireType int const ( // AcquireBlock denotes the LeaseAcquireResultBlockEvent is // coming from descriptorState.acquire(). - AcquireBlock AcquireBlockType = iota + AcquireBlock AcquireType = iota // AcquireFreshestBlock denotes the LeaseAcquireResultBlockEvent is // from descriptorState.acquireFreshestFromStore(). AcquireFreshestBlock + // AcquireBackground happens due to periodic background refreshes. + AcquireBackground ) // Manager manages acquiring and releasing per-descriptor leases. It also @@ -982,16 +986,12 @@ func (m *Manager) Acquire( t.markAcquisitionStart(ctx) defer t.markAcquisitionDone(ctx) // Renew lease and retry. This will block until the lease is acquired. - _, errLease := acquireNodeLease(ctx, m, id) + _, errLease := acquireNodeLease(ctx, m, id, AcquireBlock) return errLease }(); err != nil { return nil, err } - if m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent != nil { - m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent(AcquireBlock, id) - } - case errors.Is(err, errReadOlderVersion): // Read old versions from the store. This can block while reading. versions, errRead := m.readOlderVersionForTimestamp(ctx, id, timestamp) @@ -1243,7 +1243,7 @@ func (m *Manager) refreshSomeLeases(ctx context.Context) { } } - if _, err := acquireNodeLease(ctx, m, id); err != nil { + if _, err := acquireNodeLease(ctx, m, id, AcquireBackground); err != nil { log.Infof(ctx, "refreshing descriptor: %d lease failed: %s", id, err) if errors.Is(err, catalog.ErrDescriptorNotFound) { diff --git a/pkg/sql/catalog/lease/lease_internal_test.go b/pkg/sql/catalog/lease/lease_internal_test.go index 15c206edd2ae..8ae9a144f5ff 100644 --- a/pkg/sql/catalog/lease/lease_internal_test.go +++ b/pkg/sql/catalog/lease/lease_internal_test.go @@ -24,12 +24,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" - "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -550,7 +550,9 @@ CREATE TABLE t.%s (k CHAR PRIMARY KEY, v CHAR); tracker := removalTracker.TrackRemoval(lease.Descriptor) // Acquire another lease. - if _, err := acquireNodeLease(context.Background(), leaseManager, tableDesc.GetID()); err != nil { + if _, err := acquireNodeLease( + context.Background(), leaseManager, tableDesc.GetID(), AcquireBlock, + ); err != nil { t.Fatal(err) } @@ -833,11 +835,6 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); func TestLeaseAcquireAndReleaseConcurrently(t *testing.T) { defer leaktest.AfterTest(t)() - // TODO(andrei): the startupmigrations pkg is gone and so are migrations - // requiring backfill. The test crashes when run, though; it rotted. - skip.WithIssue(t, 51798, "fails in the presence of migrations requiring backfill, "+ - "but cannot import startupmigrations") - // Result is a struct for moving results to the main result routine. type Result struct { table LeasedDescriptor @@ -853,7 +850,9 @@ func TestLeaseAcquireAndReleaseConcurrently(t *testing.T) { return res } - descID := descpb.ID(keys.LeaseTableID) + var descID atomic.Value + descID.Store(descpb.ID(0)) + getDescID := func() descpb.ID { return descID.Load().(descpb.ID) } // acquireBlock calls Acquire. acquireBlock := func( @@ -861,7 +860,7 @@ func TestLeaseAcquireAndReleaseConcurrently(t *testing.T) { m *Manager, acquireChan chan Result, ) { - acquireChan <- mkResult(m.Acquire(ctx, m.storage.clock.Now(), descID)) + acquireChan <- mkResult(m.Acquire(ctx, m.storage.clock.Now(), getDescID())) } testCases := []struct { @@ -911,10 +910,20 @@ func TestLeaseAcquireAndReleaseConcurrently(t *testing.T) { removalTracker := NewLeaseRemovalTracker() testingKnobs := base.TestingKnobs{ SQLLeaseManager: &ManagerTestingKnobs{ + TestingDescriptorUpdateEvent: func(descriptor *descpb.Descriptor) error { + // Ignore the update to the table in question. + if id, _, _, _, _ := descpb.GetDescriptorMetadata(descriptor); id == getDescID() { + return errors.New("boom") + } + return nil + }, LeaseStoreTestingKnobs: StorageTestingKnobs{ RemoveOnceDereferenced: true, LeaseReleasedEvent: removalTracker.LeaseRemovedNotification, - LeaseAcquireResultBlockEvent: func(leaseBlockType AcquireBlockType, _ descpb.ID) { + LeaseAcquireResultBlockEvent: func(leaseBlockType AcquireType, id descpb.ID) { + if id != getDescID() { + return + } if leaseBlockType == AcquireBlock { if count := atomic.LoadInt32(&acquireArrivals); (count < 1 && test.isSecondCallAcquireFreshest) || (count < 2 && !test.isSecondCallAcquireFreshest) { @@ -930,7 +939,10 @@ func TestLeaseAcquireAndReleaseConcurrently(t *testing.T) { } } }, - LeaseAcquiredEvent: func(_ catalog.Descriptor, _ error) { + LeaseAcquiredEvent: func(desc catalog.Descriptor, err error) { + if desc.GetID() != getDescID() { + return + } atomic.AddInt32(&leasesAcquiredCount, 1) <-acquisitionBlock }, @@ -938,7 +950,10 @@ func TestLeaseAcquireAndReleaseConcurrently(t *testing.T) { }, } - serverArgs := base.TestServerArgs{Knobs: testingKnobs} + serverArgs := base.TestServerArgs{ + Knobs: testingKnobs, + Settings: cluster.MakeTestingClusterSettings(), + } // The LeaseJitterFraction is zero so leases will have // monotonically increasing expiration. This prevents two leases @@ -946,9 +961,18 @@ func TestLeaseAcquireAndReleaseConcurrently(t *testing.T) { // leases are checked for having a different expiration. LeaseJitterFraction.Override(ctx, &serverArgs.SV, 0) - s, _, _ := serverutils.StartServer( + s, sqlDB, _ := serverutils.StartServer( t, serverArgs) defer s.Stopper().Stop(context.Background()) + tdb := sqlutils.MakeSQLRunner(sqlDB) + tdb.Exec(t, "CREATE TABLE t (i INT PRIMARY KEY)") + { + var tableID descpb.ID + tdb.QueryRow( + t, "SELECT id FROM system.namespace WHERE name = $1", "t", + ).Scan(&tableID) + descID.Store(tableID) + } leaseManager := s.LeaseManager().(*Manager) acquireResultChan := make(chan Result) @@ -958,11 +982,11 @@ func TestLeaseAcquireAndReleaseConcurrently(t *testing.T) { go acquireBlock(ctx, leaseManager, acquireResultChan) if test.isSecondCallAcquireFreshest { go func(ctx context.Context, m *Manager, acquireChan chan Result) { - if err := m.AcquireFreshestFromStore(ctx, descID); err != nil { + if err := m.AcquireFreshestFromStore(ctx, getDescID()); err != nil { acquireChan <- mkResult(nil, err) return } - acquireChan <- mkResult(m.Acquire(ctx, s.Clock().Now(), descID)) + acquireChan <- mkResult(m.Acquire(ctx, s.Clock().Now(), getDescID())) }(ctx, leaseManager, acquireResultChan) } else { diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go index 044dfc214307..9d110c655c30 100644 --- a/pkg/sql/catalog/lease/lease_test.go +++ b/pkg/sql/catalog/lease/lease_test.go @@ -1334,8 +1334,8 @@ func TestLeaseRenewedAutomatically(testingT *testing.T) { atomic.AddInt32(&testAcquiredCount, 1) } }, - LeaseAcquireResultBlockEvent: func(_ lease.AcquireBlockType, id descpb.ID) { - if uint32(id) < bootstrap.TestingMinUserDescID() { + LeaseAcquireResultBlockEvent: func(typ lease.AcquireType, id descpb.ID) { + if uint32(id) < bootstrap.TestingMinUserDescID() || typ == lease.AcquireBackground { return } atomic.AddInt32(&testAcquisitionBlockCount, 1) @@ -1798,8 +1798,8 @@ func TestLeaseRenewedPeriodically(testingT *testing.T) { defer mu.Unlock() releasedIDs[id] = struct{}{} }, - LeaseAcquireResultBlockEvent: func(_ lease.AcquireBlockType, id descpb.ID) { - if uint32(id) < bootstrap.TestingMinUserDescID() { + LeaseAcquireResultBlockEvent: func(typ lease.AcquireType, id descpb.ID) { + if uint32(id) < bootstrap.TestingMinUserDescID() || typ == lease.AcquireBackground { return } atomic.AddInt32(&testAcquisitionBlockCount, 1) diff --git a/pkg/sql/catalog/lease/testutils.go b/pkg/sql/catalog/lease/testutils.go index 56ec1afebdd0..7c4563a2d79e 100644 --- a/pkg/sql/catalog/lease/testutils.go +++ b/pkg/sql/catalog/lease/testutils.go @@ -29,8 +29,8 @@ type StorageTestingKnobs struct { // Called after a lease is acquired, with any operation error. LeaseAcquiredEvent func(desc catalog.Descriptor, err error) // Called before waiting on a results from a DoChan call of acquireNodeLease - // in descriptorState.acquire() and descriptorState.acquireFreshestFromStore(). - LeaseAcquireResultBlockEvent func(leaseBlockType AcquireBlockType, id descpb.ID) + // in Acquire and AcquireFreshestFromStore. + LeaseAcquireResultBlockEvent func(leaseBlockType AcquireType, id descpb.ID) // RemoveOnceDereferenced forces leases to be removed // as soon as they are dereferenced. RemoveOnceDereferenced bool