diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go index fde769b15e19..33b136e01f7f 100644 --- a/pkg/sql/catalog/lease/lease.go +++ b/pkg/sql/catalog/lease/lease.go @@ -1117,7 +1117,7 @@ func (m *Manager) findDescriptorState(id descpb.ID, create bool) *descriptorStat // rangefeeds. This function must be passed a non-nil gossip if // RangefeedLeases is not active. func (m *Manager) RefreshLeases(ctx context.Context, s *stop.Stopper, db *kv.DB) { - descUpdateCh := make(chan *descpb.Descriptor) + descUpdateCh := make(chan catalog.Descriptor) m.watchForUpdates(ctx, descUpdateCh) _ = s.RunAsyncTask(ctx, "refresh-leases", func(ctx context.Context) { for { @@ -1130,28 +1130,49 @@ func (m *Manager) RefreshLeases(ctx context.Context, s *stop.Stopper, db *kv.DB) } if evFunc := m.testingKnobs.TestingDescriptorUpdateEvent; evFunc != nil { - if err := evFunc(desc); err != nil { + if err := evFunc(desc.DescriptorProto()); err != nil { log.Infof(ctx, "skipping update of %v due to knob: %v", desc, err) continue } } - id, version, name, state, err := descpb.GetDescriptorMetadata(desc) - if err != nil { - log.Fatalf(ctx, "invalid descriptor %v: %v", desc, err) - } - dropped := state == descpb.DescriptorState_DROP + dropped := desc.Dropped() // Try to refresh the lease to one >= this version. log.VEventf(ctx, 2, "purging old version of descriptor %d@%d (dropped %v)", - id, version, dropped) - if err := purgeOldVersions(ctx, db, id, dropped, version, m); err != nil { - log.Warningf(ctx, "error purging leases for descriptor %d(%s): %s", - id, name, err) + desc.GetID(), desc.GetVersion(), dropped) + purge := func(ctx context.Context) { + if err := purgeOldVersions(ctx, db, desc.GetID(), dropped, desc.GetVersion(), m); err != nil { + log.Warningf(ctx, "error purging leases for descriptor %d(%s): %s", + desc.GetID(), desc.GetName(), err) + } + } + // New descriptors may appear in the future if the descriptor table is + // global or if the transaction which performed the schema change wrote + // to a global table. Attempts to acquire a lease after that new + // version has appeared won't acquire the latest version, they'll + // acquire the previous version because they'll occur at "now". + // + // That, in and of itself, is not a problem. The problem is that we + // may release the lease on the current version before we can start + // acquiring the lease on the new version. This could lead to periods + // of increased latency right as the descriptor has been committed. + if now := db.Clock().Now(); now.Less(desc.GetModificationTime()) { + _ = s.RunAsyncTask(ctx, "wait to purge", func(ctx context.Context) { + toWait := time.Duration(desc.GetModificationTime().WallTime - now.WallTime) + select { + case <-time.After(toWait): + purge(ctx) + case <-ctx.Done(): + case <-s.ShouldQuiesce(): + } + }) + } else { + purge(ctx) } if evFunc := m.testingKnobs.TestingDescriptorRefreshedEvent; evFunc != nil { - evFunc(desc) + evFunc(desc.DescriptorProto()) } case <-s.ShouldQuiesce(): @@ -1163,7 +1184,7 @@ func (m *Manager) RefreshLeases(ctx context.Context, s *stop.Stopper, db *kv.DB) // watchForUpdates will watch a rangefeed on the system.descriptor table for // updates. -func (m *Manager) watchForUpdates(ctx context.Context, descUpdateCh chan<- *descpb.Descriptor) { +func (m *Manager) watchForUpdates(ctx context.Context, descUpdateCh chan<- catalog.Descriptor) { if log.V(1) { log.Infof(ctx, "using rangefeeds for lease manager updates") } @@ -1193,7 +1214,7 @@ func (m *Manager) watchForUpdates(ctx context.Context, descUpdateCh chan<- *desc } select { case <-ctx.Done(): - case descUpdateCh <- mut.DescriptorProto(): + case descUpdateCh <- mut: } } // Ignore errors here because they indicate that the server is shutting down.