Skip to content

Commit

Permalink
sql/catalog/lease: wait until the descriptor is visible at now
Browse files Browse the repository at this point in the history
In MR serverless, we're making the lease table a global table. We want to make
sure we wait until new leases will find the newest version, lest we drop leases
at the current version and then have to re-acquire, and, worse, hit the
uncertainty interval.

Epic: none

Release note: None
  • Loading branch information
ajwerner committed Mar 29, 2023
1 parent 0745cd4 commit 5576090
Showing 1 changed file with 35 additions and 14 deletions.
49 changes: 35 additions & 14 deletions pkg/sql/catalog/lease/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -1117,7 +1117,7 @@ func (m *Manager) findDescriptorState(id descpb.ID, create bool) *descriptorStat
// rangefeeds. This function must be passed a non-nil gossip if
// RangefeedLeases is not active.
func (m *Manager) RefreshLeases(ctx context.Context, s *stop.Stopper, db *kv.DB) {
descUpdateCh := make(chan *descpb.Descriptor)
descUpdateCh := make(chan catalog.Descriptor)
m.watchForUpdates(ctx, descUpdateCh)
_ = s.RunAsyncTask(ctx, "refresh-leases", func(ctx context.Context) {
for {
Expand All @@ -1130,28 +1130,49 @@ func (m *Manager) RefreshLeases(ctx context.Context, s *stop.Stopper, db *kv.DB)
}

if evFunc := m.testingKnobs.TestingDescriptorUpdateEvent; evFunc != nil {
if err := evFunc(desc); err != nil {
if err := evFunc(desc.DescriptorProto()); err != nil {
log.Infof(ctx, "skipping update of %v due to knob: %v",
desc, err)
continue
}
}

id, version, name, state, err := descpb.GetDescriptorMetadata(desc)
if err != nil {
log.Fatalf(ctx, "invalid descriptor %v: %v", desc, err)
}
dropped := state == descpb.DescriptorState_DROP
dropped := desc.Dropped()
// Try to refresh the lease to one >= this version.
log.VEventf(ctx, 2, "purging old version of descriptor %d@%d (dropped %v)",
id, version, dropped)
if err := purgeOldVersions(ctx, db, id, dropped, version, m); err != nil {
log.Warningf(ctx, "error purging leases for descriptor %d(%s): %s",
id, name, err)
desc.GetID(), desc.GetVersion(), dropped)
purge := func(ctx context.Context) {
if err := purgeOldVersions(ctx, db, desc.GetID(), dropped, desc.GetVersion(), m); err != nil {
log.Warningf(ctx, "error purging leases for descriptor %d(%s): %s",
desc.GetID(), desc.GetName(), err)
}
}
// New descriptors may appear in the future if the descriptor table is
// global or if the transaction which performed the schema change wrote
// to a global table. Attempts to acquire a lease after that new
// version has appeared won't acquire the latest version, they'll
// acquire the previous version because they'll occur at "now".
//
// That, in and of itself, is not a problem. The problem is that we
// may release the lease on the current version before we can start
// acquiring the lease on the new version. This could lead to periods
// of increased latency right as the descriptor has been committed.
if now := db.Clock().Now(); now.Less(desc.GetModificationTime()) {
_ = s.RunAsyncTask(ctx, "wait to purge", func(ctx context.Context) {
toWait := time.Duration(desc.GetModificationTime().WallTime - now.WallTime)
select {
case <-time.After(toWait):
purge(ctx)
case <-ctx.Done():
case <-s.ShouldQuiesce():
}
})
} else {
purge(ctx)
}

if evFunc := m.testingKnobs.TestingDescriptorRefreshedEvent; evFunc != nil {
evFunc(desc)
evFunc(desc.DescriptorProto())
}

case <-s.ShouldQuiesce():
Expand All @@ -1163,7 +1184,7 @@ func (m *Manager) RefreshLeases(ctx context.Context, s *stop.Stopper, db *kv.DB)

// watchForUpdates will watch a rangefeed on the system.descriptor table for
// updates.
func (m *Manager) watchForUpdates(ctx context.Context, descUpdateCh chan<- *descpb.Descriptor) {
func (m *Manager) watchForUpdates(ctx context.Context, descUpdateCh chan<- catalog.Descriptor) {
if log.V(1) {
log.Infof(ctx, "using rangefeeds for lease manager updates")
}
Expand Down Expand Up @@ -1193,7 +1214,7 @@ func (m *Manager) watchForUpdates(ctx context.Context, descUpdateCh chan<- *desc
}
select {
case <-ctx.Done():
case descUpdateCh <- mut.DescriptorProto():
case descUpdateCh <- mut:
}
}
// Ignore errors here because they indicate that the server is shutting down.
Expand Down

0 comments on commit 5576090

Please sign in to comment.