Skip to content

Commit

Permalink
lease: revive TestLeaseAcquireAndReleaseConcurrently
Browse files Browse the repository at this point in the history
This test relied on an old placement of a testing knob (it has been moved back),
and on the leasing subsystem interacting with the lease table itself in
interesting ways. We sidestep the latter by making the test use a new table.
Lastly, we make the test ignore other concurrent leasing activity so that it
does not disturb the test.

Fixes #51798

Release note: None
  • Loading branch information
ajwerner committed Nov 29, 2022
1 parent 1a6e9f8 commit 2bd6fd2
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 36 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/catalog/lease/descriptor_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (t *descriptorState) startLeaseRenewal(
log.VEventf(ctx, 1,
"background lease renewal beginning for id=%d name=%q",
id, name)
if _, err := acquireNodeLease(ctx, m, id); err != nil {
if _, err := acquireNodeLease(ctx, m, id, AcquireBackground); err != nil {
log.Errorf(ctx,
"background lease renewal for id=%d name=%q failed: %s",
id, name, err)
Expand Down
28 changes: 14 additions & 14 deletions pkg/sql/catalog/lease/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,10 +464,7 @@ func (m *Manager) AcquireFreshestFromStore(ctx context.Context, id descpb.ID) er
attemptsMade := 0
for {
// Acquire a fresh lease.
didAcquire, err := acquireNodeLease(ctx, m, id)
if m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent != nil {
m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent(AcquireFreshestBlock, id)
}
didAcquire, err := acquireNodeLease(ctx, m, id, AcquireFreshestBlock)
if err != nil {
return err
}
Expand All @@ -490,7 +487,9 @@ func (m *Manager) AcquireFreshestFromStore(ctx context.Context, id descpb.ID) er
// being dropped or offline, the error will be of type inactiveTableError.
// The boolean returned is true if this call was actually responsible for the
// lease acquisition.
func acquireNodeLease(ctx context.Context, m *Manager, id descpb.ID) (bool, error) {
func acquireNodeLease(
ctx context.Context, m *Manager, id descpb.ID, typ AcquireType,
) (bool, error) {
start := timeutil.Now()
log.VEventf(ctx, 2, "acquiring lease for descriptor %d", id)
var toRelease *storedLease
Expand Down Expand Up @@ -531,6 +530,9 @@ func acquireNodeLease(ctx context.Context, m *Manager, id descpb.ID) (bool, erro
}
return true, nil
})
if m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent != nil {
m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent(typ, id)
}
select {
case <-ctx.Done():
return false, ctx.Err()
Expand Down Expand Up @@ -634,17 +636,19 @@ func purgeOldVersions(
return err
}

// AcquireBlockType is the type of blocking result event when
// AcquireType is the type of blocking result event when
// calling LeaseAcquireResultBlockEvent.
type AcquireBlockType int
type AcquireType int

const (
// AcquireBlock denotes the LeaseAcquireResultBlockEvent is
// coming from descriptorState.acquire().
AcquireBlock AcquireBlockType = iota
AcquireBlock AcquireType = iota
// AcquireFreshestBlock denotes the LeaseAcquireResultBlockEvent is
// from descriptorState.acquireFreshestFromStore().
AcquireFreshestBlock
// AcquireBackground happens due to periodic background refreshes.
AcquireBackground
)

// Manager manages acquiring and releasing per-descriptor leases. It also
Expand Down Expand Up @@ -982,16 +986,12 @@ func (m *Manager) Acquire(
t.markAcquisitionStart(ctx)
defer t.markAcquisitionDone(ctx)
// Renew lease and retry. This will block until the lease is acquired.
_, errLease := acquireNodeLease(ctx, m, id)
_, errLease := acquireNodeLease(ctx, m, id, AcquireBlock)
return errLease
}(); err != nil {
return nil, err
}

if m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent != nil {
m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent(AcquireBlock, id)
}

case errors.Is(err, errReadOlderVersion):
// Read old versions from the store. This can block while reading.
versions, errRead := m.readOlderVersionForTimestamp(ctx, id, timestamp)
Expand Down Expand Up @@ -1243,7 +1243,7 @@ func (m *Manager) refreshSomeLeases(ctx context.Context) {
}
}

if _, err := acquireNodeLease(ctx, m, id); err != nil {
if _, err := acquireNodeLease(ctx, m, id, AcquireBackground); err != nil {
log.Infof(ctx, "refreshing descriptor: %d lease failed: %s", id, err)

if errors.Is(err, catalog.ErrDescriptorNotFound) {
Expand Down
54 changes: 39 additions & 15 deletions pkg/sql/catalog/lease/lease_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -550,7 +550,9 @@ CREATE TABLE t.%s (k CHAR PRIMARY KEY, v CHAR);
tracker := removalTracker.TrackRemoval(lease.Descriptor)

// Acquire another lease.
if _, err := acquireNodeLease(context.Background(), leaseManager, tableDesc.GetID()); err != nil {
if _, err := acquireNodeLease(
context.Background(), leaseManager, tableDesc.GetID(), AcquireBlock,
); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -833,11 +835,6 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR);
func TestLeaseAcquireAndReleaseConcurrently(t *testing.T) {
defer leaktest.AfterTest(t)()

// TODO(andrei): the startupmigrations pkg is gone and so are migrations
// requiring backfill. The test crashes when run, though; it rotted.
skip.WithIssue(t, 51798, "fails in the presence of migrations requiring backfill, "+
"but cannot import startupmigrations")

// Result is a struct for moving results to the main result routine.
type Result struct {
table LeasedDescriptor
Expand All @@ -853,15 +850,17 @@ func TestLeaseAcquireAndReleaseConcurrently(t *testing.T) {
return res
}

descID := descpb.ID(keys.LeaseTableID)
var descID atomic.Value
descID.Store(descpb.ID(0))
getDescID := func() descpb.ID { return descID.Load().(descpb.ID) }

// acquireBlock calls Acquire.
acquireBlock := func(
ctx context.Context,
m *Manager,
acquireChan chan Result,
) {
acquireChan <- mkResult(m.Acquire(ctx, m.storage.clock.Now(), descID))
acquireChan <- mkResult(m.Acquire(ctx, m.storage.clock.Now(), getDescID()))
}

testCases := []struct {
Expand Down Expand Up @@ -911,10 +910,20 @@ func TestLeaseAcquireAndReleaseConcurrently(t *testing.T) {
removalTracker := NewLeaseRemovalTracker()
testingKnobs := base.TestingKnobs{
SQLLeaseManager: &ManagerTestingKnobs{
TestingDescriptorUpdateEvent: func(descriptor *descpb.Descriptor) error {
// Ignore the update to the table in question.
if id, _, _, _, _ := descpb.GetDescriptorMetadata(descriptor); id == getDescID() {
return errors.New("boom")
}
return nil
},
LeaseStoreTestingKnobs: StorageTestingKnobs{
RemoveOnceDereferenced: true,
LeaseReleasedEvent: removalTracker.LeaseRemovedNotification,
LeaseAcquireResultBlockEvent: func(leaseBlockType AcquireBlockType, _ descpb.ID) {
LeaseAcquireResultBlockEvent: func(leaseBlockType AcquireType, id descpb.ID) {
if id != getDescID() {
return
}
if leaseBlockType == AcquireBlock {
if count := atomic.LoadInt32(&acquireArrivals); (count < 1 && test.isSecondCallAcquireFreshest) ||
(count < 2 && !test.isSecondCallAcquireFreshest) {
Expand All @@ -930,25 +939,40 @@ func TestLeaseAcquireAndReleaseConcurrently(t *testing.T) {
}
}
},
LeaseAcquiredEvent: func(_ catalog.Descriptor, _ error) {
LeaseAcquiredEvent: func(desc catalog.Descriptor, err error) {
if desc.GetID() != getDescID() {
return
}
atomic.AddInt32(&leasesAcquiredCount, 1)
<-acquisitionBlock
},
},
},
}

serverArgs := base.TestServerArgs{Knobs: testingKnobs}
serverArgs := base.TestServerArgs{
Knobs: testingKnobs,
Settings: cluster.MakeTestingClusterSettings(),
}

// The LeaseJitterFraction is zero so leases will have
// monotonically increasing expiration. This prevents two leases
// from having the same expiration due to randomness, as the
// leases are checked for having a different expiration.
LeaseJitterFraction.Override(ctx, &serverArgs.SV, 0)

s, _, _ := serverutils.StartServer(
s, sqlDB, _ := serverutils.StartServer(
t, serverArgs)
defer s.Stopper().Stop(context.Background())
tdb := sqlutils.MakeSQLRunner(sqlDB)
tdb.Exec(t, "CREATE TABLE t (i INT PRIMARY KEY)")
{
var tableID descpb.ID
tdb.QueryRow(
t, "SELECT id FROM system.namespace WHERE name = $1", "t",
).Scan(&tableID)
descID.Store(tableID)
}
leaseManager := s.LeaseManager().(*Manager)

acquireResultChan := make(chan Result)
Expand All @@ -958,11 +982,11 @@ func TestLeaseAcquireAndReleaseConcurrently(t *testing.T) {
go acquireBlock(ctx, leaseManager, acquireResultChan)
if test.isSecondCallAcquireFreshest {
go func(ctx context.Context, m *Manager, acquireChan chan Result) {
if err := m.AcquireFreshestFromStore(ctx, descID); err != nil {
if err := m.AcquireFreshestFromStore(ctx, getDescID()); err != nil {
acquireChan <- mkResult(nil, err)
return
}
acquireChan <- mkResult(m.Acquire(ctx, s.Clock().Now(), descID))
acquireChan <- mkResult(m.Acquire(ctx, s.Clock().Now(), getDescID()))
}(ctx, leaseManager, acquireResultChan)

} else {
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/catalog/lease/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1334,8 +1334,8 @@ func TestLeaseRenewedAutomatically(testingT *testing.T) {
atomic.AddInt32(&testAcquiredCount, 1)
}
},
LeaseAcquireResultBlockEvent: func(_ lease.AcquireBlockType, id descpb.ID) {
if uint32(id) < bootstrap.TestingMinUserDescID() {
LeaseAcquireResultBlockEvent: func(typ lease.AcquireType, id descpb.ID) {
if uint32(id) < bootstrap.TestingMinUserDescID() || typ == lease.AcquireBackground {
return
}
atomic.AddInt32(&testAcquisitionBlockCount, 1)
Expand Down Expand Up @@ -1798,8 +1798,8 @@ func TestLeaseRenewedPeriodically(testingT *testing.T) {
defer mu.Unlock()
releasedIDs[id] = struct{}{}
},
LeaseAcquireResultBlockEvent: func(_ lease.AcquireBlockType, id descpb.ID) {
if uint32(id) < bootstrap.TestingMinUserDescID() {
LeaseAcquireResultBlockEvent: func(typ lease.AcquireType, id descpb.ID) {
if uint32(id) < bootstrap.TestingMinUserDescID() || typ == lease.AcquireBackground {
return
}
atomic.AddInt32(&testAcquisitionBlockCount, 1)
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/catalog/lease/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ type StorageTestingKnobs struct {
// Called after a lease is acquired, with any operation error.
LeaseAcquiredEvent func(desc catalog.Descriptor, err error)
// Called before waiting on the results from a DoChan call of acquireNodeLease
// in descriptorState.acquire() and descriptorState.acquireFreshestFromStore().
LeaseAcquireResultBlockEvent func(leaseBlockType AcquireBlockType, id descpb.ID)
// in Acquire and AcquireFreshestFromStore.
LeaseAcquireResultBlockEvent func(leaseBlockType AcquireType, id descpb.ID)
// RemoveOnceDereferenced forces leases to be removed
// as soon as they are dereferenced.
RemoveOnceDereferenced bool
Expand Down

0 comments on commit 2bd6fd2

Please sign in to comment.