catalog/lease: avoid using context.Background directly #72638

Merged · 3 commits · Nov 11, 2021
pkg/jobs/registry.go — 3 additions, 1 deletion

```diff
@@ -249,7 +249,9 @@ func (r *Registry) ID() base.SQLInstanceID {
 // makeCtx returns a new context from r's ambient context and an associated
 // cancel func.
 func (r *Registry) makeCtx() (context.Context, func()) {
-	return context.WithCancel(r.ac.AnnotateCtx(r.serverCtx))
+	ctx := r.ac.AnnotateCtx(context.Background())
+	ctx = logtags.WithTags(ctx, logtags.FromContext(r.serverCtx))
+	return context.WithCancel(ctx)
 }
 
 // MakeJobID generates a new job ID.
```
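The change to makeCtx is the PR's core pattern: start from a fresh context.Background(), annotate it with the ambient tags, then copy the caller's (here, the server's) log tags onto it. The result keeps the tags used for logging but no longer inherits the source context's cancellation or deadline. A minimal, self-contained sketch of the tag-copying half, using only the cockroachdb/logtags package (the printed values are illustrative):

```go
package main

import (
	"context"
	"fmt"

	"github.com/cockroachdb/logtags"
)

func main() {
	// A caller-scoped context carrying log tags and a cancellation.
	callerCtx, cancel := context.WithCancel(context.Background())
	callerCtx = logtags.AddTag(callerCtx, "n", 1)
	callerCtx = logtags.AddTag(callerCtx, "job", 42)

	// Detach: start from a fresh Background and copy only the tags, so the
	// new context observes none of callerCtx's cancellation or deadlines.
	detached := logtags.WithTags(context.Background(), logtags.FromContext(callerCtx))

	cancel()
	fmt.Println(callerCtx.Err()) // context.Canceled
	fmt.Println(detached.Err())  // <nil>: the lifetime is decoupled
	fmt.Println(logtags.FromContext(detached)) // the tags survive
}
```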
pkg/server/drain.go — 1 addition, 1 deletion

```diff
@@ -207,7 +207,7 @@ func (s *Server) drainClients(ctx context.Context, reporter func(int, redact.Saf
 
 	// Drain the SQL leases. This must be done after the pgServer has
 	// given sessions a chance to finish ongoing work.
-	s.sqlServer.leaseMgr.SetDraining(true /* drain */, reporter)
+	s.sqlServer.leaseMgr.SetDraining(ctx, true /* drain */, reporter)
 
 	// Done. This executes the defers set above to drain SQL leases.
 	return nil
```
pkg/server/server_sql.go — 1 addition, 1 deletion

```diff
@@ -1120,7 +1120,7 @@ func (s *SQLServer) preStart(
 
 	// Delete all orphaned table leases created by a prior instance of this
 	// node. This also uses SQL.
-	s.leaseMgr.DeleteOrphanedLeases(orphanedLeasesTimeThresholdNanos)
+	s.leaseMgr.DeleteOrphanedLeases(ctx, orphanedLeasesTimeThresholdNanos)
 
 	// Start scheduled jobs daemon.
 	jobs.StartJobSchedulerDaemon(
```
pkg/sql/catalog/lease/descriptor_state.go — 5 additions, 2 deletions

```diff
@@ -21,6 +21,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/errors"
+	"github.com/cockroachdb/logtags"
 	"github.com/cockroachdb/redact"
 )
 
@@ -259,7 +260,7 @@ func (t *descriptorState) release(ctx context.Context, s *descriptorVersionState
 		return nil
 	}
 	if l := maybeRemoveLease(); l != nil {
-		releaseLease(l, t.m)
+		releaseLease(ctx, l, t.m)
 	}
 }
 
@@ -273,7 +274,9 @@ func (t *descriptorState) maybeQueueLeaseRenewal(
 	}
 
 	// Start the renewal. When it finishes, it will reset t.renewalInProgress.
-	return t.stopper.RunAsyncTask(context.Background(),
+	newCtx := m.ambientCtx.AnnotateCtx(context.Background())
+	newCtx = logtags.WithTags(newCtx, logtags.FromContext(ctx))
+	return t.stopper.RunAsyncTask(newCtx,
 		"lease renewal", func(ctx context.Context) {
 			t.startLeaseRenewal(ctx, m, id, name)
 		})
```
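maybeQueueLeaseRenewal shows why the detached context matters: the renewal is queued from inside a request but must be allowed to outlive it. Had RunAsyncTask been handed the caller's ctx, the renewal would be cancelled the moment the triggering query returned. A self-contained sketch of that hazard and of the fixed shape, with plain goroutines standing in for the stopper (names and timings are illustrative):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// renew simulates a lease renewal that takes longer than the
// query that triggered it.
func renew(ctx context.Context, label string) {
	select {
	case <-time.After(50 * time.Millisecond):
		fmt.Println(label, "renewal finished")
	case <-ctx.Done():
		fmt.Println(label, "renewal aborted:", ctx.Err())
	}
}

func main() {
	queryCtx, cancel := context.WithCancel(context.Background())

	go renew(queryCtx, "tied:")                 // buggy: bound to the query's lifetime
	go renew(context.Background(), "detached:") // fixed shape: fresh context

	cancel() // the triggering query finishes immediately
	time.Sleep(100 * time.Millisecond)
	// Prints "tied: renewal aborted: context canceled"
	// and "detached: renewal finished".
}
```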
pkg/sql/catalog/lease/lease.go — 17 additions, 10 deletions

```diff
@@ -430,7 +430,9 @@ func acquireNodeLease(ctx context.Context, m *Manager, id descpb.ID) (bool, erro
 	// of the first context cancels other callers to the `acquireNodeLease()` method,
 	// because of its use of `singleflight.Group`. See issue #41780 for how this has
 	// happened.
-	newCtx, cancel := m.stopper.WithCancelOnQuiesce(logtags.WithTags(context.Background(), logtags.FromContext(ctx)))
+	baseCtx := m.ambientCtx.AnnotateCtx(context.Background())
+	baseCtx = logtags.WithTags(baseCtx, logtags.FromContext(ctx))
+	newCtx, cancel := m.stopper.WithCancelOnQuiesce(baseCtx)
 	defer cancel()
 	if m.isDraining() {
 		return nil, errors.New("cannot acquire lease when draining")
@@ -457,7 +459,7 @@ func acquireNodeLease(ctx context.Context, m *Manager, id descpb.ID) (bool, erro
 			m.names.insert(newDescVersionState)
 		}
 		if toRelease != nil {
-			releaseLease(toRelease, m)
+			releaseLease(ctx, toRelease, m)
 		}
 		return true, nil
 	})
@@ -473,17 +475,18 @@ func acquireNodeLease(ctx context.Context, m *Manager, id descpb.ID) (bool, erro
 }
 
 // releaseLease from store.
-func releaseLease(lease *storedLease, m *Manager) {
-	ctx := context.TODO()
+func releaseLease(ctx context.Context, lease *storedLease, m *Manager) {
 	if m.isDraining() {
 		// Release synchronously to guarantee release before exiting.
 		m.storage.release(ctx, m.stopper, lease)
 		return
 	}
 
 	// Release to the store asynchronously, without the descriptorState lock.
+	newCtx := m.ambientCtx.AnnotateCtx(context.Background())
+	newCtx = logtags.WithTags(newCtx, logtags.FromContext(ctx))
 	if err := m.stopper.RunAsyncTask(
-		ctx, "sql.descriptorState: releasing descriptor lease",
+		newCtx, "sql.descriptorState: releasing descriptor lease",
 		func(ctx context.Context) {
 			m.storage.release(ctx, m.stopper, lease)
 		}); err != nil {
@@ -528,7 +531,7 @@ func purgeOldVersions(
 	leases := t.removeInactiveVersions()
 	t.mu.Unlock()
 	for _, l := range leases {
-		releaseLease(l, m)
+		releaseLease(ctx, l, m)
 	}
 }
 
@@ -950,7 +953,9 @@ func (m *Manager) isDraining() bool {
 // to report work that needed to be done and which may or may not have
 // been done by the time this call returns. See the explanation in
 // pkg/server/drain.go for details.
-func (m *Manager) SetDraining(drain bool, reporter func(int, redact.SafeString)) {
+func (m *Manager) SetDraining(
+	ctx context.Context, drain bool, reporter func(int, redact.SafeString),
+) {
 	m.draining.Store(drain)
 	if !drain {
 		return
@@ -963,7 +968,7 @@ func (m *Manager) SetDraining(drain bool, reporter func(int, redact.SafeString))
 	leases := t.removeInactiveVersions()
 	t.mu.Unlock()
 	for _, l := range leases {
-		releaseLease(l, m)
+		releaseLease(ctx, l, m)
 	}
 	if reporter != nil {
 		// Report progress through the Drain RPC.
@@ -1166,7 +1171,7 @@ func (m *Manager) refreshSomeLeases(ctx context.Context) {
 // DeleteOrphanedLeases releases all orphaned leases created by a prior
 // instance of this node. timeThreshold is a walltime lower than the
 // lowest hlc timestamp that the current instance of the node can use.
-func (m *Manager) DeleteOrphanedLeases(timeThreshold int64) {
+func (m *Manager) DeleteOrphanedLeases(ctx context.Context, timeThreshold int64) {
 	if m.testingKnobs.DisableDeleteOrphanedLeases {
 		return
 	}
@@ -1179,7 +1184,9 @@ func (m *Manager) DeleteOrphanedLeases(timeThreshold int64) {
 
 	// Run as async worker to prevent blocking the main server Start method.
 	// Exit after releasing all the orphaned leases.
-	_ = m.stopper.RunAsyncTask(context.Background(), "del-orphaned-leases", func(ctx context.Context) {
+	newCtx := m.ambientCtx.AnnotateCtx(context.Background())
+	newCtx = logtags.WithTags(newCtx, logtags.FromContext(ctx))
+	_ = m.stopper.RunAsyncTask(newCtx, "del-orphaned-leases", func(ctx context.Context) {
 		// This could have been implemented using DELETE WHERE, but DELETE WHERE
 		// doesn't implement AS OF SYSTEM TIME.
 
```
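The comment in acquireNodeLease names the underlying hazard: acquisition runs under a singleflight.Group, so concurrent callers share one execution, and that execution captures whichever caller's context reached the closure first — cancel that caller and everyone in the flight fails (issue #41780). CockroachDB uses its own singleflight package, but golang.org/x/sync/singleflight exhibits the same behavior, which this standalone sketch reproduces:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/singleflight"
)

var g singleflight.Group

// acquire coalesces concurrent acquisitions for the same key. The shared
// work runs under whichever caller's context reached the closure first.
func acquire(ctx context.Context, key string) error {
	_, err, _ := g.Do(key, func() (interface{}, error) {
		select {
		case <-time.After(50 * time.Millisecond):
			return "lease", nil
		case <-ctx.Done():
			return nil, ctx.Err() // the first caller's cancellation fails everyone
		}
	})
	return err
}

func main() {
	firstCtx, cancel := context.WithCancel(context.Background())
	go acquire(firstCtx, "desc-52") // first caller: its ctx is captured
	time.Sleep(10 * time.Millisecond)

	done := make(chan error, 1)
	go func() { done <- acquire(context.Background(), "desc-52") }() // joins the flight
	time.Sleep(10 * time.Millisecond)

	cancel()            // the first caller goes away...
	fmt.Println(<-done) // ...and the second caller sees "context canceled" too
}
```

The fix in the diff above sidesteps this by rebuilding the flight's context from context.Background() plus copied log tags, tying cancellation only to stopper quiescence, so no individual caller can tear down the shared acquisition.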
pkg/sql/catalog/lease/lease_test.go — 5 additions, 4 deletions

```diff
@@ -487,6 +487,7 @@ func TestLeaseManagerDrain(testingT *testing.T) {
 	t := newLeaseTest(testingT, params)
 	defer t.cleanup()
 
+	ctx := context.Background()
 	const descID = keys.LeaseTableID
 
 	{
@@ -499,8 +500,8 @@ func TestLeaseManagerDrain(testingT *testing.T) {
 		// starts draining.
 		l1RemovalTracker := leaseRemovalTracker.TrackRemoval(l1.Underlying())
 
-		t.nodes[1].SetDraining(true, nil /* reporter */)
-		t.nodes[2].SetDraining(true, nil /* reporter */)
+		t.nodes[1].SetDraining(ctx, true, nil /* reporter */)
+		t.nodes[2].SetDraining(ctx, true, nil /* reporter */)
 
 		// Leases cannot be acquired when in draining mode.
 		if _, err := t.acquire(1, descID); !testutils.IsError(err, "cannot acquire lease when draining") {
@@ -523,7 +524,7 @@ func TestLeaseManagerDrain(testingT *testing.T) {
 	{
 		// Check that leases with a refcount of 0 are correctly kept in the
 		// store once the drain mode has been exited.
-		t.nodes[1].SetDraining(false, nil /* reporter */)
+		t.nodes[1].SetDraining(ctx, false, nil /* reporter */)
 		l1 := t.mustAcquire(1, descID)
 		t.mustRelease(1, l1, nil)
 		t.expectLeases(descID, "/1/1")
@@ -2007,7 +2008,7 @@ CREATE TABLE t.after (k CHAR PRIMARY KEY, v CHAR);
 	t.expectLeases(afterDesc.GetID(), "/1/1")
 
 	// Call DeleteOrphanedLeases() with the server startup time.
-	t.node(1).DeleteOrphanedLeases(now)
+	t.node(1).DeleteOrphanedLeases(ctx, now)
 	// Orphaned lease is gone.
 	t.expectLeases(beforeDesc.GetID(), "")
 	t.expectLeases(afterDesc.GetID(), "/1/1")
```