Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

internal/client: Make the lease manager thread-safe #28223

Merged
merged 1 commit into from
Aug 3, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 31 additions & 11 deletions pkg/internal/client/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ type LeaseManager struct {
// Lease contains the state of a lease on a particular key.
type Lease struct {
key roachpb.Key
val *LeaseVal
val struct {
sem chan struct{}
lease *LeaseVal
}
}

// LeaseManagerOptions are used to configure a new LeaseManager.
Expand Down Expand Up @@ -92,6 +95,7 @@ func (m *LeaseManager) AcquireLease(ctx context.Context, key roachpb.Key) (*Leas
lease := &Lease{
key: key,
}
lease.val.sem = make(chan struct{}, 1)
if err := m.db.Txn(ctx, func(ctx context.Context, txn *Txn) error {
var val LeaseVal
err := txn.GetProto(ctx, key, &val)
Expand All @@ -101,11 +105,11 @@ func (m *LeaseManager) AcquireLease(ctx context.Context, key roachpb.Key) (*Leas
if !m.leaseAvailable(&val) {
return &LeaseNotAvailableError{key: key, expiration: val.Expiration}
}
lease.val = &LeaseVal{
lease.val.lease = &LeaseVal{
Owner: m.clientID,
Expiration: m.clock.Now().Add(m.leaseDuration.Nanoseconds(), 0),
}
return txn.Put(ctx, key, lease.val)
return txn.Put(ctx, key, lease.val.lease)
}); err != nil {
return nil, err
}
Expand All @@ -118,7 +122,9 @@ func (m *LeaseManager) leaseAvailable(val *LeaseVal) bool {

// TimeRemaining returns the amount of time left on the given lease.
func (m *LeaseManager) TimeRemaining(l *Lease) time.Duration {
return m.timeRemaining(l.val)
l.val.sem <- struct{}{}
defer func() { <-l.val.sem }()
return m.timeRemaining(l.val.lease)
}

func (m *LeaseManager) timeRemaining(val *LeaseVal) time.Duration {
Expand All @@ -134,29 +140,43 @@ func (m *LeaseManager) timeRemaining(val *LeaseVal) time.Duration {
// ExtendLease attempts to push the expiration time of the lease farther out
// into the future.
func (m *LeaseManager) ExtendLease(ctx context.Context, l *Lease) error {
if m.TimeRemaining(l) < 0 {
return errors.Errorf("can't extend lease that expired at time %s", l.val.Expiration)
select {
case <-ctx.Done():
return ctx.Err()
case l.val.sem <- struct{}{}:
}
defer func() { <-l.val.sem }()

if m.timeRemaining(l.val.lease) < 0 {
return errors.Errorf("can't extend lease that expired at time %s", l.val.lease.Expiration)
}

newVal := &LeaseVal{
Owner: m.clientID,
Expiration: m.clock.Now().Add(m.leaseDuration.Nanoseconds(), 0),
}

if err := m.db.CPut(ctx, l.key, newVal, l.val); err != nil {
if err := m.db.CPut(ctx, l.key, newVal, l.val.lease); err != nil {
if _, ok := err.(*roachpb.ConditionFailedError); ok {
// Something is wrong - immediately expire the local lease state.
l.val.Expiration = hlc.Timestamp{}
return errors.Wrapf(err, "local lease state %v out of sync with DB state", l.val)
l.val.lease.Expiration = hlc.Timestamp{}
return errors.Wrapf(err, "local lease state %v out of sync with DB state", l.val.lease)
}
return err
}
l.val = newVal
l.val.lease = newVal
return nil
}

// ReleaseLease attempts to release the given lease so that another process can
// grab it.
func (m *LeaseManager) ReleaseLease(ctx context.Context, l *Lease) error {
return m.db.CPut(ctx, l.key, nil, l.val)
select {
case <-ctx.Done():
return ctx.Err()
case l.val.sem <- struct{}{}:
}
defer func() { <-l.val.sem }()

return m.db.CPut(ctx, l.key, nil, l.val.lease)
}