Skip to content

Commit

Permalink
internal/client: Make the lease manager thread-safe
Browse files Browse the repository at this point in the history
This race was exceptionally rare due to how the lease manager is
typically used by the sqlmigrations package, but it is indeed a race.

Holding a semaphore while making a remote RPC is usually a terrible
idea, but in the context of how it's used it's actually more dangerous
to let ExtendLease and ReleaseLease interleave, since if ReleaseLease's
CPut fails then the sqlmigrations package will log.Fatal, and the only
potential for lock contention is between one goroutine using ExtendLease
and one running ReleaseLease. Perhaps this is tuning the package too
tightly to the needs of its client, but as of now it's its only client.

Fixes cockroachdb#28222

Release note: None
  • Loading branch information
a-robinson committed Aug 3, 2018
1 parent f123c03 commit c9cfe12
Showing 1 changed file with 31 additions and 11 deletions.
42 changes: 31 additions & 11 deletions pkg/internal/client/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ type LeaseManager struct {
// Lease contains the state of a lease on a particular key.
type Lease struct {
key roachpb.Key
val *LeaseVal
val struct {
sem chan struct{}
lease *LeaseVal
}
}

// LeaseManagerOptions are used to configure a new LeaseManager.
Expand Down Expand Up @@ -92,6 +95,7 @@ func (m *LeaseManager) AcquireLease(ctx context.Context, key roachpb.Key) (*Leas
lease := &Lease{
key: key,
}
lease.val.sem = make(chan struct{}, 1)
if err := m.db.Txn(ctx, func(ctx context.Context, txn *Txn) error {
var val LeaseVal
err := txn.GetProto(ctx, key, &val)
Expand All @@ -101,11 +105,11 @@ func (m *LeaseManager) AcquireLease(ctx context.Context, key roachpb.Key) (*Leas
if !m.leaseAvailable(&val) {
return &LeaseNotAvailableError{key: key, expiration: val.Expiration}
}
lease.val = &LeaseVal{
lease.val.lease = &LeaseVal{
Owner: m.clientID,
Expiration: m.clock.Now().Add(m.leaseDuration.Nanoseconds(), 0),
}
return txn.Put(ctx, key, lease.val)
return txn.Put(ctx, key, lease.val.lease)
}); err != nil {
return nil, err
}
Expand All @@ -118,7 +122,9 @@ func (m *LeaseManager) leaseAvailable(val *LeaseVal) bool {

// TimeRemaining returns the amount of time left on the given lease.
func (m *LeaseManager) TimeRemaining(l *Lease) time.Duration {
return m.timeRemaining(l.val)
l.val.sem <- struct{}{}
defer func() { <-l.val.sem }()
return m.timeRemaining(l.val.lease)
}

func (m *LeaseManager) timeRemaining(val *LeaseVal) time.Duration {
Expand All @@ -134,29 +140,43 @@ func (m *LeaseManager) timeRemaining(val *LeaseVal) time.Duration {
// ExtendLease attempts to push the expiration time of the lease farther out
// into the future.
func (m *LeaseManager) ExtendLease(ctx context.Context, l *Lease) error {
if m.TimeRemaining(l) < 0 {
return errors.Errorf("can't extend lease that expired at time %s", l.val.Expiration)
select {
case <-ctx.Done():
return ctx.Err()
case l.val.sem <- struct{}{}:
}
defer func() { <-l.val.sem }()

if m.timeRemaining(l.val.lease) < 0 {
return errors.Errorf("can't extend lease that expired at time %s", l.val.lease.Expiration)
}

newVal := &LeaseVal{
Owner: m.clientID,
Expiration: m.clock.Now().Add(m.leaseDuration.Nanoseconds(), 0),
}

if err := m.db.CPut(ctx, l.key, newVal, l.val); err != nil {
if err := m.db.CPut(ctx, l.key, newVal, l.val.lease); err != nil {
if _, ok := err.(*roachpb.ConditionFailedError); ok {
// Something is wrong - immediately expire the local lease state.
l.val.Expiration = hlc.Timestamp{}
return errors.Wrapf(err, "local lease state %v out of sync with DB state", l.val)
l.val.lease.Expiration = hlc.Timestamp{}
return errors.Wrapf(err, "local lease state %v out of sync with DB state", l.val.lease)
}
return err
}
l.val = newVal
l.val.lease = newVal
return nil
}

// ReleaseLease attempts to release the given lease so that another process can
// grab it.
func (m *LeaseManager) ReleaseLease(ctx context.Context, l *Lease) error {
return m.db.CPut(ctx, l.key, nil, l.val)
select {
case <-ctx.Done():
return ctx.Err()
case l.val.sem <- struct{}{}:
}
defer func() { <-l.val.sem }()

return m.db.CPut(ctx, l.key, nil, l.val.lease)
}

0 comments on commit c9cfe12

Please sign in to comment.