Skip to content

Commit

Permalink
Merge #62579
Browse files Browse the repository at this point in the history
62579: lease: remove code from 20.1->20.2 migration r=ajwerner a=ajwerner

In 20.1, rangefeeds were not unconditionally enabled on the descriptor
table and so we had to poll for the migration. This code was pretty gross
and I'm happy to excise it.

Release note: None

Co-authored-by: Andrew Werner <[email protected]>
  • Loading branch information
craig[bot] and ajwerner committed Mar 29, 2021
2 parents 60cdea1 + 1579a4c commit 0dd303e
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 219 deletions.
2 changes: 1 addition & 1 deletion pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ func (s *SQLServer) preStart(
mmKnobs = *migrationManagerTestingKnobs.(*sqlmigrations.MigrationManagerTestingKnobs)
}

s.leaseMgr.RefreshLeases(ctx, stopper, s.execCfg.DB, s.execCfg.Gossip)
s.leaseMgr.RefreshLeases(ctx, stopper, s.execCfg.DB)
s.leaseMgr.PeriodicallyRefreshSomeLeases(ctx)

// Only start the sqlliveness subsystem if we're already at the cluster
Expand Down
3 changes: 0 additions & 3 deletions pkg/sql/catalog/lease/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/clusterversion",
"//pkg/gossip",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/rangefeed",
Expand All @@ -22,7 +20,6 @@ go_library(
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkv",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlutil",
Expand Down
214 changes: 6 additions & 208 deletions pkg/sql/catalog/lease/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
Expand All @@ -35,7 +33,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
Expand Down Expand Up @@ -1140,10 +1137,6 @@ type ManagerTestingKnobs struct {
// To disable the deletion of orphaned leases at server startup.
DisableDeleteOrphanedLeases bool

// AlwaysUseRangefeeds ensures that rangefeeds and not gossip are used to
// detect changes to descriptors.
AlwaysUseRangefeeds bool

// VersionPollIntervalForRangefeeds controls the polling interval for the
// check whether the requisite version for rangefeed-based notifications has
// been finalized.
Expand Down Expand Up @@ -1679,21 +1672,9 @@ func (m *Manager) findDescriptorState(id descpb.ID, create bool) *descriptorStat
// leases for descriptors received in the latest system configuration via gossip or
// rangefeeds. This function must be passed a non-nil gossip if
// RangefeedLeases is not active.
func (m *Manager) RefreshLeases(
ctx context.Context, s *stop.Stopper, db *kv.DB, g gossip.OptionalGossip,
) {
// TODO(ajwerner): is this task needed? refreshLeases appears to already
// delegate everything to a goroutine.
_ = s.RunAsyncTask(ctx, "refresh-leases", func(ctx context.Context) {
m.refreshLeases(ctx, g, db, s)
})
}

func (m *Manager) refreshLeases(
ctx context.Context, g gossip.OptionalGossip, db *kv.DB, s *stop.Stopper,
) {
func (m *Manager) RefreshLeases(ctx context.Context, s *stop.Stopper, db *kv.DB) {
descUpdateCh := make(chan *descpb.Descriptor)
m.watchForUpdates(ctx, s, db, g, descUpdateCh)
m.watchForUpdates(ctx, descUpdateCh)
_ = s.RunAsyncTask(ctx, "refresh-leases", func(ctx context.Context) {
for {
select {
Expand Down Expand Up @@ -1733,91 +1714,9 @@ func (m *Manager) refreshLeases(
})
}

// watchForUpdates will watch either gossip or rangefeeds for updates. If the
// version does not currently support rangefeeds, gossip will be used until
// rangefeeds are supported, at which time, the system will shut down the
// gossip listener and start using rangefeeds.
func (m *Manager) watchForUpdates(
ctx context.Context,
s *stop.Stopper,
db *kv.DB,
g gossip.OptionalGossip,
descUpdateCh chan *descpb.Descriptor,
) {
useRangefeeds := m.testingKnobs.AlwaysUseRangefeeds ||
m.storage.settings.Version.IsActive(ctx, clusterversion.RangefeedLeases)
if useRangefeeds {
m.watchForRangefeedUpdates(ctx, s, db, descUpdateCh)
return
}
gossipCtx, cancelWatchingGossip := context.WithCancel(ctx)
m.watchForGossipUpdates(gossipCtx, s, g, descUpdateCh)
canUseRangefeedsCh := m.waitForRangefeedsToBeUsable(ctx, s)
if err := s.RunAsyncTask(ctx, "wait for upgrade", func(ctx context.Context) {
select {
case <-s.ShouldQuiesce():
return
case <-canUseRangefeedsCh:
// Note: It's okay that the cancelation of gossip watching is
// asynchronous. At worst we'd get duplicate updates or stale updates.
// Both of those are handled.
cancelWatchingGossip()
// Note: It's safe to start watching for rangefeeds now. We know that all
// nodes support rangefeeds in the system config span. Even though there
// may not have been logical ops for all operations in the log, the
// catch-up scan should take us up to the present.
//
// When the rangefeed starts up we'll pass it an initial timestamp which
// is no newer than all updates to the system config span we've already
// seen (see setResolvedTimestamp and its callers). The rangefeed API
// ensures that we will see all updates from on or before that timestamp
// at least once.
m.watchForRangefeedUpdates(ctx, s, db, descUpdateCh)
}
}); err != nil {
// Note: this can only happen if the stopper has been stopped.
return
}
}

func (m *Manager) watchForGossipUpdates(
ctx context.Context,
s *stop.Stopper,
g gossip.OptionalGossip,
descUpdateCh chan<- *descpb.Descriptor,
) {
rawG, err := g.OptionalErr(47150)
if err != nil {
if v := clusterversion.RangefeedLeases; !m.storage.settings.Version.IsActive(ctx, v) {
log.Fatalf(ctx, "required gossip until %v is active: %v", clusterversion.RangefeedLeases, err)
}
return
}

_ = s.RunAsyncTask(ctx, "gossip-updates", func(ctx context.Context) {
descKeyPrefix := m.storage.codec.TablePrefix(uint32(systemschema.DescriptorTable.GetID()))
// TODO(ajwerner): Add a mechanism to unregister this channel upon
// return. NB: this call is allowed to bypass OptionalGossip because
// we'll never get here after RangefeedLeases.
gossipUpdateC := rawG.RegisterSystemConfigChannel()
filter := gossip.MakeSystemConfigDeltaFilter(descKeyPrefix)

ctx, cancel := s.WithCancelOnQuiesce(ctx)
defer cancel()
for {
select {
case <-gossipUpdateC:
m.handleUpdatedSystemCfg(ctx, rawG, &filter, descUpdateCh)
case <-s.ShouldQuiesce():
return
}
}
})
}

func (m *Manager) watchForRangefeedUpdates(
ctx context.Context, s *stop.Stopper, db *kv.DB, descUpdateCh chan<- *descpb.Descriptor,
) {
// watchForUpdates will watch a rangefeed on the system.descriptor table for
// updates.
func (m *Manager) watchForUpdates(ctx context.Context, descUpdateCh chan<- *descpb.Descriptor) {
if log.V(1) {
log.Infof(ctx, "using rangefeeds for lease manager updates")
}
Expand Down Expand Up @@ -1856,111 +1755,10 @@ func (m *Manager) watchForRangefeedUpdates(
// Also note that the range feed automatically shuts down when the server
// shuts down, so we don't need to call Close() ourselves.
_, _ = m.rangeFeedFactory.RangeFeed(
ctx, "lease", descriptorTableSpan, m.getResolvedTimestamp(), handleEvent,
ctx, "lease", descriptorTableSpan, hlc.Timestamp{}, handleEvent,
)
}

func (m *Manager) handleUpdatedSystemCfg(
ctx context.Context,
rawG *gossip.Gossip,
cfgFilter *gossip.SystemConfigDeltaFilter,
descUpdateCh chan<- *descpb.Descriptor,
) {
cfg := rawG.GetSystemConfig()
// Read all descriptors and their versions
if log.V(2) {
log.Info(ctx, "received a new config; will refresh leases")
}
var latestTimestamp hlc.Timestamp
cfgFilter.ForModified(cfg, func(kv roachpb.KeyValue) {
// Attempt to unmarshal config into a descriptor.
var descriptor descpb.Descriptor
if latestTimestamp.Less(kv.Value.Timestamp) {
latestTimestamp = kv.Value.Timestamp
}
if err := kv.Value.GetProto(&descriptor); err != nil {
log.Warningf(ctx, "%s: unable to unmarshal descriptor %v", kv.Key, kv.Value)
return
}
if descriptor.Union == nil {
return
}
descpb.MaybeSetDescriptorModificationTimeFromMVCCTimestamp(&descriptor, kv.Value.Timestamp)
id, version, name, _ := descpb.GetDescriptorMetadata(&descriptor)
if log.V(2) {
log.Infof(ctx, "%s: refreshing lease on descriptor: %d (%s), version: %d",
kv.Key, id, name, version)
}
select {
case <-ctx.Done():
case descUpdateCh <- &descriptor:
}
})
if !latestTimestamp.IsEmpty() {
m.setResolvedTimestamp(latestTimestamp)
}
// Attempt to shove a nil descriptor into the channel to ensure that
// we've processed all of the events previously sent.
select {
case <-ctx.Done():
// If we've been canceled, the other size of the channel will also have
// been canceled.
case descUpdateCh <- nil:
}
}

// waitForRangefeedsToBeUsable returns a channel which is closed when rangefeeds
// are usable according to the cluster version.
func (m *Manager) waitForRangefeedsToBeUsable(ctx context.Context, s *stop.Stopper) chan struct{} {
// TODO(ajwerner): Add a callback to notify about version changes.
// Checking is pretty cheap but really this should be a callback.
const defaultCheckInterval = 10 * time.Second
checkInterval := defaultCheckInterval
if m.testingKnobs.VersionPollIntervalForRangefeeds != 0 {
checkInterval = m.testingKnobs.VersionPollIntervalForRangefeeds
}
upgradeChan := make(chan struct{})
timer := timeutil.NewTimer()
timer.Reset(0)
// NB: we intentionally do *not* close upgradeChan if the task never starts.
_ = s.RunAsyncTask(ctx, "wait-rangefeed-version", func(ctx context.Context) {
for {
select {
case <-timer.C:
timer.Read = true
if m.storage.settings.Version.IsActive(ctx, clusterversion.RangefeedLeases) {
close(upgradeChan)
return
}
timer.Reset(checkInterval)
case <-ctx.Done():
return
case <-s.ShouldQuiesce():
return
}
}
})
return upgradeChan
}

// setResolvedTimestamp marks the Manager as having processed all updates
// up to this timestamp. It is set under the gossip path based on the highest
// timestamp seen in a system config and under the rangefeed path when a
// resolved timestamp is received.
func (m *Manager) setResolvedTimestamp(ts hlc.Timestamp) {
m.mu.Lock()
defer m.mu.Unlock()
if m.mu.updatesResolvedTimestamp.Less(ts) {
m.mu.updatesResolvedTimestamp = ts
}
}

func (m *Manager) getResolvedTimestamp() hlc.Timestamp {
m.mu.Lock()
defer m.mu.Unlock()
return m.mu.updatesResolvedTimestamp
}

// leaseRefreshLimit is the upper-limit on the number of descriptor leases
// that will continuously have their lease refreshed.
var leaseRefreshLimit = settings.RegisterIntSetting(
Expand Down
8 changes: 1 addition & 7 deletions pkg/sql/catalog/lease/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2204,13 +2204,7 @@ func TestRangefeedUpdatesHandledProperlyInTheFaceOfRaces(t *testing.T) {
interestingTable.Store(descpb.ID(0))
blockLeaseAcquisitionOfInterestingTable := make(chan chan struct{})
unblockAll := make(chan struct{})
args := base.TestServerArgs{
Knobs: base.TestingKnobs{
SQLLeaseManager: &lease.ManagerTestingKnobs{
AlwaysUseRangefeeds: true,
},
},
}
args := base.TestServerArgs{}
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: args,
})
Expand Down

0 comments on commit 0dd303e

Please sign in to comment.