From c9cfe12fdd913dac266b8c09d3c6a125aac58e52 Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Thu, 2 Aug 2018 17:05:50 -0500 Subject: [PATCH] internal/client: Make the lease manager thread-safe This race was exceptionally rare due to how the lease manager is typically used by the sqlmigrations package, but it is indeed a race. Holding a semaphore while making a remote RPC is usually a terrible idea, but in the context of how it's used it's actually more dangerous to let ExtendLease and ReleaseLease interleave, since if ReleaseLease's CPut fails then the sqlmigrations package will log.Fatal, and the only potential for lock contention is between one goroutine using ExtendLease and one running ReleaseLease. Perhaps this is tuning the package too tightly to the needs of its client, but as of now it's its only client. Fixes #28222 Release note: None --- pkg/internal/client/lease.go | 42 ++++++++++++++++++++++++++---------- 1 file changed, 31 insertions(+), 11 deletions(-) diff --git a/pkg/internal/client/lease.go b/pkg/internal/client/lease.go index fac1a264b589..395229f12f91 100644 --- a/pkg/internal/client/lease.go +++ b/pkg/internal/client/lease.go @@ -55,7 +55,10 @@ type LeaseManager struct { // Lease contains the state of a lease on a particular key. type Lease struct { key roachpb.Key - val *LeaseVal + val struct { + sem chan struct{} + lease *LeaseVal + } } // LeaseManagerOptions are used to configure a new LeaseManager. @@ -92,6 +95,7 @@ func (m *LeaseManager) AcquireLease(ctx context.Context, key roachpb.Key) (*Leas lease := &Lease{ key: key, } + lease.val.sem = make(chan struct{}, 1) if err := m.db.Txn(ctx, func(ctx context.Context, txn *Txn) error { var val LeaseVal err := txn.GetProto(ctx, key, &val) @@ -101,11 +105,11 @@ func (m *LeaseManager) AcquireLease(ctx context.Context, key roachpb.Key) (*Leas if !m.leaseAvailable(&val) { return &LeaseNotAvailableError{key: key, expiration: val.Expiration} } - lease.val = &LeaseVal{ + lease.val.lease = &LeaseVal{ Owner: m.clientID, Expiration: m.clock.Now().Add(m.leaseDuration.Nanoseconds(), 0), } - return txn.Put(ctx, key, lease.val) + return txn.Put(ctx, key, lease.val.lease) }); err != nil { return nil, err } @@ -118,7 +122,9 @@ func (m *LeaseManager) leaseAvailable(val *LeaseVal) bool { // TimeRemaining returns the amount of time left on the given lease. func (m *LeaseManager) TimeRemaining(l *Lease) time.Duration { - return m.timeRemaining(l.val) + l.val.sem <- struct{}{} + defer func() { <-l.val.sem }() + return m.timeRemaining(l.val.lease) } func (m *LeaseManager) timeRemaining(val *LeaseVal) time.Duration { @@ -134,8 +140,15 @@ func (m *LeaseManager) timeRemaining(val *LeaseVal) time.Duration { // ExtendLease attempts to push the expiration time of the lease farther out // into the future. func (m *LeaseManager) ExtendLease(ctx context.Context, l *Lease) error { - if m.TimeRemaining(l) < 0 { - return errors.Errorf("can't extend lease that expired at time %s", l.val.Expiration) + select { + case <-ctx.Done(): + return ctx.Err() + case l.val.sem <- struct{}{}: + } + defer func() { <-l.val.sem }() + + if m.timeRemaining(l.val.lease) < 0 { + return errors.Errorf("can't extend lease that expired at time %s", l.val.lease.Expiration) } newVal := &LeaseVal{ @@ -143,20 +156,27 @@ func (m *LeaseManager) ExtendLease(ctx context.Context, l *Lease) error { Expiration: m.clock.Now().Add(m.leaseDuration.Nanoseconds(), 0), } - if err := m.db.CPut(ctx, l.key, newVal, l.val); err != nil { + if err := m.db.CPut(ctx, l.key, newVal, l.val.lease); err != nil { if _, ok := err.(*roachpb.ConditionFailedError); ok { // Something is wrong - immediately expire the local lease state. - l.val.Expiration = hlc.Timestamp{} - return errors.Wrapf(err, "local lease state %v out of sync with DB state", l.val) + l.val.lease.Expiration = hlc.Timestamp{} + return errors.Wrapf(err, "local lease state %v out of sync with DB state", l.val.lease) } return err } - l.val = newVal + l.val.lease = newVal return nil } // ReleaseLease attempts to release the given lease so that another process can // grab it. func (m *LeaseManager) ReleaseLease(ctx context.Context, l *Lease) error { - return m.db.CPut(ctx, l.key, nil, l.val) + select { + case <-ctx.Done(): + return ctx.Err() + case l.val.sem <- struct{}{}: + } + defer func() { <-l.val.sem }() + + return m.db.CPut(ctx, l.key, nil, l.val.lease) }