diff --git a/pkg/internal/client/lease.go b/pkg/internal/client/lease.go index fac1a264b589..395229f12f91 100644 --- a/pkg/internal/client/lease.go +++ b/pkg/internal/client/lease.go @@ -55,7 +55,10 @@ type LeaseManager struct { // Lease contains the state of a lease on a particular key. type Lease struct { key roachpb.Key - val *LeaseVal + val struct { + sem chan struct{} + lease *LeaseVal + } } // LeaseManagerOptions are used to configure a new LeaseManager. @@ -92,6 +95,7 @@ func (m *LeaseManager) AcquireLease(ctx context.Context, key roachpb.Key) (*Leas lease := &Lease{ key: key, } + lease.val.sem = make(chan struct{}, 1) if err := m.db.Txn(ctx, func(ctx context.Context, txn *Txn) error { var val LeaseVal err := txn.GetProto(ctx, key, &val) @@ -101,11 +105,11 @@ func (m *LeaseManager) AcquireLease(ctx context.Context, key roachpb.Key) (*Leas if !m.leaseAvailable(&val) { return &LeaseNotAvailableError{key: key, expiration: val.Expiration} } - lease.val = &LeaseVal{ + lease.val.lease = &LeaseVal{ Owner: m.clientID, Expiration: m.clock.Now().Add(m.leaseDuration.Nanoseconds(), 0), } - return txn.Put(ctx, key, lease.val) + return txn.Put(ctx, key, lease.val.lease) }); err != nil { return nil, err } @@ -118,7 +122,9 @@ func (m *LeaseManager) leaseAvailable(val *LeaseVal) bool { // TimeRemaining returns the amount of time left on the given lease. func (m *LeaseManager) TimeRemaining(l *Lease) time.Duration { - return m.timeRemaining(l.val) + l.val.sem <- struct{}{} + defer func() { <-l.val.sem }() + return m.timeRemaining(l.val.lease) } func (m *LeaseManager) timeRemaining(val *LeaseVal) time.Duration { @@ -134,8 +140,15 @@ func (m *LeaseManager) timeRemaining(val *LeaseVal) time.Duration { // ExtendLease attempts to push the expiration time of the lease farther out // into the future. func (m *LeaseManager) ExtendLease(ctx context.Context, l *Lease) error { - if m.TimeRemaining(l) < 0 { - return errors.Errorf("can't extend lease that expired at time %s", l.val.Expiration) + select { + case <-ctx.Done(): + return ctx.Err() + case l.val.sem <- struct{}{}: + } + defer func() { <-l.val.sem }() + + if m.timeRemaining(l.val.lease) < 0 { + return errors.Errorf("can't extend lease that expired at time %s", l.val.lease.Expiration) } newVal := &LeaseVal{ @@ -143,20 +156,27 @@ func (m *LeaseManager) ExtendLease(ctx context.Context, l *Lease) error { Expiration: m.clock.Now().Add(m.leaseDuration.Nanoseconds(), 0), } - if err := m.db.CPut(ctx, l.key, newVal, l.val); err != nil { + if err := m.db.CPut(ctx, l.key, newVal, l.val.lease); err != nil { if _, ok := err.(*roachpb.ConditionFailedError); ok { // Something is wrong - immediately expire the local lease state. - l.val.Expiration = hlc.Timestamp{} - return errors.Wrapf(err, "local lease state %v out of sync with DB state", l.val) + l.val.lease.Expiration = hlc.Timestamp{} + return errors.Wrapf(err, "local lease state %v out of sync with DB state", l.val.lease) } return err } - l.val = newVal + l.val.lease = newVal return nil } // ReleaseLease attempts to release the given lease so that another process can // grab it. func (m *LeaseManager) ReleaseLease(ctx context.Context, l *Lease) error { - return m.db.CPut(ctx, l.key, nil, l.val) + select { + case <-ctx.Done(): + return ctx.Err() + case l.val.sem <- struct{}{}: + } + defer func() { <-l.val.sem }() + + return m.db.CPut(ctx, l.key, nil, l.val.lease) }