Skip to content

Commit

Permalink
sql: ensure that descriptor lease drains are traced
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
knz committed Nov 11, 2021
1 parent e8061cb commit ca0df0f
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 12 deletions.
2 changes: 1 addition & 1 deletion pkg/server/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (s *Server) drainClients(ctx context.Context, reporter func(int, redact.Saf

// Drain the SQL leases. This must be done after the pgServer has
// given sessions a chance to finish ongoing work.
s.sqlServer.leaseMgr.SetDraining(true /* drain */, reporter)
s.sqlServer.leaseMgr.SetDraining(ctx, true /* drain */, reporter)

// Done. This executes the defers set above to drain SQL leases.
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/catalog/lease/descriptor_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (t *descriptorState) release(ctx context.Context, s *descriptorVersionState
return nil
}
if l := maybeRemoveLease(); l != nil {
releaseLease(l, t.m)
releaseLease(ctx, l, t.m)
}
}

Expand Down
17 changes: 10 additions & 7 deletions pkg/sql/catalog/lease/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ func acquireNodeLease(ctx context.Context, m *Manager, id descpb.ID) (bool, erro
m.names.insert(newDescVersionState)
}
if toRelease != nil {
releaseLease(toRelease, m)
releaseLease(ctx, toRelease, m)
}
return true, nil
})
Expand All @@ -475,17 +475,18 @@ func acquireNodeLease(ctx context.Context, m *Manager, id descpb.ID) (bool, erro
}

// releaseLease from store.
func releaseLease(lease *storedLease, m *Manager) {
ctx := context.TODO()
func releaseLease(ctx context.Context, lease *storedLease, m *Manager) {
if m.isDraining() {
// Release synchronously to guarantee release before exiting.
m.storage.release(ctx, m.stopper, lease)
return
}

// Release to the store asynchronously, without the descriptorState lock.
newCtx := m.ambientCtx.AnnotateCtx(context.Background())
newCtx = logtags.WithTags(newCtx, logtags.FromContext(ctx))
if err := m.stopper.RunAsyncTask(
ctx, "sql.descriptorState: releasing descriptor lease",
newCtx, "sql.descriptorState: releasing descriptor lease",
func(ctx context.Context) {
m.storage.release(ctx, m.stopper, lease)
}); err != nil {
Expand Down Expand Up @@ -530,7 +531,7 @@ func purgeOldVersions(
leases := t.removeInactiveVersions()
t.mu.Unlock()
for _, l := range leases {
releaseLease(l, m)
releaseLease(ctx, l, m)
}
}

Expand Down Expand Up @@ -952,7 +953,9 @@ func (m *Manager) isDraining() bool {
// to report work that needed to be done and which may or may not have
// been done by the time this call returns. See the explanation in
// pkg/server/drain.go for details.
func (m *Manager) SetDraining(drain bool, reporter func(int, redact.SafeString)) {
func (m *Manager) SetDraining(
ctx context.Context, drain bool, reporter func(int, redact.SafeString),
) {
m.draining.Store(drain)
if !drain {
return
Expand All @@ -965,7 +968,7 @@ func (m *Manager) SetDraining(drain bool, reporter func(int, redact.SafeString))
leases := t.removeInactiveVersions()
t.mu.Unlock()
for _, l := range leases {
releaseLease(l, m)
releaseLease(ctx, l, m)
}
if reporter != nil {
// Report progress through the Drain RPC.
Expand Down
7 changes: 4 additions & 3 deletions pkg/sql/catalog/lease/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ func TestLeaseManagerDrain(testingT *testing.T) {
t := newLeaseTest(testingT, params)
defer t.cleanup()

ctx := context.Background()
const descID = keys.LeaseTableID

{
Expand All @@ -499,8 +500,8 @@ func TestLeaseManagerDrain(testingT *testing.T) {
// starts draining.
l1RemovalTracker := leaseRemovalTracker.TrackRemoval(l1.Underlying())

t.nodes[1].SetDraining(true, nil /* reporter */)
t.nodes[2].SetDraining(true, nil /* reporter */)
t.nodes[1].SetDraining(ctx, true, nil /* reporter */)
t.nodes[2].SetDraining(ctx, true, nil /* reporter */)

// Leases cannot be acquired when in draining mode.
if _, err := t.acquire(1, descID); !testutils.IsError(err, "cannot acquire lease when draining") {
Expand All @@ -523,7 +524,7 @@ func TestLeaseManagerDrain(testingT *testing.T) {
{
// Check that leases with a refcount of 0 are correctly kept in the
// store once the drain mode has been exited.
t.nodes[1].SetDraining(false, nil /* reporter */)
t.nodes[1].SetDraining(ctx, false, nil /* reporter */)
l1 := t.mustAcquire(1, descID)
t.mustRelease(1, l1, nil)
t.expectLeases(descID, "/1/1")
Expand Down

0 comments on commit ca0df0f

Please sign in to comment.